
Parallel open_mfdataset #1983


Merged: 22 commits merged on Apr 20, 2018. The changes shown below are from 10 of the 22 commits.

Commits (22)
a364b22  proof of concept implementation for parallel open using dask.bag (Mar 7, 2018)
9397aca  Merge branch 'master' of github.com:pydata/xarray into feature/parall… (Mar 12, 2018)
037fb4d  parallel open option in open_mfdataset (Mar 12, 2018)
4150fd0  use dask delayed, further tests to check that dask arrays make it thr… (Mar 13, 2018)
0e0eb7c  parallel=None by default and automagicalize to True when dask distrib… (Mar 13, 2018)
e703077  Merge branch 'master' of github.com:pydata/xarray into feature/parall… (Mar 15, 2018)
04212f9  docs and minor improvement to delayed/compute (Mar 15, 2018)
b9d5eea  500 file max (Mar 15, 2018)
af4883d  Merge branch 'master' of github.com:pydata/xarray into feature/parall… (Mar 23, 2018)
80ed614  skip many-files-test on windows (Mar 23, 2018)
dd1c4cf  cleanup parallel open a bit (Mar 27, 2018)
4475d13  refactor parallel open a bit, fix tests and fixtures (Apr 6, 2018)
9a5d63b  Merge branch 'feature/parallel_open_netcdf' of github.com:jhamman/xar… (Apr 6, 2018)
417ab5a  Merge branch 'master' of github.com:pydata/xarray into feature/parall… (Apr 6, 2018)
0115d78  default parallel open when dask is installed + style fixes (Apr 7, 2018)
e33fc15  fix parallel kwarg (Apr 10, 2018)
06df0f6  Merge branch 'master' of github.com:pydata/xarray into feature/parall… (Apr 10, 2018)
5f496ad  pynio from conda-forge/dev for pynio test suite (Apr 10, 2018)
90b8eab  manually skip tests (Apr 12, 2018)
280a46f  Merge branch 'master' of github.com:pydata/xarray into feature/parall… (Apr 14, 2018)
ba831b9  Merge branch 'master' of github.com:pydata/xarray into feature/parall… (Apr 17, 2018)
b0a7948  parallel defaults to false (Apr 17, 2018)
6 changes: 6 additions & 0 deletions doc/whats-new.rst
@@ -37,6 +37,12 @@ Documentation
Enhancements
~~~~~~~~~~~~

- Added the ``parallel`` option to :py:func:`open_mfdataset`. This option uses
``dask.delayed`` to parallelize the open and preprocessing steps within
``open_mfdataset``. This is expected to provide performance improvements when
opening many files, particularly when used in conjunction with
``dask.distributed`` (:issue:`1981`).
By `Joe Hamman <https://github.com/jhamman>`_.
- Some speed improvement to construct :py:class:`~xarray.DataArrayRolling`
  object (:issue:`1993`)
  By `Keisuke Fujii <https://github.com/fujiisoup>`_.

Review comment (Member): This item ended up duplicated when you merged in master.
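For reference, a minimal usage sketch of the new ``parallel`` option described above (the file pattern and chunk sizes below are made up for illustration):

import xarray as xr

# parallel=True asks open_mfdataset to wrap each open/preprocess call in
# dask.delayed so that many files can be opened concurrently.
ds = xr.open_mfdataset('output/*.nc', chunks={'time': 10}, parallel=True)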
32 changes: 25 additions & 7 deletions xarray/backends/api.py
@@ -453,7 +453,8 @@ def close(self):

def open_mfdataset(paths, chunks=None, concat_dim=_CONCAT_DIM_DEFAULT,
                   compat='no_conflicts', preprocess=None, engine=None,
                   lock=None, data_vars='all', coords='different', **kwargs):
                   lock=None, data_vars='all', coords='different',
                   autoclose=False, parallel=None, **kwargs):
    """Open multiple files as a single dataset.

    Requires dask to be installed. See documentation for details on dask [1].
@@ -534,7 +535,10 @@ def open_mfdataset(paths, chunks=None, concat_dim=_CONCAT_DIM_DEFAULT,
        those corresponding to other dimensions.
      * list of str: The listed coordinate variables will be concatenated,
        in addition to the 'minimal' coordinates.
    parallel : bool, optional
        If True, the open and preprocess steps of this function will be
        performed in parallel using ``dask.delayed``. Default is False unless
        dask's distributed scheduler is being used, in which case the default
        is True.
Review comment (Contributor): Have you tested this with both a local system and an HPC cluster?

Review comment (Contributor): I'm curious about the logic of defaulting to parallel when using distributed.

Review comment (Member): Maybe a better choice would be to default to true when using distributed or multi-processing? That would probably be a simpler choice.

Actually, this is probably fine even with using multi-threading -- it just won't give you any noticeable speedup. So I guess we could default to parallel=True.
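For illustration, a self-contained sketch of the defaulting behaviour under discussion; the scheduler name is passed in explicitly here because the helper xarray uses to detect the active scheduler is an internal detail:

def resolve_parallel(parallel, scheduler_name):
    # parallel=None means "decide automatically": only go parallel when the
    # distributed scheduler is driving the computation.
    if parallel is not None:
        return parallel
    return scheduler_name == 'distributed'

assert resolve_parallel(None, 'distributed') is True
assert resolve_parallel(None, 'threads') is False
assert resolve_parallel(False, 'distributed') is False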

    **kwargs : optional
        Additional arguments passed on to :py:func:`xarray.open_dataset`.

@@ -562,12 +566,26 @@ def open_mfdataset(paths, chunks=None, concat_dim=_CONCAT_DIM_DEFAULT,

    if lock is None:
        lock = _default_lock(paths[0], engine)
    datasets = [open_dataset(p, engine=engine, chunks=chunks or {}, lock=lock,
                             **kwargs) for p in paths]
    file_objs = [ds._file_obj for ds in datasets]

    if preprocess is not None:
        datasets = [preprocess(ds) for ds in datasets]
    open_kwargs = dict(engine=engine, chunks=chunks or {}, lock=lock,
                       autoclose=autoclose, **kwargs)

    if parallel or (parallel is None and get_scheduler() == 'distributed'):
        import dask
        datasets = [dask.delayed(open_dataset)(p, **open_kwargs) for p in paths]
        # important: get the file_objs before calling preprocess
        file_objs = [ds._file_obj for ds in datasets]
        if preprocess is not None:
            datasets = [dask.delayed(preprocess)(ds) for ds in datasets]

        # calling compute here will return the datasets/file_objs lists,
        # the underlying datasets will still be stored as dask arrays
        datasets, file_objs = dask.compute(datasets, file_objs)
    else:
        datasets = [open_dataset(p, **open_kwargs) for p in paths]
        file_objs = [ds._file_obj for ds in datasets]
        if preprocess is not None:
            datasets = [preprocess(ds) for ds in datasets]

    # close datasets in case of a ValueError
    try:
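As an aside, the delayed-open pattern used above reduces to a few lines; the sketch below uses a toy slow_open in place of xarray.open_dataset and made-up file names:

import dask
from dask import delayed

def slow_open(path):
    # stand-in for an expensive open/preprocess step
    return {'path': path, 'data': [0, 1, 2]}

paths = ['a.nc', 'b.nc', 'c.nc']
opened = [delayed(slow_open)(p) for p in paths]

# A single compute call materializes all the delayed opens at once, so they
# can run concurrently on whatever dask scheduler is active.
datasets, = dask.compute(opened)
print(datasets[0]['path'])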
137 changes: 62 additions & 75 deletions xarray/tests/test_backends.py
@@ -29,7 +29,7 @@

from . import (
    TestCase, assert_allclose, assert_array_equal, assert_equal,
    assert_identical, flaky, has_netCDF4, has_scipy, network, raises_regex,
    assert_identical, has_dask, has_netCDF4, has_scipy, network, raises_regex,
    requires_dask, requires_h5netcdf, requires_netCDF4, requires_pathlib,
    requires_pydap, requires_pynio, requires_rasterio, requires_scipy,
    requires_scipy_or_netCDF4, requires_zarr)

@@ -1658,88 +1658,75 @@ class H5NetCDFDataTestAutocloseTrue(H5NetCDFDataTest):
    autoclose = True


class OpenMFDatasetManyFilesTest(TestCase):
    def validate_open_mfdataset_autoclose(self, engine, nfiles=10):
        randdata = np.random.randn(nfiles)
        original = Dataset({'foo': ('x', randdata)})
        # test standard open_mfdataset approach with too many files
        with create_tmp_files(nfiles) as tmpfiles:
            for readengine in engine:
                writeengine = (readengine if readengine != 'pynio'
                               else 'netcdf4')
                # split into multiple sets of temp files
                for ii in original.x.values:
                    subds = original.isel(x=slice(ii, ii + 1))
                    subds.to_netcdf(tmpfiles[ii], engine=writeengine)

                # check that calculation on opened datasets works properly
                ds = open_mfdataset(tmpfiles, engine=readengine,
                                    autoclose=True)
                self.assertAllClose(ds.x.sum().values,
                                    (nfiles * (nfiles - 1)) / 2)
                self.assertAllClose(ds.foo.sum().values, np.sum(randdata))
                self.assertAllClose(ds.sum().foo.values, np.sum(randdata))
                ds.close()

    def validate_open_mfdataset_large_num_files(self, engine):
        self.validate_open_mfdataset_autoclose(engine, nfiles=2000)

@pytest.fixture(params=['scipy', 'netcdf4', 'h5netcdf', 'pynio'])
def readengine(request):
    return request.param
Review comment (Member, author): Can anyone guess as to why the pynio tests are not being properly skipped using this fixture configuration?

@mrocklin - I think you suggested this approach, which I like, but something weird is happening.

Review comment (Member, author): I should note, this is working on python 2.7 and 3.4. Just not for 3.5 or 3.6.

    @requires_dask
    @requires_netCDF4
    def test_1_autoclose_netcdf4(self):
        self.validate_open_mfdataset_autoclose(engine=['netcdf4'])

    @requires_dask
    @requires_scipy
    def test_2_autoclose_scipy(self):
        self.validate_open_mfdataset_autoclose(engine=['scipy'])

@pytest.fixture(params=[1, 10, 500])
def nfiles(request):
    return request.param

    @requires_dask
    @requires_pynio
    def test_3_autoclose_pynio(self):
        self.validate_open_mfdataset_autoclose(engine=['pynio'])

    # use of autoclose=True with h5netcdf broken because of
    # probable h5netcdf error
    @requires_dask
    @requires_h5netcdf
    @pytest.mark.xfail
    def test_4_autoclose_h5netcdf(self):
        self.validate_open_mfdataset_autoclose(engine=['h5netcdf'])

@pytest.fixture(params=[True, False])
def autoclose(request):
    return request.param

    # These tests below are marked as flaky (and skipped by default) because
    # they fail sometimes on Travis-CI, for no clear reason.

    @requires_dask
    @requires_netCDF4
    @flaky
    @pytest.mark.slow
    def test_1_open_large_num_files_netcdf4(self):
        self.validate_open_mfdataset_large_num_files(engine=['netcdf4'])

@pytest.fixture(params=[True, False])
def parallel(request):
    return request.param

    @requires_dask
    @requires_scipy
    @flaky
    @pytest.mark.slow
    def test_2_open_large_num_files_scipy(self):
        self.validate_open_mfdataset_large_num_files(engine=['scipy'])

    @requires_dask
    @requires_pynio
    @flaky
    @pytest.mark.slow
    def test_3_open_large_num_files_pynio(self):
        self.validate_open_mfdataset_large_num_files(engine=['pynio'])

    # use of autoclose=True with h5netcdf broken because of
    # probable h5netcdf error
    @requires_dask
    @requires_h5netcdf
    @flaky
    @pytest.mark.xfail
    @pytest.mark.slow
    def test_4_open_large_num_files_h5netcdf(self):
        self.validate_open_mfdataset_large_num_files(engine=['h5netcdf'])

@pytest.fixture(params=[None, 5])
def chunks(request):
    return request.param
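As a side note, parametrized fixtures like the ones above multiply: a test that requests several of them runs once per combination. A self-contained toy illustration (not part of the PR):

import pytest

@pytest.fixture(params=[1, 10, 500])
def nfiles(request):
    return request.param

@pytest.fixture(params=[True, False])
def parallel(request):
    return request.param

def test_matrix(nfiles, parallel):
    # pytest collects 3 x 2 = 6 parametrized cases for this single test
    assert isinstance(parallel, bool) and nfiles > 0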


def skip_if_not_engine(engine):
    if engine == 'netcdf4':
        pytest.importorskip('netCDF4')
    elif engine == 'pynio':
        pytest.importorskip('Nio')
    else:
        pytest.importorskip(engine)
Review comment (Member, author): @maxim-lian - do you know a better way to skip parameterized tests based on some condition?

Review comment (Contributor): You might include the engine as a fixture. This is how we do it with Parquet in dask.dataframe:

@pytest.fixture(params=[pytest.mark.skipif(not fastparquet, 'fastparquet',
                                           reason='fastparquet not found'),
                        pytest.mark.skipif(not pq, 'pyarrow',
                                           reason='pyarrow not found')])
def engine(request):
    return request.param
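A hedged alternative sketch of the same idea using pytest.param with marks, which is how newer pytest versions express per-parameter skips; the module names and reasons here are illustrative:

import importlib

import pytest

def _missing(module_name):
    # True when the backend library cannot be imported
    try:
        importlib.import_module(module_name)
        return False
    except ImportError:
        return True

@pytest.fixture(params=[
    pytest.param('netcdf4',
                 marks=pytest.mark.skipif(_missing('netCDF4'),
                                          reason='netCDF4 not found')),
    pytest.param('pynio',
                 marks=pytest.mark.skipif(_missing('Nio'),
                                          reason='Nio not found')),
])
def readengine(request):
    return request.param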



def test_open_mfdataset_manyfiles(readengine, nfiles, autoclose, parallel,
                                  chunks):

    # skip certain combinations
    skip_if_not_engine(readengine)

    if not has_dask and parallel:
        pytest.skip('parallel requires dask')

    if readengine == 'h5netcdf' and autoclose:
        pytest.skip('h5netcdf does not support autoclose yet')

    if ON_WINDOWS:
        pytest.skip('Skipping on Windows')

    randdata = np.random.randn(nfiles)
    original = Dataset({'foo': ('x', randdata)})
    # test standard open_mfdataset approach with too many files
    with create_tmp_files(nfiles) as tmpfiles:
        writeengine = (readengine if readengine != 'pynio'
                       else 'netcdf4')
        # split into multiple sets of temp files
        for ii in original.x.values:
            subds = original.isel(x=slice(ii, ii + 1))
            subds.to_netcdf(tmpfiles[ii], engine=writeengine)

        # check that calculation on opened datasets works properly
        actual = open_mfdataset(tmpfiles, engine=readengine,
                                autoclose=autoclose,
                                chunks=chunks)
Review comment (Contributor): Don't you want to have a test that explicitly calls open_mfdataset(parallel=parallel)?

Review comment (Member, author): nice catch :)

        # check that using open_mfdataset returns dask arrays for variables
        assert isinstance(actual['foo'].data, dask_array_type)

        assert_identical(original, actual)
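A sketch of the call the reviewer is asking for, assuming the fixtures defined earlier in this test module; the parallel fixture is threaded through so that both code paths get exercised:

        # inside test_open_mfdataset_manyfiles, replacing the open call above
        actual = open_mfdataset(tmpfiles, engine=readengine,
                                parallel=parallel, autoclose=autoclose,
                                chunks=chunks)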


@requires_scipy_or_netCDF4