
Commit ffa8e69

Merge pull request #446 from shoyer/open_mfdataset
Preprocess argument for open_mfdataset and threading lock
2 parents fedd28b + 0816424

File tree

5 files changed: +74 -11 lines changed

doc/whats-new.rst

Lines changed: 21 additions & 0 deletions
@@ -9,6 +9,27 @@ What's New
     import xray
     np.random.seed(123456)

+v0.5.2 (unreleased)
+-------------------
+
+Enhancements
+~~~~~~~~~~~~
+
+- :py:func:`~xray.open_mfdataset` now supports a ``preprocess`` argument for
+  preprocessing datasets prior to concatenation. This is useful if datasets
+  cannot be otherwise merged automatically, e.g., if the original datasets
+  have conflicting index coordinates (:issue:`443`).
+- :py:func:`~xray.open_dataset` and :py:func:`~xray.open_mfdataset` now use a
+  thread lock by default for reading from netCDF files. This avoids possible
+  segmentation faults for reading from netCDF4 files when HDF5 is not
+  configured properly for concurrent access (:issue:`444`).
+
+Bug fixes
+~~~~~~~~~
+
+- :py:func:`~xray.open_dataset` and :py:func:`~xray.open_mfdataset` support
+  supplying chunks as a single integer.
+
 v0.5.1 (15 June 2015)
 ---------------------

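A minimal usage sketch of the two new features together (the glob pattern, the ``add_z`` helper, and the ``z`` coordinate are illustrative, not part of the commit; requires dask):

    import xray

    # Attach a common scalar coordinate to every input dataset before
    # concatenation, mirroring the hook exercised in
    # test_preprocess_mfdataset below.
    add_z = lambda ds: ds.assign_coords(z=0)

    # 'data/*.nc' is a hypothetical glob; chunks=5 exercises the new
    # integer-chunks support.
    ds = xray.open_mfdataset('data/*.nc', preprocess=add_z, chunks=5)
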
xray/backends/api.py

Lines changed: 17 additions & 4 deletions
@@ -38,7 +38,7 @@ def _get_default_engine(path, allow_remote=False):
 def open_dataset(filename_or_obj, group=None, decode_cf=True,
                  mask_and_scale=True, decode_times=True,
                  concat_characters=True, decode_coords=True, engine=None,
-                 chunks=None):
+                 chunks=None, lock=True):
     """Load and decode a dataset from a file or file-like object.

     Parameters
@@ -79,6 +79,10 @@ def open_dataset(filename_or_obj, group=None, decode_cf=True,
         If chunks is provided, it used to load the new dataset into dask
         arrays. This is an experimental feature; see the documentation for more
         details.
+    lock : optional
+        If chunks is provided, this argument is passed on to
+        :py:func:`dask.array.from_array`. By default, a lock is used to avoid
+        issues with concurrent access with dask's multithreaded backend.

     Returns
     -------
@@ -100,7 +104,7 @@ def maybe_decode_store(store):
             store, mask_and_scale=mask_and_scale, decode_times=decode_times,
             concat_characters=concat_characters, decode_coords=decode_coords)
         if chunks is not None:
-            ds = ds.chunk(chunks)
+            ds = ds.chunk(chunks, lock=lock)
         return ds

     if isinstance(filename_or_obj, backends.AbstractDataStore):
@@ -161,7 +165,8 @@ def close(self):
         f.close()


-def open_mfdataset(paths, chunks=None, concat_dim=None, **kwargs):
+def open_mfdataset(paths, chunks=None, concat_dim=None, preprocess=None,
+                   lock=True, **kwargs):
     """Open multiple files as a single dataset.

     Experimental. Requires dask to be installed.
@@ -183,6 +188,12 @@ def open_mfdataset(paths, chunks=None, concat_dim=None, **kwargs):
         need to provide this argument if the dimension along which you want to
         concatenate is not a dimension in the original datasets, e.g., if you
         want to stack a collection of 2D arrays along a third dimension.
+    preprocess : callable, optional
+        If provided, call this function on each dataset prior to concatenation.
+    lock : optional
+        This argument is passed on to :py:func:`dask.array.from_array`. By
+        default, a lock is used to avoid issues with concurrent access with
+        dask's multithreaded backend.
     **kwargs : optional
         Additional arguments passed on to :py:func:`xray.open_dataset`.

@@ -201,7 +212,9 @@ def open_mfdataset(paths, chunks=None, concat_dim=None, **kwargs):
         raise IOError('no files to open')
     datasets = [open_dataset(p, **kwargs) for p in paths]
     file_objs = [ds._file_obj for ds in datasets]
-    datasets = [ds.chunk(chunks) for ds in datasets]
+    datasets = [ds.chunk(chunks, lock=lock) for ds in datasets]
+    if preprocess is not None:
+        datasets = [preprocess(ds) for ds in datasets]
     combined = auto_combine(datasets, concat_dim=concat_dim)
     combined._file_obj = _MultiFileCloser(file_objs)
     return combined

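In use, the new ``lock`` keyword only matters when ``chunks`` puts the data into dask arrays; a sketch with a placeholder file path (``example.nc`` is not from the PR):

    import xray

    # Default lock=True: a threading.Lock serializes reads from the file,
    # guarding against segfaults when HDF5 is not built for concurrency.
    ds = xray.open_dataset('example.nc', chunks={'x': 5})

    # Opting out, e.g. when the HDF5 build is known to be thread-safe:
    ds_unlocked = xray.open_dataset('example.nc', chunks={'x': 5}, lock=False)
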
xray/core/dataset.py

Lines changed: 6 additions & 3 deletions
@@ -883,7 +883,7 @@ def chunks(self):
         chunks.update(new_chunks)
         return Frozen(SortedKeysDict(chunks))

-    def chunk(self, chunks=None):
+    def chunk(self, chunks=None, lock=False):
         """Coerce all arrays in this dataset into dask arrays with the given
         chunks.

@@ -899,13 +899,16 @@ def chunk(self, chunks=None):
         chunks : int or dict, optional
             Chunk sizes along each dimension, e.g., ``5`` or
             ``{'x': 5, 'y': 5}``.
+        lock : optional
+            Passed on to :py:func:`dask.array.from_array`, if the array is
+            not already a dask array.

         Returns
         -------
         chunked : xray.Dataset
         """
         if isinstance(chunks, Number):
-            chunks = dict.fromkeys(chunks, chunks)
+            chunks = dict.fromkeys(self.dims, chunks)

         if chunks is not None:
             bad_dims = [d for d in chunks if d not in self.dims]
@@ -923,7 +926,7 @@ def maybe_chunk(name, var, chunks):
             if not chunks:
                 chunks = None
             if var.ndim > 0:
-                return var.chunk(chunks, name=name)
+                return var.chunk(chunks, name=name, lock=lock)
             else:
                 return var

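The one-line bug fix is easy to see in isolation: ``dict.fromkeys(chunks, chunks)`` raises ``TypeError`` when ``chunks`` is an integer, because ``dict.fromkeys`` expects an iterable of keys. A sketch with toy data (requires dask):

    import numpy as np
    import xray

    ds = xray.Dataset({'foo': (('x', 'y'), np.zeros((10, 10)))})

    # An integer now fans out to every dimension, equivalent to
    # ds.chunk({'x': 5, 'y': 5}).
    chunked = ds.chunk(5)
    print(chunked.chunks)  # Frozen(SortedKeysDict({'x': (5, 5), 'y': (5, 5)}))
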
xray/core/variable.py

Lines changed: 5 additions & 2 deletions
@@ -413,7 +413,7 @@ def chunks(self):

     _array_counter = itertools.count()

-    def chunk(self, chunks=None, name=''):
+    def chunk(self, chunks=None, name='', lock=False):
         """Coerce this array's data into a dask arrays with the given chunks.

         If this variable is a non-dask array, it will be converted to dask
@@ -432,6 +432,9 @@ def chunk(self, chunks=None, name=''):
         name : str, optional
             Used to generate the name for this array in the internal dask
             graph. Does not need not be unique.
+        lock : optional
+            Passed on to :py:func:`dask.array.from_array`, if the array is
+            not already a dask array.

         Returns
         -------
@@ -458,7 +461,7 @@ def chunk(self, chunks=None, name=''):
                 chunks = tuple(chunks.get(n, s)
                                for n, s in enumerate(self.shape))

-            data = da.from_array(data, chunks, name=name)
+            data = da.from_array(data, chunks, name=name, lock=lock)

         return type(self)(self.dims, data, self._attrs, self._encoding,
                           fastpath=True)

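Stripped of the xray wrapping, what ``Variable.chunk`` now forwards to dask looks roughly like this (toy array, not from the commit):

    import numpy as np
    import dask.array as da

    x = np.arange(100)

    # lock=True makes dask allocate a shared threading.Lock and attach it
    # to every chunk's getter task, so reads are serialized across threads.
    arr = da.from_array(x, chunks=10, name='x', lock=True)
    print(arr.sum().compute())  # 4950
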
xray/test/test_backends.py

Lines changed: 25 additions & 2 deletions
@@ -1,4 +1,5 @@
 from io import BytesIO
+from threading import Lock
 import contextlib
 import os.path
 import pickle
@@ -715,6 +716,26 @@ def test_open_mfdataset(self):
             with self.assertRaisesRegexp(IOError, 'no files to open'):
                 open_mfdataset('foo-bar-baz-*.nc')

+    def test_preprocess_mfdataset(self):
+        original = Dataset({'foo': ('x', np.random.randn(10))})
+        with create_tmp_file() as tmp:
+            original.to_netcdf(tmp)
+            preprocess = lambda ds: ds.assign_coords(z=0)
+            expected = preprocess(original)
+            with open_mfdataset(tmp, preprocess=preprocess) as actual:
+                self.assertDatasetIdentical(expected, actual)
+
+    def test_lock(self):
+        original = Dataset({'foo': ('x', np.random.randn(10))})
+        with create_tmp_file() as tmp:
+            original.to_netcdf(tmp)
+            with open_dataset(tmp, chunks=10) as ds:
+                task = ds.foo.data.dask[ds.foo.data.name, 0]
+                self.assertIsInstance(task[-1], type(Lock()))
+            with open_mfdataset(tmp) as ds:
+                task = ds.foo.data.dask[ds.foo.data.name, 0]
+                self.assertIsInstance(task[-1], type(Lock()))
+
     def test_open_and_do_math(self):
         original = Dataset({'foo': ('x', np.random.randn(10))})
         with create_tmp_file() as tmp:
@@ -730,10 +751,12 @@ def test_open_dataset(self):
             with open_dataset(tmp, chunks={'x': 5}) as actual:
                 self.assertIsInstance(actual.foo.variable.data, da.Array)
                 self.assertEqual(actual.foo.variable.data.chunks, ((5, 5),))
-                self.assertDatasetAllClose(original, actual)
+                self.assertDatasetIdentical(original, actual)
+            with open_dataset(tmp, chunks=5) as actual:
+                self.assertDatasetIdentical(original, actual)
             with open_dataset(tmp) as actual:
                 self.assertIsInstance(actual.foo.variable.data, np.ndarray)
-                self.assertDatasetAllClose(original, actual)
+                self.assertDatasetIdentical(original, actual)

     def test_dask_roundtrip(self):
         with create_tmp_file() as tmp:
