
Commit 0816424

Add a lock when opening datasets with dask

Fixes pydata#444

Parent: 5c07ed3

5 files changed: +50 -12 lines

doc/whats-new.rst

Lines changed: 11 additions & 1 deletion
@@ -18,7 +18,17 @@ Enhancements
 - :py:func:`~xray.open_mfdataset` now supports a ``preprocess`` argument for
   preprocessing datasets prior to concatenation. This is useful if datasets
   cannot be otherwise merged automatically, e.g., if the original datasets
-  have conflicting index coordinates.
+  have conflicting index coordinates (:issue:`443`).
+- :py:func:`~xray.open_dataset` and :py:func:`~xray.open_mfdataset` now use a
+  thread lock by default for reading from netCDF files. This avoids possible
+  segmentation faults for reading from netCDF4 files when HDF5 is not
+  configured properly for concurrent access (:issue:`444`).
+
+Bug fixes
+~~~~~~~~~
+
+- :py:func:`~xray.open_dataset` and :py:func:`~xray.open_mfdataset` support
+  supplying chunks as a single integer.

 v0.5.1 (15 June 2015)
 ---------------------
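
The two new entries above translate directly to usage. A minimal sketch, assuming a netCDF file named 'example.nc' with a 'time' dimension (both names are made up for illustration):

import xray

# lock=True is now the default: reads from the netCDF file are
# serialized through a thread lock when dask's multithreaded
# scheduler computes the chunks.
ds = xray.open_dataset('example.nc', chunks={'time': 100})

# chunks may now also be a single integer, applied to every dimension.
ds_int = xray.open_dataset('example.nc', chunks=100)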

xray/backends/api.py

Lines changed: 12 additions & 4 deletions
@@ -38,7 +38,7 @@ def _get_default_engine(path, allow_remote=False):
 def open_dataset(filename_or_obj, group=None, decode_cf=True,
                  mask_and_scale=True, decode_times=True,
                  concat_characters=True, decode_coords=True, engine=None,
-                 chunks=None):
+                 chunks=None, lock=True):
     """Load and decode a dataset from a file or file-like object.

     Parameters
@@ -79,6 +79,10 @@ def open_dataset(filename_or_obj, group=None, decode_cf=True,
         If chunks is provided, it is used to load the new dataset into dask
         arrays. This is an experimental feature; see the documentation for more
         details.
+    lock : optional
+        If chunks is provided, this argument is passed on to
+        :py:func:`dask.array.from_array`. By default, a lock is used to avoid
+        issues with concurrent access with dask's multithreaded backend.

     Returns
     -------
@@ -100,7 +104,7 @@ def maybe_decode_store(store):
             store, mask_and_scale=mask_and_scale, decode_times=decode_times,
             concat_characters=concat_characters, decode_coords=decode_coords)
         if chunks is not None:
-            ds = ds.chunk(chunks)
+            ds = ds.chunk(chunks, lock=lock)
         return ds

     if isinstance(filename_or_obj, backends.AbstractDataStore):
@@ -162,7 +166,7 @@ def close(self):


 def open_mfdataset(paths, chunks=None, concat_dim=None, preprocess=None,
-                   **kwargs):
+                   lock=True, **kwargs):
     """Open multiple files as a single dataset.

     Experimental. Requires dask to be installed.
@@ -186,6 +190,10 @@ def open_mfdataset(paths, chunks=None, concat_dim=None, preprocess=None,
         want to stack a collection of 2D arrays along a third dimension.
     preprocess : callable, optional
         If provided, call this function on each dataset prior to concatenation.
+    lock : optional
+        This argument is passed on to :py:func:`dask.array.from_array`. By
+        default, a lock is used to avoid issues with concurrent access with
+        dask's multithreaded backend.
     **kwargs : optional
         Additional arguments passed on to :py:func:`xray.open_dataset`.

@@ -204,7 +212,7 @@ def open_mfdataset(paths, chunks=None, concat_dim=None, preprocess=None,
         raise IOError('no files to open')
     datasets = [open_dataset(p, **kwargs) for p in paths]
     file_objs = [ds._file_obj for ds in datasets]
-    datasets = [ds.chunk(chunks) for ds in datasets]
+    datasets = [ds.chunk(chunks, lock=lock) for ds in datasets]
     if preprocess is not None:
         datasets = [preprocess(ds) for ds in datasets]
     combined = auto_combine(datasets, concat_dim=concat_dim)
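
Because lock is forwarded untouched to dask.array.from_array, callers are not limited to the boolean default. A sketch of the possibilities (the file patterns and the 'x' dimension are hypothetical, and passing an explicit lock object assumes from_array accepts one, as its documentation describes):

from threading import Lock

import xray

# Default: a lock guards every read (safest with netCDF4/HDF5).
ds = xray.open_mfdataset('obs-*.nc', chunks={'x': 50})

# Opt out when the underlying library is known to be thread-safe.
ds_unlocked = xray.open_mfdataset('obs-*.nc', chunks={'x': 50}, lock=False)

# Or share one explicit lock across several datasets.
shared = Lock()
ds_a = xray.open_mfdataset('obs-a-*.nc', chunks={'x': 50}, lock=shared)
ds_b = xray.open_mfdataset('obs-b-*.nc', chunks={'x': 50}, lock=shared)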

xray/core/dataset.py

Lines changed: 6 additions & 3 deletions
@@ -883,7 +883,7 @@ def chunks(self):
             chunks.update(new_chunks)
         return Frozen(SortedKeysDict(chunks))

-    def chunk(self, chunks=None):
+    def chunk(self, chunks=None, lock=False):
         """Coerce all arrays in this dataset into dask arrays with the given
         chunks.

@@ -899,13 +899,16 @@ def chunk(self, chunks=None):
         chunks : int or dict, optional
             Chunk sizes along each dimension, e.g., ``5`` or
             ``{'x': 5, 'y': 5}``.
+        lock : optional
+            Passed on to :py:func:`dask.array.from_array`, if the array is not
+            already a dask array.

         Returns
         -------
         chunked : xray.Dataset
         """
         if isinstance(chunks, Number):
-            chunks = dict.fromkeys(chunks, chunks)
+            chunks = dict.fromkeys(self.dims, chunks)

         if chunks is not None:
             bad_dims = [d for d in chunks if d not in self.dims]
@@ -923,7 +926,7 @@ def maybe_chunk(name, var, chunks):
             if not chunks:
                 chunks = None
             if var.ndim > 0:
-                return var.chunk(chunks, name=name, lock=lock)
+                return var.chunk(chunks, name=name, lock=lock)
             else:
                 return var
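
The one-line dict.fromkeys change is the integer-chunks bug fix noted in whats-new: dict.fromkeys expects an iterable of keys, so passing the integer itself raised TypeError. A quick illustration (the tuple stands in for self.dims):

dims = ('x', 'y')   # stand-in for self.dims
chunks = 5

# New behaviour: one entry per dimension.
print(dict.fromkeys(dims, chunks))   # {'x': 5, 'y': 5}

# Old behaviour: dict.fromkeys(chunks, chunks) raised
# TypeError: 'int' object is not iterable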

xray/core/variable.py

Lines changed: 5 additions & 2 deletions
@@ -413,7 +413,7 @@ def chunks(self):

     _array_counter = itertools.count()

-    def chunk(self, chunks=None, name=''):
+    def chunk(self, chunks=None, name='', lock=False):
         """Coerce this array's data into a dask array with the given chunks.

         If this variable is a non-dask array, it will be converted to dask
@@ -432,6 +432,9 @@ def chunk(self, chunks=None, name=''):
         name : str, optional
             Used to generate the name for this array in the internal dask
             graph. Does not need to be unique.
+        lock : optional
+            Passed on to :py:func:`dask.array.from_array`, if the array is not
+            already a dask array.

         Returns
         -------
@@ -458,7 +461,7 @@ def chunk(self, chunks=None, name=''):
             chunks = tuple(chunks.get(n, s)
                            for n, s in enumerate(self.shape))

-            data = da.from_array(data, chunks, name=name)
+            data = da.from_array(data, chunks, name=name, lock=lock)

         return type(self)(self.dims, data, self._attrs, self._encoding,
                           fastpath=True)
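
At the bottom of the stack this is a plain dask call. A minimal sketch of what lock=True does in dask.array.from_array (the numpy array stands in for a lazily-read netCDF variable):

import numpy as np
import dask.array as da

source = np.arange(100)   # stands in for a lazily-read netCDF variable
arr = da.from_array(source, chunks=(10,), lock=True)

# With lock=True, dask acquires the lock around each chunk read, so only
# one thread touches the (possibly not thread-safe) storage layer at a
# time; the computation over the chunks still runs in parallel.
print(arr.sum().compute())   # 4950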

xray/test/test_backends.py

Lines changed: 16 additions & 2 deletions
@@ -1,4 +1,5 @@
 from io import BytesIO
+from threading import Lock
 import contextlib
 import os.path
 import pickle
@@ -724,6 +725,17 @@ def test_preprocess_mfdataset(self):
         with open_mfdataset(tmp, preprocess=preprocess) as actual:
             self.assertDatasetIdentical(expected, actual)

+    def test_lock(self):
+        original = Dataset({'foo': ('x', np.random.randn(10))})
+        with create_tmp_file() as tmp:
+            original.to_netcdf(tmp)
+            with open_dataset(tmp, chunks=10) as ds:
+                task = ds.foo.data.dask[ds.foo.data.name, 0]
+                self.assertIsInstance(task[-1], type(Lock()))
+            with open_mfdataset(tmp) as ds:
+                task = ds.foo.data.dask[ds.foo.data.name, 0]
+                self.assertIsInstance(task[-1], type(Lock()))
+
     def test_open_and_do_math(self):
         original = Dataset({'foo': ('x', np.random.randn(10))})
         with create_tmp_file() as tmp:
@@ -739,10 +751,12 @@ def test_open_dataset(self):
         with open_dataset(tmp, chunks={'x': 5}) as actual:
             self.assertIsInstance(actual.foo.variable.data, da.Array)
             self.assertEqual(actual.foo.variable.data.chunks, ((5, 5),))
-            self.assertDatasetAllClose(original, actual)
+            self.assertDatasetIdentical(original, actual)
+        with open_dataset(tmp, chunks=5) as actual:
+            self.assertDatasetIdentical(original, actual)
         with open_dataset(tmp) as actual:
             self.assertIsInstance(actual.foo.variable.data, np.ndarray)
-            self.assertDatasetAllClose(original, actual)
+            self.assertDatasetIdentical(original, actual)

     def test_dask_roundtrip(self):
         with create_tmp_file() as tmp:
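
test_lock works by digging the lock back out of the dask graph: each chunk of a from_array-backed array is a task tuple, and the lock, when present, is the tuple's last element. A rough standalone equivalent; the exact graph layout is a dask implementation detail that has changed across versions, so treat this as a sketch against the dask of this era:

from threading import Lock

import numpy as np
import dask.array as da

arr = da.from_array(np.arange(10), chunks=(10,), lock=True)
task = arr.dask[(arr.name, 0)]   # the task that reads chunk 0
# type(Lock()) because threading.Lock is a factory function, not a class.
print(isinstance(task[-1], type(Lock())))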
