From 2258217a8c4dc1133b1a2145f762c28668d142dc Mon Sep 17 00:00:00 2001 From: Stephan Hoyer Date: Wed, 21 Dec 2016 19:48:47 -0700 Subject: [PATCH 1/4] Switch to shared Lock (SerializableLock if possible) for reading and writing Fixes #1172 The serializable lock will be useful for dask.distributed or multi-processing (xref #798, #1173, among others). --- doc/whats-new.rst | 6 +++++- xarray/backends/api.py | 10 +++------- xarray/backends/common.py | 14 +++++++++++--- 3 files changed, 19 insertions(+), 11 deletions(-) diff --git a/doc/whats-new.rst b/doc/whats-new.rst index 05828051b6d..edbc8ae599b 100644 --- a/doc/whats-new.rst +++ b/doc/whats-new.rst @@ -56,7 +56,7 @@ Breaking changes By `Guido Imperiale `_ and `Stephan Hoyer `_. - Pickling a ``Dataset`` or ``DataArray`` linked to a file on disk no longer - caches its values into memory before pickling :issue:`1128`. Instead, pickle + caches its values into memory before pickling (:issue:`1128`). Instead, pickle stores file paths and restores objects by reopening file references. This enables preliminary, experimental use of xarray for opening files with `dask.distributed `_. @@ -206,6 +206,10 @@ Bug fixes - Fixed a bug with facetgrid (the ``norm`` keyword was ignored, :issue:`1159`). By `Fabien Maussion `_. +- Resolved a concurrency bug that could cause Python to crash when + simultaneously reading and writing netCDF4 files with dask (:issue:`1172`). + By `Stephan Hoyer `_. + .. _whats-new.0.8.2: v0.8.2 (18 August 2016) diff --git a/xarray/backends/api.py b/xarray/backends/api.py index bc2afa4b373..c69fc63acec 100644 --- a/xarray/backends/api.py +++ b/xarray/backends/api.py @@ -3,7 +3,6 @@ from __future__ import print_function import gzip import os.path -import threading from distutils.version import StrictVersion from glob import glob from io import BytesIO @@ -12,7 +11,7 @@ import numpy as np from .. import backends, conventions -from .common import ArrayWriter +from .common import ArrayWriter, GLOBAL_LOCK from ..core import indexing from ..core.combine import auto_combine from ..core.utils import close_on_error, is_remote_uri @@ -55,9 +54,6 @@ def _normalize_path(path): return os.path.abspath(os.path.expanduser(path)) -_global_lock = threading.Lock() - - def _default_lock(filename, engine): if filename.endswith('.gz'): lock = False @@ -71,9 +67,9 @@ def _default_lock(filename, engine): else: # TODO: identify netcdf3 files and don't use the global lock # for them - lock = _global_lock + lock = GLOBAL_LOCK elif engine in {'h5netcdf', 'pynio'}: - lock = _global_lock + lock = GLOBAL_LOCK else: lock = False return lock diff --git a/xarray/backends/common.py b/xarray/backends/common.py index 27291e65e3a..a7cce03a33a 100644 --- a/xarray/backends/common.py +++ b/xarray/backends/common.py @@ -2,7 +2,6 @@ from __future__ import division from __future__ import print_function import numpy as np -import itertools import logging import time import traceback @@ -12,7 +11,12 @@ from ..conventions import cf_encoder from ..core.utils import FrozenOrderedDict -from ..core.pycompat import iteritems, dask_array_type, OrderedDict +from ..core.pycompat import iteritems, dask_array_type + +try: + from dask.utils import SerializableLock as Lock +except ImportError: + from threading import Lock # Create a logger object, but don't add any handlers. Leave that to user code. logger = logging.getLogger(__name__) @@ -21,6 +25,10 @@ NONE_VAR_NAME = '__values__' +# dask.utils.SerializableLock if available, otherwise just a threading.Lock +GLOBAL_LOCK = Lock() + + def _encode_variable_name(name): if name is None: name = NONE_VAR_NAME @@ -150,7 +158,7 @@ def sync(self): import dask.array as da import dask if StrictVersion(dask.__version__) > StrictVersion('0.8.1'): - da.store(self.sources, self.targets, lock=threading.Lock()) + da.store(self.sources, self.targets, lock=GLOBAL_LOCK) else: da.store(self.sources, self.targets) self.sources = [] From 9ec921bf873ee32bf2139c731919657cc549ef28 Mon Sep 17 00:00:00 2001 From: Stephan Hoyer Date: Tue, 3 Jan 2017 18:29:56 -0800 Subject: [PATCH 2/4] Test serializable lock --- doc/dask.rst | 2 +- xarray/test/test_distributed.py | 4 +--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/doc/dask.rst b/doc/dask.rst index fc2361f5947..5c992ee7ff7 100644 --- a/doc/dask.rst +++ b/doc/dask.rst @@ -225,7 +225,7 @@ larger chunksizes. import os os.remove('example-data.nc') - + Optimization Tips ----------------- diff --git a/xarray/test/test_distributed.py b/xarray/test/test_distributed.py index a807f72387a..2ab7b0e2ffc 100644 --- a/xarray/test/test_distributed.py +++ b/xarray/test/test_distributed.py @@ -28,9 +28,7 @@ def test_dask_distributed_integration_test(loop, engine): original = create_test_data() with create_tmp_file() as filename: original.to_netcdf(filename, engine=engine) - # TODO: should be able to serialize locks - restored = xr.open_dataset(filename, chunks=3, lock=False, - engine=engine) + restored = xr.open_dataset(filename, chunks=3, engine=engine) assert isinstance(restored.var1.data, da.Array) computed = restored.compute() assert_dataset_allclose(original, computed) From 1599196107218e1ecd3eb28fc7fc55a33c610d85 Mon Sep 17 00:00:00 2001 From: Stephan Hoyer Date: Tue, 3 Jan 2017 19:56:06 -0800 Subject: [PATCH 3/4] Use conda-forge for builds --- ci/requirements-py27-netcdf4-dev.yml | 2 ++ ci/requirements-py27-pydap.yml | 2 ++ ci/requirements-py35.yml | 2 ++ 3 files changed, 6 insertions(+) diff --git a/ci/requirements-py27-netcdf4-dev.yml b/ci/requirements-py27-netcdf4-dev.yml index 4ce193a2a82..85c5dcbf34d 100644 --- a/ci/requirements-py27-netcdf4-dev.yml +++ b/ci/requirements-py27-netcdf4-dev.yml @@ -1,4 +1,6 @@ name: test_env +channels: + - conda-forge dependencies: - python=2.7 - cython diff --git a/ci/requirements-py27-pydap.yml b/ci/requirements-py27-pydap.yml index e391eee514f..e8d4c3088ed 100644 --- a/ci/requirements-py27-pydap.yml +++ b/ci/requirements-py27-pydap.yml @@ -1,4 +1,6 @@ name: test_env +channels: + - conda-forge dependencies: - python=2.7 - dask diff --git a/ci/requirements-py35.yml b/ci/requirements-py35.yml index c6641598fca..df69d89c520 100644 --- a/ci/requirements-py35.yml +++ b/ci/requirements-py35.yml @@ -1,4 +1,6 @@ name: test_env +channels: + - conda-forge dependencies: - python=3.5 - cython From 5d56a738d43cb7b6450f686e4eb269df010350df Mon Sep 17 00:00:00 2001 From: Stephan Hoyer Date: Tue, 3 Jan 2017 20:39:03 -0800 Subject: [PATCH 4/4] remove broken/fragile .test_lock --- xarray/test/test_backends.py | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/xarray/test/test_backends.py b/xarray/test/test_backends.py index 8c1e00a698c..c7867dc6d3a 100644 --- a/xarray/test/test_backends.py +++ b/xarray/test/test_backends.py @@ -1034,20 +1034,6 @@ def preprocess(ds): with open_mfdataset(tmp, preprocess=preprocess) as actual: self.assertDatasetIdentical(expected, actual) - def test_lock(self): - original = Dataset({'foo': ('x', np.random.randn(10))}) - with create_tmp_file() as tmp: - original.to_netcdf(tmp, format='NETCDF3_CLASSIC') - with open_dataset(tmp, chunks=10) as ds: - task = ds.foo.data.dask[ds.foo.data.name, 0] - self.assertIsInstance(task[-1], type(Lock())) - with open_mfdataset(tmp) as ds: - task = ds.foo.data.dask[ds.foo.data.name, 0] - self.assertIsInstance(task[-1], type(Lock())) - with open_mfdataset(tmp, engine='scipy') as ds: - task = ds.foo.data.dask[ds.foo.data.name, 0] - self.assertNotIsInstance(task[-1], type(Lock())) - def test_save_mfdataset_roundtrip(self): original = Dataset({'foo': ('x', np.random.randn(10))}) datasets = [original.isel(x=slice(5)),