diff --git a/docs/examples/gribjump.ipynb b/docs/examples/gribjump.ipynb new file mode 100644 index 000000000..a145e0ce4 --- /dev/null +++ b/docs/examples/gribjump.ipynb @@ -0,0 +1,972 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "cf28ddeb", + "metadata": {}, + "source": [ + "## Retrieving subsets from Grib files via GribJump\n", + "\n", + "This example demonstrates how the experimental `gribjump` source allows efficient retrieval of individual grid cells from Grib messages stored in an FDB. The source is a thin wrapper around the Python bindings of [GribJump](https://github.com/ecmwf/gribjump)." + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "06c4aefb", + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "import numpy as np\n", + "import earthkit.data" + ] + }, + { + "cell_type": "markdown", + "id": "0e7e19c7", + "metadata": {}, + "source": [ + "GribJump can retrieve ranges of grid cells for GRIB files in an FDB that were\n", + "previously indexed by GribJump (e.g. using `gribjump-scan`). To use the\n", + "`gribjump` source in earthkit-data, the environment must point to an FDB in\n", + "addition to GribJump-specific environment variables.\n", + "\n", + "⚠️ Please be aware that this source currently does not perform any validation\n", + "that the grid indices specified by the user actually correspond to the fields'\n", + "underlying grids. Please make sure that any fields referenced by the specified\n", + "FDB requests will result in your expected grid. Because of this, we also need to\n", + "tell GribJump to ignore any missing grid validation information via the\n", + "`GRIBJUMP_IGNORE_GRID` environment variable." + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "ffc76940", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "'1'" + ] + }, + "execution_count": 2, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "os.environ.setdefault(\"FDB_HOME\", \"\")\n", + "os.environ.setdefault(\"FDB5_CONFIG_FILE\", \"\")\n", + "os.environ.setdefault(\"GRIBJUMP_CONFIG_FILE\", \"\")\n", + "os.environ.setdefault(\"GRIBJUMP_IGNORE_GRID\", \"1\")" + ] + }, + { + "cell_type": "markdown", + "id": "d0695a44", + "metadata": {}, + "source": [ + "### How To Use\n", + "\n", + "The `gribjump` source works similar to the `fdb` source and receives a dictionary with an FDB request.\n", + "Please note that the mars syntax for ranges and lists using \"/\" is not supported. Only scalar values and\n", + "Python lists are supported.\n", + "\n", + "The second required parameter is one of `ranges`, `indices`, or `mask`, selecting the grid cells which should\n", + "be extracted. For convenience, one can set an additional parameter `fetch_coords_from_fdb=True` to make an additional\n", + "request directly to the fdb to retrieve latitude and longitude information for the retrieved cells and include\n", + "them in the retrieved cell's metadata." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "cd0c1962", + "metadata": {}, + "outputs": [], + "source": [ + "source = earthkit.data.from_source(\n", + " \"gribjump\",\n", + " {\n", + " \"class\": \"ce\",\n", + " \"expver\": \"0001\",\n", + " \"stream\": \"efcl\",\n", + " \"date\": \"20230101\",\n", + " \"model\": \"lisflood\",\n", + " \"domain\": \"g\",\n", + " \"origin\": \"ecmf\",\n", + " \"step\": 6,\n", + " \"type\": \"sfo\",\n", + " \"levtype\": \"sfc\",\n", + " \"param\": \"240023\",\n", + " \"time\": [\"0000\", \"0600\"],\n", + " \"hdate\": [\"20200101\", \"20200102\"],\n", + " },\n", + " ranges=[(1234, 2345)],\n", + " fetch_coords_from_fdb=True,\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "eb808136", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Gribjump Engine: Built file map: 0.022177 second elapsed, 0.011457 second cpu\n", + "Starting 8 threads\n", + "Gribjump Progress: 1 of 1 tasks complete\n", + "Gribjump Engine: All tasks finished: 0.334884 second elapsed, 0.162512 second cpu\n", + "Gribjump Engine: Repackaged results: 8e-06 second elapsed, 7e-06 second cpu\n", + "Engine::extract: 1.7e-05 second elapsed, 1.5e-05 second cpu\n" + ] + }, + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
paramlevelbase_datetimevalid_datetimestepnumber
0240023None2020-01-01T00:00:002020-01-01T06:00:006None
1240023None2020-01-01T06:00:002020-01-01T12:00:006None
2240023None2020-01-02T00:00:002020-01-02T06:00:006None
3240023None2020-01-02T06:00:002020-01-02T12:00:006None
\n", + "
" + ], + "text/plain": [ + " param level base_datetime valid_datetime step number\n", + "0 240023 None 2020-01-01T00:00:00 2020-01-01T06:00:00 6 None\n", + "1 240023 None 2020-01-01T06:00:00 2020-01-01T12:00:00 6 None\n", + "2 240023 None 2020-01-02T00:00:00 2020-01-02T06:00:00 6 None\n", + "3 240023 None 2020-01-02T06:00:00 2020-01-02T12:00:00 6 None" + ] + }, + "execution_count": 4, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "source.ls()" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "7eff5b19", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "
<xarray.Dataset> Size: 62kB\n",
+       "Dimensions:                  (forecast_reference_time: 4, index: 1111)\n",
+       "Coordinates:\n",
+       "  * forecast_reference_time  (forecast_reference_time) datetime64[ns] 32B 202...\n",
+       "    latitude                 (index) float64 9kB ...\n",
+       "    longitude                (index) float64 9kB ...\n",
+       "  * index                    (index) int64 9kB 1234 1235 1236 ... 2342 2343 2344\n",
+       "Data variables:\n",
+       "    240023                   (forecast_reference_time, index) float64 36kB ...\n",
+       "Attributes: (12/13)\n",
+       "    param:        240023\n",
+       "    class:        ce\n",
+       "    stream:       efcl\n",
+       "    levtype:      sfc\n",
+       "    type:         sfo\n",
+       "    expver:       0001\n",
+       "    ...           ...\n",
+       "    hdate:        20200101\n",
+       "    time:         0000\n",
+       "    origin:       ecmf\n",
+       "    domain:       g\n",
+       "    Conventions:  CF-1.8\n",
+       "    institution:  ECMWF
" + ], + "text/plain": [ + " Size: 62kB\n", + "Dimensions: (forecast_reference_time: 4, index: 1111)\n", + "Coordinates:\n", + " * forecast_reference_time (forecast_reference_time) datetime64[ns] 32B 202...\n", + " latitude (index) float64 9kB ...\n", + " longitude (index) float64 9kB ...\n", + " * index (index) int64 9kB 1234 1235 1236 ... 2342 2343 2344\n", + "Data variables:\n", + " 240023 (forecast_reference_time, index) float64 36kB ...\n", + "Attributes: (12/13)\n", + " param: 240023\n", + " class: ce\n", + " stream: efcl\n", + " levtype: sfc\n", + " type: sfo\n", + " expver: 0001\n", + " ... ...\n", + " hdate: 20200101\n", + " time: 0000\n", + " origin: ecmf\n", + " domain: g\n", + " Conventions: CF-1.8\n", + " institution: ECMWF" + ] + }, + "execution_count": 5, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "ds = source.to_xarray()\n", + "ds" + ] + }, + { + "cell_type": "markdown", + "id": "042e6382", + "metadata": {}, + "source": [ + "### Selection and Groupings\n", + "\n", + "The `gribjump` source offers limited support for selection methods (`.sel()` and\n", + "`.isel()`) and grouping method (`.group_by()`) and anything else implemented for a\n", + "`SimpleFieldList`. However, please keep in mind that the only available metadata\n", + "for these operations comes from the specified fdb request dictionary. Any\n", + "selection value must match the type in this dictionary supplied by the user." + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "2c1e99ae", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "data=SimpleFieldList(2) 2\n", + "SimpleFieldList(1) (1, 1111) ['2020-01-01T00:00:00']\n", + "SimpleFieldList(1) (1, 1111) ['2020-01-01T06:00:00']\n" + ] + } + ], + "source": [ + "groups = source.sel(hdate=\"20200101\").group_by(\"time\")\n", + "for group in groups:\n", + " print(group, group.to_numpy().shape, group.metadata('base_datetime'))" + ] + }, + { + "cell_type": "markdown", + "id": "8e1626a9", + "metadata": {}, + "source": [ + "### Extraction Options\n", + "\n", + "You can specify the extraction points through one of three options. GribJump\n", + "treats all fields as flattened 1D arrays and all coordinates on the grid must\n", + "assume this representation.\n", + "\n", + "* **Ranges:** A list of tuples `(start, end)` defining contiguous ranges of grid\n", + " points to extract. As shown in the example above, each tuple specifies a start\n", + " index (inclusive) and end index (exclusive) in the flattened 1D array\n", + " representation of the grid. For example, `[(0, 100), (200, 300)]` would extract\n", + " grid points 0-99 and 200-299.\n", + "\n", + "* **Indices:** A 1D numpy array or list of specific grid point indices to extract\n", + " from the flattened grid. This allows for non-contiguous extraction of\n", + " individual grid points. For example, `np.array([5, 10, 15, 20])` would extract\n", + " exactly those four grid points. This array must be sorted in ascending order.\n", + "\n", + "* **Masks:** A numpy boolean array where `True` indicates grid points to extract\n", + " and `False` indicates points to skip. The mask must have the same length as\n", + " the total number of grid points in the field. However, no such validation is\n", + " performed and passing a mask with an invalid shape will silently return wrong\n", + " results.\n", + "\n", + "Only one of these methods can be used at a time. Please also note that GribJump\n", + "uses ranges internally regardless of what the user specifies. Converting the\n", + "user's chosen representation to ranges can be expensive when multiple\n", + "fields are accessed simultaneously." + ] + }, + { + "cell_type": "markdown", + "id": "6fe61883", + "metadata": {}, + "source": [ + "#### Code Examples" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "60165c68", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Gribjump Engine: Built file map: 0.010474 second elapsed, 0.008713 second cpu\n", + "Gribjump Progress: 1 of 1 tasks complete\n", + "Gribjump Engine: All tasks finished: 0.039335 second elapsed, 0.039178 second cpu\n", + "Gribjump Engine: Repackaged results: 6e-06 second elapsed, 5e-06 second cpu\n", + "Engine::extract: 2e-05 second elapsed, 2e-05 second cpu\n", + "Extracted dataset (ranges): Size: 36kB\n", + "Dimensions: (index: 2222)\n", + "Coordinates:\n", + " * index (index) int64 18kB 1234 1235 1236 1237 1238 ... 4563 4564 4565 4566\n", + "Data variables:\n", + " 240023 (index) float64 18kB ...\n", + "Attributes: (12/13)\n", + " param: 240023\n", + " class: ce\n", + " stream: efcl\n", + " levtype: sfc\n", + " type: sfo\n", + " expver: 0001\n", + " ... ...\n", + " hdate: 20200101\n", + " time: 0000\n", + " origin: ecmf\n", + " domain: g\n", + " Conventions: CF-1.8\n", + " institution: ECMWF\n", + "Gribjump Engine: Built file map: 0.009283 second elapsed, 0.007779 second cpu\n", + "Gribjump Progress: 1 of 1 tasks complete\n", + "Gribjump Engine: All tasks finished: 0.039215 second elapsed, 0.038721 second cpu\n", + "Gribjump Engine: Repackaged results: 5e-06 second elapsed, 5e-06 second cpu\n", + "Engine::extract: 2.3e-05 second elapsed, 2.2e-05 second cpu\n", + "Extracted dataset (indices): Size: 80B\n", + "Dimensions: (index: 5)\n", + "Coordinates:\n", + " * index (index) int64 40B 10 50 100 150 200\n", + "Data variables:\n", + " 240023 (index) float64 40B ...\n", + "Attributes: (12/13)\n", + " param: 240023\n", + " class: ce\n", + " stream: efcl\n", + " levtype: sfc\n", + " type: sfo\n", + " expver: 0001\n", + " ... ...\n", + " hdate: 20200101\n", + " time: 0000\n", + " origin: ecmf\n", + " domain: g\n", + " Conventions: CF-1.8\n", + " institution: ECMWF\n", + "Gribjump Engine: Built file map: 0.012851 second elapsed, 0.009124 second cpu\n", + "Gribjump Progress: 1 of 1 tasks complete\n", + "Gribjump Engine: All tasks finished: 1 second elapsed, 1 second cpu\n", + "Gribjump Engine: Repackaged results: 6e-06 second elapsed, 6e-06 second cpu\n", + "Engine::extract: 2.7e-05 second elapsed, 2.6e-05 second cpu\n", + "Extracted dataset (mask): Size: 11MB\n", + "Dimensions: (index: 672975)\n", + "Coordinates:\n", + " * index (index) int64 5MB 10 11 32 41 ... 13454079 13454087 13454093\n", + "Data variables:\n", + " 240023 (index) float64 5MB ...\n", + "Attributes: (12/13)\n", + " param: 240023\n", + " class: ce\n", + " stream: efcl\n", + " levtype: sfc\n", + " type: sfo\n", + " expver: 0001\n", + " ... ...\n", + " hdate: 20200101\n", + " time: 0000\n", + " origin: ecmf\n", + " domain: g\n", + " Conventions: CF-1.8\n", + " institution: ECMWF\n" + ] + } + ], + "source": [ + "request = {\n", + " \"class\": \"ce\",\n", + " \"expver\": \"0001\",\n", + " \"stream\": \"efcl\",\n", + " \"date\": \"20230101\",\n", + " \"model\": \"lisflood\",\n", + " \"domain\": \"g\",\n", + " \"origin\": \"ecmf\",\n", + " \"step\": 6,\n", + " \"type\": \"sfo\",\n", + " \"levtype\": \"sfc\",\n", + " \"param\": \"240023\",\n", + " \"time\": \"0000\",\n", + " \"hdate\": \"20200101\",\n", + "}\n", + "\n", + "# Example 1: Using ranges\n", + "source_ranges = earthkit.data.from_source(\n", + " \"gribjump\",\n", + " request,\n", + " ranges=[(1234, 2345), (3456, 4567)],\n", + ")\n", + "ds = source_ranges.to_xarray()\n", + "print(\"Extracted dataset (ranges):\", ds)\n", + "\n", + "# Example 2: Using indices to extract specific grid points\n", + "indices = np.array([10, 50, 100, 150, 200])\n", + "source_indices = earthkit.data.from_source(\n", + " \"gribjump\",\n", + " request,\n", + " indices=indices,\n", + ")\n", + "print(\"Extracted dataset (indices):\", source_indices.to_xarray())\n", + "\n", + "# Example 3: Using a boolean mask with random selection\n", + "shape = 4530 * 2970 # Depends on your grid size\n", + "mask = np.random.choice([True, False], size=shape, p=[0.05, 0.95])\n", + "\n", + "source_mask = earthkit.data.from_source(\n", + " \"gribjump\",\n", + " request,\n", + " mask=mask,\n", + ")\n", + "print(\"Extracted dataset (mask):\", source_mask.to_xarray())" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.9" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/docs/examples/index.rst b/docs/examples/index.rst index cfc8af796..02e9e551a 100644 --- a/docs/examples/index.rst +++ b/docs/examples/index.rst @@ -30,6 +30,7 @@ Data sources polytope_feature.ipynb s3.ipynb wekeo.ipynb + gribjump.ipynb GRIB ++++++ diff --git a/docs/guide/sources.rst b/docs/guide/sources.rst index a1262919b..3670dd0ea 100644 --- a/docs/guide/sources.rst +++ b/docs/guide/sources.rst @@ -66,6 +66,8 @@ We can get data from a given source by using :func:`from_source`: - retrieve data from `WEkEO`_ using the WEkEO grammar * - :ref:`data-sources-wekeocds` - retrieve `CDS `_ data stored on `WEkEO`_ using the `cdsapi`_ grammar + * - :ref:`data-sources-gribjump` + - retrieve data from the `FDB (Fields DataBase)`_ using the `gribjump`_ library * - :ref:`data-sources-zarr` - load data from a `Zarr `_ store @@ -1231,6 +1233,85 @@ wekeocds - :ref:`/examples/wekeo.ipynb` +.. _data-sources-gribjump: + +gribjump +-------- + +.. py:function:: from_source("gribjump", request, *, ranges=None, mask=None, indices=None, fetch_coords_from_fdb=False, fdb_kwargs=None, **kwargs) + :noindex: + + The ``gribjump`` source enables fast retrieval of GRIB message subsets from the `FDB (Fields DataBase)`_ using the `gribjump `_ library. + Both `pygribjump `_ and `pyfdb`_ must be installed. The `pygribjump`_ package uses `findlibs `_ to locate an installation of the `gribjump`_ library. + If the library is not available on your system, you can install it via the `gribjumplib `_ wheel from PyPI. + Installing `gribjumplib` from PyPI will also automatically install `fdb5lib `_ and other dependencies, which may take priority over any existing installations on your system. + + .. warning:: + ⚠️ This source is **experimental** and may change in future versions without + warning. It performs **no validation** that the specified grid indices, + masks, or ranges correspond to the fields' actual underlying grids. + **Incorrect usage may silently return wrong data points.** + The provided ranges or masks might correspond to unexpected points on the + grid. This source is also currently **not thread-safe**. + + Exactly one of the parameters ``ranges``, ``mask`` or ``indices`` must be specified at a time. + + :param request: the FDB request as a dictionary. GribJump requires strict value formatting + (e.g., hdates as "YYYYMMDD", not "YYYY-MM-DD"). Format errors may result in "DataNotFound" errors. + :type request: dict + :param ranges: a list of tuples specifying the ranges of 1D grid indices to retrieve in the form + [(start1, end1), (start2, end2), ...]. Ranges are exclusive, meaning that the end index is not included in the range. + :type ranges: list[tuple[int, int]], optional + :param mask: a 1D boolean mask specifying which grid points to retrieve + :type mask: numpy.array, optional + :param indices: a 1D array of grid indices to retrieve + :type indices: numpy.array, optional + :param fetch_coords_from_fdb: if ``True``, loads the first field's metadata from + the FDB to extract the coordinates at the specified indices. If ``False``, the + coordinates are not loaded and no separate FDB request is made. + Default is ``False``. Please note that no validation is performed to + ensure that all fields in the requests share the same grid. + :type fetch_coords_from_fdb: bool, optional + :param fdb_kwargs: only used when ``fetch_coords_from_fdb=True``. A dict of + keyword arguments passed to the `pyfdb.FDB` constructor. This allows to + specify the FDB configuration, user configuration, etc. If not provided, + the default configuration is used. These arguments are only passed to the + FDB when fetching coordinates and are not used by GribJump for the + extraction itself. + :type fdb_kwargs: dict, optional + + + The following example retrieves a subset from a GRIB message in the FDB using a boolean mask: + + .. code-block:: python + + import earthkit.data as ekd + import numpy as np + + request = { + "class": "od", + "type": "fc", + "stream": "oper", + "expver": "0001", + "repres": "gg", + "levtype": "sfc", + "param": "2t", + "date": "20250703", + "time": 0, + "step": list(range(0, 24, 6)), + "domain": "g", + } + + ranges = [(0, 10), (20, 30)] + + source = ekd.from_source("gribjump", request, ranges=ranges) + ds = source.to_xarray() + + Further examples: + + - :ref:`/examples/gribjump.ipynb` + + .. _data-sources-zarr: diff --git a/docs/install.rst b/docs/install.rst index e9be3ef16..f445107ea 100644 --- a/docs/install.rst +++ b/docs/install.rst @@ -51,6 +51,7 @@ Alternatively, you can install the following components: - covjsonkit: provides access to CoverageJSON data served by the :ref:`data-sources-polytope` source - s3: provides access to non-public :ref:`s3 ` buckets (new in version *0.11.0*) - geotiff: adds GeoTIFF support (new in version *0.11.0*). Please note that this is not included in the ``[all]`` option and has to be invoked separately. + - gribjump: provides access to the :ref:`data-sources-gribjump` source - zarr: provides access to the :ref:`data-sources-zarr` source (new in version *0.15.0*). Please note that this is not included in the ``[all]`` option and has to be invoked separately. E.g. to add :ref:`data-sources-mars` support you can use: @@ -85,3 +86,9 @@ FDB +++++ For FDB (Fields DataBase) access FDB5 must be installed on the system. See the `FDB documentation `_ for details. + + +GribJump +++++++++++++ + +For FDB access with GribJump, both FDB5 and GribJump must be installed on the system. See the `GribJump project `_ for details. diff --git a/pyproject.toml b/pyproject.toml index 20b959ef1..70e09b271 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -47,7 +47,7 @@ dependencies = [ "xarray>=0.19", ] optional-dependencies.all = [ - "earthkit-data[cds,covjsonkit,ecmwf-opendata,fdb,geo,geopandas,mars,odb,polytope,projection,s3,wekeo]", + "earthkit-data[cds,covjsonkit,ecmwf-opendata,fdb,geo,geopandas,gribjump,mars,odb,polytope,projection,s3,wekeo]", ] optional-dependencies.cds = [ "cdsapi>=0.7.2" ] optional-dependencies.ci = [ "numpy" ] @@ -70,6 +70,7 @@ optional-dependencies.fdb = [ "pyfdb>=0.1" ] optional-dependencies.geo = [ "earthkit-geo>=0.2" ] optional-dependencies.geopandas = [ "geopandas" ] optional-dependencies.geotiff = [ "pyproj", "rasterio", "rioxarray" ] +optional-dependencies.gribjump = [ "pyfdb>=0.1", "pygribjump" ] optional-dependencies.mars = [ "ecmwf-api-client>=1.6.1" ] optional-dependencies.odb = [ "pyodc" ] optional-dependencies.polytope = [ "polytope-client>=0.7.6" ] diff --git a/src/earthkit/data/sources/gribjump.py b/src/earthkit/data/sources/gribjump.py new file mode 100644 index 000000000..a510d4913 --- /dev/null +++ b/src/earthkit/data/sources/gribjump.py @@ -0,0 +1,497 @@ +# (C) Copyright 2020 ECMWF. +# +# This software is licensed under the terms of the Apache Licence Version 2.0 +# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. +# In applying this licence, ECMWF does not waive the privileges and immunities +# granted to it by virtue of its status as an intergovernmental organisation +# nor does it submit to any jurisdiction. +# + +try: + import pygribjump as pygj +except ImportError: + raise ImportError("GribJump access requires 'pygribjump' to be installed") + +import dataclasses +import itertools +import os +from collections import UserList +from typing import Any +from typing import Optional + +import numpy as np + +from earthkit.data.indexing.fieldlist import SimpleFieldList +from earthkit.data.readers.grib.metadata import GribMetadata +from earthkit.data.sources import Source +from earthkit.data.sources.array_list import ArrayField +from earthkit.data.sources.fdb import FDBRetriever +from earthkit.data.utils.metadata.dict import UserMetadata + + +def split_mars_requests(request: dict[str, Any]) -> list[dict[str, Any]]: + """Splits a MARS request into individual single-field requests by expanding list values. + + Creates all possible combinations of list values in the request dictionary, + generating separate requests for each field. This is required because GribJump + returns result arrays without metadata, so each field must be requested individually + to map outputs correctly. + + NOTE: Parsing of MARS requests should ideally not be handled here but in a dedicated + component like pymetkit. Consider updating this function once something appropriate + is available. + + Parameters + ---------- + request : dict[str, Any] + The request dictionary containing MARS keywords with potentially list values. + List keys are sorted alphabetically to ensure deterministic ordering. + + Returns + ------- + list[dict[str, Any]] + A list of individual request dictionaries, each representing a single field. + + Raises + ------ + ValueError + If the request contains unsupported "/" syntax for lists/ranges or empty lists. + TypeError + If list values contain mixed types. + + Examples + -------- + >>> split_mars_requests({"param": ["2t", "msl"], "date": "20230101"}) + [{'param': '2t', 'date': '20230101'}, {'param': 'msl', 'date': '20230101'}] + + >>> split_mars_requests({"param": ["2t", "msl"], "step": [0, 6]}) + [{'param': '2t', 'step': 0}, {'param': '2t', 'step': 6}, + {'param': 'msl', 'step': 0}, {'param': 'msl', 'step': 6}] + """ + request = request.copy() + + # Validate request values + for k in request.keys(): + v = request[k] + if isinstance(v, str) and "/" in v: + raise ValueError( + f"Found unsupported list or range using '/' in value '{v}' for keyword '{k}'. " + "Use Python lists to load from multiple fields." + ) + elif isinstance(v, list) and len(v) == 0: + raise ValueError(f"Cannot expand dictionary with empty list. " f"Found empty list for key '{k}'.") + elif isinstance(v, list) and len({type(v_) for v_ in v}) != 1: + raise TypeError( + f"All list values must share the same type but found types {set(map(type, v))} " f"in {k}={v}" + ) + + list_keywords = sorted(k for k, v in request.items() if isinstance(v, list)) + lists = [request[k] for k in list_keywords] + + expanded_requests = [] + for combination in itertools.product(*lists): + new_request = request.copy() + for k, v in zip(list_keywords, combination): + new_request[k] = v + expanded_requests.append(new_request) + return expanded_requests + + +def mask_to_ranges(mask: np.ndarray) -> list[tuple[int, int]]: + """Converts a boolean mask to ranges of indices where the mask is True. + + Parameters + ---------- + mask : np.ndarray + A 1D boolean numpy array. + + Returns + ------- + list[tuple[int, int]] + A list of tuples representing the start and end indices of True segments in the mask. + """ + if not isinstance(mask, np.ndarray): + raise TypeError(f"Expected 'mask' to be a numpy array, got {type(mask)}") + if not np.issubdtype(mask.dtype, np.bool_): + raise ValueError(f"Expected 'mask' to be a boolean array, got {mask.dtype}") + if mask.ndim != 1: + # NOTE: We could relax this and allow 2D masks, which we flatten using .ravel(). + raise ValueError(f"Expected 'mask' to be a 1D numpy array, got {mask.ndim}D") + + padded = np.concatenate(([False], mask, [False])) + d = np.diff(padded.astype(int)) + starts = np.where(d == 1)[0] + ends = np.where(d == -1)[0] + + ranges = list(zip(starts, ends)) + return ranges + + +@dataclasses.dataclass +class ExtractionRequest: + """ + Simple wrapper of pygribjump.ExtractionRequest and the original FDB request dict. + + Can be removed once pygribjump.ExtractionRequest provides a reference to the request dictionary + with original MARS keyword types. + + Parameters + ---------- + extraction_request : pygj.ExtractionRequest + The GribJump extraction request object. + request : dict[str, str] + The original request dictionary used to create the extraction request. + """ + + extraction_request: pygj.ExtractionRequest + request: dict[str, Any] + + @property + def ranges(self) -> list[tuple[int, int]]: + """Returns the ranges of the extraction request.""" + return self.extraction_request.ranges + + def indices(self) -> np.ndarray: + """Returns the indices of the extraction request.""" + return self.extraction_request.indices() + + +def build_extraction_request( + request: dict[str, str], + ranges: Optional[list[tuple[int, int]]] = None, + mask: Optional[np.ndarray] = None, + indices: Optional[np.ndarray] = None, +) -> ExtractionRequest: + """ + Builds an ExtractionRequest from the given request dictionary and optional parameters. + + Parameters + ---------- + request : dict[str, str] + The request dictionary containing MARS keywords. + ranges : Optional[list[tuple[int, int]]], optional + The ranges for the extraction request, by default None. + mask : Optional[np.ndarray], optional + The mask for the extraction request, by default None. + indices : Optional[np.ndarray], optional + The indices for the extraction request, by default None. + + Returns + ------- + ExtractionRequest + The constructed ExtractionRequest object. + """ + stringified_request_dict = {k: str(v) for (k, v) in request.items()} + + if sum(opt is not None for opt in (ranges, mask, indices)) != 1: + raise ValueError( + "Exactly one of 'ranges', 'mask' or 'indices' must be set. " f"Got {ranges=}, {mask=}, {indices=}" + ) + + if ranges is not None: + extraction_request = pygj.ExtractionRequest(stringified_request_dict, ranges) + elif mask is not None: + extraction_request = pygj.ExtractionRequest.from_mask(stringified_request_dict, mask) + elif indices is not None: + extraction_request = pygj.ExtractionRequest.from_indices(stringified_request_dict, indices) + else: + raise ValueError("No valid extraction method specified. Provide either ranges, mask, or indices.") + + return ExtractionRequest(extraction_request, request) + + +class ExtractionRequestCollection(UserList): + + @classmethod + def from_mars_requests( + cls, + mars_requests: list[dict[str, str]], + ranges: Optional[list[tuple[int, int]]] = None, + mask: Optional[np.ndarray] = None, + indices: Optional[np.ndarray] = None, + ) -> "ExtractionRequestCollection": + """Creates an ExtractionRequestCollection from MARS requests. + + One of the parameters `ranges`, `mask`, or `indices` must be provided. + + Parameters + ---------- + mars_requests : list[dict[str, str]] + List of MARS requests, each represented as a dictionary of keywords. + ranges : Optional[list[tuple[int, int]]], optional + The ranges for the extraction requests, by default None. + mask : Optional[np.ndarray], optional + The mask for the extraction requests, by default None. + indices : Optional[np.ndarray], optional + The indices for the extraction requests, by default None. + Returns + ------- + ExtractionRequestCollection + A collection of ExtractionRequest objects created from the MARS requests. + """ + + if sum(opt is not None for opt in (ranges, mask, indices)) != 1: + raise ValueError( + "Exactly one of 'ranges', 'mask' or 'indices' must be set. " + f"Got {ranges=}, {mask=}, {indices=}" + ) + + if mask is not None: + # Since PyGribJump converts the mask to ranges internally, + # we convert it to ranges here once to avoid doing this multiple times. + ranges = mask_to_ranges(mask) + mask = None + + extraction_requests = [build_extraction_request(req, ranges, mask, indices) for req in mars_requests] + return cls(extraction_requests) + + +class FieldExtractList(SimpleFieldList): + """Lazily loaded representation of points extracted from multiple fields using GribJump. + + .. warning:: + This implementation is **not thread-safe**. Concurrent access from multiple threads + may result in race conditions during lazy loading. Use appropriate synchronization + if accessing from multiple threads. + + .. note:: + This class should not be instantiated directly. Use the ``gribjump`` source instead: + ``earthkit.data.from_source("gribjump", request, ranges=ranges)`` + + This class inherits from SimpleFieldList and provides lazy loading of grid point + extractions from GRIB fields via GribJump. FieldList operations like ``sel()``, + ``group_by()``, etc. might work but are not guaranteed to be fully functional. + + Known Limitations + ----------------- + * **No validation**: Grid indices are not validated against actual field grids. + Incorrect indices may return unexpected grid points. + * **Not thread-safe**: Concurrent access may cause race conditions during lazy loading. + * **Limited metadata**: Only metadata from the request dictionary is available, + except for latitude/longitude coordinates when ``fetch_coords_from_fdb=True`` is used. + * **No efficient slicing**: Lazy loading of selections/slices is not supported. + * **Serialization issues**: Pickling/unpickling might not work reliably. + """ + + def __init__( + self, + gj: pygj.GribJump, + requests: ExtractionRequestCollection, + fdb_retriever: Optional[FDBRetriever] = None, + ): + self._gj = gj + self._requests = requests + self._fdb_retriever = fdb_retriever + + # These attributes are set lazily after loading the data. + self._loaded = False + self._grid_indices = None + self._reference_metadata: Optional[GribMetadata] = None + + super().__init__(fields=None) + + def __len__(self): + self._load() + return super().__len__() + + def __getitem__(self, n): + self._load() + return super().__getitem__(n) + + def _load(self): + if self._loaded: + return + + extraction_requests = [req.extraction_request for req in self._requests] + + context = {"origin": "earthkit-data"} + extraction_results = self._gj.extract(extraction_requests, ctx=context) + + fields = [] + indices = None + ranges = None + for request, result in zip(self._requests, extraction_results): + if ranges is None: + ranges = request.ranges + indices = request.indices() + else: + if request.ranges != ranges: + raise ValueError( + f"Extraction request has different ranges than the first request: {request.ranges} != {ranges}" + ) + arr = result.values_flat + shape = arr.shape + + metadata = UserMetadata(request.request, shape=shape) + metadata = self._enrich_metadata_with_coordinates(indices, metadata) + + field = ArrayField(arr, metadata) + fields.append(field) + + self.fields = fields + self._loaded = True + self._grid_indices = indices + + def _load_reference_metadata(self): + """Loads the reference metadata from the FDB retriever if available.""" + if self._fdb_retriever is None: + return None + if self._reference_metadata is not None: + return self._reference_metadata + + fields = self._fdb_retriever.get(self._requests[0].request) + metadatas = fields.metadata() + if not metadatas: + raise ValueError("FDB retriever returned no metadata.") + if len(metadatas) != 1: + raise ValueError(f"Expected exactly one metadata for the first request, got {len(metadatas)}.") + metadata = metadatas[0] + assert isinstance(metadata, GribMetadata), type(metadata) + self._reference_metadata = metadata + return metadata + + def _enrich_metadata_with_coordinates(self, indices: np.ndarray, metadata: UserMetadata) -> UserMetadata: + """Enriches the metadata with coordinates if reference metadata is available.""" + if (reference_metadata := self._load_reference_metadata()) is None: + return metadata + + reference_geography = reference_metadata.geography + grid_latitudes = reference_geography.latitudes()[indices] + grid_longitudes = reference_geography.longitudes()[indices] + metadata = metadata.override( + { + "latitudes": grid_latitudes, + "longitudes": grid_longitudes, + } + ) + return metadata + + def to_xarray(self, *args, **kwargs): + kwargs = kwargs.copy() + + flatten_values = kwargs.setdefault("flatten_values", True) + rename_dims = kwargs.setdefault("rename_dims", {"values": "index"}) + if not flatten_values: + raise ValueError( + "GribJump source only supports flattening of values. " + "Please skip the 'flatten_values' argument or set it to True." + ) + if rename_dims.get("values") != "index": + raise ValueError( + "GribJump source does not support renaming 'values' dimension. " + "Please remove 'values' from 'rename_dims' argument." + ) + + ds = super().to_xarray(*args, **kwargs) + ds = ds.assign_coords({"index": self._grid_indices}) + return ds + + def to_pandas(self, *args, **kwargs): + self._not_implemented() + + +class GribJumpSource(Source): + """Source for extracting grid points from GRIB messages in an FDB with GribJump. + + ⚠️ This source is experimental and may change in future versions without + warning. It performs no validation that the specified grid indices + correspond to the fields' actual underlying grids. The provided ranges + might, therefore, correspond to unexpected points on the grid. This source + is also currently not thread-safe. + """ + + def __init__( + self, + request: dict, + *, + ranges: Optional[list[tuple[int, int]]] = None, + mask: Optional[np.ndarray] = None, + indices: Optional[np.ndarray] = None, + fetch_coords_from_fdb: bool = False, + fdb_kwargs: Optional[dict[str, Any]] = None, + **kwargs, + ): + """ + Parameters + ---------- + request : dict + The MARS request dictionary describing the fields to retrieve. + ranges : Optional[list[tuple[int, int]]], optional + The ranges of grid indices to retrieve, by default None. + mask : Optional[np.ndarray], optional + A 1D boolean mask specifying which grid points to retrieve, by default None. + indices : Optional[np.ndarray], optional + A 1D array of grid indices to retrieve, by default None. + fetch_coords_from_fdb : bool, optional + If set to True, loads the first field's metadata from the FDB to extract the coordinates + at the specified indices. + fdb_kwargs : Optional[dict[str, Any]], optional + Only used when `fetch_coords_from_fdb=True`. A dict of + keyword arguments passed to the `pyfdb.FDB` constructor. These arguments are only passed + to the FDB when fetching coordinates and is not used by GribJump for the extraction itself. + """ + + super().__init__(**kwargs) + + if sum(opt is not None for opt in (ranges, mask, indices)) != 1: + raise ValueError( + "Exactly one of 'ranges', 'mask' or 'indices' must be set. " + f"Got {ranges=}, {mask=}, {indices=}" + ) + self._ranges = ranges + self._mask = mask + self._indices = indices + + self._check_env() + self._gj = pygj.GribJump() + + self._coords_from_fdb = fetch_coords_from_fdb + self._fdb_kwargs = fdb_kwargs if fdb_kwargs is not None else {} + self._mars_requests = split_mars_requests(request) + + def _check_env(self): + fdb_home = os.environ.get("FDB_HOME", None) + fdb_config = os.environ.get("FDB5_CONFIG", None) + fdb_config_file = os.environ.get("FDB5_CONFIG_FILE", None) + + gj_home = os.environ.get("GRIBJUMP_HOME", None) + gj_config_file = os.environ.get("GRIBJUMP_CONFIG_FILE", None) + gj_ignore_grid = os.environ.get("GRIBJUMP_IGNORE_GRID", None) + + if fdb_home is None and fdb_config is None and fdb_config_file is None: + raise RuntimeError( + """Neither FDB_HOME, FDB5_CONFIG, nor FDB5_CONFIG_FILE environment variable + was set! Please define at least one to access FDB. + See: https://fields-database.readthedocs.io for details about FDB.""" + ) + if gj_home is None and gj_config_file is None: + raise RuntimeError( + """Neither GRIBJUMP_HOME nor GRIBJUMP_CONFIG_FILE environment variable + was set! Please define at least one to access GribJump. + See: https://github.com/ecmwf/gribjump for details about GribJump.""" + ) + if gj_ignore_grid is None: + # We could consider setting this automatically but this would need + # to be done carefully to not accidentally activate this for other + # gribjump accesses (e.g. through polytope). + raise RuntimeError( + "Environment variable 'GRIBJUMP_IGNORE_GRID' is not set but " + "must be set (to '1' or 'True') for the 'gribjump' source to work." + ) + + def mutate(self): + extraction_requests = ExtractionRequestCollection.from_mars_requests( + self._mars_requests, + ranges=self._ranges, + mask=self._mask, + indices=self._indices, + ) + fdb_retriever = FDBRetriever(self._fdb_kwargs) if self._coords_from_fdb else None + return FieldExtractList( + self._gj, + requests=extraction_requests, + fdb_retriever=fdb_retriever, + ) + + +source = GribJumpSource diff --git a/src/earthkit/data/testing.py b/src/earthkit/data/testing.py index ef1dd6f21..2278c04c7 100644 --- a/src/earthkit/data/testing.py +++ b/src/earthkit/data/testing.py @@ -130,7 +130,7 @@ def modules_installed(*modules): fdb_home = os.environ.get("FDB_HOME", None) NO_PROD_FDB = fdb_home is None - +NO_GRIBJUMP = NO_FDB or not modules_installed("pygribjump") NO_POLYTOPE = not os.path.exists(os.path.expanduser("~/.polytopeapirc")) NO_COVJSONKIT = not modules_installed("covjsonkit") NO_RIOXARRAY = not modules_installed("rioxarray") diff --git a/tests/data/t_gribjump.grib b/tests/data/t_gribjump.grib new file mode 100644 index 000000000..ced96dc07 Binary files /dev/null and b/tests/data/t_gribjump.grib differ diff --git a/tests/documentation/test_notebooks.py b/tests/documentation/test_notebooks.py index df0d82cd3..88f6b6bb9 100644 --- a/tests/documentation/test_notebooks.py +++ b/tests/documentation/test_notebooks.py @@ -25,25 +25,26 @@ EXAMPLES = earthkit_file("docs", "examples") SKIP = [ + "ads.ipynb", + "cds.ipynb", + "demo_source_plugin.ipynb", + "ecmwf_open_data.ipynb", "fdb.ipynb", + "grib_fdb_write.ipynb", + "grib_to_fdb_target.ipynb", + "grib_to_xarray.ipynb", + "gribjump.ipynb", "mars.ipynb", - "cds.ipynb", - "ads.ipynb", - "wekeo.ipynb", + "netcdf_opendap.ipynb", "polytope.ipynb", "polytope_feature.ipynb", "polytope_polygon_coverage.ipynb", "polytope_time_series.ipynb", "polytope_vertical_profile.ipynb", - "grib_fdb_write.ipynb", - "demo_source_plugin.ipynb", - "ecmwf_open_data.ipynb", "shapefile.ipynb", - "grib_to_xarray.ipynb", - "grib_to_fdb_target.ipynb", - "xarray_engine_chunks_on_dask_cluster.ipynb", + "wekeo.ipynb", "xarray_cupy.ipynb", - "netcdf_opendap.ipynb", + "xarray_engine_chunks_on_dask_cluster.ipynb", ] if NO_TORCH: diff --git a/tests/sources/test_gribjump.py b/tests/sources/test_gribjump.py new file mode 100644 index 000000000..020a8ae09 --- /dev/null +++ b/tests/sources/test_gribjump.py @@ -0,0 +1,523 @@ +#!/usr/bin/env python3 + +# (C) Copyright 2020 ECMWF. +# +# This software is licensed under the terms of the Apache Licence Version 2.0 +# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. +# In applying this licence, ECMWF does not waive the privileges and immunities +# granted to it by virtue of its status as an intergovernmental organisation +# nor does it submit to any jurisdiction. +# + + +import shutil +from pathlib import Path + +import pytest +import yaml + +from earthkit.data import from_source +from earthkit.data.core.temporary import temp_directory +from earthkit.data.core.temporary import temp_env +from earthkit.data.testing import NO_GRIBJUMP +from earthkit.data.testing import earthkit_test_data_file + + +@pytest.fixture +def setup_fdb_with_gribjump(): + import pyfdb + + with temp_directory() as tmpdir: + fdb_dir = Path(tmpdir) / "fdb" + fdb_dir.mkdir(exist_ok=True) + + # Copy of FDB schema + fdb_schema = earthkit_test_data_file("fdb_schema.txt") + shutil.copy(fdb_schema, fdb_dir / "schema") + + # FDB config + fdb_config = { + "type": "local", + "engine": "toc", + "schema": str(fdb_dir / "schema"), + "spaces": [{"handler": "Default", "roots": [{"path": str(fdb_dir)}]}], + } + fdb_config_path = fdb_dir / "config.yaml" + fdb_config_path.write_text(yaml.dump(fdb_config)) + + # Gribjump config + gj_config = { + "plugin": { + "select": "class=(.)", + } + } + gj_config_path = fdb_dir / "gribjump.yaml" + gj_config_path.write_text(yaml.dump(gj_config)) + + with temp_env( + FDB5_CONFIG_FILE=str(fdb_config_path), + FDB_ENABLE_GRIBJUMP="1", + FDB_HOME=str(fdb_dir), + GRIBJUMP_CONFIG_FILE=str(gj_config_path), + GRIBJUMP_IGNORE_GRID="1", + ): + fdb = pyfdb.FDB(config=fdb_config) + yield fdb + + +@pytest.fixture +def seed_fdb(setup_fdb_with_gribjump): + ds = from_source("file", earthkit_test_data_file("t_gribjump.grib")) + for f in ds: + setup_fdb_with_gribjump.archive(f.message()) + setup_fdb_with_gribjump.flush() + yield setup_fdb_with_gribjump + + +@pytest.fixture +def ranges(): + return dict(ranges=[(0, 1), (5, 9), (25, 27)]) + + +@pytest.fixture +def indices(): + import numpy as np + + return dict(indices=np.array([0, 5, 6, 7, 8, 25, 26])) + + +@pytest.fixture +def mask(): + import numpy as np + + mask = np.zeros((7, 12), dtype=bool) + mask[0, 0] = True + mask[0, 5:9] = True + mask[2, 1:3] = True + mask = mask.ravel() + return dict(mask=mask) + + +@pytest.fixture +def arr_expected(): + import numpy as np + + arr_expected = np.array( + [ + [ + 1743.06591797, + 1743.06591797, + 1743.06591797, + 1743.06591797, + 1743.06591797, + 1607.31591797, + 1721.81591797, + ], + [ + 1641.43701172, + 1641.43701172, + 1641.43701172, + 1641.43701172, + 1641.43701172, + 1702.31201172, + 1887.18701172, + ], + ] + ) + return arr_expected + + +@pytest.fixture +def ds_expected_with_coords(): + import numpy as np + import xarray as xr + + arr_expected = np.array( + [ + [ + 1743.06591797, + 1743.06591797, + 1743.06591797, + 1743.06591797, + 1743.06591797, + 1607.31591797, + 1721.81591797, + ], + [ + 1641.43701172, + 1641.43701172, + 1641.43701172, + 1641.43701172, + 1641.43701172, + 1702.31201172, + 1887.18701172, + ], + ] + ) + latitude_expected = np.array( + [ + 90.0, + 90.0, + 90.0, + 90.0, + 90.0, + 30.0, + 30.0, + ] + ) + longitude_expected = np.array( + [ + 0.0, + 150.0, + 180.0, + 210.0, + 240.0, + 30.0, + 60.0, + ] + ) + ds_expected = xr.Dataset( + {"129": (("step", "index"), arr_expected)}, + coords={ + "step": np.array([0, 21600000000000], dtype="timedelta64[ns]"), + "index": np.array([0, 5, 6, 7, 8, 25, 26]), + "latitude": ("index", latitude_expected), + "longitude": ("index", longitude_expected), + }, + attrs={ + "class": "od", + "date": "20201221", + "domain": "g", + "expver": "xxxx", + "levelist": "1000", + "levtype": "pl", + "stream": "oper", + "param": "129", + "time": "1200", + "type": "fc", + "Conventions": "CF-1.8", + "institution": "ECMWF", + }, + ) + return ds_expected + + +@pytest.fixture +def ds_expected(ds_expected_with_coords): + # Remove coordinates to match the expected output + ds = ds_expected_with_coords.drop_vars(["latitude", "longitude"]) + return ds + + +@pytest.mark.skipif(NO_GRIBJUMP, reason="pygribjump or pyfdb not available") +def test_split_mars_requests(): + from earthkit.data.sources.gribjump import split_mars_requests + + request = { + "step": [0, 6, 12], + "param": ["129", "130"], + "date": "20230101", + "time": "1200", + } + result = split_mars_requests(request) + assert result == [ + {"param": "129", "step": 0, "date": "20230101", "time": "1200"}, + {"param": "129", "step": 6, "date": "20230101", "time": "1200"}, + {"param": "129", "step": 12, "date": "20230101", "time": "1200"}, + {"param": "130", "step": 0, "date": "20230101", "time": "1200"}, + {"param": "130", "step": 6, "date": "20230101", "time": "1200"}, + {"param": "130", "step": 12, "date": "20230101", "time": "1200"}, + ] + + assert split_mars_requests({}) == [{}] + assert split_mars_requests({"a": 1}) == [{"a": 1}] + assert split_mars_requests({"a": 1, "b": 2}) == [{"a": 1, "b": 2}] + + # Error: empty list + with pytest.raises(ValueError, match="Cannot expand dictionary with empty list"): + split_mars_requests({"a": 1, "b": []}) + + # Error: unsupported "/" syntax + with pytest.raises(ValueError, match="Found unsupported list or range using '/'"): + split_mars_requests({"param": "129/130", "date": "20230101"}) + + # Error: mixed types in lists + request_mixed = {"param": [129, "130"], "date": "20230101"} + with pytest.raises(TypeError, match="All list values must share the same type"): + split_mars_requests(request_mixed) + + +@pytest.mark.skipif(NO_GRIBJUMP, reason="pygribjump or pyfdb not available") +@pytest.mark.parametrize( + "mask,expected_ranges", + [ + ([False, False, False], []), + ([True, True, False], [(0, 2)]), + ([False, False, True], [(2, 3)]), + ([False, False, True, True, False, True, False], [(2, 4), (5, 6)]), + ([True, False, True, True, True, False, True], [(0, 1), (2, 5), (6, 7)]), + ], +) +def test_mask_to_ranges(mask, expected_ranges): + import numpy as np + + from earthkit.data.sources.gribjump import mask_to_ranges + + mask = np.array(mask, dtype=bool) + result = mask_to_ranges(mask) + assert result == expected_ranges + + +@pytest.mark.skipif(NO_GRIBJUMP, reason="pygribjump or pyfdb not available") +@pytest.mark.parametrize( + "mask,error_type", + [ + ([], ValueError), + ([1, 0, 1], ValueError), + ([[False, True], [True, False]], ValueError), + ], +) +def test_mask_to_ranges_errors(mask, error_type): + import numpy as np + + from earthkit.data.sources.gribjump import mask_to_ranges + + with pytest.raises(error_type): + mask_to_ranges(np.array(mask)) + + +@pytest.mark.skipif(NO_GRIBJUMP, reason="pygribjump or pyfdb not available") +@pytest.mark.parametrize("method", ["ranges", "indices", "mask"]) +def test_gribjump_to_numpy(seed_fdb, arr_expected, method, request): + import numpy as np + + kwargs = request.getfixturevalue(method) + mars_request = { + "class": "od", + "date": "20201221", + "domain": "g", + "expver": "xxxx", + "levelist": "1000", + "levtype": "pl", + "param": "129", + "step": [0, 6], + "stream": "oper", + "time": "1200", + "type": "fc", + } + + source = from_source("gribjump", mars_request, **kwargs) + arr = source.to_numpy() + + assert arr is not None and isinstance(arr, np.ndarray) + assert arr.shape == (2, 7) + np.testing.assert_array_almost_equal(arr, arr_expected) + + +@pytest.mark.skipif(NO_GRIBJUMP, reason="pygribjump or pyfdb not available") +@pytest.mark.parametrize("method", ["ranges", "indices", "mask"]) +def test_gribjump_to_xarray_without_coords(seed_fdb, ds_expected, method, request): + import xarray as xr + + kwargs = request.getfixturevalue(method) + mars_request = { + "class": "od", + "date": "20201221", + "domain": "g", + "expver": "xxxx", + "levelist": "1000", + "levtype": "pl", + "param": "129", + "step": [0, 6], + "stream": "oper", + "time": "1200", + "type": "fc", + } + + source = from_source("gribjump", mars_request, **kwargs) + ds = source.to_xarray() + + xr.testing.assert_allclose(ds, ds_expected) + assert ds_expected.attrs == ds.attrs + assert set(ds_expected.coords.keys()) == set(ds.coords.keys()) + + +@pytest.mark.skipif(NO_GRIBJUMP, reason="pygribjump or pyfdb not available") +@pytest.mark.parametrize("method", ["ranges", "indices", "mask"]) +def test_gribjump_to_xarray_with_coords(seed_fdb, ds_expected_with_coords, method, request): + import xarray as xr + + kwargs = request.getfixturevalue(method) + mars_request = { + "class": "od", + "date": "20201221", + "domain": "g", + "expver": "xxxx", + "levelist": "1000", + "levtype": "pl", + "param": "129", + "step": [0, 6], + "stream": "oper", + "time": "1200", + "type": "fc", + } + + source = from_source("gribjump", mars_request, fetch_coords_from_fdb=True, **kwargs) + ds = source.to_xarray() + + xr.testing.assert_allclose(ds, ds_expected_with_coords) + assert ds_expected_with_coords.attrs == ds.attrs + assert set(ds_expected_with_coords.coords.keys()) == set(ds.coords.keys()) + + +@pytest.mark.skipif(NO_GRIBJUMP, reason="pygribjump or pyfdb not available") +def test_gribjump_selection(seed_fdb): + import numpy as np + + request = { + "class": "od", + "date": "20201221", + "domain": "g", + "expver": "xxxx", + "levelist": "1000", + "levtype": "pl", + "param": "129", + "step": [0, 6], + "stream": "oper", + "time": "1200", + "type": "fc", + } + + indices = np.array([0, 7, 14, 21, 28, 35, 42]) + source = from_source("gribjump", request, indices=indices) + + arr_orig = source.to_numpy() + arr_subset = source.sel(step=6).to_numpy() + + assert arr_subset.shape == (1, 7) + assert np.allclose(arr_orig[[1]], arr_subset) + + +@pytest.mark.skipif(NO_GRIBJUMP, reason="pygribjump or pyfdb not available") +def test_gribjump_to_xarray_with_coords_does_not_fail_for_grids(seed_fdb): + mars_request = { + "class": "od", + "date": "20201221", + "domain": "g", + "expver": "xxxx", + "levelist": "1000", + "levtype": "pl", + "param": "129", + "step": [0, 6], + "stream": "oper", + "time": "1200", + "type": "fc", + } + + source = from_source("gribjump", mars_request, fetch_coords_from_fdb=True, indices=[0]) + ds = source.to_xarray() + assert set(ds.dims) == {"step", "index"} + assert set(ds.coords) == {"step", "index", "latitude", "longitude"} + + +@pytest.mark.skipif(NO_GRIBJUMP, reason="pygribjump or pyfdb not available") +def test_gribjump_with_mixed_types_in_lists(seed_fdb): + + request = { + "class": "od", + "date": "20201221", + "domain": "g", + "expver": "xxxx", + "levelist": "1000", + "levtype": "pl", + "param": "129", + "step": [0, "6"], + "stream": "oper", + "time": "1200", + "type": "fc", + } + + with pytest.raises(TypeError): + from_source("gribjump", request, ranges=[(1, 2)]) + + +@pytest.mark.skipif(NO_GRIBJUMP, reason="pygribjump or pyfdb not available") +def test_gribjump_with_invalid_options(seed_fdb): + import numpy as np + + request = { + "class": "od", + "date": "20201221", + "domain": "g", + "expver": "xxxx", + "levelist": "1000", + "levtype": "pl", + "param": "129", + "step": [0, 6], + "stream": "oper", + "time": "1200", + "type": "fc", + } + + with pytest.raises(ValueError, match="Exactly one of"): + from_source( + "gribjump", + request, + ) + + with pytest.raises(ValueError, match="Exactly one of"): + from_source( + "gribjump", + request, + ranges=[(0, 1), (10, 12)], + indices=np.array([0, 7, 14, 21, 28, 35, 42]), + ) + + +@pytest.mark.skipif(NO_GRIBJUMP, reason="pygribjump or pyfdb not available") +def test_gribjump_with_invalid_mask(seed_fdb): + import numpy as np + + request = { + "class": "od", + "date": "20201221", + "domain": "g", + "expver": "xxxx", + "levelist": "1000", + "levtype": "pl", + "param": "129", + "step": [0, 6], + "stream": "oper", + "time": "1200", + "type": "fc", + } + + with pytest.raises(ValueError, match="Expected 'mask' to be a 1D numpy array"): + mask = np.array([[True, False], [False, True]], dtype=bool) + from_source( + "gribjump", + request, + mask=mask, + ) + + with pytest.raises(ValueError, match="Expected 'mask' to be a boolean array"): + mask = np.array([1, 0, 1], dtype=int) + from_source( + "gribjump", + request, + mask=mask, + ) + + with pytest.raises(TypeError, match="Expected 'mask' to be a numpy array"): + mask = [True, False, True] + from_source( + "gribjump", + request, + mask=mask, + ) + + +if __name__ == "__main__": + from earthkit.data.testing import main + + main(__file__)