
Commit dd806b8

shoyer and keewis authored
Support parallel writes to regions of zarr stores (#4035)
* WIP: support writing to a region with zarr
* Consolidate before closing
* write -> save
* Integration test for writing to regions
* Skip compute=False if dask is not installed
* raise an error for non-matching vars in to_zarr with region
* wip docstring
* Update to_zarr docstring
* Error handling and tests for writing to a zarr region
* Add narrative docs on to_zarr() with region
* Add whats-new note on region
* Add dask.array import
* Fixup docs
* Add in PR link to whats-new
* more description in docs
* don't override attrs when writing to regions
* Another check for edge cases
* Blacken
* edits per Ryan's review
* move whats-new
* Mark tests as requiring dask
* doc clarifications
* Update doc/whats-new.rst

Co-authored-by: keewis <[email protected]>
1 parent 83884a1 commit dd806b8

File tree

6 files changed: +399 -106 lines changed

doc/io.rst

Lines changed: 111 additions & 34 deletions
@@ -833,7 +833,9 @@ N-dimensional arrays.
 Zarr has the ability to store arrays in a range of ways, including in memory,
 in files, and in cloud-based object storage such as `Amazon S3`_ and
 `Google Cloud Storage`_.
-Xarray's Zarr backend allows xarray to leverage these capabilities.
+Xarray's Zarr backend allows xarray to leverage these capabilities, including
+the ability to store and analyze datasets far too large to fit onto disk
+(particularly :ref:`in combination with dask <dask>`).

 .. warning::

@@ -845,7 +847,8 @@ metadata (attributes) describing the dataset dimensions and coordinates.
 At this time, xarray can only open zarr datasets that have been written by
 xarray. For implementation details, see :ref:`zarr_encoding`.

-To write a dataset with zarr, we use the :py:attr:`Dataset.to_zarr` method.
+To write a dataset with zarr, we use the :py:meth:`Dataset.to_zarr` method.
+
 To write to a local directory, we pass a path to a directory:

 .. ipython:: python
@@ -869,39 +872,10 @@ To write to a local directory, we pass a path to a directory:
 there.) If the directory does not exist, it will be created. If a zarr
 store is already present at that path, an error will be raised, preventing it
 from being overwritten. To override this behavior and overwrite an existing
-store, add ``mode='w'`` when invoking ``to_zarr``.
-
-It is also possible to append to an existing store. For that, set
-``append_dim`` to the name of the dimension along which to append. ``mode``
-can be omitted as it will internally be set to ``'a'``.
-
-.. ipython:: python
-    :suppress:
-
-    ! rm -rf path/to/directory.zarr
-
-.. ipython:: python
-
-    ds1 = xr.Dataset(
-        {"foo": (("x", "y", "t"), np.random.rand(4, 5, 2))},
-        coords={
-            "x": [10, 20, 30, 40],
-            "y": [1, 2, 3, 4, 5],
-            "t": pd.date_range("2001-01-01", periods=2),
-        },
-    )
-    ds1.to_zarr("path/to/directory.zarr")
-    ds2 = xr.Dataset(
-        {"foo": (("x", "y", "t"), np.random.rand(4, 5, 2))},
-        coords={
-            "x": [10, 20, 30, 40],
-            "y": [1, 2, 3, 4, 5],
-            "t": pd.date_range("2001-01-03", periods=2),
-        },
-    )
-    ds2.to_zarr("path/to/directory.zarr", append_dim="t")
+store, add ``mode='w'`` when invoking :py:meth:`~Dataset.to_zarr`.

-To store variable length strings use ``dtype=object``.
+To store variable length strings, convert them to object arrays first with
+``dtype=object``.

 To read back a zarr dataset that has been created this way, we use the
 :py:func:`open_zarr` method:
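
Editor's note: the hunk above expands the old one-line tip about variable-length strings. As a concrete illustration of the ``dtype=object`` advice, here is a minimal sketch (not part of this commit; the store path is made up):

    import numpy as np
    import xarray as xr

    # Variable-length strings must be stored as object arrays; a fixed-width
    # numpy string dtype would otherwise be truncated to a fixed size on disk.
    words = np.array(["zarr", "xarray", "variable-length"], dtype=object)
    ds = xr.Dataset({"word": ("x", words)})
    ds.to_zarr("path/to/strings.zarr", mode="w")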
@@ -987,6 +961,109 @@ Xarray can't perform consolidation on pre-existing zarr datasets. This should
 be done directly from zarr, as described in the
 `zarr docs <https://zarr.readthedocs.io/en/latest/tutorial.html#consolidating-metadata>`_.

+.. _io.zarr.appending:
+
+Appending to existing Zarr stores
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Xarray supports several ways of incrementally writing variables to a Zarr
+store. These options are useful for scenarios when it is infeasible or
+undesirable to write your entire dataset at once.
+
+.. tip::
+
+    If you can load all of your data into a single ``Dataset`` using dask, a
+    single call to ``to_zarr()`` will write all of your data in parallel.
+
+.. warning::
+
+    Alignment of coordinates is currently not checked when modifying an
+    existing Zarr store. It is up to the user to ensure that coordinates are
+    consistent.
+
+To add or overwrite entire variables, simply call :py:meth:`~Dataset.to_zarr`
+with ``mode='a'`` on a Dataset containing the new variables, passing in an
+existing Zarr store or path to a Zarr store.
+
+To resize and then append values along an existing dimension in a store, set
+``append_dim``. This is a good option if data always arrives in a particular
+order, e.g., for time-stepping a simulation:
+
+.. ipython:: python
+    :suppress:
+
+    ! rm -rf path/to/directory.zarr
+
+.. ipython:: python
+
+    ds1 = xr.Dataset(
+        {"foo": (("x", "y", "t"), np.random.rand(4, 5, 2))},
+        coords={
+            "x": [10, 20, 30, 40],
+            "y": [1, 2, 3, 4, 5],
+            "t": pd.date_range("2001-01-01", periods=2),
+        },
+    )
+    ds1.to_zarr("path/to/directory.zarr")
+    ds2 = xr.Dataset(
+        {"foo": (("x", "y", "t"), np.random.rand(4, 5, 2))},
+        coords={
+            "x": [10, 20, 30, 40],
+            "y": [1, 2, 3, 4, 5],
+            "t": pd.date_range("2001-01-03", periods=2),
+        },
+    )
+    ds2.to_zarr("path/to/directory.zarr", append_dim="t")
+
+Finally, you can use ``region`` to write to limited regions of existing arrays
+in an existing Zarr store. This is a good option for writing data in parallel
+from independent processes.
+
+To scale this up to writing large datasets, the first step is creating an
+initial Zarr store without writing all of its array data. This can be done by
+first creating a ``Dataset`` with dummy values stored in :ref:`dask <dask>`,
+and then calling ``to_zarr`` with ``compute=False`` to write only metadata
+(including ``attrs``) to Zarr:
+
+.. ipython:: python
+    :suppress:
+
+    ! rm -rf path/to/directory.zarr
+
+.. ipython:: python
+
+    import dask.array
+    # The values of this dask array are entirely irrelevant; only the dtype,
+    # shape and chunks are used
+    dummies = dask.array.zeros(30, chunks=10)
+    ds = xr.Dataset({"foo": ("x", dummies)})
+    path = "path/to/directory.zarr"
+    # Now we write the metadata without computing any array values
+    ds.to_zarr(path, compute=False, consolidated=True)
+
+Now, a Zarr store with the correct variable shapes and attributes exists that
+can be filled out by subsequent calls to ``to_zarr``. The ``region`` keyword
+provides a mapping from dimension names to Python ``slice`` objects indicating
+where the data should be written (in index space, not coordinate space), e.g.,
+
+.. ipython:: python
+
+    # For convenience, we'll slice a single dataset, but in the real use-case
+    # we would create them separately, possibly even from separate processes.
+    ds = xr.Dataset({"foo": ("x", np.arange(30))})
+    ds.isel(x=slice(0, 10)).to_zarr(path, region={"x": slice(0, 10)})
+    ds.isel(x=slice(10, 20)).to_zarr(path, region={"x": slice(10, 20)})
+    ds.isel(x=slice(20, 30)).to_zarr(path, region={"x": slice(20, 30)})
+
+Concurrent writes with ``region`` are safe as long as they modify distinct
+chunks in the underlying Zarr arrays (or use an appropriate ``lock``).
+
+As a safety check to make it harder to inadvertently override existing values,
+if you set ``region`` then *all* variables included in a Dataset must have
+dimensions included in ``region``. Other variables (typically coordinates)
+need to be explicitly dropped and/or written in separate calls to ``to_zarr``
+with ``mode='a'``.
+
 .. _io.cfgrib:

 .. ipython:: python
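
Editor's note: the narrative docs added above say ``region`` suits parallel writes from independent processes, but stop short of showing one. A hedged sketch of that workflow, assuming a local store path, a 30-element array chunked by 10, and chunk-aligned regions so no lock is needed:

    from concurrent.futures import ProcessPoolExecutor

    import dask.array
    import numpy as np
    import xarray as xr

    PATH = "path/to/directory.zarr"

    def write_region(start, stop):
        # Each worker writes a distinct, chunk-aligned region of "foo".
        ds = xr.Dataset({"foo": ("x", np.arange(start, stop))})
        ds.to_zarr(PATH, region={"x": slice(start, stop)})

    if __name__ == "__main__":
        # Write only metadata first; the dask array values are never computed.
        dummies = dask.array.zeros(30, chunks=10)
        xr.Dataset({"foo": ("x", dummies)}).to_zarr(
            PATH, compute=False, consolidated=True
        )
        with ProcessPoolExecutor() as executor:
            futures = [executor.submit(write_region, s, s + 10) for s in (0, 10, 20)]
            for future in futures:
                future.result()  # surface any write errors

Because each region covers exactly one 10-element chunk, no two workers ever touch the same chunk, which is the safety condition the new docs state for concurrent ``region`` writes.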

doc/whats-new.rst

Lines changed: 4 additions & 0 deletions
@@ -32,6 +32,10 @@ New Features
   By `Miguel Jimenez <https://github.com/Mikejmnez>`_ and `Wei Ji Leong <https://github.com/weiji14>`_.
 - Unary & binary operations follow the ``keep_attrs`` flag (:issue:`3490`, :issue:`4065`, :issue:`3433`, :issue:`3595`, :pull:`4195`).
   By `Deepak Cherian <https://github.com/dcherian>`_.
+- :py:meth:`Dataset.to_zarr` now supports a ``region`` keyword for writing to
+  limited regions of existing Zarr stores (:pull:`4035`).
+  See :ref:`io.zarr.appending` for full details.
+  By `Stephan Hoyer <https://github.com/shoyer>`_.

 Bug fixes
 ~~~~~~~~~

xarray/backends/api.py

Lines changed: 106 additions & 22 deletions
@@ -11,6 +11,7 @@
     Hashable,
     Iterable,
     Mapping,
+    MutableMapping,
     Tuple,
     Union,
 )
@@ -1304,38 +1305,89 @@ def check_dtype(var):


 def _validate_append_dim_and_encoding(
-    ds_to_append, store, append_dim, encoding, **open_kwargs
+    ds_to_append, store, append_dim, region, encoding, **open_kwargs
 ):
     try:
         ds = backends.zarr.open_zarr(store, **open_kwargs)
     except ValueError:  # store empty
         return
+
     if append_dim:
         if append_dim not in ds.dims:
             raise ValueError(
                 f"append_dim={append_dim!r} does not match any existing "
                 f"dataset dimensions {ds.dims}"
             )
-    for var_name in ds_to_append:
-        if var_name in ds:
-            if ds_to_append[var_name].dims != ds[var_name].dims:
+        if region is not None and append_dim in region:
+            raise ValueError(
+                f"cannot list the same dimension in both ``append_dim`` and "
+                f"``region`` with to_zarr(), got {append_dim} in both"
+            )
+
+    if region is not None:
+        if not isinstance(region, dict):
+            raise TypeError(f"``region`` must be a dict, got {type(region)}")
+        for k, v in region.items():
+            if k not in ds_to_append.dims:
+                raise ValueError(
+                    f"all keys in ``region`` are not in Dataset dimensions, got "
+                    f"{list(region)} and {list(ds_to_append.dims)}"
+                )
+            if not isinstance(v, slice):
+                raise TypeError(
+                    "all values in ``region`` must be slice objects, got "
+                    f"region={region}"
+                )
+            if v.step not in {1, None}:
+                raise ValueError(
+                    "step on all slices in ``region`` must be 1 or None, got "
+                    f"region={region}"
+                )
+
+        non_matching_vars = [
+            k
+            for k, v in ds_to_append.variables.items()
+            if not set(region).intersection(v.dims)
+        ]
+        if non_matching_vars:
+            raise ValueError(
+                f"when setting `region` explicitly in to_zarr(), all "
+                f"variables in the dataset to write must have at least "
+                f"one dimension in common with the region's dimensions "
+                f"{list(region.keys())}, but that is not "
+                f"the case for some variables here. To drop these variables "
+                f"from this dataset before exporting to zarr, write: "
+                f".drop({non_matching_vars!r})"
+            )
+
+    for var_name, new_var in ds_to_append.variables.items():
+        if var_name in ds.variables:
+            existing_var = ds.variables[var_name]
+            if new_var.dims != existing_var.dims:
                 raise ValueError(
                     f"variable {var_name!r} already exists with different "
-                    f"dimension names {ds[var_name].dims} != "
-                    f"{ds_to_append[var_name].dims}, but changing variable "
-                    "dimensions is not supported by to_zarr()."
+                    f"dimension names {existing_var.dims} != "
+                    f"{new_var.dims}, but changing variable "
+                    f"dimensions is not supported by to_zarr()."
                 )
-            existing_sizes = {
-                k: v for k, v in ds[var_name].sizes.items() if k != append_dim
-            }
+
+            existing_sizes = {}
+            for dim, size in existing_var.sizes.items():
+                if region is not None and dim in region:
+                    start, stop, stride = region[dim].indices(size)
+                    assert stride == 1  # region was already validated above
+                    size = stop - start
+                if dim != append_dim:
+                    existing_sizes[dim] = size
+
             new_sizes = {
-                k: v for k, v in ds_to_append[var_name].sizes.items() if k != append_dim
+                dim: size for dim, size in new_var.sizes.items() if dim != append_dim
             }
             if existing_sizes != new_sizes:
                 raise ValueError(
                     f"variable {var_name!r} already exists with different "
-                    "dimension sizes: {existing_sizes} != {new_sizes}. "
-                    "to_zarr() only supports changing dimension sizes when "
+                    f"dimension sizes: {existing_sizes} != {new_sizes}. "
+                    f"to_zarr() only supports changing dimension sizes when "
                     f"explicitly appending, but append_dim={append_dim!r}."
                 )
             if var_name in encoding.keys():
@@ -1345,16 +1397,17 @@ def _validate_append_dim_and_encoding(


 def to_zarr(
-    dataset,
-    store=None,
+    dataset: Dataset,
+    store: Union[MutableMapping, str, Path] = None,
     chunk_store=None,
-    mode=None,
+    mode: str = None,
     synchronizer=None,
-    group=None,
-    encoding=None,
-    compute=True,
-    consolidated=False,
-    append_dim=None,
+    group: str = None,
+    encoding: Mapping = None,
+    compute: bool = True,
+    consolidated: bool = False,
+    append_dim: Hashable = None,
+    region: Mapping[str, slice] = None,
 ):
     """This function creates an appropriate datastore for writing a dataset to
     a zarr store
@@ -1368,6 +1421,35 @@ def to_zarr(
     if encoding is None:
         encoding = {}

+    if mode is None:
+        if append_dim is not None or region is not None:
+            mode = "a"
+        else:
+            mode = "w-"
+
+    if mode != "a" and append_dim is not None:
+        raise ValueError("cannot set append_dim unless mode='a' or mode=None")
+
+    if mode != "a" and region is not None:
+        raise ValueError("cannot set region unless mode='a' or mode=None")
+
+    if mode not in ["w", "w-", "a"]:
+        # TODO: figure out how to handle 'r+'
+        raise ValueError(
+            "The only supported options for mode are 'w', "
+            f"'w-' and 'a', but mode={mode!r}"
+        )
+
+    if consolidated and region is not None:
+        raise ValueError(
+            "cannot use consolidated=True when the region argument is set. "
+            "Instead, set consolidated=True when writing to zarr with "
+            "compute=False before writing data."
+        )
+
+    if isinstance(store, Path):
+        store = str(store)
+
     # validate Dataset keys, DataArray names, and attr keys/values
     _validate_dataset_names(dataset)
     _validate_attrs(dataset)
@@ -1380,6 +1462,7 @@ def to_zarr(
             append_dim,
             group=group,
             consolidated=consolidated,
+            region=region,
             encoding=encoding,
         )

@@ -1390,8 +1473,9 @@ def to_zarr(
         group=group,
         consolidate_on_close=consolidated,
         chunk_store=chunk_store,
+        append_dim=append_dim,
+        write_region=region,
     )
-    zstore.append_dim = append_dim
     writer = ArrayWriter()
     # TODO: figure out how to properly handle unlimited_dims
     dump_to_store(dataset, zstore, writer, encoding=encoding)
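
Editor's note: the control flow added above derives ``mode`` from ``region``/``append_dim`` and rejects inconsistent combinations. A minimal sketch of how these checks surface to a caller (illustrative path; error messages paraphrased from the code in this diff):

    import dask.array
    import numpy as np
    import xarray as xr

    path = "path/to/directory.zarr"

    # Initialize store metadata; consolidation happens here, not during
    # the later region writes.
    template = xr.Dataset({"foo": ("x", dask.array.zeros(10, chunks=10))})
    template.to_zarr(path, compute=False, consolidated=True)

    # region (like append_dim) implies mode="a", so mode may be omitted:
    ds = xr.Dataset({"foo": ("x", np.arange(10))})
    ds.to_zarr(path, region={"x": slice(0, 10)})

    # Rejected combinations:
    # ds.to_zarr(path, mode="w", region={"x": slice(0, 10)})
    #   -> ValueError: cannot set region unless mode='a' or mode=None
    # ds.to_zarr(path, region={"x": slice(0, 10)}, consolidated=True)
    #   -> ValueError: cannot use consolidated=True when the region argument is set.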
