diff --git a/pyproject.toml b/pyproject.toml index fca263db9a..3014f98031 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -207,10 +207,6 @@ check_untyped_defs = false module = [ "zarr.v2.*", "zarr.abc.codec", - "zarr.codecs.bytes", - "zarr.codecs.pipeline", - "zarr.codecs.sharding", - "zarr.codecs.transpose", "zarr.array_v2", ] disallow_any_generics = false diff --git a/src/zarr/__init__.py b/src/zarr/__init__.py index 65daae8f6d..00c01560f4 100644 --- a/src/zarr/__init__.py +++ b/src/zarr/__init__.py @@ -1,10 +1,7 @@ from __future__ import annotations -from typing import Union - import zarr.codecs # noqa: F401 from zarr.array import Array, AsyncArray -from zarr.array_v2 import ArrayV2 from zarr.config import config # noqa: F401 from zarr.group import AsyncGroup, Group from zarr.store import ( @@ -18,9 +15,7 @@ assert not __version__.startswith("0.0.0") -async def open_auto_async( - store: StoreLike, -) -> Union[AsyncArray, AsyncGroup]: +async def open_auto_async(store: StoreLike) -> AsyncArray | AsyncGroup: store_path = make_store_path(store) try: return await AsyncArray.open(store_path) @@ -28,9 +23,7 @@ async def open_auto_async( return await AsyncGroup.open(store_path) -def open_auto( - store: StoreLike, -) -> Union[Array, ArrayV2, Group]: +def open_auto(store: StoreLike) -> Array | Group: object = _sync( open_auto_async(store), ) diff --git a/src/zarr/abc/codec.py b/src/zarr/abc/codec.py index a91bd63c3b..1c665590bf 100644 --- a/src/zarr/abc/codec.py +++ b/src/zarr/abc/codec.py @@ -1,110 +1,348 @@ from __future__ import annotations from abc import abstractmethod -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING, Generic, Iterable, TypeVar from zarr.abc.metadata import Metadata - +from zarr.abc.store import ByteGetter, ByteSetter from zarr.buffer import Buffer, NDBuffer -from zarr.common import ArraySpec -from zarr.store import StorePath if TYPE_CHECKING: from typing_extensions import Self - from zarr.common import SliceSelection + from zarr.common import ArraySpec, SliceSelection from zarr.metadata import ArrayMetadata -class Codec(Metadata): +CodecInput = TypeVar("CodecInput", bound=NDBuffer | Buffer) +CodecOutput = TypeVar("CodecOutput", bound=NDBuffer | Buffer) + + +class _Codec(Generic[CodecInput, CodecOutput], Metadata): + """Generic base class for codecs. + Please use ArrayArrayCodec, ArrayBytesCodec or BytesBytesCodec for subclassing. + + Codecs can be registered via zarr.codecs.registry. + """ + is_fixed_size: bool @abstractmethod def compute_encoded_size(self, input_byte_length: int, chunk_spec: ArraySpec) -> int: - pass + """Given an input byte length, this method returns the output byte length. + Raises a NotImplementedError for codecs with variable-sized outputs (e.g. compressors). + + Parameters + ---------- + input_byte_length : int + chunk_spec : ArraySpec + + Returns + ------- + int + """ + ... def resolve_metadata(self, chunk_spec: ArraySpec) -> ArraySpec: + """Computes the spec of the chunk after it has been encoded by the codec. + This is important for codecs that change the shape, data type or fill value of a chunk. + The spec will then be used for subsequent codecs in the pipeline. + + Parameters + ---------- + chunk_spec : ArraySpec + + Returns + ------- + ArraySpec + """ return chunk_spec def evolve(self, array_spec: ArraySpec) -> Self: + """Fills in codec configuration parameters that can be automatically + inferred from the array metadata.
+ + Parameters + ---------- + array_spec : ArraySpec + + Returns + ------- + Self + """ return self def validate(self, array_metadata: ArrayMetadata) -> None: - pass + """Validates that the codec configuration is compatible with the array metadata. + Raises errors when the codec configuration is not compatible. + + Parameters + ---------- + array_metadata : ArrayMetadata + """ + ... -class ArrayArrayCodec(Codec): @abstractmethod async def decode( self, - chunk_array: NDBuffer, - chunk_spec: ArraySpec, - ) -> NDBuffer: - pass + chunks_and_specs: Iterable[tuple[CodecOutput | None, ArraySpec]], + ) -> Iterable[CodecInput | None]: + """Decodes a batch of chunks. + Chunks can be None, in which case they are ignored by the codec. + + Parameters + ---------- + chunks_and_specs : Iterable[tuple[CodecOutput | None, ArraySpec]] + Ordered set of encoded chunks with their accompanying chunk spec. + + Returns + ------- + Iterable[CodecInput | None] + """ + ... @abstractmethod async def encode( self, - chunk_array: NDBuffer, - chunk_spec: ArraySpec, - ) -> Optional[NDBuffer]: - pass + chunks_and_specs: Iterable[tuple[CodecInput | None, ArraySpec]], + ) -> Iterable[CodecOutput | None]: + """Encodes a batch of chunks. + Chunks can be None, in which case they are ignored by the codec. + + Parameters + ---------- + chunks_and_specs : Iterable[tuple[CodecInput | None, ArraySpec]] + Ordered set of to-be-encoded chunks with their accompanying chunk spec. -class ArrayBytesCodec(Codec): - @abstractmethod - async def decode( - self, - chunk_array: Buffer, - chunk_spec: ArraySpec, - ) -> NDBuffer: - pass + Returns + ------- + Iterable[CodecOutput | None] + """ + ... - @abstractmethod - async def encode( - self, - chunk_array: NDBuffer, - chunk_spec: ArraySpec, - ) -> Optional[Buffer]: - pass + +class ArrayArrayCodec(_Codec[NDBuffer, NDBuffer]): + """Base class for array-to-array codecs.""" + + ... + + +class ArrayBytesCodec(_Codec[NDBuffer, Buffer]): + """Base class for array-to-bytes codecs.""" + + ... + + +class BytesBytesCodec(_Codec[Buffer, Buffer]): + """Base class for bytes-to-bytes codecs.""" + + ... + + +Codec = ArrayArrayCodec | ArrayBytesCodec | BytesBytesCodec class ArrayBytesCodecPartialDecodeMixin: + """Mixin for array-to-bytes codecs that implement partial decoding.""" + @abstractmethod async def decode_partial( self, - store_path: StorePath, - selection: SliceSelection, - chunk_spec: ArraySpec, - ) -> Optional[NDBuffer]: - pass + batch_info: Iterable[tuple[ByteGetter, SliceSelection, ArraySpec]], + ) -> Iterable[NDBuffer | None]: + """Partially decodes a batch of chunks. + This method determines parts of a chunk from the slice selection, + fetches these parts from the store (via ByteGetter) and decodes them. + + Parameters + ---------- + batch_info : Iterable[tuple[ByteGetter, SliceSelection, ArraySpec]] + Ordered set of information about slices of encoded chunks. + The slice selection determines which parts of the chunk will be fetched. + The ByteGetter is used to fetch the necessary bytes. + The chunk spec contains information about the construction of an array from the bytes. + + Returns + ------- + Iterable[NDBuffer | None] + """ + ...
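# --- Annotation (not part of the patch): a minimal sketch of a custom codec
# --- written against the new batched interface above. `IdentityCodec` is a
# --- hypothetical name used only for illustration; the sketch assumes this
# --- development tree is importable and that `Metadata` supplies
# --- dataclass-based to_dict/from_dict defaults.
from __future__ import annotations

from dataclasses import dataclass
from typing import Iterable

from zarr.abc.codec import ArrayArrayCodec
from zarr.buffer import NDBuffer
from zarr.common import ArraySpec


@dataclass(frozen=True)
class IdentityCodec(ArrayArrayCodec):
    is_fixed_size = True

    def compute_encoded_size(self, input_byte_length: int, chunk_spec: ArraySpec) -> int:
        # Identity transform: the encoded chunk is exactly as large as the input.
        return input_byte_length

    async def decode(
        self, chunks_and_specs: Iterable[tuple[NDBuffer | None, ArraySpec]]
    ) -> Iterable[NDBuffer | None]:
        # Pass every chunk through untouched; None entries (missing chunks) stay None.
        return [chunk for chunk, _spec in chunks_and_specs]

    async def encode(
        self, chunks_and_specs: Iterable[tuple[NDBuffer | None, ArraySpec]]
    ) -> Iterable[NDBuffer | None]:
        return [chunk for chunk, _spec in chunks_and_specs]
# --- End annotation.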
class ArrayBytesCodecPartialEncodeMixin: + """Mixin for array-to-bytes codecs that implement partial encoding.""" + @abstractmethod async def encode_partial( self, - store_path: StorePath, - chunk_array: NDBuffer, - selection: SliceSelection, - chunk_spec: ArraySpec, + batch_info: Iterable[tuple[ByteSetter, NDBuffer, SliceSelection, ArraySpec]], ) -> None: - pass + """Partially encodes a batch of chunks. + This method determines parts of a chunk from the slice selection, encodes them and + writes these parts to the store (via ByteSetter). + If merging with existing chunk data in the store is necessary, this method will + read from the store first and perform the merge. + + Parameters + ---------- + batch_info : Iterable[tuple[ByteSetter, NDBuffer, SliceSelection, ArraySpec]] + Ordered set of information about slices of to-be-encoded chunks. + The slice selection determines which parts of the chunk will be encoded. + The ByteSetter is used to write the necessary bytes and fetch bytes for existing chunk data. + The chunk spec contains information about the chunk. + """ + ... + +class CodecPipeline(Metadata): + """Base class for implementing a CodecPipeline. + A CodecPipeline implements the read and write paths for chunk data. + On the read path, it is responsible for fetching chunks from a store (via ByteGetter), + decoding them and assembling an output array. On the write path, it encodes the chunks + and writes them to a store (via ByteSetter).""" + + @abstractmethod + def evolve(self, array_spec: ArraySpec) -> Self: + """Fills in codec configuration parameters that can be automatically + inferred from the array metadata. + + Parameters + ---------- + array_spec : ArraySpec + + Returns + ------- + Self + """ + ... + + @classmethod + @abstractmethod + def from_list(cls, codecs: list[Codec]) -> Self: + """Creates a codec pipeline from a list of codecs. + + Parameters + ---------- + codecs : list[Codec] + + Returns + ------- + Self + """ + ... + + @property + @abstractmethod + def supports_partial_decode(self) -> bool: ... + + @property + @abstractmethod + def supports_partial_encode(self) -> bool: ... + + @abstractmethod + def validate(self, array_metadata: ArrayMetadata) -> None: + """Validates that all codec configurations are compatible with the array metadata. + Raises errors when a codec configuration is not compatible. + + Parameters + ---------- + array_metadata : ArrayMetadata + """ + ... + + @abstractmethod + def compute_encoded_size(self, byte_length: int, array_spec: ArraySpec) -> int: + """Given an input byte length, this method returns the output byte length. + Raises a NotImplementedError for codecs with variable-sized outputs (e.g. compressors). + + Parameters + ---------- + byte_length : int + array_spec : ArraySpec + + Returns + ------- + int + """ + ... -class BytesBytesCodec(Codec): @abstractmethod async def decode( self, - chunk_array: Buffer, - chunk_spec: ArraySpec, - ) -> Buffer: - pass + chunk_bytes_and_specs: Iterable[tuple[Buffer | None, ArraySpec]], + ) -> Iterable[NDBuffer | None]: + """Decodes a batch of chunks. + Chunks can be None, in which case they are ignored by the codec. + + Parameters + ---------- + chunk_bytes_and_specs : Iterable[tuple[Buffer | None, ArraySpec]] + Ordered set of encoded chunks with their accompanying chunk spec. + + Returns + ------- + Iterable[NDBuffer | None] + """ + ...
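# --- Annotation (not part of the patch): the essence of the read-side
# --- composition a CodecPipeline implements. A real pipeline must also
# --- thread each codec's resolve_metadata() between stages to obtain the
# --- per-stage ArraySpec; that bookkeeping is omitted here for brevity.
# --- All names are local to this sketch.
from __future__ import annotations

from zarr.abc.codec import ArrayArrayCodec, ArrayBytesCodec, BytesBytesCodec
from zarr.buffer import Buffer, NDBuffer
from zarr.common import ArraySpec


async def decode_sketch(
    aa_codecs: list[ArrayArrayCodec],
    ab_codec: ArrayBytesCodec,
    bb_codecs: list[BytesBytesCodec],
    batch: list[tuple[Buffer | None, ArraySpec]],
) -> list[NDBuffer | None]:
    specs = [spec for _, spec in batch]
    chunks: list = [chunk for chunk, _ in batch]
    # Undo the bytes-to-bytes codecs in reverse order of application...
    for bb in reversed(bb_codecs):
        chunks = list(await bb.decode(zip(chunks, specs)))
    # ...then the single array-to-bytes codec...
    chunks = list(await ab_codec.decode(zip(chunks, specs)))
    # ...then the array-to-array codecs, also reversed.
    for aa in reversed(aa_codecs):
        chunks = list(await aa.decode(zip(chunks, specs)))
    return chunks
# --- End annotation.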
@abstractmethod async def encode( self, - chunk_array: Buffer, - chunk_spec: ArraySpec, - ) -> Optional[Buffer]: - pass + chunk_arrays_and_specs: Iterable[tuple[NDBuffer | None, ArraySpec]], + ) -> Iterable[Buffer | None]: + """Encodes a batch of chunks. + Chunks can be None, in which case they are ignored by the codec. + + Parameters + ---------- + chunk_arrays_and_specs : Iterable[tuple[NDBuffer | None, ArraySpec]] + Ordered set of to-be-encoded chunks with their accompanying chunk spec. + + Returns + ------- + Iterable[Buffer | None] + """ + ... + + @abstractmethod + async def read( + self, + batch_info: Iterable[tuple[ByteGetter, ArraySpec, SliceSelection, SliceSelection]], + out: NDBuffer, + ) -> None: + """Reads chunk data from the store, decodes it and writes it into an output array. + Partial decoding may be utilized if the codecs and stores support it. + + Parameters + ---------- + batch_info : Iterable[tuple[ByteGetter, ArraySpec, SliceSelection, SliceSelection]] + Ordered set of information about the chunks. + The first slice selection determines which parts of the chunk will be fetched. + The second slice selection determines where in the output array the chunk data will be written. + The ByteGetter is used to fetch the necessary bytes. + The chunk spec contains information about the construction of an array from the bytes. + out : NDBuffer + """ + ... + + @abstractmethod + async def write( + self, + batch_info: Iterable[tuple[ByteSetter, ArraySpec, SliceSelection, SliceSelection]], + value: NDBuffer, + ) -> None: + """Encodes chunk data and writes it to the store. + Merges with existing chunk data by reading first, if necessary. + Partial encoding may be utilized if the codecs and stores support it. + + Parameters + ---------- + batch_info : Iterable[tuple[ByteSetter, ArraySpec, SliceSelection, SliceSelection]] + Ordered set of information about the chunks. + The first slice selection determines which parts of the chunk will be encoded. + The second slice selection determines where in the value array the chunk data is located. + The ByteSetter is used to fetch and write the necessary bytes. + The chunk spec contains information about the chunk. + value : NDBuffer + """ + ... diff --git a/src/zarr/abc/store.py b/src/zarr/abc/store.py index 914987cda7..a3a112e58e 100644 --- a/src/zarr/abc/store.py +++ b/src/zarr/abc/store.py @@ -1,8 +1,8 @@ from abc import abstractmethod, ABC from collections.abc import AsyncGenerator +from typing import List, Protocol, Tuple, Optional, runtime_checkable -from typing import List, Tuple, Optional - +from zarr.common import BytesLike from zarr.buffer import Buffer @@ -68,7 +68,7 @@ async def set(self, key: str, value: Buffer) -> None: Parameters ---------- key : str - value : bytes + value : Buffer """ ... @@ -89,12 +89,12 @@ def supports_partial_writes(self) -> bool: ... @abstractmethod - async def set_partial_values(self, key_start_values: List[Tuple[str, int, bytes]]) -> None: + async def set_partial_values(self, key_start_values: list[tuple[str, int, BytesLike]]) -> None: """Store values at a given key, starting at byte range_start.
Parameters ---------- - key_start_values : list[tuple[str, int, bytes]] + key_start_values : list[tuple[str, int, BytesLike]] set of key, range_start, values triples, a key may occur multiple times with different range_starts, range_starts (considering the length of the respective values) must not specify overlapping ranges for the same key @@ -146,3 +146,28 @@ def list_dir(self, prefix: str) -> AsyncGenerator[str, None]: AsyncGenerator[str, None] """ ... + + +@runtime_checkable +class ByteGetter(Protocol): + async def get( + self, byte_range: Optional[Tuple[int, Optional[int]]] = None + ) -> Optional[Buffer]: ... + + +@runtime_checkable +class ByteSetter(Protocol): + async def get( + self, byte_range: Optional[Tuple[int, Optional[int]]] = None + ) -> Optional[Buffer]: ... + + async def set(self, value: Buffer, byte_range: Optional[Tuple[int, int]] = None) -> None: ... + + async def delete(self) -> None: ... + + +async def set_or_delete(byte_setter: ByteSetter, value: Buffer | None) -> None: + if value is None: + await byte_setter.delete() + else: + await byte_setter.set(value) diff --git a/src/zarr/array.py b/src/zarr/array.py index 1567c9bbe5..61f91ab966 100644 --- a/src/zarr/array.py +++ b/src/zarr/array.py @@ -1,3 +1,5 @@ +from __future__ import annotations + # Notes on what I've changed here: # 1. Split Array into AsyncArray and Array # 3. Added .size and .attrs methods @@ -7,37 +9,38 @@ # Questions to consider: # 1. Was splitting the array into two classes really necessary? -from __future__ import annotations +from asyncio import gather from dataclasses import dataclass, replace import json -from typing import Any, Dict, Iterable, Literal, Optional, Tuple, Union +from typing import Any, Iterable, Literal import numpy as np import numpy.typing as npt from zarr.abc.codec import Codec +from zarr.abc.store import set_or_delete -# from zarr.array_v2 import ArrayV2 -from zarr.buffer import Buffer, Factory, NDArrayLike, NDBuffer +from zarr.attributes import Attributes +from zarr.buffer import Factory, NDArrayLike, NDBuffer from zarr.codecs import BytesCodec -from zarr.codecs.pipeline import CodecPipeline from zarr.common import ( + JSON, ZARR_JSON, - ArraySpec, + ZARRAY_JSON, + ZATTRS_JSON, ChunkCoords, Selection, - SliceSelection, ZarrFormat, concurrent_map, ) from zarr.config import config -from zarr.indexing import BasicIndexer, all_chunk_coords, is_total_slice +from zarr.indexing import BasicIndexer from zarr.chunk_grids import RegularChunkGrid -from zarr.chunk_key_encodings import DefaultChunkKeyEncoding, V2ChunkKeyEncoding -from zarr.metadata import ArrayMetadata, parse_indexing_order +from zarr.chunk_key_encodings import ChunkKeyEncoding, DefaultChunkKeyEncoding, V2ChunkKeyEncoding +from zarr.metadata import ArrayMetadata, ArrayV3Metadata, ArrayV2Metadata, parse_indexing_order from zarr.store import StoreLike, StorePath, make_store_path from zarr.sync import sync @@ -46,9 +49,11 @@ def parse_array_metadata(data: Any) -> ArrayMetadata: if isinstance(data, ArrayMetadata): return data elif isinstance(data, dict): - return ArrayMetadata.from_dict(data) - else: - raise TypeError + if data["zarr_format"] == 3: + return ArrayV3Metadata.from_dict(data) + elif data["zarr_format"] == 2: + return ArrayV2Metadata.from_dict(data) + raise TypeError @dataclass(frozen=True) @@ -57,10 +62,6 @@ class AsyncArray: store_path: StorePath order: Literal["C", "F"] - @property - def codecs(self) -> CodecPipeline: - return self.metadata.codecs - def __init__( self, metadata: ArrayMetadata, @@ -79,21 +80,116 
@@ async def create( cls, store: StoreLike, *, + # v2 and v3 shape: ChunkCoords, dtype: npt.DTypeLike, - chunk_shape: ChunkCoords, - fill_value: Optional[Any] = None, - chunk_key_encoding: Union[ - Tuple[Literal["default"], Literal[".", "/"]], - Tuple[Literal["v2"], Literal[".", "/"]], - ] = ("default", "/"), - codecs: Optional[Iterable[Union[Codec, Dict[str, Any]]]] = None, - dimension_names: Optional[Iterable[str]] = None, - attributes: Optional[Dict[str, Any]] = None, - exists_ok: bool = False, zarr_format: ZarrFormat = 3, + fill_value: Any | None = None, + attributes: dict[str, JSON] | None = None, + # v3 only + chunk_shape: ChunkCoords | None = None, + chunk_key_encoding: ( + ChunkKeyEncoding + | tuple[Literal["default"], Literal[".", "/"]] + | tuple[Literal["v2"], Literal[".", "/"]] + | None + ) = None, + codecs: Iterable[Codec | dict[str, JSON]] | None = None, + dimension_names: Iterable[str] | None = None, + # v2 only + chunks: ChunkCoords | None = None, + dimension_separator: Literal[".", "/"] | None = None, + order: Literal["C", "F"] | None = None, + filters: list[dict[str, JSON]] | None = None, + compressor: dict[str, JSON] | None = None, + # runtime + exists_ok: bool = False, ) -> AsyncArray: store_path = make_store_path(store) + + if chunk_shape is None: + if chunks is None: + raise ValueError("Either chunk_shape or chunks needs to be provided.") + chunk_shape = chunks + elif chunks is not None: + raise ValueError("Only one of chunk_shape or chunks must be provided.") + + if zarr_format == 3: + if dimension_separator is not None: + raise ValueError( + "dimension_separator cannot be used for arrays with version 3. Use chunk_key_encoding instead." + ) + if order is not None: + raise ValueError( + "order cannot be used for arrays with version 3. Use a transpose codec instead." + ) + if filters is not None: + raise ValueError( + "filters cannot be used for arrays with version 3. Use array-to-array codecs instead." + ) + if compressor is not None: + raise ValueError( + "compressor cannot be used for arrays with version 3. Use bytes-to-bytes codecs instead." + ) + return await cls._create_v3( + store_path, + shape=shape, + dtype=dtype, + chunk_shape=chunk_shape, + fill_value=fill_value, + chunk_key_encoding=chunk_key_encoding, + codecs=codecs, + dimension_names=dimension_names, + attributes=attributes, + exists_ok=exists_ok, + ) + elif zarr_format == 2: + if codecs is not None: + raise ValueError( + "codecs cannot be used for arrays with version 2. Use filters and compressor instead." + ) + if chunk_key_encoding is not None: + raise ValueError( + "chunk_key_encoding cannot be used for arrays with version 2. Use dimension_separator instead." + ) + if dimension_names is not None: + raise ValueError("dimension_names cannot be used for arrays with version 2.") + return await cls._create_v2( + store_path, + shape=shape, + dtype=dtype, + chunks=chunk_shape, + dimension_separator=dimension_separator, + fill_value=fill_value, + order=order, + filters=filters, + compressor=compressor, + attributes=attributes, + exists_ok=exists_ok, + ) + else: + raise ValueError(f"Unsupported zarr_format.
Got: {zarr_format}") + + @classmethod + async def _create_v3( + cls, + store_path: StorePath, + *, + shape: ChunkCoords, + dtype: npt.DTypeLike, + chunk_shape: ChunkCoords, + fill_value: Any | None = None, + chunk_key_encoding: ( + ChunkKeyEncoding + | tuple[Literal["default"], Literal[".", "/"]] + | tuple[Literal["v2"], Literal[".", "/"]] + | None + ) = None, + codecs: Iterable[Codec | dict[str, JSON]] | None = None, + dimension_names: Iterable[str] | None = None, + attributes: dict[str, JSON] | None = None, + exists_ok: bool = False, + ) -> AsyncArray: if not exists_ok: assert not await (store_path / ZARR_JSON).exists() @@ -105,36 +201,86 @@ async def create( else: fill_value = 0 - metadata = ArrayMetadata( - shape=shape, - data_type=dtype, - chunk_grid=RegularChunkGrid(chunk_shape=chunk_shape), - chunk_key_encoding=( + if chunk_key_encoding is None: + chunk_key_encoding = ("default", "/") + if isinstance(chunk_key_encoding, tuple): + chunk_key_encoding = ( V2ChunkKeyEncoding(separator=chunk_key_encoding[1]) if chunk_key_encoding[0] == "v2" else DefaultChunkKeyEncoding(separator=chunk_key_encoding[1]) - ), + ) + + metadata = ArrayV3Metadata( + shape=shape, + data_type=dtype, + chunk_grid=RegularChunkGrid(chunk_shape=chunk_shape), + chunk_key_encoding=chunk_key_encoding, fill_value=fill_value, codecs=codecs, dimension_names=tuple(dimension_names) if dimension_names else None, attributes=attributes or {}, ) - array = cls( - metadata=metadata, - store_path=store_path, - ) + array = cls(metadata=metadata, store_path=store_path) + + await array._save_metadata(metadata) + return array + + @classmethod + async def _create_v2( + cls, + store_path: StorePath, + *, + shape: ChunkCoords, + dtype: npt.DTypeLike, + chunks: ChunkCoords, + dimension_separator: Literal[".", "/"] | None = None, + fill_value: None | int | float = None, + order: Literal["C", "F"] | None = None, + filters: list[dict[str, JSON]] | None = None, + compressor: dict[str, JSON] | None = None, + attributes: dict[str, JSON] | None = None, + exists_ok: bool = False, + ) -> AsyncArray: + import numcodecs + + if not exists_ok: + assert not await (store_path / ZARRAY_JSON).exists() + + if order is None: + order = "C" + + if dimension_separator is None: + dimension_separator = "." 
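# --- Annotation (not part of the patch): how the unified create() above is
# --- driven from the caller's side. MemoryStore is assumed to be exported
# --- from zarr.store in this tree; the compressor config is illustrative.
from zarr.array import Array
from zarr.store import MemoryStore  # assumption: available in this tree

arr = Array.create(
    MemoryStore(),
    shape=(100, 100),
    dtype="float32",
    zarr_format=2,            # selects the _create_v2 branch
    chunks=(10, 10),          # v2 spelling; zarr_format=3 uses chunk_shape=
    dimension_separator="/",
    compressor={"id": "zlib", "level": 1},
    fill_value=0,
)
# Passing v3-only arguments (codecs=, chunk_key_encoding=, dimension_names=)
# together with zarr_format=2 raises a ValueError, and vice versa.
# --- End annotation.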
- await array._save_metadata() + metadata = ArrayV2Metadata( + shape=shape, + dtype=np.dtype(dtype), + chunks=chunks, + order=order, + dimension_separator=dimension_separator, + fill_value=0 if fill_value is None else fill_value, + compressor=( + numcodecs.get_codec(compressor).get_config() if compressor is not None else None + ), + filters=( + [numcodecs.get_codec(filter).get_config() for filter in filters] + if filters is not None + else None + ), + attributes=attributes, + ) + array = cls(metadata=metadata, store_path=store_path) + await array._save_metadata(metadata) return array @classmethod def from_dict( cls, store_path: StorePath, - data: Dict[str, Any], + data: dict[str, JSON], ) -> AsyncArray: - metadata = ArrayMetadata.from_dict(data) + metadata = parse_array_metadata(data) async_array = cls(metadata=metadata, store_path=store_path) return async_array @@ -142,30 +288,54 @@ def from_dict( async def open( cls, store: StoreLike, + zarr_format: ZarrFormat | None = 3, ) -> AsyncArray: store_path = make_store_path(store) - zarr_json_bytes = await (store_path / ZARR_JSON).get() - assert zarr_json_bytes is not None - return cls.from_dict( - store_path, - json.loads(zarr_json_bytes.to_bytes()), - ) - @classmethod - async def open_auto( - cls, - store: StoreLike, - ) -> AsyncArray: # TODO: Union[AsyncArray, ArrayV2] - store_path = make_store_path(store) - v3_metadata_bytes = await (store_path / ZARR_JSON).get() - if v3_metadata_bytes is not None: - return cls.from_dict( - store_path, - json.loads(v3_metadata_bytes.to_bytes()), + if zarr_format == 2: + zarray_bytes, zattrs_bytes = await gather( + (store_path / ZARRAY_JSON).get(), (store_path / ZATTRS_JSON).get() ) + if zarray_bytes is None: + raise KeyError(store_path) # filenotfounderror? + elif zarr_format == 3: + zarr_json_bytes = await (store_path / ZARR_JSON).get() + if zarr_json_bytes is None: + raise KeyError(store_path) # filenotfounderror? + elif zarr_format is None: + zarr_json_bytes, zarray_bytes, zattrs_bytes = await gather( + (store_path / ZARR_JSON).get(), + (store_path / ZARRAY_JSON).get(), + (store_path / ZATTRS_JSON).get(), + ) + if zarr_json_bytes is not None and zarray_bytes is not None: + # TODO: revisit this exception type + # alternatively, we could warn and favor v3 + raise ValueError("Both zarr.json and .zarray objects exist") + if zarr_json_bytes is None and zarray_bytes is None: + raise KeyError(store_path) # filenotfounderror? 
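# --- Annotation (not part of the patch): a caller-side sketch of the
# --- zarr_format modes of open() above, including the KeyError flagged by
# --- the "filenotfounderror?" TODO comments. `open_any` is a hypothetical
# --- helper, not part of the API.
from zarr.array import AsyncArray
from zarr.store import StoreLike


async def open_any(store: StoreLike) -> AsyncArray:
    # zarr_format=None probes zarr.json, .zarray and .zattrs concurrently,
    # erroring if both formats (or neither) are present.
    try:
        return await AsyncArray.open(store, zarr_format=None)
    except KeyError:
        raise FileNotFoundError(f"no zarr array found at {store!r}") from None
# --- End annotation.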
+ # set zarr_format based on which keys were found + if zarr_json_bytes is not None: + zarr_format = 3 + else: + zarr_format = 2 else: - raise ValueError("no v2 support yet") - # return await ArrayV2.open(store_path) + raise ValueError(f"unexpected zarr_format: {zarr_format}") + + if zarr_format == 2: + # V2 arrays are comprised of .zarray and .zattrs objects + assert zarray_bytes is not None + zarray_dict = json.loads(zarray_bytes.to_bytes()) + zattrs_dict = json.loads(zattrs_bytes.to_bytes()) if zattrs_bytes is not None else {} + zarray_dict["attributes"] = zattrs_dict + return cls(store_path=store_path, metadata=ArrayV2Metadata.from_dict(zarray_dict)) + else: + # V3 arrays are comprised of a zarr.json object + assert zarr_json_bytes is not None + return cls( + store_path=store_path, + metadata=ArrayV3Metadata.from_dict(json.loads(zarr_json_bytes.to_bytes())), + ) @property def ndim(self) -> int: @@ -184,64 +354,45 @@ def dtype(self) -> np.dtype[Any]: return self.metadata.dtype @property - def attrs(self) -> dict[str, Any]: + def attrs(self) -> dict[str, JSON]: return self.metadata.attributes async def getitem( self, selection: Selection, *, factory: Factory.Create = NDBuffer.create ) -> NDArrayLike: - assert isinstance(self.metadata.chunk_grid, RegularChunkGrid) indexer = BasicIndexer( selection, shape=self.metadata.shape, - chunk_shape=self.metadata.chunk_grid.chunk_shape, + chunk_grid=self.metadata.chunk_grid, ) # setup output array out = factory( - shape=indexer.shape, dtype=self.metadata.dtype, order=self.order, fill_value=0 + shape=indexer.shape, + dtype=self.metadata.dtype, + order=self.order, + fill_value=0, # TODO use fill_value ) # reading chunks and decoding them - await concurrent_map( + await self.metadata.codec_pipeline.read( [ - (chunk_coords, chunk_selection, out_selection, out) + ( + self.store_path / self.metadata.encode_chunk_key(chunk_coords), + self.metadata.get_chunk_spec(chunk_coords, self.order), + chunk_selection, + out_selection, + ) for chunk_coords, chunk_selection, out_selection in indexer ], - self._read_chunk, - config.get("async.concurrency"), + out, ) return out.as_ndarray_like() - async def _save_metadata(self) -> None: - await (self.store_path / ZARR_JSON).set(Buffer.from_bytes(self.metadata.to_bytes())) - - async def _read_chunk( - self, - chunk_coords: ChunkCoords, - chunk_selection: SliceSelection, - out_selection: SliceSelection, - out: NDBuffer, - ) -> None: - chunk_spec = self.metadata.get_chunk_spec(chunk_coords, self.order) - chunk_key_encoding = self.metadata.chunk_key_encoding - chunk_key = chunk_key_encoding.encode_chunk_key(chunk_coords) - store_path = self.store_path / chunk_key - - if self.codecs.supports_partial_decode: - chunk_array = await self.codecs.decode_partial(store_path, chunk_selection, chunk_spec) - if chunk_array is not None: - out[out_selection] = chunk_array - else: - out[out_selection] = self.metadata.fill_value - else: - chunk_bytes = await store_path.get() - if chunk_bytes is not None: - chunk_array = await self.codecs.decode(chunk_bytes, chunk_spec) - tmp = chunk_array[chunk_selection] - out[out_selection] = tmp - else: - out[out_selection] = self.metadata.fill_value + async def _save_metadata(self, metadata: ArrayMetadata) -> None: + to_save = metadata.to_buffer_dict() + awaitables = [set_or_delete(self.store_path / key, value) for key, value in to_save.items()] + await gather(*awaitables) async def setitem( self, @@ -249,12 +400,10 @@ async def setitem( value: NDArrayLike, factory: Factory.NDArrayLike =
NDBuffer.from_ndarray_like, ) -> None: - assert isinstance(self.metadata.chunk_grid, RegularChunkGrid) - chunk_shape = self.metadata.chunk_grid.chunk_shape indexer = BasicIndexer( selection, shape=self.metadata.shape, - chunk_shape=chunk_shape, + chunk_grid=self.metadata.chunk_grid, ) sel_shape = indexer.shape @@ -275,122 +424,52 @@ async def setitem( value = factory(value) # merging with existing data and encoding chunks - await concurrent_map( + await self.metadata.codec_pipeline.write( [ ( - value, - chunk_shape, - chunk_coords, + self.store_path / self.metadata.encode_chunk_key(chunk_coords), + self.metadata.get_chunk_spec(chunk_coords, self.order), chunk_selection, out_selection, ) for chunk_coords, chunk_selection, out_selection in indexer ], - self._write_chunk, - config.get("async.concurrency"), + value, ) - async def _write_chunk( - self, - value: NDBuffer, - chunk_shape: ChunkCoords, - chunk_coords: ChunkCoords, - chunk_selection: SliceSelection, - out_selection: SliceSelection, - ) -> None: - chunk_spec = self.metadata.get_chunk_spec(chunk_coords, self.order) - chunk_key_encoding = self.metadata.chunk_key_encoding - chunk_key = chunk_key_encoding.encode_chunk_key(chunk_coords) - store_path = self.store_path / chunk_key - - if is_total_slice(chunk_selection, chunk_shape): - # write entire chunks - if np.isscalar(value): - chunk_array = NDBuffer.create( - shape=chunk_shape, dtype=self.metadata.dtype, fill_value=value - ) - else: - chunk_array = value[out_selection] - await self._write_chunk_to_store(store_path, chunk_array, chunk_spec) + async def resize( + self, new_shape: ChunkCoords, delete_outside_chunks: bool = True + ) -> AsyncArray: + assert len(new_shape) == len(self.metadata.shape) + new_metadata = self.metadata.update_shape(new_shape) - elif self.codecs.supports_partial_encode: - # print("encode_partial", chunk_coords, chunk_selection, repr(self)) - await self.codecs.encode_partial( - store_path, - value[out_selection], - chunk_selection, - chunk_spec, - ) - else: - # writing partial chunks - # read chunk first - chunk_bytes = await store_path.get() - - # merge new value - if chunk_bytes is None: - chunk_array = NDBuffer.create( - shape=chunk_shape, - dtype=self.metadata.dtype, - fill_value=self.metadata.fill_value, - ) - else: - chunk_array = ( - await self.codecs.decode(chunk_bytes, chunk_spec) - ).copy() # make a writable copy - chunk_array[chunk_selection] = value[out_selection] + # Remove all chunks outside of the new shape + old_chunk_coords = set(self.metadata.chunk_grid.all_chunk_coords(self.metadata.shape)) + new_chunk_coords = set(self.metadata.chunk_grid.all_chunk_coords(new_shape)) - await self._write_chunk_to_store(store_path, chunk_array, chunk_spec) + if delete_outside_chunks: - async def _write_chunk_to_store( - self, store_path: StorePath, chunk_array: NDBuffer, chunk_spec: ArraySpec - ) -> None: - if chunk_array.all_equal(self.metadata.fill_value): - # chunks that only contain fill_value will be removed - await store_path.delete() - else: - chunk_bytes = await self.codecs.encode(chunk_array, chunk_spec) - if chunk_bytes is None: - await store_path.delete() - else: - await store_path.set(chunk_bytes) + async def _delete_key(key: str) -> None: + await (self.store_path / key).delete() - async def resize(self, new_shape: ChunkCoords) -> AsyncArray: - if len(new_shape) != len(self.metadata.shape): - raise ValueError( - "The new shape must have the same number of dimensions " - + f"(={len(self.metadata.shape)})." 
+ await concurrent_map( + [ + (self.metadata.encode_chunk_key(chunk_coords),) + for chunk_coords in old_chunk_coords.difference(new_chunk_coords) + ], + _delete_key, + config.get("async.concurrency"), ) - new_metadata = replace(self.metadata, shape=new_shape) - - # Remove all chunks outside of the new shape - assert isinstance(self.metadata.chunk_grid, RegularChunkGrid) - chunk_shape = self.metadata.chunk_grid.chunk_shape - chunk_key_encoding = self.metadata.chunk_key_encoding - old_chunk_coords = set(all_chunk_coords(self.metadata.shape, chunk_shape)) - new_chunk_coords = set(all_chunk_coords(new_shape, chunk_shape)) - - async def _delete_key(key: str) -> None: - await (self.store_path / key).delete() - - await concurrent_map( - [ - (chunk_key_encoding.encode_chunk_key(chunk_coords),) - for chunk_coords in old_chunk_coords.difference(new_chunk_coords) - ], - _delete_key, - config.get("async.concurrency"), - ) - # Write new metadata - await (self.store_path / ZARR_JSON).set(Buffer.from_bytes(new_metadata.to_bytes())) + await self._save_metadata(new_metadata) return replace(self, metadata=new_metadata) - async def update_attributes(self, new_attributes: Dict[str, Any]) -> AsyncArray: - new_metadata = replace(self.metadata, attributes=new_attributes) + async def update_attributes(self, new_attributes: dict[str, JSON]) -> AsyncArray: + new_metadata = self.metadata.update_attributes(new_attributes) # Write new metadata - await (self.store_path / ZARR_JSON).set(Buffer.from_bytes(new_metadata.to_bytes())) + await self._save_metadata(new_metadata) return replace(self, metadata=new_metadata) def __repr__(self) -> str: @@ -409,17 +488,29 @@ def create( cls, store: StoreLike, *, + # v2 and v3 shape: ChunkCoords, dtype: npt.DTypeLike, - chunk_shape: ChunkCoords, - fill_value: Optional[Any] = None, - chunk_key_encoding: Union[ - Tuple[Literal["default"], Literal[".", "/"]], - Tuple[Literal["v2"], Literal[".", "/"]], - ] = ("default", "/"), - codecs: Optional[Iterable[Union[Codec, Dict[str, Any]]]] = None, - dimension_names: Optional[Iterable[str]] = None, - attributes: Optional[Dict[str, Any]] = None, + zarr_format: ZarrFormat = 3, + fill_value: Any | None = None, + attributes: dict[str, JSON] | None = None, + # v3 only + chunk_shape: ChunkCoords | None = None, + chunk_key_encoding: ( + ChunkKeyEncoding + | tuple[Literal["default"], Literal[".", "/"]] + | tuple[Literal["v2"], Literal[".", "/"]] + | None + ) = None, + codecs: Iterable[Codec | dict[str, JSON]] | None = None, + dimension_names: Iterable[str] | None = None, + # v2 only + chunks: ChunkCoords | None = None, + dimension_separator: Literal[".", "/"] | None = None, + order: Literal["C", "F"] | None = None, + filters: list[dict[str, JSON]] | None = None, + compressor: dict[str, JSON] | None = None, + # runtime exists_ok: bool = False, ) -> Array: async_array = sync( @@ -427,12 +518,18 @@ def create( store=store, shape=shape, dtype=dtype, - chunk_shape=chunk_shape, + zarr_format=zarr_format, + attributes=attributes, fill_value=fill_value, + chunk_shape=chunk_shape, chunk_key_encoding=chunk_key_encoding, codecs=codecs, dimension_names=dimension_names, - attributes=attributes, + chunks=chunks, + dimension_separator=dimension_separator, + order=order, + filters=filters, + compressor=compressor, exists_ok=exists_ok, ), ) @@ -442,7 +539,7 @@ def create( def from_dict( cls, store_path: StorePath, - data: Dict[str, Any], + data: dict[str, JSON], ) -> Array: async_array = AsyncArray.from_dict(store_path=store_path, data=data) return cls(async_array) @@ 
-455,16 +552,6 @@ def open( async_array = sync(AsyncArray.open(store)) return cls(async_array) - @classmethod - def open_auto( - cls, - store: StoreLike, - ) -> Array: # TODO: Union[Array, ArrayV2]: - async_array = sync( - AsyncArray.open_auto(store), - ) - return cls(async_array) - @property def ndim(self) -> int: return self._async_array.ndim @@ -482,8 +569,8 @@ def dtype(self) -> np.dtype[Any]: return self._async_array.dtype @property - def attrs(self) -> dict[str, Any]: - return self._async_array.attrs + def attrs(self) -> Attributes: + return Attributes(self) @property def metadata(self) -> ArrayMetadata: @@ -514,7 +601,7 @@ def resize(self, new_shape: ChunkCoords) -> Array: ) ) - def update_attributes(self, new_attributes: Dict[str, Any]) -> Array: + def update_attributes(self, new_attributes: dict[str, JSON]) -> Array: return type(self)( sync( self._async_array.update_attributes(new_attributes), diff --git a/src/zarr/array_v2.py b/src/zarr/array_v2.py deleted file mode 100644 index 053d58eb1a..0000000000 --- a/src/zarr/array_v2.py +++ /dev/null @@ -1,524 +0,0 @@ -from __future__ import annotations - -import asyncio -from dataclasses import dataclass, replace -import json -from typing import TYPE_CHECKING, Any, Dict, List, Literal, Optional, Union - -import numcodecs -import numpy as np - -from numcodecs.compat import ensure_bytes, ensure_ndarray - -from zarr.buffer import Buffer, NDBuffer -from zarr.common import ( - ZARRAY_JSON, - ZATTRS_JSON, - BytesLike, - ChunkCoords, - Selection, - SliceSelection, - concurrent_map, - to_thread, -) -from zarr.indexing import BasicIndexer, all_chunk_coords, is_total_slice -from zarr.metadata import ArrayV2Metadata -from zarr.store import StoreLike, StorePath, make_store_path -from zarr.sync import sync - -if TYPE_CHECKING: - from zarr.array import Array - - -def as_bytearray(data: Optional[Buffer]) -> Optional[bytes]: - """Help function to convert a Buffer into bytes if not None""" - if data is None: - return data - return data.to_bytes() - - -@dataclass(frozen=True) -class _AsyncArrayProxy: - array: ArrayV2 - - def __getitem__(self, selection: Selection) -> _AsyncArraySelectionProxy: - return _AsyncArraySelectionProxy(self.array, selection) - - -@dataclass(frozen=True) -class _AsyncArraySelectionProxy: - array: ArrayV2 - selection: Selection - - async def get(self) -> np.ndarray: - return await self.array.get_async(self.selection) - - async def set(self, value: np.ndarray): - return await self.array.set_async(self.selection, value) - - -@dataclass(frozen=True) -class ArrayV2: - metadata: ArrayV2Metadata - attributes: Optional[Dict[str, Any]] - store_path: StorePath - - @classmethod - async def create_async( - cls, - store: StoreLike, - *, - shape: ChunkCoords, - dtype: np.dtype, - chunks: ChunkCoords, - dimension_separator: Literal[".", "/"] = ".", - fill_value: Optional[Union[None, int, float]] = None, - order: Literal["C", "F"] = "C", - filters: Optional[List[Dict[str, Any]]] = None, - compressor: Optional[Dict[str, Any]] = None, - attributes: Optional[Dict[str, Any]] = None, - exists_ok: bool = False, - ) -> ArrayV2: - store_path = make_store_path(store) - if not exists_ok: - assert not await (store_path / ZARRAY_JSON).exists() - - metadata = ArrayV2Metadata( - shape=shape, - dtype=np.dtype(dtype), - chunks=chunks, - order=order, - dimension_separator=dimension_separator, - fill_value=0 if fill_value is None else fill_value, - compressor=( - numcodecs.get_codec(compressor).get_config() if compressor is not None else None - ), - filters=( - 
[numcodecs.get_codec(filter).get_config() for filter in filters] - if filters is not None - else None - ), - ) - array = cls( - metadata=metadata, - store_path=store_path, - attributes=attributes, - ) - await array._save_metadata() - return array - - @classmethod - def create( - cls, - store: StoreLike, - *, - shape: ChunkCoords, - dtype: np.dtype, - chunks: ChunkCoords, - dimension_separator: Literal[".", "/"] = ".", - fill_value: Optional[Union[None, int, float]] = None, - order: Literal["C", "F"] = "C", - filters: Optional[List[Dict[str, Any]]] = None, - compressor: Optional[Dict[str, Any]] = None, - attributes: Optional[Dict[str, Any]] = None, - exists_ok: bool = False, - ) -> ArrayV2: - return sync( - cls.create_async( - store, - shape=shape, - dtype=dtype, - chunks=chunks, - order=order, - dimension_separator=dimension_separator, - fill_value=0 if fill_value is None else fill_value, - compressor=compressor, - filters=filters, - attributes=attributes, - exists_ok=exists_ok, - ), - ) - - @classmethod - async def open_async( - cls, - store: StoreLike, - ) -> ArrayV2: - store_path = make_store_path(store) - zarray_bytes, zattrs_bytes = await asyncio.gather( - (store_path / ZARRAY_JSON).get(), - (store_path / ZATTRS_JSON).get(), - ) - assert zarray_bytes is not None - return cls.from_dict( - store_path, - zarray_json=json.loads(zarray_bytes.to_bytes()), - zattrs_json=json.loads(zattrs_bytes.to_bytes()) if zattrs_bytes is not None else None, - ) - - @classmethod - def open( - cls, - store: StoreLike, - ) -> ArrayV2: - return sync( - cls.open_async(store), - ) - - @classmethod - def from_dict( - cls, - store_path: StorePath, - zarray_json: Any, - zattrs_json: Optional[Any], - ) -> ArrayV2: - metadata = ArrayV2Metadata.from_dict(zarray_json) - out = cls( - store_path=store_path, - metadata=metadata, - attributes=zattrs_json, - ) - out._validate_metadata() - return out - - async def _save_metadata(self) -> None: - self._validate_metadata() - - await (self.store_path / ZARRAY_JSON).set(self.metadata.to_bytes()) - if self.attributes is not None and len(self.attributes) > 0: - await (self.store_path / ZATTRS_JSON).set( - Buffer.from_bytes(json.dumps(self.attributes).encode()), - ) - else: - await (self.store_path / ZATTRS_JSON).delete() - - def _validate_metadata(self) -> None: - assert len(self.metadata.shape) == len( - self.metadata.chunks - ), "`chunks` and `shape` need to have the same number of dimensions." 
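# --- Annotation (not part of the patch): the hand-rolled v2 read/write path
# --- deleted in this file is superseded by the codec adapters added later in
# --- this diff (src/zarr/codecs/_v2.py). A v2 array's filters and compressor
# --- become an ordinary two-stage pipeline; the codec configs below are
# --- illustrative only.
from zarr.codecs._v2 import V2Compressor, V2Filters

v2_codecs = [
    V2Filters([{"id": "delta", "dtype": "<i4"}]),  # array-to-array: v2 filters
    V2Compressor({"id": "zlib", "level": 1}),      # array-to-bytes plus v2 compressor
]
# --- End annotation.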
- - @property - def ndim(self) -> int: - return len(self.metadata.shape) - - @property - def shape(self) -> ChunkCoords: - return self.metadata.shape - - @property - def dtype(self) -> np.dtype: - return self.metadata.dtype - - @property - def async_(self) -> _AsyncArrayProxy: - return _AsyncArrayProxy(self) - - def __getitem__(self, selection: Selection): - return sync(self.get_async(selection)) - - async def get_async(self, selection: Selection): - indexer = BasicIndexer( - selection, - shape=self.metadata.shape, - chunk_shape=self.metadata.chunks, - ) - - # setup output array - out = NDBuffer.create( - shape=indexer.shape, dtype=self.metadata.dtype, order=self.metadata.order, fill_value=0 - ) - - # reading chunks and decoding them - await concurrent_map( - [ - (chunk_coords, chunk_selection, out_selection, out) - for chunk_coords, chunk_selection, out_selection in indexer - ], - self._read_chunk, - ) - - if out.shape: - return out - else: - return out[()] - - async def _read_chunk( - self, - chunk_coords: ChunkCoords, - chunk_selection: SliceSelection, - out_selection: SliceSelection, - out: np.ndarray, - ): - store_path = self.store_path / self._encode_chunk_key(chunk_coords) - - chunk_array = await self._decode_chunk(as_bytearray(await store_path.get())) - if chunk_array is not None: - tmp = chunk_array[chunk_selection] - out[out_selection] = tmp - else: - out[out_selection] = self.metadata.fill_value - - async def _decode_chunk(self, chunk_bytes: Optional[BytesLike]) -> Optional[np.ndarray]: - if chunk_bytes is None: - return None - - if self.metadata.compressor is not None: - compressor = numcodecs.get_codec(self.metadata.compressor) - chunk_array = ensure_ndarray(await to_thread(compressor.decode, chunk_bytes)) - else: - chunk_array = ensure_ndarray(chunk_bytes) - - # ensure correct dtype - if str(chunk_array.dtype) != self.metadata.dtype: - chunk_array = chunk_array.view(self.metadata.dtype) - - # apply filters in reverse order - if self.metadata.filters is not None: - for filter_metadata in self.metadata.filters[::-1]: - filter = numcodecs.get_codec(filter_metadata) - chunk_array = await to_thread(filter.decode, chunk_array) - - # ensure correct chunk shape - if chunk_array.shape != self.metadata.chunks: - chunk_array = chunk_array.reshape( - self.metadata.chunks, - order=self.metadata.order, - ) - - return chunk_array - - def __setitem__(self, selection: Selection, value: np.ndarray) -> None: - sync(self.set_async(selection, value)) - - async def set_async(self, selection: Selection, value: np.ndarray) -> None: - chunk_shape = self.metadata.chunks - indexer = BasicIndexer( - selection, - shape=self.metadata.shape, - chunk_shape=chunk_shape, - ) - - sel_shape = indexer.shape - - # check value shape - if np.isscalar(value): - # setting a scalar value - pass - else: - if not hasattr(value, "shape"): - value = np.asarray(value, self.metadata.dtype) - assert value.shape == sel_shape - if value.dtype != self.metadata.dtype: - value = value.astype(self.metadata.dtype, order="A") - - # merging with existing data and encoding chunks - await concurrent_map( - [ - ( - value, - chunk_shape, - chunk_coords, - chunk_selection, - out_selection, - ) - for chunk_coords, chunk_selection, out_selection in indexer - ], - self._write_chunk, - ) - - async def _write_chunk( - self, - value: np.ndarray, - chunk_shape: ChunkCoords, - chunk_coords: ChunkCoords, - chunk_selection: SliceSelection, - out_selection: SliceSelection, - ): - store_path = self.store_path / self._encode_chunk_key(chunk_coords) - 
- if is_total_slice(chunk_selection, chunk_shape): - # write entire chunks - if np.isscalar(value): - chunk_array = NDBuffer.create( - shape=chunk_shape, - dtype=self.metadata.dtype, - order=self.metadata.order, - fill_value=value, - ) - else: - chunk_array = value[out_selection] - await self._write_chunk_to_store(store_path, chunk_array) - - else: - # writing partial chunks - # read chunk first - tmp = await self._decode_chunk(as_bytearray(await store_path.get())) - - # merge new value - if tmp is None: - chunk_array = NDBuffer.create( - shape=chunk_shape, - dtype=self.metadata.dtype, - order=self.metadata.order, - fill_value=self.metadata.fill_value, - ) - else: - chunk_array = tmp.copy( - order=self.metadata.order, - ) # make a writable copy - chunk_array[chunk_selection] = value[out_selection] - - await self._write_chunk_to_store(store_path, chunk_array) - - async def _write_chunk_to_store(self, store_path: StorePath, chunk_array: np.ndarray): - chunk_bytes: Optional[BytesLike] - if np.all(chunk_array == self.metadata.fill_value): - # chunks that only contain fill_value will be removed - await store_path.delete() - else: - chunk_bytes = await self._encode_chunk(chunk_array) - if chunk_bytes is None: - await store_path.delete() - else: - await store_path.set(Buffer.from_bytes(chunk_bytes)) - - async def _encode_chunk(self, chunk_array: np.ndarray) -> Optional[BytesLike]: - chunk_array = chunk_array.ravel(order=self.metadata.order) - - if self.metadata.filters is not None: - for filter_metadata in self.metadata.filters: - filter = numcodecs.get_codec(filter_metadata) - chunk_array = await to_thread(filter.encode, chunk_array) - - if self.metadata.compressor is not None: - compressor = numcodecs.get_codec(self.metadata.compressor) - if not chunk_array.flags.c_contiguous and not chunk_array.flags.f_contiguous: - chunk_array = chunk_array.copy(order="A") - encoded_chunk_bytes = ensure_bytes(await to_thread(compressor.encode, chunk_array)) - else: - encoded_chunk_bytes = ensure_bytes(chunk_array) - - return encoded_chunk_bytes - - def _encode_chunk_key(self, chunk_coords: ChunkCoords) -> str: - chunk_identifier = self.metadata.dimension_separator.join(map(str, chunk_coords)) - return "0" if chunk_identifier == "" else chunk_identifier - - async def resize_async(self, new_shape: ChunkCoords) -> ArrayV2: - assert len(new_shape) == len(self.metadata.shape) - new_metadata = replace(self.metadata, shape=new_shape) - - # Remove all chunks outside of the new shape - chunk_shape = self.metadata.chunks - old_chunk_coords = set(all_chunk_coords(self.metadata.shape, chunk_shape)) - new_chunk_coords = set(all_chunk_coords(new_shape, chunk_shape)) - - async def _delete_key(key: str) -> None: - await (self.store_path / key).delete() - - await concurrent_map( - [ - (self._encode_chunk_key(chunk_coords),) - for chunk_coords in old_chunk_coords.difference(new_chunk_coords) - ], - _delete_key, - ) - - # Write new metadata - await (self.store_path / ZARRAY_JSON).set(new_metadata.to_bytes()) - return replace(self, metadata=new_metadata) - - def resize(self, new_shape: ChunkCoords) -> ArrayV2: - return sync(self.resize_async(new_shape)) - - async def convert_to_v3_async(self) -> Array: - from sys import byteorder as sys_byteorder - - from zarr.abc.codec import Codec - from zarr.array import Array - from zarr.common import ZARR_JSON - from zarr.chunk_grids import RegularChunkGrid - from zarr.chunk_key_encodings import V2ChunkKeyEncoding - from zarr.metadata import ArrayMetadata, DataType - - from zarr.codecs 
import ( - BloscCodec, - BloscShuffle, - BytesCodec, - GzipCodec, - TransposeCodec, - ) - - data_type = DataType.from_dtype(self.metadata.dtype) - endian: Literal["little", "big"] - if self.metadata.dtype.byteorder == "=": - endian = sys_byteorder - elif self.metadata.dtype.byteorder == ">": - endian = "big" - else: - endian = "little" - - assert ( - self.metadata.filters is None or len(self.metadata.filters) == 0 - ), "Filters are not supported by v3." - - codecs: List[Codec] = [] - - if self.metadata.order == "F": - codecs.append(TransposeCodec(order=tuple(reversed(range(self.metadata.ndim))))) - codecs.append(BytesCodec(endian=endian)) - - if self.metadata.compressor is not None: - v2_codec = numcodecs.get_codec(self.metadata.compressor).get_config() - assert v2_codec["id"] in ( - "blosc", - "gzip", - ), "Only blosc and gzip are supported by v3." - if v2_codec["id"] == "blosc": - codecs.append( - BloscCodec( - typesize=data_type.byte_count, - cname=v2_codec["cname"], - clevel=v2_codec["clevel"], - shuffle=BloscShuffle.from_int(v2_codec.get("shuffle", 0)), - blocksize=v2_codec.get("blocksize", 0), - ) - ) - elif v2_codec["id"] == "gzip": - codecs.append(GzipCodec(level=v2_codec.get("level", 5))) - - new_metadata = ArrayMetadata( - shape=self.metadata.shape, - chunk_grid=RegularChunkGrid(chunk_shape=self.metadata.chunks), - data_type=data_type, - fill_value=0 if self.metadata.fill_value is None else self.metadata.fill_value, - chunk_key_encoding=V2ChunkKeyEncoding(separator=self.metadata.dimension_separator), - codecs=codecs, - attributes=self.attributes or {}, - dimension_names=None, - ) - - new_metadata_bytes = new_metadata.to_bytes() - await (self.store_path / ZARR_JSON).set(Buffer.from_bytes(new_metadata_bytes)) - - return Array.from_dict( - store_path=self.store_path, - data=json.loads(new_metadata_bytes), - ) - - async def update_attributes_async(self, new_attributes: Dict[str, Any]) -> ArrayV2: - await (self.store_path / ZATTRS_JSON).set( - Buffer.from_bytes(json.dumps(new_attributes).encode()) - ) - return replace(self, attributes=new_attributes) - - def update_attributes(self, new_attributes: Dict[str, Any]) -> ArrayV2: - return sync( - self.update_attributes_async(new_attributes), - ) - - def convert_to_v3(self) -> Array: - return sync(self.convert_to_v3_async()) - - def __repr__(self): - return f"" diff --git a/src/zarr/attributes.py b/src/zarr/attributes.py index 18f6a63a55..e6b26309f2 100644 --- a/src/zarr/attributes.py +++ b/src/zarr/attributes.py @@ -1,21 +1,24 @@ from __future__ import annotations + from collections.abc import MutableMapping -from typing import TYPE_CHECKING, Any, Iterator, Union +from typing import TYPE_CHECKING, Iterator + +from zarr.common import JSON if TYPE_CHECKING: from zarr.group import Group from zarr.array import Array -class Attributes(MutableMapping[str, Any]): - def __init__(self, obj: Union[Array, Group]): +class Attributes(MutableMapping[str, JSON]): + def __init__(self, obj: Array | Group): # key=".zattrs", read_only=False, cache=True, synchronizer=None self._obj = obj - def __getitem__(self, key: str) -> Any: + def __getitem__(self, key: str) -> JSON: return self._obj.metadata.attributes[key] - def __setitem__(self, key: str, value: Any) -> None: + def __setitem__(self, key: str, value: JSON) -> None: new_attrs = dict(self._obj.metadata.attributes) new_attrs[key] = value self._obj = self._obj.update_attributes(new_attrs) diff --git a/src/zarr/chunk_grids.py b/src/zarr/chunk_grids.py index 73557f6e4b..16c0df9174 100644 --- 
a/src/zarr/chunk_grids.py +++ b/src/zarr/chunk_grids.py @@ -1,5 +1,6 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Any, Dict +import itertools +from typing import TYPE_CHECKING, Any, Dict, Iterator from dataclasses import dataclass from zarr.abc.metadata import Metadata @@ -10,6 +11,7 @@ parse_named_configuration, parse_shapelike, ) +from zarr.indexing import _ceildiv if TYPE_CHECKING: from typing_extensions import Self @@ -27,6 +29,9 @@ def from_dict(cls, data: Dict[str, JSON]) -> ChunkGrid: return RegularChunkGrid.from_dict(data) raise ValueError(f"Unknown chunk grid. Got {name_parsed}.") + def all_chunk_coords(self, array_shape: ChunkCoords) -> Iterator[ChunkCoords]: + raise NotImplementedError + @dataclass(frozen=True) class RegularChunkGrid(ChunkGrid): @@ -45,3 +50,8 @@ def from_dict(cls, data: Dict[str, Any]) -> Self: def to_dict(self) -> Dict[str, JSON]: return {"name": "regular", "configuration": {"chunk_shape": list(self.chunk_shape)}} + + def all_chunk_coords(self, array_shape: ChunkCoords) -> Iterator[ChunkCoords]: + return itertools.product( + *(range(0, _ceildiv(s, c)) for s, c in zip(array_shape, self.chunk_shape)) + ) diff --git a/src/zarr/codecs/__init__.py b/src/zarr/codecs/__init__.py index 8fa0c9f7b0..959a85af57 100644 --- a/src/zarr/codecs/__init__.py +++ b/src/zarr/codecs/__init__.py @@ -7,3 +7,4 @@ from zarr.codecs.sharding import ShardingCodec, ShardingCodecIndexLocation # noqa: F401 from zarr.codecs.transpose import TransposeCodec # noqa: F401 from zarr.codecs.zstd import ZstdCodec # noqa: F401 +from zarr.codecs.pipeline import BatchedCodecPipeline # noqa: F401 diff --git a/src/zarr/codecs/_v2.py b/src/zarr/codecs/_v2.py new file mode 100644 index 0000000000..fb7122600f --- /dev/null +++ b/src/zarr/codecs/_v2.py @@ -0,0 +1,107 @@ +from __future__ import annotations + +from dataclasses import dataclass + +from zarr.buffer import Buffer, NDBuffer +from zarr.codecs.mixins import ArrayArrayCodecBatchMixin, ArrayBytesCodecBatchMixin +from zarr.common import JSON, ArraySpec, to_thread + +import numcodecs +from numcodecs.compat import ensure_bytes, ensure_ndarray + + +@dataclass(frozen=True) +class V2Compressor(ArrayBytesCodecBatchMixin): + compressor: dict[str, JSON] | None + + is_fixed_size = False + + async def decode_single( + self, + chunk_bytes: Buffer, + chunk_spec: ArraySpec, + ) -> NDBuffer: + if chunk_bytes is None: + return None + + if self.compressor is not None: + compressor = numcodecs.get_codec(self.compressor) + chunk_numpy_array = ensure_ndarray( + await to_thread(compressor.decode, chunk_bytes.as_array_like()) + ) + else: + chunk_numpy_array = ensure_ndarray(chunk_bytes.as_array_like()) + + # ensure correct dtype + if str(chunk_numpy_array.dtype) != chunk_spec.dtype: + chunk_numpy_array = chunk_numpy_array.view(chunk_spec.dtype) + + return NDBuffer.from_numpy_array(chunk_numpy_array) + + async def encode_single( + self, + chunk_array: NDBuffer, + _chunk_spec: ArraySpec, + ) -> Buffer | None: + chunk_numpy_array = chunk_array.as_numpy_array() + if self.compressor is not None: + compressor = numcodecs.get_codec(self.compressor) + if ( + not chunk_numpy_array.flags.c_contiguous + and not chunk_numpy_array.flags.f_contiguous + ): + chunk_numpy_array = chunk_numpy_array.copy(order="A") + encoded_chunk_bytes = ensure_bytes( + await to_thread(compressor.encode, chunk_numpy_array) + ) + else: + encoded_chunk_bytes = ensure_bytes(chunk_numpy_array) + + return Buffer.from_bytes(encoded_chunk_bytes) + + def compute_encoded_size(self, 
_input_byte_length: int, _chunk_spec: ArraySpec) -> int: + raise NotImplementedError + + +@dataclass(frozen=True) +class V2Filters(ArrayArrayCodecBatchMixin): + filters: list[dict[str, JSON]] + + is_fixed_size = False + + async def decode_single( + self, + chunk_array: NDBuffer, + chunk_spec: ArraySpec, + ) -> NDBuffer: + chunk_numpy_array = chunk_array.as_numpy_array() + # apply filters in reverse order + if self.filters is not None: + for filter_metadata in self.filters[::-1]: + filter = numcodecs.get_codec(filter_metadata) + chunk_numpy_array = await to_thread(filter.decode, chunk_numpy_array) + + # ensure correct chunk shape + if chunk_numpy_array.shape != chunk_spec.shape: + chunk_numpy_array = chunk_numpy_array.reshape( + chunk_spec.shape, + order=chunk_spec.order, + ) + + return NDBuffer.from_numpy_array(chunk_numpy_array) + + async def encode_single( + self, + chunk_array: NDBuffer, + chunk_spec: ArraySpec, + ) -> NDBuffer | None: + chunk_numpy_array = chunk_array.as_numpy_array().ravel(order=chunk_spec.order) + + for filter_metadata in self.filters: + filter = numcodecs.get_codec(filter_metadata) + chunk_numpy_array = await to_thread(filter.encode, chunk_numpy_array) + + return NDBuffer.from_numpy_array(chunk_numpy_array) + + def compute_encoded_size(self, _input_byte_length: int, _chunk_spec: ArraySpec) -> int: + raise NotImplementedError diff --git a/src/zarr/codecs/blosc.py b/src/zarr/codecs/blosc.py index 7e94575f9a..ab3ffab479 100644 --- a/src/zarr/codecs/blosc.py +++ b/src/zarr/codecs/blosc.py @@ -8,7 +8,7 @@ import numcodecs from numcodecs.blosc import Blosc -from zarr.abc.codec import BytesBytesCodec +from zarr.codecs.mixins import BytesBytesCodecBatchMixin from zarr.buffer import Buffer, as_numpy_array_wrapper from zarr.codecs.registry import register_codec from zarr.common import parse_enum, parse_named_configuration, to_thread @@ -74,7 +74,7 @@ def parse_blocksize(data: JSON) -> int: @dataclass(frozen=True) -class BloscCodec(BytesBytesCodec): +class BloscCodec(BytesBytesCodecBatchMixin): is_fixed_size = False typesize: int @@ -158,14 +158,14 @@ def _blosc_codec(self) -> Blosc: } return Blosc.from_config(config_dict) - async def decode( + async def decode_single( self, chunk_bytes: Buffer, _chunk_spec: ArraySpec, ) -> Buffer: return await to_thread(as_numpy_array_wrapper, self._blosc_codec.decode, chunk_bytes) - async def encode( + async def encode_single( self, chunk_bytes: Buffer, chunk_spec: ArraySpec, diff --git a/src/zarr/codecs/bytes.py b/src/zarr/codecs/bytes.py index d6a626e160..6df78a08b8 100644 --- a/src/zarr/codecs/bytes.py +++ b/src/zarr/codecs/bytes.py @@ -7,7 +7,7 @@ import numpy as np -from zarr.abc.codec import ArrayBytesCodec +from zarr.codecs.mixins import ArrayBytesCodecBatchMixin from zarr.buffer import Buffer, NDBuffer from zarr.codecs.registry import register_codec from zarr.common import parse_enum, parse_named_configuration @@ -26,7 +26,7 @@ class Endian(Enum): @dataclass(frozen=True) -class BytesCodec(ArrayBytesCodec): +class BytesCodec(ArrayBytesCodecBatchMixin): is_fixed_size = True endian: Optional[Endian] @@ -60,7 +60,7 @@ def evolve(self, array_spec: ArraySpec) -> Self: ) return self - async def decode( + async def decode_single( self, chunk_bytes: Buffer, chunk_spec: ArraySpec, @@ -83,7 +83,7 @@ async def decode( ) return chunk_array - async def encode( + async def encode_single( self, chunk_array: NDBuffer, _chunk_spec: ArraySpec, diff --git a/src/zarr/codecs/crc32c_.py b/src/zarr/codecs/crc32c_.py index 1daf512e43..ab4bad65fe 100644 --- 
a/src/zarr/codecs/crc32c_.py +++ b/src/zarr/codecs/crc32c_.py @@ -7,7 +7,7 @@ from crc32c import crc32c -from zarr.abc.codec import BytesBytesCodec +from zarr.codecs.mixins import BytesBytesCodecBatchMixin from zarr.buffer import Buffer from zarr.codecs.registry import register_codec from zarr.common import parse_named_configuration @@ -19,7 +19,7 @@ @dataclass(frozen=True) -class Crc32cCodec(BytesBytesCodec): +class Crc32cCodec(BytesBytesCodecBatchMixin): is_fixed_size = True @classmethod @@ -30,7 +30,7 @@ def from_dict(cls, data: Dict[str, JSON]) -> Self: def to_dict(self) -> Dict[str, JSON]: return {"name": "crc32c"} - async def decode( + async def decode_single( self, chunk_bytes: Buffer, _chunk_spec: ArraySpec, @@ -48,7 +48,7 @@ async def decode( ) return Buffer.from_array_like(inner_bytes) - async def encode( + async def encode_single( self, chunk_bytes: Buffer, _chunk_spec: ArraySpec, diff --git a/src/zarr/codecs/gzip.py b/src/zarr/codecs/gzip.py index a8d7f815aa..6a8e30db13 100644 --- a/src/zarr/codecs/gzip.py +++ b/src/zarr/codecs/gzip.py @@ -4,7 +4,7 @@ from typing import TYPE_CHECKING from numcodecs.gzip import GZip -from zarr.abc.codec import BytesBytesCodec +from zarr.codecs.mixins import BytesBytesCodecBatchMixin from zarr.buffer import Buffer, as_numpy_array_wrapper from zarr.codecs.registry import register_codec from zarr.common import parse_named_configuration, to_thread @@ -26,7 +26,7 @@ def parse_gzip_level(data: JSON) -> int: @dataclass(frozen=True) -class GzipCodec(BytesBytesCodec): +class GzipCodec(BytesBytesCodecBatchMixin): is_fixed_size = False level: int = 5 @@ -44,14 +44,14 @@ def from_dict(cls, data: Dict[str, JSON]) -> Self: def to_dict(self) -> Dict[str, JSON]: return {"name": "gzip", "configuration": {"level": self.level}} - async def decode( + async def decode_single( self, chunk_bytes: Buffer, _chunk_spec: ArraySpec, ) -> Buffer: return await to_thread(as_numpy_array_wrapper, GZip(self.level).decode, chunk_bytes) - async def encode( + async def encode_single( self, chunk_bytes: Buffer, _chunk_spec: ArraySpec, diff --git a/src/zarr/codecs/mixins.py b/src/zarr/codecs/mixins.py new file mode 100644 index 0000000000..8b0a684509 --- /dev/null +++ b/src/zarr/codecs/mixins.py @@ -0,0 +1,131 @@ +from __future__ import annotations + +from abc import abstractmethod +from typing import Awaitable, Callable, Generic, Iterable, TypeVar + + +from zarr.abc.codec import ( + ArrayArrayCodec, + ArrayBytesCodec, + ArrayBytesCodecPartialDecodeMixin, + ArrayBytesCodecPartialEncodeMixin, + ByteGetter, + ByteSetter, + BytesBytesCodec, +) +from zarr.buffer import Buffer, NDBuffer +from zarr.common import ArraySpec, SliceSelection, concurrent_map +from zarr.config import config + + +CodecInput = TypeVar("CodecInput", bound=NDBuffer | Buffer) +CodecOutput = TypeVar("CodecOutput", bound=NDBuffer | Buffer) + + +async def batching_helper( + func: Callable[[CodecInput, ArraySpec], Awaitable[CodecOutput | None]], + batch_info: Iterable[tuple[CodecInput | None, ArraySpec]], +) -> list[CodecOutput | None]: + return await concurrent_map( + [(chunk_array, chunk_spec) for chunk_array, chunk_spec in batch_info], + noop_for_none(func), + config.get("async.concurrency"), + ) + + +def noop_for_none( + func: Callable[[CodecInput, ArraySpec], Awaitable[CodecOutput | None]], +) -> Callable[[CodecInput | None, ArraySpec], Awaitable[CodecOutput | None]]: + async def wrap(chunk: CodecInput | None, chunk_spec: ArraySpec) -> CodecOutput | None: + if chunk is None: + return None + return await func(chunk, 
chunk_spec) + + return wrap + + + class CodecBatchMixin(Generic[CodecInput, CodecOutput]): + """The default interface of the Codec classes expects batches of chunks. + However, many codec implementations operate on single chunks. + This mixin provides abstract methods for decode_single and encode_single and + implements batching through concurrent processing. + + Use ArrayArrayCodecBatchMixin, ArrayBytesCodecBatchMixin and BytesBytesCodecBatchMixin + for subclassing. + """ + + @abstractmethod + async def decode_single(self, chunk_data: CodecOutput, chunk_spec: ArraySpec) -> CodecInput: + pass + + async def decode( + self, chunk_data_and_specs: Iterable[tuple[CodecOutput | None, ArraySpec]] + ) -> Iterable[CodecInput | None]: + return await batching_helper(self.decode_single, chunk_data_and_specs) + + @abstractmethod + async def encode_single( + self, chunk_data: CodecInput, chunk_spec: ArraySpec + ) -> CodecOutput | None: + pass + + async def encode( + self, chunk_data_and_specs: Iterable[tuple[CodecInput | None, ArraySpec]] + ) -> Iterable[CodecOutput | None]: + return await batching_helper(self.encode_single, chunk_data_and_specs) + + +class ArrayArrayCodecBatchMixin(CodecBatchMixin[NDBuffer, NDBuffer], ArrayArrayCodec): + pass + + +class ArrayBytesCodecBatchMixin(CodecBatchMixin[NDBuffer, Buffer], ArrayBytesCodec): + pass + + +class BytesBytesCodecBatchMixin(CodecBatchMixin[Buffer, Buffer], BytesBytesCodec): + pass + + +class ArrayBytesCodecPartialDecodeBatchMixin(ArrayBytesCodecPartialDecodeMixin): + @abstractmethod + async def decode_partial_single( + self, byte_getter: ByteGetter, selection: SliceSelection, chunk_spec: ArraySpec + ) -> NDBuffer | None: + pass + + async def decode_partial( + self, batch_info: Iterable[tuple[ByteGetter, SliceSelection, ArraySpec]] + ) -> Iterable[NDBuffer | None]: + return await concurrent_map( + [ + (byte_getter, selection, chunk_spec) + for byte_getter, selection, chunk_spec in batch_info + ], + self.decode_partial_single, + config.get("async.concurrency"), + ) + + +class ArrayBytesCodecPartialEncodeBatchMixin(ArrayBytesCodecPartialEncodeMixin): + @abstractmethod + async def encode_partial_single( + self, + byte_setter: ByteSetter, + chunk_array: NDBuffer, + selection: SliceSelection, + chunk_spec: ArraySpec, + ) -> None: + pass + + async def encode_partial( + self, batch_info: Iterable[tuple[ByteSetter, NDBuffer, SliceSelection, ArraySpec]] + ) -> None: + await concurrent_map( + [ + (byte_setter, chunk_array, selection, chunk_spec) + for byte_setter, chunk_array, selection, chunk_spec in batch_info + ], + self.encode_partial_single, + config.get("async.concurrency"), + )
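For illustration, a subclass of these mixins only implements the single-chunk methods and inherits the batched decode/encode interface. A minimal sketch against the APIs introduced above; the PassthroughCodec name is hypothetical, and metadata serialization (to_dict/from_dict) plus registration via register_codec are omitted for brevity:

from dataclasses import dataclass

from zarr.buffer import Buffer
from zarr.codecs.mixins import BytesBytesCodecBatchMixin
from zarr.common import ArraySpec


@dataclass(frozen=True)
class PassthroughCodec(BytesBytesCodecBatchMixin):
    # Hypothetical bytes-to-bytes codec that returns chunk bytes unchanged.
    is_fixed_size = True

    async def decode_single(self, chunk_bytes: Buffer, _chunk_spec: ArraySpec) -> Buffer:
        # Only the single-chunk path is written here; the inherited decode()
        # runs it concurrently over a mini-batch via batching_helper.
        return chunk_bytes

    async def encode_single(self, chunk_bytes: Buffer, _chunk_spec: ArraySpec) -> Buffer | None:
        return chunk_bytes

    def compute_encoded_size(self, input_byte_length: int, _chunk_spec: ArraySpec) -> int:
        # Passthrough output is the same size as the input.
        return input_byte_length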
diff --git a/src/zarr/codecs/pipeline.py b/src/zarr/codecs/pipeline.py index 1602eb1ef8..8396a0c2ce 100644 --- a/src/zarr/codecs/pipeline.py +++ b/src/zarr/codecs/pipeline.py @@ -1,63 +1,103 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Iterable -from dataclasses import dataclass +from itertools import islice +from typing import TYPE_CHECKING, Iterator, TypeVar, Iterable from warnings import warn +from dataclasses import dataclass +from zarr.config import config from zarr.abc.codec import ( + ByteGetter, + ByteSetter, + Codec, + CodecPipeline, ArrayArrayCodec, ArrayBytesCodec, ArrayBytesCodecPartialDecodeMixin, ArrayBytesCodecPartialEncodeMixin, BytesBytesCodec, - Codec, ) -from zarr.abc.metadata import Metadata from zarr.buffer import Buffer, NDBuffer from zarr.codecs.registry import get_codec_class -from zarr.common import parse_named_configuration +from zarr.common import JSON, concurrent_map, parse_named_configuration +from zarr.indexing import is_total_slice +from zarr.metadata import ArrayMetadata if TYPE_CHECKING: - from typing import Iterator, List, Optional, Tuple - from zarr.store import StorePath - from zarr.metadata import ArrayMetadata - from zarr.common import JSON, ArraySpec, SliceSelection + from typing_extensions import Self + from zarr.common import ArraySpec, SliceSelection + +T = TypeVar("T") +U = TypeVar("U") + + +def _unzip2(iterable: Iterable[tuple[T, U]]) -> tuple[list[T], list[U]]: + out0: list[T] = [] + out1: list[U] = [] + for item0, item1 in iterable: + out0.append(item0) + out1.append(item1) + return (out0, out1) + + +def batched(iterable: Iterable[T], n: int) -> Iterable[tuple[T, ...]]: + if n < 1: + raise ValueError("n must be at least one") + it = iter(iterable) + while batch := tuple(islice(it, n)): + yield batch + + +def resolve_batched(codec: Codec, chunk_specs: Iterable[ArraySpec]) -> Iterable[ArraySpec]: + return [codec.resolve_metadata(chunk_spec) for chunk_spec in chunk_specs]
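The batched helper mirrors itertools.batched from Python 3.12: it yields full mini-batches and a shorter final tuple for any remainder. A quick illustrative sketch, not part of the patch:

# Full batches first, then the remainder:
assert list(batched("ABCDEFG", 3)) == [("A", "B", "C"), ("D", "E", "F"), ("G",)]
# A batch size below one is rejected: list(batched([], 0)) raises ValueError.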
@dataclass(frozen=True) -class CodecPipeline(Metadata): - array_array_codecs: Tuple[ArrayArrayCodec, ...] +class BatchedCodecPipeline(CodecPipeline): + """Default codec pipeline. + + This batched codec pipeline divides the chunk batches into batches of a configurable + batch size ("mini-batch"). Fetching, decoding, encoding and storing are performed in + lock step for each mini-batch. Multiple mini-batches are processed concurrently. + """ + + array_array_codecs: tuple[ArrayArrayCodec, ...] array_bytes_codec: ArrayBytesCodec - bytes_bytes_codecs: Tuple[BytesBytesCodec, ...] + bytes_bytes_codecs: tuple[BytesBytesCodec, ...] + batch_size: int @classmethod - def from_dict(cls, data: Iterable[Union[JSON, Codec]]) -> CodecPipeline: - out: List[Codec] = [] + def from_dict(cls, data: Iterable[JSON | Codec], *, batch_size: int | None = None) -> Self: + out: list[Codec] = [] if not isinstance(data, Iterable): raise TypeError(f"Expected iterable, got {type(data)}") for c in data: - if isinstance(c, Codec): + if isinstance( + c, ArrayArrayCodec | ArrayBytesCodec | BytesBytesCodec + ): # Can't use Codec here because of mypy limitation out.append(c) else: name_parsed, _ = parse_named_configuration(c, require_configuration=False) out.append(get_codec_class(name_parsed).from_dict(c)) # type: ignore[arg-type] - return CodecPipeline.from_list(out) + return cls.from_list(out, batch_size=batch_size) def to_dict(self) -> JSON: return [c.to_dict() for c in self] - def evolve(self, array_spec: ArraySpec) -> CodecPipeline: - return CodecPipeline.from_list([c.evolve(array_spec) for c in self]) + def evolve(self, array_spec: ArraySpec) -> Self: + return type(self).from_list([c.evolve(array_spec) for c in self]) - @classmethod - def from_list(cls, codecs: List[Codec]) -> CodecPipeline: + @staticmethod + def codecs_from_list( + codecs: list[Codec], + ) -> tuple[tuple[ArrayArrayCodec, ...], ArrayBytesCodec, tuple[BytesBytesCodec, ...]]: from zarr.codecs.sharding import ShardingCodec if not any(isinstance(codec, ArrayBytesCodec) for codec in codecs): raise ValueError("Exactly one array-to-bytes codec is required.") - prev_codec: Optional[Codec] = None + prev_codec: Codec | None = None for codec in codecs: if prev_codec is not None: if isinstance(codec, ArrayBytesCodec) and isinstance(prev_codec, ArrayBytesCodec): @@ -86,27 +126,55 @@ def from_list(cls, codecs: List[Codec]) -> CodecPipeline: if any(isinstance(codec, ShardingCodec) for codec in codecs) and len(codecs) > 1: warn( "Combining a `sharding_indexed` codec disables partial reads and " - + "writes, which may lead to inefficient performance." + + "writes, which may lead to inefficient performance.", + stacklevel=3, ) - return CodecPipeline( - array_array_codecs=tuple( - codec for codec in codecs if isinstance(codec, ArrayArrayCodec) - ), - array_bytes_codec=next(codec for codec in codecs if isinstance(codec, ArrayBytesCodec)), - bytes_bytes_codecs=tuple( - codec for codec in codecs if isinstance(codec, BytesBytesCodec) - ), + return ( + tuple(codec for codec in codecs if isinstance(codec, ArrayArrayCodec)), + next(codec for codec in codecs if isinstance(codec, ArrayBytesCodec)), + tuple(codec for codec in codecs if isinstance(codec, BytesBytesCodec)), + ) + + @classmethod + def from_list(cls, codecs: list[Codec], *, batch_size: int | None = None) -> Self: + array_array_codecs, array_bytes_codec, bytes_bytes_codecs = cls.codecs_from_list(codecs) + + return cls( + array_array_codecs=array_array_codecs, + array_bytes_codec=array_bytes_codec, + bytes_bytes_codecs=bytes_bytes_codecs, + batch_size=batch_size or config.get("codec_pipeline.batch_size"), ) @property def supports_partial_decode(self) -> bool: + """Determines whether the codec pipeline supports partial decoding. + + Currently, only codec pipelines with a single ArrayBytesCodec that supports + partial decoding can support partial decoding. This limitation is due to the fact + that ArrayArrayCodecs can change the slice selection, leading to non-contiguous + slices, and BytesBytesCodecs can change the chunk bytes in a way that slice + selections can no longer be attributed to byte ranges, which renders partial + decoding infeasible. + + This limitation may be softened in the future.""" return (len(self.array_array_codecs) + len(self.bytes_bytes_codecs)) == 0 and isinstance( self.array_bytes_codec, ArrayBytesCodecPartialDecodeMixin ) @property def supports_partial_encode(self) -> bool: + """Determines whether the codec pipeline supports partial encoding. + + Currently, only codec pipelines with a single ArrayBytesCodec that supports + partial encoding can support partial encoding. This limitation is due to the fact + that ArrayArrayCodecs can change the slice selection, leading to non-contiguous + slices, and BytesBytesCodecs can change the chunk bytes in a way that slice + selections can no longer be attributed to byte ranges, which renders partial + encoding infeasible. + + This limitation may be softened in the future.""" return (len(self.array_array_codecs) + len(self.bytes_bytes_codecs)) == 0 and isinstance( self.array_bytes_codec, ArrayBytesCodecPartialEncodeMixin )
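To make the batch-size plumbing concrete, a hedged usage sketch (assuming BytesCodec and GzipCodec are re-exported from zarr.codecs, as the package __init__ above suggests):

from zarr.codecs import BatchedCodecPipeline, BytesCodec, GzipCodec

# Explicit mini-batch size; when omitted, from_list falls back to
# config.get("codec_pipeline.batch_size"), which the config change below defaults to 1.
pipeline = BatchedCodecPipeline.from_list([BytesCodec(), GzipCodec()], batch_size=4)

# A BytesBytesCodec (gzip) is present, so partial decoding is unsupported.
assert not pipeline.supports_partial_decode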
@@ -124,105 +192,272 @@ def validate(self, array_metadata: ArrayMetadata) -> None: for codec in self: codec.validate(array_metadata) - def _codecs_with_resolved_metadata( - self, array_spec: ArraySpec - ) -> Tuple[ - List[Tuple[ArrayArrayCodec, ArraySpec]], - Tuple[ArrayBytesCodec, ArraySpec], - List[Tuple[BytesBytesCodec, ArraySpec]], + def compute_encoded_size(self, byte_length: int, array_spec: ArraySpec) -> int: + for codec in self: + byte_length = codec.compute_encoded_size(byte_length, array_spec) + array_spec = codec.resolve_metadata(array_spec) + return byte_length + + def _codecs_with_resolved_metadata_batched( + self, chunk_specs: Iterable[ArraySpec] + ) -> tuple[ + list[tuple[ArrayArrayCodec, list[ArraySpec]]], + tuple[ArrayBytesCodec, list[ArraySpec]], + list[tuple[BytesBytesCodec, list[ArraySpec]]], ]: - aa_codecs_with_spec: List[Tuple[ArrayArrayCodec, ArraySpec]] = [] + aa_codecs_with_spec: list[tuple[ArrayArrayCodec, list[ArraySpec]]] = [] + chunk_specs = list(chunk_specs) for aa_codec in self.array_array_codecs: - aa_codecs_with_spec.append((aa_codec, array_spec)) - array_spec = aa_codec.resolve_metadata(array_spec) + aa_codecs_with_spec.append((aa_codec, chunk_specs)) + chunk_specs = [aa_codec.resolve_metadata(chunk_spec) for chunk_spec in chunk_specs] - ab_codec_with_spec = (self.array_bytes_codec, array_spec) - array_spec = self.array_bytes_codec.resolve_metadata(array_spec) + ab_codec_with_spec = (self.array_bytes_codec, chunk_specs) + chunk_specs = [ + self.array_bytes_codec.resolve_metadata(chunk_spec) for chunk_spec in chunk_specs + ] - bb_codecs_with_spec: List[Tuple[BytesBytesCodec, ArraySpec]] = [] + bb_codecs_with_spec: list[tuple[BytesBytesCodec, list[ArraySpec]]] = [] for bb_codec in self.bytes_bytes_codecs: - bb_codecs_with_spec.append((bb_codec, array_spec)) - array_spec = bb_codec.resolve_metadata(array_spec) + bb_codecs_with_spec.append((bb_codec, chunk_specs)) + chunk_specs = [bb_codec.resolve_metadata(chunk_spec) for chunk_spec in chunk_specs] return (aa_codecs_with_spec, ab_codec_with_spec, bb_codecs_with_spec) - async def decode( + async def decode_batch( self, - chunk_bytes: Buffer, - array_spec: ArraySpec, - ) -> NDBuffer: + chunk_bytes_and_specs: Iterable[tuple[Buffer | None, ArraySpec]], + ) -> Iterable[NDBuffer | None]: + chunk_bytes_batch: Iterable[Buffer | None] + chunk_bytes_batch, chunk_specs = _unzip2(chunk_bytes_and_specs) + ( aa_codecs_with_spec, ab_codec_with_spec, bb_codecs_with_spec, - ) = self._codecs_with_resolved_metadata(array_spec) + ) = self._codecs_with_resolved_metadata_batched(chunk_specs) - for bb_codec, array_spec in bb_codecs_with_spec[::-1]: - chunk_bytes = await bb_codec.decode(chunk_bytes, array_spec) + for bb_codec, chunk_spec_batch in bb_codecs_with_spec[::-1]: + chunk_bytes_batch = await bb_codec.decode(zip(chunk_bytes_batch, chunk_spec_batch)) - ab_codec, array_spec = ab_codec_with_spec - chunk_array = await ab_codec.decode(chunk_bytes, array_spec) + ab_codec, chunk_spec_batch = ab_codec_with_spec + chunk_array_batch = await ab_codec.decode(zip(chunk_bytes_batch, chunk_spec_batch)) - for aa_codec, array_spec in aa_codecs_with_spec[::-1]: - chunk_array = await aa_codec.decode(chunk_array, array_spec) + for aa_codec, chunk_spec_batch in aa_codecs_with_spec[::-1]: + chunk_array_batch = await 
aa_codec.decode(zip(chunk_array_batch, chunk_spec_batch)) - return chunk_array + return chunk_array_batch - async def decode_partial( + async def decode_partial_batch( self, - store_path: StorePath, - selection: SliceSelection, - chunk_spec: ArraySpec, - ) -> Optional[NDBuffer]: + batch_info: Iterable[tuple[ByteGetter, SliceSelection, ArraySpec]], + ) -> Iterable[NDBuffer | None]: assert self.supports_partial_decode assert isinstance(self.array_bytes_codec, ArrayBytesCodecPartialDecodeMixin) - return await self.array_bytes_codec.decode_partial(store_path, selection, chunk_spec) + return await self.array_bytes_codec.decode_partial(batch_info) - async def encode( + async def encode_batch( self, - chunk_array: NDBuffer, - array_spec: ArraySpec, - ) -> Optional[Buffer]: - ( - aa_codecs_with_spec, - ab_codec_with_spec, - bb_codecs_with_spec, - ) = self._codecs_with_resolved_metadata(array_spec) - - for aa_codec, array_spec in aa_codecs_with_spec: - chunk_array_maybe = await aa_codec.encode(chunk_array, array_spec) - if chunk_array_maybe is None: - return None - chunk_array = chunk_array_maybe - - ab_codec, array_spec = ab_codec_with_spec - chunk_bytes_maybe = await ab_codec.encode(chunk_array, array_spec) - if chunk_bytes_maybe is None: - return None - chunk_bytes = chunk_bytes_maybe - - for bb_codec, array_spec in bb_codecs_with_spec: - chunk_bytes_maybe = await bb_codec.encode(chunk_bytes, array_spec) - if chunk_bytes_maybe is None: - return None - chunk_bytes = chunk_bytes_maybe - - assert isinstance(chunk_bytes, Buffer) - return chunk_bytes - - async def encode_partial( + chunk_arrays_and_specs: Iterable[tuple[NDBuffer | None, ArraySpec]], + ) -> Iterable[Buffer | None]: + chunk_array_batch: Iterable[NDBuffer | None] + chunk_specs: Iterable[ArraySpec] + chunk_array_batch, chunk_specs = _unzip2(chunk_arrays_and_specs) + + for aa_codec in self.array_array_codecs: + chunk_array_batch = await aa_codec.encode(zip(chunk_array_batch, chunk_specs)) + chunk_specs = resolve_batched(aa_codec, chunk_specs) + + chunk_bytes_batch = await self.array_bytes_codec.encode(zip(chunk_array_batch, chunk_specs)) + chunk_specs = resolve_batched(self.array_bytes_codec, chunk_specs) + + for bb_codec in self.bytes_bytes_codecs: + chunk_bytes_batch = await bb_codec.encode(zip(chunk_bytes_batch, chunk_specs)) + chunk_specs = resolve_batched(bb_codec, chunk_specs) + + return chunk_bytes_batch + + async def encode_partial_batch( self, - store_path: StorePath, - chunk_array: NDBuffer, - selection: SliceSelection, - chunk_spec: ArraySpec, + batch_info: Iterable[tuple[ByteSetter, NDBuffer, SliceSelection, ArraySpec]], ) -> None: assert self.supports_partial_encode assert isinstance(self.array_bytes_codec, ArrayBytesCodecPartialEncodeMixin) - await self.array_bytes_codec.encode_partial(store_path, chunk_array, selection, chunk_spec) + await self.array_bytes_codec.encode_partial(batch_info) - def compute_encoded_size(self, byte_length: int, array_spec: ArraySpec) -> int: - for codec in self: - byte_length = codec.compute_encoded_size(byte_length, array_spec) - array_spec = codec.resolve_metadata(array_spec) - return byte_length + async def read_batch( + self, + batch_info: Iterable[tuple[ByteGetter, ArraySpec, SliceSelection, SliceSelection]], + out: NDBuffer, + ) -> None: + if self.supports_partial_decode: + chunk_array_batch = await self.decode_partial_batch( + [ + (byte_getter, chunk_selection, chunk_spec) + for byte_getter, chunk_spec, chunk_selection, _ in batch_info + ] + ) + for chunk_array, (_, chunk_spec, _, 
out_selection) in zip( + chunk_array_batch, batch_info + ): + if chunk_array is not None: + out[out_selection] = chunk_array + else: + out[out_selection] = chunk_spec.fill_value + else: + chunk_bytes_batch = await concurrent_map( + [(byte_getter,) for byte_getter, _, _, _ in batch_info], + lambda byte_getter: byte_getter.get(), + config.get("async.concurrency"), + ) + chunk_array_batch = await self.decode_batch( + [ + (chunk_bytes, chunk_spec) + for chunk_bytes, (_, chunk_spec, _, _) in zip(chunk_bytes_batch, batch_info) + ], + ) + for chunk_array, (_, chunk_spec, chunk_selection, out_selection) in zip( + chunk_array_batch, batch_info + ): + if chunk_array is not None: + tmp = chunk_array[chunk_selection] + out[out_selection] = tmp + else: + out[out_selection] = chunk_spec.fill_value + + async def write_batch( + self, + batch_info: Iterable[tuple[ByteSetter, ArraySpec, SliceSelection, SliceSelection]], + value: NDBuffer, + ) -> None: + if self.supports_partial_encode: + await self.encode_partial_batch( + [ + (byte_setter, value[out_selection], chunk_selection, chunk_spec) + for byte_setter, chunk_spec, chunk_selection, out_selection in batch_info + ], + ) + + else: + # Read existing bytes if not total slice + async def _read_key(byte_setter: ByteSetter | None) -> Buffer | None: + if byte_setter is None: + return None + return await byte_setter.get() + + chunk_bytes_batch: Iterable[Buffer | None] + chunk_bytes_batch = await concurrent_map( + [ + (None if is_total_slice(chunk_selection, chunk_spec.shape) else byte_setter,) + for byte_setter, chunk_spec, chunk_selection, _ in batch_info + ], + _read_key, + config.get("async.concurrency"), + ) + chunk_array_batch = await self.decode_batch( + [ + (chunk_bytes, chunk_spec) + for chunk_bytes, (_, chunk_spec, _, _) in zip(chunk_bytes_batch, batch_info) + ], + ) + + def _merge_chunk_array( + existing_chunk_array: NDBuffer | None, + new_chunk_array_slice: NDBuffer, + chunk_spec: ArraySpec, + chunk_selection: SliceSelection, + ) -> NDBuffer: + if is_total_slice(chunk_selection, chunk_spec.shape): + return new_chunk_array_slice + if existing_chunk_array is None: + chunk_array = NDBuffer.create( + shape=chunk_spec.shape, + dtype=chunk_spec.dtype, + order=chunk_spec.order, + fill_value=chunk_spec.fill_value, + ) + else: + chunk_array = existing_chunk_array.copy() # make a writable copy + chunk_array[chunk_selection] = new_chunk_array_slice + return chunk_array + + chunk_array_batch = [ + _merge_chunk_array(chunk_array, value[out_selection], chunk_spec, chunk_selection) + for chunk_array, (_, chunk_spec, chunk_selection, out_selection) in zip( + chunk_array_batch, batch_info + ) + ] + + chunk_array_batch = [ + None + if chunk_array is None or chunk_array.all_equal(chunk_spec.fill_value) + else chunk_array + for chunk_array, (_, chunk_spec, _, _) in zip(chunk_array_batch, batch_info) + ] + + chunk_bytes_batch = await self.encode_batch( + [ + (chunk_array, chunk_spec) + for chunk_array, (_, chunk_spec, _, _) in zip(chunk_array_batch, batch_info) + ], + ) + + async def _write_key(byte_setter: ByteSetter, chunk_bytes: Buffer | None) -> None: + if chunk_bytes is None: + await byte_setter.delete() + else: + await byte_setter.set(chunk_bytes) + + await concurrent_map( + [ + (byte_setter, chunk_bytes) + for chunk_bytes, (byte_setter, _, _, _) in zip(chunk_bytes_batch, batch_info) + ], + _write_key, + config.get("async.concurrency"), + ) + + async def decode( + self, + chunk_bytes_and_specs: Iterable[tuple[Buffer | None, ArraySpec]], + ) -> Iterable[NDBuffer | 
None]: + output: list[NDBuffer | None] = [] + for batch_info in batched(chunk_bytes_and_specs, self.batch_size): + output.extend(await self.decode_batch(batch_info)) + return output + + async def encode( + self, + chunk_arrays_and_specs: Iterable[tuple[NDBuffer | None, ArraySpec]], + ) -> Iterable[Buffer | None]: + output: list[Buffer | None] = [] + for single_batch_info in batched(chunk_arrays_and_specs, self.batch_size): + output.extend(await self.encode_batch(single_batch_info)) + return output + + async def read( + self, + batch_info: Iterable[tuple[ByteGetter, ArraySpec, SliceSelection, SliceSelection]], + out: NDBuffer, + ) -> None: + await concurrent_map( + [ + (single_batch_info, out) + for single_batch_info in batched(batch_info, self.batch_size) + ], + self.read_batch, + config.get("async.concurrency"), + ) + + async def write( + self, + batch_info: Iterable[tuple[ByteSetter, ArraySpec, SliceSelection, SliceSelection]], + value: NDBuffer, + ) -> None: + await concurrent_map( + [ + (single_batch_info, value) + for single_batch_info in batched(batch_info, self.batch_size) + ], + self.write_batch, + config.get("async.concurrency"), + ) diff --git a/src/zarr/codecs/registry.py b/src/zarr/codecs/registry.py index 7d46041255..b981f1f36c 100644 --- a/src/zarr/codecs/registry.py +++ b/src/zarr/codecs/registry.py @@ -7,7 +7,6 @@ from importlib.metadata import EntryPoint, entry_points as get_entry_points - __codec_registry: Dict[str, Type[Codec]] = {} __lazy_load_codecs: Dict[str, EntryPoint] = {} diff --git a/src/zarr/codecs/sharding.py b/src/zarr/codecs/sharding.py index b63d1e499b..dd7cdcd0b4 100644 --- a/src/zarr/codecs/sharding.py +++ b/src/zarr/codecs/sharding.py @@ -1,56 +1,47 @@ from __future__ import annotations from enum import Enum -from typing import TYPE_CHECKING, Iterable, Mapping, NamedTuple, Union -from dataclasses import dataclass, replace +from typing import TYPE_CHECKING, Iterable, Mapping, MutableMapping, NamedTuple, Tuple, Union +from dataclasses import dataclass, field, replace from functools import lru_cache import numpy as np -from zarr.abc.codec import ( - Codec, - ArrayBytesCodec, - ArrayBytesCodecPartialDecodeMixin, - ArrayBytesCodecPartialEncodeMixin, -) +from zarr.abc.codec import ByteGetter, ByteSetter, Codec, CodecPipeline from zarr.codecs.bytes import BytesCodec from zarr.codecs.crc32c_ import Crc32cCodec -from zarr.codecs.pipeline import CodecPipeline +from zarr.codecs.mixins import ( + ArrayBytesCodecBatchMixin, + ArrayBytesCodecPartialDecodeBatchMixin, + ArrayBytesCodecPartialEncodeBatchMixin, +) +from zarr.codecs.pipeline import BatchedCodecPipeline from zarr.codecs.registry import register_codec from zarr.common import ( ArraySpec, + ChunkCoords, ChunkCoordsLike, - concurrent_map, parse_enum, parse_named_configuration, parse_shapelike, product, ) -from zarr.config import config from zarr.chunk_grids import RegularChunkGrid from zarr.indexing import ( BasicIndexer, c_order_iter, - is_total_slice, morton_order_iter, ) -from zarr.metadata import ( - ArrayMetadata, - parse_codecs, -) +from zarr.metadata import ArrayMetadata, parse_codecs from zarr.buffer import Buffer, NDBuffer if TYPE_CHECKING: - from typing import Awaitable, Callable, Dict, Iterator, List, Optional, Set, Tuple + from typing import Awaitable, Callable, Dict, Iterator, Optional, Set from typing_extensions import Self - - from zarr.store import StorePath - from zarr.common import ( - JSON, - ChunkCoords, - SliceSelection, - ) + from zarr.common import JSON, SliceSelection MAX_UINT_64 = 2**64 
- 1 +ShardMapping = Mapping[ChunkCoords, Buffer] +ShardMutableMapping = MutableMapping[ChunkCoords, Buffer] class ShardingCodecIndexLocation(Enum): @@ -62,6 +53,28 @@ def parse_index_location(data: JSON) -> ShardingCodecIndexLocation: return parse_enum(data, ShardingCodecIndexLocation) +@dataclass(frozen=True) +class _ShardingByteGetter(ByteGetter): + shard_dict: ShardMapping + chunk_coords: ChunkCoords + + async def get(self, byte_range: Optional[Tuple[int, Optional[int]]] = None) -> Optional[Buffer]: + assert byte_range is None, "byte_range is not supported within shards" + return self.shard_dict.get(self.chunk_coords) + + +@dataclass(frozen=True) +class _ShardingByteSetter(_ShardingByteGetter, ByteSetter): + shard_dict: ShardMutableMapping + + async def set(self, value: Buffer, byte_range: Optional[Tuple[int, int]] = None) -> None: + assert byte_range is None, "byte_range is not supported within shards" + self.shard_dict[self.chunk_coords] = value + + async def delete(self) -> None: + del self.shard_dict[self.chunk_coords] + + class _ShardIndex(NamedTuple): # dtype uint64, shape (chunks_per_shard_0, chunks_per_shard_1, ..., 2) offsets_and_lengths: np.ndarray @@ -79,6 +92,9 @@ def _localize_chunk(self, chunk_coords: ChunkCoords) -> ChunkCoords: def is_all_empty(self) -> bool: return bool(np.array_equiv(self.offsets_and_lengths, MAX_UINT_64)) + def get_full_chunk_map(self) -> np.ndarray: + return self.offsets_and_lengths[..., 0] != MAX_UINT_64 + def get_chunk_slice(self, chunk_coords: ChunkCoords) -> Optional[Tuple[int, int]]: localized_chunk = self._localize_chunk(chunk_coords) chunk_start, chunk_len = self.offsets_and_lengths[localized_chunk] @@ -125,14 +141,14 @@ def create_empty(cls, chunks_per_shard: ChunkCoords) -> _ShardIndex: return cls(offsets_and_lengths) -class _ShardProxy(Mapping): - index: _ShardIndex +class _ShardReader(ShardMapping): buf: Buffer + index: _ShardIndex @classmethod async def from_bytes( cls, buf: Buffer, codec: ShardingCodec, chunks_per_shard: ChunkCoords - ) -> _ShardProxy: + ) -> _ShardReader: shard_index_size = codec._shard_index_size(chunks_per_shard) obj = cls() obj.buf = buf @@ -145,18 +161,18 @@ async def from_bytes( return obj @classmethod - def create_empty(cls, chunks_per_shard: ChunkCoords) -> _ShardProxy: + def create_empty(cls, chunks_per_shard: ChunkCoords) -> _ShardReader: index = _ShardIndex.create_empty(chunks_per_shard) obj = cls() obj.buf = Buffer.create_zero_length() obj.index = index return obj - def __getitem__(self, chunk_coords: ChunkCoords) -> Optional[Buffer]: + def __getitem__(self, chunk_coords: ChunkCoords) -> Buffer: chunk_byte_slice = self.index.get_chunk_slice(chunk_coords) if chunk_byte_slice: return self.buf[chunk_byte_slice[0] : chunk_byte_slice[1]] - return None + raise KeyError def __len__(self) -> int: return int(self.index.offsets_and_lengths.size / 2) @@ -164,8 +180,11 @@ def __len__(self) -> int: def __iter__(self) -> Iterator[ChunkCoords]: return c_order_iter(self.index.offsets_and_lengths.shape[:-1]) + def is_empty(self) -> bool: + return self.index.is_all_empty() + -class _ShardBuilder(_ShardProxy): +class _ShardBuilder(_ShardReader, ShardMutableMapping): buf: Buffer index: _ShardIndex @@ -174,7 +193,7 @@ def merge_with_morton_order( cls, chunks_per_shard: ChunkCoords, tombstones: Set[ChunkCoords], - *shard_dicts: Mapping[ChunkCoords, Buffer], + *shard_dicts: ShardMapping, ) -> _ShardBuilder: obj = cls.create_empty(chunks_per_shard) for chunk_coords in morton_order_iter(chunks_per_shard): @@ -183,7 +202,7 @@ def 
merge_with_morton_order( for shard_dict in shard_dicts: maybe_value = shard_dict.get(chunk_coords, None) if maybe_value is not None: - obj.append(chunk_coords, maybe_value) + obj[chunk_coords] = maybe_value break return obj @@ -194,12 +213,15 @@ def create_empty(cls, chunks_per_shard: ChunkCoords) -> _ShardBuilder: obj.index = _ShardIndex.create_empty(chunks_per_shard) return obj - def append(self, chunk_coords: ChunkCoords, value: Buffer) -> None: + def __setitem__(self, chunk_coords: ChunkCoords, value: Buffer) -> None: chunk_start = len(self.buf) chunk_length = len(value) self.buf = self.buf + value self.index.set_chunk_slice(chunk_coords, slice(chunk_start, chunk_start + chunk_length)) + def __delitem__(self, chunk_coords: ChunkCoords) -> None: + raise NotImplementedError + async def finalize( self, index_location: ShardingCodecIndexLocation, @@ -215,9 +237,58 @@ async def finalize( return out_buf +@dataclass(frozen=True) +class _MergingShardBuilder(ShardMutableMapping): + old_dict: _ShardReader + new_dict: _ShardBuilder + tombstones: Set[ChunkCoords] = field(default_factory=set) + + def __getitem__(self, chunk_coords: ChunkCoords) -> Buffer: + chunk_bytes_maybe = self.new_dict.get(chunk_coords) + if chunk_bytes_maybe is not None: + return chunk_bytes_maybe + return self.old_dict[chunk_coords] + + def __setitem__(self, chunk_coords: ChunkCoords, value: Buffer) -> None: + self.new_dict[chunk_coords] = value + + def __delitem__(self, chunk_coords: ChunkCoords) -> None: + self.tombstones.add(chunk_coords) + + def __len__(self) -> int: + return self.old_dict.__len__() + + def __iter__(self) -> Iterator[ChunkCoords]: + return self.old_dict.__iter__() + + def is_empty(self) -> bool: + full_chunk_coords_map = self.old_dict.index.get_full_chunk_map() + full_chunk_coords_map = np.logical_or( + full_chunk_coords_map, self.new_dict.index.get_full_chunk_map() + ) + for tombstone in self.tombstones: + full_chunk_coords_map[tombstone] = False + return bool(np.array_equiv(full_chunk_coords_map, False)) + + async def finalize( + self, + index_location: ShardingCodecIndexLocation, + index_encoder: Callable[[_ShardIndex], Awaitable[Buffer]], + ) -> Buffer: + shard_builder = _ShardBuilder.merge_with_morton_order( + self.new_dict.index.chunks_per_shard, + self.tombstones, + self.new_dict, + self.old_dict, + ) + return await shard_builder.finalize(index_location, index_encoder) + + @dataclass(frozen=True) class ShardingCodec( - ArrayBytesCodec, ArrayBytesCodecPartialDecodeMixin, ArrayBytesCodecPartialEncodeMixin + ArrayBytesCodecBatchMixin, + ArrayBytesCodecPartialDecodeBatchMixin, + ArrayBytesCodecPartialEncodeBatchMixin, ): chunk_shape: ChunkCoords codecs: CodecPipeline @@ -234,12 +305,14 @@ def __init__( ) -> None: chunk_shape_parsed = parse_shapelike(chunk_shape) codecs_parsed = ( - parse_codecs(codecs) if codecs is not None else CodecPipeline.from_list([BytesCodec()]) + parse_codecs(codecs) + if codecs is not None + else BatchedCodecPipeline.from_list([BytesCodec()]) ) index_codecs_parsed = ( parse_codecs(index_codecs) if index_codecs is not None - else CodecPipeline.from_list([BytesCodec(), Crc32cCodec()]) + else BatchedCodecPipeline.from_list([BytesCodec(), Crc32cCodec()]) ) index_location_parsed = ( parse_index_location(index_location) @@ -252,6 +325,11 @@ def __init__( object.__setattr__(self, "index_codecs", index_codecs_parsed) object.__setattr__(self, "index_location", index_location_parsed) + # Use instance-local lru_cache to avoid memory leaks + object.__setattr__(self, "_get_chunk_spec", 
lru_cache()(self._get_chunk_spec)) + object.__setattr__(self, "_get_index_chunk_spec", lru_cache()(self._get_index_chunk_spec)) + object.__setattr__(self, "_get_chunks_per_shard", lru_cache()(self._get_chunks_per_shard)) + @classmethod def from_dict(cls, data: Dict[str, JSON]) -> Self: _, configuration_parsed = parse_named_configuration(data, "sharding_indexed") @@ -295,65 +373,63 @@ def validate(self, array_metadata: ArrayMetadata) -> None: + "shard's inner `chunk_shape`." ) - async def decode( + async def decode_single( self, shard_bytes: Buffer, shard_spec: ArraySpec, ) -> NDBuffer: - # print("decode") shard_shape = shard_spec.shape chunk_shape = self.chunk_shape chunks_per_shard = self._get_chunks_per_shard(shard_spec) + chunk_spec = self._get_chunk_spec(shard_spec) indexer = BasicIndexer( tuple(slice(0, s) for s in shard_shape), shape=shard_shape, - chunk_shape=chunk_shape, + chunk_grid=RegularChunkGrid(chunk_shape=chunk_shape), ) # setup output array out = NDBuffer.create( shape=shard_shape, dtype=shard_spec.dtype, order=shard_spec.order, fill_value=0 ) - shard_dict = await _ShardProxy.from_bytes(shard_bytes, self, chunks_per_shard) + shard_dict = await _ShardReader.from_bytes(shard_bytes, self, chunks_per_shard) if shard_dict.index.is_all_empty(): out.fill(shard_spec.fill_value) return out # decoding chunks and writing them into the output buffer - await concurrent_map( + await self.codecs.read( [ ( - shard_dict, - chunk_coords, + _ShardingByteGetter(shard_dict, chunk_coords), + chunk_spec, chunk_selection, out_selection, - shard_spec, - out, ) for chunk_coords, chunk_selection, out_selection in indexer ], - self._read_chunk, - config.get("async.concurrency"), + out, ) return out - async def decode_partial( + async def decode_partial_single( self, - store_path: StorePath, + byte_getter: ByteGetter, selection: SliceSelection, shard_spec: ArraySpec, ) -> Optional[NDBuffer]: shard_shape = shard_spec.shape chunk_shape = self.chunk_shape chunks_per_shard = self._get_chunks_per_shard(shard_spec) + chunk_spec = self._get_chunk_spec(shard_spec) indexer = BasicIndexer( selection, shape=shard_shape, - chunk_shape=chunk_shape, + chunk_grid=RegularChunkGrid(chunk_shape=chunk_shape), ) # setup output array @@ -365,63 +441,42 @@ async def decode_partial( all_chunk_coords = set(chunk_coords for chunk_coords, _, _ in indexed_chunks) # reading bytes of all requested chunks - shard_dict: Mapping[ChunkCoords, Buffer] = {} + shard_dict: ShardMapping = {} if self._is_total_shard(all_chunk_coords, chunks_per_shard): # read entire shard - shard_dict_maybe = await self._load_full_shard_maybe(store_path, chunks_per_shard) + shard_dict_maybe = await self._load_full_shard_maybe(byte_getter, chunks_per_shard) if shard_dict_maybe is None: return None shard_dict = shard_dict_maybe else: # read some chunks within the shard - shard_index = await self._load_shard_index_maybe(store_path, chunks_per_shard) + shard_index = await self._load_shard_index_maybe(byte_getter, chunks_per_shard) if shard_index is None: return None shard_dict = {} for chunk_coords in all_chunk_coords: chunk_byte_slice = shard_index.get_chunk_slice(chunk_coords) if chunk_byte_slice: - chunk_bytes = await store_path.get(chunk_byte_slice) + chunk_bytes = await byte_getter.get(chunk_byte_slice) if chunk_bytes: shard_dict[chunk_coords] = chunk_bytes # decoding chunks and writing them into the output buffer - await concurrent_map( + await self.codecs.read( [ ( - shard_dict, - chunk_coords, + _ShardingByteGetter(shard_dict, chunk_coords), + 
chunk_spec, chunk_selection, out_selection, - shard_spec, - out, ) - for chunk_coords, chunk_selection, out_selection in indexed_chunks + for chunk_coords, chunk_selection, out_selection in indexer ], - self._read_chunk, - config.get("async.concurrency"), + out, ) return out - async def _read_chunk( - self, - shard_dict: Mapping[ChunkCoords, Optional[Buffer]], - chunk_coords: ChunkCoords, - chunk_selection: SliceSelection, - out_selection: SliceSelection, - shard_spec: ArraySpec, - out: NDBuffer, - ) -> None: - chunk_spec = self._get_chunk_spec(shard_spec) - chunk_bytes = shard_dict.get(chunk_coords, None) - if chunk_bytes is not None: - chunk_array = await self.codecs.decode(chunk_bytes, chunk_spec) - tmp = chunk_array[chunk_selection] - out[out_selection] = tmp - else: - out[out_selection] = chunk_spec.fill_value - - async def encode( + async def encode_single( self, shard_array: NDBuffer, shard_spec: ArraySpec, @@ -429,150 +484,77 @@ async def encode( shard_shape = shard_spec.shape chunk_shape = self.chunk_shape chunks_per_shard = self._get_chunks_per_shard(shard_spec) + chunk_spec = self._get_chunk_spec(shard_spec) indexer = list( BasicIndexer( tuple(slice(0, s) for s in shard_shape), shape=shard_shape, - chunk_shape=chunk_shape, + chunk_grid=RegularChunkGrid(chunk_shape=chunk_shape), ) ) - async def _write_chunk( - shard_array: NDBuffer, - chunk_coords: ChunkCoords, - chunk_selection: SliceSelection, - out_selection: SliceSelection, - ) -> Tuple[ChunkCoords, Optional[Buffer]]: - assert isinstance(shard_array, NDBuffer) - if is_total_slice(chunk_selection, chunk_shape): - chunk_array = shard_array[out_selection] - else: - # handling writing partial chunks - chunk_array = NDBuffer.create( - shape=chunk_shape, - dtype=shard_spec.dtype, - ) - chunk_array.fill(shard_spec.fill_value) - chunk_array[chunk_selection] = shard_array[out_selection] - if not chunk_array.all_equal(shard_spec.fill_value): - chunk_spec = self._get_chunk_spec(shard_spec) - return ( - chunk_coords, - await self.codecs.encode(chunk_array, chunk_spec), - ) - return (chunk_coords, None) + shard_builder = _ShardBuilder.create_empty(chunks_per_shard) - # assembling and encoding chunks within the shard - encoded_chunks: List[Tuple[ChunkCoords, Optional[Buffer]]] = await concurrent_map( + await self.codecs.write( [ - (shard_array, chunk_coords, chunk_selection, out_selection) + ( + _ShardingByteSetter(shard_builder, chunk_coords), + chunk_spec, + chunk_selection, + out_selection, + ) for chunk_coords, chunk_selection, out_selection in indexer ], - _write_chunk, - config.get("async.concurrency"), + shard_array, ) - if len(encoded_chunks) == 0: - return None - - shard_builder = _ShardBuilder.create_empty(chunks_per_shard) - for chunk_coords, chunk_bytes in encoded_chunks: - if chunk_bytes is not None: - shard_builder.append(chunk_coords, chunk_bytes) return await shard_builder.finalize(self.index_location, self._encode_shard_index) - async def encode_partial( + async def encode_partial_single( self, - store_path: StorePath, + byte_setter: ByteSetter, shard_array: NDBuffer, selection: SliceSelection, shard_spec: ArraySpec, ) -> None: - # print("encode_partial") shard_shape = shard_spec.shape chunk_shape = self.chunk_shape chunks_per_shard = self._get_chunks_per_shard(shard_spec) chunk_spec = self._get_chunk_spec(shard_spec) - old_shard_dict = ( - await self._load_full_shard_maybe(store_path, chunks_per_shard) - ) or _ShardProxy.create_empty(chunks_per_shard) - new_shard_builder = _ShardBuilder.create_empty(chunks_per_shard) - 
tombstones: Set[ChunkCoords] = set() + shard_dict = _MergingShardBuilder( + await self._load_full_shard_maybe(byte_setter, chunks_per_shard) + or _ShardReader.create_empty(chunks_per_shard), + _ShardBuilder.create_empty(chunks_per_shard), + ) indexer = list( BasicIndexer( selection, shape=shard_shape, - chunk_shape=chunk_shape, + chunk_grid=RegularChunkGrid(chunk_shape=chunk_shape), ) ) - async def _write_chunk( - chunk_coords: ChunkCoords, - chunk_selection: SliceSelection, - out_selection: SliceSelection, - ) -> Tuple[ChunkCoords, Optional[Buffer]]: - if is_total_slice(chunk_selection, self.chunk_shape): - chunk_array = shard_array[out_selection] - else: - # handling writing partial chunks - # read chunk first - chunk_bytes = old_shard_dict.get(chunk_coords, None) - - # merge new value - if chunk_bytes is None: - chunk_array = NDBuffer.create( - shape=self.chunk_shape, - dtype=shard_spec.dtype, - ) - chunk_array.fill(shard_spec.fill_value) - else: - chunk_array = ( - await self.codecs.decode(chunk_bytes, chunk_spec) - ).copy() # make a writable copy - chunk_array[chunk_selection] = shard_array[out_selection] - - if not chunk_array.all_equal(shard_spec.fill_value): - return ( - chunk_coords, - await self.codecs.encode(chunk_array, chunk_spec), - ) - else: - return (chunk_coords, None) - - encoded_chunks: List[Tuple[ChunkCoords, Optional[Buffer]]] = await concurrent_map( + await self.codecs.write( [ ( - chunk_coords, + _ShardingByteSetter(shard_dict, chunk_coords), + chunk_spec, chunk_selection, out_selection, ) for chunk_coords, chunk_selection, out_selection in indexer ], - _write_chunk, - config.get("async.concurrency"), - ) - - for chunk_coords, chunk_bytes in encoded_chunks: - if chunk_bytes is not None: - new_shard_builder.append(chunk_coords, chunk_bytes) - else: - tombstones.add(chunk_coords) - - shard_builder = _ShardBuilder.merge_with_morton_order( - chunks_per_shard, - tombstones, - new_shard_builder, - old_shard_dict, + shard_array, ) - if shard_builder.index.is_all_empty(): - await store_path.delete() + if shard_dict.is_empty(): + await byte_setter.delete() else: - await store_path.set( - await shard_builder.finalize( + await byte_setter.set( + await shard_dict.finalize( self.index_location, self._encode_shard_index, ) @@ -588,19 +570,28 @@ def _is_total_shard( async def _decode_shard_index( self, index_bytes: Buffer, chunks_per_shard: ChunkCoords ) -> _ShardIndex: - return _ShardIndex( - ( + index_array = next( + iter( await self.index_codecs.decode( - index_bytes, - self._get_index_chunk_spec(chunks_per_shard), + [(index_bytes, self._get_index_chunk_spec(chunks_per_shard))], ) - ).as_numpy_array() + ) ) + assert index_array is not None + return _ShardIndex(index_array.as_numpy_array()) async def _encode_shard_index(self, index: _ShardIndex) -> Buffer: - index_bytes = await self.index_codecs.encode( - NDBuffer.from_numpy_array(index.offsets_and_lengths), - self._get_index_chunk_spec(index.chunks_per_shard), + index_bytes = next( + iter( + await self.index_codecs.encode( + [ + ( + NDBuffer.from_numpy_array(index.offsets_and_lengths), + self._get_index_chunk_spec(index.chunks_per_shard), + ) + ], + ) + ) ) assert index_bytes is not None assert isinstance(index_bytes, Buffer) @@ -611,7 +602,6 @@ def _shard_index_size(self, chunks_per_shard: ChunkCoords) -> int: 16 * product(chunks_per_shard), self._get_index_chunk_spec(chunks_per_shard) ) - @lru_cache def _get_index_chunk_spec(self, chunks_per_shard: ChunkCoords) -> ArraySpec: return ArraySpec( shape=chunks_per_shard + (2,), @@ 
-620,7 +610,6 @@ def _get_index_chunk_spec(self, chunks_per_shard: ChunkCoords) -> ArraySpec: order="C", # Note: this is hard-coded for simplicity -- it is not surfaced into user code ) - @lru_cache def _get_chunk_spec(self, shard_spec: ArraySpec) -> ArraySpec: return ArraySpec( shape=self.chunk_shape, @@ -629,7 +618,6 @@ def _get_chunk_spec(self, shard_spec: ArraySpec) -> ArraySpec: order=shard_spec.order, ) - @lru_cache def _get_chunks_per_shard(self, shard_spec: ArraySpec) -> ChunkCoords: return tuple( s // c @@ -640,31 +628,31 @@ def _get_chunks_per_shard(self, shard_spec: ArraySpec) -> ChunkCoords: ) async def _load_shard_index_maybe( - self, store_path: StorePath, chunks_per_shard: ChunkCoords + self, byte_getter: ByteGetter, chunks_per_shard: ChunkCoords ) -> Optional[_ShardIndex]: shard_index_size = self._shard_index_size(chunks_per_shard) if self.index_location == ShardingCodecIndexLocation.start: - index_bytes = await store_path.get((0, shard_index_size)) + index_bytes = await byte_getter.get((0, shard_index_size)) else: - index_bytes = await store_path.get((-shard_index_size, None)) + index_bytes = await byte_getter.get((-shard_index_size, None)) if index_bytes is not None: return await self._decode_shard_index(index_bytes, chunks_per_shard) return None async def _load_shard_index( - self, store_path: StorePath, chunks_per_shard: ChunkCoords + self, byte_getter: ByteGetter, chunks_per_shard: ChunkCoords ) -> _ShardIndex: return ( - await self._load_shard_index_maybe(store_path, chunks_per_shard) + await self._load_shard_index_maybe(byte_getter, chunks_per_shard) ) or _ShardIndex.create_empty(chunks_per_shard) async def _load_full_shard_maybe( - self, store_path: StorePath, chunks_per_shard: ChunkCoords - ) -> Optional[_ShardProxy]: - shard_bytes = await store_path.get() + self, byte_getter: ByteGetter, chunks_per_shard: ChunkCoords + ) -> Optional[_ShardReader]: + shard_bytes = await byte_getter.get() return ( - await _ShardProxy.from_bytes(shard_bytes, self, chunks_per_shard) + await _ShardReader.from_bytes(shard_bytes, self, chunks_per_shard) if shard_bytes else None ) diff --git a/src/zarr/codecs/transpose.py b/src/zarr/codecs/transpose.py index 70ae30f908..5d4d2a7b84 100644 --- a/src/zarr/codecs/transpose.py +++ b/src/zarr/codecs/transpose.py @@ -3,18 +3,16 @@ from dataclasses import dataclass, replace +from zarr.codecs.mixins import ArrayArrayCodecBatchMixin from zarr.buffer import NDBuffer from zarr.common import JSON, ArraySpec, ChunkCoordsLike, parse_named_configuration +from zarr.codecs.registry import register_codec if TYPE_CHECKING: from typing import TYPE_CHECKING, Optional, Tuple from typing_extensions import Self -from zarr.abc.codec import ArrayArrayCodec -from zarr.codecs.registry import register_codec - - def parse_transpose_order(data: Union[JSON, Iterable[int]]) -> Tuple[int, ...]: if not isinstance(data, Iterable): raise TypeError(f"Expected an iterable. Got {data} instead.") @@ -24,7 +22,7 @@ def parse_transpose_order(data: Union[JSON, Iterable[int]]) -> Tuple[int, ...]: @dataclass(frozen=True) -class TransposeCodec(ArrayArrayCodec): +class TransposeCodec(ArrayArrayCodecBatchMixin): is_fixed_size = True order: Tuple[int, ...] 
@@ -73,7 +71,7 @@ def resolve_metadata(self, chunk_spec: ArraySpec) -> ArraySpec: order=chunk_spec.order, ) - async def decode( + async def decode_single( self, chunk_array: NDBuffer, chunk_spec: ArraySpec, @@ -84,7 +82,7 @@ async def decode( chunk_array = chunk_array.transpose(inverse_order) return chunk_array - async def encode( + async def encode_single( self, chunk_array: NDBuffer, chunk_spec: ArraySpec, diff --git a/src/zarr/codecs/zstd.py b/src/zarr/codecs/zstd.py index 0cc99a0368..4422188d25 100644 --- a/src/zarr/codecs/zstd.py +++ b/src/zarr/codecs/zstd.py @@ -5,7 +5,7 @@ from zstandard import ZstdCompressor, ZstdDecompressor -from zarr.abc.codec import BytesBytesCodec +from zarr.codecs.mixins import BytesBytesCodecBatchMixin from zarr.buffer import Buffer, as_numpy_array_wrapper from zarr.codecs.registry import register_codec from zarr.common import parse_named_configuration, to_thread @@ -31,7 +31,7 @@ def parse_checksum(data: JSON) -> bool: @dataclass(frozen=True) -class ZstdCodec(BytesBytesCodec): +class ZstdCodec(BytesBytesCodecBatchMixin): is_fixed_size = True level: int = 0 @@ -60,14 +60,14 @@ def _decompress(self, data: bytes) -> bytes: ctx = ZstdDecompressor() return ctx.decompress(data) - async def decode( + async def decode_single( self, chunk_bytes: Buffer, _chunk_spec: ArraySpec, ) -> Buffer: return await to_thread(as_numpy_array_wrapper, self._decompress, chunk_bytes) - async def encode( + async def encode_single( self, chunk_bytes: Buffer, _chunk_spec: ArraySpec, diff --git a/src/zarr/common.py b/src/zarr/common.py index 95cb8f4a3e..3ef847a1f3 100644 --- a/src/zarr/common.py +++ b/src/zarr/common.py @@ -1,6 +1,7 @@ from __future__ import annotations from typing import ( TYPE_CHECKING, + ParamSpec, Literal, Union, Tuple, @@ -58,7 +59,11 @@ async def run(item: Tuple[Any]) -> V: return await asyncio.gather(*[asyncio.ensure_future(run(item)) for item in items]) -async def to_thread(func: Callable[..., V], /, *args: Any, **kwargs: Any) -> V: +P = ParamSpec("P") +U = TypeVar("U") + + +async def to_thread(func: Callable[P, U], /, *args: P.args, **kwargs: P.kwargs) -> U: loop = asyncio.get_running_loop() ctx = contextvars.copy_context() func_call = functools.partial(ctx.run, func, *args, **kwargs) diff --git a/src/zarr/config.py b/src/zarr/config.py index e546cb1c23..5b1640bd56 100644 --- a/src/zarr/config.py +++ b/src/zarr/config.py @@ -6,7 +6,13 @@ config = Config( "zarr", - defaults=[{"array": {"order": "C"}, "async": {"concurrency": None, "timeout": None}}], + defaults=[ + { + "array": {"order": "C"}, + "async": {"concurrency": None, "timeout": None}, + "codec_pipeline": {"batch_size": 1}, + } + ], ) diff --git a/src/zarr/group.py b/src/zarr/group.py index d344b3db00..6cd6ab6aad 100644 --- a/src/zarr/group.py +++ b/src/zarr/group.py @@ -7,24 +7,34 @@ import logging import numpy.typing as npt -from zarr.buffer import Buffer - -if TYPE_CHECKING: - from typing import Any, AsyncGenerator, Literal, Iterable +from zarr.abc.store import set_or_delete from zarr.abc.codec import Codec from zarr.abc.metadata import Metadata +from zarr.buffer import Buffer from zarr.array import AsyncArray, Array from zarr.attributes import Attributes -from zarr.common import ZARR_JSON, ZARRAY_JSON, ZATTRS_JSON, ZGROUP_JSON, ChunkCoords +from zarr.chunk_key_encodings import ChunkKeyEncoding +from zarr.common import ( + JSON, + ZARR_JSON, + ZARRAY_JSON, + ZATTRS_JSON, + ZGROUP_JSON, + ChunkCoords, + ZarrFormat, +) from zarr.store import StoreLike, StorePath, make_store_path from zarr.sync import 
SyncMixin, sync from typing import overload +if TYPE_CHECKING: + from typing import Any, AsyncGenerator, Literal, Iterable + logger = logging.getLogger("zarr.group") -def parse_zarr_format(data: Any) -> Literal[2, 3]: +def parse_zarr_format(data: Any) -> ZarrFormat: if data in (2, 3): return data msg = msg = f"Invalid zarr_format. Expected one 2 or 3. Got {data}." @@ -64,20 +74,21 @@ def _parse_async_node(node: AsyncArray | AsyncGroup) -> Array | Group: @dataclass(frozen=True) class GroupMetadata(Metadata): attributes: dict[str, Any] = field(default_factory=dict) - zarr_format: Literal[2, 3] = 3 + zarr_format: ZarrFormat = 3 node_type: Literal["group"] = field(default="group", init=False) - # todo: rename this, since it doesn't return bytes - def to_bytes(self) -> dict[str, bytes]: + def to_buffer_dict(self) -> dict[str, Buffer]: if self.zarr_format == 3: - return {ZARR_JSON: json.dumps(self.to_dict()).encode()} + return {ZARR_JSON: Buffer.from_bytes(json.dumps(self.to_dict()).encode())} else: return { - ZGROUP_JSON: json.dumps({"zarr_format": self.zarr_format}).encode(), - ZATTRS_JSON: json.dumps(self.attributes).encode(), + ZGROUP_JSON: Buffer.from_bytes( + json.dumps({"zarr_format": self.zarr_format}).encode() + ), + ZATTRS_JSON: Buffer.from_bytes(json.dumps(self.attributes).encode()), } - def __init__(self, attributes: dict[str, Any] | None = None, zarr_format: Literal[2, 3] = 3): + def __init__(self, attributes: dict[str, Any] | None = None, zarr_format: ZarrFormat = 3): attributes_parsed = parse_attributes(attributes) zarr_format_parsed = parse_zarr_format(zarr_format) @@ -105,7 +116,7 @@ async def create( *, attributes: dict[str, Any] = {}, exists_ok: bool = False, - zarr_format: Literal[2, 3] = 3, + zarr_format: ZarrFormat = 3, ) -> AsyncGroup: store_path = make_store_path(store) if not exists_ok: @@ -247,16 +258,15 @@ async def delitem(self, key: str) -> None: elif self.metadata.zarr_format == 2: await asyncio.gather( (store_path / ZGROUP_JSON).delete(), # TODO: missing_ok=False + (store_path / ZARRAY_JSON).delete(), # TODO: missing_ok=False (store_path / ZATTRS_JSON).delete(), # TODO: missing_ok=True ) else: raise ValueError(f"unexpected zarr_format: {self.metadata.zarr_format}") async def _save_metadata(self) -> None: - to_save = self.metadata.to_bytes() - awaitables = [ - (self.store_path / key).set(Buffer.from_bytes(value)) for key, value in to_save.items() - ] + to_save = self.metadata.to_buffer_dict() + awaitables = [set_or_delete(self.store_path / key, value) for key, value in to_save.items()] await asyncio.gather(*awaitables) @property @@ -282,13 +292,25 @@ async def create_array( path: str, shape: ChunkCoords, dtype: npt.DTypeLike, - chunk_shape: ChunkCoords, fill_value: Any | None = None, - chunk_key_encoding: tuple[Literal["default"], Literal[".", "/"]] - | tuple[Literal["v2"], Literal[".", "/"]] = ("default", "/"), - codecs: Iterable[Codec | dict[str, Any]] | None = None, + attributes: dict[str, JSON] | None = None, + # v3 only + chunk_shape: ChunkCoords | None = None, + chunk_key_encoding: ( + ChunkKeyEncoding + | tuple[Literal["default"], Literal[".", "/"]] + | tuple[Literal["v2"], Literal[".", "/"]] + | None + ) = None, + codecs: Iterable[Codec | dict[str, JSON]] | None = None, dimension_names: Iterable[str] | None = None, - attributes: dict[str, Any] | None = None, + # v2 only + chunks: ChunkCoords | None = None, + dimension_separator: Literal[".", "/"] | None = None, + order: Literal["C", "F"] | None = None, + filters: list[dict[str, JSON]] | None = None, + 
compressor: dict[str, JSON] | None = None, + # runtime exists_ok: bool = False, ) -> AsyncArray: return await AsyncArray.create( @@ -301,6 +323,11 @@ async def create_array( codecs=codecs, dimension_names=dimension_names, attributes=attributes, + chunks=chunks, + dimension_separator=dimension_separator, + order=order, + filters=filters, + compressor=compressor, exists_ok=exists_ok, zarr_format=self.metadata.zarr_format, ) @@ -311,15 +338,7 @@ async def update_attributes(self, new_attributes: dict[str, Any]) -> "AsyncGroup self.metadata.attributes.update(new_attributes) # Write new metadata - to_save = self.metadata.to_bytes() - if self.metadata.zarr_format == 2: - # only save the .zattrs object - await (self.store_path / ZATTRS_JSON).set(Buffer.from_bytes(to_save[ZATTRS_JSON])) - else: - await (self.store_path / ZARR_JSON).set(Buffer.from_bytes(to_save[ZARR_JSON])) - - self.metadata.attributes.clear() - self.metadata.attributes.update(new_attributes) + await self._save_metadata() return self @@ -483,10 +502,8 @@ async def update_attributes_async(self, new_attributes: dict[str, Any]) -> Group new_metadata = replace(self.metadata, attributes=new_attributes) # Write new metadata - to_save = new_metadata.to_bytes() - awaitables = [ - (self.store_path / key).set(Buffer.from_bytes(value)) for key, value in to_save.items() - ] + to_save = new_metadata.to_buffer_dict() + awaitables = [set_or_delete(self.store_path / key, value) for key, value in to_save.items()] await asyncio.gather(*awaitables) async_group = replace(self._async_group, metadata=new_metadata) diff --git a/src/zarr/indexing.py b/src/zarr/indexing.py index 9f324eb5ea..8e7cd95430 100644 --- a/src/zarr/indexing.py +++ b/src/zarr/indexing.py @@ -2,10 +2,13 @@ import itertools import math -from typing import Iterator, List, NamedTuple, Optional, Tuple +from typing import TYPE_CHECKING, Iterator, List, NamedTuple, Optional, Tuple from zarr.common import ChunkCoords, Selection, SliceSelection, product +if TYPE_CHECKING: + from zarr.chunk_grids import ChunkGrid + def _ensure_tuple(v: Selection) -> SliceSelection: if not isinstance(v, tuple): @@ -131,13 +134,18 @@ def __init__( self, selection: Selection, shape: Tuple[int, ...], - chunk_shape: Tuple[int, ...], + chunk_grid: ChunkGrid, ): + from zarr.chunk_grids import RegularChunkGrid + + assert isinstance( + chunk_grid, RegularChunkGrid + ), "Only regular chunk grid is supported, currently." 
diff --git a/src/zarr/metadata.py b/src/zarr/metadata.py
index 098ab34b86..695d83da55 100644
--- a/src/zarr/metadata.py
+++ b/src/zarr/metadata.py
@@ -1,26 +1,30 @@
 from __future__ import annotations

+from abc import ABC, abstractmethod
 from enum import Enum
-from typing import TYPE_CHECKING, cast, Dict, Iterable, Any
-from dataclasses import dataclass, field
+from typing import TYPE_CHECKING, Any, cast, Iterable
+from dataclasses import dataclass, field, replace
 import json
 import numpy as np
 import numpy.typing as npt

+from zarr.abc.codec import Codec, CodecPipeline
+from zarr.abc.metadata import Metadata
 from zarr.buffer import Buffer
 from zarr.chunk_grids import ChunkGrid, RegularChunkGrid
 from zarr.chunk_key_encodings import ChunkKeyEncoding, parse_separator
+from zarr.codecs._v2 import V2Compressor, V2Filters

 if TYPE_CHECKING:
-    from typing import Literal, Union, List, Optional, Tuple
-    from zarr.codecs.pipeline import CodecPipeline
-
+    from typing import Literal
+
     from typing_extensions import Self

-from zarr.abc.codec import Codec
-from zarr.abc.metadata import Metadata
 from zarr.common import (
     JSON,
+    ZARR_JSON,
+    ZARRAY_JSON,
+    ZATTRS_JSON,
     ArraySpec,
     ChunkCoords,
     parse_dtype,
@@ -104,16 +108,58 @@ def from_dtype(cls, dtype: np.dtype[Any]) -> DataType:
         return DataType[dtype_to_data_type[dtype.str]]

-@dataclass(frozen=True)
-class ArrayMetadata(Metadata):
+@dataclass(frozen=True, kw_only=True)
+class ArrayMetadata(Metadata, ABC):
+    shape: ChunkCoords
+    chunk_grid: ChunkGrid
+    attributes: dict[str, JSON]
+
+    @property
+    @abstractmethod
+    def dtype(self) -> np.dtype[Any]:
+        pass
+
+    @property
+    @abstractmethod
+    def ndim(self) -> int:
+        pass
+
+    @property
+    @abstractmethod
+    def codec_pipeline(self) -> CodecPipeline:
+        pass
+
+    @abstractmethod
+    def get_chunk_spec(self, _chunk_coords: ChunkCoords, order: Literal["C", "F"]) -> ArraySpec:
+        pass
+
+    @abstractmethod
+    def encode_chunk_key(self, chunk_coords: ChunkCoords) -> str:
+        pass
+
+    @abstractmethod
+    def to_buffer_dict(self) -> dict[str, Buffer]:
+        pass
+
+    @abstractmethod
+    def update_shape(self, shape: ChunkCoords) -> Self:
+        pass
+
+    @abstractmethod
+    def update_attributes(self, attributes: dict[str, JSON]) -> Self:
+        pass
+
+
+@dataclass(frozen=True, kw_only=True)
+class ArrayV3Metadata(ArrayMetadata):
     shape: ChunkCoords
     data_type: np.dtype[Any]
     chunk_grid: ChunkGrid
     chunk_key_encoding: ChunkKeyEncoding
     fill_value: Any
     codecs: CodecPipeline
-    attributes: Dict[str, Any] = field(default_factory=dict)
-    dimension_names: Optional[Tuple[str, ...]] = None
+    attributes: dict[str, Any] = field(default_factory=dict)
+    dimension_names: tuple[str, ...] | None = None
     zarr_format: Literal[3] = field(default=3, init=False)
     node_type: Literal["array"] = field(default="array", init=False)
@@ -182,6 +228,10 @@ def dtype(self) -> np.dtype[Any]:
     def ndim(self) -> int:
         return len(self.shape)

+    @property
+    def codec_pipeline(self) -> CodecPipeline:
+        return self.codecs
+
     def get_chunk_spec(self, _chunk_coords: ChunkCoords, order: Literal["C", "F"]) -> ArraySpec:
         assert isinstance(
             self.chunk_grid, RegularChunkGrid
@@ -193,7 +243,10 @@ def get_chunk_spec(self, _chunk_coords: ChunkCoords, order: Literal["C", "F"]) -> ArraySpec:
             order=order,
         )

-    def to_bytes(self) -> bytes:
+    def encode_chunk_key(self, chunk_coords: ChunkCoords) -> str:
+        return self.chunk_key_encoding.encode_chunk_key(chunk_coords)
+
+    def to_buffer_dict(self) -> dict[str, Buffer]:
         def _json_convert(o):
             if isinstance(o, np.dtype):
                 return str(o)
@@ -205,13 +258,12 @@ def _json_convert(o):
                 return o.get_config()
             raise TypeError

-        return json.dumps(
-            self.to_dict(),
-            default=_json_convert,
-        ).encode()
+        return {
+            ZARR_JSON: Buffer.from_bytes(json.dumps(self.to_dict(), default=_json_convert).encode())
+        }

     @classmethod
-    def from_dict(cls, data: Dict[str, Any]) -> ArrayMetadata:
+    def from_dict(cls, data: dict[str, JSON]) -> ArrayV3Metadata:
         # check that the zarr_format attribute is correct
         _ = parse_zarr_format_v3(data.pop("zarr_format"))
         # check that the node_type attribute is correct
@@ -221,7 +273,7 @@ def from_dict(cls, data: Dict[str, Any]) -> ArrayMetadata:

         return cls(**data, dimension_names=dimension_names)

-    def to_dict(self) -> Dict[str, Any]:
+    def to_dict(self) -> dict[str, Any]:
         out_dict = super().to_dict()

         if not isinstance(out_dict, dict):
@@ -233,18 +285,24 @@ def to_dict(self) -> Dict[str, Any]:
             out_dict.pop("dimension_names")
         return out_dict

+    def update_shape(self, shape: ChunkCoords) -> Self:
+        return replace(self, shape=shape)

-@dataclass(frozen=True)
-class ArrayV2Metadata(Metadata):
+    def update_attributes(self, attributes: dict[str, JSON]) -> Self:
+        return replace(self, attributes=attributes)
+
+
+@dataclass(frozen=True, kw_only=True)
+class ArrayV2Metadata(ArrayMetadata):
     shape: ChunkCoords
-    chunks: ChunkCoords
-    dtype: np.dtype[Any]
-    fill_value: Union[None, int, float] = 0
+    chunk_grid: RegularChunkGrid
+    data_type: np.dtype[Any]
+    fill_value: None | int | float = 0
     order: Literal["C", "F"] = "C"
-    filters: Optional[List[Dict[str, Any]]] = None
+    filters: list[dict[str, JSON]] | None = None
     dimension_separator: Literal[".", "/"] = "."
-    compressor: Optional[Dict[str, Any]] = None
-    attributes: Optional[Dict[str, Any]] = cast(Dict[str, Any], field(default_factory=dict))
+    compressor: dict[str, JSON] | None = None
+    attributes: dict[str, JSON] = cast(dict[str, JSON], field(default_factory=dict))
     zarr_format: Literal[2] = field(init=False, default=2)
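Note: before diving into the v2 internals below, the point of the new `ArrayMetadata` ABC is that chunk I/O code no longer needs to branch on `zarr_format`. A hypothetical call site (the `describe` helper is illustrative, not part of the PR) only touches the abstract surface:

    from zarr.metadata import ArrayMetadata

    def describe(meta: ArrayMetadata) -> str:
        # Works for ArrayV2Metadata and ArrayV3Metadata alike.
        return (
            f"{meta.ndim}d array of {meta.dtype}; "
            f"chunk (0, 0) stored under key {meta.encode_chunk_key((0, 0))!r}"
        )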
     def __init__(
         self,
@@ -256,9 +314,9 @@ def __init__(
         fill_value: Any,
         order: Literal["C", "F"],
         dimension_separator: Literal[".", "/"] = ".",
-        compressor: Optional[Dict[str, Any]] = None,
-        filters: Optional[List[Dict[str, Any]]] = None,
-        attributes: Optional[Dict[str, JSON]] = None,
+        compressor: dict[str, JSON] | None = None,
+        filters: list[dict[str, JSON]] | None = None,
+        attributes: dict[str, JSON] | None = None,
     ):
         """
         Metadata for a Zarr version 2 array.
@@ -268,14 +326,14 @@ def __init__(
         chunks_parsed = parse_shapelike(chunks)
         compressor_parsed = parse_compressor(compressor)
         order_parsed = parse_indexing_order(order)
-        dimension_separator_parsed = parse_separator(order)
+        dimension_separator_parsed = parse_separator(dimension_separator)
         filters_parsed = parse_filters(filters)
         fill_value_parsed = parse_fill_value(fill_value)
         attributes_parsed = parse_attributes(attributes)

         object.__setattr__(self, "shape", shape_parsed)
         object.__setattr__(self, "data_type", data_type_parsed)
-        object.__setattr__(self, "chunks", chunks_parsed)
+        object.__setattr__(self, "chunk_grid", RegularChunkGrid(chunk_shape=chunks_parsed))
         object.__setattr__(self, "compressor", compressor_parsed)
         object.__setattr__(self, "order", order_parsed)
         object.__setattr__(self, "dimension_separator", dimension_separator_parsed)
@@ -290,7 +348,23 @@ def ndim(self) -> int:
         return len(self.shape)

-    def to_bytes(self) -> Buffer:
+    @property
+    def dtype(self) -> np.dtype[Any]:
+        return self.data_type
+
+    @property
+    def chunks(self) -> ChunkCoords:
+        return self.chunk_grid.chunk_shape
+
+    @property
+    def codec_pipeline(self) -> CodecPipeline:
+        from zarr.codecs import BatchedCodecPipeline
+
+        return BatchedCodecPipeline.from_list(
+            [V2Filters(self.filters or []), V2Compressor(self.compressor)]
+        )
+
+    def to_buffer_dict(self) -> dict[str, Buffer]:
         def _json_convert(o):
             if isinstance(o, np.dtype):
                 if o.fields is None:
@@ -299,16 +373,54 @@ def _json_convert(o):
                 return o.descr
             raise TypeError

-        return Buffer.from_bytes(json.dumps(self.to_dict(), default=_json_convert).encode())
+        zarray_dict = self.to_dict()
+        assert isinstance(zarray_dict, dict)
+        zattrs_dict = zarray_dict.pop("attributes", {})
+        assert isinstance(zattrs_dict, dict)
+        return {
+            ZARRAY_JSON: Buffer.from_bytes(json.dumps(zarray_dict, default=_json_convert).encode()),
+            ZATTRS_JSON: Buffer.from_bytes(json.dumps(zattrs_dict).encode()),
+        }

     @classmethod
-    def from_dict(cls, data: Dict[str, Any]) -> ArrayV2Metadata:
+    def from_dict(cls, data: dict[str, Any]) -> ArrayV2Metadata:
         # check that the zarr_format attribute is correct
         _ = parse_zarr_format_v2(data.pop("zarr_format"))
         return cls(**data)

+    def to_dict(self) -> JSON:
+        zarray_dict = super().to_dict()
+
+        assert isinstance(zarray_dict, dict)
+
+        _ = zarray_dict.pop("chunk_grid")
+        zarray_dict["chunks"] = self.chunk_grid.chunk_shape
+
+        _ = zarray_dict.pop("data_type")
+        zarray_dict["dtype"] = self.data_type
+
+        return zarray_dict
+
+    def get_chunk_spec(self, _chunk_coords: ChunkCoords, order: Literal["C", "F"]) -> ArraySpec:
+        return ArraySpec(
+            shape=self.chunk_grid.chunk_shape,
+            dtype=self.dtype,
+            fill_value=self.fill_value,
+            order=order,
+        )
+
+    def encode_chunk_key(self, chunk_coords: ChunkCoords) -> str:
+        chunk_identifier = self.dimension_separator.join(map(str, chunk_coords))
+        return "0" if chunk_identifier == "" else chunk_identifier
+
+    def update_shape(self, shape: ChunkCoords) -> Self:
+        return replace(self, shape=shape)
+
+    def update_attributes(self, attributes: dict[str, JSON]) -> Self:
+        return replace(self, attributes=attributes)
+
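Note: a sketch of the store layouts the two `to_buffer_dict` implementations produce. The key constants come from zarr.common; the v3 chunk-key value assumes the spec's default "c/"-prefixed encoding:

    # ArrayV3Metadata.to_buffer_dict() -> {"zarr.json": Buffer}
    #   default chunk key for (0, 1)   -> "c/0/1"
    # ArrayV2Metadata.to_buffer_dict() -> {".zarray": Buffer, ".zattrs": Buffer}
    #   encode_chunk_key((0, 1))       -> "0.1"  (default "." separator)
    #   encode_chunk_key(())           -> "0"    (zero-dimensional arrays)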

-def parse_dimension_names(data: Any) -> Tuple[str, ...] | None:
+def parse_dimension_names(data: Any) -> tuple[str, ...] | None:
     if data is None:
         return data
     if isinstance(data, Iterable) and all([isinstance(x, str) for x in data]):
@@ -318,11 +430,11 @@ def parse_dimension_names(data: Any) -> Tuple[str, ...] | None:


 # todo: real validation
-def parse_attributes(data: Any) -> Dict[str, JSON]:
+def parse_attributes(data: Any) -> dict[str, JSON]:
     if data is None:
         return {}

-    data_json = cast(Dict[str, JSON], data)
+    data_json = cast(dict[str, JSON], data)
     return data_json

@@ -349,12 +461,12 @@ def parse_node_type_array(data: Any) -> Literal["array"]:


 # todo: real validation
-def parse_filters(data: Any) -> List[Codec]:
+def parse_filters(data: Any) -> list[dict[str, JSON]]:
     return data


 # todo: real validation
-def parse_compressor(data: Any) -> Codec:
+def parse_compressor(data: Any) -> dict[str, JSON] | None:
     return data

@@ -368,9 +480,9 @@ def parse_v2_metadata(data: ArrayV2Metadata) -> ArrayV2Metadata:
     return data


-def parse_codecs(data: Iterable[Union[Codec, JSON]]) -> CodecPipeline:
-    from zarr.codecs.pipeline import CodecPipeline
+def parse_codecs(data: Iterable[Codec | JSON]) -> CodecPipeline:
+    from zarr.codecs import BatchedCodecPipeline

     if not isinstance(data, Iterable):
         raise TypeError(f"Expected iterable, got {type(data)}")
-    return CodecPipeline.from_dict(data)
+    return BatchedCodecPipeline.from_dict(data)
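Note: with `parse_codecs` now returning a `BatchedCodecPipeline`, both codec instances and raw v3-style JSON codec descriptions should remain accepted. A sketch (the gzip name/configuration shown is an assumption about the registered codec, not confirmed by this diff):

    from zarr.codecs import BytesCodec
    from zarr.metadata import parse_codecs

    # Mixed input: a codec instance plus a raw JSON codec description.
    pipeline = parse_codecs([BytesCodec(), {"name": "gzip", "configuration": {"level": 5}}])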
diff --git a/tests/v3/test_codecs.py b/tests/v3/test_codecs.py
index 665e3124c0..73553b5565 100644
--- a/tests/v3/test_codecs.py
+++ b/tests/v3/test_codecs.py
@@ -615,9 +615,9 @@ async def test_delete_empty_chunks(store: Store):
     assert await (store / "delete_empty_chunks/c0/0").get() is None


-async def test_delete_empty_sharded_chunks(store: Store):
+async def test_delete_empty_shards(store: Store):
     a = await AsyncArray.create(
-        store / "delete_empty_sharded_chunks",
+        store / "delete_empty_shards",
         shape=(16, 16),
         chunk_shape=(8, 16),
         dtype="uint16",
@@ -635,8 +635,8 @@
     data = np.ones((16, 16), dtype="uint16")
     data[:8, :8] = 0
     assert np.array_equal(data, await _AsyncArrayProxy(a)[:, :].get())
-    assert await (store / "delete_empty_sharded_chunks/c/1/0").get() is None
-    chunk_bytes = await (store / "delete_empty_sharded_chunks/c/0/0").get()
+    assert await (store / "delete_empty_shards/c/1/0").get() is None
+    chunk_bytes = await (store / "delete_empty_shards/c/0/0").get()
     assert chunk_bytes is not None and len(chunk_bytes) == 16 * 2 + 8 * 8 * 2 + 4

diff --git a/tests/v3/test_config.py b/tests/v3/test_config.py
index 43acdec5fa..aed9775d17 100644
--- a/tests/v3/test_config.py
+++ b/tests/v3/test_config.py
@@ -4,7 +4,11 @@ def test_config_defaults_set():
     # regression test for available defaults
     assert config.defaults == [
-        {"array": {"order": "C"}, "async": {"concurrency": None, "timeout": None}}
+        {
+            "array": {"order": "C"},
+            "async": {"concurrency": None, "timeout": None},
+            "codec_pipeline": {"batch_size": 1},
+        }
     ]
     assert config.get("array.order") == "C"
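Note: assuming `zarr.config` is a donfig-style `Config` (which the defaults list above suggests), the new batch size should be overridable at runtime rather than only via defaults, e.g.:

    from zarr.config import config

    # Temporarily raise the codec pipeline batch size for a block of work.
    with config.set({"codec_pipeline.batch_size": 4}):
        assert config.get("codec_pipeline.batch_size") == 4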
diff --git a/tests/v3/test_group.py b/tests/v3/test_group.py
index 16e4ceeecf..5a6751c11a 100644
--- a/tests/v3/test_group.py
+++ b/tests/v3/test_group.py
@@ -234,10 +234,7 @@ def test_asyncgroup_from_dict(store: MemoryStore | LocalStore, data: dict[str, Any]):


 @pytest.mark.parametrize("store", ("local", "memory"), indirect=["store"])
-@pytest.mark.parametrize(
-    "zarr_format",
-    (pytest.param(2, marks=pytest.mark.xfail(reason="V2 arrays cannot be created yet.")), 3),
-)
+@pytest.mark.parametrize("zarr_format", (2, 3))
 async def test_asyncgroup_getitem(store: LocalStore | MemoryStore, zarr_format: ZarrFormat) -> None:
     """
     Create an `AsyncGroup`, then create members of that group, and ensure that we can access those
@@ -264,10 +261,7 @@ async def test_asyncgroup_getitem(store: LocalStore | MemoryStore, zarr_format: ZarrFormat) -> None:


 @pytest.mark.parametrize("store", ("local", "memory"), indirect=["store"])
-@pytest.mark.parametrize(
-    "zarr_format",
-    (2, 3),
-)
+@pytest.mark.parametrize("zarr_format", (2, 3))
 async def test_asyncgroup_delitem(store: LocalStore | MemoryStore, zarr_format: ZarrFormat) -> None:
     agroup = await AsyncGroup.create(store=store, zarr_format=zarr_format)
     sub_array_path = "sub_array"
@@ -316,10 +310,7 @@ async def test_asyncgroup_create_group(


 @pytest.mark.parametrize("store", ("local", "memory"), indirect=["store"])
-@pytest.mark.parametrize(
-    "zarr_format",
-    (pytest.param(2, marks=pytest.mark.xfail(reason="V2 arrays cannot be created yet")), 3),
-)
+@pytest.mark.parametrize("zarr_format", (2, 3))
 async def test_asyncgroup_create_array(
     store: LocalStore | MemoryStore,
     zarr_format: ZarrFormat,
diff --git a/tests/v3/test_v2.py b/tests/v3/test_v2.py
new file mode 100644
index 0000000000..5b831b1bb0
--- /dev/null
+++ b/tests/v3/test_v2.py
@@ -0,0 +1,28 @@
+from typing import Iterator
+import numpy as np
+import pytest
+
+from zarr.abc.store import Store
+from zarr.array import Array
+from zarr.store import StorePath, MemoryStore
+
+
+@pytest.fixture
+def store() -> Iterator[Store]:
+    yield StorePath(MemoryStore())
+
+
+def test_simple(store: Store):
+    data = np.arange(0, 256, dtype="uint16").reshape((16, 16))
+
+    a = Array.create(
+        store / "simple_v2",
+        zarr_format=2,
+        shape=data.shape,
+        chunks=(16, 16),
+        dtype=data.dtype,
+        fill_value=0,
+    )
+
+    a[:, :] = data
+    assert np.array_equal(data, a[:, :])
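Note: a natural follow-up assertion for `test_simple`, not part of this diff. The key names follow the v2 layout established in metadata.py above, and the StorePath `/` and `.get()` usage mirrors test_codecs.py:

    # Hypothetical extension: verify the classic v2 key layout on the store.
    async def check_v2_layout(store) -> None:
        assert await (store / "simple_v2/.zarray").get() is not None
        assert await (store / "simple_v2/.zattrs").get() is not None
        assert await (store / "simple_v2/0.0").get() is not None  # "." separator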