Skip to content

Reworked codec pipelines #1670

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 29 commits into from
May 17, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
9405fda
merge
normanrz Feb 20, 2024
019ecc8
refactors CodecPipelines
normanrz Apr 19, 2024
bd2160d
fixes
normanrz Apr 30, 2024
4887c29
adds HybridCodecPipeline
normanrz Apr 30, 2024
c3e3504
fixes
normanrz Apr 30, 2024
a578d95
typing
normanrz May 8, 2024
13212b5
merge
normanrz May 8, 2024
e3cad7c
typing
normanrz May 8, 2024
027ebb5
consistent naming
normanrz May 8, 2024
2bb00ae
Apply suggestions from code review
normanrz May 10, 2024
56877ee
encode/decode are batched by default
normanrz May 10, 2024
797b50b
merge
normanrz May 13, 2024
99a1f93
use zarr.config for batch_size
normanrz May 13, 2024
6d00972
merge
normanrz May 14, 2024
530e88b
don't use global lru_cache
normanrz May 14, 2024
9eda592
removes HybridCodecPipeline
normanrz May 15, 2024
d9aa24f
generic codec classes
normanrz May 15, 2024
a5fb71e
default batch size = 1
normanrz May 15, 2024
efd9bce
default batch size = 1
normanrz May 15, 2024
38c436d
docs
normanrz May 15, 2024
2397d3f
merge
normanrz May 15, 2024
1ad9896
Merge remote-tracking branch 'origin/v3' into batched-codec-pipeline
normanrz May 15, 2024
3a85a0a
Update src/zarr/codecs/batched_codec_pipeline.py
normanrz May 16, 2024
f33e66a
mv batched_codec_pipeline -> pipeline
normanrz May 16, 2024
9e42076
Merge branch 'batched-codec-pipeline' of github.com:zarr-developers/z…
normanrz May 16, 2024
95ae4b6
Remove ArrayV2 (#1857)
normanrz May 16, 2024
faa965d
Merge branch 'batched-codec-pipeline' of github.com:zarr-developers/z…
normanrz May 16, 2024
9bb3243
merge
normanrz May 16, 2024
db97439
merge
normanrz May 16, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -180,10 +180,6 @@ check_untyped_defs = false
module = [
"zarr.v2.*",
"zarr.abc.codec",
"zarr.codecs.bytes",
"zarr.codecs.pipeline",
"zarr.codecs.sharding",
"zarr.codecs.transpose",
"zarr.array_v2",
"zarr.array",
"zarr.sync",
Expand Down
169 changes: 162 additions & 7 deletions src/zarr/abc/codec.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,22 @@
from __future__ import annotations

from abc import abstractmethod
from typing import TYPE_CHECKING, Optional
from typing import (
TYPE_CHECKING,
Awaitable,
Callable,
Iterable,
Optional,
Protocol,
Tuple,
TypeVar,
runtime_checkable,
)

import numpy as np
from zarr.abc.metadata import Metadata

from zarr.common import ArraySpec
from zarr.store import StorePath
from zarr.common import ArraySpec, concurrent_map


if TYPE_CHECKING:
Expand All @@ -16,6 +25,40 @@
from zarr.metadata import ArrayMetadata
from zarr.config import RuntimeConfiguration

T = TypeVar("T")
U = TypeVar("U")


def noop_for_none(
func: Callable[[T, ArraySpec, RuntimeConfiguration], Awaitable[Optional[U]]],
) -> Callable[[Optional[T], ArraySpec, RuntimeConfiguration], Awaitable[Optional[U]]]:
async def wrap(
chunk: Optional[T], chunk_spec: ArraySpec, runtime_configuration: RuntimeConfiguration
) -> Optional[U]:
if chunk is None:
return None
return await func(chunk, chunk_spec, runtime_configuration)

return wrap


@runtime_checkable
class ByteGetter(Protocol):
async def get(
self, byte_range: Optional[Tuple[int, Optional[int]]] = None
) -> Optional[BytesLike]: ...


@runtime_checkable
class ByteSetter(Protocol):
async def get(
self, byte_range: Optional[Tuple[int, Optional[int]]] = None
) -> Optional[BytesLike]: ...

async def set(self, value: BytesLike, byte_range: Optional[Tuple[int, int]] = None) -> None: ...

async def delete(self) -> None: ...


class Codec(Metadata):
is_fixed_size: bool
Expand Down Expand Up @@ -44,6 +87,20 @@ async def decode(
) -> np.ndarray:
pass

async def decode_batch(
self,
chunk_arrays_and_specs: Iterable[Tuple[Optional[np.ndarray], ArraySpec]],
runtime_configuration: RuntimeConfiguration,
) -> Iterable[Optional[np.ndarray]]:
return await concurrent_map(
[
(chunk_array, chunk_spec, runtime_configuration)
for chunk_array, chunk_spec in chunk_arrays_and_specs
],
noop_for_none(self.decode),
runtime_configuration.concurrency,
)

@abstractmethod
async def encode(
self,
Expand All @@ -53,17 +110,45 @@ async def encode(
) -> Optional[np.ndarray]:
pass

async def encode_batch(
self,
chunk_arrays_and_specs: Iterable[Tuple[Optional[np.ndarray], ArraySpec]],
runtime_configuration: RuntimeConfiguration,
) -> Iterable[Optional[np.ndarray]]:
return await concurrent_map(
[
(chunk_array, chunk_spec, runtime_configuration)
for chunk_array, chunk_spec in chunk_arrays_and_specs
],
noop_for_none(self.encode),
runtime_configuration.concurrency,
)


class ArrayBytesCodec(Codec):
@abstractmethod
async def decode(
self,
chunk_array: BytesLike,
chunk_bytes: BytesLike,
chunk_spec: ArraySpec,
runtime_configuration: RuntimeConfiguration,
) -> np.ndarray:
pass

async def decode_batch(
self,
chunk_bytes_and_specs: Iterable[Tuple[Optional[BytesLike], ArraySpec]],
runtime_configuration: RuntimeConfiguration,
) -> Iterable[Optional[np.ndarray]]:
return await concurrent_map(
[
(chunk_bytes, chunk_spec, runtime_configuration)
for chunk_bytes, chunk_spec in chunk_bytes_and_specs
],
noop_for_none(self.decode),
runtime_configuration.concurrency,
)

@abstractmethod
async def encode(
self,
Expand All @@ -73,42 +158,98 @@ async def encode(
) -> Optional[BytesLike]:
pass

async def encode_batch(
self,
chunk_arrays_and_specs: Iterable[Tuple[Optional[np.ndarray], ArraySpec]],
runtime_configuration: RuntimeConfiguration,
) -> Iterable[Optional[BytesLike]]:
return await concurrent_map(
[
(chunk_array, chunk_spec, runtime_configuration)
for chunk_array, chunk_spec in chunk_arrays_and_specs
],
noop_for_none(self.encode),
runtime_configuration.concurrency,
)


class ArrayBytesCodecPartialDecodeMixin:
@abstractmethod
async def decode_partial(
self,
store_path: StorePath,
byte_getter: ByteGetter,
selection: SliceSelection,
chunk_spec: ArraySpec,
runtime_configuration: RuntimeConfiguration,
) -> Optional[np.ndarray]:
pass

async def decode_partial_batch(
self,
batch_info: Iterable[Tuple[ByteGetter, SliceSelection, ArraySpec]],
runtime_configuration: RuntimeConfiguration,
) -> Iterable[Optional[np.ndarray]]:
return await concurrent_map(
[
(byte_getter, selection, chunk_spec, runtime_configuration)
for byte_getter, selection, chunk_spec in batch_info
],
self.decode_partial,
runtime_configuration.concurrency,
)


class ArrayBytesCodecPartialEncodeMixin:
@abstractmethod
async def encode_partial(
self,
store_path: StorePath,
byte_setter: ByteSetter,
chunk_array: np.ndarray,
selection: SliceSelection,
chunk_spec: ArraySpec,
runtime_configuration: RuntimeConfiguration,
) -> None:
pass

async def encode_partial_batch(
self,
batch_info: Iterable[Tuple[ByteSetter, np.ndarray, SliceSelection, ArraySpec]],
runtime_configuration: RuntimeConfiguration,
) -> None:
await concurrent_map(
[
(byte_setter, chunk_array, selection, chunk_spec, runtime_configuration)
for byte_setter, chunk_array, selection, chunk_spec in batch_info
],
self.encode_partial,
runtime_configuration.concurrency,
)


class BytesBytesCodec(Codec):
@abstractmethod
async def decode(
self,
chunk_array: BytesLike,
chunk_bytes: BytesLike,
chunk_spec: ArraySpec,
runtime_configuration: RuntimeConfiguration,
) -> BytesLike:
pass

async def decode_batch(
self,
chunk_bytes_and_specs: Iterable[Tuple[Optional[BytesLike], ArraySpec]],
runtime_configuration: RuntimeConfiguration,
) -> Iterable[Optional[BytesLike]]:
return await concurrent_map(
[
(chunk_bytes, chunk_spec, runtime_configuration)
for chunk_bytes, chunk_spec in chunk_bytes_and_specs
],
noop_for_none(self.decode),
runtime_configuration.concurrency,
)

@abstractmethod
async def encode(
self,
Expand All @@ -117,3 +258,17 @@ async def encode(
runtime_configuration: RuntimeConfiguration,
) -> Optional[BytesLike]:
pass

async def encode_batch(
self,
chunk_bytes_and_specs: Iterable[Tuple[Optional[BytesLike], ArraySpec]],
runtime_configuration: RuntimeConfiguration,
) -> Iterable[Optional[BytesLike]]:
return await concurrent_map(
[
(chunk_bytes, chunk_spec, runtime_configuration)
for chunk_bytes, chunk_spec in chunk_bytes_and_specs
],
noop_for_none(self.encode),
runtime_configuration.concurrency,
)
Loading