Skip to content

Add FSStore #546

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 23 commits into from
Sep 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/api/storage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ Storage (``zarr.storage``)

.. autoclass:: ABSStore

.. autoclass:: FSStore

.. autoclass:: ConsolidatedMetadataStore

.. autofunction:: init_array
Expand Down
4 changes: 3 additions & 1 deletion requirements_dev_optional.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,6 @@ pytest-cov==2.7.1
pytest-doctestplus==0.4.0
pytest-remotedata==0.3.2
h5py==2.10.0
s3fs==0.3.4; python_version > '3.0'
s3fs==0.5.0; python_version > '3.6'
moto>=1.3.14; python_version > '3.6'
flask
21 changes: 15 additions & 6 deletions zarr/creation.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from zarr.n5 import N5Store
from zarr.storage import (DirectoryStore, ZipStore, contains_array,
contains_group, default_compressor, init_array,
normalize_storage_path)
normalize_storage_path, FSStore)


def create(shape, chunks=True, dtype=None, compressor='default',
Expand Down Expand Up @@ -127,12 +127,16 @@ def create(shape, chunks=True, dtype=None, compressor='default',
return z


def normalize_store_arg(store, clobber=False, default=dict):
def normalize_store_arg(store, clobber=False, default=dict, storage_options=None):
if store is None:
return default()
elif isinstance(store, str):
mode = 'w' if clobber else 'r'
if "://" in store or "::" in store:
return FSStore(store, mode=mode, **(storage_options or {}))
elif storage_options:
raise ValueError("storage_options passed with non-fsspec path")
if store.endswith('.zip'):
mode = 'w' if clobber else 'r'
return ZipStore(store, mode=mode)
elif store.endswith('.n5'):
return N5Store(store)
Expand Down Expand Up @@ -353,7 +357,8 @@ def array(data, **kwargs):
def open_array(store=None, mode='a', shape=None, chunks=True, dtype=None,
compressor='default', fill_value=0, order='C', synchronizer=None,
filters=None, cache_metadata=True, cache_attrs=True, path=None,
object_codec=None, chunk_store=None, **kwargs):
object_codec=None, chunk_store=None, storage_options=None,
**kwargs):
"""Open an array using file-mode-like semantics.

Parameters
Expand Down Expand Up @@ -399,6 +404,9 @@ def open_array(store=None, mode='a', shape=None, chunks=True, dtype=None,
A codec to encode object arrays, only needed if dtype=object.
chunk_store : MutableMapping or string, optional
Store or path to directory in file system or name of zip file.
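storage_options : dict
If using an fsspec URL to create the store, these will be passed to
the backend implementation. Ignored when no store is given; passing
them together with a string path that is not an fsspec URL (one
containing "://" or "::") raises a ValueError.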

Returns
-------
Expand Down Expand Up @@ -435,9 +443,10 @@ def open_array(store=None, mode='a', shape=None, chunks=True, dtype=None,

# handle polymorphic store arg
clobber = mode == 'w'
store = normalize_store_arg(store, clobber=clobber)
store = normalize_store_arg(store, clobber=clobber, storage_options=storage_options)
if chunk_store is not None:
chunk_store = normalize_store_arg(chunk_store, clobber=clobber)
chunk_store = normalize_store_arg(chunk_store, clobber=clobber,
storage_options=storage_options)
path = normalize_storage_path(path)

# API compatibility with h5py
Expand Down
16 changes: 11 additions & 5 deletions zarr/hierarchy.py
Original file line number Diff line number Diff line change
Expand Up @@ -1032,8 +1032,9 @@ def move(self, source, dest):
self._write_op(self._move_nosync, source, dest)


def _normalize_store_arg(store, clobber=False):
return normalize_store_arg(store, clobber=clobber, default=MemoryStore)
def _normalize_store_arg(store, clobber=False, storage_options=None):
    """Normalize a store argument for the hierarchy API.

    Delegates to ``normalize_store_arg``, forwarding ``store``, ``clobber``
    and ``storage_options`` unchanged and supplying ``MemoryStore`` as the
    ``default`` argument.
    """
    normalized = normalize_store_arg(
        store,
        clobber=clobber,
        default=MemoryStore,
        storage_options=storage_options,
    )
    return normalized


def group(store=None, overwrite=False, chunk_store=None,
Expand Down Expand Up @@ -1095,7 +1096,7 @@ def group(store=None, overwrite=False, chunk_store=None,


def open_group(store=None, mode='a', cache_attrs=True, synchronizer=None, path=None,
chunk_store=None):
chunk_store=None, storage_options=None):
"""Open a group using file-mode-like semantics.

Parameters
Expand All @@ -1117,6 +1118,9 @@ def open_group(store=None, mode='a', cache_attrs=True, synchronizer=None, path=N
Group path within store.
chunk_store : MutableMapping or string, optional
Store or path to directory in file system or name of zip file.
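storage_options : dict
If using an fsspec URL to create the store, these will be passed to
the backend implementation. Ignored when no store is given; passing
them together with a string path that is not an fsspec URL (one
containing "://" or "::") raises a ValueError.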

Returns
-------
Expand All @@ -1139,9 +1143,11 @@ def open_group(store=None, mode='a', cache_attrs=True, synchronizer=None, path=N
"""

# handle polymorphic store arg
store = _normalize_store_arg(store)
clobber = mode != 'r'
store = _normalize_store_arg(store, clobber=clobber, storage_options=storage_options)
if chunk_store is not None:
chunk_store = _normalize_store_arg(chunk_store)
chunk_store = _normalize_store_arg(chunk_store, clobber=clobber,
storage_options=storage_options)
path = normalize_storage_path(path)

# ensure store is initialized
Expand Down
122 changes: 122 additions & 0 deletions zarr/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -946,6 +946,128 @@ def atexit_rmglob(path,
rmtree(p)


class FSStore(MutableMapping):
    """Wraps an fsspec.FSMap to give access to arbitrary filesystems

    Requires that ``fsspec`` is installed, as well as any additional
    requirements for the protocol chosen.

    Parameters
    ----------
    url : str
        The destination to map. Should include protocol and path,
        like "s3://bucket/root"
    normalize_keys : bool
        If True (the default), keys are lower-cased before being handed to
        the underlying mapper.
    key_separator : str
        Character to use when constructing the target path strings
        for data keys. Every '.' in the final path component of a key is
        replaced by this character; metadata keys (``.zarray``,
        ``.zgroup``, ``.zattrs``) are left untouched.
    mode : str
        "w" for writable, "r" for read-only. Only "r" is checked for: the
        mutating methods call ``err_read_only()`` in that mode, and any
        other value is treated as writable.
    exceptions : tuple or list of Exception subclasses
        When accessing data, any of these exceptions will be treated
        as a missing key
    storage_options : passed to the fsspec implementation
    """

    # Final path components that name metadata documents rather than data
    # chunks; ``key_separator`` must never be applied to these.
    _META_KEYS = ('.zarray', '.zgroup', '.zattrs')

    def __init__(self, url, normalize_keys=True, key_separator='.',
                 mode='w',
                 exceptions=(KeyError, PermissionError, IOError),
                 **storage_options):
        # Imported here so fsspec is only needed once a store is created.
        import fsspec
        self.path = url
        self.normalize_keys = normalize_keys
        self.key_separator = key_separator
        self.map = fsspec.get_mapper(url, **storage_options)
        self.fs = self.map.fs  # for direct operations
        self.mode = mode
        # An ``except`` clause accepts a class or a tuple of classes, but
        # not a list, so coerce any other iterable to a tuple up front.
        if isinstance(exceptions, type):
            self.exceptions = exceptions
        else:
            self.exceptions = tuple(exceptions)
        # The store root must be a directory, or not exist yet.
        if self.fs.exists(url) and not self.fs.isdir(url):
            err_fspath_exists_notdir(url)

    def _apply_key_separator(self, key):
        """Apply ``key_separator`` to the last component of a normalized key.

        Only the final '/'-separated component is rewritten. Empty keys and
        keys whose final component is a metadata name are returned as-is.
        """
        if not key:
            return key
        *bits, end = key.split('/')
        if end in self._META_KEYS:
            # Metadata documents keep their leading dot whatever the
            # separator is; rewriting it would make them unfindable.
            return key
        return '/'.join(bits + [end.replace('.', self.key_separator)])

    def _normalize_key(self, key):
        """Return the key as it is presented to the underlying mapper."""
        key = normalize_storage_path(key).lstrip('/')
        key = self._apply_key_separator(key)
        return key.lower() if self.normalize_keys else key

    def __getitem__(self, key):
        """Return the value stored under ``key``.

        Raises KeyError if the lookup raises any of ``self.exceptions``.
        """
        key = self._normalize_key(key)
        try:
            return self.map[key]
        except self.exceptions as e:
            raise KeyError(key) from e

    def __setitem__(self, key, value):
        """Store ``value`` under ``key``, replacing a directory if present.

        Raises KeyError if the write raises any of ``self.exceptions``.
        """
        if self.mode == 'r':
            err_read_only()
        key = self._normalize_key(key)
        path = self.dir_path(key)
        value = ensure_contiguous_ndarray(value)
        try:
            # A directory already sitting at the target path is removed
            # before the value is written.
            if self.fs.isdir(path):
                self.fs.rm(path, recursive=True)
            self.map[key] = value
        except self.exceptions as e:
            raise KeyError(key) from e

    def __delitem__(self, key):
        """Delete ``key``; a directory at that path is removed recursively."""
        if self.mode == 'r':
            err_read_only()
        key = self._normalize_key(key)
        path = self.dir_path(key)
        if self.fs.isdir(path):
            self.fs.rm(path, recursive=True)
        else:
            del self.map[key]

    def __contains__(self, key):
        key = self._normalize_key(key)
        return key in self.map

    def __eq__(self, other):
        """Stores are equal when type, underlying map and mode all match."""
        return (type(self) == type(other) and self.map == other.map
                and self.mode == other.mode)

    def keys(self):
        """Return an iterator over the keys of the underlying mapper."""
        return iter(self.map)

    def __iter__(self):
        return self.keys()

    def __len__(self):
        # Count lazily instead of materializing a throwaway list of keys.
        return sum(1 for _ in self.keys())

    def dir_path(self, path=None):
        """Return the filesystem path string corresponding to ``path``."""
        store_path = normalize_storage_path(path)
        return self.map._key_to_str(store_path)

    def listdir(self, path=None):
        """Return the sorted entry names directly under ``path``.

        Each name is the final path component of an entry reported by
        ``fs.ls``. An empty list is returned if listing raises IOError.
        """
        dir_path = self.dir_path(path)
        try:
            out = sorted(p.rstrip('/').rsplit('/', 1)[-1]
                         for p in self.fs.ls(dir_path, detail=False))
            return out
        except IOError:
            return []

    def rmdir(self, path=None):
        """Recursively remove ``path`` if it is a directory."""
        if self.mode == 'r':
            err_read_only()
        store_path = self.dir_path(path)
        if self.fs.isdir(store_path):
            self.fs.rm(store_path, recursive=True)

    def getsize(self, path=None):
        """Return the size reported by ``fs.du`` for ``path``."""
        store_path = self.dir_path(path)
        # NOTE(review): the two positional flags are presumably fsspec's
        # ``total`` and ``maxdepth`` arguments -- confirm against fsspec.
        return self.fs.du(store_path, True, True)

    def clear(self):
        """Remove every entry from the underlying mapper."""
        if self.mode == 'r':
            err_read_only()
        self.map.clear()


class TempStore(DirectoryStore):
"""Directory store using a temporary directory for storage.

Expand Down
Loading