diff --git a/.github/workflows/minimal.yml b/.github/workflows/minimal.yml index 2cde38e081..4de5aca273 100644 --- a/.github/workflows/minimal.yml +++ b/.github/workflows/minimal.yml @@ -24,6 +24,7 @@ jobs: shell: "bash -l {0}" env: ZARR_V3_EXPERIMENTAL_API: 1 + ZARR_V3_SHARDING: 1 run: | conda activate minimal python -m pip install . @@ -32,6 +33,7 @@ jobs: shell: "bash -l {0}" env: ZARR_V3_EXPERIMENTAL_API: 1 + ZARR_V3_SHARDING: 1 run: | conda activate minimal rm -rf fixture/ diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 872ce52343..cee2ca7aef 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -70,6 +70,7 @@ jobs: ZARR_TEST_MONGO: 1 ZARR_TEST_REDIS: 1 ZARR_V3_EXPERIMENTAL_API: 1 + ZARR_V3_SHARDING: 1 run: | conda activate zarr-env mkdir ~/blob_emulator diff --git a/.github/workflows/windows-testing.yml b/.github/workflows/windows-testing.yml index ea1d0f64c9..2f8922b447 100644 --- a/.github/workflows/windows-testing.yml +++ b/.github/workflows/windows-testing.yml @@ -52,6 +52,7 @@ jobs: env: ZARR_TEST_ABS: 1 ZARR_V3_EXPERIMENTAL_API: 1 + ZARR_V3_SHARDING: 1 - name: Conda info shell: bash -l {0} run: conda info diff --git a/docs/release.rst b/docs/release.rst index f633aea7cc..7c01590b90 100644 --- a/docs/release.rst +++ b/docs/release.rst @@ -18,7 +18,9 @@ Unreleased Add two features of the [v3 spec](https://zarr-specs.readthedocs.io/en/latest/core/v3.0.html): * storage transformers * `get_partial_values` and `set_partial_values` - By :user:`Jonathan Striebel `; :issue:`1096`. + * efficient `get_partial_values` implementation for `FSStoreV3` + * sharding storage transformer + By :user:`Jonathan Striebel `; :issue:`1096`, :issue:`1111`. .. _release_2.13.6: diff --git a/zarr/_storage/v3.py b/zarr/_storage/v3.py index a0a1870ffc..5f8964fb5d 100644 --- a/zarr/_storage/v3.py +++ b/zarr/_storage/v3.py @@ -182,6 +182,35 @@ def rmdir(self, path=None): if self.fs.isdir(store_path): self.fs.rm(store_path, recursive=True) + @property + def supports_efficient_get_partial_values(self): + return True + + def get_partial_values(self, key_ranges): + """Get multiple partial values. + key_ranges can be an iterable of key, range pairs, + where a range specifies two integers range_start and range_length + as a tuple, (range_start, range_length). + range_length may be None to indicate to read until the end. + range_start may be negative to start reading range_start bytes + from the end of the file. + A key may occur multiple times with different ranges. + Inserts None for missing keys into the returned list.""" + results = [] + for key, (range_start, range_length) in key_ranges: + key = self._normalize_key(key) + path = self.dir_path(key) + try: + if range_start is None or range_length is None: + end = None + else: + end = range_start + range_length + result = self.fs.cat_file(path, start=range_start, end=end) + except self.map.missing_exceptions: + result = None + results.append(result) + return results + class MemoryStoreV3(MemoryStore, StoreV3): diff --git a/zarr/_storage/v3_storage_transformers.py b/zarr/_storage/v3_storage_transformers.py new file mode 100644 index 0000000000..3675d42c38 --- /dev/null +++ b/zarr/_storage/v3_storage_transformers.py @@ -0,0 +1,383 @@ +import functools +import itertools +import os +from typing import NamedTuple, Tuple, Optional, Union, Iterator + +from numcodecs.compat import ensure_bytes +import numpy as np + +from zarr._storage.store import StorageTransformer, StoreV3, _rmdir_from_keys_v3 +from zarr.util import normalize_storage_path + + +MAX_UINT_64 = 2 ** 64 - 1 + + +v3_sharding_available = os.environ.get('ZARR_V3_SHARDING', '0').lower() not in ['0', 'false'] + + +def assert_zarr_v3_sharding_available(): + if not v3_sharding_available: + raise NotImplementedError( + "Using V3 sharding is experimental and not yet finalized! To enable support, set:\n" + "ZARR_V3_SHARDING=1" + ) # pragma: no cover + + +class _ShardIndex(NamedTuple): + store: "ShardingStorageTransformer" + # dtype uint64, shape (chunks_per_shard_0, chunks_per_shard_1, ..., 2) + offsets_and_lengths: np.ndarray + + def __localize_chunk__(self, chunk: Tuple[int, ...]) -> Tuple[int, ...]: + return tuple( + chunk_i % shard_i + for chunk_i, shard_i in zip(chunk, self.store.chunks_per_shard) + ) + + def is_all_empty(self) -> bool: + return np.array_equiv(self.offsets_and_lengths, MAX_UINT_64) + + def get_chunk_slice(self, chunk: Tuple[int, ...]) -> Optional[slice]: + localized_chunk = self.__localize_chunk__(chunk) + chunk_start, chunk_len = self.offsets_and_lengths[localized_chunk] + if (chunk_start, chunk_len) == (MAX_UINT_64, MAX_UINT_64): + return None + else: + return slice(int(chunk_start), int(chunk_start + chunk_len)) + + def set_chunk_slice( + self, chunk: Tuple[int, ...], chunk_slice: Optional[slice] + ) -> None: + localized_chunk = self.__localize_chunk__(chunk) + if chunk_slice is None: + self.offsets_and_lengths[localized_chunk] = (MAX_UINT_64, MAX_UINT_64) + else: + self.offsets_and_lengths[localized_chunk] = ( + chunk_slice.start, + chunk_slice.stop - chunk_slice.start, + ) + + def to_bytes(self) -> bytes: + return self.offsets_and_lengths.tobytes(order="C") + + @classmethod + def from_bytes( + cls, buffer: Union[bytes, bytearray], store: "ShardingStorageTransformer" + ) -> "_ShardIndex": + try: + return cls( + store=store, + offsets_and_lengths=np.frombuffer(bytearray(buffer), dtype=" None: + assert_zarr_v3_sharding_available() + super().__init__(_type) + if isinstance(chunks_per_shard, int): + chunks_per_shard = (chunks_per_shard, ) + else: + chunks_per_shard = tuple(int(i) for i in chunks_per_shard) + if chunks_per_shard == (): + chunks_per_shard = (1, ) + self.chunks_per_shard = chunks_per_shard + self._num_chunks_per_shard = functools.reduce( + lambda x, y: x * y, chunks_per_shard, 1 + ) + self._dimension_separator = None + self._data_key_prefix = None + + def _copy_for_array(self, array, inner_store): + transformer_copy = super()._copy_for_array(array, inner_store) + transformer_copy._dimension_separator = array._dimension_separator + transformer_copy._data_key_prefix = array._data_key_prefix + if len(array._shape) > len(self.chunks_per_shard): + # The array shape might be longer when initialized with subdtypes. + # subdtypes dimensions come last, therefore padding chunks_per_shard + # with ones, effectively disabling sharding on the unlisted dimensions. + transformer_copy.chunks_per_shard += ( + (1, ) * (len(array._shape) - len(self.chunks_per_shard)) + ) + return transformer_copy + + @property + def dimension_separator(self) -> str: + assert self._dimension_separator is not None, ( + "dimension_separator is not initialized, first get a copy via _copy_for_array." + ) + return self._dimension_separator + + def _is_data_key(self, key: str) -> bool: + assert self._data_key_prefix is not None, ( + "data_key_prefix is not initialized, first get a copy via _copy_for_array." + ) + return key.startswith(self._data_key_prefix) + + def _key_to_shard(self, chunk_key: str) -> Tuple[str, Tuple[int, ...]]: + prefix, _, chunk_string = chunk_key.rpartition("c") + chunk_subkeys = tuple( + map(int, chunk_string.split(self.dimension_separator)) + ) if chunk_string else (0, ) + shard_key_tuple = ( + subkey // shard_i + for subkey, shard_i in zip(chunk_subkeys, self.chunks_per_shard) + ) + shard_key = ( + prefix + "c" + self.dimension_separator.join(map(str, shard_key_tuple)) + ) + return shard_key, chunk_subkeys + + def _get_index_from_store(self, shard_key: str) -> _ShardIndex: + # At the end of each shard 2*64bit per chunk for offset and length define the index: + index_bytes = self.inner_store.get_partial_values( + [(shard_key, (-16 * self._num_chunks_per_shard, None))] + )[0] + if index_bytes is None: + raise KeyError(shard_key) + return _ShardIndex.from_bytes( + index_bytes, + self, + ) + + def _get_index_from_buffer(self, buffer: Union[bytes, bytearray]) -> _ShardIndex: + # At the end of each shard 2*64bit per chunk for offset and length define the index: + return _ShardIndex.from_bytes(buffer[-16 * self._num_chunks_per_shard:], self) + + def _get_chunks_in_shard(self, shard_key: str) -> Iterator[Tuple[int, ...]]: + _, _, chunk_string = shard_key.rpartition("c") + shard_key_tuple = tuple( + map(int, chunk_string.split(self.dimension_separator)) + ) if chunk_string else (0, ) + for chunk_offset in itertools.product( + *(range(i) for i in self.chunks_per_shard) + ): + yield tuple( + shard_key_i * shards_i + offset_i + for shard_key_i, offset_i, shards_i in zip( + shard_key_tuple, chunk_offset, self.chunks_per_shard + ) + ) + + def __getitem__(self, key): + if self._is_data_key(key): + if self.supports_efficient_get_partial_values: + # Use the partial implementation, which fetches the index separately + value = self.get_partial_values([(key, (0, None))])[0] + if value is None: + raise KeyError(key) + else: + return value + shard_key, chunk_subkey = self._key_to_shard(key) + try: + full_shard_value = self.inner_store[shard_key] + except KeyError: + raise KeyError(key) + index = self._get_index_from_buffer(full_shard_value) + chunk_slice = index.get_chunk_slice(chunk_subkey) + if chunk_slice is not None: + return full_shard_value[chunk_slice] + else: + raise KeyError(key) + else: + return self.inner_store.__getitem__(key) + + def __setitem__(self, key, value): + value = ensure_bytes(value) + if self._is_data_key(key): + shard_key, chunk_subkey = self._key_to_shard(key) + chunks_to_read = set(self._get_chunks_in_shard(shard_key)) + chunks_to_read.remove(chunk_subkey) + new_content = {chunk_subkey: value} + try: + if self.supports_efficient_get_partial_values: + index = self._get_index_from_store(shard_key) + full_shard_value = None + else: + full_shard_value = self.inner_store[shard_key] + index = self._get_index_from_buffer(full_shard_value) + except KeyError: + index = _ShardIndex.create_empty(self) + else: + chunk_slices = [ + (chunk_to_read, index.get_chunk_slice(chunk_to_read)) + for chunk_to_read in chunks_to_read + ] + valid_chunk_slices = [ + (chunk_to_read, chunk_slice) + for chunk_to_read, chunk_slice in chunk_slices + if chunk_slice is not None + ] + # use get_partial_values if less than half of the available chunks must be read: + # (This can be changed when set_partial_values can be used efficiently.) + use_partial_get = ( + self.supports_efficient_get_partial_values + and len(valid_chunk_slices) < len(chunk_slices) / 2 + ) + + if use_partial_get: + chunk_values = self.inner_store.get_partial_values( + [ + ( + shard_key, + ( + chunk_slice.start, + chunk_slice.stop - chunk_slice.start, + ), + ) + for _, chunk_slice in valid_chunk_slices + ] + ) + for chunk_value, (chunk_to_read, _) in zip( + chunk_values, valid_chunk_slices + ): + new_content[chunk_to_read] = chunk_value + else: + if full_shard_value is None: + full_shard_value = self.inner_store[shard_key] + for chunk_to_read, chunk_slice in valid_chunk_slices: + if chunk_slice is not None: + new_content[chunk_to_read] = full_shard_value[chunk_slice] + + shard_content = b"" + for chunk_subkey, chunk_content in new_content.items(): + chunk_slice = slice( + len(shard_content), len(shard_content) + len(chunk_content) + ) + index.set_chunk_slice(chunk_subkey, chunk_slice) + shard_content += chunk_content + # Appending the index at the end of the shard: + shard_content += index.to_bytes() + self.inner_store[shard_key] = shard_content + else: # pragma: no cover + self.inner_store[key] = value + + def __delitem__(self, key): + if self._is_data_key(key): + shard_key, chunk_subkey = self._key_to_shard(key) + try: + index = self._get_index_from_store(shard_key) + except KeyError: + raise KeyError(key) + + index.set_chunk_slice(chunk_subkey, None) + + if index.is_all_empty(): + del self.inner_store[shard_key] + else: + index_bytes = index.to_bytes() + self.inner_store.set_partial_values([(shard_key, -len(index_bytes), index_bytes)]) + else: # pragma: no cover + del self.inner_store[key] + + def _shard_key_to_original_keys(self, key: str) -> Iterator[str]: + if self._is_data_key(key): + index = self._get_index_from_store(key) + prefix, _, _ = key.rpartition("c") + for chunk_tuple in self._get_chunks_in_shard(key): + if index.get_chunk_slice(chunk_tuple) is not None: + yield prefix + "c" + self.dimension_separator.join( + map(str, chunk_tuple) + ) + else: + yield key + + def __iter__(self) -> Iterator[str]: + for key in self.inner_store: + yield from self._shard_key_to_original_keys(key) + + def __len__(self): + return sum(1 for _ in self.keys()) + + def get_partial_values(self, key_ranges): + if self.supports_efficient_get_partial_values: + transformed_key_ranges = [] + cached_indices = {} + none_indices = [] + for i, (key, range_) in enumerate(key_ranges): + if self._is_data_key(key): + shard_key, chunk_subkey = self._key_to_shard(key) + try: + index = cached_indices[shard_key] + except KeyError: + try: + index = self._get_index_from_store(shard_key) + except KeyError: + none_indices.append(i) + continue + cached_indices[shard_key] = index + chunk_slice = index.get_chunk_slice(chunk_subkey) + if chunk_slice is None: + none_indices.append(i) + continue + range_start, range_length = range_ + if range_length is None: + range_length = chunk_slice.stop - chunk_slice.start + transformed_key_ranges.append( + (shard_key, (range_start + chunk_slice.start, range_length)) + ) + else: # pragma: no cover + transformed_key_ranges.append((key, range_)) + values = self.inner_store.get_partial_values(transformed_key_ranges) + for i in none_indices: + values.insert(i, None) + return values + else: + return StoreV3.get_partial_values(self, key_ranges) + + def supports_efficient_set_partial_values(self): + return False + + def set_partial_values(self, key_start_values): + # This does not yet implement efficient set_partial_values + StoreV3.set_partial_values(self, key_start_values) + + def rename(self, src_path: str, dst_path: str) -> None: + StoreV3.rename(self, src_path, dst_path) # type: ignore[arg-type] + + def list_prefix(self, prefix): + return StoreV3.list_prefix(self, prefix) + + def erase_prefix(self, prefix): + if self._is_data_key(prefix): + StoreV3.erase_prefix(self, prefix) + else: + self.inner_store.erase_prefix(prefix) + + def rmdir(self, path=None): + path = normalize_storage_path(path) + _rmdir_from_keys_v3(self, path) # type: ignore + + def __contains__(self, key): + if self._is_data_key(key): + shard_key, chunk_subkeys = self._key_to_shard(key) + try: + index = self._get_index_from_store(shard_key) + except KeyError: + return False + chunk_slice = index.get_chunk_slice(chunk_subkeys) + return chunk_slice is not None + else: + return self._inner_store.__contains__(key) diff --git a/zarr/core.py b/zarr/core.py index 5d37570831..b9db6cb2c8 100644 --- a/zarr/core.py +++ b/zarr/core.py @@ -51,7 +51,8 @@ normalize_shape, normalize_storage_path, PartialReadBuffer, - ensure_ndarray_like + UncompressedPartialReadBufferV3, + ensure_ndarray_like, ) @@ -1271,8 +1272,12 @@ def _get_selection(self, indexer, out=None, fields=None): check_array_shape('out', out, out_shape) # iterate over chunks - if not hasattr(self.chunk_store, "getitems") or \ - any(map(lambda x: x == 0, self.shape)): + if ( + not hasattr(self.chunk_store, "getitems") and not ( + hasattr(self.chunk_store, "get_partial_values") and + self.chunk_store.supports_efficient_get_partial_values + ) + ) or any(map(lambda x: x == 0, self.shape)): # sequentially get one key at a time from storage for chunk_coords, chunk_selection, out_selection in indexer: @@ -1898,6 +1903,8 @@ def _process_chunk( cdata = cdata.read_full() self._compressor.decode(cdata, dest) else: + if isinstance(cdata, UncompressedPartialReadBufferV3): + cdata = cdata.read_full() chunk = ensure_ndarray_like(cdata).view(self._dtype) chunk = chunk.reshape(self._chunks, order=self._order) np.copyto(dest, chunk) @@ -1919,13 +1926,21 @@ def _process_chunk( else dim for i, dim in enumerate(self.chunks) ] - cdata.read_part(start, nitems) - chunk_partial = self._decode_chunk( - cdata.buff, - start=start, - nitems=nitems, - expected_shape=expected_shape, - ) + if isinstance(cdata, UncompressedPartialReadBufferV3): + chunk_partial = self._decode_chunk( + cdata.read_part(start, nitems), + start=start, + nitems=nitems, + expected_shape=expected_shape, + ) + else: + cdata.read_part(start, nitems) + chunk_partial = self._decode_chunk( + cdata.buff, + start=start, + nitems=nitems, + expected_shape=expected_shape, + ) tmp[partial_out_selection] = chunk_partial out[out_selection] = tmp[chunk_selection] return @@ -2020,9 +2035,29 @@ def _chunk_getitems(self, lchunk_coords, lchunk_selection, out, lout_selection, for ckey in ckeys if ckey in self.chunk_store } + elif ( + self._partial_decompress + and not self._compressor + and not fields + and self.dtype != object + and hasattr(self.chunk_store, "get_partial_values") + and self.chunk_store.supports_efficient_get_partial_values + ): + partial_read_decode = True + cdatas = { + ckey: UncompressedPartialReadBufferV3( + ckey, self.chunk_store, itemsize=self.itemsize + ) + for ckey in ckeys + if ckey in self.chunk_store + } else: partial_read_decode = False - cdatas = self.chunk_store.getitems(ckeys, on_error="omit") + if not hasattr(self.chunk_store, "getitems"): + values = self.chunk_store.get_partial_values([(ckey, (0, None)) for ckey in ckeys]) + cdatas = {key: value for key, value in zip(ckeys, values) if value is not None} + else: + cdatas = self.chunk_store.getitems(ckeys, on_error="omit") for ckey, chunk_select, out_select in zip(ckeys, lchunk_selection, lout_selection): if ckey in cdatas: self._process_chunk( diff --git a/zarr/meta.py b/zarr/meta.py index 41a90101b5..b493e833f0 100644 --- a/zarr/meta.py +++ b/zarr/meta.py @@ -477,9 +477,10 @@ def _encode_storage_transformer_metadata( @classmethod def _decode_storage_transformer_metadata(cls, meta: Mapping) -> "StorageTransformer": from zarr.tests.test_storage_v3 import DummyStorageTransfomer + from zarr._storage.v3_storage_transformers import ShardingStorageTransformer # This might be changed to a proper registry in the future - KNOWN_STORAGE_TRANSFORMERS = [DummyStorageTransfomer] + KNOWN_STORAGE_TRANSFORMERS = [DummyStorageTransfomer, ShardingStorageTransformer] conf = meta.get('configuration', {}) extension_uri = meta['extension'] diff --git a/zarr/tests/test_core.py b/zarr/tests/test_core.py index ffacefb937..397ebad513 100644 --- a/zarr/tests/test_core.py +++ b/zarr/tests/test_core.py @@ -20,6 +20,7 @@ from zarr._storage.store import ( v3_api_available, ) +from .._storage.v3_storage_transformers import ShardingStorageTransformer, v3_sharding_available from zarr.core import Array from zarr.errors import ArrayNotFoundError, ContainsGroupError from zarr.meta import json_loads @@ -829,7 +830,6 @@ def test_pickle(self): attrs_cache = z.attrs.cache a = np.random.randint(0, 1000, 1000) z[:] = a - # round trip through pickle dump = pickle.dumps(z) # some stores cannot be opened twice at the same time, need to close @@ -3298,6 +3298,60 @@ def expected(self): ] +@pytest.mark.skipif(have_fsspec is False, reason="needs fsspec") +@pytest.mark.skipif(not v3_api_available, reason="V3 is disabled") +@pytest.mark.skipif(not v3_sharding_available, reason="sharding is disabled") +class TestArrayWithFSStoreV3PartialReadUncompressedSharded( + TestArrayWithPathV3, TestArrayWithFSStorePartialRead +): + + @staticmethod + def create_array(array_path='arr1', read_only=False, **kwargs): + path = mkdtemp() + atexit.register(shutil.rmtree, path) + store = FSStoreV3(path) + cache_metadata = kwargs.pop("cache_metadata", True) + cache_attrs = kwargs.pop("cache_attrs", True) + write_empty_chunks = kwargs.pop('write_empty_chunks', True) + kwargs.setdefault('compressor', None) + num_dims = 1 if isinstance(kwargs["shape"], int) else len(kwargs["shape"]) + sharding_transformer = ShardingStorageTransformer( + "indexed", chunks_per_shard=(2, ) * num_dims + ) + init_array(store, path=array_path, storage_transformers=[sharding_transformer], **kwargs) + return Array( + store, + path=array_path, + read_only=read_only, + cache_metadata=cache_metadata, + cache_attrs=cache_attrs, + partial_decompress=True, + write_empty_chunks=write_empty_chunks, + ) + + def test_nbytes_stored(self): + z = self.create_array(shape=1000, chunks=100) + expect_nbytes_stored = sum(buffer_size(v) for k, v in z._store.items() if k != 'zarr.json') + assert expect_nbytes_stored == z.nbytes_stored + z[:] = 42 + expect_nbytes_stored = sum(buffer_size(v) for k, v in z._store.items() if k != 'zarr.json') + assert expect_nbytes_stored == z.nbytes_stored + + def test_supports_efficient_get_set_partial_values(self): + z = self.create_array(shape=100, chunks=10) + assert z.chunk_store.supports_efficient_get_partial_values + assert not z.chunk_store.supports_efficient_set_partial_values() + + def expected(self): + return [ + "90109fc2a4e17efbcb447003ea1c08828b91f71e", + "2b73519f7260dba3ddce0d2b70041888856fec6b", + "bca5798be2ed71d444f3045b05432d937682b7dd", + "9ff1084501e28520e577662a6e3073f1116c76a2", + "882a97cad42417f90f111d0cb916a21579650467", + ] + + @pytest.mark.skipif(have_fsspec is False, reason="needs fsspec") @pytest.mark.skipif(not v3_api_available, reason="V3 is disabled") class TestArrayWithFSStoreV3Nested(TestArrayWithPathV3, TestArrayWithFSStoreNested): @@ -3391,6 +3445,63 @@ def expected(self): ] +@pytest.mark.skipif(not v3_api_available, reason="V3 is disabled") +@pytest.mark.skipif(not v3_sharding_available, reason="sharding is disabled") +class TestArrayWithShardingStorageTransformerV3(TestArrayWithPathV3): + + @staticmethod + def create_array(array_path='arr1', read_only=False, **kwargs): + store = KVStoreV3(dict()) + cache_metadata = kwargs.pop('cache_metadata', True) + cache_attrs = kwargs.pop('cache_attrs', True) + write_empty_chunks = kwargs.pop('write_empty_chunks', True) + kwargs.setdefault('compressor', None) + num_dims = 1 if isinstance(kwargs["shape"], int) else len(kwargs["shape"]) + sharding_transformer = ShardingStorageTransformer( + "indexed", chunks_per_shard=(2, ) * num_dims + ) + init_array(store, path=array_path, storage_transformers=[sharding_transformer], **kwargs) + return Array(store, path=array_path, read_only=read_only, + cache_metadata=cache_metadata, + cache_attrs=cache_attrs, write_empty_chunks=write_empty_chunks) + + def test_nbytes_stored(self): + z = self.create_array(shape=1000, chunks=100) + expect_nbytes_stored = sum(buffer_size(v) for k, v in z._store.items() if k != 'zarr.json') + assert expect_nbytes_stored == z.nbytes_stored + z[:] = 42 + expect_nbytes_stored = sum(buffer_size(v) for k, v in z._store.items() if k != 'zarr.json') + assert expect_nbytes_stored == z.nbytes_stored + + # mess with store + z.store[data_root + z._key_prefix + 'foo'] = list(range(10)) + assert -1 == z.nbytes_stored + + def test_keys_inner_store(self): + z = self.create_array(shape=1000, chunks=100) + assert z.chunk_store.keys() == z._store.keys() + meta_keys = set(z.store.keys()) + z[:] = 42 + assert len(z.chunk_store.keys() - meta_keys) == 10 + # inner store should have half the data keys, + # since chunks_per_shard is 2: + assert len(z._store.keys() - meta_keys) == 5 + + def test_supports_efficient_get_set_partial_values(self): + z = self.create_array(shape=100, chunks=10) + assert not z.chunk_store.supports_efficient_get_partial_values + assert not z.chunk_store.supports_efficient_set_partial_values() + + def expected(self): + return [ + '90109fc2a4e17efbcb447003ea1c08828b91f71e', + '2b73519f7260dba3ddce0d2b70041888856fec6b', + 'bca5798be2ed71d444f3045b05432d937682b7dd', + '9ff1084501e28520e577662a6e3073f1116c76a2', + '882a97cad42417f90f111d0cb916a21579650467', + ] + + @pytest.mark.skipif(not v3_api_available, reason="V3 is disabled") def test_array_mismatched_store_versions(): store_v3 = KVStoreV3(dict()) diff --git a/zarr/tests/test_storage_v3.py b/zarr/tests/test_storage_v3.py index 9f18c89361..cc031f0db4 100644 --- a/zarr/tests/test_storage_v3.py +++ b/zarr/tests/test_storage_v3.py @@ -10,6 +10,8 @@ import zarr from zarr._storage.store import _get_hierarchy_metadata, v3_api_available, StorageTransformer +from zarr._storage.v3_storage_transformers import ShardingStorageTransformer, v3_sharding_available +from zarr.core import Array from zarr.meta import _default_entry_point_metadata_v3 from zarr.storage import (atexit_rmglob, atexit_rmtree, data_root, default_compressor, getsize, init_array, meta_root, @@ -523,26 +525,38 @@ def create_store(self, **kwargs): return store +@pytest.mark.skipif(not v3_sharding_available, reason="sharding is disabled") class TestStorageTransformerV3(TestMappingStoreV3): def create_store(self, **kwargs): inner_store = super().create_store(**kwargs) - storage_transformer = DummyStorageTransfomer( + dummy_transformer = DummyStorageTransfomer( "dummy_type", test_value=DummyStorageTransfomer.TEST_CONSTANT ) - return storage_transformer._copy_for_array(None, inner_store) + sharding_transformer = ShardingStorageTransformer( + "indexed", chunks_per_shard=2, + ) + path = 'bla' + init_array(inner_store, path=path, shape=1000, chunks=100, + dimension_separator=".", + storage_transformers=[dummy_transformer, sharding_transformer]) + store = Array(store=inner_store, path=path).chunk_store + store.erase_prefix("data/root/bla/") + store.clear() + return store def test_method_forwarding(self): store = self.create_store() - assert store.list() == store.inner_store.list() - assert store.list_dir(data_root) == store.inner_store.list_dir(data_root) + inner_store = store.inner_store.inner_store + assert store.list() == inner_store.list() + assert store.list_dir(data_root) == inner_store.list_dir(data_root) assert store.is_readable() assert store.is_writeable() assert store.is_listable() - store.inner_store._readable = False - store.inner_store._writeable = False - store.inner_store._listable = False + inner_store._readable = False + inner_store._writeable = False + inner_store._listable = False assert not store.is_readable() assert not store.is_writeable() assert not store.is_listable() diff --git a/zarr/util.py b/zarr/util.py index 9fcdac9df7..5976b36d8d 100644 --- a/zarr/util.py +++ b/zarr/util.py @@ -639,6 +639,25 @@ def read_full(self): return self.chunk_store[self.store_key] +class UncompressedPartialReadBufferV3: + def __init__(self, store_key, chunk_store, itemsize): + assert chunk_store.supports_efficient_get_partial_values + self.chunk_store = chunk_store + self.store_key = store_key + self.itemsize = itemsize + + def prepare_chunk(self): + pass + + def read_part(self, start, nitems): + return self.chunk_store.get_partial_values( + [(self.store_key, (start * self.itemsize, nitems * self.itemsize))] + )[0] + + def read_full(self): + return self.chunk_store[self.store_key] + + def retry_call(callabl: Callable, args=None, kwargs=None,