
Support parallel writes to zarr store #3096

Closed
VincentDehaye opened this issue Jul 11, 2019 · 9 comments
Labels
topic-zarr Related to zarr storage library

Comments

@VincentDehaye

VincentDehaye commented Jul 11, 2019

MCVE Code Sample

import multiprocessing
import xarray as xr
import numpy as np
from s3fs import S3FileSystem, S3Map
from time import sleep

from main import set_aws_credentials  # user-specific helper, not part of xarray
set_aws_credentials()


def download_file(lead_time):
    return 'path_to_your_file'


def make_xarray_dataset(file_path, lead_time):
    var1 = np.random.rand(1, 721, 1440, 22)
    var2 = np.random.rand(1, 721, 1440, 22)
    lat = np.linspace(-90, 90, 721)
    lon = np.linspace(0, 360, 1440)
    height = range(22)
    ds = xr.Dataset({'var1': (['lead_time', 'lat', 'lon', 'height'], var1),
                     'var2': (['lead_time', 'lat', 'lon', 'height'], var2)},
                    coords={'lat': lat,
                            'lon': lon,
                            'height': height,
                            'lead_time': [lead_time]})
    return ds

def upload_to_s3(dataset, append):
    s3 = S3FileSystem()
    s3map = S3Map('S3_path_to_your_zarr', s3=s3)
    # If we are appending to an already existing dataset
    if append:
        dataset.to_zarr(store=s3map, mode='a', append_dim='lead_time')
    else:
        dataset.to_zarr(store=s3map, mode='w')


def lead_time_worker(lead_time, append=True):
    file_path = download_file(lead_time)
    dataset = make_xarray_dataset(file_path, lead_time)
    upload_to_s3(dataset, append=append)
    return 0


if __name__ == '__main__':
    lead_times = range(10)
    first_lead_time = True
    processes = []
    for lead_time in lead_times:
        if first_lead_time:
            process = multiprocessing.Process(target=lead_time_worker,
                                              args=(lead_time, False))
            process.start()
            process.join()
            first_lead_time = False
        else:
            process = multiprocessing.Process(target=lead_time_worker,
                                              args=(lead_time,))
            process.start()
            processes.append(process)
            sleep(5) # Sleep in order to shift the different processes so that they don't begin at the same time
    for p in processes:
        p.join()

will raise

ValueError: conflicting sizes for dimension 'lead_time': length X on 'var1' and length Y on 'var2'

Traceback (most recent call last):
  File "main.py", line 200, in lead_time_worker
    upload_to_s3(dataset, cloud_zarr_path, append=True)
  File "main.py", line 167, in upload_to_gcloud
    ds.to_zarr(store=s3map, mode='a', append_dim='lead_time')
  File "/home/ubuntu/anaconda3/envs/GFS-data-retrieval/lib/python3.7/site-packages/xarray/core/dataset.py", line 1414, in to_zarr
    consolidated=consolidated, append_dim=append_dim)
  File "/home/ubuntu/anaconda3/envs/GFS-data-retrieval/lib/python3.7/site-packages/xarray/backends/api.py", line 1101, in to_zarr
    dump_to_store(dataset, zstore, writer, encoding=encoding)
  File "/home/ubuntu/anaconda3/envs/GFS-data-retrieval/lib/python3.7/site-packages/xarray/backends/api.py", line 929, in dump_to_store
    unlimited_dims=unlimited_dims)
  File "/home/ubuntu/anaconda3/envs/GFS-data-retrieval/lib/python3.7/site-packages/xarray/backends/zarr.py", line 354, in store
    ds = open_zarr(self.ds.store, chunks=None)
  File "/home/ubuntu/anaconda3/envs/GFS-data-retrieval/lib/python3.7/site-packages/xarray/backends/zarr.py", line 557, in open_zarr
    ds = maybe_decode_store(zarr_store)
  File "/home/ubuntu/anaconda3/envs/GFS-data-retrieval/lib/python3.7/site-packages/xarray/backends/zarr.py", line 545, in maybe_decode_store
    drop_variables=drop_variables)
  File "/home/ubuntu/anaconda3/envs/GFS-data-retrieval/lib/python3.7/site-packages/xarray/conventions.py", line 527, in decode_cf
    ds = Dataset(vars, attrs=attrs)
  File "/home/ubuntu/anaconda3/envs/GFS-data-retrieval/lib/python3.7/site-packages/xarray/core/dataset.py", line 423, in __init__
    self._set_init_vars_and_dims(data_vars, coords, compat)
  File "/home/ubuntu/anaconda3/envs/GFS-data-retrieval/lib/python3.7/site-packages/xarray/core/dataset.py", line 445, in _set_init_vars_and_dims
    data_vars, coords, compat=compat)
  File "/home/ubuntu/anaconda3/envs/GFS-data-retrieval/lib/python3.7/site-packages/xarray/core/merge.py", line 379, in merge_data_and_coords
    indexes=indexes)
  File "/home/ubuntu/anaconda3/envs/GFS-data-retrieval/lib/python3.7/site-packages/xarray/core/merge.py", line 460, in merge_core
    dims = calculate_dimensions(variables)
  File "/home/ubuntu/anaconda3/envs/GFS-data-retrieval/lib/python3.7/site-packages/xarray/core/dataset.py", line 125, in calculate_dimensions
    (dim, size, k, dims[dim], last_used[dim]))
ValueError: conflicting sizes for dimension 'lead_time': length X on 'var1' and length Y on 'var2'

Problem Description

First of all, thanks a lot to the community for PR #2706, I was really looking forward to it. I have already tried the new append parameter, and ran into some problems when trying to use it in parallel.

I want to upload a very big zarr store (global numerical weather prediction, output of the GFS model that you can check out here) to an S3 bucket. To speed this up, since each source file contains the data for one lead time (the length of time between the issuance of a forecast and the occurrence of the phenomena that were predicted) and I want to concatenate them all, I tried to run one process per lead time and have all of them append to the same data store using Dataset.to_zarr() with append=True.

However, when doing that, I get the error described above. The processes append simultaneously, so the store is not necessarily consistent when a new process tries to append: some variables will already have the values of a given lead time while others will not, because the other process has not finished yet, which leads to calculate_dimensions() raising this error.

I wonder if there is a way I haven't found to work around this simply by using a synchronizer. If not, do you think it would be possible (and reasonable) to implement a parameter that bypasses this check on the append dimension, in an 'eventually consistent' approach?

Output of xr.show_versions()

INSTALLED VERSIONS

commit: None
python: 3.7.3 (default, Mar 27 2019, 22:11:17)
[GCC 7.3.0]
python-bits: 64
OS: Linux
OS-release: 4.15.0-1032-aws
machine: x86_64
processor: x86_64
byteorder: little
LC_ALL: None
LANG: C.UTF-8
LOCALE: en_US.UTF-8
libhdf5: None
libnetcdf: None

xarray: 0.12.2
pandas: 0.24.2
numpy: 1.16.4
scipy: None
netCDF4: None
pydap: None
h5netcdf: None
h5py: None
Nio: 1.5.5
zarr: 2.3.2
cftime: None
nc_time_axis: None
PseudonetCDF: None
rasterio: None
cfgrib: None
iris: None
bottleneck: None
dask: 2.0.0
distributed: 2.0.1
matplotlib: None
cartopy: None
seaborn: None
numbagg: None
setuptools: 41.0.1
pip: 19.1.1
conda: None
pytest: None
IPython: None
sphinx: None

@VincentDehaye changed the title from "Support parallel append to zarr store PR#2706" to "Support parallel append to zarr store PR #2706" on Jul 11, 2019
@VincentDehaye changed the title from "Support parallel append to zarr store PR #2706" to "Support parallel append to zarr store" on Jul 11, 2019
@rabernat
Contributor

Hi @VincentDehaye. Thanks for being an early adopter! We really appreciate your feedback. I'm sorry it didn't work as expected. We are in really new territory with this feature.

I'm a bit confused about why you are using the multiprocessing module here. The recommended way of parallelizing xarray operations is via the built-in dask support. There are no guarantees that multiprocessing like you're doing will work right. When we talk about parallel append, we are always talking about dask.

Your MCVE is not especially helpful for debugging because the two key functions (make_xarray_dataset and upload_to_s3) are not shown. Could you try simplifying your example a bit? I know it is hard when cloud is involved. But try to let us see more of what is happening under the hood.

If you are creating a dataset for the first time, you probably don't want append. You want to do

ds = xr.open_mfdataset(all_the_source_files)
ds.to_zarr(s3fs_target)

If you are using a dask cluster, this will automatically parallelize everything.

@VincentDehaye
Author

Hi @rabernat, thank you for your quick answer. I edited my MCVE so that you can reproduce the error (as long as you have access to an S3 bucket). I had actually forgotten about open_mfdataset, which is why I was doing it this way. However, in the future I would still like to be able to have standalone workers, because bandwidth quickly becomes a bottleneck for me (both for downloading the files and for uploading to the cloud), so I would like to split the tasks across different machines.

With regards to open_mfdataset(), I checked the code and realized that under the hood it is only calling multiple open_dataset(). I was worried it would load the values (and not only the metadata) into memory, but I checked on one file and it apparently does not. Can you confirm this? In that case I could probably open my whole dataset at once, which would be very convenient. After reading your issue #1385, I also need to check that my case works fine with decode_cf=False. I experienced some trouble with appending on a time dimension but found a workaround; I will probably open another issue documenting this.

@dcherian added the topic-zarr (Related to zarr storage library) label on Jul 12, 2019
@shoyer
Member

shoyer commented Jul 14, 2019

With regards to open_mfdataset(), I checked the code and realized under the hood it's only calling multiple open_dataset(). I was worried it would load the values (and not only metadata) in memory, but I checked it on one file and it apparently does not. Can you confirm this ? In this case I could probably open my whole dataset at once, which would be very convenient.

Yes, this is the suggested workflow! open_mfdataset opens a collection of files lazily (with dask) into a single xarray dataset, suitable for converting into zarr all at once with to_zarr().

It is definitely possible to create a zarr dataset and then write to it in parallel with a bunch of processes, but not via xarray's to_zarr() method -- which can only parallelize with dask. You would have to create the dataset and write to it with the zarr Python API directly.
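
As an illustration of the zarr-API approach described above, here is a minimal sketch (not code from this thread; the array name, shape, chunking and store path are hypothetical). Aligning one chunk per lead time means each process writes only to its own chunks, so no locking is needed:

import multiprocessing
import numpy as np
import zarr

N_LEAD_TIMES = 10

def init_store(path):
    # create the arrays up front with their full final shape
    root = zarr.open_group(path, mode='w')
    root.create_dataset('var1', shape=(N_LEAD_TIMES, 721, 1440, 22),
                        chunks=(1, 721, 1440, 22), dtype='f8')
    return path

def write_lead_time(args):
    path, lead_time = args
    root = zarr.open_group(path, mode='r+')
    # each worker fills only its own slice along the lead_time dimension
    root['var1'][lead_time, ...] = np.random.rand(721, 1440, 22)

if __name__ == '__main__':
    path = init_store('example.zarr')
    with multiprocessing.Pool(4) as pool:
        pool.map(write_lead_time, [(path, i) for i in range(N_LEAD_TIMES)])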

@VincentDehaye
Author

VincentDehaye commented Jul 29, 2019

Coming back to this issue (I still haven't had time to try the open_mfdataset approach), I have another use case in which I would like to store different variables indexed by the same dimension, but not all available at the same moment.

For example, I would have variables V1 and V2 indexed on dimension D1. V1 would be available at time T, and I would like to store it in my S3 bucket at that moment, but V2 would only become available at time T+1. In this case, I would like to be able to save the values of V2 at time T+1, leaving the missing V2 values between T and T+1 filled with the fill_value specified in the metadata.

What actually happens is that you can append such data, but then if you want to open the resulting zarr store, the open_zarr function needs to be given V2 as the value of its drop_variables argument, otherwise you get the error shown in my original post. However, since open_zarr is also called when appending (cf. the error trace in the original post), and in that case you cannot provide this argument, subsequent append attempts fail, preventing you from ever appending the values of V2. Your dataset is now frozen.

Am I misusing the functionality, or do you know of any workaround using xarray that does not require coding everything myself (for optimization reasons)?

@rabernat
Contributor

@VincentDehaye - we are eager to help you. But it is difficult to hit a moving target.

I would like to politely suggest that we keep this issue on topic: making sure that parallel append to zarr store works as expected. Your latest post revealed that you did not try our suggested resolution (use open_mfdataset + dask parallelization) but instead introduced a new, possibly unrelated issue.

I recommend you open a new, separate issue related to "storing different variables being indexed by the same dimension".

@VincentDehaye
Author

Coming back to this issue so as not to leave it inactive and to provide some feedback to the community.

The problem with the open_mfdataset solution was that lazily opening a single lead-time dataset still took 150 MB of memory, leading to a minimum memory requirement of 150 MB × 209 ≈ 31.35 GB. When I tried with a bigger machine (64 GB of memory), I was then blocked by the rechunking, which exceeded the machine's resources and made the script crash. So we ended up using a dask cluster, which solved the concurrency and resource limitations.

My second use case (#3096 (comment)) still remains, though. I am wondering whether it matches the intended use of zarr and whether we want to do something about it; if so, I can open a separate issue documenting it.

All in all I would say my original problem is not relevant anymore: either you can do it with open_mfdataset on a single machine as proposed by @rabernat, provided you have enough memory (and probably much more if you need to rechunk), or you do it with a dask cluster, which is the solution we chose.
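
A rough sketch of the dask-cluster approach mentioned above (the scheduler address, file pattern and chunking are hypothetical, and the source files are assumed to be readable by open_mfdataset):

import xarray as xr
from dask.distributed import Client
from s3fs import S3FileSystem, S3Map

client = Client('tcp://scheduler-address:8786')  # connect to an existing dask cluster

# lazily open all source files as one dataset, one chunk per lead time
ds = xr.open_mfdataset('gfs_lead_time_*.nc', parallel=True, chunks={'lead_time': 1})

s3map = S3Map('S3_path_to_your_zarr', s3=S3FileSystem())
ds.to_zarr(store=s3map, mode='w')  # the write is parallelized across the cluster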

@shoyer changed the title from "Support parallel append to zarr store" to "Support parallel writes to zarr store" on May 4, 2020
@cdibble

cdibble commented Aug 12, 2020

Hi All,

Thanks for all of your great work, support, and discussion on these and other pages. I very much appreciate it as I am working with Xarray and Zarr quite a lot for large geospatial data storage and manipulation.

I wanted to add a note to this discussion that I have had success using Zarr's built-in ProcessSynchronizer (which relies on the fasteners package). This provides a pretty easy and clean implementation of file locks, as long as you can provide a file system that is shared across any and all processes that might try to access the Zarr store. For me, that means using an AWS EFS mount, which gives me the flexibility to deploy this in a serverless context or on a more standard cloud cluster.

It does seem that providing explicit chunking rules as you have mentioned above (or using the Zarr encoding argument, which I haven't tried but I think is another option) is a great way to handle this and likely outperforms the locking approach (just a guess, I would love to hear from others about this). But the locks are pretty easy to implement and seem to have helped me avoid the problems related to race conditions with Zarr.

For the sake of completeness, here is a simple example of how you might do this:

import zarr

# store and current_region (an xarray.Dataset) are defined elsewhere
synchronizer = zarr.ProcessSynchronizer(f"/mnt/efs_mnt/tmp/mur_regional_raw_sst/zarr_locks/{bounding_box['grid_loc']}_locker.sync")
compressor = zarr.Blosc(cname='zstd', clevel=3)
encoding = {vname: {'compressor': compressor} for vname in current_region.data_vars}
current_region.to_zarr(store=store, mode='w', encoding=encoding, consolidated=True, synchronizer=synchronizer)

I would be happy to discuss further and am very much open to critique, instruction, etc.

@rabernat
Contributor

Just a note that #4035 provides a new way to do parallel writing to zarr stores.

@VincentDehaye & @cdibble, would you be willing to test this out and see if it meets your needs?

@max-sixty
Collaborator

I think this was closed by #4035 (which I'm going to start using shortly!), so I'll close this, but feel free to reopen if I missed something.
