diff --git a/zarr/_storage/store.py b/zarr/_storage/store.py index 6f5bf78e28..2c6d7b3978 100644 --- a/zarr/_storage/store.py +++ b/zarr/_storage/store.py @@ -1,7 +1,9 @@ +import sys from collections.abc import MutableMapping +from string import ascii_letters, digits from typing import Any, List, Optional, Union -from zarr.meta import Metadata2 +from zarr.meta import Metadata2, Metadata3, _default_entry_point_metadata_v3 from zarr.util import normalize_storage_path # v2 store keys @@ -131,6 +133,169 @@ def rmdir(self, path: str = "") -> None: _rmdir_from_keys(self, path) +class StoreV3(BaseStore): + _store_version = 3 + _metadata_class = Metadata3 + + @staticmethod + def _valid_key(key: str) -> bool: + """ + Verify that a key conforms to the specification. + + A key is any string containing only characters in the range a-z, A-Z, + 0-9, or in the set /.-_; it will return True if that's the case, False + otherwise. + + In addition, in spec v3, keys can only start with the prefix meta/, + data/ or be exactly zarr.json and should not end with /. This should + not be exposed to the user, and is a store implementation detail, so + this method will raise a ValueError in that case. + """ + if sys.version_info > (3, 7): + if not key.isascii(): + return False + if set(key) - set(ascii_letters + digits + "/.-_"): + return False + + if ( + not key.startswith("data/") + and (not key.startswith("meta/")) + and (not key == "zarr.json") + ): + raise ValueError("key starts with unexpected value: `{}`".format(key)) + + if key.endswith('/'): + raise ValueError("keys may not end in /") + + return True + + def list_prefix(self, prefix): + if prefix.startswith('/'): + raise ValueError("prefix must not begin with /") + # TODO: force prefix to end with /? + return [k for k in self.list() if k.startswith(prefix)] + + def erase(self, key): + self.__delitem__(key) + + def erase_prefix(self, prefix): + assert prefix.endswith("/") + + if prefix == "/": + all_keys = self.list() + else: + all_keys = self.list_prefix(prefix) + for key in all_keys: + self.erase(key) + + def list_dir(self, prefix): + """ + Note: carefully test this with trailing/leading slashes + """ + if prefix: # allow prefix = "" ? + assert prefix.endswith("/") + + all_keys = self.list_prefix(prefix) + len_prefix = len(prefix) + keys = [] + prefixes = [] + for k in all_keys: + trail = k[len_prefix:] + if "/" not in trail: + keys.append(prefix + trail) + else: + prefixes.append(prefix + trail.split("/", maxsplit=1)[0] + "/") + return keys, list(set(prefixes)) + + def list(self): + if hasattr(self, 'keys'): + return list(self.keys()) + raise NotImplementedError( + "The list method has not been implemented for this store type." + ) + + # TODO: Remove listdir? This method is just to match the current V2 stores + # The v3 spec mentions: list, list_dir, list_prefix + def listdir(self, path: str = ""): + if path and not path.endswith("/"): + path = path + "/" + keys, prefixes = self.list_dir(path) + prefixes = [p[len(path):].rstrip("/") for p in prefixes] + keys = [k[len(path):] for k in keys] + return keys + prefixes + + # TODO: rmdir here is identical to the rmdir on Store so could potentially + # move to BaseStore instead. + def rmdir(self, path: str = "") -> None: + if not self.is_erasable(): + raise NotImplementedError( + f'{type(self)} is not erasable, cannot call "rmdir"' + ) # pragma: no cover + path = normalize_storage_path(path) + _rmdir_from_keys(self, path) + + def __contains__(self, key): + # TODO: re-enable this check?
+ # if not key.startswith(("meta/", "data/")): + # raise ValueError( + # f'Key must start with either "meta/" or "data/". ' + # f'Got {key}' + # ) + return key in self.list() + + def clear(self): + """Remove all items from store.""" + self.erase_prefix("/") + + def __eq__(self, other): + from zarr.storage import KVStoreV3 # avoid circular import + if isinstance(other, KVStoreV3): + return self._mutable_mapping == other._mutable_mapping + else: + return NotImplemented + + @staticmethod + def _ensure_store(store): + """ + We want to make sure internally that zarr stores are always a class + with a specific interface derived from ``Store``, which is slightly + different than ``MutableMapping``. + + We'll do this conversion in a few places automatically + """ + from zarr.storage import KVStoreV3 # avoid circular import + if store is None: + return None + elif isinstance(store, StoreV3): + return store + elif isinstance(store, MutableMapping): + return KVStoreV3(store) + else: + for attr in [ + "keys", + "values", + "get", + "__setitem__", + "__getitem__", + "__delitem__", + "__contains__", + ]: + if not hasattr(store, attr): + break + else: + return KVStoreV3(store) + + raise ValueError( + "v3 stores must be subclasses of StoreV3, " + "if your store exposes the MutableMapping interface wrap it in " + f"Zarr.storage.KVStoreV3. Got {store}" + ) + + +# allow MutableMapping for backwards compatibility +StoreLike = Union[BaseStore, MutableMapping] + + def _path_to_prefix(path: Optional[str]) -> str: # assume path already normalized if path: @@ -140,17 +305,49 @@ def _path_to_prefix(path: Optional[str]) -> str: return prefix +# TODO: Should this return default metadata or raise an Error if zarr.json +# is absent? +def _get_hierarchy_metadata(store=None): + meta = _default_entry_point_metadata_v3 + if store is not None: + version = getattr(store, '_store_version', 2) + if version < 3: + raise ValueError("zarr.json hierarchy metadata not stored for " + f"zarr v{version} stores") + if 'zarr.json' in store: + meta = store._metadata_class.decode_hierarchy_metadata(store['zarr.json']) + return meta + + def _rename_from_keys(store: BaseStore, src_path: str, dst_path: str) -> None: # assume path already normalized src_prefix = _path_to_prefix(src_path) dst_prefix = _path_to_prefix(dst_path) - for key in list(store.keys()): - if key.startswith(src_prefix): - new_key = dst_prefix + key.lstrip(src_prefix) - store[new_key] = store.pop(key) - - -def _rmdir_from_keys(store: Union[BaseStore, MutableMapping], path: Optional[str] = None) -> None: + version = getattr(store, '_store_version', 2) + if version == 2: + root_prefixes = [''] + elif version == 3: + root_prefixes = ['meta/root/', 'data/root/'] + for root_prefix in root_prefixes: + _src_prefix = root_prefix + src_prefix + _dst_prefix = root_prefix + dst_prefix + for key in list(store.keys()): + if key.startswith(_src_prefix): + new_key = _dst_prefix + key.lstrip(_src_prefix) + store[new_key] = store.pop(key) + if version == 3: + sfx = _get_hierarchy_metadata(store)['metadata_key_suffix'] + _src_array_json = 'meta/root/' + src_prefix[:-1] + '.array' + sfx + if _src_array_json in store: + new_key = 'meta/root/' + dst_prefix[:-1] + '.array' + sfx + store[new_key] = store.pop(_src_array_json) + _src_group_json = 'meta/root/' + src_prefix[:-1] + '.group' + sfx + if _src_group_json in store: + new_key = 'meta/root/' + dst_prefix[:-1] + '.group' + sfx + store[new_key] = store.pop(_src_group_json) + + +def _rmdir_from_keys(store: StoreLike, path: Optional[str] = 
None) -> None: # assume path already normalized prefix = _path_to_prefix(path) for key in list(store.keys()): @@ -168,3 +365,40 @@ def _listdir_from_keys(store: BaseStore, path: Optional[str] = None) -> List[str child = suffix.split('/')[0] children.add(child) return sorted(children) + + +def _prefix_to_array_key(store: StoreLike, prefix: str) -> str: + if getattr(store, "_store_version", 2) == 3: + if prefix: + sfx = _get_hierarchy_metadata(store)['metadata_key_suffix'] + key = "meta/root/" + prefix.rstrip("/") + ".array" + sfx + else: + raise ValueError("prefix must be supplied to get a v3 array key") + else: + key = prefix + array_meta_key + return key + + +def _prefix_to_group_key(store: StoreLike, prefix: str) -> str: + if getattr(store, "_store_version", 2) == 3: + if prefix: + sfx = _get_hierarchy_metadata(store)['metadata_key_suffix'] + key = "meta/root/" + prefix.rstrip('/') + ".group" + sfx + else: + raise ValueError("prefix must be supplied to get a v3 group key") + else: + key = prefix + group_meta_key + return key + + +def _prefix_to_attrs_key(store: StoreLike, prefix: str) -> str: + if getattr(store, "_store_version", 2) == 3: + # for v3, attributes are stored in the array metadata + sfx = _get_hierarchy_metadata(store)['metadata_key_suffix'] + if prefix: + key = "meta/root/" + prefix.rstrip('/') + ".array" + sfx + else: + raise ValueError("prefix must be supplied to get a v3 array key") + else: + key = prefix + attrs_key + return key diff --git a/zarr/attrs.py b/zarr/attrs.py index eff1237db1..78c26461c4 100644 --- a/zarr/attrs.py +++ b/zarr/attrs.py @@ -1,6 +1,6 @@ from collections.abc import MutableMapping -from zarr._storage.store import Store +from zarr._storage.store import Store, StoreV3 from zarr.util import json_dumps @@ -26,7 +26,15 @@ class Attributes(MutableMapping): def __init__(self, store, key='.zattrs', read_only=False, cache=True, synchronizer=None): - self.store = Store._ensure_store(store) + + self._version = getattr(store, '_store_version', 2) + assert key + + if self._version == 3 and '.z' in key: + raise ValueError('invalid v3 key') + + _Store = Store if self._version == 2 else StoreV3 + self.store = _Store._ensure_store(store) self.key = key self.read_only = read_only self.cache = cache @@ -38,6 +46,8 @@ def _get_nosync(self): data = self.store[self.key] except KeyError: d = dict() + if self._version > 2: + d['attributes'] = {} else: d = self.store._metadata_class.parse_metadata(data) return d @@ -47,6 +57,8 @@ def asdict(self): if self.cache and self._cached_asdict is not None: return self._cached_asdict d = self._get_nosync() + if self._version == 3: + d = d['attributes'] if self.cache: self._cached_asdict = d return d @@ -54,7 +66,10 @@ def asdict(self): def refresh(self): """Refresh cached attributes from the store.""" if self.cache: - self._cached_asdict = self._get_nosync() + if self._version == 3: + self._cached_asdict = self._get_nosync()['attributes'] + else: + self._cached_asdict = self._get_nosync() def __contains__(self, x): return x in self.asdict() @@ -84,7 +99,10 @@ def _setitem_nosync(self, item, value): d = self._get_nosync() # set key value - d[item] = value + if self._version == 2: + d[item] = value + else: + d['attributes'][item] = value # _put modified data self._put_nosync(d) @@ -98,7 +116,10 @@ def _delitem_nosync(self, key): d = self._get_nosync() # delete key value - del d[key] + if self._version == 2: + del d[key] + else: + del d['attributes'][key] # _put modified data self._put_nosync(d) @@ -106,12 +127,34 @@ def 
_delitem_nosync(self, key): def put(self, d): """Overwrite all attributes with the key/value pairs in the provided dictionary `d` in a single operation.""" - self._write_op(self._put_nosync, d) + if self._version == 2: + self._write_op(self._put_nosync, d) + else: + self._write_op(self._put_nosync, dict(attributes=d)) def _put_nosync(self, d): - self.store[self.key] = json_dumps(d) - if self.cache: - self._cached_asdict = d + if self._version == 2: + self.store[self.key] = json_dumps(d) + if self.cache: + self._cached_asdict = d + else: + if self.key in self.store: + # Cannot write the attributes directly to JSON, but have to + # store it within the pre-existing attributes key of the v3 + # metadata. + + # Note: this changes the store.counter result in test_caching_on! + + meta = self.store._metadata_class.parse_metadata(self.store[self.key]) + if 'attributes' in meta and 'filters' in meta['attributes']: + # need to preserve any existing "filters" attribute + d['attributes']['filters'] = meta['attributes']['filters'] + meta['attributes'] = d['attributes'] + else: + meta = d + self.store[self.key] = json_dumps(meta) + if self.cache: + self._cached_asdict = d['attributes'] # noinspection PyMethodOverriding def update(self, *args, **kwargs): @@ -124,7 +167,12 @@ def _update_nosync(self, *args, **kwargs): d = self._get_nosync() # update - d.update(*args, **kwargs) + if self._version == 2: + d.update(*args, **kwargs) + else: + if 'attributes' not in d: + d['attributes'] = {} + d['attributes'].update(*args, **kwargs) # _put modified data self._put_nosync(d) diff --git a/zarr/core.py b/zarr/core.py index 6f6b468e3b..e70d5591f7 100644 --- a/zarr/core.py +++ b/zarr/core.py @@ -4,13 +4,14 @@ import math import operator import re +from collections.abc import MutableMapping from functools import reduce +from typing import Any import numpy as np from numcodecs.compat import ensure_bytes, ensure_ndarray -from collections.abc import MutableMapping - +from zarr._storage.store import _prefix_to_attrs_key from zarr.attrs import Attributes from zarr.codecs import AsType, get_codec from zarr.errors import ArrayNotFoundError, ReadOnlyError, ArrayIndexError @@ -31,7 +32,13 @@ is_scalar, pop_fields, ) -from zarr.storage import array_meta_key, attrs_key, getsize, listdir, BaseStore +from zarr.storage import ( + _get_hierarchy_metadata, + _prefix_to_array_key, + getsize, + listdir, + normalize_store_arg, +) from zarr.util import ( all_equal, InfoReporter, @@ -146,7 +153,7 @@ class Array: def __init__( self, - store: BaseStore, + store: Any, # BaseStore not stricly required due to normalize_store_arg path=None, read_only=False, chunk_store=None, @@ -155,12 +162,22 @@ def __init__( cache_attrs=True, partial_decompress=False, write_empty_chunks=True, + zarr_version=None, ): # N.B., expect at this point store is fully initialized with all # configuration metadata fully specified and normalized - store = BaseStore._ensure_store(store) - chunk_store = BaseStore._ensure_store(chunk_store) + store = normalize_store_arg(store, zarr_version=zarr_version) + if zarr_version is None: + zarr_version = getattr(store, '_store_version', 2) + + if chunk_store is not None: + chunk_store = normalize_store_arg(chunk_store, + zarr_version=zarr_version) + if not getattr(chunk_store, '_store_version', 2) == zarr_version: + raise ValueError( + "zarr_version of store and chunk_store must match" + ) self._store = store self._chunk_store = chunk_store @@ -175,12 +192,19 @@ def __init__( self._is_view = False self._partial_decompress = 
partial_decompress self._write_empty_chunks = write_empty_chunks + self._version = zarr_version + + if self._version == 3: + self._data_key_prefix = 'data/root/' + self._key_prefix + self._data_path = 'data/root/' + self._path + self._hierarchy_metadata = _get_hierarchy_metadata(store=None) + self._metadata_key_suffix = self._hierarchy_metadata['metadata_key_suffix'] # initialize metadata self._load_metadata() # initialize attributes - akey = self._key_prefix + attrs_key + akey = _prefix_to_attrs_key(self._store, self._key_prefix) self._attrs = Attributes(store, key=akey, read_only=read_only, synchronizer=synchronizer, cache=cache_attrs) @@ -196,13 +220,13 @@ def _load_metadata(self): if self._synchronizer is None: self._load_metadata_nosync() else: - mkey = self._key_prefix + array_meta_key + mkey = _prefix_to_array_key(self._store, self._key_prefix) with self._synchronizer[mkey]: self._load_metadata_nosync() def _load_metadata_nosync(self): try: - mkey = self._key_prefix + array_meta_key + mkey = _prefix_to_array_key(self._store, self._key_prefix) meta_bytes = self._store[mkey] except KeyError: raise ArrayNotFoundError(self._path) @@ -212,32 +236,50 @@ def _load_metadata_nosync(self): meta = self._store._metadata_class.decode_array_metadata(meta_bytes) self._meta = meta self._shape = meta['shape'] - self._chunks = meta['chunks'] - self._dtype = meta['dtype'] self._fill_value = meta['fill_value'] - self._order = meta['order'] - dimension_separator = meta.get('dimension_separator', None) - if dimension_separator is None: - try: - dimension_separator = self._store._dimension_separator - except (AttributeError, KeyError): - pass - - # Fallback for any stores which do not choose a default + if self._version == 2: + self._chunks = meta['chunks'] + self._dtype = meta['dtype'] + self._order = meta['order'] if dimension_separator is None: - dimension_separator = "." + try: + dimension_separator = self._store._dimension_separator + except (AttributeError, KeyError): + pass + + # Fallback for any stores which do not choose a default + if dimension_separator is None: + dimension_separator = "." + else: + self._chunks = meta['chunk_grid']['chunk_shape'] + self._dtype = meta['data_type'] + self._order = meta['chunk_memory_layout'] + chunk_separator = meta['chunk_grid']['separator'] + if dimension_separator is None: + # TODO: omit attribute in v3? + dimension_separator = meta.get('dimension_separator', chunk_separator) + else: + assert chunk_separator == dimension_separator + self._dimension_separator = dimension_separator # setup compressor - config = meta['compressor'] - if config is None: + compressor = meta.get('compressor', None) + if compressor is None: self._compressor = None + elif self._version == 2: + self._compressor = get_codec(compressor) else: - self._compressor = get_codec(config) + self._compressor = compressor # setup filters - filters = meta['filters'] + if self._version == 2: + filters = meta.get('filters', []) + else: + # TODO: storing filters under attributes for now since the v3 + # array metadata does not have a 'filters' attribute. 
+ filters = meta['attributes'].get('filters', []) if filters: filters = [get_codec(config) for config in filters] self._filters = filters @@ -262,10 +304,23 @@ def _flush_metadata_nosync(self): filters_config = [f.get_config() for f in self._filters] else: filters_config = None - meta = dict(shape=self._shape, chunks=self._chunks, dtype=self._dtype, - compressor=compressor_config, fill_value=self._fill_value, - order=self._order, filters=filters_config) - mkey = self._key_prefix + array_meta_key + _compressor = compressor_config if self._version == 2 else self._compressor + meta = dict(shape=self._shape, compressor=_compressor, + fill_value=self._fill_value, filters=filters_config) + if getattr(self._store, '_store_version', 2) == 2: + meta.update( + dict(chunks=self._chunks, dtype=self._dtype, order=self._order) + ) + else: + meta.update( + dict(chunk_grid=dict(type='regular', + chunk_shape=self._chunks, + separator=self._dimension_separator), + data_type=self._dtype, + chunk_memory_layout=self._order, + attributes=self.attrs.asdict()) + ) + mkey = _prefix_to_array_key(self._store, self._key_prefix) self._store[mkey] = self._store._metadata_class.encode_array_metadata(meta) @property @@ -453,11 +508,28 @@ def nchunks(self): def nchunks_initialized(self): """The number of chunks that have been initialized with some data.""" - # key pattern for chunk keys - prog = re.compile(r'\.'.join([r'\d+'] * min(1, self.ndim))) - # count chunk keys - return sum(1 for k in listdir(self.chunk_store, self._path) if prog.match(k)) + if self._version == 3: + # # key pattern for chunk keys + # prog = re.compile(r'\.'.join([r'c\d+'] * min(1, self.ndim))) + # # get chunk keys, excluding the prefix + # members = self.chunk_store.list_prefix(self._data_path) + # members = [k.split(self._data_key_prefix)[1] for k in members] + # # count the chunk keys + # return sum(1 for k in members if prog.match(k)) + + # key pattern for chunk keys + prog = re.compile(self._data_key_prefix + r'c\d+') # TODO: ndim == 0 case? + # get chunk keys, excluding the prefix + members = self.chunk_store.list_prefix(self._data_path) + # count the chunk keys + return sum(1 for k in members if prog.match(k)) + else: + # key pattern for chunk keys + prog = re.compile(r'\.'.join([r'\d+'] * min(1, self.ndim))) + + # count chunk keys + return sum(1 for k in listdir(self.chunk_store, self._path) if prog.match(k)) # backwards compatibility initialized = nchunks_initialized @@ -2061,7 +2133,15 @@ def _process_for_setitem(self, ckey, chunk_selection, value, fields=None): return chunk def _chunk_key(self, chunk_coords): - return self._key_prefix + self._dimension_separator.join(map(str, chunk_coords)) + if self._version == 3: + # _chunk_key() corresponds to data_key(P, i, j, ...) example in the spec + # where P = self._key_prefix, i, j, ... = chunk_coords + # e.g. 
c0/2/3 for 3d array with chunk index (0, 2, 3) + # https://zarr-specs.readthedocs.io/en/core-protocol-v3.0-dev/protocol/core/v3.0.html#regular-grids + return ("data/root/" + self._key_prefix + + "c" + self._dimension_separator.join(map(str, chunk_coords))) + else: + return self._key_prefix + self._dimension_separator.join(map(str, chunk_coords)) def _decode_chunk(self, cdata, start=None, nitems=None, expected_shape=None): # decompress @@ -2242,7 +2322,8 @@ def digest(self, hashname="sha1"): for i in itertools.product(*[range(s) for s in self.cdata_shape]): h.update(self.chunk_store.get(self._chunk_key(i), b"")) - h.update(self.store.get(self._key_prefix + array_meta_key, b"")) + mkey = _prefix_to_array_key(self._store, self._key_prefix) + h.update(self.store.get(mkey, b"")) h.update(self.store.get(self.attrs.key, b"")) @@ -2279,7 +2360,7 @@ def hexdigest(self, hashname="sha1"): def __getstate__(self): return (self._store, self._path, self._read_only, self._chunk_store, self._synchronizer, self._cache_metadata, self._attrs.cache, - self._partial_decompress, self._write_empty_chunks) + self._partial_decompress, self._write_empty_chunks, self._version) def __setstate__(self, state): self.__init__(*state) @@ -2292,7 +2373,7 @@ def _synchronized_op(self, f, *args, **kwargs): else: # synchronize on the array - mkey = self._key_prefix + array_meta_key + mkey = _prefix_to_array_key(self._store, self._key_prefix) lock = self._synchronizer[mkey] with lock: @@ -2559,7 +2640,7 @@ def view(self, shape=None, chunks=None, dtype=None, if synchronizer is None: synchronizer = self._synchronizer a = Array(store=store, path=path, chunk_store=chunk_store, read_only=read_only, - synchronizer=synchronizer, cache_metadata=True) + synchronizer=synchronizer, cache_metadata=True, zarr_version=self._version) a._is_view = True # allow override of some properties diff --git a/zarr/creation.py b/zarr/creation.py index 64c5666adb..da84347760 100644 --- a/zarr/creation.py +++ b/zarr/creation.py @@ -19,7 +19,8 @@ def create(shape, chunks=True, dtype=None, compressor='default', fill_value=0, order='C', store=None, synchronizer=None, overwrite=False, path=None, chunk_store=None, filters=None, cache_metadata=True, cache_attrs=True, read_only=False, - object_codec=None, dimension_separator=None, write_empty_chunks=True, **kwargs): + object_codec=None, dimension_separator=None, write_empty_chunks=True, *, + zarr_version=None, **kwargs): """Create an array. Parameters @@ -77,7 +78,10 @@ def create(shape, chunks=True, dtype=None, compressor='default', that chunk's key is deleted. This setting enables sparser storage, as only chunks with non-fill-value data are stored, at the expense of overhead associated with checking the data of each chunk. - + zarr_version : {None, 2, 3}, optional + The zarr protocol version of the created array. If None, it will be + inferred from ``store`` or ``chunk_store`` if they are provided, + otherwise defaulting to 2. 
Returns ------- @@ -122,9 +126,12 @@ def create(shape, chunks=True, dtype=None, compressor='default', """ + if zarr_version is None and store is None: + zarr_version = getattr(chunk_store, '_store_version', 2) # handle polymorphic store arg - store = normalize_store_arg(store) + store = normalize_store_arg(store, zarr_version=zarr_version) + zarr_version = getattr(store, '_store_version', 2) # API compatibility with h5py compressor, fill_value = _kwargs_compat(compressor, fill_value, kwargs) @@ -141,6 +148,9 @@ def create(shape, chunks=True, dtype=None, compressor='default', f"{store_separator}") dimension_separator = normalize_dimension_separator(dimension_separator) + if zarr_version > 2 and path is None: + raise ValueError("path must be supplied to initialize a zarr v3 array") + # initialize array metadata init_array(store, shape=shape, chunks=chunks, dtype=dtype, compressor=compressor, fill_value=fill_value, order=order, overwrite=overwrite, path=path, @@ -388,6 +398,9 @@ def open_array( storage_options=None, partial_decompress=False, write_empty_chunks=True, + *, + zarr_version=None, + dimension_separator=None, **kwargs ): """Open an array using file-mode-like semantics. @@ -450,6 +463,15 @@ def open_array( that chunk's key is deleted. This setting enables sparser storage, as only chunks with non-fill-value data are stored, at the expense of overhead associated with checking the data of each chunk. + zarr_version : {None, 2, 3}, optional + The zarr protocol version of the array to be opened. If None, it will + be inferred from ``store`` or ``chunk_store`` if they are provided, + otherwise defaulting to 2. + dimension_separator : {None, '.', '/'}, optional + Can be used to specify whether the array is in a flat ('.') or nested + ('/') format. If None, the appropriate value will be read from `store` + when present. Otherwise, defaults to '.' when ``zarr_version == 2`` + and `/` otherwise. Returns ------- @@ -484,12 +506,28 @@ def open_array( # w- or x : create, fail if exists # a : read/write if exists, create otherwise (default) + if zarr_version is None and store is None: + zarr_version = getattr(chunk_store, '_store_version', 2) + # handle polymorphic store arg clobber = (mode == 'w') - store = normalize_store_arg(store, clobber=clobber, storage_options=storage_options, mode=mode) + store = normalize_store_arg(store, clobber=clobber, storage_options=storage_options, + mode=mode, zarr_version=zarr_version) + zarr_version = getattr(store, '_store_version', 2) if chunk_store is not None: chunk_store = normalize_store_arg(chunk_store, clobber=clobber, - storage_options=storage_options) + storage_options=storage_options, + zarr_version=zarr_version) + + # respect the dimension separator specified in a store, if present + if dimension_separator is None: + if hasattr(store, '_dimension_separator'): + dimension_separator = store._dimension_separator + else: + dimension_separator = '.' if zarr_version == 2 else '/' + + if zarr_version == 3 and path is None: + path = 'array' # TODO: raise ValueError instead?
path = normalize_storage_path(path) # API compatibility with h5py @@ -511,7 +549,8 @@ def open_array( init_array(store, shape=shape, chunks=chunks, dtype=dtype, compressor=compressor, fill_value=fill_value, order=order, filters=filters, overwrite=True, path=path, - object_codec=object_codec, chunk_store=chunk_store) + object_codec=object_codec, chunk_store=chunk_store, + dimension_separator=dimension_separator) elif mode == 'a': if not contains_array(store, path=path): @@ -520,7 +559,8 @@ def open_array( init_array(store, shape=shape, chunks=chunks, dtype=dtype, compressor=compressor, fill_value=fill_value, order=order, filters=filters, path=path, - object_codec=object_codec, chunk_store=chunk_store) + object_codec=object_codec, chunk_store=chunk_store, + dimension_separator=dimension_separator) elif mode in ['w-', 'x']: if contains_group(store, path=path): @@ -531,7 +571,8 @@ def open_array( init_array(store, shape=shape, chunks=chunks, dtype=dtype, compressor=compressor, fill_value=fill_value, order=order, filters=filters, path=path, - object_codec=object_codec, chunk_store=chunk_store) + object_codec=object_codec, chunk_store=chunk_store, + dimension_separator=dimension_separator) # determine read only status read_only = mode == 'r' @@ -559,6 +600,7 @@ def _like_args(a, kwargs): kwargs.setdefault('compressor', a.compressor) kwargs.setdefault('order', a.order) kwargs.setdefault('filters', a.filters) + kwargs.setdefault('zarr_version', a._version) else: kwargs.setdefault('compressor', 'default') kwargs.setdefault('order', 'C') diff --git a/zarr/hierarchy.py b/zarr/hierarchy.py index 763a5f1631..53db0e617e 100644 --- a/zarr/hierarchy.py +++ b/zarr/hierarchy.py @@ -6,8 +6,7 @@ from zarr.attrs import Attributes from zarr.core import Array from zarr.creation import (array, create, empty, empty_like, full, full_like, - normalize_store_arg, ones, ones_like, zeros, - zeros_like) + ones, ones_like, zeros, zeros_like) from zarr.errors import ( ContainsArrayError, ContainsGroupError, @@ -15,14 +14,18 @@ ReadOnlyError, ) from zarr.storage import ( + _get_hierarchy_metadata, + _prefix_to_group_key, BaseStore, MemoryStore, + MemoryStoreV3, attrs_key, contains_array, contains_group, group_meta_key, init_group, listdir, + normalize_store_arg, rename, rmdir, ) @@ -109,9 +112,17 @@ class Group(MutableMapping): """ def __init__(self, store, path=None, read_only=False, chunk_store=None, - cache_attrs=True, synchronizer=None): - store: BaseStore = BaseStore._ensure_store(store) - chunk_store: BaseStore = BaseStore._ensure_store(chunk_store) + cache_attrs=True, synchronizer=None, zarr_version=None): + store: BaseStore = _normalize_store_arg(store, zarr_version=zarr_version) + if zarr_version is None: + zarr_version = getattr(store, '_store_version', 2) + if zarr_version > 2 and path: + if path.startswith(("meta/", "data/")): + raise ValueError("path must not start with 'meta/' or 'data/'") + if chunk_store is not None: + chunk_store: BaseStore = _normalize_store_arg(chunk_store, zarr_version=zarr_version) + if not getattr(chunk_store, '_store_version', 2) == zarr_version: + raise ValueError("zarr_version of store and chunk_store must match") self._store = store self._chunk_store = chunk_store self._path = normalize_storage_path(path) @@ -121,6 +132,13 @@ def __init__(self, store, path=None, read_only=False, chunk_store=None, self._key_prefix = '' self._read_only = read_only self._synchronizer = synchronizer + self._version = zarr_version + + if self._version == 3: + self._data_key_prefix = 'data/root/' + 
self._key_prefix + self._data_path = 'data/root/' + self._path + self._hierarchy_metadata = _get_hierarchy_metadata(store=None) + self._metadata_key_suffix = self._hierarchy_metadata['metadata_key_suffix'] # guard conditions if contains_array(store, path=self._path): @@ -128,15 +146,31 @@ def __init__(self, store, path=None, read_only=False, chunk_store=None, # initialize metadata try: - mkey = self._key_prefix + group_meta_key + mkey = _prefix_to_group_key(self._store, self._key_prefix) + assert not mkey.endswith("root/.group") meta_bytes = store[mkey] except KeyError: - raise GroupNotFoundError(path) + if self._version == 2: + raise GroupNotFoundError(path) + else: + implicit_prefix = 'meta/root/' + self._key_prefix + if not implicit_prefix.endswith('/'): + implicit_prefix += '/' + if self._store.list_prefix(implicit_prefix): + # implicit group does not have any metadata + self._meta = None + else: + raise GroupNotFoundError(path) else: self._meta = self._store._metadata_class.decode_group_metadata(meta_bytes) # setup attributes - akey = self._key_prefix + attrs_key + if self._version == 2: + akey = self._key_prefix + attrs_key + else: + # Note: mkey doesn't actually exist for implicit groups, but the + # object can still be created. + akey = mkey self._attrs = Attributes(store, key=akey, read_only=read_only, cache=cache_attrs, synchronizer=synchronizer) @@ -227,11 +261,36 @@ def __iter__(self): quux """ - for key in sorted(listdir(self._store, self._path)): - path = self._key_prefix + key - if (contains_array(self._store, path) or - contains_group(self._store, path)): - yield key + if getattr(self._store, '_store_version', 2) == 2: + for key in sorted(listdir(self._store, self._path)): + path = self._key_prefix + key + if (contains_array(self._store, path) or + contains_group(self._store, path)): + yield key + else: + # TODO: Should this iterate over data folders and/or metadata + # folders and/or metadata files + + dir_path = 'meta/root/' + self._key_prefix + name_start = len(dir_path) + keys, prefixes = self._store.list_dir(dir_path) + + # yield any groups or arrays + sfx = self._metadata_key_suffix + for key in keys: + len_suffix = len('.group') + len(sfx) # same for .array + if key.endswith(('.group' + sfx, '.array' + sfx)): + yield key[name_start:-len_suffix] + + # also yield any implicit groups + for prefix in prefixes: + prefix = prefix.rstrip('/') + # only implicit if there is no .group.sfx file + if not prefix + '.group' + sfx in self._store: + yield prefix[name_start:] + + # Note: omit data/root/ to avoid duplicate listings + # any group in data/root/ must has an entry in meta/root/ def __len__(self): """Number of members.""" @@ -323,9 +382,11 @@ def __contains__(self, item): False """ + if self._version > 2 and item.startswith('meta/'): + raise ValueError("meta/ must not be in item") path = self._item_path(item) return contains_array(self._store, path) or \ - contains_group(self._store, path) + contains_group(self._store, path, explicit_only=False) def __getitem__(self, item): """Obtain a group member. 
@@ -352,11 +413,21 @@ def __getitem__(self, item): if contains_array(self._store, path): return Array(self._store, read_only=self._read_only, path=path, chunk_store=self._chunk_store, - synchronizer=self._synchronizer, cache_attrs=self.attrs.cache) - elif contains_group(self._store, path): + synchronizer=self._synchronizer, cache_attrs=self.attrs.cache, + zarr_version=self._version) + elif contains_group(self._store, path, explicit_only=True): return Group(self._store, read_only=self._read_only, path=path, chunk_store=self._chunk_store, cache_attrs=self.attrs.cache, - synchronizer=self._synchronizer) + synchronizer=self._synchronizer, zarr_version=self._version) + elif self._version == 3: + implicit_group = 'meta/root/' + path + '/' + # non-empty folder in the metadata path implies an implicit group + if self._store.list_prefix(implicit_group): + return Group(self._store, read_only=self._read_only, path=path, + chunk_store=self._chunk_store, cache_attrs=self.attrs.cache, + synchronizer=self._synchronizer, zarr_version=self._version) + else: + raise KeyError(item) else: raise KeyError(item) @@ -369,7 +440,7 @@ def __delitem__(self, item): def _delitem_nosync(self, item): path = self._item_path(item) if contains_array(self._store, path) or \ - contains_group(self._store, path): + contains_group(self._store, path, explicit_only=False): rmdir(self._store, path) else: raise KeyError(item) @@ -406,10 +477,24 @@ def group_keys(self): ['bar', 'foo'] """ - for key in sorted(listdir(self._store, self._path)): - path = self._key_prefix + key - if contains_group(self._store, path): - yield key + if self._version == 2: + for key in sorted(listdir(self._store, self._path)): + path = self._key_prefix + key + if contains_group(self._store, path): + yield key + else: + dir_name = 'meta/root/' + self._path + sfx = _get_hierarchy_metadata(self._store)['metadata_key_suffix'] + group_sfx = '.group' + sfx + for key in sorted(listdir(self._store, dir_name)): + if key.endswith(group_sfx): + key = key[:-len(group_sfx)] + path = self._key_prefix + key + if path.endswith(".array" + sfx): + # skip array keys + continue + if contains_group(self._store, path, explicit_only=False): + yield key def groups(self): """Return an iterator over (name, value) pairs for groups only. 
@@ -428,13 +513,39 @@ def groups(self): foo """ - for key in sorted(listdir(self._store, self._path)): - path = self._key_prefix + key - if contains_group(self._store, path): - yield key, Group(self._store, path=path, read_only=self._read_only, - chunk_store=self._chunk_store, - cache_attrs=self.attrs.cache, - synchronizer=self._synchronizer) + if self._version == 2: + for key in sorted(listdir(self._store, self._path)): + path = self._key_prefix + key + if contains_group(self._store, path, explicit_only=False): + yield key, Group( + self._store, + path=path, + read_only=self._read_only, + chunk_store=self._chunk_store, + cache_attrs=self.attrs.cache, + synchronizer=self._synchronizer, + zarr_version=self._version) + + else: + dir_name = 'meta/root/' + self._path + sfx = _get_hierarchy_metadata(self._store)['metadata_key_suffix'] + group_sfx = '.group' + sfx + for key in sorted(listdir(self._store, dir_name)): + if key.endswith(group_sfx): + key = key[:-len(group_sfx)] + path = self._key_prefix + key + if path.endswith(".array" + sfx): + # skip array keys + continue + if contains_group(self._store, path, explicit_only=False): + yield key, Group( + self._store, + path=path, + read_only=self._read_only, + chunk_store=self._chunk_store, + cache_attrs=self.attrs.cache, + synchronizer=self._synchronizer, + zarr_version=self._version) def array_keys(self, recurse=False): """Return an iterator over member names for arrays only. @@ -491,14 +602,36 @@ def arrays(self, recurse=False): recurse=recurse) def _array_iter(self, keys_only, method, recurse): - for key in sorted(listdir(self._store, self._path)): - path = self._key_prefix + key - if contains_array(self._store, path): - yield key if keys_only else (key, self[key]) - elif recurse and contains_group(self._store, path): - group = self[key] - for i in getattr(group, method)(recurse=recurse): - yield i + if self._version == 2: + for key in sorted(listdir(self._store, self._path)): + path = self._key_prefix + key + assert not path.startswith("meta") + if contains_array(self._store, path): + _key = key.rstrip("/") + yield _key if keys_only else (_key, self[key]) + elif recurse and contains_group(self._store, path): + group = self[key] + for i in getattr(group, method)(recurse=recurse): + yield i + else: + dir_name = 'meta/root/' + self._path + sfx = _get_hierarchy_metadata(self._store)['metadata_key_suffix'] + array_sfx = '.array' + sfx + for key in sorted(listdir(self._store, dir_name)): + if key.endswith(array_sfx): + key = key[:-len(array_sfx)] + path = self._key_prefix + key + assert not path.startswith("meta") + if key.endswith('.group' + sfx): + # skip group metadata keys + continue + if contains_array(self._store, path): + _key = key.rstrip("/") + yield _key if keys_only else (_key, self[key]) + elif recurse and contains_group(self._store, path): + group = self[key] + for i in getattr(group, method)(recurse=recurse): + yield i def visitvalues(self, func): """Run ``func`` on each object. 
@@ -707,7 +840,7 @@ def _create_group_nosync(self, name, overwrite=False): return Group(self._store, path=path, read_only=self._read_only, chunk_store=self._chunk_store, cache_attrs=self.attrs.cache, - synchronizer=self._synchronizer) + synchronizer=self._synchronizer, zarr_version=self._version) def create_groups(self, *names, **kwargs): """Convenience method to create multiple groups in a single call.""" @@ -751,7 +884,7 @@ def _require_group_nosync(self, name, overwrite=False): return Group(self._store, path=path, read_only=self._read_only, chunk_store=self._chunk_store, cache_attrs=self.attrs.cache, - synchronizer=self._synchronizer) + synchronizer=self._synchronizer, zarr_version=self._version) def require_groups(self, *names): """Convenience method to require multiple groups in a single call.""" @@ -1039,9 +1172,10 @@ def move(self, source, dest): # Check that source exists. if not (contains_array(self._store, source) or - contains_group(self._store, source)): + contains_group(self._store, source, explicit_only=False)): raise ValueError('The source, "%s", does not exist.' % source) - if contains_array(self._store, dest) or contains_group(self._store, dest): + if (contains_array(self._store, dest) or + contains_group(self._store, dest, explicit_only=False)): raise ValueError('The dest, "%s", already exists.' % dest) # Ensure groups needed for `dest` exist. @@ -1051,15 +1185,19 @@ def move(self, source, dest): self._write_op(self._move_nosync, source, dest) -def _normalize_store_arg(store, *, clobber=False, storage_options=None, mode=None): +def _normalize_store_arg(store, *, clobber=False, storage_options=None, mode=None, + zarr_version=None): + if zarr_version is None: + zarr_version = getattr(store, '_store_version', 2) if store is None: - return MemoryStore() + return MemoryStore() if zarr_version == 2 else MemoryStoreV3() return normalize_store_arg(store, clobber=clobber, - storage_options=storage_options, mode=mode) + storage_options=storage_options, mode=mode, + zarr_version=zarr_version) def group(store=None, overwrite=False, chunk_store=None, - cache_attrs=True, synchronizer=None, path=None): + cache_attrs=True, synchronizer=None, path=None, *, zarr_version=None): """Create a group. Parameters @@ -1104,20 +1242,29 @@ def group(store=None, overwrite=False, chunk_store=None, """ # handle polymorphic store arg - store = _normalize_store_arg(store) + store = _normalize_store_arg(store, zarr_version=zarr_version) + if zarr_version is None: + zarr_version = getattr(store, '_store_version', 2) + if zarr_version == 3 and path is None: + raise ValueError(f"path must be provided for a v{zarr_version} group") path = normalize_storage_path(path) - # require group - if overwrite or not contains_group(store): + if zarr_version == 2: + requires_init = overwrite or not contains_group(store) + elif zarr_version == 3: + requires_init = overwrite or not contains_group(store, path) + + if requires_init: init_group(store, overwrite=overwrite, chunk_store=chunk_store, path=path) return Group(store, read_only=False, chunk_store=chunk_store, - cache_attrs=cache_attrs, synchronizer=synchronizer, path=path) + cache_attrs=cache_attrs, synchronizer=synchronizer, path=path, + zarr_version=zarr_version) def open_group(store=None, mode='a', cache_attrs=True, synchronizer=None, path=None, - chunk_store=None, storage_options=None): + chunk_store=None, storage_options=None, *, zarr_version=None): """Open a group using file-mode-like semantics. 
Parameters @@ -1166,11 +1313,22 @@ def open_group(store=None, mode='a', cache_attrs=True, synchronizer=None, path=N # handle polymorphic store arg clobber = mode != "r" store = _normalize_store_arg( - store, clobber=clobber, storage_options=storage_options, mode=mode - ) + store, clobber=clobber, storage_options=storage_options, mode=mode, + zarr_version=zarr_version) + if zarr_version is None: + zarr_version = getattr(store, '_store_version', 2) if chunk_store is not None: chunk_store = _normalize_store_arg(chunk_store, clobber=clobber, storage_options=storage_options) + if not getattr(chunk_store, '_store_version', 2) == zarr_version: + raise ValueError( + "zarr_version of store and chunk_store must match" + ) + + store_version = getattr(store, '_store_version', 2) + if store_version == 3 and path is None: + raise ValueError("path must be supplied to initialize a zarr v3 group") + path = normalize_storage_path(path) # ensure store is initialized @@ -1202,4 +1360,5 @@ def open_group(store=None, mode='a', cache_attrs=True, synchronizer=None, path=N read_only = mode == 'r' return Group(store, read_only=read_only, cache_attrs=cache_attrs, - synchronizer=synchronizer, path=path, chunk_store=chunk_store) + synchronizer=synchronizer, path=path, chunk_store=chunk_store, + zarr_version=zarr_version) diff --git a/zarr/meta.py b/zarr/meta.py index c292b09a14..02730f0c01 100644 --- a/zarr/meta.py +++ b/zarr/meta.py @@ -1,14 +1,87 @@ import base64 +import itertools from collections.abc import Mapping +import numcodecs import numpy as np +from numcodecs.abc import Codec from zarr.errors import MetadataError from zarr.util import json_dumps, json_loads -from typing import cast, Union, Any, List, Mapping as MappingType +from typing import cast, Union, Any, List, Mapping as MappingType, Optional ZARR_FORMAT = 2 +ZARR_FORMAT_v3 = 3 + +# FLOAT_FILLS = {"NaN": np.nan, "Infinity": np.PINF, "-Infinity": np.NINF} + +_default_entry_point_metadata_v3 = { + "zarr_format": "https://purl.org/zarr/spec/protocol/core/3.0", + "metadata_encoding": "https://purl.org/zarr/spec/protocol/core/3.0", + "metadata_key_suffix": ".json", + "extensions": [], +} + +_v3_core_types = set( + "".join(d) for d in itertools.product("<>", ("u", "i", "f"), ("2", "4", "8")) +) +_v3_core_types = {"bool", "i1", "u1"} | _v3_core_types + +# The set of complex types allowed ({"c8", ">c16"}) +_v3_complex_types = set( + f"{end}c{_bytes}" for end, _bytes in itertools.product("<>", ("8", "16")) +) + +# All dtype.str values corresponding to datetime64 and timedelta64 +# see: https://numpy.org/doc/stable/reference/arrays.datetime.html#datetime-units +_date_units = ["Y", "M", "W", "D"] +_time_units = ["h", "m", "s", "ms", "us", "μs", "ns", "ps", "fs", "as"] +_v3_datetime_types = set( + f"{end}{kind}8[{unit}]" + for end, unit, kind in itertools.product("<>", _date_units + _time_units, ('m', 'M')) +) + + +def get_extended_dtype_info(dtype): + if dtype.str in _v3_complex_types: + return dict( + extension="https://zarr-specs.readthedocs.io/en/core-protocol-v3.0-dev/protocol/extensions/complex-dtypes/v1.0.html", # noqa + type=dtype.str, + fallback=None, + ) + elif dtype.str == "|O": + return dict( + extension="TODO: object array protocol URL", # noqa + type=dtype.str, + fallback=None, + ) + elif dtype.str.startswith("|S"): + return dict( + extension="TODO: bytestring array protocol URL", # noqa + type=dtype.str, + fallback=None, + ) + elif dtype.str.startswith("|U"): + return dict( + extension="TODO: unicode array protocol URL", # noqa + type=dtype.str, + 
fallback=None, + ) + elif dtype.str.startswith("|V"): + return dict( + extension="TODO: structured array protocol URL", # noqa + type=dtype.descr, + fallback=None, + ) + elif dtype.str in _v3_datetime_types: + return dict( + extension="https://zarr-specs.readthedocs.io/en/core-protocol-v3.0-dev/protocol/extensions/datetime-dtypes/v1.0.html", # noqa + type=dtype.str, + fallback=None, + ) + else: + raise ValueError(f"Unsupported dtype: {dtype}") class Metadata2: @@ -46,12 +119,13 @@ def decode_array_metadata(cls, s: Union[MappingType, str]) -> MappingType[str, A dtype = cls.decode_dtype(meta["dtype"]) if dtype.hasobject: import numcodecs - object_codec = numcodecs.get_codec(meta['filters'][0]) + + object_codec = numcodecs.get_codec(meta["filters"][0]) else: object_codec = None dimension_separator = meta.get("dimension_separator", None) - fill_value = cls.decode_fill_value(meta['fill_value'], dtype, object_codec) + fill_value = cls.decode_fill_value(meta["fill_value"], dtype, object_codec) meta = dict( zarr_format=meta["zarr_format"], shape=tuple(meta["shape"]), @@ -63,7 +137,7 @@ def decode_array_metadata(cls, s: Union[MappingType, str]) -> MappingType[str, A filters=meta["filters"], ) if dimension_separator: - meta['dimension_separator'] = dimension_separator + meta["dimension_separator"] = dimension_separator except Exception as e: raise MetadataError("error decoding metadata") from e else: @@ -79,7 +153,8 @@ def encode_array_metadata(cls, meta: MappingType[str, Any]) -> bytes: dimension_separator = meta.get("dimension_separator") if dtype.hasobject: import numcodecs - object_codec = numcodecs.get_codec(meta['filters'][0]) + + object_codec = numcodecs.get_codec(meta["filters"][0]) else: object_codec = None @@ -93,9 +168,6 @@ def encode_array_metadata(cls, meta: MappingType[str, Any]) -> bytes: order=meta["order"], filters=meta["filters"], ) - if dimension_separator: - meta['dimension_separator'] = dimension_separator - if dimension_separator: meta["dimension_separator"] = dimension_separator @@ -141,13 +213,15 @@ def encode_group_metadata(cls, meta=None) -> bytes: return json_dumps(meta) @classmethod - def decode_fill_value(cls, v: Any, dtype: np.dtype, object_codec: Any = None) -> Any: + def decode_fill_value( + cls, v: Any, dtype: np.dtype, object_codec: Any = None + ) -> Any: # early out if v is None: return v - if dtype.kind == 'V' and dtype.hasobject: + if dtype.kind == "V" and dtype.hasobject: if object_codec is None: - raise ValueError('missing object_codec for object array') + raise ValueError("missing object_codec for object array") v = base64.standard_b64decode(v) v = object_codec.decode(v) v = np.array(v, dtype=dtype)[()] @@ -189,15 +263,17 @@ def decode_fill_value(cls, v: Any, dtype: np.dtype, object_codec: Any = None) -> return np.array(v, dtype=dtype)[()] @classmethod - def encode_fill_value(cls, v: Any, dtype: np.dtype, object_codec: Any = None) -> Any: + def encode_fill_value( + cls, v: Any, dtype: np.dtype, object_codec: Any = None + ) -> Any: # early out if v is None: return v - if dtype.kind == 'V' and dtype.hasobject: + if dtype.kind == "V" and dtype.hasobject: if object_codec is None: - raise ValueError('missing object_codec for object array') + raise ValueError("missing object_codec for object array") v = object_codec.encode(v) - v = str(base64.standard_b64encode(v), 'ascii') + v = str(base64.standard_b64encode(v), "ascii") return v if dtype.kind == "f": if np.isnan(v): @@ -214,8 +290,10 @@ def encode_fill_value(cls, v: Any, dtype: np.dtype, object_codec: Any = None) -> 
return bool(v) elif dtype.kind in "c": c = cast(np.complex128, np.dtype(complex).type()) - v = (cls.encode_fill_value(v.real, c.real.dtype, object_codec), - cls.encode_fill_value(v.imag, c.imag.dtype, object_codec)) + v = ( + cls.encode_fill_value(v.real, c.real.dtype, object_codec), + cls.encode_fill_value(v.imag, c.imag.dtype, object_codec), + ) return v elif dtype.kind in "SV": v = str(base64.standard_b64encode(v), "ascii") @@ -228,7 +306,235 @@ def encode_fill_value(cls, v: Any, dtype: np.dtype, object_codec: Any = None) -> return v -# expose class methods for backwards compatibility +class Metadata3(Metadata2): + ZARR_FORMAT = ZARR_FORMAT_v3 + + @classmethod + def decode_dtype(cls, d, validate=True): + if isinstance(d, dict): + # extract the type from the extension info + try: + d = d['type'] + except KeyError: + raise KeyError( + "Extended dtype info must provide a key named 'type'." + ) + d = cls._decode_dtype_descr(d) + dtype = np.dtype(d) + if validate: + if dtype.str in (_v3_core_types | {"|b1", "|u1", "|i1"}): + # it is a core dtype of the v3 spec + pass + else: + # will raise if this is not a recognized extended dtype + get_extended_dtype_info(dtype) + return dtype + + @classmethod + def encode_dtype(cls, d): + s = d.str + if s == "|b1": + return "bool" + elif s == "|u1": + return "u1" + elif s == "|i1": + return "i1" + elif s in _v3_core_types: + return Metadata2.encode_dtype(d) + else: + # Check if this dtype corresponds to a supported extension to + # the v3 protocol. + return get_extended_dtype_info(np.dtype(d)) + + @classmethod + def decode_group_metadata(cls, s: Union[MappingType, str]) -> MappingType[str, Any]: + meta = cls.parse_metadata(s) + # 1 / 0 + # # check metadata format version + # zarr_format = meta.get("zarr_format", None) + # if zarr_format != cls.ZARR_FORMAT: + # raise MetadataError("unsupported zarr format: %s" % zarr_format) + + assert "attributes" in meta + # meta = dict(attributes=meta['attributes']) + return meta + + # return json.loads(s) + + @classmethod + def encode_group_metadata(cls, meta=None) -> bytes: + # The ZARR_FORMAT should not be in the group metadata, but in the + # entry point metadata instead + # meta = dict(zarr_format=cls.ZARR_FORMAT) + if meta is None: + meta = {"attributes": {}} + meta = dict(attributes=meta.get("attributes", {})) + return json_dumps(meta) + + @classmethod + def encode_hierarchy_metadata(cls, meta=None) -> bytes: + if meta is None: + meta = _default_entry_point_metadata_v3 + elif set(meta.keys()) != { + "zarr_format", + "metadata_encoding", + "metadata_key_suffix", + "extensions", + }: + raise ValueError(f"Unexpected keys in metadata. meta={meta}") + return json_dumps(meta) + + @classmethod + def decode_hierarchy_metadata( + cls, s: Union[MappingType, str] + ) -> MappingType[str, Any]: + meta = cls.parse_metadata(s) + # check metadata format + # zarr_format = meta.get("zarr_format", None) + # if zarr_format != "https://purl.org/zarr/spec/protocol/core/3.0": + # raise MetadataError("unsupported zarr format: %s" % zarr_format) + if set(meta.keys()) != { + "zarr_format", + "metadata_encoding", + "metadata_key_suffix", + "extensions", + }: + raise ValueError(f"Unexpected keys in metdata. 
meta={meta}") + return meta + + @classmethod + def _encode_codec_metadata(cls, codec: Codec) -> Optional[Mapping]: + if codec is None: + return None + + # only support gzip for now + config = codec.get_config() + del config["id"] + uri = 'https://purl.org/zarr/spec/codec/' + if isinstance(codec, numcodecs.GZip): + uri = uri + "gzip/1.0" + elif isinstance(codec, numcodecs.Zlib): + uri = uri + "zlib/1.0" + elif isinstance(codec, numcodecs.Blosc): + uri = uri + "blosc/1.0" + elif isinstance(codec, numcodecs.BZ2): + uri = uri + "bz2/1.0" + elif isinstance(codec, numcodecs.LZ4): + uri = uri + "lz4/1.0" + elif isinstance(codec, numcodecs.LZMA): + uri = uri + "lzma/1.0" + meta = { + "codec": uri, + "configuration": config, + } + return meta + + @classmethod + def _decode_codec_metadata(cls, meta: Optional[Mapping]) -> Optional[Codec]: + if meta is None: + return None + + uri = 'https://purl.org/zarr/spec/codec/' + conf = meta['configuration'] + if meta['codec'].startswith(uri + 'gzip/'): + codec = numcodecs.GZip(level=conf['level']) + elif meta['codec'].startswith(uri + 'zlib/'): + codec = numcodecs.Zlib(level=conf['level']) + elif meta['codec'].startswith(uri + 'blosc/'): + codec = numcodecs.Blosc(clevel=conf['clevel'], + shuffle=conf['shuffle'], + blocksize=conf['blocksize'], + cname=conf['cname']) + elif meta['codec'].startswith(uri + 'bz2/'): + codec = numcodecs.BZ2(level=conf['level']) + elif meta['codec'].startswith(uri + 'lz4/'): + codec = numcodecs.LZ4(acceleration=conf['acceleration']) + elif meta['codec'].startswith(uri + 'lzma/'): + codec = numcodecs.LZMA(format=conf['format'], + check=conf['check'], + preset=conf['preset'], + filters=conf['filters']) + else: + raise NotImplementedError + + return codec + + @classmethod + def decode_array_metadata(cls, s: Union[MappingType, str]) -> MappingType[str, Any]: + meta = cls.parse_metadata(s) + + # extract array metadata fields + try: + dtype = cls.decode_dtype(meta["data_type"]) + if dtype.hasobject: + import numcodecs + + object_codec = numcodecs.get_codec(meta["attributes"]["filters"][0]) + else: + object_codec = None + fill_value = cls.decode_fill_value(meta["fill_value"], dtype, object_codec) + # TODO: remove dimension_separator? 
+ + compressor = cls._decode_codec_metadata(meta.get("compressor", None)) + extensions = meta.get("extensions", []) + meta = dict( + shape=tuple(meta["shape"]), + chunk_grid=dict( + type=meta["chunk_grid"]["type"], + chunk_shape=tuple(meta["chunk_grid"]["chunk_shape"]), + separator=meta["chunk_grid"]["separator"], + ), + data_type=dtype, + fill_value=fill_value, + chunk_memory_layout=meta["chunk_memory_layout"], + attributes=meta["attributes"], + extensions=extensions, + ) + # compressor field should be absent when there is no compression + if compressor: + meta['compressor'] = compressor + + except Exception as e: + raise MetadataError("error decoding metadata: %s" % e) + else: + return meta + + @classmethod + def encode_array_metadata(cls, meta: MappingType[str, Any]) -> bytes: + dtype = meta["data_type"] + sdshape = () + if dtype.subdtype is not None: + dtype, sdshape = dtype.subdtype + dimension_separator = meta.get("dimension_separator") + if dtype.hasobject: + import numcodecs + + object_codec = numcodecs.get_codec(meta["attributes"]["filters"][0]) + else: + object_codec = None + + compressor = cls._encode_codec_metadata(meta.get("compressor", None)) + extensions = meta.get("extensions", []) + meta = dict( + shape=meta["shape"] + sdshape, + chunk_grid=dict( + type=meta["chunk_grid"]["type"], + chunk_shape=tuple(meta["chunk_grid"]["chunk_shape"]), + separator=meta["chunk_grid"]["separator"], + ), + data_type=cls.encode_dtype(dtype), + fill_value=encode_fill_value(meta["fill_value"], dtype, object_codec), + chunk_memory_layout=meta["chunk_memory_layout"], + attributes=meta.get("attributes", {}), + extensions=extensions, + ) + if compressor: + meta["compressor"] = compressor + if dimension_separator: + meta["dimension_separator"] = dimension_separator + return json_dumps(meta) + + parse_metadata = Metadata2.parse_metadata decode_array_metadata = Metadata2.decode_array_metadata encode_array_metadata = Metadata2.encode_array_metadata diff --git a/zarr/storage.py b/zarr/storage.py index 1dc2cf32b3..ef5479c8fa 100644 --- a/zarr/storage.py +++ b/zarr/storage.py @@ -35,6 +35,7 @@ import uuid import time +from numcodecs.abc import Codec from numcodecs.compat import ( ensure_bytes, ensure_text, @@ -57,15 +58,19 @@ normalize_shape, normalize_storage_path, retry_call) from zarr._storage.absstore import ABSStore # noqa: F401 -from zarr._storage.store import (_listdir_from_keys, - _path_to_prefix, +from zarr._storage.store import (_get_hierarchy_metadata, + _listdir_from_keys, _rename_from_keys, _rmdir_from_keys, + _path_to_prefix, + _prefix_to_array_key, + _prefix_to_group_key, array_meta_key, group_meta_key, attrs_key, BaseStore, - Store) + Store, + StoreV3) __doctest_requires__ = { ('RedisStore', 'RedisStore.*'): ['redis'], @@ -92,40 +97,95 @@ def contains_array(store: StoreLike, path: Path = None) -> bool: """Return True if the store contains an array at the given logical path.""" path = normalize_storage_path(path) prefix = _path_to_prefix(path) - key = prefix + array_meta_key + key = _prefix_to_array_key(store, prefix) return key in store -def contains_group(store: StoreLike, path: Path = None) -> bool: +def contains_group(store: StoreLike, path: Path = None, explicit_only=True) -> bool: """Return True if the store contains a group at the given logical path.""" path = normalize_storage_path(path) prefix = _path_to_prefix(path) - key = prefix + group_meta_key - return key in store + key = _prefix_to_group_key(store, prefix) + store_version = getattr(store, '_store_version', 2) + if 
store_version == 2 or explicit_only: + return key in store + else: + if key in store: + return True + # for v3, need to also handle implicit groups + sfx = _get_hierarchy_metadata(store)['metadata_key_suffix'] + implicit_prefix = key.replace('.group' + sfx, '') + if not implicit_prefix.endswith('/'): + implicit_prefix += '/' + if store.list_prefix(implicit_prefix): # type: ignore + return True + return False -def normalize_store_arg(store: Any, clobber=False, storage_options=None, mode="w") -> BaseStore: +def normalize_store_arg(store, clobber=False, storage_options=None, mode="w", + *, zarr_version=None) -> Store: + if zarr_version is None: + # default to v2 store for backward compatibility + zarr_version = getattr(store, '_store_version', 2) + if zarr_version not in [2, 3]: + raise ValueError("zarr_version must be 2 or 3") if store is None: - return BaseStore._ensure_store(dict()) - elif isinstance(store, os.PathLike): + if zarr_version == 2: + store = KVStore(dict()) + else: + store = KVStoreV3(dict()) + # add default zarr.json metadata + store['zarr.json'] = store._metadata_class.encode_hierarchy_metadata(None) + return store + if isinstance(store, os.PathLike): store = os.fspath(store) if isinstance(store, str): mode = mode if clobber else "r" - if "://" in store or "::" in store: - return FSStore(store, mode=mode, **(storage_options or {})) - elif storage_options: - raise ValueError("storage_options passed with non-fsspec path") - if store.endswith('.zip'): - return ZipStore(store, mode=mode) - elif store.endswith('.n5'): - from zarr.n5 import N5Store - return N5Store(store) - else: - return DirectoryStore(store) - else: - if not isinstance(store, BaseStore) and isinstance(store, MutableMapping): - store = BaseStore._ensure_store(store) - return store + if zarr_version == 2: + if "://" in store or "::" in store: + return FSStore(store, mode=mode, **(storage_options or {})) + elif storage_options: + raise ValueError("storage_options passed with non-fsspec path") + if store.endswith('.zip'): + return ZipStore(store, mode=mode) + elif store.endswith('.n5'): + from zarr.n5 import N5Store + return N5Store(store) + else: + return DirectoryStore(store) + elif zarr_version == 3: + if "://" in store or "::" in store: + store = FSStoreV3(store, mode=mode, **(storage_options or {})) + elif storage_options: + raise ValueError("storage_options passed with non-fsspec path") + elif store.endswith('.zip'): + store = ZipStoreV3(store, mode=mode) + elif store.endswith('.n5'): + raise NotImplementedError("N5Store not yet implemented for V3") + # return N5StoreV3(store) + else: + store = DirectoryStoreV3(store) + # add default zarr.json metadata + store['zarr.json'] = store._metadata_class.encode_hierarchy_metadata(None) + return store + elif zarr_version == 2: + store = Store._ensure_store(store) + if getattr(store, '_store_version', 2) != 2: + raise ValueError( + "provided store does not match the specified zarr version.") + # if not isinstance(store, Store) and isinstance(store, MutableMapping): + # store = KVStore(store) + elif zarr_version == 3: + store = StoreV3._ensure_store(store) + if getattr(store, '_store_version', 2) != 3: + raise ValueError( + "provided store does not match the specified zarr version.") + # if not isinstance(store, StoreV3) and isinstance(store, MutableMapping): + # store = KVStoreV3(store) + if 'zarr.json' not in store: + # add default zarr.json metadata + store['zarr.json'] = store._metadata_class.encode_hierarchy_metadata(None) + return store def rmdir(store: StoreLike, 
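# Illustrative sketch (not part of the patch): intended use of the
# normalize_store_arg changes above, assuming the v3 classes introduced in
# this patch are importable from zarr.storage.
from zarr.storage import normalize_store_arg

v2_store = normalize_store_arg(dict(), zarr_version=2)  # wrapped as a KVStore
v3_store = normalize_store_arg(dict(), zarr_version=3)  # wrapped as a KVStoreV3
assert 'zarr.json' in v3_store       # v3 stores are seeded with entry-point metadata
assert 'zarr.json' not in v2_store   # v2 stores are left untouched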
path: Path = None): @@ -133,15 +193,36 @@ def rmdir(store: StoreLike, path: Path = None): this will be called, otherwise will fall back to implementation via the `Store` interface.""" path = normalize_storage_path(path) - if hasattr(store, "rmdir") and store.is_erasable(): # type: ignore - # pass through - store.rmdir(path) # type: ignore + if getattr(store, '_store_version', 2) == 2: + if hasattr(store, "rmdir") and store.is_erasable(): # type: ignore + # pass through + store.rmdir(path) # type: ignore + else: + # slow version, delete one key at a time + _rmdir_from_keys(store, path) else: - # slow version, delete one key at a time - _rmdir_from_keys(store, path) + # TODO: check behavior for v3 and fix in the Store class, deferring to + # those by default + + # remove metadata folder + meta_dir = 'meta/root/' + path + _rmdir_from_keys(store, meta_dir) + + # remove data folder + data_dir = 'data/root/' + path + _rmdir_from_keys(store, data_dir) + # remove metadata files + sfx = _get_hierarchy_metadata(store)['metadata_key_suffix'] + array_meta_file = meta_dir + '.array' + sfx + if array_meta_file in store: + store.erase(array_meta_file) # type: ignore + group_meta_file = meta_dir + '.group' + sfx + if group_meta_file in store: + store.erase(group_meta_file) # type: ignore -def rename(store: BaseStore, src_path: Path, dst_path: Path): + +def rename(store: Store, src_path: Path, dst_path: Path): """Rename all items under the given path. If `store` provides a `rename` method, this will be called, otherwise will fall back to implementation via the `Store` interface.""" @@ -163,6 +244,27 @@ def listdir(store: BaseStore, path: Path = None): if hasattr(store, 'listdir'): # pass through return store.listdir(path) # type: ignore + elif getattr(store, "_store_version", None) == 3: + meta_prefix = 'meta/root/' + dir_path = meta_prefix + path + path_start = len(meta_prefix) + meta_keys = [] + include_meta_keys = False + if include_meta_keys: + sfx = _get_hierarchy_metadata(store)['metadata_key_suffix'] + group_meta_key = dir_path + '.group' + sfx + if group_meta_key in store: + meta_keys.append(group_meta_key[path_start:]) + array_meta_key = dir_path + '.array' + sfx + if array_meta_key in store: + meta_keys.append(array_meta_key[path_start:]) + if not dir_path.endswith('/'): + dir_path += '/' + keys, prefixes = store.list_dir(dir_path) # type: ignore + keys = [k[path_start:] for k in keys] + prefixes = [p[path_start:] for p in prefixes] + return meta_keys + keys + prefixes + else: # slow version, iterate through all keys warnings.warn( @@ -173,33 +275,45 @@ def listdir(store: BaseStore, path: Path = None): return _listdir_from_keys(store, path) +def _getsize(store: BaseStore, path: Path = None) -> int: + # compute from size of values + if path and path in store: + v = store[path] + size = buffer_size(v) + else: + path = '' if path is None else normalize_storage_path(path) + size = 0 + store_version = getattr(store, '_store_version', 2) + if store_version == 3: + members = store.list_prefix('data/root/' + path) # type: ignore + members += store.list_prefix('meta/root/' + path) # type: ignore + # members += ['zarr.json'] + else: + members = listdir(store, path) + prefix = _path_to_prefix(path) + members = [prefix + k for k in members] + for k in members: + try: + v = store[k] + except KeyError: + pass + else: + try: + size += buffer_size(v) + except TypeError: + return -1 + return size + + def getsize(store: BaseStore, path: Path = None) -> int: """Compute size of stored items for a given path. 
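# Illustrative sketch (not part of the patch): the v3 branches of rmdir,
# listdir and _getsize above all walk the split key layout, where metadata
# lives under meta/root/ and chunks under data/root/. For a group "foo" with
# an array "foo/bar", the keys involved look roughly like:
v3_keys = [
    "zarr.json",                     # entry-point (hierarchy) metadata
    "meta/root/foo.group.json",      # explicit group metadata
    "meta/root/foo/bar.array.json",  # array metadata
    "data/root/foo/bar/c0/0",        # one chunk, using the "/" separator
]
# rmdir(store, "foo") therefore erases both prefixes plus the two sidecar
# metadata files, which is exactly what the code above does.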
If `store` provides a `getsize` method, this will be called, otherwise will return -1.""" - path = normalize_storage_path(path) if hasattr(store, 'getsize'): # pass through + path = normalize_storage_path(path) return store.getsize(path) # type: ignore elif isinstance(store, MutableMapping): - # compute from size of values - if path in store: - v = store[path] - size = buffer_size(v) - else: - members = listdir(store, path) - prefix = _path_to_prefix(path) - size = 0 - for k in members: - try: - v = store[prefix + k] - except KeyError: - pass - else: - try: - size += buffer_size(v) - except TypeError: - return -1 - return size + return _getsize(store, path) else: return -1 @@ -346,8 +460,18 @@ def init_array( path = normalize_storage_path(path) # ensure parent group initialized - _require_parent_group(path, store=store, chunk_store=chunk_store, overwrite=overwrite) + store_version = getattr(store, "_store_version", 2) + if store_version < 3: + _require_parent_group(path, store=store, chunk_store=chunk_store, + overwrite=overwrite) + + if store_version == 3 and 'zarr.json' not in store: + # initialize with default zarr.json entry level metadata + store['zarr.json'] = store._metadata_class.encode_hierarchy_metadata(None) # type: ignore + if not compressor: + # compatibility with legacy tests using compressor=[] + compressor = None _init_array_metadata(store, shape=shape, chunks=chunks, dtype=dtype, compressor=compressor, fill_value=fill_value, order=order, overwrite=overwrite, path=path, @@ -372,16 +496,50 @@ def _init_array_metadata( dimension_separator=None, ): + store_version = getattr(store, '_store_version', 2) + + path = normalize_storage_path(path) + # guard conditions if overwrite: - # attempt to delete any pre-existing items in store - rmdir(store, path) - if chunk_store is not None: - rmdir(chunk_store, path) - elif contains_array(store, path): - raise ContainsArrayError(path) - elif contains_group(store, path): - raise ContainsGroupError(path) + if store_version == 2: + # attempt to delete any pre-existing array in store + rmdir(store, path) + if chunk_store is not None: + rmdir(chunk_store, path) + else: + group_meta_key = _prefix_to_group_key(store, _path_to_prefix(path)) + array_meta_key = _prefix_to_array_key(store, _path_to_prefix(path)) + data_prefix = 'data/root/' + _path_to_prefix(path) + + # attempt to delete any pre-existing array in store + if array_meta_key in store: + store.erase(array_meta_key) # type: ignore + if group_meta_key in store: + store.erase(group_meta_key) # type: ignore + store.erase_prefix(data_prefix) # type: ignore + if chunk_store is not None: + chunk_store.erase_prefix(data_prefix) # type: ignore + + if '/' in path: + # path is a subfolder of an existing array, remove that array + parent_path = '/'.join(path.split('/')[:-1]) + sfx = _get_hierarchy_metadata(store)['metadata_key_suffix'] + array_key = 'meta/root/' + parent_path + '.array' + sfx + if array_key in store: + store.erase(array_key) # type: ignore + + if not overwrite: + if contains_array(store, path): + raise ContainsArrayError(path) + elif contains_group(store, path, explicit_only=False): + raise ContainsGroupError(path) + elif store_version == 3: + if '/' in path: + # cannot create an array within an existing array path + parent_path = '/'.join(path.split('/')[:-1]) + if contains_array(store, parent_path): + raise ContainsArrayError(path) # normalize metadata dtype, object_codec = normalize_dtype(dtype, object_codec) @@ -392,7 +550,7 @@ def _init_array_metadata( fill_value = 
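# Illustrative sketch (not part of the patch): the v3 guard conditions above
# forbid creating an array underneath an existing array. A minimal example,
# assuming KVStoreV3 and init_array behave as introduced in this patch:
from numcodecs import Zlib
from zarr.errors import ContainsArrayError
from zarr.storage import KVStoreV3, init_array

store = KVStoreV3(dict())
init_array(store, path='foo', shape=100, chunks=10, dtype='f8', compressor=Zlib(1))
try:
    init_array(store, path='foo/bar', shape=10, chunks=5, dtype='f8', compressor=Zlib(1))
except ContainsArrayError:
    pass  # 'foo' is already an array, so 'foo/bar' cannot be created under it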
normalize_fill_value(fill_value, dtype) # optional array metadata - if dimension_separator is None: + if dimension_separator is None and store_version == 2: dimension_separator = getattr(store, "_dimension_separator", None) dimension_separator = normalize_dimension_separator(dimension_separator) @@ -409,13 +567,21 @@ def _init_array_metadata( # obtain compressor config compressor_config = None if compressor: - try: - compressor_config = compressor.get_config() - except AttributeError as e: - raise BadCompressorError(compressor) from e + if store_version == 2: + try: + compressor_config = compressor.get_config() + except AttributeError as e: + raise BadCompressorError(compressor) from e + elif not isinstance(compressor, Codec): + raise ValueError("expected a numcodecs Codec for compressor") + # TODO: alternatively, could autoconvert str to a Codec + # e.g. 'zlib' -> numcodec.Zlib object + # compressor = numcodecs.get_codec({'id': compressor}) # obtain filters config if filters: + # TODO: filters was removed from the metadata in v3 + # raise error here if store_version > 2? filters_config = [f.get_config() for f in filters] else: filters_config = [] @@ -441,11 +607,31 @@ def _init_array_metadata( filters_config = None # type: ignore # initialize metadata - meta = dict(shape=shape, chunks=chunks, dtype=dtype, - compressor=compressor_config, fill_value=fill_value, - order=order, filters=filters_config, + # TODO: don't store redundant dimension_separator for v3? + _compressor = compressor_config if store_version == 2 else compressor + meta = dict(shape=shape, compressor=_compressor, + fill_value=fill_value, dimension_separator=dimension_separator) - key = _path_to_prefix(path) + array_meta_key + if store_version < 3: + meta.update(dict(chunks=chunks, dtype=dtype, order=order, + filters=filters_config)) + else: + if dimension_separator is None: + dimension_separator = "/" + if filters_config: + attributes = {'filters': filters_config} + else: + attributes = {} + meta.update( + dict(chunk_grid=dict(type="regular", + chunk_shape=chunks, + separator=dimension_separator), + chunk_memory_layout=order, + data_type=dtype, + attributes=attributes) + ) + + key = _prefix_to_array_key(store, _path_to_prefix(path)) if hasattr(store, '_metadata_class'): store[key] = store._metadata_class.encode_array_metadata(meta) # type: ignore else: @@ -482,14 +668,26 @@ def init_group( # normalize path path = normalize_storage_path(path) - # ensure parent group initialized - _require_parent_group(path, store=store, chunk_store=chunk_store, - overwrite=overwrite) + store_version = getattr(store, '_store_version', 2) + if store_version < 3: + # ensure parent group initialized + _require_parent_group(path, store=store, chunk_store=chunk_store, + overwrite=overwrite) + + if store_version == 3 and 'zarr.json' not in store: + # initialize with default zarr.json entry level metadata + store['zarr.json'] = store._metadata_class.encode_hierarchy_metadata(None) # type: ignore # initialise metadata _init_group_metadata(store=store, overwrite=overwrite, path=path, chunk_store=chunk_store) + if store_version == 3: + # TODO: Should initializing a v3 group also create a corresponding + # empty folder under data/root/? I think probably not until there + # is actual data written there. 
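# Illustrative sketch (not part of the patch): a summary of how the metadata
# fields written by _init_array_metadata above are renamed between the two
# protocol versions (filters move under "attributes" because the v3 core
# metadata has no top-level filters field):
v2_to_v3_fields = {
    "chunks": "chunk_grid.chunk_shape",
    "dimension_separator": "chunk_grid.separator",
    "dtype": "data_type",
    "order": "chunk_memory_layout",
    "filters": "attributes.filters",
}
# The metadata key itself also moves, from "<path>/.zarray" in v2 to
# "meta/root/<path>.array.json" in v3 (via _prefix_to_array_key).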
+ pass + def _init_group_metadata( store: StoreLike, @@ -498,22 +696,51 @@ def _init_group_metadata( chunk_store: StoreLike = None, ): + store_version = getattr(store, '_store_version', 2) + path = normalize_storage_path(path) + # guard conditions if overwrite: - # attempt to delete any pre-existing items in store - rmdir(store, path) - if chunk_store is not None: - rmdir(chunk_store, path) - elif contains_array(store, path): - raise ContainsArrayError(path) - elif contains_group(store, path): - raise ContainsGroupError(path) + if store_version == 2: + # attempt to delete any pre-existing items in store + rmdir(store, path) + if chunk_store is not None: + rmdir(chunk_store, path) + else: + group_meta_key = _prefix_to_group_key(store, _path_to_prefix(path)) + array_meta_key = _prefix_to_array_key(store, _path_to_prefix(path)) + data_prefix = 'data/root/' + _path_to_prefix(path) + meta_prefix = 'meta/root/' + _path_to_prefix(path) + + # attempt to delete any pre-existing array in store + if array_meta_key in store: + store.erase(array_meta_key) # type: ignore + if group_meta_key in store: + store.erase(group_meta_key) # type: ignore + store.erase_prefix(data_prefix) # type: ignore + store.erase_prefix(meta_prefix) # type: ignore + if chunk_store is not None: + chunk_store.erase_prefix(data_prefix) # type: ignore + + if not overwrite: + if contains_array(store, path): + raise ContainsArrayError(path) + elif contains_group(store, path): + raise ContainsGroupError(path) + elif store_version == 3 and '/' in path: + # cannot create a group overlapping with an existing array name + parent_path = '/'.join(path.split('/')[:-1]) + if contains_array(store, parent_path): + raise ContainsArrayError(path) # initialize metadata # N.B., currently no metadata properties are needed, however there may # be in future - meta = dict() # type: ignore - key = _path_to_prefix(path) + group_meta_key + if store_version == 3: + meta = {'attributes': {}} # type: ignore + else: + meta = {} # type: ignore + key = _prefix_to_group_key(store, _path_to_prefix(path)) if hasattr(store, '_metadata_class'): store[key] = store._metadata_class.encode_group_metadata(meta) # type: ignore else: @@ -1133,14 +1360,17 @@ def __init__(self, url, normalize_keys=False, key_separator=None, dimension_separator = key_separator self.key_separator = dimension_separator - if self.key_separator is None: - self.key_separator = "." + self._default_key_separator() # Pass attributes to array creation self._dimension_separator = dimension_separator if self.fs.exists(self.path) and not self.fs.isdir(self.path): raise FSPathExistNotDir(url) + def _default_key_separator(self): + if self.key_separator is None: + self.key_separator = "." 
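# Illustrative sketch (not part of the patch): for a v3 group,
# _init_group_metadata above writes a document whose payload is essentially an
# empty "attributes" mapping, stored under the .group metadata key:
import json

group_key = "meta/root/foo.group.json"  # produced by _prefix_to_group_key
group_doc = json.dumps({"attributes": {}}).encode("ascii")
# whereas the v2 path writes a {"zarr_format": 2} document under "foo/.zgroup".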
+ def _normalize_key(self, key): key = normalize_storage_path(key).lstrip('/') if key: @@ -2641,6 +2871,10 @@ class ConsolidatedMetadataStore(Store): def __init__(self, store: StoreLike, metadata_key=".zmetadata"): self.store = Store._ensure_store(store) + if getattr(store, '_store_version', 2) != 2: + raise ValueError("Can only consolidate stores corresponding to " + "the Zarr v2 spec.") + # retrieve consolidated metadata meta = json_loads(store[metadata_key]) @@ -2676,3 +2910,351 @@ def getsize(self, path): def listdir(self, path): return listdir(self.meta_store, path) + + +""" versions of stores following the v3 protocol """ + + +def _get_files_and_dirs_from_path(store, path): + path = normalize_storage_path(path) + + files = [] + # add array metadata file if present + array_key = _prefix_to_array_key(store, path) + if array_key in store: + files.append(os.path.join(store.path, array_key)) + + # add group metadata file if present + group_key = _prefix_to_group_key(store, path) + if group_key in store: + files.append(os.path.join(store.path, group_key)) + + dirs = [] + # add array and group folders if present + for d in ['data/root/' + path, 'meta/root/' + path]: + dir_path = os.path.join(store.path, d) + if os.path.exists(dir_path): + dirs.append(dir_path) + return files, dirs + + +class KVStoreV3(KVStore, StoreV3): + + def list(self): + return list(self._mutable_mapping.keys()) + + +KVStoreV3.__doc__ = KVStore.__doc__ + + +class FSStoreV3(FSStore, StoreV3): + + # FSStoreV3 doesn't use this (FSStore uses it within _normalize_key) + _META_KEYS = () + + def _default_key_separator(self): + if self.key_separator is None: + self.key_separator = "/" + + def list(self): + return list(self.keys()) + + def _normalize_key(self, key): + key = normalize_storage_path(key).lstrip('/') + return key.lower() if self.normalize_keys else key + + def getsize(self, path=None): + size = 0 + if path is None or path == '': + # size of both the data and meta subdirs + dirs = [] + for d in ['data/root', 'meta/root']: + dir_path = os.path.join(self.path, d) + if os.path.exists(dir_path): + dirs.append(dir_path) + else: + files, dirs = _get_files_and_dirs_from_path(self, path) + for file in files: + size += os.path.getsize(file) + for d in dirs: + size += self.fs.du(d, total=True, maxdepth=None) + return size + + def setitems(self, values): + if self.mode == 'r': + raise ReadOnlyError() + values = {self._normalize_key(key): val for key, val in values.items()} + + # initialize the /data/root/... folder corresponding to the array! + # Note: zarr.tests.test_core_v3.TestArrayWithFSStoreV3PartialRead fails + # without this explicit creation of directories + subdirectories = set([os.path.dirname(v) for v in values.keys()]) + for subdirectory in subdirectories: + data_dir = os.path.join(self.path, subdirectory) + if not self.fs.exists(data_dir): + self.fs.mkdir(data_dir) + + self.map.setitems(values) + + +class MemoryStoreV3(MemoryStore, StoreV3): + + def __init__(self, root=None, cls=dict, dimension_separator=None): + if root is None: + self.root = cls() + else: + self.root = root + self.cls = cls + self.write_mutex = Lock() + self._dimension_separator = dimension_separator # TODO: modify for v3? 
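# Illustrative sketch (not part of the patch): _default_key_separator keeps
# the old "." default for FSStore while FSStoreV3 defaults to "/", so the same
# chunk index lands at different keys (the exact v3 chunk-key form is assumed
# from the v3 spec, not shown in this patch):
v2_chunk_key = "foo/" + ".".join(map(str, (0, 1)))              # 'foo/0.1'
v3_chunk_key = "data/root/foo/c" + "/".join(map(str, (0, 1)))   # 'data/root/foo/c0/1'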
+ + def __eq__(self, other): + return ( + isinstance(other, MemoryStoreV3) and + self.root == other.root and + self.cls == other.cls + ) + + def list(self): + return list(self.keys()) + + def getsize(self, path: Path = None): + size = 0 + path = normalize_storage_path(path) + members = self.list_prefix('data/root/' + path) + members += self.list_prefix('meta/root/' + path) + for k in members: + try: + v = self[k] + except KeyError: + pass + else: + try: + size += buffer_size(v) + except TypeError: + return -1 + return size + + +MemoryStoreV3.__doc__ = MemoryStore.__doc__ + + +class DirectoryStoreV3(DirectoryStore, StoreV3): + + def list(self): + return list(self.keys()) + + def __eq__(self, other): + return ( + isinstance(other, DirectoryStoreV3) and + self.path == other.path + ) + + # def getsize(self, path=None): + # size = 0 + # if path is None or path == '': + # # add array and group folders if present + # dirs = [] + # for d in ['data/root', 'meta/root']: + # dir_path = os.path.join(self.path, d) + # if os.path.exists(dir_path): + # dirs.append(dir_path) + # print(f"dirs={dirs}") + # else: + # files, dirs = _get_files_and_dirs_from_path(self, path) + # for file in files: + # size += os.path.getsize(file) + # for d in dirs: + # for child in scandir(d): + # print(f"child={child}") + # if child.is_file(): + # size += child.stat().st_size + # return size + + def getsize(self, path: Path = None): + size = 0 + path = normalize_storage_path(path) + members = self.list_prefix('data/root/' + path) + members += self.list_prefix('meta/root/' + path) + for k in members: + try: + v = self[k] + except KeyError: + pass + else: + try: + size += buffer_size(v) + except TypeError: + return -1 + return size + + def rename(self, src_path, dst_path, metadata_key_suffix='.json'): + store_src_path = normalize_storage_path(src_path) + store_dst_path = normalize_storage_path(dst_path) + + dir_path = self.path + any_existed = False + for root_prefix in ['meta', 'data']: + src_path = os.path.join(dir_path, root_prefix, 'root', store_src_path) + if os.path.exists(src_path): + any_existed = True + dst_path = os.path.join(dir_path, root_prefix, 'root', store_dst_path) + os.renames(src_path, dst_path) + + for suffix in ['.array' + metadata_key_suffix, + '.group' + metadata_key_suffix]: + src_meta = os.path.join(dir_path, 'meta', 'root', store_src_path + suffix) + if os.path.exists(src_meta): + any_existed = True + dst_meta = os.path.join(dir_path, 'meta', 'root', store_dst_path + suffix) + dst_dir = os.path.dirname(dst_meta) + if not os.path.exists(dst_dir): + os.makedirs(dst_dir) + os.rename(src_meta, dst_meta) + if not any_existed: + raise FileNotFoundError("nothing found at src_path") + + +DirectoryStoreV3.__doc__ = DirectoryStore.__doc__ + + +class ZipStoreV3(ZipStore, StoreV3): + + def list(self): + return list(self.keys()) + + def __eq__(self, other): + return ( + isinstance(other, ZipStore) and + self.path == other.path and + self.compression == other.compression and + self.allowZip64 == other.allowZip64 + ) + + def getsize(self, path=None): + path = normalize_storage_path(path) + with self.mutex: + children = self.list_prefix('data/root/' + path) + children += self.list_prefix('meta/root/' + path) + if children: + size = 0 + for name in children: + try: + info = self.zf.getinfo(name) + except KeyError: + pass + else: + size += info.compress_size + return size + elif path: + try: + info = self.zf.getinfo(path) + return info.compress_size + except KeyError: + return 0 + else: + return 0 + + 
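# Illustrative sketch (not part of the patch): DirectoryStoreV3.rename above
# must move up to four locations, because a v3 node is spread over the meta/
# and data/ trees plus its sidecar metadata file. A minimal, hypothetical run
# (the directory name and the default ".json" suffix are assumptions):
import os
from tempfile import mkdtemp
from zarr.storage import DirectoryStoreV3, init_array

store = DirectoryStoreV3(os.path.join(mkdtemp(), 'example.zr3'))
init_array(store, path='foo', shape=100, chunks=10, dtype='f8', compressor=None)
store.rename('foo', 'bar')
assert os.path.exists(os.path.join(store.path, 'meta', 'root', 'bar.array.json'))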
+ZipStoreV3.__doc__ = ZipStore.__doc__ + + +class NestedDirectoryStoreV3(NestedDirectoryStore, DirectoryStoreV3): + + def list(self): + return list(self.keys()) + + def __eq__(self, other): + return ( + isinstance(other, NestedDirectoryStoreV3) and + self.path == other.path + ) + + +NestedDirectoryStoreV3.__doc__ = NestedDirectoryStore.__doc__ + + +class RedisStoreV3(RedisStore, StoreV3): + + def list(self): + return list(self.keys()) + + +RedisStoreV3.__doc__ = RedisStore.__doc__ + + +class MongoDBStoreV3(MongoDBStore, StoreV3): + + def list(self): + return list(self.keys()) + + +MongoDBStoreV3.__doc__ = MongoDBStore.__doc__ + + +class DBMStoreV3(DBMStore, StoreV3): + + def list(self): + return list(self.keys()) + + +DBMStoreV3.__doc__ = DBMStore.__doc__ + + +class LMDBStoreV3(LMDBStore, StoreV3): + + def list(self): + return list(self.keys()) + + +LMDBStoreV3.__doc__ = LMDBStore.__doc__ + + +class SQLiteStoreV3(SQLiteStore, StoreV3): + + def list(self): + return list(self.keys()) + + def getsize(self, path=None): + if path is None or path == '': + # TODO: why does the query below not work in this case? + # For now fall back to the default _getsize implementation + return _getsize(self, path) + else: + path = normalize_storage_path(path) + size = 0 + for _path in ['data/root/' + path, 'meta/root/' + path]: + c = self.cursor.execute( + ''' + SELECT COALESCE(SUM(LENGTH(v)), 0) FROM zarr + WHERE k LIKE (? || "%") AND + 0 == INSTR(LTRIM(SUBSTR(k, LENGTH(?) + 1), "/"), "/") + ''', + (_path, _path) + ) + for item_size, in c: + size += item_size + return size + + +SQLiteStoreV3.__doc__ = SQLiteStore.__doc__ + + +class LRUStoreCacheV3(LRUStoreCache, StoreV3): + + def __init__(self, store, max_size: int): + self._store = StoreV3._ensure_store(store) + self._max_size = max_size + self._current_size = 0 + self._keys_cache = None + self._contains_cache = None + self._listdir_cache: Dict[Path, Any] = dict() + self._values_cache: Dict[Path, Any] = OrderedDict() + self._mutex = Lock() + self.hits = self.misses = 0 + + def list(self): + return list(self.keys()) + + +LRUStoreCacheV3.__doc__ = LRUStoreCache.__doc__ diff --git a/zarr/tests/test_attrs.py b/zarr/tests/test_attrs.py index b2de736d4a..62faf662da 100644 --- a/zarr/tests/test_attrs.py +++ b/zarr/tests/test_attrs.py @@ -3,8 +3,20 @@ import pytest from zarr.attrs import Attributes -from zarr.tests.util import CountingDict -from zarr.storage import KVStore +from zarr.storage import KVStore, KVStoreV3 +from zarr.tests.util import CountingDict, CountingDictV3 + + +@pytest.fixture(params=[2, 3]) +def zarr_version(request): + return request.param + + +def _init_store(version): + """Use a plain dict() for v2, but KVStoreV3 otherwise.""" + if version == 2: + return dict() + return KVStoreV3(dict()) class TestAttributes(): @@ -12,13 +24,9 @@ class TestAttributes(): def init_attributes(self, store, read_only=False, cache=True): return Attributes(store, key='attrs', read_only=read_only, cache=cache) - @pytest.mark.parametrize('store_from_dict', [False, True]) - def test_storage(self, store_from_dict): + def test_storage(self, zarr_version): - if store_from_dict: - store = dict() - else: - store = KVStore(dict()) + store = _init_store(zarr_version) a = Attributes(store=store, key='attrs') assert isinstance(a.store, KVStore) assert 'foo' not in a @@ -30,11 +38,14 @@ def test_storage(self, store_from_dict): assert 'attrs' in store assert isinstance(store['attrs'], bytes) d = json.loads(str(store['attrs'], 'ascii')) + if zarr_version == 3: + d = d['attributes'] 
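# Illustrative sketch (not part of the patch): the test changes above reflect
# that v3 attribute documents nest user attributes under an "attributes" key,
# while v2 stores them at the top level:
import json

v2_attrs_doc = json.dumps({"foo": "bar", "baz": 42}).encode("ascii")
v3_attrs_doc = json.dumps({"attributes": {"foo": "bar", "baz": 42}}).encode("ascii")
# which is why the v3 branch of the test unwraps d['attributes'] before
# comparing against dict(foo='bar', baz=42).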
assert dict(foo='bar', baz=42) == d - def test_get_set_del_contains(self): + def test_get_set_del_contains(self, zarr_version): - a = self.init_attributes(dict()) + store = _init_store(zarr_version) + a = self.init_attributes(store) assert 'foo' not in a a['foo'] = 'bar' a['baz'] = 42 @@ -48,9 +59,10 @@ def test_get_set_del_contains(self): # noinspection PyStatementEffect a['foo'] - def test_update_put(self): + def test_update_put(self, zarr_version): - a = self.init_attributes(dict()) + store = _init_store(zarr_version) + a = self.init_attributes(store) assert 'foo' not in a assert 'bar' not in a assert 'baz' not in a @@ -65,9 +77,10 @@ def test_update_put(self): assert a['bar'] == 84 assert 'baz' not in a - def test_iterators(self): + def test_iterators(self, zarr_version): - a = self.init_attributes(dict()) + store = _init_store(zarr_version) + a = self.init_attributes(store) assert 0 == len(a) assert set() == set(a) assert set() == set(a.keys()) @@ -83,10 +96,13 @@ def test_iterators(self): assert {'bar', 42} == set(a.values()) assert {('foo', 'bar'), ('baz', 42)} == set(a.items()) - def test_read_only(self): - store = dict() + def test_read_only(self, zarr_version): + store = _init_store(zarr_version) a = self.init_attributes(store, read_only=True) - store['attrs'] = json.dumps(dict(foo='bar', baz=42)).encode('ascii') + if zarr_version == 2: + store['attrs'] = json.dumps(dict(foo='bar', baz=42)).encode('ascii') + else: + store['attrs'] = json.dumps(dict(attributes=dict(foo='bar', baz=42))).encode('ascii') assert a['foo'] == 'bar' assert a['baz'] == 42 with pytest.raises(PermissionError): @@ -96,8 +112,9 @@ def test_read_only(self): with pytest.raises(PermissionError): a.update(foo='quux') - def test_key_completions(self): - a = self.init_attributes(dict()) + def test_key_completions(self, zarr_version): + store = _init_store(zarr_version) + a = self.init_attributes(store) d = a._ipython_key_completions_() assert 'foo' not in d assert '123' not in d @@ -112,14 +129,17 @@ def test_key_completions(self): assert 'asdf;' in d assert 'baz' not in d - def test_caching_on(self): + def test_caching_on(self, zarr_version): # caching is turned on by default # setup store - store = CountingDict() + store = CountingDict() if zarr_version == 2 else CountingDictV3() assert 0 == store.counter['__getitem__', 'attrs'] assert 0 == store.counter['__setitem__', 'attrs'] - store['attrs'] = json.dumps(dict(foo='xxx', bar=42)).encode('ascii') + if zarr_version == 2: + store['attrs'] = json.dumps(dict(foo='xxx', bar=42)).encode('ascii') + else: + store['attrs'] = json.dumps(dict(attributes=dict(foo='xxx', bar=42))).encode('ascii') assert 0 == store.counter['__getitem__', 'attrs'] assert 1 == store.counter['__setitem__', 'attrs'] @@ -136,54 +156,65 @@ def test_caching_on(self): # test __setitem__ updates the cache a['foo'] = 'yyy' - assert 2 == store.counter['__getitem__', 'attrs'] + get_cnt = 2 if zarr_version == 2 else 3 + assert get_cnt == store.counter['__getitem__', 'attrs'] assert 2 == store.counter['__setitem__', 'attrs'] assert a['foo'] == 'yyy' - assert 2 == store.counter['__getitem__', 'attrs'] + assert get_cnt == store.counter['__getitem__', 'attrs'] assert 2 == store.counter['__setitem__', 'attrs'] # test update() updates the cache a.update(foo='zzz', bar=84) - assert 3 == store.counter['__getitem__', 'attrs'] + get_cnt = 3 if zarr_version == 2 else 5 + assert get_cnt == store.counter['__getitem__', 'attrs'] assert 3 == store.counter['__setitem__', 'attrs'] assert a['foo'] == 'zzz' assert a['bar'] 
== 84 - assert 3 == store.counter['__getitem__', 'attrs'] + assert get_cnt == store.counter['__getitem__', 'attrs'] assert 3 == store.counter['__setitem__', 'attrs'] # test __contains__ uses the cache assert 'foo' in a - assert 3 == store.counter['__getitem__', 'attrs'] + assert get_cnt == store.counter['__getitem__', 'attrs'] assert 3 == store.counter['__setitem__', 'attrs'] assert 'spam' not in a - assert 3 == store.counter['__getitem__', 'attrs'] + assert get_cnt == store.counter['__getitem__', 'attrs'] assert 3 == store.counter['__setitem__', 'attrs'] # test __delitem__ updates the cache del a['bar'] - assert 4 == store.counter['__getitem__', 'attrs'] + get_cnt = 4 if zarr_version == 2 else 7 + assert get_cnt == store.counter['__getitem__', 'attrs'] assert 4 == store.counter['__setitem__', 'attrs'] assert 'bar' not in a - assert 4 == store.counter['__getitem__', 'attrs'] + assert get_cnt == store.counter['__getitem__', 'attrs'] assert 4 == store.counter['__setitem__', 'attrs'] # test refresh() - store['attrs'] = json.dumps(dict(foo='xxx', bar=42)).encode('ascii') - assert 4 == store.counter['__getitem__', 'attrs'] + if zarr_version == 2: + store['attrs'] = json.dumps(dict(foo='xxx', bar=42)).encode('ascii') + else: + store['attrs'] = json.dumps(dict(attributes=dict(foo='xxx', bar=42))).encode('ascii') + assert get_cnt == store.counter['__getitem__', 'attrs'] a.refresh() - assert 5 == store.counter['__getitem__', 'attrs'] + get_cnt = 5 if zarr_version == 2 else 8 + assert get_cnt == store.counter['__getitem__', 'attrs'] assert a['foo'] == 'xxx' - assert 5 == store.counter['__getitem__', 'attrs'] + assert get_cnt == store.counter['__getitem__', 'attrs'] assert a['bar'] == 42 - assert 5 == store.counter['__getitem__', 'attrs'] + assert get_cnt == store.counter['__getitem__', 'attrs'] - def test_caching_off(self): + def test_caching_off(self, zarr_version): # setup store - store = CountingDict() + store = CountingDict() if zarr_version == 2 else CountingDictV3() assert 0 == store.counter['__getitem__', 'attrs'] assert 0 == store.counter['__setitem__', 'attrs'] - store['attrs'] = json.dumps(dict(foo='xxx', bar=42)).encode('ascii') + + if zarr_version == 2: + store['attrs'] = json.dumps(dict(foo='xxx', bar=42)).encode('ascii') + else: + store['attrs'] = json.dumps(dict(attributes=dict(foo='xxx', bar=42))).encode('ascii') assert 0 == store.counter['__getitem__', 'attrs'] assert 1 == store.counter['__setitem__', 'attrs'] @@ -200,25 +231,31 @@ def test_caching_off(self): # test __setitem__ a['foo'] = 'yyy' - assert 4 == store.counter['__getitem__', 'attrs'] + get_cnt = 4 if zarr_version == 2 else 5 + assert get_cnt == store.counter['__getitem__', 'attrs'] assert 2 == store.counter['__setitem__', 'attrs'] assert a['foo'] == 'yyy' - assert 5 == store.counter['__getitem__', 'attrs'] + get_cnt = 5 if zarr_version == 2 else 6 + assert get_cnt == store.counter['__getitem__', 'attrs'] assert 2 == store.counter['__setitem__', 'attrs'] # test update() a.update(foo='zzz', bar=84) - assert 6 == store.counter['__getitem__', 'attrs'] + get_cnt = 6 if zarr_version == 2 else 8 + assert get_cnt == store.counter['__getitem__', 'attrs'] assert 3 == store.counter['__setitem__', 'attrs'] assert a['foo'] == 'zzz' assert a['bar'] == 84 - assert 8 == store.counter['__getitem__', 'attrs'] + get_cnt = 8 if zarr_version == 2 else 10 + assert get_cnt == store.counter['__getitem__', 'attrs'] assert 3 == store.counter['__setitem__', 'attrs'] # test __contains__ assert 'foo' in a - assert 9 == store.counter['__getitem__', 
'attrs'] + get_cnt = 9 if zarr_version == 2 else 11 + assert get_cnt == store.counter['__getitem__', 'attrs'] assert 3 == store.counter['__setitem__', 'attrs'] assert 'spam' not in a - assert 10 == store.counter['__getitem__', 'attrs'] + get_cnt = 10 if zarr_version == 2 else 12 + assert get_cnt == store.counter['__getitem__', 'attrs'] assert 3 == store.counter['__setitem__', 'attrs'] diff --git a/zarr/tests/test_core.py b/zarr/tests/test_core.py index 7423132887..938a58b494 100644 --- a/zarr/tests/test_core.py +++ b/zarr/tests/test_core.py @@ -43,6 +43,8 @@ class TestArray(unittest.TestCase): + _version = 2 + def test_array_init(self): # normal initialization @@ -1180,7 +1182,6 @@ def test_object_arrays(self): def test_object_arrays_vlen_text(self): data = np.array(greetings * 1000, dtype=object) - z = self.create_array(shape=data.shape, dtype=object, object_codec=VLenUTF8()) z[0] = 'foo' assert z[0] == 'foo' diff --git a/zarr/tests/test_core_v3.py b/zarr/tests/test_core_v3.py new file mode 100644 index 0000000000..d0a51088b0 --- /dev/null +++ b/zarr/tests/test_core_v3.py @@ -0,0 +1,898 @@ +import atexit +import os +import shutil +from tempfile import mkdtemp, mktemp + +import numpy as np +import pytest +from numcodecs import (Blosc, Zlib) +from numcodecs.compat import ensure_bytes +from numpy.testing import assert_array_almost_equal, assert_array_equal + +from zarr.core import Array +from zarr.errors import ArrayNotFoundError, ContainsGroupError +from zarr.meta import json_loads +from zarr.storage import ( + # ABSStoreV3, + DBMStoreV3, + DirectoryStoreV3, + FSStoreV3, + KVStoreV3, + LMDBStoreV3, + LRUStoreCacheV3, + NestedDirectoryStoreV3, + SQLiteStoreV3, + StoreV3, + atexit_rmglob, + atexit_rmtree, + init_array, + init_group, +) +from zarr.tests.test_core import TestArrayWithPath +from zarr.tests.util import have_fsspec +from zarr.util import buffer_size + + +# Start with TestArrayWithPathV3 not TestArrayV3 since path must be supplied + +class TestArrayWithPathV3(TestArrayWithPath): + + _version = 3 + + @staticmethod + def create_array(array_path='arr1', read_only=False, **kwargs): + store = KVStoreV3(dict()) + kwargs.setdefault('compressor', Zlib(level=1)) + cache_metadata = kwargs.pop('cache_metadata', True) + cache_attrs = kwargs.pop('cache_attrs', True) + write_empty_chunks = kwargs.pop('write_empty_chunks', True) + init_array(store, path=array_path, **kwargs) + return Array(store, path=array_path, read_only=read_only, + cache_metadata=cache_metadata, cache_attrs=cache_attrs, + write_empty_chunks=write_empty_chunks) + + def test_array_init(self): + + # should not be able to initialize without a path in V3 + store = KVStoreV3(dict()) + with pytest.raises(ValueError): + init_array(store, shape=100, chunks=10, dtype="