add storage_transformers and get/set_partial_values #1096

Merged
35 commits
605620b
add storage_transformers and get/set_partial_values
jstriebel Jul 28, 2022
566e4b0
formatting
jstriebel Jul 28, 2022
5f85439
add docs and release notes
jstriebel Jul 28, 2022
3c38d57
Merge branch 'main' into storage-transformers-and-partial-get-set
jstriebel Jul 28, 2022
dd7fedb
add test_core testcase
jstriebel Jul 29, 2022
e33b365
Update zarr/creation.py
jstriebel Jul 29, 2022
81ebf68
apply PR feedback
jstriebel Jul 29, 2022
ca28471
add comment that storage_transformers=None is the same as storage_tra…
jstriebel Jul 29, 2022
85f3309
use empty tuple as default for storage_transformers
jstriebel Aug 1, 2022
03de894
Merge branch 'main' into storage-transformers-and-partial-get-set
jstriebel Aug 1, 2022
41eaafb
make mypy happy
jstriebel Aug 1, 2022
5d7be76
better coverage, minor fix, adding rmdir
jstriebel Aug 1, 2022
46229ad
add missing rmdir to test
jstriebel Aug 1, 2022
3a9f7cc
increase coverage
jstriebel Aug 2, 2022
efa4e07
improve test coverage
jstriebel Aug 3, 2022
b4668a8
fix TestArrayWithStorageTransformersV3
jstriebel Aug 3, 2022
e4a4853
Merge branch 'main' into storage-transformers-and-partial-get-set
jstriebel Aug 5, 2022
e454046
Update zarr/creation.py
jstriebel Aug 8, 2022
a3c7f74
Merge branch 'main' into storage-transformers-and-partial-get-set
jstriebel Aug 8, 2022
c041dd8
Merge branch 'main' into storage-transformers-and-partial-get-set
jstriebel Aug 22, 2022
696d5ca
pick generic storage transformer changes from #1111
jstriebel Aug 22, 2022
c099440
increase coverage
jstriebel Aug 22, 2022
be98c01
fix order of storage transformers
jstriebel Aug 24, 2022
7c2767a
retrigger CI
jstriebel Aug 25, 2022
146c30a
Merge remote-tracking branch 'origin/main' into storage-transformers-…
jstriebel Dec 12, 2022
59cca8b
minor fixes
jstriebel Dec 12, 2022
c2dc0d6
make flake8 happy
jstriebel Dec 12, 2022
7402262
Merge branch 'main' into storage-transformers-and-partial-get-set
jstriebel Dec 19, 2022
b9d8177
Merge branch 'main' into storage-transformers-and-partial-get-set
jstriebel Dec 21, 2022
91f0c2c
apply PR feedback
jstriebel Dec 22, 2022
e68c97f
Merge branch 'main' into storage-transformers-and-partial-get-set
jstriebel Dec 22, 2022
a7e4d89
Merge remote-tracking branch 'origin/main' into storage-transformers-…
joshmoore Jan 16, 2023
fcb9ba0
Merge pull request #3 from joshmoore/storage-transformers-and-partial…
jstriebel Jan 16, 2023
b6588e7
Merge branch 'main' into storage-transformers-and-partial-get-set
jstriebel Jan 16, 2023
eba9006
Merge branch 'main' into storage-transformers-and-partial-get-set
jstriebel Jan 16, 2023
17 changes: 13 additions & 4 deletions docs/release.rst
@@ -6,14 +6,20 @@ Release notes
# to document your changes. On releases it will be
# re-indented so that it does not show up in the notes.

.. _unreleased:
.. _unreleased:

Unreleased
----------
Unreleased
----------
..
# .. warning::
# Pre-release! Use :command:`pip install --pre zarr` to evaluate this release.

* Improve Zarr V3 support, adding partial store read/write and storage transformers.
Add two features of the [v3 spec](https://zarr-specs.readthedocs.io/en/latest/core/v3.0.html):
* storage transformers
* `get_partial_values` and `set_partial_values`
By :user:`Jonathan Striebel <jstriebel>`; :issue:`1096`.

.. _release_2.13.6:

2.13.6
@@ -44,7 +50,10 @@ Bug fixes
Appreciation
~~~~~~~~~~~~~

Special thanks to Outreachy participants for contributing to most of the maintenance PRs. Please read the blog post summarising the contribution phase and welcoming new Outreachy interns: https://zarr.dev/blog/welcoming-outreachy-2022-interns/
Special thanks to Outreachy participants for contributing to most of the
maintenance PRs. Please read the blog post summarising the contribution phase
and welcoming new Outreachy interns:
https://zarr.dev/blog/welcoming-outreachy-2022-interns/


Enhancements
225 changes: 224 additions & 1 deletion zarr/_storage/store.py
@@ -1,8 +1,10 @@
import abc
import os
from collections import defaultdict
from collections.abc import MutableMapping
from copy import copy
from string import ascii_letters, digits
from typing import Any, List, Mapping, Optional, Union
from typing import Any, Dict, List, Mapping, Optional, Sequence, Tuple, Union

from zarr.meta import Metadata2, Metadata3
from zarr.util import normalize_storage_path
@@ -254,6 +256,82 @@ def __setitem__(self, key, value):
    def __getitem__(self, key):
        """Get a value."""

    @abc.abstractmethod
    def rmdir(self, path=None):
        """Remove a data path and all its subkeys and related metadata.
        Expects a path without the data or meta root prefix."""

    @property
    def supports_efficient_get_partial_values(self):
        return False

    def get_partial_values(
        self,
        key_ranges: Sequence[Tuple[str, Tuple[int, Optional[int]]]]
    ) -> List[Union[bytes, memoryview, bytearray]]:
        """Get multiple partial values.
        key_ranges can be an iterable of key, range pairs,
        where a range specifies two integers range_start and range_length
        as a tuple, (range_start, range_length).
        range_length may be None to indicate to read until the end.
        range_start may be negative to start reading range_start bytes
        from the end of the file.
        A key may occur multiple times with different ranges.
        Inserts None for missing keys into the returned list."""
        results: List[Union[bytes, memoryview, bytearray]] = (
            [None] * len(key_ranges)  # type: ignore[list-item]
        )
        indexed_ranges_by_key: Dict[str, List[Tuple[int, Tuple[int, Optional[int]]]]] = (
            defaultdict(list)
        )
        for i, (key, range_) in enumerate(key_ranges):
            indexed_ranges_by_key[key].append((i, range_))
        for key, indexed_ranges in indexed_ranges_by_key.items():
            try:
                value = self[key]
            except KeyError:  # pragma: no cover
                continue
            for i, (range_from, range_length) in indexed_ranges:
                if range_length is None:
                    results[i] = value[range_from:]
                else:
                    results[i] = value[range_from:range_from + range_length]
        return results

    def supports_efficient_set_partial_values(self):
        return False

    def set_partial_values(self, key_start_values):
        """Set multiple partial values.
        key_start_values can be an iterable of key, start and value triplets
        as tuples, (key, start, value), where start defines the offset in bytes.
        A key may occur multiple times with different starts and non-overlapping values.
        Also, start may only be beyond the current value if other values fill the gap.
        start may be negative to start writing start bytes from the current
        end of the file, ending the file with the new value."""
        unique_keys = set(next(zip(*key_start_values)))
        values = {}
        for key in unique_keys:
            old_value = self.get(key)
            values[key] = None if old_value is None else bytearray(old_value)
        for key, start, value in key_start_values:
            if values[key] is None:
                assert start == 0
                values[key] = value
            else:
                if start > len(values[key]):  # pragma: no cover
                    raise ValueError(
                        f"Cannot set value at start {start}, "
                        + f"since it is beyond the data at key {key}, "
                        + f"having length {len(values[key])}."
                    )
                if start < 0:
                    values[key][start:] = value
                else:
                    values[key][start:start + len(value)] = value
        for key, value in values.items():
            self[key] = value

    def clear(self):
        """Remove all items from store."""
        self.erase_prefix("/")
@@ -303,6 +381,151 @@ def _ensure_store(store):
)


class StorageTransformer(MutableMapping, abc.ABC):
    """Base class for storage transformers. The methods simply pass on the data as-is
    and should be overwritten by sub-classes."""

    _store_version = 3
Member

Slight surprise to find this in store.py rather than under v3.

Member Author

I assumed that the base-classes go into store.py, similar to StoreV3, but also happy to move this into _storage/v3.py if that's a better fit.

    _metadata_class = Metadata3

    def __init__(self, _type) -> None:
        if _type not in self.valid_types:  # pragma: no cover
            raise ValueError(
                f"Storage transformer cannot be initialized with type {_type}, "
                + f"must be one of {list(self.valid_types)}."
            )
        self.type = _type
        self._inner_store = None

    def _copy_for_array(self, array, inner_store):
Contributor

What is the purpose of the unused array argument here? Is it intended that subclasses would use this?

Member Author

Exactly, I've seen a need for this when implementing the Sharding storage transformer in #1111:

    def _copy_for_array(self, array, inner_store):
        transformer_copy = super()._copy_for_array(array, inner_store)
        transformer_copy._dimension_separator = array._dimension_separator
        transformer_copy._data_key_prefix = array._data_key_prefix

Related to this: I'm not super happy about the double-initialization using both __init__ and _copy_for_array, but it seems necessary to supply the information in two steps, first from the config, then from the array. Please let me know if you have an idea how to solve this more elegantly.
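
The two-step initialization discussed in this thread can be illustrated with a small stand-alone sketch. `ToyTransformer` and its `suffix` parameter are hypothetical and exist only for this example; the class is not the zarr `StorageTransformer`, it merely mirrors the `from_config` / `get_config` / `_copy_for_array` pattern from the diff. Step one builds an unbound instance from the metadata config, step two produces a copy bound to an inner store.

```python
from copy import copy


class ToyTransformer:
    """Hypothetical stand-in for a storage transformer (not the zarr class)."""

    valid_types = ("example",)

    def __init__(self, _type, suffix=""):
        if _type not in self.valid_types:
            raise ValueError(f"type {_type!r} must be one of {list(self.valid_types)}")
        self.type = _type
        self.suffix = suffix      # public attribute, so it is part of the config
        self._inner_store = None  # private, only set on the bound copy

    def get_config(self):
        # Same convention as in the diff: all non-private members except "type".
        return {
            k: v for k, v in self.__dict__.items()
            if not k.startswith("_") and k != "type"
        }

    @classmethod
    def from_config(cls, _type, config):
        # Step 1: construct from the JSON-compatible configuration.
        return cls(_type, **config)

    def _copy_for_array(self, array, inner_store):
        # Step 2: shallow-copy and bind to a store; `array` is unused here,
        # but a subclass could read array attributes from it.
        bound = copy(self)
        bound._inner_store = inner_store
        return bound


template = ToyTransformer.from_config("example", {"suffix": ".bak"})
bound = template._copy_for_array(array=None, inner_store={})
```

Because the copy is shallow, the unbound `template` keeps `_inner_store` as `None` and can be reused for other arrays, while `bound.get_config()` still round-trips the original configuration.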

        transformer_copy = copy(self)
        transformer_copy._inner_store = inner_store
        return transformer_copy

    @abc.abstractproperty
    def extension_uri(self):
        pass  # pragma: no cover

    @abc.abstractproperty
    def valid_types(self):
        pass  # pragma: no cover

    def get_config(self):
        """Return a dictionary holding configuration parameters for this
        storage transformer. All values must be compatible with JSON encoding."""
        # Override in sub-class if need special encoding of config values.
        # By default, assume all non-private members are configuration
        # parameters except for type .
        return {
            k: v for k, v in self.__dict__.items()
            if not k.startswith('_') and k != "type"
        }

    @classmethod
    def from_config(cls, _type, config):
        """Instantiate storage transformer from a configuration object."""
        # override in sub-class if need special decoding of config values

        # by default, assume constructor accepts configuration parameters as
        # keyword arguments without any special decoding
        return cls(_type, **config)

    @property
    def inner_store(self) -> Union["StorageTransformer", StoreV3]:
        assert self._inner_store is not None, (
            "inner_store is not initialized, first get a copy via _copy_for_array."
        )
        return self._inner_store

    # The following implementations are usually fine to keep as-is:

    def __eq__(self, other):
        return (
            type(self) == type(other) and
            self._inner_store == other._inner_store and
            self.get_config() == other.get_config()
        )

    def erase(self, key):
        self.__delitem__(key)

    def list(self):
        return list(self.keys())

    def list_dir(self, prefix):
        return StoreV3.list_dir(self, prefix)

    def is_readable(self):
        return self.inner_store.is_readable()

    def is_writeable(self):
        return self.inner_store.is_writeable()

    def is_listable(self):
        return self.inner_store.is_listable()

    def is_erasable(self):
        return self.inner_store.is_erasable()

    def clear(self):
        return self.inner_store.clear()

    def __enter__(self):
        return self.inner_store.__enter__()

    def __exit__(self, exc_type, exc_value, traceback):
        return self.inner_store.__exit__(exc_type, exc_value, traceback)

    def close(self) -> None:
        return self.inner_store.close()

    # The following implementations might need to be re-implemented
    # by subclasses implementing storage transformers:

    def rename(self, src_path: str, dst_path: str) -> None:
        return self.inner_store.rename(src_path, dst_path)

    def list_prefix(self, prefix):
        return self.inner_store.list_prefix(prefix)

    def erase_prefix(self, prefix):
        return self.inner_store.erase_prefix(prefix)

    def rmdir(self, path=None):
        return self.inner_store.rmdir(path)

    def __contains__(self, key):
        return self.inner_store.__contains__(key)

    def __setitem__(self, key, value):
        return self.inner_store.__setitem__(key, value)

    def __getitem__(self, key):
        return self.inner_store.__getitem__(key)

    def __delitem__(self, key):
        return self.inner_store.__delitem__(key)

    def __iter__(self):
        return self.inner_store.__iter__()

    def __len__(self):
        return self.inner_store.__len__()

    @property
    def supports_efficient_get_partial_values(self):
        return self.inner_store.supports_efficient_get_partial_values

    def get_partial_values(self, key_ranges):
        return self.inner_store.get_partial_values(key_ranges)

    def supports_efficient_set_partial_values(self):
        return self.inner_store.supports_efficient_set_partial_values()

    def set_partial_values(self, key_start_values):
        return self.inner_store.set_partial_values(key_start_values)


# allow MutableMapping for backwards compatibility
StoreLike = Union[BaseStore, MutableMapping]
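
For readers who want to try the partial-value semantics described in the `get_partial_values` and `set_partial_values` docstrings without a Zarr V3 store, here is a minimal sketch on a plain `dict` subclass. `ToyStore` is a hypothetical name used only here, and the code is a simplified illustration rather than the implementation in this PR: it normalizes negative read offsets up front, and it treats a missing key as an empty value on write.

```python
class ToyStore(dict):
    """Hypothetical dict-backed store illustrating partial reads and writes."""

    def get_partial_values(self, key_ranges):
        # key_ranges: sequence of (key, (range_start, range_length)) pairs.
        results = [None] * len(key_ranges)
        for i, (key, (start, length)) in enumerate(key_ranges):
            value = self.get(key)
            if value is None:
                continue  # missing keys stay None in the result list
            if start < 0:
                # Assumption of this sketch: count negative starts from the end.
                start = max(len(value) + start, 0)
            results[i] = value[start:] if length is None else value[start:start + length]
        return results

    def set_partial_values(self, key_start_values):
        # key_start_values: sequence of (key, start, value) triplets.
        buffers = {}
        for key, start, value in key_start_values:
            if key not in buffers:
                buffers[key] = bytearray(self.get(key, b""))
            buf = buffers[key]
            if start > len(buf):
                raise ValueError(
                    f"start {start} is beyond the data at key {key!r} (length {len(buf)})"
                )
            if start < 0:
                buf[start:] = value  # the value becomes the new end of the data
            else:
                buf[start:start + len(value)] = value
        for key, buf in buffers.items():
            self[key] = bytes(buf)


store = ToyStore(a=b"0123456789")
parts = store.get_partial_values([("a", (2, 3)), ("a", (-4, None)), ("missing", (0, 1))])
store.set_partial_values([("a", 0, b"AB"), ("a", 10, b"XY"), ("b", 0, b"new")])
```

After these calls, `parts` is `[b"234", b"6789", None]`, `store["a"]` is `b"AB23456789XY"` and `store["b"]` is `b"new"`. Each key is read and written once per call, which is the same batching idea as the fallback in the diff.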

26 changes: 21 additions & 5 deletions zarr/core.py
@@ -189,6 +189,7 @@ def __init__(

self._store = store
self._chunk_store = chunk_store
self._transformed_chunk_store = None
self._path = normalize_storage_path(path)
if self._path:
self._key_prefix = self._path + '/'
@@ -292,6 +293,16 @@ def _load_metadata_nosync(self):
filters = [get_codec(config) for config in filters]
self._filters = filters

            if self._version == 3:
                storage_transformers = meta.get('storage_transformers', [])
                if storage_transformers:
                    transformed_store = self._chunk_store or self._store
                    for storage_transformer in storage_transformers[::-1]:
                        transformed_store = storage_transformer._copy_for_array(
                            self, transformed_store
                        )
                    self._transformed_chunk_store = transformed_store

def _refresh_metadata(self):
if not self._cache_metadata:
self._load_metadata()
@@ -371,10 +382,12 @@ def read_only(self, value):
@property
def chunk_store(self):
"""A MutableMapping providing the underlying storage for array chunks."""
if self._chunk_store is None:
return self._store
else:
if self._transformed_chunk_store is not None:
return self._transformed_chunk_store
elif self._chunk_store is not None:
return self._chunk_store
else:
return self._store

@property
def shape(self):
@@ -1800,7 +1813,7 @@ def _set_selection(self, indexer, value, fields=None):
check_array_shape('value', value, sel_shape)

# iterate over chunks in range
if not hasattr(self.store, "setitems") or self._synchronizer is not None \
if not hasattr(self.chunk_store, "setitems") or self._synchronizer is not None \
Member

I have to wonder if this wouldn't ever need to be the transformed_chunk_store. In general, this chain of replacement stores feels slightly like an anti-pattern.

Member Author

Yes, I'm also not particularly happy with this pattern, but also couldn't come up with a better solution for now. Happy about any ideas 👍

or any(map(lambda x: x == 0, self.shape)):
# iterative approach
for chunk_coords, chunk_selection, out_selection in indexer:
@@ -2229,7 +2242,10 @@ def _encode_chunk(self, chunk):
cdata = chunk

# ensure in-memory data is immutable and easy to compare
if isinstance(self.chunk_store, KVStore):
if (
isinstance(self.chunk_store, KVStore)
or isinstance(self._chunk_store, KVStore)
):
cdata = ensure_bytes(cdata)

return cdata
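
The wrapping loop in the `_load_metadata_nosync` hunk iterates over the transformers in reverse, so the first transformer in the metadata list ends up outermost and sees each request first. The following stand-alone sketch shows that ordering; `LoggingTransformer` and `build_chain` are hypothetical helpers written for this illustration and are not part of zarr.

```python
from copy import copy


class LoggingTransformer:
    """Hypothetical pass-through transformer that records the call order."""

    def __init__(self, name, calls):
        self.name = name
        self.calls = calls  # shared list, appended to on every write
        self._inner_store = None

    def _copy_for_array(self, array, inner_store):
        bound = copy(self)
        bound._inner_store = inner_store
        return bound

    def __setitem__(self, key, value):
        self.calls.append(self.name)
        self._inner_store[key] = value  # forward to the next layer


def build_chain(store, transformers, array=None):
    # Same loop shape as in the diff: wrap from the last transformer to the first.
    wrapped = store
    for transformer in transformers[::-1]:
        wrapped = transformer._copy_for_array(array, wrapped)
    return wrapped


calls = []
base = {}
chain = build_chain(base, [LoggingTransformer("a", calls), LoggingTransformer("b", calls)])
chain["chunk/0"] = b"data"
```

After the write, `calls` is `["a", "b"]` and `base` holds the key, which matches the `create` docstring's statement that transformers are applied in the order of the given sequence.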
12 changes: 10 additions & 2 deletions zarr/creation.py
@@ -22,7 +22,7 @@ def create(shape, chunks=True, dtype=None, compressor='default',
overwrite=False, path=None, chunk_store=None, filters=None,
cache_metadata=True, cache_attrs=True, read_only=False,
object_codec=None, dimension_separator=None, write_empty_chunks=True,
*, zarr_version=None, meta_array=None, **kwargs):
*, zarr_version=None, meta_array=None, storage_transformers=(), **kwargs):
"""Create an array.

Parameters
@@ -85,6 +85,14 @@ def create(shape, chunks=True, dtype=None, compressor='default',

.. versionadded:: 2.11

storage_transformers : sequence of StorageTransformers, optional
Setting storage transformers, changes the storage structure and behaviour
of data coming from the underlying store. The transformers are applied in the
order of the given sequence. Supplying an empty sequence is the same as omitting
the argument or setting it to None. May only be set when using zarr_version 3.

.. versionadded:: 2.13

zarr_version : {None, 2, 3}, optional
The zarr protocol version of the created array. If None, it will be
inferred from ``store`` or ``chunk_store`` if they are provided,
@@ -170,7 +178,7 @@ def create(shape, chunks=True, dtype=None, compressor='default',
init_array(store, shape=shape, chunks=chunks, dtype=dtype, compressor=compressor,
fill_value=fill_value, order=order, overwrite=overwrite, path=path,
chunk_store=chunk_store, filters=filters, object_codec=object_codec,
dimension_separator=dimension_separator)
dimension_separator=dimension_separator, storage_transformers=storage_transformers)

# instantiate array
z = Array(store, path=path, chunk_store=chunk_store, synchronizer=synchronizer,