
Add a GRIB backend via ECMWF cfgrib / ecCodes #2476


Merged (29 commits, Oct 17, 2018)
Commits (29)
72606f7
Integration of ECMWF cfgrib driver to read GRIB files into xarray.
alexamici Oct 9, 2018
71fcbe7
Remove all coordinate renaming from the cfgrib backend.
alexamici Oct 9, 2018
6faa7b9
Move flavour selection to `cfgrib.Dataset.from_path`.
alexamici Oct 9, 2018
1469a0e
Sync xarray backend import style with xarray.
alexamici Oct 9, 2018
12811e8
Make use of the new xarray.backends.FileCachingManager.
alexamici Oct 9, 2018
a4409b6
Add just-in-case locking for ecCodes.
alexamici Oct 9, 2018
80b8788
Explicitly assign attributes to CfGribArrayWrapper
alexamici Oct 10, 2018
9dfd660
Add missing locking in CfGribArrayWrapper and use explicit_indexing_a…
alexamici Oct 10, 2018
edc4e85
Add a comment about the ugly work-around needed for filter_by_keys.
alexamici Oct 10, 2018
9b5335a
Declare correct indexing support.
alexamici Oct 10, 2018
186a504
Merge branch 'upstream' into feature/grib-support-via-cfgrib
alexamici Oct 14, 2018
485a409
Add TestCfGrib test class.
alexamici Oct 14, 2018
81f18c2
cfgrib doesn't store a file reference so no need for CachingFileManager.
alexamici Oct 14, 2018
5dedb3f
Add cfgrib testing to Travis-CI.
alexamici Oct 14, 2018
831ae4f
Naming.
alexamici Oct 14, 2018
6372e6e
Fix line lengths and get to 100% coverage.
alexamici Oct 14, 2018
8e9b2e3
Add reference to *cfgrib* engine in inline docs.
alexamici Oct 14, 2018
07b9469
First cut of the documentation.
alexamici Oct 14, 2018
340720a
Tentative test cfgrib under dask.distributed.
alexamici Oct 14, 2018
4d84f70
Better integration test.
alexamici Oct 14, 2018
0b027db
Remove explicit copyright and license boilerplate to harmonise with o…
alexamici Oct 15, 2018
a4ead54
Add a usage example.
alexamici Oct 15, 2018
ec80d86
Fix code style.
alexamici Oct 15, 2018
f30b7d0
Fix doc style.
alexamici Oct 16, 2018
223d25c
Fix docs testing. The example.grib file is not accessible.
alexamici Oct 17, 2018
2ef993f
Merge remote-tracking branch 'upstream/master' into feature/grib-supp…
alexamici Oct 17, 2018
bbf01e3
Fix merge in docs.
alexamici Oct 17, 2018
da2b9dd
Fix merge in docs.
alexamici Oct 17, 2018
eda96a4
Fix doc style.
alexamici Oct 17, 2018
2 changes: 2 additions & 0 deletions ci/requirements-py36.yml
@@ -21,8 +21,10 @@ dependencies:
  - bottleneck
  - zarr
  - pseudonetcdf>=3.0.1
  - eccodes
  - pip:
    - coveralls
    - pytest-cov
    - pydap
    - lxml
    - cfgrib
4 changes: 3 additions & 1 deletion doc/installing.rst
@@ -34,7 +34,9 @@ For netCDF and IO
- `rasterio <https://github.com/mapbox/rasterio>`__: for reading GeoTiffs and
  other gridded raster datasets.
- `iris <https://github.com/scitools/iris>`__: for conversion to and from iris'
  Cube objects.
- `cfgrib <https://github.com/ecmwf/cfgrib>`__: for reading GRIB files via the
  *ECMWF ecCodes* library.

For accelerating xarray
~~~~~~~~~~~~~~~~~~~~~~~
22 changes: 22 additions & 0 deletions doc/io.rst
@@ -635,6 +635,28 @@ For example:
Not all native zarr compression and filtering options have been tested with
xarray.

.. _io.cfgrib:

GRIB format via cfgrib
----------------------

xarray supports reading GRIB files via the ECMWF cfgrib_ Python driver and the
ecCodes_ C library, if they are installed. To open a GRIB file, supply
``engine='cfgrib'`` to :py:func:`~xarray.open_dataset`:

.. ipython::
    :verbatim:

    In [1]: ds_grib = xr.open_dataset('example.grib', engine='cfgrib')

We recommend installing ecCodes via conda, and cfgrib via pip::

    conda install -c conda-forge eccodes
    pip install cfgrib
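
The cfgrib engine also accepts ``backend_kwargs``, which
:py:func:`~xarray.open_dataset` forwards to the driver. As a sketch, the
``filter_by_keys`` option is the one exercised by this PR's tests; the key
value below is illustrative, and selects only the GRIB messages whose keys
match:

.. ipython::
    :verbatim:

    In [2]: ds_t = xr.open_dataset('example.grib', engine='cfgrib',
       ...:                        backend_kwargs={'filter_by_keys': {'shortName': 't'}})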

.. _cfgrib: https://github.com/ecmwf/cfgrib
.. _ecCodes: https://confluence.ecmwf.int/display/ECC/ecCodes+Home

.. _io.pynio:

Formats supported by PyNIO
6 changes: 5 additions & 1 deletion doc/whats-new.rst
@@ -72,7 +72,11 @@ Enhancements
  :py:meth:`~xarray.DataArray.interp`, and
  :py:meth:`~xarray.Dataset.interp`.
  By `Spencer Clark <https://github.com/spencerkclark>`_.

- Added a new backend for the GRIB file format, based on the ECMWF *cfgrib*
  Python driver and the *ecCodes* C library (:issue:`2475`).
  By `Alessandro Amici <https://github.com/alexamici>`_,
  sponsored by `ECMWF <https://github.com/ecmwf>`_.

Bug fixes
~~~~~~~~~

2 changes: 2 additions & 0 deletions xarray/backends/__init__.py
@@ -5,6 +5,7 @@
"""
from .common import AbstractDataStore
from .file_manager import FileManager, CachingFileManager, DummyFileManager
from .cfgrib_ import CfGribDataStore
from .memory import InMemoryDataStore
from .netCDF4_ import NetCDF4DataStore
from .pydap_ import PydapDataStore
@@ -18,6 +19,7 @@
    'AbstractDataStore',
    'FileManager',
    'CachingFileManager',
    'CfGribDataStore',
    'DummyFileManager',
    'InMemoryDataStore',
    'NetCDF4DataStore',
12 changes: 9 additions & 3 deletions xarray/backends/api.py
@@ -162,7 +162,8 @@ def open_dataset(filename_or_obj, group=None, decode_cf=True,
    decode_coords : bool, optional
        If True, decode the 'coordinates' attribute to identify coordinates in
        the resulting dataset.
    engine : {'netcdf4', 'scipy', 'pydap', 'h5netcdf', 'pynio', 'cfgrib',
        'pseudonetcdf'}, optional
        Engine to use when reading files. If not provided, the default engine
        is chosen based on available dependencies, with a preference for
        'netcdf4'.
@@ -296,6 +297,9 @@ def maybe_decode_store(store, lock=False):
        elif engine == 'pseudonetcdf':
            store = backends.PseudoNetCDFDataStore.open(
                filename_or_obj, lock=lock, **backend_kwargs)
        elif engine == 'cfgrib':
            store = backends.CfGribDataStore(
                filename_or_obj, lock=lock, **backend_kwargs)
        else:
            raise ValueError('unrecognized engine for open_dataset: %r'
                             % engine)
@@ -356,7 +360,8 @@ def open_dataarray(filename_or_obj, group=None, decode_cf=True,
    decode_coords : bool, optional
        If True, decode the 'coordinates' attribute to identify coordinates in
        the resulting dataset.
    engine : {'netcdf4', 'scipy', 'pydap', 'h5netcdf', 'pynio', 'cfgrib'},
        optional
        Engine to use when reading files. If not provided, the default engine
        is chosen based on available dependencies, with a preference for
        'netcdf4'.
@@ -486,7 +491,8 @@ def open_mfdataset(paths, chunks=None, concat_dim=_CONCAT_DIM_DEFAULT,
        of all non-null values.
    preprocess : callable, optional
        If provided, call this function on each dataset prior to concatenation.
    engine : {'netcdf4', 'scipy', 'pydap', 'h5netcdf', 'pynio', 'cfgrib'},
        optional
        Engine to use when reading files. If not provided, the default engine
        is chosen based on available dependencies, with a preference for
        'netcdf4'.
78 changes: 78 additions & 0 deletions xarray/backends/cfgrib_.py
@@ -0,0 +1,78 @@
from __future__ import absolute_import, division, print_function

import numpy as np

from .. import Variable
from ..core import indexing
from ..core.utils import Frozen, FrozenOrderedDict
from .common import AbstractDataStore, BackendArray
from .locks import ensure_lock, SerializableLock

# FIXME: Add a dedicated lock, even if ecCodes is supposed to be thread-safe
# in most circumstances. See:
# https://confluence.ecmwf.int/display/ECC/Frequently+Asked+Questions
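# NOTE: SerializableLock (rather than a plain threading.Lock) is used here so
# the lock can be pickled and shipped to dask.distributed workers; see the
# distributed integration test added in this PR.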
ECCODES_LOCK = SerializableLock()


class CfGribArrayWrapper(BackendArray):
    def __init__(self, datastore, array):
        self.datastore = datastore
        self.shape = array.shape
        self.dtype = array.dtype
        self.array = array

    def __getitem__(self, key):
        return indexing.explicit_indexing_adapter(
            key, self.shape, indexing.IndexingSupport.OUTER, self._getitem)

    def _getitem(self, key):
        with self.datastore.lock:
            return self.array[key]


class CfGribDataStore(AbstractDataStore):
"""
Implements the ``xr.AbstractDataStore`` read-only API for a GRIB file.
"""
def __init__(self, filename, lock=None, **backend_kwargs):
import cfgrib
if lock is None:
lock = ECCODES_LOCK
self.lock = ensure_lock(lock)

# NOTE: filter_by_keys is a dict, but CachingFileManager only accepts
# hashable types.
if 'filter_by_keys' in backend_kwargs:
filter_by_keys_items = backend_kwargs['filter_by_keys'].items()
backend_kwargs['filter_by_keys'] = tuple(filter_by_keys_items)

self.ds = cfgrib.open_file(filename, mode='r', **backend_kwargs)

def open_store_variable(self, name, var):
if isinstance(var.data, np.ndarray):
data = var.data
else:
wrapped_array = CfGribArrayWrapper(self, var.data)
data = indexing.LazilyOuterIndexedArray(wrapped_array)

encoding = self.ds.encoding.copy()
encoding['original_shape'] = var.data.shape

return Variable(var.dimensions, data, var.attributes, encoding)

def get_variables(self):
return FrozenOrderedDict((k, self.open_store_variable(k, v))
for k, v in self.ds.variables.items())

def get_attrs(self):
return Frozen(self.ds.attributes)

def get_dimensions(self):
return Frozen(self.ds.dimensions)

def get_encoding(self):
dims = self.get_dimensions()
encoding = {
'unlimited_dims': {k for k, v in dims.items() if v is None},
}
return encoding
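
For orientation, a minimal sketch of how this store is reached in practice
(the file name is illustrative; ``open_dataset`` also accepts
``AbstractDataStore`` instances directly, so the lower-level path below is
equivalent to the engine-based one):

    import xarray as xr
    from xarray.backends import CfGribDataStore

    # Normal path: open_dataset dispatches on engine='cfgrib' (see api.py above).
    ds = xr.open_dataset('example.grib', engine='cfgrib')

    # Equivalent lower-level path: build the store and hand it to open_dataset.
    store = CfGribDataStore('example.grib')
    ds = xr.open_dataset(store)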
1 change: 1 addition & 0 deletions xarray/tests/__init__.py
@@ -77,6 +77,7 @@ def LooseVersion(vstring):
has_zarr, requires_zarr = _importorskip('zarr', minversion='2.2')
has_np113, requires_np113 = _importorskip('numpy', minversion='1.13.0')
has_iris, requires_iris = _importorskip('iris')
has_cfgrib, requires_cfgrib = _importorskip('cfgrib')

# some special cases
has_scipy_or_netCDF4 = has_scipy or has_netCDF4
Binary file added xarray/tests/data/example.grib
24 changes: 23 additions & 1 deletion xarray/tests/test_backends.py
@@ -33,7 +33,7 @@
    has_dask, has_netCDF4, has_scipy, network, raises_regex, requires_cftime,
    requires_dask, requires_h5netcdf, requires_netCDF4, requires_pathlib,
    requires_pseudonetcdf, requires_pydap, requires_pynio, requires_rasterio,
    requires_scipy, requires_scipy_or_netCDF4, requires_zarr, requires_cfgrib)
from .test_dataset import create_test_data

try:
@@ -2463,6 +2463,28 @@ def test_weakrefs(self):
        assert_identical(actual, expected)


@requires_cfgrib
class TestCfGrib(object):

    def test_read(self):
        expected = {'number': 2, 'time': 3, 'air_pressure': 2, 'latitude': 3,
                    'longitude': 4}
        with open_example_dataset('example.grib', engine='cfgrib') as ds:
            assert ds.dims == expected
            assert list(ds.data_vars) == ['z', 't']
            assert ds['z'].min() == 12660.

    def test_read_filter_by_keys(self):
        kwargs = {'filter_by_keys': {'shortName': 't'}}
        expected = {'number': 2, 'time': 3, 'air_pressure': 2, 'latitude': 3,
                    'longitude': 4}
        with open_example_dataset('example.grib', engine='cfgrib',
                                  backend_kwargs=kwargs) as ds:
            assert ds.dims == expected
            assert list(ds.data_vars) == ['t']
            assert ds['t'].min() == 231.


@requires_pseudonetcdf
@pytest.mark.filterwarnings('ignore:IOAPI_ISPH is assumed to be 6370000')
class TestPseudoNetCDFFormat(object):
19 changes: 17 additions & 2 deletions xarray/tests/test_distributed.py
@@ -20,12 +20,13 @@
import xarray as xr
from xarray.backends.locks import HDF5_LOCK, CombinedLock
from xarray.tests.test_backends import (ON_WINDOWS, create_tmp_file,
                                        create_tmp_geotiff,
                                        open_example_dataset)
from xarray.tests.test_dataset import create_test_data

from . import (
    assert_allclose, has_h5netcdf, has_netCDF4, requires_rasterio, has_scipy,
    requires_zarr, requires_cfgrib, raises_regex)

# this is to stop isort throwing errors. May have been easier to just use
# `isort:skip` in retrospect
@@ -142,6 +143,20 @@ def test_dask_distributed_rasterio_integration_test(loop):
                assert_allclose(actual, expected)


@requires_cfgrib
def test_dask_distributed_cfgrib_integration_test(loop):
    with cluster() as (s, [a, b]):
        with Client(s['address'], loop=loop) as c:
            with open_example_dataset('example.grib',
                                      engine='cfgrib',
                                      chunks={'time': 1}) as ds:
                with open_example_dataset('example.grib',
                                          engine='cfgrib') as expected:
                    assert isinstance(ds['t'].data, da.Array)
                    actual = ds.compute()
                    assert_allclose(actual, expected)


@pytest.mark.skipif(distributed.__version__ <= '1.19.3',
                    reason='Need recent distributed version to clean up get')
@gen_cluster(client=True, timeout=None)