'Parallelized' apply_ufunc for scipy.interpolate.griddata #5281


Closed
LJaksic opened this issue May 9, 2021 · 5 comments

Comments

@LJaksic

LJaksic commented May 9, 2021

Hi,

I'm working with large files from an ocean model with an unstructured grid. For instance, the flow velocity variable ux has dimensions (194988, 1009, 20) for, respectively, 'nFlowElem' (the unstructured grid elements), 'time' and 'laydim' (the depth dimension). I'd like to interpolate these results to a structured grid with dimensions (600, 560, 1009, 20) for, respectively, latitude, longitude, time and laydim. For this I am using scipy.interpolate.griddata. As these dataarrays are too large to load into working memory at once, I am trying to work with 'chunks' (dask). Unfortunately, I run into problems when trying to use apply_ufunc with dask='parallelized'.

For smaller computational domains (a smaller nFlowElem dimension) I am still able to load the dataarray into working memory. Then, the following code gives me the wanted result:

import numpy as np
import xarray as xr
from scipy.interpolate import griddata

def interp_to_grid(u, xc, yc, xint, yint):
    print(u.shape, xc.shape, xint.shape)
    ug = griddata((xc, yc), u, (xint, yint), method='nearest', fill_value=np.nan)
    return ug

uxg = xr.apply_ufunc(interp_to_grid,
                     ux, xc, yc, xint, yint,
                     dask = 'allowed',
                     input_core_dims=[['nFlowElem','time','laydim'],['nFlowElem'],['nFlowElem'],['dim_0','dim_1'],['dim_0','dim_1']],
                     output_core_dims=[['dim_0','dim_1','time','laydim']],
                     output_dtypes = [xr.DataArray]
                     )

Notice that in the function interp_to_grid the input variables have the following dimensions:

  • u (i.e. ux, the original flow velocity output): (194988, 1009, 20) for (nFlowElem, time, laydim)
  • xc, yc (the latitude and longitude coordinates associated with these 194988 elements): both (194988,)
  • xint, yint (the structured grid coordinates to which I would like to interpolate the data): both (600, 560) for (dim_0, dim_1)

Notice that scipy.interpolate.griddata does not require me to loop over the time and laydim dimensions (as formulated in the code above). For this it is critical to feed griddata the dimensions in the right order ('time' and 'laydim' last). The interpolated result, uxg, has dimensions (600, 560, 1009, 20) - as wanted and expected.
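
For illustration, a tiny self-contained sketch (with made-up small sizes, not this issue's data) of how griddata's 'nearest' method carries a values array with extra trailing dimensions through unchanged:

import numpy as np
from scipy.interpolate import griddata

rng = np.random.default_rng(0)
xc = rng.uniform(0, 1, 50)        # element x-coordinates, shape (50,)
yc = rng.uniform(0, 1, 50)        # element y-coordinates, shape (50,)
u = rng.normal(size=(50, 3, 2))   # values; the source points must lie along the first axis

xint, yint = np.meshgrid(np.linspace(0, 1, 6), np.linspace(0, 1, 5))  # target grid, both (5, 6)

# method='nearest' simply indexes the values array with the nearest source point,
# so the trailing (time, laydim)-like axes are passed through.
ug = griddata((xc, yc), u, (xint, yint), method='nearest')
print(ug.shape)  # (5, 6, 3, 2)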

However, for much larger spatial domains it is required to work with dask='parallelized', because these input dataarrays can no longer be loaded into my working memory. I have tried to apply chunks over the time dimension, but also over the nFlowElement dimension. I am aware that it is not possible to chunk over core dimensions.
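
For reference, a minimal sketch of how such chunking can be set up (assuming the DataArray is named ux; the sizes match the repr below, with the core dimension nFlowElem kept in a single chunk and only 'time' split):

ux = ux.chunk({'nFlowElem': -1, 'time': 10})  # -1 keeps nFlowElem in a single chunk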

This is one of my "parallel" attempts (with chunks along the time dim):

Input ux:

<xarray.DataArray 'ucx' (nFlowElem: 194988, time: 1009, laydim: 20)>
dask.array<transpose, shape=(194988, 1009, 20), dtype=float64, chunksize=(194988, 10, 20), chunktype=numpy.ndarray>
Coordinates:
    FlowElem_xcc  (nFlowElem) float64 dask.array<chunksize=(194988,), meta=np.ndarray>
    FlowElem_ycc  (nFlowElem) float64 dask.array<chunksize=(194988,), meta=np.ndarray>
  * time          (time) datetime64[ns] 2014-09-17 ... 2014-10-01
Dimensions without coordinates: nFlowElem, laydim
Attributes:
    standard_name:  eastward_sea_water_velocity
    long_name:      velocity on flow element center, x-component
    units:          m s-1
    grid_mapping:   wgs84

The apply_ufunc call:

uxg = xr.apply_ufunc(interp_to_grid,
                     ux, xc, yc, xint, yint,
                     dask = 'parallelized',
                     input_core_dims=[['nFlowElem'],['nFlowElem'],['nFlowElem'],['dim_0','dim_1'],['dim_0','dim_1']],
                     output_core_dims=[['dim_0','dim_1']],
                     output_dtypes = [xr.DataArray],
                     )

Gives error:

  File "interpnd.pyx", line 78, in scipy.interpolate.interpnd.NDInterpolatorBase.__init__

  File "interpnd.pyx", line 192, in scipy.interpolate.interpnd._check_init_shape

ValueError: different number of values and points

I have experimented a lot with changing the core dimensions in apply_ufunc and the dimension along which to chunk. I have also tried to manually change the order of the dimensions of the dataarray u that is fed to griddata (in interp_to_grid).

Any advice is very welcome!
Best Wishes,
Luka

@mathause
Collaborator

map_blocks may be the function you are looking for: http://xarray.pydata.org/en/stable/user-guide/dask.html?highlight=map_blocks#map-blocks
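
A minimal, self-contained sketch of the map_blocks pattern from that page (the data and function here are illustrative, not from this issue): the wrapped function receives each chunk as a DataArray and must return an xarray object.

import numpy as np
import xarray as xr

da = xr.DataArray(np.arange(20.0).reshape(4, 5), dims=('t', 'x')).chunk({'t': 2})

def per_block(block):
    # receives one chunk of da as a DataArray, returns an xarray object
    return block - block.mean()

result = da.map_blocks(per_block, template=da).compute()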

@LJaksic
Author

LJaksic commented May 13, 2021

@mathause Thank you for your response! I believe that map_blocks only works for functions that consume and return xarray objects, right? scipy.interpolate.griddata unfortunately does not return an xarray object.

However, if there are other (xarray) functions which I could use to interpolate my results to a regular grid - I'd be very interested.
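
One possible direction (a rough, untested sketch using the names from the snippets above) would be to wrap griddata so that it returns a DataArray, which map_blocks can then handle:

def interp_block(block):
    # block: one chunk of ux, chunked along 'time' only
    ug = griddata((block.FlowElem_xcc.values, block.FlowElem_ycc.values),
                  block.values, (xint, yint),
                  method='nearest', fill_value=np.nan)
    # wrap the plain numpy result so map_blocks gets an xarray object back
    return xr.DataArray(ug,
                        dims=('dim_0', 'dim_1', 'time', 'laydim'),
                        coords={'time': block.time})

uxg = ux.map_blocks(interp_block)  # a template describing the lazy output may also be needed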

@LJaksic
Author

LJaksic commented May 13, 2021

### UPDATE 1: ###
I have also tried to chunk over the nFlowElem dimension instead of 'time'.
Then I define the input and output core dimensions as written below:

input_core_dims=[['time','laydim'],[],[],['dim_0','dim_1'],['dim_0','dim_1']],
output_core_dims=[['dim_0','dim_1','time','laydim']],

Now I "only" get an error concerning the size of my output (which is nolonger chunked).
My output is nolonger chunked, because nFlowElem is lost after the interpolation.
Consequently, the (600,560,1009,20) dataarray is too large for my work memory.

Is there perhaps a way to have apply_ufunc chunk your output along any of the other dimensions?

### UPDATE 2: ###

Alternatively, I have tried to chunk over the time dimension (50 time steps) and I have removed all input/output core dimensions.

If I then define interp_to_grid as follows (to get the right input dimensions):

def interp_to_grid(u, xc, yc, xint, yint):
    # index away the extra dimensions that apply_ufunc's broadcasting adds,
    # so the arrays have the shapes griddata expects
    xc = xc[:, 0, 0, 0, 0]
    yc = yc[:, 0, 0, 0, 0]
    u = u[:, :, :, 0, 0]
    print(u.shape, xc.shape, xint.shape)  # input shapes

    ug = griddata((xc, yc), u, (xint, yint), method='nearest', fill_value=np.nan)
    print(ug.shape)  # output shape

    return ug

I do get the right dimensions for my interp_to_grid input (see the first line below) and output (see the second line). However, I get ValueError: axes don't match array:

(194988, 50, 20) (194988,) (600, 560)
(600, 560, 50, 20)
Traceback (most recent call last):

  File "<ipython-input-11-6b65fe3dba5b>", line 1, in <module>
    uxg.loc[dict(time=np.datetime64('2014-09-18'),laydim=19)].plot()

  File "C:\Users\920507\Anaconda3\envs\dfm_tools_env\lib\site-packages\xarray\plot\plot.py", line 444, in __call__
    return plot(self._da, **kwargs)

  File "C:\Users\920507\Anaconda3\envs\dfm_tools_env\lib\site-packages\xarray\plot\plot.py", line 160, in plot
    darray = darray.squeeze().compute()

  File "C:\Users\920507\Anaconda3\envs\dfm_tools_env\lib\site-packages\xarray\core\dataarray.py", line 899, in compute
    return new.load(**kwargs)

  File "C:\Users\920507\Anaconda3\envs\dfm_tools_env\lib\site-packages\xarray\core\dataarray.py", line 873, in load
    ds = self._to_temp_dataset().load(**kwargs)

  File "C:\Users\920507\Anaconda3\envs\dfm_tools_env\lib\site-packages\xarray\core\dataset.py", line 798, in load
    evaluated_data = da.compute(*lazy_data.values(), **kwargs)

  File "C:\Users\920507\Anaconda3\envs\dfm_tools_env\lib\site-packages\dask\base.py", line 565, in compute
    results = schedule(dsk, keys, **kwargs)

  File "C:\Users\920507\Anaconda3\envs\dfm_tools_env\lib\site-packages\dask\threaded.py", line 84, in get
    **kwargs

  File "C:\Users\920507\Anaconda3\envs\dfm_tools_env\lib\site-packages\dask\local.py", line 487, in get_async
    raise_exception(exc, tb)

  File "C:\Users\920507\Anaconda3\envs\dfm_tools_env\lib\site-packages\dask\local.py", line 317, in reraise
    raise exc

  File "C:\Users\920507\Anaconda3\envs\dfm_tools_env\lib\site-packages\dask\local.py", line 222, in execute_task
    result = _execute_task(task, data)

  File "C:\Users\920507\Anaconda3\envs\dfm_tools_env\lib\site-packages\dask\core.py", line 121, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))

  File "C:\Users\920507\Anaconda3\envs\dfm_tools_env\lib\site-packages\dask\core.py", line 121, in <genexpr>
    return func(*(_execute_task(a, cache) for a in args))

  File "C:\Users\920507\Anaconda3\envs\dfm_tools_env\lib\site-packages\dask\core.py", line 121, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))

  File "C:\Users\920507\Anaconda3\envs\dfm_tools_env\lib\site-packages\dask\optimization.py", line 963, in __call__
    return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))

  File "C:\Users\920507\Anaconda3\envs\dfm_tools_env\lib\site-packages\dask\core.py", line 151, in get
    result = _execute_task(task, cache)

  File "C:\Users\920507\Anaconda3\envs\dfm_tools_env\lib\site-packages\dask\core.py", line 121, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))

  File "C:\Users\920507\Anaconda3\envs\dfm_tools_env\lib\site-packages\dask\utils.py", line 35, in apply
    return func(*args, **kwargs)

  File "<__array_function__ internals>", line 6, in transpose

  File "C:\Users\920507\Anaconda3\envs\dfm_tools_env\lib\site-packages\numpy\core\fromnumeric.py", line 658, in transpose
    return _wrapfunc(a, 'transpose', axes)

  File "C:\Users\920507\Anaconda3\envs\dfm_tools_env\lib\site-packages\numpy\core\fromnumeric.py", line 58, in _wrapfunc
    return bound(*args, **kwds)

ValueError: axes don't match array

@TomNicholas
Member

TomNicholas commented May 17, 2021

@LJaksic are you aware that passing dask_gufunc_kwargs={'allow_rechunk': True} to xr.apply_ufunc will allow it to accept an input which is chunked along the core dims? (See #1995 (comment)) It does come with a warning though.
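
For concreteness, a rough sketch (untested) of where that keyword goes, mirroring the first apply_ufunc call in the issue; output_dtypes is given as a plain dtype here. Note the warning mentioned above: allowing rechunking of core dimensions can significantly increase memory usage.

uxg = xr.apply_ufunc(interp_to_grid,
                     ux, xc, yc, xint, yint,
                     dask='parallelized',
                     input_core_dims=[['nFlowElem', 'time', 'laydim'], ['nFlowElem'], ['nFlowElem'],
                                      ['dim_0', 'dim_1'], ['dim_0', 'dim_1']],
                     output_core_dims=[['dim_0', 'dim_1', 'time', 'laydim']],
                     output_dtypes=[float],
                     dask_gufunc_kwargs={'allow_rechunk': True},  # accept inputs chunked along core dims
                     )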

@dcherian
Contributor

Please reopen with an MVCE if you still need help. This would be a good example for https://tutorial.xarray.dev/advanced/apply_ufunc/apply_ufunc.html

@dcherian closed this as not planned on Dec 13, 2024