Commit 2258217

Switch to shared Lock (SerializableLock if possible) for reading and writing
Fixes pydata#1172.

The serializable lock will be useful for dask.distributed or multiprocessing (xref pydata#798, pydata#1173, among others).
1 parent aec3e8e commit 2258217
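
The commit message's point about serializable locks can be illustrated with a small standard-library sketch. A plain `threading.Lock` cannot be pickled, so it cannot travel with a task to another process. The `TokenLock` class below is a hypothetical stand-in written for this note, not dask's implementation: it pickles only a string token and looks the real lock up by that token when unpickled. The `is_picklable` helper is likewise an assumption of the example.

```python
import pickle
import threading
import uuid


class TokenLock:
    """Hypothetical picklable lock: only a string token is serialized."""

    _locks = {}  # token -> real lock, one registry per process
    _registry_guard = threading.Lock()

    def __init__(self, token=None):
        self.token = token or uuid.uuid4().hex
        with TokenLock._registry_guard:
            # Reuse the lock already registered under this token, if any.
            self.lock = TokenLock._locks.setdefault(self.token,
                                                    threading.Lock())

    def __enter__(self):
        self.lock.acquire()
        return self

    def __exit__(self, *exc_info):
        self.lock.release()

    def __getstate__(self):
        # Serialize the token only; the underlying lock is not picklable.
        return self.token

    def __setstate__(self, token):
        # Re-attach to the lock registered under the same token.
        self.__init__(token)


def is_picklable(obj):
    """Return True if pickle.dumps(obj) succeeds."""
    try:
        pickle.dumps(obj)
    except (TypeError, pickle.PicklingError):
        return False
    return True


plain_is_picklable = is_picklable(threading.Lock())
original = TokenLock()
restored = pickle.loads(pickle.dumps(original))
```

Within one process, `restored` ends up wrapping the same underlying lock as `original`, which is the property that lets a single shared lock survive a pickle round trip. The registry here is a plain dict and never frees entries; that simplification is acceptable for a sketch but not for long-running code.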

3 files changed, 19 additions and 11 deletions

doc/whats-new.rst

Lines changed: 5 additions & 1 deletion
@@ -56,7 +56,7 @@ Breaking changes
   By `Guido Imperiale <https://github.com/crusaderky>`_ and
   `Stephan Hoyer <https://github.com/shoyer>`_.
 - Pickling a ``Dataset`` or ``DataArray`` linked to a file on disk no longer
-  caches its values into memory before pickling :issue:`1128`. Instead, pickle
+  caches its values into memory before pickling (:issue:`1128`). Instead, pickle
   stores file paths and restores objects by reopening file references. This
   enables preliminary, experimental use of xarray for opening files with
   `dask.distributed <https://distributed.readthedocs.io>`_.
@@ -206,6 +206,10 @@ Bug fixes
 - Fixed a bug with facetgrid (the ``norm`` keyword was ignored, :issue:`1159`).
   By `Fabien Maussion <https://github.com/fmaussion>`_.

+- Resolved a concurrency bug that could cause Python to crash when
+  simultaneously reading and writing netCDF4 files with dask (:issue:`1172`).
+  By `Stephan Hoyer <https://github.com/shoyer>`_.
+
 .. _whats-new.0.8.2:

 v0.8.2 (18 August 2016)

xarray/backends/api.py

Lines changed: 3 additions & 7 deletions
@@ -3,7 +3,6 @@
 from __future__ import print_function
 import gzip
 import os.path
-import threading
 from distutils.version import StrictVersion
 from glob import glob
 from io import BytesIO
@@ -12,7 +11,7 @@
 import numpy as np

 from .. import backends, conventions
-from .common import ArrayWriter
+from .common import ArrayWriter, GLOBAL_LOCK
 from ..core import indexing
 from ..core.combine import auto_combine
 from ..core.utils import close_on_error, is_remote_uri
@@ -55,9 +54,6 @@ def _normalize_path(path):
     return os.path.abspath(os.path.expanduser(path))


-_global_lock = threading.Lock()
-
-
 def _default_lock(filename, engine):
     if filename.endswith('.gz'):
         lock = False
@@ -71,9 +67,9 @@ def _default_lock(filename, engine):
             else:
                 # TODO: identify netcdf3 files and don't use the global lock
                 # for them
-                lock = _global_lock
+                lock = GLOBAL_LOCK
         elif engine in {'h5netcdf', 'pynio'}:
-            lock = _global_lock
+            lock = GLOBAL_LOCK
         else:
             lock = False
     return lock
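
The lock selection in `_default_lock` can be summarized with a simplified sketch. `choose_lock`, `SHARED_LOCK`, and the `is_remote` flag are hypothetical names for this illustration. The condition guarding the first `else:` is not shown in the hunk; the sketch assumes it is a remote-URI check, since `is_remote_uri` is imported, and it skips the engine auto-detection that the elided lines may perform.

```python
import threading

SHARED_LOCK = threading.Lock()  # stand-in for GLOBAL_LOCK


def choose_lock(filename, engine, is_remote=False):
    """Hypothetical mirror of the branches visible in _default_lock."""
    if filename.endswith('.gz'):
        # Gzipped files are read without a lock.
        return False
    if engine == 'netcdf4':
        # Assumption: remote datasets skip the lock, local files share it.
        return False if is_remote else SHARED_LOCK
    if engine in {'h5netcdf', 'pynio'}:
        return SHARED_LOCK
    return False


netcdf4_lock = choose_lock('a.nc', 'netcdf4')
h5netcdf_lock = choose_lock('b.nc', 'h5netcdf')
```

Every locked engine receives the same lock object, so operations on different files through these backends are serialized against one another as well.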

xarray/backends/common.py

Lines changed: 11 additions & 3 deletions
@@ -2,7 +2,6 @@
 from __future__ import division
 from __future__ import print_function
 import numpy as np
-import itertools
 import logging
 import time
 import traceback
@@ -12,7 +11,12 @@

 from ..conventions import cf_encoder
 from ..core.utils import FrozenOrderedDict
-from ..core.pycompat import iteritems, dask_array_type, OrderedDict
+from ..core.pycompat import iteritems, dask_array_type
+
+try:
+    from dask.utils import SerializableLock as Lock
+except ImportError:
+    from threading import Lock

 # Create a logger object, but don't add any handlers. Leave that to user code.
 logger = logging.getLogger(__name__)
@@ -21,6 +25,10 @@
 NONE_VAR_NAME = '__values__'


+# dask.utils.SerializableLock if available, otherwise just a threading.Lock
+GLOBAL_LOCK = Lock()
+
+
 def _encode_variable_name(name):
     if name is None:
         name = NONE_VAR_NAME
@@ -150,7 +158,7 @@ def sync(self):
             import dask.array as da
             import dask
             if StrictVersion(dask.__version__) > StrictVersion('0.8.1'):
-                da.store(self.sources, self.targets, lock=threading.Lock())
+                da.store(self.sources, self.targets, lock=GLOBAL_LOCK)
             else:
                 da.store(self.sources, self.targets)
             self.sources = []
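
Before this change, `sync` passed a fresh `threading.Lock()` to `da.store` while reads used the module-level `_global_lock`, so a read and a write never contended for the same lock. The sketch below illustrates why that matters, using only the standard library. `max_overlap` is a hypothetical helper that counts how many threads are inside the guarded region at once, and `time.sleep` stands in for file access.

```python
import threading
import time


def max_overlap(read_lock, write_lock, n_pairs=4, hold=0.01):
    """Return the peak number of threads inside the guarded region."""
    state = {'inside': 0, 'peak': 0}
    guard = threading.Lock()  # protects the counters only

    def worker(lock):
        with lock:
            with guard:
                state['inside'] += 1
                state['peak'] = max(state['peak'], state['inside'])
            time.sleep(hold)  # pretend to touch the file
            with guard:
                state['inside'] -= 1

    # Alternate "readers" and "writers", each using its own lock argument.
    threads = [threading.Thread(target=worker, args=(lock,))
               for lock in [read_lock, write_lock] * n_pairs]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return state['peak']


shared = threading.Lock()
peak_shared = max_overlap(shared, shared)  # one lock for both sides
peak_separate = max_overlap(threading.Lock(), threading.Lock())
```

With a single shared lock, at most one thread is ever inside the region, so `peak_shared` is 1. With two independent locks, a reader and a writer can be inside together, which is the situation the commit removes by routing both paths through `GLOBAL_LOCK`.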
