From 398ac285012768ee6c9fc6f2cbc4697ace3dfc27 Mon Sep 17 00:00:00 2001
From: Guido Imperiale
Date: Sun, 3 Sep 2017 00:14:30 +0100
Subject: [PATCH 1/7] Load non-index coords to memory ahead of concat

---
 doc/whats-new.rst         | 10 ++++++++--
 xarray/core/combine.py    | 13 +++++++++++++
 xarray/tests/test_dask.py | 16 ++++++++++++++++
 3 files changed, 37 insertions(+), 2 deletions(-)

diff --git a/doc/whats-new.rst b/doc/whats-new.rst
index 25c14d4de6a..5f2761d78af 100644
--- a/doc/whats-new.rst
+++ b/doc/whats-new.rst
@@ -116,7 +116,13 @@ Bug fixes
   ``rtol`` arguments when called on ``DataArray`` objects.
   By `Stephan Hoyer `_.
 
-- Xarray ``quantile`` methods now properly raise a ``TypeError`` when applied to
+- :py:func:`~xarray.concat` was computing multiple times coordinates that are
+  not index and not in memory (e.g. dask-based); :py:func:`~xarray.open_mfdataset`
+  was loading them multiple times from disk. Now, both functions will instead
+  load them once and store them as numpy arrays (:issue:`1521`).
+  By `Guido Imperiale `_.
+
+- xarray ``quantile`` methods now properly raise a ``TypeError`` when applied to
   objects with data stored as ``dask`` arrays (:issue:`1529`).
   By `Joe Hamman `_.
 
@@ -2032,4 +2038,4 @@ Miles.
 
 v0.1 (2 May 2014)
 -----------------
-Initial release.
\ No newline at end of file
+Initial release.
diff --git a/xarray/core/combine.py b/xarray/core/combine.py
index d139151064b..f6c7c392580 100644
--- a/xarray/core/combine.py
+++ b/xarray/core/combine.py
@@ -195,6 +195,17 @@ def differs(vname):
     return concat_over
 
 
+def _load_coords(dataset):
+    """Load into memory any non-index coords. Preserve original.
+    """
+    if all(coord._in_memory for coord in dataset.coords.values()):
+        return dataset
+    dataset = dataset.copy()
+    for coord in dataset.coords.values():
+        coord.load()
+    return dataset
+
+
 def _dataset_concat(datasets, dim, data_vars, coords, compat, positions):
     """
     Concatenate a sequence of datasets along a new or existing dimension
@@ -208,6 +219,8 @@ def _dataset_concat(datasets, dim, data_vars, coords, compat, positions):
     dim, coord = _calc_concat_dim_coord(dim)
     datasets = [as_dataset(ds) for ds in datasets]
     datasets = align(*datasets, join='outer', copy=False, exclude=[dim])
+    # TODO: compute dask coords with a single invocation of dask.compute()
+    datasets = [_load_coords(ds) for ds in datasets]
 
     concat_over = _calc_concat_over(datasets, dim, data_vars, coords)
 
diff --git a/xarray/tests/test_dask.py b/xarray/tests/test_dask.py
index 92f616b8bd6..f888f3d58c3 100644
--- a/xarray/tests/test_dask.py
+++ b/xarray/tests/test_dask.py
@@ -228,6 +228,22 @@ def test_lazy_array(self):
         actual = xr.concat([v[:2], v[2:]], 'x')
         self.assertLazyAndAllClose(u, actual)
 
+    def test_concat_loads_coords(self):
+        # Test that concat() computes dask-based, non-index
+        # coordinates exactly once and loads them in the output,
+        # while leaving the input unaltered.
+        y = build_dask_array()
+        ds1 = Dataset(coords={'x': [1], 'y': ('x', y)})
+        ds2 = Dataset(coords={'x': [1], 'y': ('x', [2.0])})
+        assert kernel_call_count == 0
+        ds3 = xr.concat([ds1, ds2], dim='z')
+        # BUG fixed in #1532 where getattr('to_dataset')
+        # will cause non-index coords to be computed.
+        assert kernel_call_count == 2
+        assert ds1['y'].data is y
+        assert isinstance(ds3['y'].data, np.ndarray)
+        assert ds3['y'].values.tolist() == [[1.0], [2.0]]
+
     def test_groupby(self):
         u = self.eager_array
         v = self.lazy_array

From 1065f4caa7bd8bca861be3be56b3e7f63548cc84 Mon Sep 17 00:00:00 2001
From: Guido Imperiale
Date: Thu, 21 Sep 2017 23:38:39 +0100
Subject: [PATCH 2/7] Update unit test after #1522

---
 xarray/tests/test_dask.py | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/xarray/tests/test_dask.py b/xarray/tests/test_dask.py
index 5163ce2a386..98420452f72 100644
--- a/xarray/tests/test_dask.py
+++ b/xarray/tests/test_dask.py
@@ -253,14 +253,12 @@ def test_concat_loads_coords(self):
         # Test that concat() computes dask-based, non-index
         # coordinates exactly once and loads them in the output,
         # while leaving the input unaltered.
-        y = build_dask_array()
+        y = build_dask_array('y')
         ds1 = Dataset(coords={'x': [1], 'y': ('x', y)})
         ds2 = Dataset(coords={'x': [1], 'y': ('x', [2.0])})
         assert kernel_call_count == 0
         ds3 = xr.concat([ds1, ds2], dim='z')
-        # BUG fixed in #1532 where getattr('to_dataset')
-        # will cause non-index coords to be computed.
-        assert kernel_call_count == 2
+        assert kernel_call_count == 1
         assert ds1['y'].data is y
         assert isinstance(ds3['y'].data, np.ndarray)
         assert ds3['y'].values.tolist() == [[1.0], [2.0]]

From ab90318dce8fc7c5bf23b77f607758ed3ed57d47 Mon Sep 17 00:00:00 2001
From: Guido Imperiale
Date: Fri, 22 Sep 2017 22:00:02 +0100
Subject: [PATCH 3/7] Minimise loads on concat. Extend new concat logic to
 data_vars.

---
 doc/whats-new.rst      |  11 ++--
 xarray/core/combine.py | 120 +++++++++++++++++++++++------------------
 2 files changed, 74 insertions(+), 57 deletions(-)

diff --git a/doc/whats-new.rst b/doc/whats-new.rst
index aab969a1f8f..074f4292f27 100644
--- a/doc/whats-new.rst
+++ b/doc/whats-new.rst
@@ -54,9 +54,9 @@ Breaking changes
       [...]
 
    Note that both versions are currently supported, but using the old syntax will
-   produce a warning encouraging users to adopt the new syntax. 
+   produce a warning encouraging users to adopt the new syntax.
    By `Daniel Rothenberg `_.
-   
+
 - ``repr`` and the Jupyter Notebook won't automatically compute dask variables.
   Datasets loaded with ``open_dataset`` won't automatically read coords from
   disk when calling ``repr`` (:issue:`1522`).
@@ -189,10 +189,11 @@ Bug fixes
   ``rtol`` arguments when called on ``DataArray`` objects.
   By `Stephan Hoyer `_.
 
-- :py:func:`~xarray.concat` was computing multiple times coordinates that are
-  not index and not in memory (e.g. dask-based); :py:func:`~xarray.open_mfdataset`
+- :py:func:`~xarray.concat` was computing variables that aren't in memory
+  (e.g. dask-based) multiple times; :py:func:`~xarray.open_mfdataset`
   was loading them multiple times from disk. Now, both functions will instead
-  load them once and store them as numpy arrays (:issue:`1521`).
+  load them the bare minimum amount of times and, if they do, store them
+  in memory in the concatenated array/dataset (:issue:`1521`).
   By `Guido Imperiale `_.
 
 - xarray ``quantile`` methods now properly raise a ``TypeError`` when applied to
diff --git a/xarray/core/combine.py b/xarray/core/combine.py
index 799672fbe9a..7a51bb59fdd 100644
--- a/xarray/core/combine.py
+++ b/xarray/core/combine.py
@@ -148,29 +148,45 @@ def _calc_concat_over(datasets, dim, data_vars, coords):
     Determine which dataset variables need to be concatenated in the result,
     and which can simply be taken from the first dataset.
""" - def process_subset_opt(opt, subset): - if subset == 'coords': - subset_long_name = 'coordinates' - else: - subset_long_name = 'data variables' + # Return values + concat_over = set() + equals = {} + if dim in datasets[0]: + concat_over.add(dim) + for ds in datasets: + concat_over.update(k for k, v in ds.variables.items() + if dim in v.dims) + + def process_subset_opt(opt, subset): if isinstance(opt, basestring): if opt == 'different': - def differs(vname): - # simple helper function which compares a variable - # across all datasets and indicates whether that - # variable differs or not. - v = datasets[0].variables[vname] - return any(not ds.variables[vname].equals(v) - for ds in datasets[1:]) # all nonindexes that are not the same in each dataset - concat_new = set(k for k in getattr(datasets[0], subset) - if k not in concat_over and differs(k)) + for k in getattr(datasets[0], subset): + if k not in concat_over: + # Compare the variable of all datasets vs. the one + # of the first dataset. Perform the minimum amount of + # loads in order to avoid multiple loads from disk while + # keeping the RAM footprint low. + v_lhs = datasets[0].variables[k].load() + # We'll need to know later on if variables are equal. + for ds_rhs in datasets[1:]: + v_rhs = ds_rhs.variables[k].compute() + if not v_lhs.equals(v_rhs): + concat_over.add(k) + equals[k] = False + # rhs variable is not to be discarded, therefore + # avoid re-computing it in the future + ds_rhs.variables[k].data = v_rhs.data + break + else: + equals[k] = True + elif opt == 'all': - concat_new = (set(getattr(datasets[0], subset)) - - set(datasets[0].dims)) + concat_over.update(set(getattr(datasets[0], subset)) - + set(datasets[0].dims)) elif opt == 'minimal': - concat_new = set() + pass else: raise ValueError("unexpected value for concat_%s: %s" % (subset, opt)) @@ -178,51 +194,37 @@ def differs(vname): invalid_vars = [k for k in opt if k not in getattr(datasets[0], subset)] if invalid_vars: - raise ValueError('some variables in %s are not ' - '%s on the first dataset: %s' - % (subset, subset_long_name, invalid_vars)) - concat_new = set(opt) - return concat_new + if subset == 'coords': + raise ValueError( + 'some variables in coords are not coordinates on ' + 'the first dataset: %s' % invalid_vars) + else: + raise ValueError( + 'some variables in data_vars are not data variables on ' + 'the first dataset: %s' % invalid_vars) + concat_over.update(opt) - concat_over = set() - for ds in datasets: - concat_over.update(k for k, v in ds.variables.items() - if dim in v.dims) - concat_over.update(process_subset_opt(data_vars, 'data_vars')) - concat_over.update(process_subset_opt(coords, 'coords')) - if dim in datasets[0]: - concat_over.add(dim) - return concat_over - - -def _load_coords(dataset): - """Load into memory any non-index coords. Preserve original. 
- """ - if all(coord._in_memory for coord in dataset.coords.values()): - return dataset - dataset = dataset.copy() - for coord in dataset.coords.values(): - coord.load() - return dataset + process_subset_opt(data_vars, 'data_vars') + process_subset_opt(coords, 'coords') + return concat_over, equals def _dataset_concat(datasets, dim, data_vars, coords, compat, positions): """ Concatenate a sequence of datasets along a new or existing dimension """ - from .dataset import Dataset, as_dataset + from .dataset import Dataset if compat not in ['equals', 'identical']: raise ValueError("compat=%r invalid: must be 'equals' " "or 'identical'" % compat) dim, coord = _calc_concat_dim_coord(dim) - datasets = [as_dataset(ds) for ds in datasets] + # Make sure we're working on a copy (we'll be loading variables) + datasets = [ds.copy() for ds in datasets] datasets = align(*datasets, join='outer', copy=False, exclude=[dim]) - # TODO: compute dask coords with a single invocation of dask.compute() - datasets = [_load_coords(ds) for ds in datasets] - concat_over = _calc_concat_over(datasets, dim, data_vars, coords) + concat_over, equals = _calc_concat_over(datasets, dim, data_vars, coords) def insert_result_variable(k, v): assert isinstance(v, Variable) @@ -252,11 +254,25 @@ def insert_result_variable(k, v): elif (k in result_coord_names) != (k in ds.coords): raise ValueError('%r is a coordinate in some datasets but not ' 'others' % k) - elif (k in result_vars and k != dim and - not getattr(v, compat)(result_vars[k])): - verb = 'equal' if compat == 'equals' else compat - raise ValueError( - 'variable %r not %s across datasets' % (k, verb)) + elif k in result_vars and k != dim: + # Don't use Variable.identical as it internally invokes + # Variable.equals, and we may already know the answer + if compat == 'identical' and not utils.dict_equiv( + v.attrs, result_vars[k].attrs): + raise ValueError( + 'variable %s not identical across datasets' % k) + + # Proceed with equals() + try: + # May be populated when using the "different" method + is_equal = equals[k] + except KeyError: + result_vars[k].load() + is_equal = v.equals(result_vars[k]) + if not is_equal: + raise ValueError( + 'variable %s not equal across datasets' % k) + # we've already verified everything is consistent; now, calculate # shared dimension sizes so we can expand the necessary variables From 68d51c63ce5f6bf8e9ba651ae3fb816bdf0b41d2 Mon Sep 17 00:00:00 2001 From: Guido Imperiale Date: Fri, 22 Sep 2017 22:11:22 +0100 Subject: [PATCH 4/7] Trivial tweaks --- doc/whats-new.rst | 4 ++-- xarray/core/combine.py | 3 +-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/doc/whats-new.rst b/doc/whats-new.rst index 074f4292f27..2f754c935df 100644 --- a/doc/whats-new.rst +++ b/doc/whats-new.rst @@ -192,8 +192,8 @@ Bug fixes - :py:func:`~xarray.concat` was computing variables that aren't in memory (e.g. dask-based) multiple times; :py:func:`~xarray.open_mfdataset` was loading them multiple times from disk. Now, both functions will instead - load them the bare minimum amount of times and, if they do, store them - in memory in the concatenated array/dataset (:issue:`1521`). + load them at most once and, if they do, store them in memory in the + concatenated array/dataset (:issue:`1521`). By `Guido Imperiale `_. 
 
 - xarray ``quantile`` methods now properly raise a ``TypeError`` when applied to
diff --git a/xarray/core/combine.py b/xarray/core/combine.py
index 7a51bb59fdd..539fdbda1c7 100644
--- a/xarray/core/combine.py
+++ b/xarray/core/combine.py
@@ -188,8 +188,7 @@ def process_subset_opt(opt, subset):
             elif opt == 'minimal':
                 pass
             else:
-                raise ValueError("unexpected value for concat_%s: %s"
-                                 % (subset, opt))
+                raise ValueError("unexpected value for %s: %s" % (subset, opt))
         else:
             invalid_vars = [k for k in opt
                             if k not in getattr(datasets[0], subset)]

From 1c3047478eb84d73ac09d5e51c40d09969cc8a88 Mon Sep 17 00:00:00 2001
From: Guido Imperiale
Date: Sun, 24 Sep 2017 17:31:51 +0100
Subject: [PATCH 5/7] Added unit tests

Fix loads when vars are found different halfway through
---
 xarray/core/combine.py    |  9 ++--
 xarray/tests/test_dask.py | 86 ++++++++++++++++++++++++++++++++------
 2 files changed, 78 insertions(+), 17 deletions(-)

diff --git a/xarray/core/combine.py b/xarray/core/combine.py
index 539fdbda1c7..007b9640e20 100644
--- a/xarray/core/combine.py
+++ b/xarray/core/combine.py
@@ -170,14 +170,17 @@ def process_subset_opt(opt, subset):
                         # keeping the RAM footprint low.
                         v_lhs = datasets[0].variables[k].load()
                         # We'll need to know later on if variables are equal.
+                        computed = []
                         for ds_rhs in datasets[1:]:
                             v_rhs = ds_rhs.variables[k].compute()
+                            computed.append(v_rhs)
                             if not v_lhs.equals(v_rhs):
                                 concat_over.add(k)
                                 equals[k] = False
-                                # rhs variable is not to be discarded, therefore
-                                # avoid re-computing it in the future
-                                ds_rhs.variables[k].data = v_rhs.data
+                                # computed variables are not to be re-computed
+                                # again in the future
+                                for ds, v in zip(datasets[1:], computed):
+                                    ds.variables[k].data = v.data
                                 break
                         else:
                             equals[k] = True
diff --git a/xarray/tests/test_dask.py b/xarray/tests/test_dask.py
index 98420452f72..a73d3913bde 100644
--- a/xarray/tests/test_dask.py
+++ b/xarray/tests/test_dask.py
@@ -249,19 +249,76 @@ def test_lazy_array(self):
         actual = xr.concat([v[:2], v[2:]], 'x')
         self.assertLazyAndAllClose(u, actual)
 
-    def test_concat_loads_coords(self):
-        # Test that concat() computes dask-based, non-index
-        # coordinates exactly once and loads them in the output,
-        # while leaving the input unaltered.
-        y = build_dask_array('y')
-        ds1 = Dataset(coords={'x': [1], 'y': ('x', y)})
-        ds2 = Dataset(coords={'x': [1], 'y': ('x', [2.0])})
-        assert kernel_call_count == 0
-        ds3 = xr.concat([ds1, ds2], dim='z')
-        assert kernel_call_count == 1
-        assert ds1['y'].data is y
-        assert isinstance(ds3['y'].data, np.ndarray)
-        assert ds3['y'].values.tolist() == [[1.0], [2.0]]
+    def test_concat_loads_variables(self):
+        # Test that concat() computes not-in-memory variables at most once
+        # and loads them in the output, while leaving the input unaltered.
+        d1 = build_dask_array('d1')
+        c1 = build_dask_array('c1')
+        d2 = build_dask_array('d2')
+        c2 = build_dask_array('c2')
+        d3 = build_dask_array('d3')
+        c3 = build_dask_array('c3')
+        # Note: c is a non-index coord.
+        # Index coords are loaded by IndexVariable.__init__.
+        ds1 = Dataset(data_vars={'d': ('x', d1)}, coords={'c': ('x', c1)})
+        ds2 = Dataset(data_vars={'d': ('x', d2)}, coords={'c': ('x', c2)})
+        ds3 = Dataset(data_vars={'d': ('x', d3)}, coords={'c': ('x', c3)})
+
+        assert kernel_call_count == 0
+        out = xr.concat([ds1, ds2, ds3], dim='n', data_vars='different',
+                        coords='different')
+        # each kernel is computed exactly once
+        assert kernel_call_count == 6
+        # variables are loaded in the output
+        assert isinstance(out['d'].data, np.ndarray)
+        assert isinstance(out['c'].data, np.ndarray)
+
+        out = xr.concat([ds1, ds2, ds3], dim='n', data_vars='all', coords='all')
+        # no extra kernel calls
+        assert kernel_call_count == 6
+        assert isinstance(out['d'].data, dask.array.Array)
+        assert isinstance(out['c'].data, dask.array.Array)
+
+        out = xr.concat([ds1, ds2, ds3], dim='n', data_vars=['d'], coords=['c'])
+        # no extra kernel calls
+        assert kernel_call_count == 6
+        assert isinstance(out['d'].data, dask.array.Array)
+        assert isinstance(out['c'].data, dask.array.Array)
+
+        out = xr.concat([ds1, ds2, ds3], dim='n', data_vars=[], coords=[])
+        # variables are loaded once as we are validating that they're identical
+        assert kernel_call_count == 12
+        assert isinstance(out['d'].data, np.ndarray)
+        assert isinstance(out['c'].data, np.ndarray)
+
+        out = xr.concat([ds1, ds2, ds3], dim='n', data_vars='different',
+                        coords='different', compat='identical')
+        # compat=identical doesn't do any more kernel calls than compat=equals
+        assert kernel_call_count == 18
+        assert isinstance(out['d'].data, np.ndarray)
+        assert isinstance(out['c'].data, np.ndarray)
+
+        # When the test for different turns true halfway through,
+        # stop computing variables as it would not have any benefit
+        ds4 = Dataset(data_vars={'d': ('x', [2.0])}, coords={'c': ('x', [2.0])})
+        out = xr.concat([ds1, ds2, ds4, ds3], dim='n', data_vars='different',
+                        coords='different')
+        # the variables of ds1 and ds2 were computed, but those of ds3 weren't
+        assert kernel_call_count == 22
+        assert isinstance(out['d'].data, dask.array.Array)
+        assert isinstance(out['c'].data, dask.array.Array)
+        # the data of ds1 and ds2 was loaded into numpy and then
+        # concatenated to the data of ds3. Thus, only ds3 is computed now.
+        out.compute()
+        assert kernel_call_count == 24
+
+        # Finally, test that originals are unaltered
+        assert ds1['d'].data is d1
+        assert ds1['c'].data is c1
+        assert ds2['d'].data is d2
+        assert ds2['c'].data is c2
+        assert ds3['d'].data is d3
+        assert ds3['c'].data is c3
 
     def test_groupby(self):
         u = self.eager_array
@@ -529,10 +586,11 @@ def test_dask_kwargs_dataset(method):
 kernel_call_count = 0
 
 
-def kernel():
+def kernel(name):
     """Dask kernel to test pickling/unpickling and __repr__.
     Must be global to make it pickleable.
""" + print("kernel(%s)" % name) global kernel_call_count kernel_call_count += 1 return np.ones(1, dtype=np.int64) @@ -542,5 +600,5 @@ def build_dask_array(name): global kernel_call_count kernel_call_count = 0 return dask.array.Array( - dask={(name, 0): (kernel, )}, name=name, + dask={(name, 0): (kernel, name)}, name=name, chunks=((1,),), dtype=np.int64) From f99313cc7b266964cb540c4dbd858f701461eb2b Mon Sep 17 00:00:00 2001 From: Guido Imperiale Date: Fri, 6 Oct 2017 21:23:20 +0100 Subject: [PATCH 6/7] Add xfail for #1586 --- xarray/tests/test_combine.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/xarray/tests/test_combine.py b/xarray/tests/test_combine.py index f4bb5e83f98..b00329514be 100644 --- a/xarray/tests/test_combine.py +++ b/xarray/tests/test_combine.py @@ -5,6 +5,7 @@ import numpy as np import pandas as pd +import pytest from xarray import Dataset, DataArray, auto_combine, concat, Variable from xarray.core.pycompat import iteritems, OrderedDict @@ -268,6 +269,7 @@ def test_concat(self): with self.assertRaisesRegexp(ValueError, 'not a valid argument'): concat([foo, bar], dim='w', data_vars='minimal') + @pytest.mark.xfail(reason='https://github.com/pydata/xarray/issues/1586') def test_concat_encoding(self): # Regression test for GH1297 ds = Dataset({'foo': (['x', 'y'], np.random.random((2, 3))), From 2f80cef14d0ede182e9521c096f37f88a4e69488 Mon Sep 17 00:00:00 2001 From: Guido Imperiale Date: Fri, 6 Oct 2017 21:23:20 +0100 Subject: [PATCH 7/7] Revert "Add xfail for #1586" This reverts commit f99313cc7b266964cb540c4dbd858f701461eb2b. --- xarray/tests/test_combine.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/xarray/tests/test_combine.py b/xarray/tests/test_combine.py index b00329514be..f4bb5e83f98 100644 --- a/xarray/tests/test_combine.py +++ b/xarray/tests/test_combine.py @@ -5,7 +5,6 @@ import numpy as np import pandas as pd -import pytest from xarray import Dataset, DataArray, auto_combine, concat, Variable from xarray.core.pycompat import iteritems, OrderedDict @@ -269,7 +268,6 @@ def test_concat(self): with self.assertRaisesRegexp(ValueError, 'not a valid argument'): concat([foo, bar], dim='w', data_vars='minimal') - @pytest.mark.xfail(reason='https://github.com/pydata/xarray/issues/1586') def test_concat_encoding(self): # Regression test for GH1297 ds = Dataset({'foo': (['x', 'y'], np.random.random((2, 3))),