Add asynchronous load method #10327
Changes from 50 commits
Dev-dependency list (file name not shown in this view), adding `pytest-asyncio`:

```diff
@@ -30,6 +30,7 @@
     "coveralls",
     "pip",
     "pytest",
+    "pytest-asyncio",
     "pytest-cov",
     "pytest-env",
     "pytest-mypy-plugins",
```
Backend base-class changes, making `BackendArray` an ABC with an abstract synchronous `__getitem__` and an optional `async_getitem`:

```diff
@@ -4,6 +4,7 @@
 import os
 import time
 import traceback
+from abc import ABC, abstractmethod
 from collections.abc import Hashable, Iterable, Mapping, Sequence
 from glob import glob
 from typing import TYPE_CHECKING, Any, ClassVar, TypeVar, Union, overload
@@ -267,13 +268,23 @@ def robust_getitem(array, key, catch=Exception, max_retries=6, initial_delay=500
         time.sleep(1e-3 * next_delay)


-class BackendArray(NdimSizeLenMixin, indexing.ExplicitlyIndexed):
+class BackendArray(ABC, NdimSizeLenMixin, indexing.ExplicitlyIndexed):
     __slots__ = ()

+    @abstractmethod
+    def __getitem__(key: indexing.ExplicitIndexer) -> np.typing.ArrayLike: ...
+
+    async def async_getitem(key: indexing.ExplicitIndexer) -> np.typing.ArrayLike:
+        raise NotImplementedError("Backend does not support asynchronous loading")
```
**Review thread on lines +273 to +274:**

> I've implemented this for the […]

> This might not be the desired behaviour though - this currently means if you opened a dataset from netCDF and called […]

> Yes absolutely.

> Okay I can do that. But can you explain why you feel that this would be better behaviour? Asking for something to be done async and it quietly blocking also seems not great...
```diff
     def get_duck_array(self, dtype: np.typing.DTypeLike = None):
         key = indexing.BasicIndexer((slice(None),) * self.ndim)
         return self[key]  # type: ignore[index]

+    async def async_get_duck_array(self, dtype: np.typing.DTypeLike = None):
+        key = indexing.BasicIndexer((slice(None),) * self.ndim)
+        return await self.async_getitem(key)  # type: ignore[index]


 class AbstractDataStore:
     __slots__ = ()
```
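To make the two read paths concrete, here is a toy sketch of the pattern the diff above establishes. `ExampleBackendArray` is a hypothetical, self-contained stand-in (it does not import xarray or subclass the real `BackendArray`): a synchronous `__getitem__` plus an `async_getitem` that a real backend would implement with genuinely non-blocking I/O.

```python
import asyncio


class ExampleBackendArray:
    """Toy stand-in for a BackendArray subclass (illustrative only)."""

    def __init__(self, data):
        self._data = data  # in-memory data standing in for remote storage

    def __getitem__(self, key):
        # synchronous read path, required of every backend
        return self._data[key]

    async def async_getitem(self, key):
        # asynchronous read path; a real backend would await non-blocking
        # I/O here instead of sleeping
        await asyncio.sleep(0)  # yield to the event loop
        return self._data[key]


arr = ExampleBackendArray([10, 20, 30])
print(arr[1])                             # 20
print(asyncio.run(arr.async_getitem(2)))  # 30
```

A backend that implements only `__getitem__` would, under the diff above, raise `NotImplementedError` from the default `async_getitem`.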
`Dataset.load` changes (renaming `lazy_data` to `chunked_data`) and the new `load_async` method:

```diff
@@ -1,5 +1,6 @@
 from __future__ import annotations

+import asyncio
 import copy
 import datetime
 import math
@@ -531,24 +532,50 @@ def load(self, **kwargs) -> Self:
         dask.compute
         """
         # access .data to coerce everything to numpy or dask arrays
-        lazy_data = {
+        chunked_data = {
             k: v._data for k, v in self.variables.items() if is_chunked_array(v._data)
         }
-        if lazy_data:
-            chunkmanager = get_chunked_array_type(*lazy_data.values())
+        if chunked_data:
+            chunkmanager = get_chunked_array_type(*chunked_data.values())

             # evaluate all the chunked arrays simultaneously
             evaluated_data: tuple[np.ndarray[Any, Any], ...] = chunkmanager.compute(
-                *lazy_data.values(), **kwargs
+                *chunked_data.values(), **kwargs
             )

-            for k, data in zip(lazy_data, evaluated_data, strict=False):
+            for k, data in zip(chunked_data, evaluated_data, strict=False):
                 self.variables[k].data = data

         # load everything else sequentially
-        for k, v in self.variables.items():
-            if k not in lazy_data:
-                v.load()
+        [v.load() for k, v in self.variables.items() if k not in chunked_data]

         return self

+    async def load_async(self, **kwargs) -> Self:
+        # TODO refactor this to pull out the common chunked_data codepath
```
**Review thread on the `load_async` refactor TODO:**

> Let's instead just have the sync methods issue a blocking call to the async versions.

> I don't think that would solve the use case in xpublish though? You need to be able to asynchronously trigger loading for a bunch of separate dataset objects, which requires an async load API to be exposed, no?

> Oh I understand what you mean now, you're not talking about the API, you're just talking about my comment about internal refactoring. You're proposing we do what zarr does internally, which makes sense.
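The refactor the reviewer suggests - keeping one async implementation and having the sync method block on it - can be sketched as follows. The `Loader` class and its method bodies are hypothetical illustrations, not xarray code; note that `asyncio.run` raises if called from inside an already-running event loop, which is one reason this design needs care.

```python
import asyncio


class Loader:
    """Hypothetical sketch: sync method delegates to the async code path."""

    async def load_async(self):
        # single implementation of the loading logic
        await asyncio.sleep(0)  # stand-in for awaited I/O
        return "loaded"

    def load(self):
        # blocking wrapper; fails if an event loop is already running
        return asyncio.run(self.load_async())


print(Loader().load())  # loaded
```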
```diff
+        # this blocks on chunked arrays but not on lazily indexed arrays
+
+        # access .data to coerce everything to numpy or dask arrays
+        chunked_data = {
+            k: v._data for k, v in self.variables.items() if is_chunked_array(v._data)
+        }
+        if chunked_data:
+            chunkmanager = get_chunked_array_type(*chunked_data.values())
+
+            # evaluate all the chunked arrays simultaneously
+            evaluated_data: tuple[np.ndarray[Any, Any], ...] = chunkmanager.compute(
+                *chunked_data.values(), **kwargs
+            )
+
+            for k, data in zip(chunked_data, evaluated_data, strict=False):
+                self.variables[k].data = data
+
+        # load everything else concurrently
+        coros = [
+            v.load_async() for k, v in self.variables.items() if k not in chunked_data
+        ]
+        await asyncio.gather(*coros)
```
**Review thread on lines +574 to +578:**

> We could actually do this same thing inside of the synchronous […]

> We should rate-limit all […]
>
> ```python
> async def async_gather(*coros, concurrency: Optional[int] = None, return_exceptions: bool = False) -> list[Any]:
>     """Execute a gather while limiting the number of concurrent tasks.
>
>     Args:
>         coros: coroutines
>             list of coroutines to execute
>         concurrency: int
>             concurrency limit
>             if None, defaults to config_obj.get('async.concurrency', 4)
>             if <= 0, no concurrency limit
>     """
>     if concurrency is None:
>         concurrency = int(config_obj.get("async.concurrency", 4))
>     if concurrency > 0:
>         # use a semaphore to limit the number of concurrent coroutines
>         semaphore = asyncio.Semaphore(concurrency)
>
>         async def sem_coro(coro):
>             async with semaphore:
>                 return await coro
>
>         results = await asyncio.gather(*(sem_coro(c) for c in coros), return_exceptions=return_exceptions)
>     else:
>         results = await asyncio.gather(*coros, return_exceptions=return_exceptions)
>     return results
> ```

> Arguably that should be left to the underlying storage layer. Zarr already has its own rate limiting. Why introduce this additional complexity and configuration parameter in Xarray?

> Does zarr rate-limit per call or globally though? If it's rate-limited per call, and we make lots of concurrent calls from the xarray API, it will exceed the intended rate set in zarr...

> I'm not 100% on what Zarr will do, but this will rate-limit across Xarray variables. We will undoubtedly want to offer control here, even if the default is None for a start.
```diff
+
+        return self
```
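The reviewer's `async_gather` snippet above depends on an xarray-side `config_obj`; the semaphore pattern itself can be shown as a minimal self-contained sketch (the names `bounded_gather` and `fake_load` are made up for illustration). At most `limit` coroutines are awaited at once, and `asyncio.gather` still returns results in input order.

```python
import asyncio


async def bounded_gather(coros, limit: int = 4):
    # semaphore caps how many of the coroutines run concurrently
    sem = asyncio.Semaphore(limit)

    async def run_with_sem(coro):
        async with sem:
            return await coro

    return await asyncio.gather(*(run_with_sem(c) for c in coros))


async def fake_load(i):
    await asyncio.sleep(0)  # stand-in for a storage read
    return i * i


results = asyncio.run(bounded_gather([fake_load(i) for i in range(5)], limit=2))
print(results)  # [0, 1, 4, 9, 16]
```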
**Review thread on the `BackendArray` ABC change:**

> As `__getitem__` is required, I feel like `BackendArray` should always have been an ABC.

> This class is public API and this is a backwards-incompatible change.

> It is technically, but only if someone is using this class in a way counter to what the docs explicitly tell you to do (i.e. subclass it). Regardless, this is orthogonal to the rest of the PR; I can remove it. I was just trying to clean up bad things I found.

> Reverted in 6c47e3f.