[v3] Batch array / group access #1805

Open
d-v-b opened this issue Apr 22, 2024 · 11 comments
Labels
enhancement New features or improvements
Milestone

Comments

@d-v-b
Contributor

d-v-b commented Apr 22, 2024

In v3, since the storage API is asynchronous, we can open multiple array or groups concurrently. This would be broadly useful, but we don't have a good template from zarr-python v2 to extrapolate from, so we have to invent something new here (new, relative to zarr-python, that is).

Over in #1804 @martindurant brought this up, and I suggested something like this:

from typing import Any, Literal

from zarr import Array, Group
from zarr.abc.store import Store


def open_nodes(store: Store, paths: tuple[str, ...], options: dict[Literal["array", "group"], dict[str, Any]]) -> tuple[Array | Group, ...]:
    ...

def open_arrays(store: Store, paths: tuple[str, ...], options: dict[str, Any]) -> tuple[Array, ...]:
    ...

def open_groups(store: Store, paths: tuple[str, ...], options: dict[str, Any]) -> tuple[Group, ...]:
    ...

I was imagining that the arguments to these functions would be the paths of arrays / groups anywhere in a Zarr hierarchy; we could also have a group.open_groups() method which can only "see" sub-groups, and similarly for group.open_arrays().
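A minimal sketch of how `open_nodes`-style batching could work under the hood, assuming each node open boils down to one awaitable metadata read. `_open_node`, `open_nodes_async`, `open_nodes_sketch` and `LATENCY` are hypothetical stand-ins that make no zarr calls. The point is that `asyncio.gather` issues every request at once and returns results in input order, so wall time is close to one round trip rather than one round trip per path.

```python
import asyncio
import time

LATENCY = 0.05  # hypothetical per-request round-trip time, in seconds


async def _open_node(path: str) -> dict:
    """Hypothetical stand-in for reading one node's metadata document."""
    await asyncio.sleep(LATENCY)
    return {"path": path}


async def open_nodes_async(paths: tuple[str, ...]) -> tuple[dict, ...]:
    # gather schedules every request at once and keeps results in input order.
    return tuple(await asyncio.gather(*(_open_node(p) for p in paths)))


def open_nodes_sketch(paths: tuple[str, ...]) -> tuple[dict, ...]:
    """Sync entry point: one event loop, many concurrent requests."""
    return asyncio.run(open_nodes_async(paths))


paths = tuple(f"group/array_{i}" for i in range(20))
start = time.perf_counter()
nodes = open_nodes_sketch(paths)
elapsed = time.perf_counter() - start
print(f"opened {len(nodes)} nodes in {elapsed:.2f}s")  # roughly LATENCY, not 20 * LATENCY
```

A `group.open_arrays()` method could reuse the same core, just with paths resolved relative to the group.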

An alternative would be to use a more general transactional context manager:

async with transaction(store) as tx:
    a1_maybe = tx.open_array(...)
    a2_maybe = tx.open_array(...)
    # IO gets run concurrently in `__aexit__`

a1 = a1_maybe.result()
a2 = a2_maybe.result()

I'm a lot less sure of this second design, since I have never implemented anything like it. For example, should we use futures for the results of tx.open_array()?
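One way the futures question could play out, sketched under assumptions: `Transaction`, `submit` and `fake_open_array` are hypothetical names, not zarr API. Each `submit` call immediately returns an unresolved `asyncio.Future`; the queued coroutines only run, concurrently via `asyncio.gather`, when the `async with` block exits, and each future is then resolved with its result or its exception.

```python
import asyncio
from typing import Any, Awaitable, Callable


class Transaction:
    """Hypothetical batching context manager: queue work, run it all on exit."""

    def __init__(self) -> None:
        self._pending: list[tuple[asyncio.Future, Callable[[], Awaitable[Any]]]] = []

    def submit(self, fn: Callable[..., Awaitable[Any]], *args: Any) -> asyncio.Future:
        # Nothing runs yet; the caller gets a Future that is resolved in __aexit__.
        fut = asyncio.get_running_loop().create_future()
        self._pending.append((fut, lambda: fn(*args)))
        return fut

    async def __aenter__(self) -> "Transaction":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        if exc_type is not None:
            # The body raised: abandon the queued work.
            for fut, _ in self._pending:
                fut.cancel()
            return
        results = await asyncio.gather(
            *(make() for _, make in self._pending), return_exceptions=True
        )
        for (fut, _), res in zip(self._pending, results):
            if isinstance(res, BaseException):
                fut.set_exception(res)
            else:
                fut.set_result(res)


async def fake_open_array(path: str) -> str:
    """Hypothetical stand-in for an async array open."""
    await asyncio.sleep(0.01)
    return f"<array {path}>"


async def main() -> tuple[str, str]:
    async with Transaction() as tx:
        a1_maybe = tx.submit(fake_open_array, "a1")
        a2_maybe = tx.submit(fake_open_array, "a2")
        assert not a1_maybe.done()  # no IO has happened inside the block
    return a1_maybe.result(), a2_maybe.result()


a1, a2 = asyncio.run(main())
```

Futures give callers a handle they can hold before any IO happens. The trade-off is that results are only available after the block exits: calling `.result()` inside the block raises `asyncio.InvalidStateError`.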

Are there other ideas, or examples from other implementations / domains we could draw from?

@d-v-b d-v-b mentioned this issue Apr 22, 2024
@martindurant
Member

I would also add a data getter, so in the first model something like

get_data({arr_obj1: (0, [1, 2, 3], slice(None, None, None)), arr_obj2: (Ellipsis, 0)})  # ...and so on for more arrays

or similar, in addition to a convenience method alongside the functions above:

group.descent_to_path(path1, path2, ...)

(where store and options are fixed).

I am not suggesting these are the right signatures, but this is the functionality I would want. After all, "open many nodes" is already sort of covered in v2 for the special case of consolidated metadata (one call and no further latency, rather than many concurrent calls).
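A sketch of the batched data getter, assuming each array object exposes an awaitable `getitem(selection)`, as zarr-python 3's `AsyncArray` does. Here the request is a list of `(array, selection)` pairs rather than a dict keyed by array objects, since it is not obvious that the array classes are hashable. `InMemoryAsyncArray` is a hypothetical NumPy-backed stand-in so the example runs without a store.

```python
import asyncio
from typing import Any, Sequence

import numpy as np


async def get_data(requests: Sequence[tuple[Any, Any]]) -> list[Any]:
    """Fetch every (array, selection) pair concurrently; results keep request order."""
    return list(await asyncio.gather(*(arr.getitem(sel) for arr, sel in requests)))


class InMemoryAsyncArray:
    """Hypothetical stand-in for an async array backed by a NumPy array."""

    def __init__(self, data: np.ndarray) -> None:
        self._data = data

    async def getitem(self, selection: Any) -> np.ndarray:
        await asyncio.sleep(0.01)  # pretend this is a round trip to storage
        return self._data[selection]


arr1 = InMemoryAsyncArray(np.arange(60).reshape(3, 4, 5))
arr2 = InMemoryAsyncArray(np.arange(6).reshape(2, 3))
out1, out2 = asyncio.run(
    get_data([(arr1, (0, [1, 2, 3], slice(None))), (arr2, (Ellipsis, 0))])
)
```

A sync `get_data` would wrap this in a single `asyncio.run` call, so all selections share one event loop.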

@jhamman jhamman added the V3 label May 17, 2024
@jhamman jhamman added this to the 3.0.0 milestone May 17, 2024
@jhamman jhamman moved this to Todo in Zarr-Python - 3.0 May 17, 2024
@jhamman jhamman modified the milestones: 3.0.0, After 3.0.0 Jul 1, 2024
@d-v-b
Contributor Author

d-v-b commented Dec 10, 2024

Just noting that in the context of xarray, a function or an AsyncGroup method for concurrently creating sub-arrays / sub-groups would be extremely useful from a performance point of view. In particular, it would be great to replace this "create arrays serially" code in xarray with a concurrent zarr method that can create a bunch of arrays in one go. I will make some progress on this shortly.

@dstansby dstansby removed the V3 label Dec 12, 2024
@dstansby dstansby added the enhancement New features or improvements label Dec 30, 2024
@oliverwm1
Copy link

Just dropping by to say that a feature enabling concurrent loading of many (e.g. 50-100) small (~1 MB) arrays would be very useful :) Let me know if I can help in any way.

@martindurant
Member

You should already be able to use the async class to open them and then asyncio.gather to await a bunch of them. Providing this as a convenience method callable from sync code is a good idea.

xarray does something similar to read coordinates of a large hierarchy on open.

@oliverwm1

You should already be able to use the async class to open them and then asyncio.gather to await a bunch of them

I am struggling with conflicting event loops when I try to do this, but this may just be me not understanding how to do it properly. E.g.:

RuntimeError: Task <Task pending name='Task-500' coro=<StorePath.get() running at /home/OliWM/miniconda3/envs/fme/lib/python3.11/site-packages/zarr/storage/_common.py:124> cb=[gather.<locals>._done_callback() at /home/OliWM/miniconda3/envs/fme/lib/python3.11/asyncio/tasks.py:764]> got Future <Future pending> attached to a different loop

xarray does something similar to read coordinates of a large hierarchy on open.

Do you have a link to this code? It would be a helpful pointer.
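The `RuntimeError` above is raised by asyncio itself: a Task refuses to await a Future that was created on a different event loop. A minimal, zarr-free reproduction follows (`make_future` and `await_future` are illustrative names). Presumably something similar happens when an object created under one loop, such as an HTTP session held by a filesystem instance, is reused under the loop that `asyncio.run` creates.

```python
import asyncio


async def make_future() -> asyncio.Future:
    # A Future belongs to whichever event loop is running when it is created.
    return asyncio.get_running_loop().create_future()


async def await_future(fut: asyncio.Future) -> None:
    await fut


# First asyncio.run(): creates loop A, makes the Future, then closes loop A.
fut = asyncio.run(make_future())

# Second asyncio.run(): a brand-new loop B tries to wait on loop A's Future.
try:
    asyncio.run(await_future(fut))
except RuntimeError as err:
    message = str(err)
else:
    message = ""

print(message)
```

The printed message has the same "got Future <Future pending> attached to a different loop" shape as the traceback in this thread.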

@d-v-b
Contributor Author

d-v-b commented Apr 3, 2025

@oliverwm1 do you have a runnable code example?

@oliverwm1

My issues seem to be due to using both the sync and async versions of the zarr API in the same script, which I was doing in order to profile each. For example, the following code fails:

import asyncio

import zarr


async def open(url):
    return await zarr.api.asynchronous.open(store=url)

url = 'gs://gcp-public-data-arco-era5/ar/full_37-1h-0p25deg-chunk-1.zarr-v3'

# open with zarr sync API
store = zarr.storage.FsspecStore.from_url(url)
group = zarr.open_group(store, mode='r')

# open with zarr async
asyncio.run(open(url))

Traceback:

Traceback (most recent call last):
  File "/home/OliWM/repos/explore2/oliverwm/2025-04-03-example-zarr-sync-async-fail.py", line 16, in <module>
    asyncio.run(open(url))
  File "/home/OliWM/miniconda3/envs/fme/lib/python3.11/asyncio/runners.py", line 190, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/home/OliWM/miniconda3/envs/fme/lib/python3.11/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/OliWM/miniconda3/envs/fme/lib/python3.11/asyncio/base_events.py", line 654, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/home/OliWM/repos/explore2/oliverwm/2025-04-03-example-zarr-sync-async-fail.py", line 7, in open
    return await zarr.api.asynchronous.open(store=url)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/OliWM/miniconda3/envs/fme/lib/python3.11/site-packages/zarr/api/asynchronous.py", line 320, in open
    metadata_dict = await get_array_metadata(store_path, zarr_format=zarr_format)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/OliWM/miniconda3/envs/fme/lib/python3.11/site-packages/zarr/core/array.py", line 177, in get_array_metadata
    zarr_json_bytes, zarray_bytes, zattrs_bytes = await gather(
                                                  ^^^^^^^^^^^^^
  File "/home/OliWM/miniconda3/envs/fme/lib/python3.11/site-packages/zarr/storage/_common.py", line 124, in get
    return await self.store.get(self.path, prototype=prototype, byte_range=byte_range)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/OliWM/miniconda3/envs/fme/lib/python3.11/site-packages/zarr/storage/_fsspec.py", line 230, in get
    value = prototype.buffer.from_bytes(await self.fs._cat_file(path))
                                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/OliWM/miniconda3/envs/fme/lib/python3.11/site-packages/gcsfs/core.py", line 1092, in _cat_file
    headers, out = await self._call("GET", u2, headers=head)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/OliWM/miniconda3/envs/fme/lib/python3.11/site-packages/gcsfs/core.py", line 477, in _call
    status, headers, info, contents = await self._request(
                                      ^^^^^^^^^^^^^^^^^^^^
  File "/home/OliWM/miniconda3/envs/fme/lib/python3.11/site-packages/decorator.py", line 224, in fun
    return await caller(func, *(extras + args), **kw)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/OliWM/miniconda3/envs/fme/lib/python3.11/site-packages/gcsfs/retry.py", line 135, in retry_request
    return await func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/OliWM/miniconda3/envs/fme/lib/python3.11/site-packages/gcsfs/core.py", line 456, in _request
    async with self.session.request(
  File "/home/OliWM/miniconda3/envs/fme/lib/python3.11/site-packages/aiohttp/client.py", line 1425, in __aenter__
    self._resp: _RetType = await self._coro
                           ^^^^^^^^^^^^^^^^
  File "/home/OliWM/miniconda3/envs/fme/lib/python3.11/site-packages/aiohttp/client.py", line 730, in _request
    await resp.start(conn)
  File "/home/OliWM/miniconda3/envs/fme/lib/python3.11/site-packages/aiohttp/client_reqrep.py", line 1059, in start
    message, payload = await protocol.read()  # type: ignore[union-attr]
                       ^^^^^^^^^^^^^^^^^^^^^
  File "/home/OliWM/miniconda3/envs/fme/lib/python3.11/site-packages/aiohttp/streams.py", line 672, in read
    await self._waiter
RuntimeError: Task <Task pending name='Task-12' coro=<StorePath.get() running at /home/OliWM/miniconda3/envs/fme/lib/python3.11/site-packages/zarr/storage/_common.py:124> cb=[gather.<locals>._done_callback() at /home/OliWM/miniconda3/envs/fme/lib/python3.11/asyncio/tasks.py:764]> got Future <Future pending> attached to a different loop

But if you comment out the group = zarr.open_group(store, mode='r') line, then it does not raise an error. Let me know if making a separate issue is helpful. I don't know whether this should be expected to work or not, but it was pretty confusing to me.

@d-v-b
Contributor Author

d-v-b commented Apr 3, 2025

I'm also confused! I could get this to work, but only by using anonymous credentials:

# /// script
# requires-python = ">=3.11"
# dependencies = [
#     "zarr",
#     "fsspec",
#     "gcsfs"
# ]
# ///


import asyncio

import zarr

async def open(url):
    return await zarr.api.asynchronous.open(store=url)

url = 'gs://gcp-public-data-arco-era5/ar/full_37-1h-0p25deg-chunk-1.zarr-v3'

# open with zarr sync API
# note the anonymous credentials
store = zarr.storage.FsspecStore.from_url(url, storage_options={'token': 'anon'})
group = zarr.open_group(store, mode='r')

# open with zarr async
g = asyncio.run(open(url))
print(g)
# <AsyncGroup <FsspecStore(GCSFileSystem, gcp-public-data-arco-era5/ar/full_37-1h-0p25deg-chunk-1.zarr-v3)>>

I don't know why the choice of credentials would matter here.

@martindurant
Member

I can only suppose that zarr isn't careful about passing asynchronous=True when required, and ends up with the same instance in both cases. You can test this by looking at

fsspec.filesystem("gcs")._cache
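A toy model of that hypothesis, assuming the filesystem class caches instances by constructor arguments (fsspec does keep such an instance cache, which is what `_cache` exposes) and assuming both code paths pass identical arguments. Under those assumptions, the async open receives the very object built for the sync open, along with any loop-bound state it holds, while changing an argument such as `token` yields a fresh instance, which would be consistent with the anonymous-credentials observation. `ToyFileSystem` and `get_filesystem` are hypothetical and are not fsspec's code.

```python
from typing import Any

# Toy model only: an instance cache keyed on constructor arguments.
_instance_cache: dict[tuple, "ToyFileSystem"] = {}


class ToyFileSystem:
    def __init__(self, **storage_options: Any) -> None:
        self.storage_options = storage_options
        # Stands in for loop-bound state such as an HTTP session.
        self.session_loop = None


def get_filesystem(**storage_options: Any) -> ToyFileSystem:
    """Return the cached instance when the constructor arguments match exactly."""
    key = tuple(sorted(storage_options.items()))
    if key not in _instance_cache:
        _instance_cache[key] = ToyFileSystem(**storage_options)
    return _instance_cache[key]


# Same arguments on both code paths: one shared instance and shared state.
fs_from_sync_api = get_filesystem(asynchronous=True)
fs_from_async_api = get_filesystem(asynchronous=True)

# Different arguments (e.g. an explicit token): a separate instance.
fs_anon = get_filesystem(asynchronous=True, token="anon")
```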

@oliverwm1

but only by using anonymous credentials

Interesting, yes that also made the script work for me.

@oliverwm1

Maybe this is the same issue: #2946

Projects
Status: Todo
Development

No branches or pull requests

5 participants