diff --git a/doc/io.rst b/doc/io.rst
index 398afe8642e..85cb73a0734 100644
--- a/doc/io.rst
+++ b/doc/io.rst
@@ -604,6 +604,29 @@ store is already present at that path, an error will be raised, preventing it
 from being overwritten. To override this behavior and overwrite an existing
 store, add ``mode='w'`` when invoking ``to_zarr``.
 
+It is also possible to append to an existing store. For that, add ``mode='a'``
+and set ``append_dim`` to the name of the dimension along which to append.
+
+.. ipython:: python
+    :suppress:
+
+    ! rm -rf path/to/directory.zarr
+
+.. ipython:: python
+
+    ds1 = xr.Dataset({'foo': (('x', 'y', 't'), np.random.rand(4, 5, 2))},
+                     coords={'x': [10, 20, 30, 40],
+                             'y': [1, 2, 3, 4, 5],
+                             't': pd.date_range('2001-01-01', periods=2)})
+    ds1.to_zarr('path/to/directory.zarr')
+    ds2 = xr.Dataset({'foo': (('x', 'y', 't'), np.random.rand(4, 5, 2))},
+                     coords={'x': [10, 20, 30, 40],
+                             'y': [1, 2, 3, 4, 5],
+                             't': pd.date_range('2001-01-03', periods=2)})
+    ds2.to_zarr('path/to/directory.zarr', mode='a', append_dim='t')
+
+To store variable-length strings, use ``dtype=object``.
+
 To read back a zarr dataset that has been created this way, we use the
 :py:func:`~xarray.open_zarr` method:
 
diff --git a/doc/whats-new.rst b/doc/whats-new.rst
index b33cb992874..4fd83bfc370 100644
--- a/doc/whats-new.rst
+++ b/doc/whats-new.rst
@@ -226,6 +226,11 @@ Other enhancements
   report showing what exactly differs between the two objects (dimensions /
   coordinates / variables / attributes) (:issue:`1507`).
   By `Benoit Bovy `_.
+- Added append capability to the zarr store.
+  By `Jendrik Jördening `_,
+  `David Brochart `_,
+  `Ryan Abernathey `_ and
+  `Shikhar Goenka`_.
 - Resampling of standard and non-standard calendars indexed by
   :py:class:`~xarray.CFTimeIndex` is now possible. (:issue:`2191`).
   By `Jwen Fai Low `_ and
diff --git a/xarray/backends/api.py b/xarray/backends/api.py
index dbe6cdcdd74..7a8c022b87d 100644
--- a/xarray/backends/api.py
+++ b/xarray/backends/api.py
@@ -4,17 +4,21 @@
 from io import BytesIO
 from numbers import Number
 from pathlib import Path
+import re
 
 import numpy as np
+import pandas as pd
 
-from .. import Dataset, DataArray, backends, conventions
+from .. import Dataset, DataArray, backends, conventions, coding
 from ..core import indexing
 from .. import auto_combine
 from ..core.combine import (combine_by_coords, _nested_combine,
                             _infer_concat_order_from_positions)
 from ..core.utils import close_on_error, is_grib_path, is_remote_uri
+from ..core.variable import Variable
 from .common import ArrayWriter
 from .locks import _get_scheduler
+from ..coding.variables import safe_setitem, unpack_for_encoding
 
 DATAARRAY_NAME = '__xarray_dataarray_name__'
 DATAARRAY_VARIABLE = '__xarray_dataarray_variable__'
@@ -1024,8 +1028,48 @@ def save_mfdataset(datasets, paths, mode='w', format=None, groups=None,
                      for w, s in zip(writes, stores)])
 
 
+def _validate_datatypes_for_zarr_append(dataset):
+    """Data variable dtypes must be numeric, string, unicode or object"""
+    def check_dtype(var):
+        if (not np.issubdtype(var.dtype, np.number)
+                and not coding.strings.is_unicode_dtype(var.dtype)
+                and not var.dtype == object):
+            # and not re.match('^bytes[1-9]+$', var.dtype.name)):
+            raise ValueError('Invalid dtype for data variable: {} '
+                             'dtype must be a subtype of number, '
+                             'a fixed-size string, a fixed-size '
+                             'unicode string or an object'.format(var))
+    for var in dataset.data_vars.values():
+        check_dtype(var)
+
+
+def _validate_append_dim_and_encoding(ds_to_append, store, append_dim,
+                                      encoding, **open_kwargs):
+    try:
+        ds = backends.zarr.open_zarr(store, **open_kwargs)
+    except ValueError:  # store empty
+        return
+    if append_dim:
+        if append_dim not in ds.dims:
+            raise ValueError(
+                "{} not a valid dimension in the Dataset".format(append_dim)
+            )
+    for data_var in ds_to_append:
+        if data_var in ds:
+            if append_dim is None:
+                raise ValueError(
+                    "variable '{}' already exists, but append_dim "
+                    "was not set".format(data_var)
+                )
+            if data_var in encoding.keys():
+                raise ValueError(
+                    "variable '{}' already exists, but encoding was "
+                    "provided".format(data_var)
+                )
+
+
 def to_zarr(dataset, store=None, mode='w-', synchronizer=None, group=None,
-            encoding=None, compute=True, consolidated=False):
+            encoding=None, compute=True, consolidated=False, append_dim=None):
     """This function creates an appropriate datastore for writing a dataset
     to a zarr store
 
@@ -1040,11 +1084,18 @@ def to_zarr(dataset, store=None, mode='w-', synchronizer=None, group=None,
     _validate_dataset_names(dataset)
     _validate_attrs(dataset)
 
+    if mode == "a":
+        _validate_datatypes_for_zarr_append(dataset)
+        _validate_append_dim_and_encoding(dataset, store, append_dim,
+                                          group=group,
+                                          consolidated=consolidated,
+                                          encoding=encoding)
+
     zstore = backends.ZarrStore.open_group(store=store, mode=mode,
                                            synchronizer=synchronizer,
                                            group=group,
                                            consolidate_on_close=consolidated)
-
+    zstore.append_dim = append_dim
     writer = ArrayWriter()
     # TODO: figure out how to properly handle unlimited_dims
     dump_to_store(dataset, zstore, writer, encoding=encoding)
diff --git a/xarray/backends/common.py b/xarray/backends/common.py
index 5819292bb8a..a433c80505a 100644
--- a/xarray/backends/common.py
+++ b/xarray/backends/common.py
@@ -158,14 +158,19 @@ class ArrayWriter:
     def __init__(self, lock=None):
         self.sources = []
         self.targets = []
+        self.regions = []
         self.lock = lock
 
-    def add(self, source, target):
+    def add(self, source, target, region=None):
         if isinstance(source, dask_array_type):
             self.sources.append(source)
             self.targets.append(target)
+            self.regions.append(region)
         else:
-            target[...] = source
+            if region:
+                target[region] = source
+            else:
+                target[...] = source
 
     def sync(self, compute=True):
         if self.sources:
@@ -173,11 +178,13 @@ def sync(self, compute=True):
             # TODO: consider wrapping targets with dask.delayed, if this makes
             # for any discernible difference in performance, e.g.,
             # targets = [dask.delayed(t) for t in self.targets]
+
             delayed_store = da.store(self.sources, self.targets,
                                      lock=self.lock, compute=compute,
-                                     flush=True)
+                                     flush=True, regions=self.regions)
             self.sources = []
             self.targets = []
+            self.regions = []
             return delayed_store
 
 
diff --git a/xarray/backends/zarr.py b/xarray/backends/zarr.py
index f5364314af8..c0634fff009 100644
--- a/xarray/backends/zarr.py
+++ b/xarray/backends/zarr.py
@@ -8,7 +8,8 @@
 from ..core import indexing
 from ..core.pycompat import integer_types
 from ..core.utils import FrozenOrderedDict, HiddenKeyDict
-from .common import AbstractWritableDataStore, BackendArray
+from .common import (AbstractWritableDataStore, BackendArray,
+                     _encode_variable_name)
 
 # need some special secret attributes to tell us the dimensions
 _DIMENSION_KEY = '_ARRAY_DIMENSIONS'
@@ -212,7 +213,7 @@ def encode_zarr_variable(var, needs_copy=True, name=None):
     # zarr allows unicode, but not variable-length strings, so it's both
     # simpler and more compact to always encode as UTF-8 explicitly.
     # TODO: allow toggling this explicitly via dtype in encoding.
-    coder = coding.strings.EncodedStringCoder(allows_unicode=False)
+    coder = coding.strings.EncodedStringCoder(allows_unicode=True)
     var = coder.encode(var, name=name)
     var = coding.strings.ensure_fixed_length_bytes(var)
 
@@ -257,6 +258,7 @@ def __init__(self, zarr_group, consolidate_on_close=False):
         self._synchronizer = self.ds.synchronizer
         self._group = self.ds.path
         self._consolidate_on_close = consolidate_on_close
+        self.append_dim = None
 
     def open_store_variable(self, name, zarr_array):
         data = indexing.LazilyOuterIndexedArray(ZarrArrayWrapper(name, self))
@@ -313,40 +315,122 @@ def encode_variable(self, variable):
     def encode_attribute(self, a):
         return _encode_zarr_attr_value(a)
 
-    def prepare_variable(self, name, variable, check_encoding=False,
-                         unlimited_dims=None):
-
-        attrs = variable.attrs.copy()
-        dims = variable.dims
-        dtype = variable.dtype
-        shape = variable.shape
-
-        fill_value = attrs.pop('_FillValue', None)
-        if variable.encoding == {'_FillValue': None} and fill_value is None:
-            variable.encoding = {}
-
-        encoding = _extract_zarr_variable_encoding(
-            variable, raise_on_invalid=check_encoding)
-
-        encoded_attrs = OrderedDict()
-        # the magic for storing the hidden dimension data
-        encoded_attrs[_DIMENSION_KEY] = dims
-        for k, v in attrs.items():
-            encoded_attrs[k] = self.encode_attribute(v)
-
-        zarr_array = self.ds.create(name, shape=shape, dtype=dtype,
-                                    fill_value=fill_value, **encoding)
-        zarr_array.attrs.put(encoded_attrs)
-
-        return zarr_array, variable.data
-
-    def store(self, variables, attributes, *args, **kwargs):
-        AbstractWritableDataStore.store(self, variables, attributes,
-                                        *args, **kwargs)
+    def store(self, variables, attributes, check_encoding_set=frozenset(),
+              writer=None, unlimited_dims=None):
+        """
+        Top level method for putting data on this store. This method:
+         - encodes variables/attributes
+         - sets dimensions
+         - sets variables
+
+        Parameters
+        ----------
+        variables : dict-like
+            Dictionary of key/value (variable name / xr.Variable) pairs
+        attributes : dict-like
+            Dictionary of key/value (attribute name / attribute) pairs
+        check_encoding_set : list-like
+            List of variables that should be checked for invalid encoding
+            values
+        writer : ArrayWriter
+            ArrayWriter that buffers the array writes; in append mode the
+            region to write for each appended variable is tracked through it
+        unlimited_dims : list-like
+            List of dimension names that should be treated as unlimited
+            dimensions.
+        """
+
+        existing_variables = set([vn for vn in variables
+                                  if _encode_variable_name(vn) in self.ds])
+        new_variables = set(variables) - existing_variables
+        variables_without_encoding = OrderedDict([(vn, variables[vn])
+                                                  for vn in new_variables])
+        variables_encoded, attributes = self.encode(
+            variables_without_encoding, attributes)
+
+        if len(existing_variables) > 0:
+            # there are variables to append
+            # their encoding must be the same as in the store
+            ds = open_zarr(self.ds.store, chunks=None)
+            variables_with_encoding = OrderedDict()
+            for vn in existing_variables:
+                variables_with_encoding[vn] = variables[vn].copy(deep=False)
+                variables_with_encoding[vn].encoding = ds[vn].encoding
+            variables_with_encoding, _ = self.encode(variables_with_encoding,
+                                                     {})
+            variables_encoded.update(variables_with_encoding)
+
+        self.set_attributes(attributes)
+        self.set_dimensions(variables_encoded, unlimited_dims=unlimited_dims)
+        self.set_variables(variables_encoded, check_encoding_set, writer,
+                           unlimited_dims=unlimited_dims)
 
     def sync(self):
         pass
 
+    def set_variables(self, variables, check_encoding_set, writer,
+                      unlimited_dims=None):
+        """
+        This provides a centralized method to set the variables on the data
+        store.
+
+        Parameters
+        ----------
+        variables : dict-like
+            Dictionary of key/value (variable name / xr.Variable) pairs
+        check_encoding_set : list-like
+            List of variables that should be checked for invalid encoding
+            values
+        writer : ArrayWriter
+        unlimited_dims : list-like
+            List of dimension names that should be treated as unlimited
+            dimensions.
+        """
+
+        for vn, v in variables.items():
+            name = _encode_variable_name(vn)
+            check = vn in check_encoding_set
+            attrs = v.attrs.copy()
+            dims = v.dims
+            dtype = v.dtype
+            shape = v.shape
+
+            fill_value = attrs.pop('_FillValue', None)
+            if v.encoding == {'_FillValue': None} and fill_value is None:
+                v.encoding = {}
+            if name in self.ds:
+                zarr_array = self.ds[name]
+                if self.append_dim in dims:
+                    # this is the DataArray that has append_dim as a
+                    # dimension
+                    append_axis = dims.index(self.append_dim)
+                    new_shape = list(zarr_array.shape)
+                    new_shape[append_axis] += v.shape[append_axis]
+                    new_region = [slice(None)] * len(new_shape)
+                    new_region[append_axis] = slice(
+                        zarr_array.shape[append_axis],
+                        None
+                    )
+                    zarr_array.resize(new_shape)
+                    writer.add(v.data, zarr_array,
+                               region=tuple(new_region))
+            else:
+                # new variable
+                encoding = _extract_zarr_variable_encoding(
+                    v, raise_on_invalid=check)
+                encoded_attrs = OrderedDict()
+                # the magic for storing the hidden dimension data
+                encoded_attrs[_DIMENSION_KEY] = dims
+                for k2, v2 in attrs.items():
+                    encoded_attrs[k2] = self.encode_attribute(v2)
+
+                if coding.strings.check_vlen_dtype(dtype) == str:
+                    dtype = str
+                zarr_array = self.ds.create(name, shape=shape, dtype=dtype,
+                                            fill_value=fill_value, **encoding)
+                zarr_array.attrs.put(encoded_attrs)
+                writer.add(v.data, zarr_array)
+
     def close(self):
         if self._consolidate_on_close:
             import zarr
diff --git a/xarray/core/dataset.py b/xarray/core/dataset.py
index 2df0400609e..500f2bb5d7f 100644
--- a/xarray/core/dataset.py
+++ b/xarray/core/dataset.py
@@ -1365,7 +1365,8 @@ def to_netcdf(self, path=None, mode='w', format=None, group=None,
                          compute=compute)
 
     def to_zarr(self, store=None, mode='w-', synchronizer=None, group=None,
-                encoding=None, compute=True, consolidated=False):
+                encoding=None, compute=True, consolidated=False,
+                append_dim=None):
         """Write dataset contents to a zarr group.
 
         .. note:: Experimental
@@ -1376,9 +1377,10 @@ def to_zarr(self, store=None, mode='w-', synchronizer=None, group=None,
         ----------
         store : MutableMapping or str, optional
             Store or path to directory in file system.
-        mode : {'w', 'w-'}
+        mode : {'w', 'w-', 'a'}
             Persistence mode: 'w' means create (overwrite if exists);
-            'w-' means create (fail if exists).
+            'w-' means create (fail if exists);
+            'a' means append (create if does not exist).
         synchronizer : object, optional
             Array synchronizer
         group : str, optional
@@ -1393,6 +1395,8 @@ def to_zarr(self, store=None, mode='w-', synchronizer=None, group=None,
         consolidated: bool, optional
             If True, apply zarr's `consolidate_metadata` function to the store
             after writing.
+        append_dim : str, optional
+            If mode='a', the dimension on which the data will be appended.
 
         References
         ----------
@@ -1400,14 +1404,14 @@ def to_zarr(self, store=None, mode='w-', synchronizer=None, group=None,
         """
         if encoding is None:
             encoding = {}
-        if mode not in ['w', 'w-']:
-            # TODO: figure out how to handle 'r+' and 'a'
-            raise ValueError("The only supported options for mode are 'w' "
-                             "and 'w-'.")
+        if mode not in ['w', 'w-', 'a']:
+            # TODO: figure out how to handle 'r+'
+            raise ValueError("The only supported options for mode are 'w', "
+                             "'w-' and 'a'.")
         from ..backends.api import to_zarr
         return to_zarr(self, store=store, mode=mode, synchronizer=synchronizer,
                        group=group, encoding=encoding, compute=compute,
-                       consolidated=consolidated)
+                       consolidated=consolidated, append_dim=append_dim)
 
     def __repr__(self):
         return formatting.dataset_repr(self)
diff --git a/xarray/tests/test_backends.py b/xarray/tests/test_backends.py
index f48b2d6e563..f14cf5b788c 100644
--- a/xarray/tests/test_backends.py
+++ b/xarray/tests/test_backends.py
@@ -38,7 +38,7 @@
     requires_scipy_or_netCDF4, requires_zarr)
 from .test_coding_times import (
     _ALL_CALENDARS, _NON_STANDARD_CALENDARS, _STANDARD_CALENDARS)
-from .test_dataset import create_test_data
+from .test_dataset import create_test_data, create_append_test_data
 
 try:
     import netCDF4 as nc4
@@ -1615,11 +1615,18 @@ def test_write_persistence_modes(self):
         with pytest.raises(ValueError):
            self.save(original, store, mode='w-')
 
-        # check that we can't use other persistence modes
-        # TODO: reconsider whether other persistence modes should be supported
-        with pytest.raises(ValueError):
-            with self.roundtrip(original, save_kwargs={'mode': 'a'}) as actual:
-                pass
+        # check append mode for normal write
+        with self.roundtrip(original, save_kwargs={'mode': 'a'}) as actual:
+            assert_identical(original, actual)
+
+        ds, ds_to_append, _ = create_append_test_data()
+
+        # check append mode for append write
+        with self.create_zarr_target() as store_target:
+            ds.to_zarr(store_target, mode='w')
+            
ds_to_append.to_zarr(store_target, mode='a', append_dim='time') + original = xr.concat([ds, ds_to_append], dim='time') + assert_identical(original, xr.open_zarr(store_target)) @pytest.mark.xfail(reason="Zarr stores can not be appended to") def test_append_overwrite_values(self): super(CFEncodedBase, self).test_append_overwrite_values() - @pytest.mark.xfail(reason="Zarr stores can not be appended to") def test_append_with_invalid_dim_raises(self): - super(CFEncodedBase, self).test_append_with_invalid_dim_raises() + + ds, ds_to_append, _ = create_append_test_data() + + # check failure when append_dim not valid + with pytest.raises(ValueError): + with self.create_zarr_target() as store_target: + ds.to_zarr(store_target, mode='w') + ds_to_append.to_zarr(store_target, mode='a', + append_dim='notvalid') + + def test_append_with_append_dim_not_set_raises(self): + + ds, ds_to_append, _ = create_append_test_data() + + # check failure when append_dim not set + with pytest.raises(ValueError): + with self.create_zarr_target() as store_target: + ds.to_zarr(store_target, mode='w') + ds_to_append.to_zarr(store_target, mode='a') + + def test_append_with_existing_encoding_raises(self): + + ds, ds_to_append, _ = create_append_test_data() + + # check failure when providing encoding to existing variable + with pytest.raises(ValueError): + with self.create_zarr_target() as store_target: + ds.to_zarr(store_target, mode='w') + ds_to_append.to_zarr(store_target, mode='a', + append_dim='time', + encoding={'da': {'compressor': None}}) + + def test_check_encoding_is_consistent_after_append(self): + + ds, ds_to_append, _ = create_append_test_data() + + # check encoding consistency + with self.create_zarr_target() as store_target: + import zarr + compressor = zarr.Blosc() + encoding = {'da': {'compressor': compressor}} + ds.to_zarr(store_target, mode='w', encoding=encoding) + ds_to_append.to_zarr(store_target, mode='a', append_dim='time') + actual_ds = xr.open_zarr(store_target) + actual_encoding = actual_ds['da'].encoding['compressor'] + assert actual_encoding.get_config() == compressor.get_config() + assert_identical( + xr.open_zarr(store_target).compute(), + xr.concat([ds, ds_to_append], dim='time') + ) + + def test_append_with_new_variable(self): + + ds, ds_to_append, ds_with_new_var = create_append_test_data() + + # check append mode for new variable + with self.create_zarr_target() as store_target: + xr.concat([ds, ds_to_append], dim='time').to_zarr(store_target, + mode='w') + ds_with_new_var.to_zarr(store_target, mode='a') + combined = xr.concat([ds, ds_to_append], dim='time') + combined['new_var'] = ds_with_new_var['new_var'] + assert_identical(combined, xr.open_zarr(store_target)) def test_to_zarr_compute_false_roundtrip(self): from dask.delayed import Delayed @@ -1669,11 +1741,51 @@ def test_to_zarr_compute_false_roundtrip(self): with self.create_zarr_target() as store: delayed_obj = self.save(original, store, compute=False) assert isinstance(delayed_obj, Delayed) + + # make sure target store has not been written to yet + with pytest.raises(AssertionError): + with self.open(store) as actual: + assert_identical(original, actual) + delayed_obj.compute() with self.open(store) as actual: assert_identical(original, actual) + def test_to_zarr_append_compute_false_roundtrip(self): + from dask.delayed import Delayed + + ds, ds_to_append, _ = create_append_test_data() + ds, ds_to_append = ds.chunk(), ds_to_append.chunk() + + with self.create_zarr_target() as store: + delayed_obj = self.save(ds, store, compute=False, 
mode='w') + assert isinstance(delayed_obj, Delayed) + + with pytest.raises(AssertionError): + with self.open(store) as actual: + assert_identical(ds, actual) + + delayed_obj.compute() + + with self.open(store) as actual: + assert_identical(ds, actual) + + delayed_obj = self.save(ds_to_append, store, compute=False, + mode='a', append_dim='time') + assert isinstance(delayed_obj, Delayed) + + with pytest.raises(AssertionError): + with self.open(store) as actual: + assert_identical(xr.concat([ds, ds_to_append], dim='time'), + actual) + + delayed_obj.compute() + + with self.open(store) as actual: + assert_identical(xr.concat([ds, ds_to_append], dim='time'), + actual) + def test_encoding_chunksizes(self): # regression test for GH2278 # see also test_encoding_chunksizes_unlimited diff --git a/xarray/tests/test_dataset.py b/xarray/tests/test_dataset.py index 7b3ae88608b..501179431ed 100644 --- a/xarray/tests/test_dataset.py +++ b/xarray/tests/test_dataset.py @@ -52,6 +52,58 @@ def create_test_data(seed=None): return obj +def create_append_test_data(seed=None): + rs = np.random.RandomState(seed) + + lat = [2, 1, 0] + lon = [0, 1, 2] + nt1 = 3 + nt2 = 2 + time1 = pd.date_range('2000-01-01', periods=nt1) + time2 = pd.date_range('2000-02-01', periods=nt2) + string_var = np.array(["ae", "bc", "df"], dtype=object) + string_var_to_append = np.array(['asdf', 'asdfg'], dtype=object) + unicode_var = ["áó", "áó", "áó"] + + ds = xr.Dataset( + data_vars={ + 'da': xr.DataArray(rs.rand(3, 3, nt1), coords=[lat, lon, time1], + dims=['lat', 'lon', 'time']), + 'string_var': xr.DataArray(string_var, coords=[time1], + dims=['time']), + 'unicode_var': xr.DataArray(unicode_var, coords=[time1], + dims=['time']).astype(np.unicode_) + } + ) + + ds_to_append = xr.Dataset( + data_vars={ + 'da': xr.DataArray(rs.rand(3, 3, nt2), coords=[lat, lon, time2], + dims=['lat', 'lon', 'time']), + 'string_var': xr.DataArray(string_var_to_append, coords=[time2], + dims=['time']), + 'unicode_var': xr.DataArray(unicode_var[:nt2], coords=[time2], + dims=['time']).astype(np.unicode_) + } + ) + + ds_with_new_var = xr.Dataset( + data_vars={ + 'new_var': xr.DataArray( + rs.rand(3, 3, nt1 + nt2), + coords=[lat, lon, time1.append(time2)], + dims=['lat', 'lon', 'time'] + ), + } + ) + + assert all(objp.data.flags.writeable for objp in ds.variables.values()) + assert all( + objp.data.flags.writeable for objp in ds_to_append.variables.values() + ) + return ds, ds_to_append, ds_with_new_var + + def create_test_multiindex(): mindex = pd.MultiIndex.from_product([['a', 'b'], [1, 2]], names=('level_1', 'level_2'))
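
As a quick end-to-end illustration of the append workflow this patch enables,
here is a minimal sketch, assuming a build of xarray that includes this diff
together with the ``zarr`` package; the store path ``'example.zarr'`` and the
variable names are purely illustrative::

    import numpy as np
    import pandas as pd
    import xarray as xr

    # Initial write: three daily time steps.
    ds = xr.Dataset(
        {'foo': (('x', 't'), np.random.rand(4, 3))},
        coords={'x': [10, 20, 30, 40],
                't': pd.date_range('2001-01-01', periods=3)})
    ds.to_zarr('example.zarr', mode='w')

    # Append two more steps along 't': the target zarr arrays are resized
    # and only the new region is written (see ZarrStore.set_variables).
    ds_extra = xr.Dataset(
        {'foo': (('x', 't'), np.random.rand(4, 2))},
        coords={'x': [10, 20, 30, 40],
                't': pd.date_range('2001-01-04', periods=2)})
    ds_extra.to_zarr('example.zarr', mode='a', append_dim='t')

    # The store now holds the concatenation of both datasets.
    actual = xr.open_zarr('example.zarr').compute()
    expected = xr.concat([ds, ds_extra], dim='t')
    xr.testing.assert_identical(actual, expected)

    # Omitting append_dim while 'foo' already exists raises, per
    # _validate_append_dim_and_encoding:
    #   ValueError: variable 'foo' already exists, but append_dim was not set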