From 5e0ffe80d039d9261517d96ce87220ce8d48e4f2 Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Mon, 21 Oct 2024 07:33:22 -0500 Subject: [PATCH 01/13] Added Store.getsize Closes https://github.com/zarr-developers/zarr-python/issues/2420 --- src/zarr/abc/store.py | 28 ++++++++++++++++++++++++++++ src/zarr/storage/local.py | 3 +++ src/zarr/storage/remote.py | 15 ++++++++++++++- src/zarr/testing/store.py | 12 ++++++++++++ 4 files changed, 57 insertions(+), 1 deletion(-) diff --git a/src/zarr/abc/store.py b/src/zarr/abc/store.py index a995a6bf38..e6d0570c71 100644 --- a/src/zarr/abc/store.py +++ b/src/zarr/abc/store.py @@ -5,6 +5,8 @@ from itertools import starmap from typing import TYPE_CHECKING, NamedTuple, Protocol, runtime_checkable +from zarr.core.buffer.core import default_buffer_prototype + if TYPE_CHECKING: from collections.abc import AsyncGenerator, Iterable from types import TracebackType @@ -386,6 +388,32 @@ async def _get_many( for req in requests: yield (req[0], await self.get(*req)) + async def getsize(self, key: str) -> int: + """ + Return the size, in bytes, of a value in a Store. + + Parameters + ---------- + key : str + + Returns + ------- + nbytes: int + The size of the value in bytes. + + Raises + ------ + FileNotFoundError + When the given key does not exist in the store. + """ + # Note to implementers: this default implementation is very inefficient since + # it requires reading the entire object. Many systems will have ways to get the + # size of an object without reading it. + value = await self.get(key, prototype=default_buffer_prototype()) + if value is None: + raise FileNotFoundError(key) + return len(value) + @runtime_checkable class ByteGetter(Protocol): diff --git a/src/zarr/storage/local.py b/src/zarr/storage/local.py index 5c03009a97..fde825b682 100644 --- a/src/zarr/storage/local.py +++ b/src/zarr/storage/local.py @@ -242,3 +242,6 @@ async def list_dir(self, prefix: str) -> AsyncGenerator[str, None]: yield str(key).replace(to_strip, "") except (FileNotFoundError, NotADirectoryError): pass + + async def getsize(self, key: str) -> int: + return os.path.getsize(self.root / key) diff --git a/src/zarr/storage/remote.py b/src/zarr/storage/remote.py index 0a0ec7f7cc..12a8664da4 100644 --- a/src/zarr/storage/remote.py +++ b/src/zarr/storage/remote.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Any, Self +from typing import TYPE_CHECKING, Any, Self, cast import fsspec @@ -301,3 +301,16 @@ async def list_prefix(self, prefix: str) -> AsyncGenerator[str, None]: find_str = f"{self.path}/{prefix}" for onefile in await self.fs._find(find_str, detail=False, maxdepth=None, withdirs=False): yield onefile.removeprefix(find_str) + + async def getsize(self, key: str) -> int: + path = _dereference_path(self.path, key) + info = await self.fs._info(path) + + size = info.get("size") + + if size is None: + # Not all filesystems support size. Fall back to reading the entire object + return await super().getsize(key) + else: + # fsspec doesn't have typing. We'll need to assume this is correct. + return cast(int, size) diff --git a/src/zarr/testing/store.py b/src/zarr/testing/store.py index b4da75b06b..af8b7332e6 100644 --- a/src/zarr/testing/store.py +++ b/src/zarr/testing/store.py @@ -338,3 +338,15 @@ async def test_set_if_not_exists(self, store: S) -> None: result = await store.get("k2", default_buffer_prototype()) assert result == new + + async def test_getsize(self, store: S) -> None: + key = "k" + data = self.buffer_cls.from_bytes(b"0" * 10) + await self.set(store, key, data) + + result = await store.getsize(key) + assert result == 10 + + async def test_getsize_raises(self, store: S) -> None: + with pytest.raises(FileNotFoundError): + await store.getsize("not-a-real-key") From 1926e19d787cc1a967d5c7dcb4d762f646dcea67 Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Tue, 22 Oct 2024 07:55:30 -0500 Subject: [PATCH 02/13] fixups --- docs/guide/storage.rst | 8 ++++++++ src/zarr/abc/store.py | 5 ++++- src/zarr/buffer.py | 5 +++++ src/zarr/storage/local.py | 5 ++++- src/zarr/storage/logging.py | 2 +- src/zarr/storage/memory.py | 5 ++++- src/zarr/storage/remote.py | 11 +++++++---- src/zarr/storage/zip.py | 5 ++++- src/zarr/testing/store.py | 9 +++++++++ tests/test_codecs/test_blosc.py | 2 +- tests/test_codecs/test_codecs.py | 2 +- tests/test_codecs/test_sharding.py | 2 +- tests/test_group.py | 2 +- tests/test_indexing.py | 3 ++- tests/test_metadata/test_consolidated.py | 2 +- tests/test_metadata/test_v3.py | 2 +- tests/test_store/test_logging.py | 2 +- tests/test_store/test_remote.py | 3 ++- tests/test_store/test_stateful_store.py | 3 ++- tests/test_v2.py | 3 ++- 20 files changed, 61 insertions(+), 20 deletions(-) create mode 100644 src/zarr/buffer.py diff --git a/docs/guide/storage.rst b/docs/guide/storage.rst index dfda553c43..a0edcb0359 100644 --- a/docs/guide/storage.rst +++ b/docs/guide/storage.rst @@ -97,5 +97,13 @@ Zarr-Python :class:`zarr.abc.store.Store` API is meant to be extended. The Store Class includes all of the methods needed to be a fully operational store in Zarr Python. Zarr also provides a test harness for custom stores: :class:`zarr.testing.store.StoreTests`. +``Store.get`` +~~~~~~~~~~~~~ + +The ``prototype`` keyword of :func:`zarr.abc.store.Store.get` uses a default of +``None``. When given ``None``, implementations should use +:func:`zarr.buffer.default_buffer_prototype` to look up the prototype users have +configured. + .. _Zip Store Specification: https://github.com/zarr-developers/zarr-specs/pull/311 .. _Fsspec: https://zarr-specs.readthedocs.io/en/latest/v3/core/v3.0.html#consolidated-metadata diff --git a/src/zarr/abc/store.py b/src/zarr/abc/store.py index e6d0570c71..e0243d8951 100644 --- a/src/zarr/abc/store.py +++ b/src/zarr/abc/store.py @@ -199,7 +199,7 @@ def __eq__(self, value: object) -> bool: async def get( self, key: str, - prototype: BufferPrototype, + prototype: BufferPrototype | None = None, byte_range: ByteRangeRequest | None = None, ) -> Buffer | None: """Retrieve the value associated with a given key. @@ -208,6 +208,9 @@ async def get( ---------- key : str byte_range : tuple[int | None, int | None], optional + prototype: BufferPrototype, optional + The prototype giving the buffer classes to use for buffers and nbuffers. + By default, :func:`zarr.buffer.default_buffer_prototype` is used. Returns ------- diff --git a/src/zarr/buffer.py b/src/zarr/buffer.py new file mode 100644 index 0000000000..0488b38524 --- /dev/null +++ b/src/zarr/buffer.py @@ -0,0 +1,5 @@ +from zarr.core.buffer import default_buffer_prototype + +__all__ = [ + "default_buffer_prototype", +] diff --git a/src/zarr/storage/local.py b/src/zarr/storage/local.py index fde825b682..f41d0a831f 100644 --- a/src/zarr/storage/local.py +++ b/src/zarr/storage/local.py @@ -9,6 +9,7 @@ from zarr.abc.store import ByteRangeRequest, Store from zarr.core.buffer import Buffer +from zarr.core.buffer.core import default_buffer_prototype from zarr.core.common import concurrent_map if TYPE_CHECKING: @@ -143,10 +144,12 @@ def __eq__(self, other: object) -> bool: async def get( self, key: str, - prototype: BufferPrototype, + prototype: BufferPrototype | None = None, byte_range: tuple[int | None, int | None] | None = None, ) -> Buffer | None: # docstring inherited + if prototype is None: + prototype = default_buffer_prototype() if not self._is_open: await self._open() assert isinstance(key, str) diff --git a/src/zarr/storage/logging.py b/src/zarr/storage/logging.py index 66fd1687e8..a327843afe 100644 --- a/src/zarr/storage/logging.py +++ b/src/zarr/storage/logging.py @@ -159,7 +159,7 @@ def __eq__(self, other: object) -> bool: async def get( self, key: str, - prototype: BufferPrototype, + prototype: BufferPrototype | None = None, byte_range: tuple[int | None, int | None] | None = None, ) -> Buffer | None: # docstring inherited diff --git a/src/zarr/storage/memory.py b/src/zarr/storage/memory.py index f942d57b95..bb3c0bde95 100644 --- a/src/zarr/storage/memory.py +++ b/src/zarr/storage/memory.py @@ -4,6 +4,7 @@ from zarr.abc.store import ByteRangeRequest, Store from zarr.core.buffer import Buffer, gpu +from zarr.core.buffer.core import default_buffer_prototype from zarr.core.common import concurrent_map from zarr.storage._utils import _normalize_interval_index @@ -79,10 +80,12 @@ def __eq__(self, other: object) -> bool: async def get( self, key: str, - prototype: BufferPrototype, + prototype: BufferPrototype | None = None, byte_range: tuple[int | None, int | None] | None = None, ) -> Buffer | None: # docstring inherited + if prototype is None: + prototype = default_buffer_prototype() if not self._is_open: await self._open() assert isinstance(key, str) diff --git a/src/zarr/storage/remote.py b/src/zarr/storage/remote.py index 12a8664da4..0f1b91fe2c 100644 --- a/src/zarr/storage/remote.py +++ b/src/zarr/storage/remote.py @@ -1,10 +1,11 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Any, Self, cast +from typing import TYPE_CHECKING, Any, Self import fsspec from zarr.abc.store import ByteRangeRequest, Store +from zarr.core.buffer.core import default_buffer_prototype from zarr.storage.common import _dereference_path if TYPE_CHECKING: @@ -175,10 +176,12 @@ def __eq__(self, other: object) -> bool: async def get( self, key: str, - prototype: BufferPrototype, + prototype: BufferPrototype | None = None, byte_range: ByteRangeRequest | None = None, ) -> Buffer | None: # docstring inherited + if prototype is None: + prototype = default_buffer_prototype() if not self._is_open: await self._open() path = _dereference_path(self.path, key) @@ -312,5 +315,5 @@ async def getsize(self, key: str) -> int: # Not all filesystems support size. Fall back to reading the entire object return await super().getsize(key) else: - # fsspec doesn't have typing. We'll need to assume this is correct. - return cast(int, size) + # fsspec doesn't have typing. We'll need to assume or verify this is true + return int(size) diff --git a/src/zarr/storage/zip.py b/src/zarr/storage/zip.py index 204a381bdb..e78675e467 100644 --- a/src/zarr/storage/zip.py +++ b/src/zarr/storage/zip.py @@ -9,6 +9,7 @@ from zarr.abc.store import ByteRangeRequest, Store from zarr.core.buffer import Buffer, BufferPrototype +from zarr.core.buffer.core import default_buffer_prototype if TYPE_CHECKING: from collections.abc import AsyncGenerator, Iterable @@ -166,11 +167,13 @@ def _get( async def get( self, key: str, - prototype: BufferPrototype, + prototype: BufferPrototype | None = None, byte_range: ByteRangeRequest | None = None, ) -> Buffer | None: # docstring inherited assert isinstance(key, str) + if prototype is None: + prototype = default_buffer_prototype() with self._lock: return self._get(key, prototype=prototype, byte_range=byte_range) diff --git a/src/zarr/testing/store.py b/src/zarr/testing/store.py index af8b7332e6..6b5653d19a 100644 --- a/src/zarr/testing/store.py +++ b/src/zarr/testing/store.py @@ -121,6 +121,15 @@ async def test_get( expected = data_buf[start : start + length] assert_bytes_equal(observed, expected) + async def test_get_default_prototype(self, store: S) -> None: + key = "c/0" + data = b"\x01\x02\x03\x04" + data_buf = self.buffer_cls.from_bytes(data) + await self.set(store, key, data_buf) + observed = await store.get(key) + expected = data_buf[:] + assert_bytes_equal(observed, expected) + async def test_get_many(self, store: S) -> None: """ Ensure that multiple keys can be retrieved at once with the _get_many method. diff --git a/tests/test_codecs/test_blosc.py b/tests/test_codecs/test_blosc.py index 416a2f784e..30b0900276 100644 --- a/tests/test_codecs/test_blosc.py +++ b/tests/test_codecs/test_blosc.py @@ -5,8 +5,8 @@ from zarr import AsyncArray from zarr.abc.store import Store +from zarr.buffer import default_buffer_prototype from zarr.codecs import BloscCodec, BytesCodec, ShardingCodec -from zarr.core.buffer import default_buffer_prototype from zarr.storage.common import StorePath diff --git a/tests/test_codecs/test_codecs.py b/tests/test_codecs/test_codecs.py index 7a5fb979a1..761200cd7f 100644 --- a/tests/test_codecs/test_codecs.py +++ b/tests/test_codecs/test_codecs.py @@ -8,13 +8,13 @@ import pytest from zarr import Array, AsyncArray, config +from zarr.buffer import default_buffer_prototype from zarr.codecs import ( BytesCodec, GzipCodec, ShardingCodec, TransposeCodec, ) -from zarr.core.buffer import default_buffer_prototype from zarr.core.indexing import Selection, morton_order_iter from zarr.storage import StorePath diff --git a/tests/test_codecs/test_sharding.py b/tests/test_codecs/test_sharding.py index 85315c8780..43c7d8a2f2 100644 --- a/tests/test_codecs/test_sharding.py +++ b/tests/test_codecs/test_sharding.py @@ -7,6 +7,7 @@ from zarr import Array, AsyncArray from zarr.abc.store import Store +from zarr.buffer import default_buffer_prototype from zarr.codecs import ( BloscCodec, BytesCodec, @@ -14,7 +15,6 @@ ShardingCodecIndexLocation, TransposeCodec, ) -from zarr.core.buffer import default_buffer_prototype from zarr.storage.common import StorePath from ..conftest import ArrayRequest diff --git a/tests/test_group.py b/tests/test_group.py index 21e4ef4e50..0636fa58c4 100644 --- a/tests/test_group.py +++ b/tests/test_group.py @@ -14,7 +14,7 @@ import zarr.api.synchronous from zarr import Array, AsyncArray, AsyncGroup, Group from zarr.abc.store import Store -from zarr.core.buffer import default_buffer_prototype +from zarr.buffer import default_buffer_prototype from zarr.core.group import ConsolidatedMetadata, GroupMetadata from zarr.core.sync import sync from zarr.errors import ContainsArrayError, ContainsGroupError diff --git a/tests/test_indexing.py b/tests/test_indexing.py index b3a1990686..9bf932b5cd 100644 --- a/tests/test_indexing.py +++ b/tests/test_indexing.py @@ -11,7 +11,7 @@ from numpy.testing import assert_array_equal import zarr -from zarr.core.buffer import BufferPrototype, default_buffer_prototype +from zarr.buffer import default_buffer_prototype from zarr.core.indexing import ( BasicSelection, CoordinateSelection, @@ -32,6 +32,7 @@ from collections.abc import AsyncGenerator from zarr.core.array import Array + from zarr.core.buffer import BufferPrototype from zarr.core.buffer.core import Buffer from zarr.core.common import ChunkCoords diff --git a/tests/test_metadata/test_consolidated.py b/tests/test_metadata/test_consolidated.py index c0218602f6..2ce56c2c6f 100644 --- a/tests/test_metadata/test_consolidated.py +++ b/tests/test_metadata/test_consolidated.py @@ -16,7 +16,7 @@ open, open_consolidated, ) -from zarr.core.buffer.core import default_buffer_prototype +from zarr.buffer import default_buffer_prototype from zarr.core.group import ConsolidatedMetadata, GroupMetadata from zarr.core.metadata import ArrayV3Metadata from zarr.core.metadata.v2 import ArrayV2Metadata diff --git a/tests/test_metadata/test_v3.py b/tests/test_metadata/test_v3.py index 4e4ba23313..33f39ab5fe 100644 --- a/tests/test_metadata/test_v3.py +++ b/tests/test_metadata/test_v3.py @@ -7,8 +7,8 @@ import numpy as np import pytest +from zarr.buffer import default_buffer_prototype from zarr.codecs.bytes import BytesCodec -from zarr.core.buffer import default_buffer_prototype from zarr.core.chunk_key_encodings import DefaultChunkKeyEncoding, V2ChunkKeyEncoding from zarr.core.group import parse_node_type from zarr.core.metadata.v3 import ( diff --git a/tests/test_store/test_logging.py b/tests/test_store/test_logging.py index 0258244c50..3ddb9d2157 100644 --- a/tests/test_store/test_logging.py +++ b/tests/test_store/test_logging.py @@ -6,7 +6,7 @@ import zarr import zarr.storage -from zarr.core.buffer import default_buffer_prototype +from zarr.buffer import default_buffer_prototype from zarr.storage.logging import LoggingStore if TYPE_CHECKING: diff --git a/tests/test_store/test_remote.py b/tests/test_store/test_remote.py index c8e9a162b0..9d675c730d 100644 --- a/tests/test_store/test_remote.py +++ b/tests/test_store/test_remote.py @@ -10,7 +10,8 @@ from upath import UPath import zarr.api.asynchronous -from zarr.core.buffer import Buffer, cpu, default_buffer_prototype +from zarr.buffer import default_buffer_prototype +from zarr.core.buffer import Buffer, cpu from zarr.core.sync import _collect_aiterator, sync from zarr.storage import RemoteStore from zarr.testing.store import StoreTests diff --git a/tests/test_store/test_stateful_store.py b/tests/test_store/test_stateful_store.py index 9ac3bbc3f6..855885890b 100644 --- a/tests/test_store/test_stateful_store.py +++ b/tests/test_store/test_stateful_store.py @@ -15,7 +15,8 @@ import zarr from zarr.abc.store import AccessMode, Store -from zarr.core.buffer import BufferPrototype, cpu, default_buffer_prototype +from zarr.buffer import default_buffer_prototype +from zarr.core.buffer import BufferPrototype, cpu from zarr.storage import LocalStore, ZipStore from zarr.testing.strategies import key_ranges from zarr.testing.strategies import keys as zarr_keys diff --git a/tests/test_v2.py b/tests/test_v2.py index 439b15b64c..b2e47f6a4b 100644 --- a/tests/test_v2.py +++ b/tests/test_v2.py @@ -9,6 +9,7 @@ from numcodecs.blosc import Blosc import zarr +import zarr.buffer import zarr.storage from zarr import Array from zarr.storage import MemoryStore, StorePath @@ -90,7 +91,7 @@ async def test_v2_encode_decode(dtype): fill_value=b"X", ) - result = await store.get("foo/.zarray", zarr.core.buffer.default_buffer_prototype()) + result = await store.get("foo/.zarray", zarr.buffer.default_buffer_prototype()) assert result is not None serialized = json.loads(result.to_bytes()) From 12963abf853e2bc9840f3b82d10980cdbb62f2cd Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Tue, 22 Oct 2024 14:27:48 -0500 Subject: [PATCH 03/13] lint --- src/zarr/abc/store.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/zarr/abc/store.py b/src/zarr/abc/store.py index e0243d8951..f2b41bb80a 100644 --- a/src/zarr/abc/store.py +++ b/src/zarr/abc/store.py @@ -207,10 +207,10 @@ async def get( Parameters ---------- key : str - byte_range : tuple[int | None, int | None], optional - prototype: BufferPrototype, optional + prototype : BufferPrototype, optional The prototype giving the buffer classes to use for buffers and nbuffers. By default, :func:`zarr.buffer.default_buffer_prototype` is used. + byte_range : tuple[int | None, int | None], optional Returns ------- From 384d32325f00a23baba5da010c3b0355e47bbd74 Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Thu, 24 Oct 2024 07:32:23 -0500 Subject: [PATCH 04/13] wip --- src/zarr/abc/store.py | 31 +++++++++++++++++++++++++++++-- src/zarr/core/array.py | 13 +++++++++++++ tests/test_array.py | 9 +++++++++ 3 files changed, 51 insertions(+), 2 deletions(-) diff --git a/src/zarr/abc/store.py b/src/zarr/abc/store.py index f2b41bb80a..c89252f70b 100644 --- a/src/zarr/abc/store.py +++ b/src/zarr/abc/store.py @@ -401,8 +401,8 @@ async def getsize(self, key: str) -> int: Returns ------- - nbytes: int - The size of the value in bytes. + nbytes : int + The size of the value (in bytes). Raises ------ @@ -417,6 +417,33 @@ async def getsize(self, key: str) -> int: raise FileNotFoundError(key) return len(value) + async def getsize_dir(self, prefix: str) -> int: + """ + Return the size, in bytes, of all values in a directory. + + This will include just values whose keys start with ``prefix`` and + do not contain the character ``/`` after the given prefix. + + Parameters + ---------- + prefix : str + The prefix of the directory to measure. + + Returns + ------- + nbytes : int + The sum of the sizes of the values in the directory (in bytes). + + Notes + ----- + ``getsize_dir`` is just provided as a potentially faster alternative to + listing all the keys in a directory and calling :meth:`Store.getsize` + on each. + """ + keys = [x async for x in self.list_dir(prefix)] + sizes = await gather(*[self.getsize(key) for key in keys]) + return sum(sizes) + @runtime_checkable class ByteGetter(Protocol): diff --git a/src/zarr/core/array.py b/src/zarr/core/array.py index 6e3430c41a..e238f91eba 100644 --- a/src/zarr/core/array.py +++ b/src/zarr/core/array.py @@ -843,6 +843,9 @@ def nchunks_initialized(self) -> int: """ return nchunks_initialized(self) + async def nbytes_stored(self) -> int: + return await self.store_path.store.getsize_dir(self.store_path.path) + def _iter_chunk_coords( self, *, origin: Sequence[int] | None = None, selection_shape: Sequence[int] | None = None ) -> Iterator[ChunkCoords]: @@ -1436,6 +1439,16 @@ def nchunks_initialized(self) -> int: """ return self._async_array.nchunks_initialized + def nbytes_stored(self) -> int: + """ + Determine the size, in bytes, of the array actually written to the store. + + Returns + ------- + size : int + """ + return sync(self._async_array.nbytes_stored()) + def _iter_chunk_keys( self, origin: Sequence[int] | None = None, selection_shape: Sequence[int] | None = None ) -> Iterator[str]: diff --git a/tests/test_array.py b/tests/test_array.py index 829a04d304..0dac0afe3a 100644 --- a/tests/test_array.py +++ b/tests/test_array.py @@ -371,6 +371,15 @@ def test_chunks_initialized(test_cls: type[Array] | type[AsyncArray[Any]]) -> No assert observed == expected +def test_nbytes_stored(): + arr = zarr.create(shape=(100,), chunks=(10,), dtype="i4") + result = arr.nbytes_stored() + assert result == 366 # the size of the metadata document. This is a fragile test + arr[:50] = 1 + result = arr.nbytes_stored() + assert result == 366 + + def test_default_fill_values() -> None: a = Array.create(MemoryStore({}, mode="w"), shape=5, chunk_shape=5, dtype=" Date: Thu, 24 Oct 2024 08:59:11 -0500 Subject: [PATCH 05/13] Use prefix --- src/zarr/abc/store.py | 6 +++--- src/zarr/core/array.py | 2 +- tests/test_array.py | 21 ++++++++++++++++++--- 3 files changed, 22 insertions(+), 7 deletions(-) diff --git a/src/zarr/abc/store.py b/src/zarr/abc/store.py index c89252f70b..dd8516781e 100644 --- a/src/zarr/abc/store.py +++ b/src/zarr/abc/store.py @@ -417,9 +417,9 @@ async def getsize(self, key: str) -> int: raise FileNotFoundError(key) return len(value) - async def getsize_dir(self, prefix: str) -> int: + async def getsize_prefix(self, prefix: str) -> int: """ - Return the size, in bytes, of all values in a directory. + Return the size, in bytes, of all values under a prefix. This will include just values whose keys start with ``prefix`` and do not contain the character ``/`` after the given prefix. @@ -440,7 +440,7 @@ async def getsize_dir(self, prefix: str) -> int: listing all the keys in a directory and calling :meth:`Store.getsize` on each. """ - keys = [x async for x in self.list_dir(prefix)] + keys = [x async for x in self.list_prefix(prefix)] sizes = await gather(*[self.getsize(key) for key in keys]) return sum(sizes) diff --git a/src/zarr/core/array.py b/src/zarr/core/array.py index e238f91eba..4d5cdd24e1 100644 --- a/src/zarr/core/array.py +++ b/src/zarr/core/array.py @@ -844,7 +844,7 @@ def nchunks_initialized(self) -> int: return nchunks_initialized(self) async def nbytes_stored(self) -> int: - return await self.store_path.store.getsize_dir(self.store_path.path) + return await self.store_path.store.getsize_prefix(self.store_path.path) def _iter_chunk_coords( self, *, origin: Sequence[int] | None = None, selection_shape: Sequence[int] | None = None diff --git a/tests/test_array.py b/tests/test_array.py index 0dac0afe3a..8c088fe874 100644 --- a/tests/test_array.py +++ b/tests/test_array.py @@ -371,13 +371,28 @@ def test_chunks_initialized(test_cls: type[Array] | type[AsyncArray[Any]]) -> No assert observed == expected -def test_nbytes_stored(): +def test_nbytes_stored() -> None: arr = zarr.create(shape=(100,), chunks=(10,), dtype="i4") result = arr.nbytes_stored() - assert result == 366 # the size of the metadata document. This is a fragile test + assert result == 366 # the size of the metadata document. This is a fragile test. arr[:50] = 1 result = arr.nbytes_stored() - assert result == 366 + assert result == 566 # the size with 5 chunks filled. + arr[50:] = 2 + result = arr.nbytes_stored() + assert result == 766 # the size with all chunks filled. + + +async def test_nbytes_stored_async() -> None: + arr = await zarr.api.asynchronous.create(shape=(100,), chunks=(10,), dtype="i4") + result = await arr.nbytes_stored() + assert result == 366 # the size of the metadata document. This is a fragile test. + await arr.setitem(slice(50), 1) + result = await arr.nbytes_stored() + assert result == 566 # the size with 5 chunks filled. + await arr.setitem(slice(50, 100), 2) + result = await arr.nbytes_stored() + assert result == 766 # the size with all chunks filled. def test_default_fill_values() -> None: From 87d2a9e5f274d2f5667a480861de54bee25281f0 Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Thu, 24 Oct 2024 11:00:45 -0500 Subject: [PATCH 06/13] fixup --- src/zarr/abc/store.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/zarr/abc/store.py b/src/zarr/abc/store.py index dd8516781e..7f087dedf7 100644 --- a/src/zarr/abc/store.py +++ b/src/zarr/abc/store.py @@ -421,9 +421,6 @@ async def getsize_prefix(self, prefix: str) -> int: """ Return the size, in bytes, of all values under a prefix. - This will include just values whose keys start with ``prefix`` and - do not contain the character ``/`` after the given prefix. - Parameters ---------- prefix : str @@ -434,11 +431,15 @@ async def getsize_prefix(self, prefix: str) -> int: nbytes : int The sum of the sizes of the values in the directory (in bytes). + See Also + -------- + zarr.Array.nbytes_stored + Store.getsize + Notes ----- - ``getsize_dir`` is just provided as a potentially faster alternative to - listing all the keys in a directory and calling :meth:`Store.getsize` - on each. + ``getsize_prefix`` is just provided as a potentially faster alternative to + listing all the keys under a prefix calling :meth:`Store.getsize` on each. """ keys = [x async for x in self.list_prefix(prefix)] sizes = await gather(*[self.getsize(key) for key in keys]) From 7cbc5003eb4d397a7cf0f019bad85cd1c3d0927a Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Tue, 5 Nov 2024 09:18:42 -0600 Subject: [PATCH 07/13] Maybe fixup --- src/zarr/abc/store.py | 11 +++++++++-- src/zarr/core/common.py | 16 ++++++++++++---- src/zarr/testing/store.py | 13 ++++++++++++- 3 files changed, 33 insertions(+), 7 deletions(-) diff --git a/src/zarr/abc/store.py b/src/zarr/abc/store.py index a27c71480f..cab3e8421e 100644 --- a/src/zarr/abc/store.py +++ b/src/zarr/abc/store.py @@ -6,6 +6,8 @@ from typing import TYPE_CHECKING, NamedTuple, Protocol, runtime_checkable from zarr.core.buffer.core import default_buffer_prototype +from zarr.core.common import concurrent_map +from zarr.core.config import config if TYPE_CHECKING: from collections.abc import AsyncGenerator, Iterable @@ -453,9 +455,14 @@ async def getsize_prefix(self, prefix: str) -> int: ----- ``getsize_prefix`` is just provided as a potentially faster alternative to listing all the keys under a prefix calling :meth:`Store.getsize` on each. + + In general, ``prefix`` should be the path of an Array or Group in the Store. + Implementations may differ on the behavior when some other ``prefix`` + is provided. """ - keys = [x async for x in self.list_prefix(prefix)] - sizes = await gather(*[self.getsize(key) for key in keys]) + keys = ((x,) async for x in self.list_prefix(prefix)) + limit = config.get("async.concurrency") + sizes = await concurrent_map(keys, self.getsize, limit=limit) return sum(sizes) diff --git a/src/zarr/core/common.py b/src/zarr/core/common.py index f3f49b0d5d..2794f04438 100644 --- a/src/zarr/core/common.py +++ b/src/zarr/core/common.py @@ -3,7 +3,7 @@ import asyncio import functools import operator -from collections.abc import Iterable, Mapping +from collections.abc import AsyncIterable, Iterable, Mapping from enum import Enum from itertools import starmap from typing import ( @@ -50,10 +50,15 @@ def product(tup: ChunkCoords) -> int: async def concurrent_map( - items: Iterable[T], func: Callable[..., Awaitable[V]], limit: int | None = None + items: Iterable[T] | AsyncIterable[T], + func: Callable[..., Awaitable[V]], + limit: int | None = None, ) -> list[V]: if limit is None: - return await asyncio.gather(*list(starmap(func, items))) + if isinstance(items, AsyncIterable): + return await asyncio.gather(*list(starmap(func, [x async for x in items]))) + else: + return await asyncio.gather(*list(starmap(func, items))) else: sem = asyncio.Semaphore(limit) @@ -62,7 +67,10 @@ async def run(item: tuple[Any]) -> V: async with sem: return await func(*item) - return await asyncio.gather(*[asyncio.ensure_future(run(item)) for item in items]) + if isinstance(items, AsyncIterable): + return await asyncio.gather(*[asyncio.ensure_future(run(item)) async for item in items]) + else: + return await asyncio.gather(*[asyncio.ensure_future(run(item)) for item in items]) E = TypeVar("E", bound=Enum) diff --git a/src/zarr/testing/store.py b/src/zarr/testing/store.py index fb5e676cbd..062f38d2be 100644 --- a/src/zarr/testing/store.py +++ b/src/zarr/testing/store.py @@ -368,8 +368,19 @@ async def test_getsize(self, store: S) -> None: await self.set(store, key, data) result = await store.getsize(key) - assert result == 10 + assert isinstance(result, int) + assert result > 0 async def test_getsize_raises(self, store: S) -> None: with pytest.raises(FileNotFoundError): await store.getsize("not-a-real-key") + + async def test_getsize_prefix(self, store: S) -> None: + prefix = "array/c/" + for i in range(10): + data = self.buffer_cls.from_bytes(b"0" * 10) + await self.set(store, f"{prefix}/{i}", data) + + result = await store.getsize_prefix(prefix) + assert isinstance(result, int) + assert result > 0 From ade17d2d12b87cfe16c0222e676a298f6e950f82 Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Fri, 8 Nov 2024 06:44:15 -0600 Subject: [PATCH 08/13] lint --- tests/test_indexing.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_indexing.py b/tests/test_indexing.py index 1e4cc39e4f..83159328da 100644 --- a/tests/test_indexing.py +++ b/tests/test_indexing.py @@ -13,7 +13,7 @@ import zarr from zarr import Array from zarr.buffer import default_buffer_prototype -from zarr.core.buffer import BufferPrototype, default_buffer_prototype +from zarr.core.buffer import BufferPrototype from zarr.core.indexing import ( BasicSelection, CoordinateSelection, From 81c4b7e279997cbc433d859fef1e234716089df3 Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Fri, 8 Nov 2024 06:50:03 -0600 Subject: [PATCH 09/13] revert buffer chnages --- docs/guide/storage.rst | 8 -------- src/zarr/abc/store.py | 5 +---- src/zarr/buffer.py | 5 ----- src/zarr/storage/logging.py | 2 +- src/zarr/storage/memory.py | 5 +---- src/zarr/storage/remote.py | 5 +---- src/zarr/storage/zip.py | 5 +---- src/zarr/testing/store.py | 9 --------- tests/test_codecs/test_blosc.py | 2 +- tests/test_codecs/test_codecs.py | 2 +- tests/test_codecs/test_sharding.py | 2 +- tests/test_group.py | 2 +- tests/test_indexing.py | 5 ++--- tests/test_metadata/test_consolidated.py | 2 +- tests/test_metadata/test_v3.py | 2 +- tests/test_store/test_logging.py | 2 +- tests/test_store/test_remote.py | 3 +-- tests/test_store/test_stateful_store.py | 3 +-- tests/test_v2.py | 2 +- 19 files changed, 17 insertions(+), 54 deletions(-) delete mode 100644 src/zarr/buffer.py diff --git a/docs/guide/storage.rst b/docs/guide/storage.rst index a2c4402216..0019f993a2 100644 --- a/docs/guide/storage.rst +++ b/docs/guide/storage.rst @@ -97,13 +97,5 @@ Zarr-Python :class:`zarr.abc.store.Store` API is meant to be extended. The Store Class includes all of the methods needed to be a fully operational store in Zarr Python. Zarr also provides a test harness for custom stores: :class:`zarr.testing.store.StoreTests`. -``Store.get`` -~~~~~~~~~~~~~ - -The ``prototype`` keyword of :func:`zarr.abc.store.Store.get` uses a default of -``None``. When given ``None``, implementations should use -:func:`zarr.buffer.default_buffer_prototype` to look up the prototype users have -configured. - .. _Zip Store Specification: https://github.com/zarr-developers/zarr-specs/pull/311 .. _Fsspec: https://zarr-specs.readthedocs.io/en/latest/v3/core/v3.0.html#consolidated-metadata diff --git a/src/zarr/abc/store.py b/src/zarr/abc/store.py index 04bf5c50b9..5fe015f4a5 100644 --- a/src/zarr/abc/store.py +++ b/src/zarr/abc/store.py @@ -201,7 +201,7 @@ def __eq__(self, value: object) -> bool: async def get( self, key: str, - prototype: BufferPrototype | None = None, + prototype: BufferPrototype, byte_range: ByteRangeRequest | None = None, ) -> Buffer | None: """Retrieve the value associated with a given key. @@ -209,9 +209,6 @@ async def get( Parameters ---------- key : str - prototype : BufferPrototype, optional - The prototype giving the buffer classes to use for buffers and nbuffers. - By default, :func:`zarr.buffer.default_buffer_prototype` is used. byte_range : tuple[int | None, int | None], optional Returns diff --git a/src/zarr/buffer.py b/src/zarr/buffer.py deleted file mode 100644 index 0488b38524..0000000000 --- a/src/zarr/buffer.py +++ /dev/null @@ -1,5 +0,0 @@ -from zarr.core.buffer import default_buffer_prototype - -__all__ = [ - "default_buffer_prototype", -] diff --git a/src/zarr/storage/logging.py b/src/zarr/storage/logging.py index 0f038072e2..d3e55c0687 100644 --- a/src/zarr/storage/logging.py +++ b/src/zarr/storage/logging.py @@ -159,7 +159,7 @@ def __eq__(self, other: object) -> bool: async def get( self, key: str, - prototype: BufferPrototype | None = None, + prototype: BufferPrototype, byte_range: tuple[int | None, int | None] | None = None, ) -> Buffer | None: # docstring inherited diff --git a/src/zarr/storage/memory.py b/src/zarr/storage/memory.py index 38b25a792a..df33876df7 100644 --- a/src/zarr/storage/memory.py +++ b/src/zarr/storage/memory.py @@ -5,7 +5,6 @@ from zarr.abc.store import ByteRangeRequest, Store from zarr.core.buffer import Buffer, gpu -from zarr.core.buffer.core import default_buffer_prototype from zarr.core.common import concurrent_map from zarr.storage._utils import _normalize_interval_index @@ -84,12 +83,10 @@ def __eq__(self, other: object) -> bool: async def get( self, key: str, - prototype: BufferPrototype | None = None, + prototype: BufferPrototype, byte_range: tuple[int | None, int | None] | None = None, ) -> Buffer | None: # docstring inherited - if prototype is None: - prototype = default_buffer_prototype() if not self._is_open: await self._open() assert isinstance(key, str) diff --git a/src/zarr/storage/remote.py b/src/zarr/storage/remote.py index e5cd7f233e..bd20113a22 100644 --- a/src/zarr/storage/remote.py +++ b/src/zarr/storage/remote.py @@ -4,7 +4,6 @@ from typing import TYPE_CHECKING, Any, Self from zarr.abc.store import ByteRangeRequest, Store -from zarr.core.buffer.core import default_buffer_prototype from zarr.storage.common import _dereference_path if TYPE_CHECKING: @@ -218,12 +217,10 @@ def __eq__(self, other: object) -> bool: async def get( self, key: str, - prototype: BufferPrototype | None = None, + prototype: BufferPrototype, byte_range: ByteRangeRequest | None = None, ) -> Buffer | None: # docstring inherited - if prototype is None: - prototype = default_buffer_prototype() if not self._is_open: await self._open() path = _dereference_path(self.path, key) diff --git a/src/zarr/storage/zip.py b/src/zarr/storage/zip.py index 68d8081493..a45cc1672e 100644 --- a/src/zarr/storage/zip.py +++ b/src/zarr/storage/zip.py @@ -9,7 +9,6 @@ from zarr.abc.store import ByteRangeRequest, Store from zarr.core.buffer import Buffer, BufferPrototype -from zarr.core.buffer.core import default_buffer_prototype if TYPE_CHECKING: from collections.abc import AsyncGenerator, Iterable @@ -167,13 +166,11 @@ def _get( async def get( self, key: str, - prototype: BufferPrototype | None = None, + prototype: BufferPrototype, byte_range: ByteRangeRequest | None = None, ) -> Buffer | None: # docstring inherited assert isinstance(key, str) - if prototype is None: - prototype = default_buffer_prototype() with self._lock: return self._get(key, prototype=prototype, byte_range=byte_range) diff --git a/src/zarr/testing/store.py b/src/zarr/testing/store.py index 062f38d2be..bdbeb954fd 100644 --- a/src/zarr/testing/store.py +++ b/src/zarr/testing/store.py @@ -121,15 +121,6 @@ async def test_get( expected = data_buf[start : start + length] assert_bytes_equal(observed, expected) - async def test_get_default_prototype(self, store: S) -> None: - key = "c/0" - data = b"\x01\x02\x03\x04" - data_buf = self.buffer_cls.from_bytes(data) - await self.set(store, key, data_buf) - observed = await store.get(key) - expected = data_buf[:] - assert_bytes_equal(observed, expected) - async def test_get_many(self, store: S) -> None: """ Ensure that multiple keys can be retrieved at once with the _get_many method. diff --git a/tests/test_codecs/test_blosc.py b/tests/test_codecs/test_blosc.py index 30b0900276..416a2f784e 100644 --- a/tests/test_codecs/test_blosc.py +++ b/tests/test_codecs/test_blosc.py @@ -5,8 +5,8 @@ from zarr import AsyncArray from zarr.abc.store import Store -from zarr.buffer import default_buffer_prototype from zarr.codecs import BloscCodec, BytesCodec, ShardingCodec +from zarr.core.buffer import default_buffer_prototype from zarr.storage.common import StorePath diff --git a/tests/test_codecs/test_codecs.py b/tests/test_codecs/test_codecs.py index 42a8fbbf71..dfb8e1c595 100644 --- a/tests/test_codecs/test_codecs.py +++ b/tests/test_codecs/test_codecs.py @@ -8,13 +8,13 @@ import pytest from zarr import Array, AsyncArray, config -from zarr.buffer import default_buffer_prototype from zarr.codecs import ( BytesCodec, GzipCodec, ShardingCodec, TransposeCodec, ) +from zarr.core.buffer import default_buffer_prototype from zarr.core.indexing import Selection, morton_order_iter from zarr.storage import StorePath diff --git a/tests/test_codecs/test_sharding.py b/tests/test_codecs/test_sharding.py index db0f2c2f03..78f32fef0e 100644 --- a/tests/test_codecs/test_sharding.py +++ b/tests/test_codecs/test_sharding.py @@ -7,7 +7,6 @@ from zarr import Array, AsyncArray from zarr.abc.store import Store -from zarr.buffer import default_buffer_prototype from zarr.codecs import ( BloscCodec, BytesCodec, @@ -15,6 +14,7 @@ ShardingCodecIndexLocation, TransposeCodec, ) +from zarr.core.buffer import default_buffer_prototype from zarr.storage.common import StorePath from ..conftest import ArrayRequest diff --git a/tests/test_group.py b/tests/test_group.py index e18bf6f8b4..6bacca4889 100644 --- a/tests/test_group.py +++ b/tests/test_group.py @@ -14,7 +14,7 @@ import zarr.api.synchronous from zarr import Array, AsyncArray, AsyncGroup, Group from zarr.abc.store import Store -from zarr.buffer import default_buffer_prototype +from zarr.core.buffer import default_buffer_prototype from zarr.core.group import ConsolidatedMetadata, GroupMetadata from zarr.core.sync import sync from zarr.errors import ContainsArrayError, ContainsGroupError diff --git a/tests/test_indexing.py b/tests/test_indexing.py index 83159328da..cf9cbf3a1c 100644 --- a/tests/test_indexing.py +++ b/tests/test_indexing.py @@ -12,8 +12,8 @@ import zarr from zarr import Array -from zarr.buffer import default_buffer_prototype -from zarr.core.buffer import BufferPrototype +from zarr.core.array import Array +from zarr.core.buffer import BufferPrototype, default_buffer_prototype from zarr.core.indexing import ( BasicSelection, CoordinateSelection, @@ -33,7 +33,6 @@ if TYPE_CHECKING: from collections.abc import AsyncGenerator - from zarr.core.array import Array from zarr.core.buffer import BufferPrototype from zarr.core.buffer.core import Buffer from zarr.core.common import ChunkCoords diff --git a/tests/test_metadata/test_consolidated.py b/tests/test_metadata/test_consolidated.py index ba7a977a9a..1fe99787e4 100644 --- a/tests/test_metadata/test_consolidated.py +++ b/tests/test_metadata/test_consolidated.py @@ -16,7 +16,7 @@ open, open_consolidated, ) -from zarr.buffer import default_buffer_prototype +from zarr.core.buffer import default_buffer_prototype from zarr.core.group import ConsolidatedMetadata, GroupMetadata from zarr.core.metadata import ArrayV3Metadata from zarr.core.metadata.v2 import ArrayV2Metadata diff --git a/tests/test_metadata/test_v3.py b/tests/test_metadata/test_v3.py index 33f39ab5fe..4e4ba23313 100644 --- a/tests/test_metadata/test_v3.py +++ b/tests/test_metadata/test_v3.py @@ -7,8 +7,8 @@ import numpy as np import pytest -from zarr.buffer import default_buffer_prototype from zarr.codecs.bytes import BytesCodec +from zarr.core.buffer import default_buffer_prototype from zarr.core.chunk_key_encodings import DefaultChunkKeyEncoding, V2ChunkKeyEncoding from zarr.core.group import parse_node_type from zarr.core.metadata.v3 import ( diff --git a/tests/test_store/test_logging.py b/tests/test_store/test_logging.py index 5908694ed8..50db5c1c5b 100644 --- a/tests/test_store/test_logging.py +++ b/tests/test_store/test_logging.py @@ -6,7 +6,7 @@ import zarr import zarr.storage -from zarr.buffer import default_buffer_prototype +from zarr.core.buffer import default_buffer_prototype from zarr.storage.logging import LoggingStore if TYPE_CHECKING: diff --git a/tests/test_store/test_remote.py b/tests/test_store/test_remote.py index 30968c5a72..2ad1fd787b 100644 --- a/tests/test_store/test_remote.py +++ b/tests/test_store/test_remote.py @@ -10,8 +10,7 @@ from upath import UPath import zarr.api.asynchronous -from zarr.buffer import default_buffer_prototype -from zarr.core.buffer import Buffer, cpu +from zarr.core.buffer import Buffer, cpu, default_buffer_prototype from zarr.core.sync import _collect_aiterator, sync from zarr.storage import RemoteStore from zarr.testing.store import StoreTests diff --git a/tests/test_store/test_stateful_store.py b/tests/test_store/test_stateful_store.py index 855885890b..9ac3bbc3f6 100644 --- a/tests/test_store/test_stateful_store.py +++ b/tests/test_store/test_stateful_store.py @@ -15,8 +15,7 @@ import zarr from zarr.abc.store import AccessMode, Store -from zarr.buffer import default_buffer_prototype -from zarr.core.buffer import BufferPrototype, cpu +from zarr.core.buffer import BufferPrototype, cpu, default_buffer_prototype from zarr.storage import LocalStore, ZipStore from zarr.testing.strategies import key_ranges from zarr.testing.strategies import keys as zarr_keys diff --git a/tests/test_v2.py b/tests/test_v2.py index e846dec57e..3640fa52a8 100644 --- a/tests/test_v2.py +++ b/tests/test_v2.py @@ -9,7 +9,7 @@ from numcodecs.blosc import Blosc import zarr -import zarr.buffer +import zarr.core.buffer import zarr.storage from zarr import Array from zarr.storage import MemoryStore, StorePath From ce548e22d69f305dfb1877ec2194e5915fccab59 Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Fri, 8 Nov 2024 06:52:39 -0600 Subject: [PATCH 10/13] fixup --- tests/test_v2.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_v2.py b/tests/test_v2.py index 3640fa52a8..de0a9dcc58 100644 --- a/tests/test_v2.py +++ b/tests/test_v2.py @@ -91,7 +91,7 @@ async def test_v2_encode_decode(dtype): fill_value=b"X", ) - result = await store.get("foo/.zarray", zarr.buffer.default_buffer_prototype()) + result = await store.get("foo/.zarray", zarr.core.buffer.default_buffer_prototype()) assert result is not None serialized = json.loads(result.to_bytes()) From 4350e537672d036b57bcae215f1db52fb0113910 Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Fri, 8 Nov 2024 09:07:13 -0600 Subject: [PATCH 11/13] fixup --- tests/test_indexing.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_indexing.py b/tests/test_indexing.py index cf9cbf3a1c..e573feade0 100644 --- a/tests/test_indexing.py +++ b/tests/test_indexing.py @@ -12,7 +12,6 @@ import zarr from zarr import Array -from zarr.core.array import Array from zarr.core.buffer import BufferPrototype, default_buffer_prototype from zarr.core.indexing import ( BasicSelection, From 5f1d0360d3b985c1cb6f0678fbdaa4e4991f0742 Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Fri, 8 Nov 2024 09:58:56 -0600 Subject: [PATCH 12/13] Remove AsyncIterable support --- src/zarr/abc/store.py | 7 ++++++- src/zarr/core/common.py | 14 ++++---------- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/src/zarr/abc/store.py b/src/zarr/abc/store.py index 5fe015f4a5..721247b5bd 100644 --- a/src/zarr/abc/store.py +++ b/src/zarr/abc/store.py @@ -454,7 +454,12 @@ async def getsize_prefix(self, prefix: str) -> int: Implementations may differ on the behavior when some other ``prefix`` is provided. """ - keys = ((x,) async for x in self.list_prefix(prefix)) + # TODO: Overlap listing keys with getsize calls. + # Currently, we load the list of keys into memory and only then move + # on to getting sizes. Ideally we would overlap those two, which should + # improve tail latency and might reduce memory pressure (since not all keys + # would be in memory at once). + keys = [(x,) async for x in self.list_prefix(prefix)] limit = config.get("async.concurrency") sizes = await concurrent_map(keys, self.getsize, limit=limit) return sum(sizes) diff --git a/src/zarr/core/common.py b/src/zarr/core/common.py index 2794f04438..e76ddd030d 100644 --- a/src/zarr/core/common.py +++ b/src/zarr/core/common.py @@ -3,7 +3,7 @@ import asyncio import functools import operator -from collections.abc import AsyncIterable, Iterable, Mapping +from collections.abc import Iterable, Mapping from enum import Enum from itertools import starmap from typing import ( @@ -50,15 +50,12 @@ def product(tup: ChunkCoords) -> int: async def concurrent_map( - items: Iterable[T] | AsyncIterable[T], + items: Iterable[T], func: Callable[..., Awaitable[V]], limit: int | None = None, ) -> list[V]: if limit is None: - if isinstance(items, AsyncIterable): - return await asyncio.gather(*list(starmap(func, [x async for x in items]))) - else: - return await asyncio.gather(*list(starmap(func, items))) + return await asyncio.gather(*list(starmap(func, items))) else: sem = asyncio.Semaphore(limit) @@ -67,10 +64,7 @@ async def run(item: tuple[Any]) -> V: async with sem: return await func(*item) - if isinstance(items, AsyncIterable): - return await asyncio.gather(*[asyncio.ensure_future(run(item)) async for item in items]) - else: - return await asyncio.gather(*[asyncio.ensure_future(run(item)) for item in items]) + return await asyncio.gather(*[asyncio.ensure_future(run(item)) for item in items]) E = TypeVar("E", bound=Enum) From 783cfe3e798ed870976959f92355d5517ccabfe6 Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Thu, 14 Nov 2024 09:45:04 -0600 Subject: [PATCH 13/13] fixup --- src/zarr/storage/local.py | 1 + src/zarr/storage/logging.py | 8 ++++++++ 2 files changed, 9 insertions(+) diff --git a/src/zarr/storage/local.py b/src/zarr/storage/local.py index 3dee152145..f9b1747c31 100644 --- a/src/zarr/storage/local.py +++ b/src/zarr/storage/local.py @@ -2,6 +2,7 @@ import asyncio import io +import os import shutil from pathlib import Path from typing import TYPE_CHECKING diff --git a/src/zarr/storage/logging.py b/src/zarr/storage/logging.py index 3b11ddbba7..bc90b4f30f 100644 --- a/src/zarr/storage/logging.py +++ b/src/zarr/storage/logging.py @@ -225,3 +225,11 @@ async def delete_dir(self, prefix: str) -> None: # docstring inherited with self.log(prefix): await self._store.delete_dir(prefix=prefix) + + async def getsize(self, key: str) -> int: + with self.log(key): + return await self._store.getsize(key) + + async def getsize_prefix(self, prefix: str) -> int: + with self.log(prefix): + return await self._store.getsize_prefix(prefix)