
Add asynchronous load method #10327


Draft · wants to merge 50 commits into base: main

Commits (50)
01e7518
new blank whatsnew
TomNicholas Oct 24, 2024
83e553b
Merge branch 'main' of https://github.com/pydata/xarray
TomNicholas Oct 24, 2024
e44326d
Merge branch 'main' of https://github.com/pydata/xarray
TomNicholas Nov 8, 2024
4e4eeb0
Merge branch 'main' of https://github.com/pydata/xarray
TomNicholas Nov 20, 2024
d858059
Merge branch 'main' of https://github.com/pydata/xarray
TomNicholas Nov 21, 2024
d377780
Merge branch 'main' of https://github.com/pydata/xarray
TomNicholas Nov 21, 2024
3132f6a
Merge branch 'main' of https://github.com/pydata/xarray
TomNicholas Nov 23, 2024
900eef5
Merge branch 'main' of https://github.com/pydata/xarray
TomNicholas Nov 29, 2024
4c4462f
Merge branch 'main' of https://github.com/pydata/xarray
TomNicholas Dec 4, 2024
5b9b749
Merge branch 'main' of https://github.com/pydata/xarray
TomNicholas Jan 6, 2025
fadb953
Merge branch 'main' of https://github.com/pydata/xarray
TomNicholas Jan 8, 2025
57d9d23
Merge branch 'main' of https://github.com/TomNicholas/xarray
TomNicholas Mar 13, 2025
11170fc
Merge branch 'main' of https://github.com/pydata/xarray
TomNicholas Mar 19, 2025
0b8fa41
Merge branch 'main' of https://github.com/pydata/xarray
TomNicholas Mar 20, 2025
f769f85
Merge branch 'main' of https://github.com/pydata/xarray
TomNicholas Mar 20, 2025
4eef318
Merge branch 'main' of https://github.com/pydata/xarray
TomNicholas Apr 7, 2025
29242a4
Merge branch 'main' of https://github.com/pydata/xarray
TomNicholas May 15, 2025
e6b3b3b
test async load using special zarr LatencyStore
TomNicholas May 15, 2025
3ceab60
don't use dask
TomNicholas May 16, 2025
071c35a
async all the way down
TomNicholas May 16, 2025
29374f9
remove assert False
TomNicholas May 16, 2025
ab12bb8
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] May 16, 2025
62aa39d
add pytest-asyncio to CI envs
TomNicholas May 16, 2025
dfe8bf7
Merge branch 'async.load' of https://github.com/TomNicholas/xarray in…
TomNicholas May 16, 2025
a906dec
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] May 16, 2025
629ab31
assert results are identical
TomNicholas May 16, 2025
7e9ae0f
implement async load for dataarray and dataset
TomNicholas May 17, 2025
d288351
factor out common logic
TomNicholas May 17, 2025
e0731a0
consolidate tests via a parametrized fixture
TomNicholas May 17, 2025
9b41e78
async_load -> load_async
TomNicholas May 17, 2025
67ba26a
make BackendArray an ABC
TomNicholas May 18, 2025
9344e2e
explain how to add async support for any backend in the docs
TomNicholas May 18, 2025
f8f8563
add new methods to api docs
TomNicholas May 19, 2025
30ce9be
whatsnew
TomNicholas May 19, 2025
5d15bbd
Merge branch 'main' of https://github.com/pydata/xarray
TomNicholas May 19, 2025
1f02de1
Merge branch 'main' into async.load
TomNicholas May 19, 2025
2342b50
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] May 19, 2025
b6d4a82
Fix ci/minimum_versions.py
TomNicholas May 19, 2025
2079d7e
fix formatting
TomNicholas May 21, 2025
48e4534
concurrently load different variables in ds.load_async using asyncio.…
TomNicholas May 21, 2025
cca7589
test concurrent loading of multiple variables in one dataset
TomNicholas May 21, 2025
dfe9b87
fix non-awaited load_async
TomNicholas May 21, 2025
84099f3
rearrange test order
TomNicholas May 21, 2025
ab000c8
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] May 21, 2025
a8b7b46
add test for orthogonal indexing
TomNicholas May 23, 2025
82c7654
explicitly forbid orthogonal indexing
TomNicholas May 23, 2025
5eacdb0
support async orthogonal indexing via https://github.com/zarr-develop…
TomNicholas May 23, 2025
9f33c09
Merge branch 'async.load' of https://github.com/TomNicholas/xarray in…
TomNicholas May 23, 2025
093bf50
add test for vectorized indexing (even if it doesn't work)
TomNicholas May 23, 2025
4073a24
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] May 23, 2025
1 change: 1 addition & 0 deletions ci/minimum_versions.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
"coveralls",
"pip",
"pytest",
"pytest-asyncio",
"pytest-cov",
"pytest-env",
"pytest-mypy-plugins",
Expand Down
1 change: 1 addition & 0 deletions ci/requirements/all-but-dask.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ dependencies:
- pip
- pydap
- pytest
- pytest-asyncio
- pytest-cov
- pytest-env
- pytest-mypy-plugins
Expand Down
1 change: 1 addition & 0 deletions ci/requirements/all-but-numba.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ dependencies:
- pyarrow # pandas raises a deprecation warning without this, breaking doctests
- pydap
- pytest
- pytest-asyncio
- pytest-cov
- pytest-env
- pytest-mypy-plugins
Expand Down
1 change: 1 addition & 0 deletions ci/requirements/bare-minimum.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ dependencies:
- coveralls
- pip
- pytest
- pytest-asyncio
- pytest-cov
- pytest-env
- pytest-mypy-plugins
Expand Down
1 change: 1 addition & 0 deletions ci/requirements/environment-3.14.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ dependencies:
- pyarrow # pandas raises a deprecation warning without this, breaking doctests
- pydap
- pytest
- pytest-asyncio
- pytest-cov
- pytest-env
- pytest-mypy-plugins
Expand Down
1 change: 1 addition & 0 deletions ci/requirements/environment-windows-3.14.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ dependencies:
- pyarrow # importing dask.dataframe raises an ImportError without this
- pydap
- pytest
- pytest-asyncio
- pytest-cov
- pytest-env
- pytest-mypy-plugins
Expand Down
1 change: 1 addition & 0 deletions ci/requirements/environment-windows.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ dependencies:
- pyarrow # importing dask.dataframe raises an ImportError without this
- pydap
- pytest
- pytest-asyncio
- pytest-cov
- pytest-env
- pytest-mypy-plugins
Expand Down
1 change: 1 addition & 0 deletions ci/requirements/environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ dependencies:
- pydap
- pydap-server
- pytest
- pytest-asyncio
- pytest-cov
- pytest-env
- pytest-mypy-plugins
Expand Down
1 change: 1 addition & 0 deletions ci/requirements/min-all-deps.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ dependencies:
- pip
- pydap=3.5
- pytest
- pytest-asyncio
- pytest-cov
- pytest-env
- pytest-mypy-plugins
Expand Down
1 change: 1 addition & 0 deletions doc/api-hidden.rst
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@
Variable.isnull
Variable.item
Variable.load
Variable.load_async
Variable.max
Variable.mean
Variable.median
Expand Down
2 changes: 2 additions & 0 deletions doc/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1122,6 +1122,7 @@ Dataset methods
Dataset.filter_by_attrs
Dataset.info
Dataset.load
Dataset.load_async
Dataset.persist
Dataset.unify_chunks

Expand Down Expand Up @@ -1154,6 +1155,7 @@ DataArray methods
DataArray.compute
DataArray.persist
DataArray.load
DataArray.load_async
DataArray.unify_chunks

DataTree methods
Expand Down
48 changes: 33 additions & 15 deletions doc/internals/how-to-add-new-backend.rst
Original file line number Diff line number Diff line change
Expand Up @@ -325,10 +325,12 @@ information on plugins.
How to support lazy loading
+++++++++++++++++++++++++++

If you want to make your backend effective with big datasets, then you should
support lazy loading.
Basically, you shall replace the :py:class:`numpy.ndarray` inside the
variables with a custom class that supports lazy loading indexing.
If you want to make your backend effective with big datasets, then you should take advantage of xarray's
support for lazy loading and indexing.

Basically, when your backend constructs the ``Variable`` objects,
you need to replace the :py:class:`numpy.ndarray` inside the
variables with a custom :py:class:`~xarray.backends.BackendArray` subclass that supports lazy loading and indexing.
See the example below:

.. code-block:: python
Expand All @@ -339,25 +341,27 @@ See the example below:

Where:

- :py:class:`~xarray.core.indexing.LazilyIndexedArray` is a class
provided by Xarray that manages the lazy loading.
- ``MyBackendArray`` shall be implemented by the backend and shall inherit
- :py:class:`~xarray.core.indexing.LazilyIndexedArray` is a wrapper class
provided by Xarray that manages the lazy loading and indexing.
- ``MyBackendArray`` should be implemented by the backend and must inherit
from :py:class:`~xarray.backends.BackendArray`.

BackendArray subclassing
^^^^^^^^^^^^^^^^^^^^^^^^

The BackendArray subclass shall implement the following method and attributes:
The BackendArray subclass must implement the following method and attributes:

- the ``__getitem__`` method that takes in input an index and returns a
`NumPy <https://numpy.org/>`__ array
- the ``shape`` attribute
- the ``__getitem__`` method that takes an index as an input and returns a
`NumPy <https://numpy.org/>`__ array,
- the ``shape`` attribute,
- the ``dtype`` attribute.

Xarray supports different type of :doc:`/user-guide/indexing`, that can be
grouped in three types of indexes
It may also optionally implement an additional ``async_getitem`` method.

Xarray supports different types of :doc:`/user-guide/indexing`, that can be
grouped in three types of indexes:
:py:class:`~xarray.core.indexing.BasicIndexer`,
:py:class:`~xarray.core.indexing.OuterIndexer` and
:py:class:`~xarray.core.indexing.OuterIndexer`, and
:py:class:`~xarray.core.indexing.VectorizedIndexer`.
This implies that the implementation of the method ``__getitem__`` can be tricky.
In order to simplify this task, Xarray provides a helper function,
Expand Down Expand Up @@ -413,8 +417,22 @@ input the ``key``, the array ``shape`` and the following parameters:
For more details see
:py:class:`~xarray.core.indexing.IndexingSupport` and :ref:`RST indexing`.

Async support
^^^^^^^^^^^^^

Backends can also optionally support loading data asynchronously via xarray's asynchronous loading methods
(e.g. :py:meth:`~xarray.Dataset.load_async`).
To support async loading the ``BackendArray`` subclass must additionally implement the ``BackendArray.async_getitem`` method.

Note that implementing this method is only necessary if you want to be able to load data from different xarray objects concurrently.
Even without this method your ``BackendArray`` implementation is still free to concurrently load chunks of data for a single ``Variable`` itself,
so long as it does so behind the synchronous ``__getitem__`` interface.

Dask support
^^^^^^^^^^^^

In order to support `Dask Distributed <https://distributed.dask.org/>`__ and
:py:mod:`multiprocessing`, ``BackendArray`` subclass should be serializable
:py:mod:`multiprocessing`, the ``BackendArray`` subclass should be serializable
either with :ref:`io.pickle` or
`cloudpickle <https://github.com/cloudpipe/cloudpickle>`__.
That implies that all the reference to open files should be dropped. For
Expand Down
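As a rough illustration of the interface the docs changes above describe — a required synchronous ``__getitem__`` plus an opt-in ``async_getitem`` that raises by default — here is a hedged, xarray-free sketch (the class and method names mirror the docs, but nothing below is xarray's actual code):

```python
import asyncio


class BackendArraySketch:
    """Base sketch: sync indexing is required, async loading is opt-in."""

    def __getitem__(self, key):
        raise NotImplementedError

    async def async_getitem(self, key):
        # Backends that don't override this don't support async loading.
        raise NotImplementedError("Backend does not support asynchronous loading")


class InMemoryArray(BackendArraySketch):
    """Toy 'backend' over a plain list, implementing both code paths."""

    def __init__(self, data):
        self._data = data

    def __getitem__(self, key):
        return self._data[key]

    async def async_getitem(self, key):
        # A real backend would await a storage/network fetch here.
        await asyncio.sleep(0)
        return self._data[key]


arr = InMemoryArray([10, 20, 30])
print(arr[1])                             # 20 (synchronous path)
print(asyncio.run(arr.async_getitem(2)))  # 30 (asynchronous path)
```

A backend that never implements ``async_getitem`` keeps working through the synchronous path; only the async entry point raises.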
4 changes: 3 additions & 1 deletion doc/whats-new.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ v2025.05.0 (unreleased)

New Features
~~~~~~~~~~~~

- Added new asynchronous loading methods :py:meth:`~xarray.Dataset.load_async`, :py:meth:`~xarray.DataArray.load_async`, :py:meth:`~xarray.Variable.load_async`.
(:issue:`10326`, :pull:`10327`) By `Tom Nicholas <https://github.com/TomNicholas>`_.
- Allow an Xarray index that uses multiple dimensions checking equality with another
index for only a subset of those dimensions (i.e., ignoring the dimensions
that are excluded from alignment).
Expand All @@ -42,7 +45,6 @@ Bug fixes
~~~~~~~~~
- Fix :py:class:`~xarray.groupers.BinGrouper` when ``labels`` is not specified (:issue:`10284`).
By `Deepak Cherian <https://github.com/dcherian>`_.

- Allow accessing arbitrary attributes on Pandas ExtensionArrays.
By `Deepak Cherian <https://github.com/dcherian>`_.

Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ dev = [
"pytest-mypy-plugins",
"pytest-timeout",
"pytest-xdist",
"pytest-asyncio",
"ruff>=0.8.0",
"sphinx",
"sphinx_autosummary_accessors",
Expand Down
13 changes: 12 additions & 1 deletion xarray/backends/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import os
import time
import traceback
from abc import ABC, abstractmethod
from collections.abc import Hashable, Iterable, Mapping, Sequence
from glob import glob
from typing import TYPE_CHECKING, Any, ClassVar, TypeVar, Union, overload
Expand Down Expand Up @@ -267,13 +268,23 @@ def robust_getitem(array, key, catch=Exception, max_retries=6, initial_delay=500
time.sleep(1e-3 * next_delay)


class BackendArray(NdimSizeLenMixin, indexing.ExplicitlyIndexed):
class BackendArray(ABC, NdimSizeLenMixin, indexing.ExplicitlyIndexed):
Review comment (Member Author): As __getitem__ is required, I feel like BackendArray should always have been an ABC.
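The point in the comment above — that ``__getitem__`` is required — is exactly what ``abc.abstractmethod`` enforces: a subclass that omits the method cannot even be instantiated. A minimal, self-contained sketch (class names here are illustrative, not xarray's):

```python
from abc import ABC, abstractmethod


class Base(ABC):
    @abstractmethod
    def __getitem__(self, key): ...


class Incomplete(Base):
    pass  # no __getitem__ -> cannot be instantiated


class Complete(Base):
    def __getitem__(self, key):
        return key


try:
    Incomplete()
    raised = False
except TypeError:
    # The ABC machinery rejects subclasses missing abstract methods.
    raised = True

print(raised)         # True
print(Complete()[3])  # 3
```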

__slots__ = ()

@abstractmethod
def __getitem__(self, key: indexing.ExplicitIndexer) -> np.typing.ArrayLike: ...

async def async_getitem(self, key: indexing.ExplicitIndexer) -> np.typing.ArrayLike:
raise NotImplementedError("Backend does not support asynchronous loading")
Comment on lines +277 to +278
Review comment (Member Author): I've implemented this for the ZarrArray class but in theory it could be supported by other backends too.

Review comment (Member Author): This might not be the desired behaviour though - this currently means if you opened a dataset from netCDF and called ds.load_async you would get a NotImplementedError. Would it be better to quietly just block instead?


def get_duck_array(self, dtype: np.typing.DTypeLike = None):
key = indexing.BasicIndexer((slice(None),) * self.ndim)
return self[key] # type: ignore[index]

async def async_get_duck_array(self, dtype: np.typing.DTypeLike = None):
key = indexing.BasicIndexer((slice(None),) * self.ndim)
return await self.async_getitem(key) # type: ignore[index]


class AbstractDataStore:
__slots__ = ()
Expand Down
25 changes: 25 additions & 0 deletions xarray/backends/zarr.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,8 @@ class ZarrArrayWrapper(BackendArray):
def __init__(self, zarr_array):
# some callers attempt to evaluate an array if an `array` property exists on the object.
# we prefix with _ to avoid this inference.

# TODO type hint this?
self._array = zarr_array
self.shape = self._array.shape

Expand Down Expand Up @@ -212,6 +214,14 @@ def _vindex(self, key):
def _getitem(self, key):
return self._array[key]

async def _async_getitem(self, key):
async_array = self._array._async_array
return await async_array.getitem(key)

async def _async_oindex(self, key):
async_array = self._array._async_array
return await async_array.oindex.getitem(key)

def __getitem__(self, key):
array = self._array
if isinstance(key, indexing.BasicIndexer):
Expand All @@ -227,6 +237,21 @@ def __getitem__(self, key):
# if self.ndim == 0:
# could possibly have a work-around for 0d data here

async def async_getitem(self, key):
array = self._array
if isinstance(key, indexing.BasicIndexer):
method = self._async_getitem
elif isinstance(key, indexing.VectorizedIndexer):
# method = self._vindex
raise NotImplementedError("async lazy vectorized indexing is not supported")
elif isinstance(key, indexing.OuterIndexer):
method = self._async_oindex

return await indexing.async_explicit_indexing_adapter(
key, array.shape, indexing.IndexingSupport.OUTER, method
)


def _determine_zarr_chunks(
enc_chunks, var_chunks, ndim, name, safe_chunks, region, mode, shape
Expand Down
8 changes: 8 additions & 0 deletions xarray/core/dataarray.py
Original file line number Diff line number Diff line change
Expand Up @@ -1160,6 +1160,14 @@ def load(self, **kwargs) -> Self:
self._coords = new._coords
return self

async def load_async(self, **kwargs) -> Self:
temp_ds = self._to_temp_dataset()
ds = await temp_ds.load_async(**kwargs)
new = self._from_temp_dataset(ds)
self._variable = new._variable
self._coords = new._coords
return self

def compute(self, **kwargs) -> Self:
"""Manually trigger loading of this array's data from disk or a
remote source into memory and return a new array.
Expand Down
43 changes: 35 additions & 8 deletions xarray/core/dataset.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import asyncio
import copy
import datetime
import math
Expand Down Expand Up @@ -531,24 +532,50 @@ def load(self, **kwargs) -> Self:
dask.compute
"""
# access .data to coerce everything to numpy or dask arrays
lazy_data = {
chunked_data = {
k: v._data for k, v in self.variables.items() if is_chunked_array(v._data)
}
if lazy_data:
chunkmanager = get_chunked_array_type(*lazy_data.values())
if chunked_data:
chunkmanager = get_chunked_array_type(*chunked_data.values())

# evaluate all the chunked arrays simultaneously
evaluated_data: tuple[np.ndarray[Any, Any], ...] = chunkmanager.compute(
*lazy_data.values(), **kwargs
*chunked_data.values(), **kwargs
)

for k, data in zip(lazy_data, evaluated_data, strict=False):
for k, data in zip(chunked_data, evaluated_data, strict=False):
self.variables[k].data = data

# load everything else sequentially
for k, v in self.variables.items():
if k not in lazy_data:
v.load()
[v.load() for k, v in self.variables.items() if k not in chunked_data]

return self

async def load_async(self, **kwargs) -> Self:
# TODO refactor this to pull out the common chunked_data codepath

# this blocks on chunked arrays but not on lazily indexed arrays

# access .data to coerce everything to numpy or dask arrays
chunked_data = {
k: v._data for k, v in self.variables.items() if is_chunked_array(v._data)
}
if chunked_data:
chunkmanager = get_chunked_array_type(*chunked_data.values())

# evaluate all the chunked arrays simultaneously
evaluated_data: tuple[np.ndarray[Any, Any], ...] = chunkmanager.compute(
*chunked_data.values(), **kwargs
)

for k, data in zip(chunked_data, evaluated_data, strict=False):
self.variables[k].data = data

# load everything else concurrently
coros = [
v.load_async() for k, v in self.variables.items() if k not in chunked_data
]
await asyncio.gather(*coros)
Comment on lines +574 to +578
Review comment (Member Author): We could actually do this same thing inside of the synchronous ds.load() too, but it would require:

  1. Xarray to decide how to call the async code, e.g. with a ThreadPool or similar (see Support concurrent loading of variables #8965)
  2. The backend to support async_getitem (it could fall back to synchronous loading if it's not supported)

return self

Expand Down
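The concurrent path added in ``load_async`` above boils down to fanning out one coroutine per variable and awaiting them all with ``asyncio.gather``. A stdlib-only sketch of that pattern (the variable names and delays below are made up for illustration):

```python
import asyncio
import time


async def load_variable(name: str, delay: float) -> tuple[str, str]:
    # Stand-in for Variable.load_async(): pretend each variable needs
    # `delay` seconds of I/O before its data is in memory.
    await asyncio.sleep(delay)
    return name, "loaded"


async def load_all() -> dict[str, str]:
    # Mirrors the gather in load_async: one coroutine per (non-chunked)
    # variable, all awaited concurrently.
    coros = [load_variable(n, 0.05) for n in ("temp", "salinity", "pressure")]
    return dict(await asyncio.gather(*coros))


start = time.perf_counter()
result = asyncio.run(load_all())
elapsed = time.perf_counter() - start

print(result)
# Three 0.05 s "loads" overlap, so wall time is close to 0.05 s, not 0.15 s.
print(elapsed < 0.15)
```

Because ``asyncio.gather`` returns results in the order the coroutines were passed, the loaded data can be zipped back onto the variable names deterministically, which is what lets ``load_async`` reassign ``self.variables[k].data`` safely.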