
Parallel open_mfdataset #1983


Merged: 22 commits merged into pydata:master from feature/parallel_open_netcdf on Apr 20, 2018

Conversation

jhamman

@jhamman jhamman commented Mar 13, 2018

I'm sharing this in the hopes of getting comments from @mrocklin and @pydata/xarray.

What this does:

  • implements a dask.bag map/apply on the xarray open_dataset and preprocess steps in open_mfdataset (see the short sketch after this list)
  • adds a new parallel option to open_mfdataset
  • provides about a 40% speedup in opening a multifile dataset when using the distributed scheduler (I tested on 1000 netcdf files that took about 9 seconds to open/concatenate in the default configuration)
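For orientation, here is a minimal sketch of the map/apply idea described in the first bullet, written against the public xarray and dask.bag APIs. The helper function, file paths, and concat dimension are illustrative, not the PR's actual code:

import dask.bag as db
import xarray as xr

def open_and_preprocess(path, preprocess=None, **open_kwargs):
    # open a single file and optionally apply the user-supplied preprocess step
    ds = xr.open_dataset(path, **open_kwargs)
    return preprocess(ds) if preprocess is not None else ds

paths = ['file_000.nc', 'file_001.nc']               # illustrative paths
bag = db.from_sequence(paths)
datasets = bag.map(open_and_preprocess, chunks={}).compute()
combined = xr.concat(datasets, dim='time')           # concatenation happens as before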

What it does not do (yet):

  • check that autoclose=True when multiple processes are being used (multiprocessing/distributed scheduler)
  • provide any speedup with the multiprocessing backend (I do not understand why this is)

Benchmark Example

In [1]: import xarray as xr
   ...: import dask
   ...: import dask.threaded
   ...: import dask.multiprocessing
   ...: from dask.distributed import Client
   ...:

In [2]: c = Client()
   ...: c
   ...:
Out[2]: <Client: scheduler='tcp://127.0.0.1:59576' processes=4 cores=4>

In [4]: %%time
   ...: with dask.set_options(get=dask.multiprocessing.get):
   ...:     ds = xr.open_mfdataset('../test_files/test_netcdf_*nc', autoclose=True, parallel=True)
   ...:
CPU times: user 4.76 s, sys: 201 ms, total: 4.96 s
Wall time: 7.74 s

In [5]: %%time
   ...: with dask.set_options(get=c.get):
   ...:     ds = xr.open_mfdataset('../test_files/test_netcdf_*nc', autoclose=True, parallel=True)
   ...:
   ...:
CPU times: user 1.88 s, sys: 60.6 ms, total: 1.94 s
Wall time: 4.41 s

In [6]: %%time
   ...: with dask.set_options(get=dask.threaded.get):
   ...:     ds = xr.open_mfdataset('../test_files/test_netcdf_*nc')
   ...:
CPU times: user 7.77 s, sys: 247 ms, total: 8.02 s
Wall time: 8.17 s

In [7]: %%time
   ...: with dask.set_options(get=dask.threaded.get):
   ...:     ds = xr.open_mfdataset('../test_files/test_netcdf_*nc', autoclose=True)
   ...:
   ...:
CPU times: user 7.89 s, sys: 202 ms, total: 8.09 s
Wall time: 8.21 s

In [8]: ds
Out[8]:
<xarray.Dataset>
Dimensions:  (lat: 45, lon: 90, time: 1000)
Coordinates:
  * lon      (lon) float64 0.0 4.045 8.09 12.13 16.18 20.22 24.27 28.31 ...
  * lat      (lat) float64 -90.0 -85.91 -81.82 -77.73 -73.64 -69.55 -65.45 ...
  * time     (time) datetime64[ns] 1970-01-01 1970-01-02 1970-01-11 ...
Data variables:
    foo      (time, lon, lat) float64 dask.array<shape=(1000, 90, 45), chunksize=(1, 90, 45)>
    bar      (time, lon, lat) float64 dask.array<shape=(1000, 90, 45), chunksize=(1, 90, 45)>
    baz      (time, lon, lat) float32 dask.array<shape=(1000, 90, 45), chunksize=(1, 90, 45)>
Attributes:
    history:  created for xarray benchmarking

elif engine == 'pynio':
pytest.importorskip('Nio')
else:
pytest.importorskip(engine)
Member Author

@maxim-lian - do you know a better way to skip parameterized tests based on some condition?

Contributor

You might include the engine as a fixture. This is how we do it with Parquet in dask.dataframe

@pytest.fixture(params=[pytest.mark.skipif(not fastparquet, 'fastparquet',
                                           reason='fastparquet not found'),
                        pytest.mark.skipif(not pq, 'pyarrow',
                                           reason='pyarrow not found')])
def engine(request):
    return request.param

@requires_scipy
def test_2_autoclose_scipy(self):
    self.validate_open_mfdataset_autoclose(engine=['scipy'])

@pytest.fixture(params=[1, 10, 1000])
Member Author

it would also be nice to mark the 1000 entry here as slow.
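One way to do that with pytest (a hedged suggestion, not code from this PR; the fixture name and the slow marker are assumed):

import pytest

@pytest.fixture(params=[1, 10,
                        pytest.param(1000, marks=pytest.mark.slow)])
def nfiles(request):
    # the 1000-file case only runs when slow tests are enabled
    return request.param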

                   autoclose=autoclose, **kwargs)

if parallel:
    import dask.bag as db
Member

This is a minor point, but I find dask.delayed() slightly more general and easier to understand than dask.bag.

Member Author

I admit I don't fully understand the tradeoffs between these two approaches.

Member

They are basically equivalent. The only meaningful difference I can think of, aside from the API, is that dask.delayed has fewer required dependencies (a subset of those required for dask.array):
https://github.com/dask/dask/blob/47d025476312c6a8155192196b8da3b5af0c13e4/setup.py#L10-L17
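For comparison, a delayed-based version of the same step might look roughly like this (a sketch only; the paths and helper names are illustrative, not the PR's code):

import dask
import xarray as xr

paths = ['file_000.nc', 'file_001.nc']          # illustrative paths
open_ = dask.delayed(xr.open_dataset)
delayed_datasets = [open_(p, chunks={}) for p in paths]
# a single compute call materializes the whole list of Datasets
datasets = list(dask.compute(*delayed_datasets))
combined = xr.concat(datasets, dim='time')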

import dask.bag as db
paths_bag = db.from_sequence(paths)
datasets = paths_bag.map(open_dataset,
                         **open_kwargs).compute()
Member

What exactly does compute() do when called on a dask bag of xarray datasets of dask arrays? I'm assuming that only the bags get computed, but not the array values? This would be good to clarify in some comments on the code.

Member Author

That is my understanding. In other words, delayed(open_dataset)(...).compute() is going to return whatever open_dataset would have. I have not tested this thoroughly and I would be interested to hear from @mrocklin on how dask is going to treat this situation.
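A small illustration of that point (the file and variable names are illustrative): computing the delayed object returns the Dataset itself, while any dask-backed variables inside it stay lazy until they are explicitly loaded.

import dask
import xarray as xr

delayed_ds = dask.delayed(xr.open_dataset)('file_000.nc', chunks={'time': 1})
ds = delayed_ds.compute()      # an xarray.Dataset, exactly what open_dataset returns
print(type(ds['foo'].data))    # a dask array -- the values have not been read yet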

open_kwargs = dict(engine=engine, chunks=chunks or {}, lock=lock,
                   autoclose=autoclose, **kwargs)

if parallel:
Member

Consider making the default parallel=None and using auto-detection logic for setting parallel=True if using dask-distributed.
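A hedged sketch of the kind of auto-detection being suggested; the helper name is hypothetical, and it leans on distributed's default_client raising ValueError when no client is active:

def _choose_parallel(parallel):
    # resolve parallel=None to True only when a distributed client exists
    if parallel is not None:
        return parallel
    try:
        from distributed import default_client
        default_client()
        return True
    except (ImportError, ValueError):
        return False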

@jhamman

jhamman commented Mar 13, 2018

@shoyer - I updated this to use dask.delayed. I actually like it more because I only have to call compute once. Thanks for the suggestion.


# calling compute here will return the computed datasets/file_objs lists
# the underlying datasets will still be stored as dask arrays
dask.compute([datasets, file_objs])
Member

Don't you need to assign these computed results, e.g., datasets, file_objs = dask.compute([datasets, file_objs])?
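For context, a minimal example of dask.compute's return convention (a toy function, not the PR's code): it returns a tuple with one entry per argument, so the results do need to be captured.

import dask

@dask.delayed
def inc(x):
    return x + 1

datasets = [inc(i) for i in range(3)]
file_objs = [inc(i) for i in range(10, 13)]
# passing the two lists as separate arguments; a single bracketed list would
# come back wrapped in a one-element tuple
datasets, file_objs = dask.compute(datasets, file_objs)
print(datasets)    # [1, 2, 3]
print(file_objs)   # [11, 12, 13]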

@jhamman

jhamman commented Mar 15, 2018

If anyone understands Windows file handling with Python, I'm all ears as to why this is failing on AppVeyor. I'm tempted to just skip this test there but thought I should ask for help first...

@jhamman

jhamman commented Mar 23, 2018

I'm tempted to just skip this test there but thought I should ask for help first...

I've skipped the offending test on appveyor for now. Objectors speak up please. I don't have a windows machine to test on and iterating via appveyor is not something a sane person does 😉.
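For reference, skipping a test on Windows with pytest usually looks like the sketch below; the test name is illustrative rather than the actual test skipped here:

import sys

import pytest

@pytest.mark.skipif(sys.platform == 'win32',
                    reason='file handling on Windows/AppVeyor is unreliable here')
def test_open_mfdataset_parallel_roundtrip():
    ...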

@jhamman jhamman changed the title RFC: parallel open_mfdataset Parallel open_mfdataset Mar 23, 2018
@jhamman jhamman requested review from mrocklin and rabernat March 27, 2018 15:55

@mrocklin mrocklin left a comment


This seems good to me!

It's interesting to see that opening metadata is sufficiently painful to engage dask here. Presumably this cost will only grow in time.

parallel : bool, optional
    If True, the open and preprocess steps of this function will be
    performed in parallel using ``dask.delayed``. Default is False unless
    dask's distributed scheduler is being used, in which case the default
    is True.
Contributor

Have you tested this with both a local system and an HPC cluster?

Contributor

I'm curious about the logic of defaulting to parallel when using distributed.

Member

Maybe a better choice would be to default to true when using distributed or multi-processing? That would probably be a simpler choice.

Actually, this is probably fine even with using multi-threading -- it just won't give you any noticeable speedup. So I guess we could default to parallel=True.

@jhamman

jhamman commented Mar 27, 2018

Have you tested this with both a local system and an HPC cluster?

I have. See below for a simple example using this feature on Cheyenne.

In [1]: import xarray as xr
   ...:
   ...: import glob
   ...:

In [2]: pattern = '/glade/u/home/jhamman/workdir/LOCA_daily/met_data/CESM1-BGC/16th/rcp45/r1i1p1/*/*nc'

In [3]: len(glob.glob(pattern))
Out[3]: 285

In [4]: %time ds = xr.open_mfdataset(pattern)
CPU times: user 15.5 s, sys: 2.62 s, total: 18.1 s
Wall time: 42.4 s

In [5]: ds.close()

In [6]: %time ds = xr.open_mfdataset(pattern, parallel=True)
CPU times: user 18.4 s, sys: 5.28 s, total: 23.6 s
Wall time: 30.7 s

In [7]: ds.close()

In [8]: from dask.distributed import Client

In [9]: client = Client()
In [10]: client
Out[10]: <Client: scheduler='tcp://127.0.0.1:39853' processes=72 cores=72>

In [11]: %time ds = xr.open_mfdataset(pattern, parallel=True, autoclose=True)
CPU times: user 10.8 s, sys: 808 ms, total: 11.6 s
Wall time: 12.4 s


@rabernat rabernat left a comment


This looks nice and clean. It's just hard for me to tell whether your test is actually covering the parallel case explicitly.

# check that calculation on opened datasets works properly
actual = open_mfdataset(tmpfiles, engine=readengine,
                        autoclose=autoclose,
                        chunks=chunks)
Contributor

Don't you want to have a test that explicitly calls open_mfdataset(parallel=parallel)?

Member Author

nice catch :)
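A hedged sketch of what such a test could look like, assuming the module's existing tmpfiles and readengine fixtures and the usual xarray test imports (names are illustrative, not the PR's final test):

import pytest

@pytest.mark.parametrize('parallel', [False, True])
def test_open_mfdataset_parallel_flag(tmpfiles, readengine, parallel):
    # exercise both code paths explicitly
    with open_mfdataset(tmpfiles, engine=readengine,
                        parallel=parallel) as actual:
        assert isinstance(actual, Dataset)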

@jhamman

jhamman commented Apr 6, 2018

I'm curious about the logic of defaulting to parallel when using distributed.

I'm not tied to the behavior. It was suggested by @shoyer a while back. Perhaps we try this and evaluate how it works in the wild?

@shoyer

shoyer commented Apr 6, 2018

My reason for suggesting parallel=True as the default when using distributed is to turn this feature on by default when we can expect it will probably improve performance.

@rabernat

rabernat commented Apr 6, 2018

Can we imagine cases where it might actually degrade performance?

@jhamman

jhamman commented Apr 6, 2018

I imagine there will be a small performance cost when the number of files is small. That cost is probably lost in the noise of most I/O operations.

@jhamman

jhamman commented Apr 6, 2018

All the tests are passing here. Any final objectors?

opening many files, particularly when used in conjunction with
``dask.distributed`` (:issue:`1981`).
By `Joe Hamman <https://github.com/jhamman>`_.
- Some speed improvement to construct :py:class:`~xarray.DataArrayRolling`
Member

This item ended up duplicated when you merged in master.

# wrap the open_dataset, getattr, and preprocess with delayed
import dask
parallel = True
_open = dask.delayed(open_dataset)
Member

nit: per PEP8, we usually post-fix with an underscore for variables named to avoid conflicts with builtins, e.g., open_ rather than _open.

Member Author

Thanks, wasn't aware of this detail.

@rabernat

I recently tried this branch with my data server and got an error.

I opened a dataset this way

# works fine with parallel=False
ds = xr.open_mfdataset(os.path.join(ddir, '*V1_1.204*.nc'), decode_cf=False, parallel=True)

and got the following error.

distributed.utils - ERROR - NetCDF: HDF error
Traceback (most recent call last):
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/distributed/utils.py", line 237, in f
    result[0] = yield make_coro()
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/distributed/client.py", line 1356, in _gather
    traceback)
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/six.py", line 692, in reraise
    raise value.with_traceback(tb)
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/dask/compatibility.py", line 48, in apply
    return func(*args, **kwargs)
  File "/home/rpa/xarray/xarray/backends/api.py", line 318, in open_dataset
    return maybe_decode_store(store, lock)
  File "/home/rpa/xarray/xarray/backends/api.py", line 238, in maybe_decode_store
    drop_variables=drop_variables)
  File "/home/rpa/xarray/xarray/conventions.py", line 594, in decode_cf
    vars, attrs = obj.load()
  File "/home/rpa/xarray/xarray/backends/common.py", line 217, in load
    for k, v in self.get_variables().items())
  File "/home/rpa/xarray/xarray/backends/netCDF4_.py", line 319, in get_variables
    iteritems(self.ds.variables))
  File "/home/rpa/xarray/xarray/core/utils.py", line 308, in FrozenOrderedDict
    return Frozen(OrderedDict(*args, **kwargs))
  File "/home/rpa/xarray/xarray/backends/netCDF4_.py", line 318, in <genexpr>
    for k, v in
  File "/home/rpa/xarray/xarray/backends/netCDF4_.py", line 311, in open_store_variable
    encoding['original_shape'] = var.shape
  File "netCDF4/_netCDF4.pyx", line 3381, in netCDF4._netCDF4.Variable.shape.__get__ (netCDF4/_netCDF4.c:34388)
  File "netCDF4/_netCDF4.pyx", line 2759, in netCDF4._netCDF4.Dimension.__len__ (netCDF4/_netCDF4.c:27006)
RuntimeError: NetCDF: HDF error

Without the distributed scheduler (but with parallel=True), I get no error, but the command never returns, and eventually I have to restart the kernel.

Any idea what could be going on? (Sorry for the non-reproducible bug report...I figured some trials "in the field" might be useful.)

@jhamman

jhamman commented Apr 10, 2018

@rabernat - my last commit(s) seem to have broken the CI so I'll need to revisit this.

@jhamman

jhamman commented Apr 10, 2018

@rabernat - I just pushed a few more commits here. Can I ask two questions:

When using the distributed scheduler, what configuration are you using? Can you try:

  • autoclose=True (in open_mfdataset)
  • processes=True (in client)

If this turns out to be a corner case with the distributed scheduler, I can add an integration test for that specific use case.
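For concreteness, the configuration being asked about would look roughly like this (a hedged sketch; the glob pattern is taken from the earlier report and is illustrative):

from dask.distributed import Client
import xarray as xr

client = Client(processes=True)   # process-based workers rather than threads
ds = xr.open_mfdataset('*V1_1.204*.nc', decode_cf=False,
                       parallel=True, autoclose=True)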

                                           reason='requires h5netcdf'),
                        pytest.mark.skipif(not has_pynio, 'pynio', reason='requires pynio')])
def readengine(request):
    return request.param
Member Author

Can anyone guess as to why the pynio tests are not being properly skipped using this fixture configuration?

@mrocklin - I think you suggested this approach, which I like, but something weird is happening.

Member Author

I should note, this is working on python 2.7 and 3.4. Just not for 3.5 or 3.6.

@jhamman

jhamman commented Apr 13, 2018

@rabernat - I got the tests passing here again. If you can make the time to try your example/test again, it would be great to figure out what wasn't working before.

@NicWayand

NicWayand commented Apr 17, 2018

Thanks @jhamman for working on this! I did a test on my real-world data (1202 ~3 MB files) on my local computer and am not getting the results I expected:

  1. No speed up with parallel=True
  2. Slow down when using distributed (processes=16 cores=16).

Am I missing something?

nc_files = glob.glob(E.obs['NSIDC_0081']['sipn_nc']+'/*.nc')
print(len(nc_files))
1202

# Parallel False
%time ds = xr.open_mfdataset(nc_files, concat_dim='time', parallel=False, autoclose=True)
CPU times: user 57.8 s, sys: 3.2 s, total: 1min 1s
Wall time: 1min

# Parallel True with default scheduler 
%time ds = xr.open_mfdataset(nc_files, concat_dim='time', parallel=True, autoclose=True)
CPU times: user 1min 16s, sys: 9.82 s, total: 1min 26s
Wall time: 1min 16s

# Parallel True with distributed
from dask.distributed import Client
client = Client()
print(client)
<Client: scheduler='tcp://127.0.0.1:43291' processes=16 cores=16>
%time ds = xr.open_mfdataset(nc_files, concat_dim='time', parallel=True, autoclose=True)
CPU times: user 2min 17s, sys: 12.3 s, total: 2min 29s
Wall time: 3min 48s

On feature/parallel_open_netcdf commit 280a46f

@jhamman

jhamman commented Apr 17, 2018

@NicWayand - Thanks for giving this a go. Some thoughts on your problem...

I have been using this feature for the past few days and have been seeing a speedup on datasets with many files, along the lines of what I showed above. I am running my tests on perhaps the perfect test architecture (parallel shared filesystem, fast interconnect, etc.). I think there are many reasons/cases where this won't work as well.

@shoyer

shoyer commented Apr 17, 2018

It sounds like the right resolution for now would be to leave the default as parallel=False and leave this as an optional feature.

@jhamman

jhamman commented Apr 17, 2018

I think that makes sense for now. We need to experiment with this a bit more but I don't see a problem merging the basic workflow we have now (with a minor change to the default behavior).

@jhamman

jhamman commented Apr 18, 2018

With my last commits here, this feature is completely optional and defaults to the current behavior. I cleaned up the tests a bit further and am now ready to merge this. Barring any objections, I'll merge this on Friday.

@jhamman jhamman merged commit 0935182 into pydata:master Apr 20, 2018
@jhamman jhamman deleted the feature/parallel_open_netcdf branch April 20, 2018 12:04
Linked issue: use dask to open datasets in parallel