
LRU cache for decoded chunks #306


Closed · wants to merge 56 commits · changes shown from 12 commits
d62febb  first attempt at chunk_cache layer (shikharsg, Aug 29, 2018)
f796ea7  ChunkCache test with MockChunkCacheArray (shikharsg, Aug 29, 2018)
32141a9  np.copy not needed when accessing a subset of a chunk (shikharsg, Oct 9, 2018)
b35139b  fixed 'Mm' dtype error for buffersize function (shikharsg, Oct 13, 2018)
3c45176  renamed ChunkCache to LRUChunkCache (Oct 13, 2018)
46dcf94  LRUChunkCache in zarr root namespace (Oct 13, 2018)
c69c751  LRUChunkCache example (Oct 13, 2018)
2cb143e  write caching of chunk should be done after encoding (Oct 15, 2018)
2fb169e  ensure cached chunk has been round tripped through encode-decode if d… (Oct 15, 2018)
31e4dfb  flake8 fixes (Oct 15, 2018)
5559c4f  read write cache for 0-d arrays (Oct 15, 2018)
2a0124a  added tutorial and api docs (Oct 15, 2018)
6fac2ad  separated store tests from mutable mapping tests in test_storage.py (shikharsg, Oct 20, 2018)
4e79d0b  fixed pickle, __delitem__ and ordered dict iteration bugs (shikharsg, Oct 20, 2018)
5fd6fc8  documenting slowdown when using write cache with object arrays (shikharsg, Oct 20, 2018)
422f9eb  factoring out mapping code from LRUStoreCache and LRUChunkCache (shikharsg, Oct 23, 2018)
44cea83  consistent variable naming in _chunk_getitem (shikharsg, Nov 11, 2018)
1b67e90  removed unnecesary code from _set_basic_selection_zd and added encode… (shikharsg, Nov 11, 2018)
9b0cc29  flake 8 fixes (shikharsg, Nov 11, 2018)
715f86d  Merge remote-tracking branch 'upstream/master' into chunk_cache (shikharsg, Nov 13, 2018)
0013f95  fixed coverage (shikharsg, Nov 14, 2018)
b70c348  Merge branch 'chunk_cache' into master (shikharsg, Nov 14, 2018)
c4f2487  Merge pull request #4 from shikharsg/master (shikharsg, Nov 14, 2018)
245f661  Merge branch 'master' into chunk_cache (shikharsg, Nov 15, 2018)
a2a05fb  Merge branch 'master' into chunk_cache (shikharsg, Jan 8, 2019)
b8b9056  Merge branch 'chunk_cache' into chunk_cache_mapping_refactor (shikharsg, Jan 9, 2019)
04f0367  Merge pull request #3 from shikharsg/chunk_cache_mapping_refactor (shikharsg, Jan 9, 2019)
f19d43e  bug fix (shikharsg, Jan 27, 2019)
52a43bf  Merge branch 'master' into chunk_cache (shikharsg, Jan 27, 2019)
697d46e  python 2 and 3 compatibility (shikharsg, Jan 27, 2019)
1e727c7  Merge branch 'master' into chunk_cache (shikharsg, Feb 10, 2019)
377ece7  coverage fix and __init__.py LRUChunkCache import (shikharsg, Feb 10, 2019)
3473adb  merged chunk_cache with master (shikharsg, Mar 4, 2019)
df84c89  flake8 fix (shikharsg, Mar 4, 2019)
88fe66d  Merge branch 'master' into chunk_cache (Mar 30, 2019)
a816014  Implemented https://github.com/zarr-developers/zarr/pull/306/files#r2… (Apr 11, 2019)
8cc083b  cache tests refactor (May 3, 2019)
23fcdea  fixed minor tests mistak (May 3, 2019)
309cc48  Merge branch 'master' into chunk_cache (May 3, 2019)
635ec87  flake8 fix (May 3, 2019)
a85d156  Merge remote-tracking branch 'upstream/master' into chunk_cache (Aug 20, 2019)
ef86184  merged with master (Oct 30, 2019)
875c24f  added chunk cache to Group (Nov 20, 2019)
dcd4ee7  merge with master (Nov 20, 2019)
4a1baa9  added chunk_cache to all relevant function (Nov 20, 2019)
e6540e1  Merge branch 'master' into chunk_cache (Dec 12, 2019)
9f9d176  merge with master (Sep 30, 2020)
6571382  fixed failing doctest (Sep 30, 2020)
8c8a69f  Merge remote-tracking branch 'origin/master' into pr-306 (joshmoore, Feb 19, 2021)
e0e5254  fixed setitem caching order (Feb 20, 2021)
992b48a  Merge branch 'master' into chunk_cache (jakirkham, Feb 21, 2021)
38ee622  refactor (Feb 21, 2021)
8b6a699  Merge branch 'master' into chunk_cache (Apr 5, 2021)
ba5c0ed  Merge 'origin/master' into pr-306 (joshmoore, Aug 27, 2021)
7cdce5f  Remove use of unittest (joshmoore, Aug 27, 2021)
06c899b  Merge branch 'master' into chunk_cache (joshmoore, Sep 23, 2021)
4 changes: 4 additions & 0 deletions docs/api/storage.rst
@@ -27,6 +27,10 @@ Storage (``zarr.storage``)
.. automethod:: invalidate_values
.. automethod:: invalidate_keys

.. autoclass:: LRUChunkCache

.. automethod:: invalidate

.. autofunction:: init_array
.. autofunction:: init_group
.. autofunction:: contains_array
24 changes: 24 additions & 0 deletions docs/tutorial.rst
@@ -797,6 +797,30 @@ layer over a remote store. E.g.::
b'Hello from the cloud!'
0.0009490990014455747

The above :class:`zarr.storage.LRUStoreCache` wraps any Zarr storage class, and stores
encoded chunks. So every time cache is accessed, the chunk has to be decoded. For cases
Review comment (Member), suggested change:

-  encoded chunks. So every time cache is accessed, the chunk has to be decoded. For cases
+  encoded chunks. Every time the cache is accessed, the chunk must be decoded. For cases

where decoding is computationally expensive, Zarr also provides a
:class:`zarr.storage.LRUChunkCache` which can store decoded chunks, e.g.::

>>> import zarr
>>> from numcodecs import LZMA
>>> import numpy as np
>>> store = zarr.DictStore()
>>> z = zarr.array(np.random.randn(1000000).reshape(1000,1000), chunks=(100,100),
... store=store, compressor=LZMA())
>>> from timeit import timeit
>>> # data access without cache
... timeit('z[:]', number=1, globals=globals()) # doctest: +SKIP
0.6703157789888792
>>> z_with_cache = zarr.Array(store=store, chunk_cache=zarr.LRUChunkCache(max_size=None))
>>> # first data access about the same as without cache
... timeit('z_with_cache[:]', number=1, globals=globals()) # doctest: +SKIP
0.681269913999131
>>> # second time accesses the decoded chunks in the cache
... timeit('z_with_cache[:]', number=1, globals=globals()) # doctest: +SKIP
0.007617925992235541


If you are still experiencing poor performance with distributed/cloud storage, please
raise an issue on the GitHub issue tracker with any profiling data you can provide, as
there may be opportunities to optimise further either within Zarr or within the mapping
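The two caches are complementary and can be layered: ``LRUStoreCache`` keeps encoded bytes close to the array, while ``LRUChunkCache`` skips the codec on repeat reads. A minimal sketch of combining them, assuming the API introduced in this diff::

    import zarr
    import numpy as np

    store = zarr.DictStore()
    z = zarr.array(np.arange(10000).reshape(100, 100), chunks=(10, 10),
                   store=store)

    cached_store = zarr.LRUStoreCache(store, max_size=2**28)  # encoded chunks
    chunk_cache = zarr.LRUChunkCache(max_size=2**28)          # decoded chunks
    z_cached = zarr.Array(store=cached_store, chunk_cache=chunk_cache)

    z_cached[:]  # first read fills both caches
    z_cached[:]  # second read is served from the decoded-chunk cache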
3 changes: 2 additions & 1 deletion zarr/__init__.py
@@ -7,7 +7,8 @@
from zarr.creation import (empty, zeros, ones, full, array, empty_like, zeros_like,
ones_like, full_like, open_array, open_like, create)
from zarr.storage import (DictStore, DirectoryStore, ZipStore, TempStore,
NestedDirectoryStore, DBMStore, LMDBStore, LRUStoreCache)
NestedDirectoryStore, DBMStore, LMDBStore, LRUStoreCache,
LRUChunkCache)
from zarr.hierarchy import group, open_group, Group
from zarr.sync import ThreadSynchronizer, ProcessSynchronizer
from zarr.codecs import *
119 changes: 92 additions & 27 deletions zarr/core.py
@@ -51,6 +51,10 @@ class Array(object):
If True (default), user attributes will be cached for attribute read
operations. If False, user attributes are reloaded from the store prior
to all attribute read operations.
chunk_cache : MutableMapping, optional
Mapping to store decoded chunks for caching. Can be used in repeated
chunk access scenarios when decoding of data is computationally
expensive.

Attributes
----------
@@ -103,7 +107,8 @@ class Array(object):
"""

def __init__(self, store, path=None, read_only=False, chunk_store=None,
synchronizer=None, cache_metadata=True, cache_attrs=True):
synchronizer=None, cache_metadata=True, cache_attrs=True,
chunk_cache=None):
# N.B., expect at this point store is fully initialized with all
# configuration metadata fully specified and normalized

@@ -118,6 +123,7 @@ def __init__(self, store, path=None, read_only=False, chunk_store=None,
self._synchronizer = synchronizer
self._cache_metadata = cache_metadata
self._is_view = False
self._chunk_cache = chunk_cache

# initialize metadata
self._load_metadata()
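Because ``chunk_cache`` is typed only as a ``MutableMapping``, anything with dict semantics can stand in for ``LRUChunkCache``. A quick sketch (a plain dict gives an unbounded cache; illustration only, not part of this diff)::

    import zarr
    import numpy as np

    store = zarr.DictStore()
    zarr.array(np.zeros((4, 4)), chunks=(2, 2), store=store)

    z = zarr.Array(store, chunk_cache={})  # decoded chunks accumulate in the dict
    _ = z[:]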
@@ -692,19 +698,36 @@ def _get_basic_selection_zd(self, selection, out=None, fields=None):
if selection not in ((), (Ellipsis,)):
err_too_many_indices(selection, ())

try:
# obtain encoded data for chunk
ckey = self._chunk_key((0,))
cdata = self.chunk_store[ckey]
# obtain key for chunk
ckey = self._chunk_key((0,))

except KeyError:
# chunk not initialized
chunk = np.zeros((), dtype=self._dtype)
if self._fill_value is not None:
chunk.fill(self._fill_value)
# setup variable to hold decoded chunk
chunk = None

else:
chunk = self._decode_chunk(cdata)
# check for cached chunk
if self._chunk_cache is not None:
try:
chunk = self._chunk_cache[ckey]
except KeyError:
pass

if chunk is None:
try:
# obtain encoded data for chunk
cdata = self.chunk_store[ckey]

except KeyError:
# chunk not initialized
chunk = np.zeros((), dtype=self._dtype)
if self._fill_value is not None:
chunk.fill(self._fill_value)

else:
chunk = self._decode_chunk(cdata)

# cache decoded chunk
if self._chunk_cache is not None:
self._chunk_cache[ckey] = chunk

# handle fields
if fields:
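The control flow above is a standard read-through cache. Distilled into a sketch (names simplified; not the actual helper in this PR)::

    def read_through(ckey, chunk_cache, chunk_store, decode, fill_chunk):
        if chunk_cache is not None:
            try:
                return chunk_cache[ckey]       # hit: chunk already decoded
            except KeyError:
                pass
        try:
            chunk = decode(chunk_store[ckey])  # miss: fetch and decode
        except KeyError:
            chunk = fill_chunk()               # chunk not initialized
        if chunk_cache is not None:
            chunk_cache[ckey] = chunk          # populate for the next access
        return chunk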
@@ -1454,20 +1477,29 @@ def _set_basic_selection_zd(self, selection, value, fields=None):
# obtain key for chunk
ckey = self._chunk_key((0,))

# setup chunk
try:
# obtain compressed data for chunk
cdata = self.chunk_store[ckey]
chunk = None

except KeyError:
# chunk not initialized
chunk = np.zeros((), dtype=self._dtype)
if self._fill_value is not None:
chunk.fill(self._fill_value)
if self._chunk_cache is not None:
try:
chunk = self._chunk_cache[ckey]
except KeyError:
pass

else:
# decode chunk
chunk = self._decode_chunk(cdata).copy()
if chunk is None:
# setup chunk
try:
# obtain compressed data for chunk
cdata = self.chunk_store[ckey]

except KeyError:
# chunk not initialized
chunk = np.zeros((), dtype=self._dtype)
if self._fill_value is not None:
chunk.fill(self._fill_value)

else:
# decode chunk
chunk = self._decode_chunk(cdata).copy()

# set value
if fields:
Expand All @@ -1478,6 +1510,8 @@ def _set_basic_selection_zd(self, selection, value, fields=None):
# encode and store
cdata = self._encode_chunk(chunk)
self.chunk_store[ckey] = cdata
if self._chunk_cache is not None:
self._chunk_cache[ckey] = chunk

def _set_basic_selection_nd(self, selection, value, fields=None):
# implementation of __setitem__ for array with at least one dimension
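On the write path the ordering matters: the chunk is encoded and persisted first, and the cache is refreshed only afterwards (see the "fixed setitem caching order" commit), so a failed store write cannot leave a stale decoded chunk behind. A small 0-d usage sketch, assuming the API in this diff::

    import zarr

    store = zarr.DictStore()
    zarr.zeros((), store=store)
    z = zarr.Array(store, chunk_cache=zarr.LRUChunkCache(max_size=None))
    z[()] = 42.0   # encodes, stores, then caches the decoded chunk
    print(z[()])   # served from the cache, no decode needed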
@@ -1562,8 +1596,21 @@ def _chunk_getitem(self, chunk_coords, chunk_selection, out, out_selection,
ckey = self._chunk_key(chunk_coords)

try:

cdata = None
chunk_was_cached = False

# first try getting from cache (if one has been provided)
if self._chunk_cache is not None:
try:
cdata = self._chunk_cache[ckey]
chunk_was_cached = True
except KeyError:
pass

# obtain compressed data for chunk
cdata = self.chunk_store[ckey]
if not chunk_was_cached:
cdata = self.chunk_store[ckey]

except KeyError:
# chunk not initialized
@@ -1593,19 +1640,30 @@ def _chunk_getitem(self, chunk_coords, chunk_selection, out, out_selection,
# contiguous, so we can decompress directly from the chunk
# into the destination array

if self._compressor:
if chunk_was_cached:
np.copyto(dest, cdata)
elif self._compressor:
self._compressor.decode(cdata, dest)
if self._chunk_cache is not None:
self._chunk_cache[ckey] = np.copy(dest)
else:
if isinstance(cdata, np.ndarray):
chunk = cdata.view(self._dtype)
else:
chunk = np.frombuffer(cdata, dtype=self._dtype)
chunk = chunk.reshape(self._chunks, order=self._order)
np.copyto(dest, chunk)
if self._chunk_cache is not None:
self._chunk_cache[ckey] = np.copy(chunk)
return

# decode chunk
chunk = self._decode_chunk(cdata)
if not chunk_was_cached:
chunk = self._decode_chunk(cdata)
if self._chunk_cache is not None:
self._chunk_cache[ckey] = np.copy(chunk)
else:
chunk = cdata

# select data from chunk
if fields:
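One detail worth calling out above: the decoded chunk is copied with ``np.copy`` before it enters the cache, because ``dest`` may be a view into the caller's output buffer, and caching it directly would alias user-owned memory. A sketch of the hazard::

    import numpy as np

    out = np.empty(4)
    dest = out[:]                 # a view over the caller's buffer
    bad_entry = dest              # later writes to `out` would mutate the cache
    good_entry = np.copy(dest)    # independent buffer, safe to retain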
@@ -1720,6 +1778,13 @@ def _chunk_setitem_nosync(self, chunk_coords, chunk_selection, value, fields=None):
# store
self.chunk_store[ckey] = cdata

# cache the chunk
if self._chunk_cache is not None:
# ensure cached chunk has been round tripped through encode-decode if dtype=object
if self.dtype == object:
chunk = self._decode_chunk(cdata)
self._chunk_cache[ckey] = np.copy(chunk)

def _chunk_key(self, chunk_coords):
return self._key_prefix + '.'.join(map(str, chunk_coords))
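The object-dtype round trip above exists because object codecs need not hand back the exact objects that were written, so caching the user's originals could make cached reads disagree with uncached ones. An illustration using numcodecs' JSON codec (one example of a codec whose value types change on round trip)::

    import numpy as np
    from numcodecs import JSON

    codec = JSON()
    chunk = np.empty(1, dtype=object)
    chunk[0] = (1, 2)                                # a tuple goes in...
    roundtripped = codec.decode(codec.encode(chunk))
    print(type(roundtripped[0]))                     # ...a list comes back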

142 changes: 142 additions & 0 deletions zarr/storage.py
@@ -1883,3 +1883,145 @@ def __delitem__(self, key):
with self._mutex:
self._invalidate_keys()
self._invalidate_value(key)


class LRUChunkCache(MutableMapping):
"""Class that implements a least-recently-used (LRU) cache for array chunks.
Intended primarily for use with stores that can be slow to access, e.g., remote stores that
require network communication to store and retrieve data, and/or arrays where decompression
of data is computationally expensive.

Parameters
----------
max_size : int
The maximum size that the cache may grow to, in number of bytes. Provide `None`
if you would like the cache to have unlimited size.

Examples
--------
The example below uses a dict store to store the encoded array and uses LRUChunkCache to
store decoded chunks::

>>> import zarr
>>> from numcodecs import LZMA
>>> import numpy as np
>>> store = zarr.DictStore()
>>> z = zarr.array(np.random.randn(1000000).reshape(1000,1000), chunks=(100,100),
... store=store, compressor=LZMA())
>>> from timeit import timeit
>>> # data access without cache
... timeit('z[:]', number=1, globals=globals()) # doctest: +SKIP
0.6703157789888792
>>> z_with_cache = zarr.Array(store=store, chunk_cache=zarr.LRUChunkCache(max_size=None))
>>> # first data access about the same as without cache
... timeit('z_with_cache[:]', number=1, globals=globals()) # doctest: +SKIP
0.681269913999131
>>> # second time accesses the decoded chunks in the cache
... timeit('z_with_cache[:]', number=1, globals=globals()) # doctest: +SKIP
0.007617925992235541

"""

def __init__(self, max_size):
self._max_size = max_size
self._current_size = 0
self._values_cache = OrderedDict()
self._mutex = Lock()
self.hits = self.misses = 0

def __getstate__(self):
return (self._max_size, self._current_size,
self._values_cache, self.hits,
self.misses)

def __setstate__(self, state):
(self._max_size, self._current_size,
self._values_cache, self.hits,
self.misses) = state
self._mutex = Lock()

def __len__(self):
return len(self._keys())

def __iter__(self):
return self.keys()

def __contains__(self, key):
with self._mutex:
return key in self._keys()

def clear(self):
self.invalidate()

def keys(self):
with self._mutex:
return iter(self._keys())

def _keys(self):
return self._values_cache.keys()

def _pop_value(self):
# remove the first value from the cache, as this will be the least recently
# used value
_, v = self._values_cache.popitem(last=False)
return v

def _accommodate_value(self, value_size):
if self._max_size is None:
return
# ensure there is enough space in the cache for a new value
while self._current_size + value_size > self._max_size:
v = self._pop_value()
self._current_size -= buffer_size(v)

def _cache_value(self, key, value):
# cache a value
value_size = buffer_size(value)
# check size of the value against max size, as if the value itself exceeds max
# size then we are never going to cache it
if self._max_size is None or value_size <= self._max_size:
self._accommodate_value(value_size)
self._values_cache[key] = value
self._current_size += value_size

def invalidate(self):
"""Completely clear the cache."""
with self._mutex:
self._values_cache.clear()

def invalidate_values(self):
"""Clear the values cache."""
with self._mutex:
self._values_cache.clear()

def _invalidate_value(self, key):
if key in self._values_cache:
value = self._values_cache.pop(key)
self._current_size -= buffer_size(value)

def __getitem__(self, key):
try:
# try to obtain the value from the cache
with self._mutex:
value = self._values_cache[key]
# cache hit if no KeyError is raised
self.hits += 1
# treat the end as most recently used
OrderedDict_move_to_end(self._values_cache, key)

except KeyError:
# cache miss
with self._mutex:
self.misses += 1
raise

return value

def __setitem__(self, key, value):
with self._mutex:
self._invalidate_value(key)
self._cache_value(key, value)

def __delitem__(self, key):
with self._mutex:
self._invalidate_value(key)
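Taken on its own, ``LRUChunkCache`` is just a byte-bounded LRU mapping, so its eviction behaviour can be exercised directly. A sketch (assumes float64 values, i.e. 64 bytes per 8-element array)::

    import numpy as np
    import zarr

    cache = zarr.LRUChunkCache(max_size=128)   # room for two 64-byte chunks
    cache['0.0'] = np.zeros(8)
    cache['0.1'] = np.ones(8)
    cache['0.2'] = np.full(8, 2.0)             # evicts least-recently-used '0.0'
    print('0.0' in cache, '0.2' in cache)      # False True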