
Preprocess function for save_mfdataset #4475

Open
heerad opened this issue Sep 30, 2020 · 9 comments

@heerad

heerad commented Sep 30, 2020

Is your feature request related to a problem? Please describe.
I would like to supply a preprocess argument to save_mfdataset that gets applied to each dataset before it is written to disk, similar to the preprocess option that open_mfdataset provides. Specifically, I have a dataset that I want to split by unique values along a dimension, apply some further logic to each sub-dataset, and then save each sub-dataset to a different file. Currently I'm able to split and save using the following code from the API docs:

years, datasets = zip(*ds.groupby("time.year"))
paths = ["%s.nc" % y for y in years]
xr.save_mfdataset(datasets, paths)

What's missing is the ability to apply further logic to each of the sub-datasets produced by the groupby. If I iterate through datasets here and chain further operations onto each element, the calculations begin to execute serially even though ds is backed by dask arrays:

xr.save_mfdataset([ds.foo() for ds in datasets], paths)

Describe the solution you'd like
Instead, I'd like the ability to do:

xr.save_mfdataset(datasets, paths, preprocess=lambda ds: ds.foo())

Describe alternatives you've considered
Not sure.

@dcherian
Contributor

You could use dask.delayed here:

new_datasets = [dask.delayed(your_function)(dset) for dset in datasets]
xr.save_mfdataset(new_datasets, paths)

I think this will work, but I've never used save_mfdataset. This is how preprocess is implemented with open_mfdataset btw.
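
For reference, a minimal sketch of the read-side analogue being referred to here; the file glob and your_function are placeholders, not code from this issue:

import xarray as xr

def your_function(ds):
    # placeholder for whatever per-dataset logic is needed
    return ds

# open_mfdataset applies `preprocess` to each per-file dataset before combining
combined = xr.open_mfdataset("data_*.nc", preprocess=your_function)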

@heerad
Author

heerad commented Sep 30, 2020

Unfortunately that doesn't work:

TypeError: save_mfdataset only supports writing Dataset objects, received type <class 'dask.delayed.Delayed'>

@dcherian
Contributor

You could write to netCDF in your_function and avoid save_mfdataset altogether...

I guess this is a good argument for adding a preprocess kwarg.

@shoyer
Member

shoyer commented Sep 30, 2020

I think we could support delayed objects in save_mfdataset, at least in principle. But if you're OK using delayed objects, you might as well write each netCDF file separately using dask.delayed, e.g.,

def write_dataset(dataset, path):
    your_function(dataset).to_netcdf(path)

result = [dask.delayed(write_dataset)(ds, path) for ds, path in zip(datasets, paths)]
dask.compute(result)

@heerad
Author

heerad commented Oct 1, 2020

Thank you, this works for me. However, it's quite slow and seems to scale faster than linearly as the length of datasets increases (the number of groups in the groupby).

Could it be connected to #2912 (comment), where they suggest using save_mfdataset instead of to_netcdf? If so, there's a stronger case for supporting delayed objects in save_mfdataset, as you said.

Appreciate the help!

@dcherian
Contributor

dcherian commented Oct 1, 2020

Are you using multiple threads or multiple processes? IIUC you should be using multiple processes for maximum write efficiency.
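
For example, a minimal sketch of forcing the delayed writes onto the multiprocessing scheduler; your_function, datasets, and paths are the ones from the earlier comments:

import dask

def write_dataset(dataset, path):
    # same helper as in the comment above
    your_function(dataset).to_netcdf(path)

result = [dask.delayed(write_dataset)(dsub, p) for dsub, p in zip(datasets, paths)]
# run the delayed writes with the multiprocessing scheduler instead of the default threaded one
dask.compute(result, scheduler="processes")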

@heerad
Author

heerad commented Oct 1, 2020

Multiple threads (the default), because it's recommended "for numeric code that releases the GIL (like NumPy, Pandas, Scikit-Learn, Numba, …)" according to the dask docs.

I guess I could do multi-threaded for the compute part (everything up to the definition of ds), then multi-process for the write part, but doesn't that then require me to load everything into memory before writing?

@dcherian
Contributor

dcherian commented Oct 1, 2020

doesn't that then require me to load everything into memory before writing?

I think so.

I would try multiple processes and see if that is fast enough for what you want to do. Otherwise, write to zarr: the writes will be parallelized, and it's a lot easier than dealing with HDF5.
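
A minimal sketch of the zarr route, assuming one store per group; the per-year store names are illustrative:

import dask
import xarray as xr

years, datasets = zip(*ds.groupby("time.year"))
# to_zarr(..., compute=False) returns a dask.delayed object, so the writes can be computed together
writes = [dsub.to_zarr(f"{year}.zarr", compute=False) for year, dsub in zip(years, datasets)]
dask.compute(writes)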

@heerad
Author

heerad commented Oct 1, 2020

Sounds good, I'll do this in the meantime. I'm still quite interested in save_mfdataset handling these lower-level details, if possible. The ideal case would be loading with open_mfdataset, defining some ops lazily, then piping that directly to save_mfdataset.
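
For illustration, that end-to-end workflow might look like the sketch below, where preprocess on save_mfdataset is the kwarg proposed in this issue (not current xarray API) and foo stands in for the per-group logic:

# hypothetical: `preprocess` is the requested kwarg, not existing xarray API
ds = xr.open_mfdataset("input_*.nc")
years, datasets = zip(*ds.groupby("time.year"))
paths = [f"{year}.nc" for year in years]
xr.save_mfdataset(datasets, paths, preprocess=lambda dsub: dsub.foo())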
