Support parallel writes to regions of zarr stores #4035


Merged (28 commits) on Nov 4, 2020

Conversation

@shoyer (Member) commented May 6, 2020

This PR adds support for a region keyword argument to to_zarr(), to support parallel writes to different regions of arrays in a Zarr store, e.g., ds.to_zarr(..., region={'x': slice(1000, 2000)}) to write a dataset over the range 1000:2000 along the x dimension.

This is useful for creating large Zarr datasets without requiring dask. For example, the separate workers in a simulation job might each write a single non-overlapping chunk of a Zarr file. The standard way to handle such datasets today is to first write netCDF files in each process, and then consolidate them afterwards with dask (see #3096).

Creating empty Zarr stores

For this to work, the Zarr store must already exist with the desired variables in the right shapes and chunks. It is desirable to be able to create such stores without actually writing data, because the datasets we want to write in parallel may be very large.

In the example below, I achieve this by filling a Dataset with dask arrays and passing compute=False to to_zarr(). This works, but it relies on an undocumented implementation detail of the compute argument. We should either:

  1. Officially document that the compute argument only controls writing array values, not metadata (at least for zarr).
  2. Add a new keyword argument or an entirely new method for creating an unfilled Zarr store, e.g., write_values=False.

I think (1) is maybe the cleanest option (no extra API endpoints).

Unchunked variables

One potential gotcha concerns coordinate arrays that are not chunked, e.g., consider parallel writing of a dataset divided along time with 2D latitude and longitude arrays that are fixed over all chunks. With the current PR, such coordinate arrays would get rewritten by each separate writer.

If a Zarr store does not have atomic writes, then conceivably this could result in corrupted data. The default DirectoryStore has atomic writes and cloud-based object stores should also be atomic, so perhaps this doesn't matter in practice, but at the very least it's inefficient and could cause issues for large-scale jobs due to resource contention.

Options include:

  1. Current behavior. Variables whose dimensions do not overlap with region are written by to_zarr(). This is likely the most intuitive behavior for writing from a single process at a time.
  2. Exclude variables whose dimensions do not overlap with region from being written. This is likely the most convenient behavior for writing from multiple processes at once.
  3. Like (2), but issue a warning if any such variables exist instead of silently dropping them.
  4. Like (2), but raise an error instead of a warning. Require the user to explicitly drop them with .drop(). This is probably the safest behavior.

I think (4) would be my preferred option. Some users would undoubtedly find this annoying, but the power-users for whom we are adding this feature would likely appreciate it.
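
For concreteness, here is a minimal sketch of what option (4) would look like from the user's side. The dataset layout, variable names ('lat', 'lon'), and store path are illustrative assumptions, not part of this PR:

import dask.array as da
import numpy as np
import xarray

# Hypothetical dataset: 'u' is chunked along 'time'; the 2D 'lat'/'lon'
# coordinates have no 'time' dimension, so they fall outside any
# region={'time': ...} selection.
ds = xarray.Dataset(
    {'u': (('time', 'y', 'x'), da.zeros((100, 4, 5), chunks=(50, 4, 5)))},
    coords={
        'lat': (('y', 'x'), np.zeros((4, 5))),
        'lon': (('y', 'x'), np.zeros((4, 5))),
    },
)

# Create the store; metadata and the non-dask lat/lon values are written once here.
path = 'example.zarr'
ds.to_zarr(path, compute=False)

# Under option (4), each writer explicitly drops the variables that do not
# overlap the region before writing its slab (drop_vars() is the
# non-deprecated spelling of .drop()).
region = {'time': slice(0, 50)}
ds.isel(region).drop_vars(['lat', 'lon']).to_zarr(path, region=region)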

Usage example

import xarray
import dask.array as da

ds = xarray.Dataset({'u': (('x',), da.arange(1000, chunks=100))})

# create the new zarr store, but don't write data
path = 'my-data.zarr'
ds.to_zarr(path, compute=False)

# look at the unwritten data
ds_opened = xarray.open_zarr(path)
print('Data before writing:', ds_opened.u.data[::100].compute())
# Data before writing: [  1 100   1 100 100   1   1   1   1   1]

# write out each slice (could be in separate processes)
for start in range(0, 1000, 100):
  selection = {'x': slice(start, start + 100)}
  ds.isel(selection).to_zarr(path, region=selection)

print('Data after writing:', ds_opened.u.data[::100].compute())
# Data after writing: [  0 100 200 300 400 500 600 700 800 900]
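
The loop above writes the regions one after another from a single process. Below is a hedged sketch of fanning the same writes out to separate processes, reusing ds and path from the example above; the worker function, pool size, and use of concurrent.futures are illustrative assumptions, not part of this PR:

from concurrent.futures import ProcessPoolExecutor

def write_block(start):
    # each worker writes one non-overlapping 100-element slice along x
    selection = {'x': slice(start, start + 100)}
    ds.isel(selection).to_zarr(path, region=selection)

if __name__ == '__main__':
    # on spawn-based platforms the store-creation code above should also
    # live under this guard; in a real job each worker would typically
    # build or load only its own slice of the data
    with ProcessPoolExecutor(max_workers=4) as pool:
        list(pool.map(write_block, range(0, 1000, 100)))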

@shoyer requested review from rabernat and jhamman on May 6, 2020
@rabernat (Contributor) commented May 8, 2020

Stephan, this seems like a great addition. Thanks for getting it started!

I'm curious how this interacts with dimension coordinates. Your example bypasses this. But what if dimension coordinates are present? How do we handle alignment issues? For example, what if I call ds.to_zarr(path, region=selection), but the dimension coordinates of ds don't align with the dimension coordinates of the store at path?

  1. Officially document that the compute argument only controls writing array values, not metadata (at least for zarr).

👍

4. Like (2), but raise an error instead of a warning. Require the user to explicitly drop them with .drop(). This is probably the safest behavior.

👍

I think only advanced users will want to use this feature.

@shoyer (Member, Author) commented May 9, 2020

I'm curious how this interacts with dimension coordinates. Your example bypasses this. But what if dimension coordinates are present? How do we handle alignment issues? For example, what if I call ds.to_zarr(path, region=selection), but the dimension coordinates of ds don't align with the dimension coordinates of the store at path?

It’s entirely unsafe. Currently the coordinates would be overridden with the new values, which is consistent with how to_netcdf() with mode='a' works.

This is probably another good reason for requiring users to explicitly drop variables that don’t include a dimension in the selected region, because at least in that case there can be no user expectations about alignment with coordinates that don’t exist.

In the long term, it might make sense to make both to_netcdf and to_zarr check coordinates for alignment by default, but we wouldn't want that in all cases, because sometimes users really do want to update variables.
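
As a sketch of the kind of manual safety check this implies (the helper name and the comparison against the on-disk coordinates are illustrative, not something this PR adds):

import numpy as np
import xarray

def check_region_alignment(ds_slice, path, region):
    # Compare the dimension coordinates of the slice being written against
    # the corresponding values already stored on disk.
    on_disk = xarray.open_zarr(path)
    for dim, idx in region.items():
        if dim in on_disk.coords and dim in ds_slice.coords:
            stored = on_disk[dim].isel({dim: idx}).values
            if not np.array_equal(stored, ds_slice[dim].values):
                raise ValueError(f"coordinate {dim!r} does not align with the store")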

@nbren12 (Contributor) commented May 12, 2020

@rabernat pointed this PR out to me, and this is great progress towards allowing more database-like CRUD operations on zarr datasets. A similar neat feature would be to read xarray datasets from regions of zarr groups w/o dask arrays.

@rabernat (Contributor) commented May 12, 2020

A similar neat feature would be to read xarray datasets from regions of zarr groups w/o dask arrays.

@nbren12 - this has always been supported. Just call open_zarr(..., chunks=False) and then subset using sel / isel.
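
A minimal sketch of that suggestion, reusing the store from the usage example above (chunks=None is used here, which in current xarray loads variables lazily as plain arrays rather than dask arrays; the slice is illustrative):

import xarray

# open without dask, then pull out just the region of interest
ds = xarray.open_zarr('my-data.zarr', chunks=None)
subset = ds.isel(x=slice(100, 200))
print(subset['u'].values)  # data for this region is only read here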

@nbren12 (Contributor) commented May 13, 2020

@rabernat I learn something new every day. Sorry for cluttering up this PR with my ignorance, haha.

@shoyer marked this pull request as ready for review on June 18, 2020
@shoyer (Member, Author) commented Jun 18, 2020

I've added error checking, tests, and documentation, so this is ready for review now!

Take a look here for a rendered version of the new docs section:
https://xray--4035.org.readthedocs.build/en/4035/io.html#appending-to-existing-zarr-stores

@rabernat added the topic-zarr (Related to zarr storage library) label on Jun 30, 2020
@zflamig commented Jul 9, 2020

This looks nice. Is there any thought on whether this would work with functions as a service (GCP Cloud Functions, AWS Lambda, etc.) for supporting parallel transformation from netCDF to Zarr?

@shoyer (Member, Author) commented Jul 9, 2020

This looks nice. Is there any thought on whether this would work with functions as a service (GCP Cloud Functions, AWS Lambda, etc.) for supporting parallel transformation from netCDF to Zarr?

I haven't used functions as a service before, but yes, I imagine this might be useful for that sort of thing. As long as you can figure out the structure of the overall Zarr dataset ahead of time, you could use region to fill out different parts entirely independently.
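
A hedged sketch of what such a serverless worker could look like, where each invocation converts one netCDF file into one region of a pre-created Zarr store. The event fields, file naming, chunk size, and the assumption that every variable in the inputs has a time dimension are all illustrative:

import xarray

CHUNK = 100  # records per input file, fixed when the target store is created

def handler(event, context=None):
    index = event['index']  # which input file this invocation owns
    ds = xarray.open_dataset(f'input-{index}.nc')
    # the target store must already exist with the full shape
    # (see the "Creating empty Zarr stores" discussion above)
    region = {'time': slice(index * CHUNK, (index + 1) * CHUNK)}
    ds.to_zarr('target.zarr', region=region)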

@rabernat (Contributor) commented Jul 10, 2020 via email

@tomdurrant commented

This is a very desirable feature for us. We have been using this branch in development, and it is working great for our use case. We are reluctant to put it into production until it is merged and released - is there any expected timeline for that to occur?

@shoyer (Member, Author) commented Oct 19, 2020

I just fixed a race condition with writing attributes. Let me spend a little bit of time responding to Ryan's review, and then I think we can submit it.

@shoyer (Member, Author) commented Oct 19, 2020

But yes, we've also been successfully using this for parallel writes for a few months now (aside from the race condition).

@shoyer (Member, Author) commented Oct 20, 2020

OK, I think this is ready for a final review.

@shoyer (Member, Author) commented Oct 24, 2020

Anyone else want to take a look at this?

@dcherian (Contributor) left a comment


I only looked at the docs, and found some minor things.

@shoyer (Member, Author) commented Oct 29, 2020

If there are no additional reviews or objections, I will merge this tomorrow.

@rafa-guedes (Contributor) commented

@shoyer thanks for implementing this, it is going to be very useful. I am trying to write the dataset below:

dsregion:

<xarray.Dataset>
Dimensions:    (latitude: 2041, longitude: 4320, time: 31)
Coordinates:
  * latitude   (latitude) float32 -80.0 -79.916664 -79.833336 ... 89.916664 90.0
  * time       (time) datetime64[ns] 2008-10-01T12:00:00 ... 2008-10-31T12:00:00
  * longitude  (longitude) float32 -180.0 -179.91667 ... 179.83333 179.91667
Data variables:
    vo         (time, latitude, longitude) float32 dask.array<chunksize=(30, 510, 1080), meta=np.ndarray>
    uo         (time, latitude, longitude) float32 dask.array<chunksize=(30, 510, 1080), meta=np.ndarray>
    sst        (time, latitude, longitude) float32 dask.array<chunksize=(30, 510, 1080), meta=np.ndarray>
    ssh        (time, latitude, longitude) float32 dask.array<chunksize=(30, 510, 1080), meta=np.ndarray>

As a region of this other dataset:

dset:

<xarray.Dataset>
Dimensions:    (latitude: 2041, longitude: 4320, time: 9490)
Coordinates:
  * latitude   (latitude) float32 -80.0 -79.916664 -79.833336 ... 89.916664 90.0
  * longitude  (longitude) float32 -180.0 -179.91667 ... 179.83333 179.91667
  * time       (time) datetime64[ns] 1993-01-01T12:00:00 ... 2018-12-25T12:00:00
Data variables:
    ssh        (time, latitude, longitude) float64 dask.array<chunksize=(30, 510, 1080), meta=np.ndarray>
    sst        (time, latitude, longitude) float64 dask.array<chunksize=(30, 510, 1080), meta=np.ndarray>
    uo         (time, latitude, longitude) float64 dask.array<chunksize=(30, 510, 1080), meta=np.ndarray>
    vo         (time, latitude, longitude) float64 dask.array<chunksize=(30, 510, 1080), meta=np.ndarray>

Using the following call:

dsregion.to_zarr(dset_url, region={"time": slice(5752, 5783)})

But I got stuck on the conditional below within xarray/backends/api.py:

   1347         non_matching_vars = [
   1348             k
   1349             for k, v in ds_to_append.variables.items()
   1350             if not set(region).intersection(v.dims)
   1351         ]
   1352         import ipdb; ipdb.set_trace()
-> 1353         if non_matching_vars:
   1354             raise ValueError(
   1355                 f"when setting `region` explicitly in to_zarr(), all "
   1356                 f"variables in the dataset to write must have at least "
   1357                 f"one dimension in common with the region's dimensions "
   1358                 f"{list(region.keys())}, but that is not "
   1359                 f"the case for some variables here. To drop these variables "
   1360                 f"from this dataset before exporting to zarr, write: "
   1361                 f".drop({non_matching_vars!r})"
   1362             )

Apparently because time is not a dimension of the coordinate variables ["longitude", "latitude"]:

ipdb> p non_matching_vars                                
['latitude', 'longitude']
ipdb> p set(region)                                      
{'time'}

Should this checking be performed for all variables, or only for data_variables?

@shoyer (Member, Author) commented Nov 4, 2020

Should this checking be performed for all variables, or only for data_variables?

I agree that this requirement is a little surprising. The error exists because otherwise you might be surprised that the array values for "latitude" and "longitude" get overridden, rather than being checked for consistency. At least if you have to explicitly drop these variables (with the suggested call to .drop()), it is clear that they will neither be checked nor overridden in the Zarr store.
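
Concretely, following the error message's suggestion, the call in the report above would become (dsregion and dset_url as defined there; drop_vars() is the non-deprecated spelling of .drop()):

# drop the coordinate variables that have no dimension in common with
# the region, then write the slab as before
dsregion.drop_vars(['latitude', 'longitude']).to_zarr(
    dset_url, region={'time': slice(5752, 5783)}
)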

Labels: topic-zarr (Related to zarr storage library)

8 participants