Skip to content

adds partial_decompress capabilities #584

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2 changes: 1 addition & 1 deletion requirements_dev_minimal.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# library requirements
asciitree==0.3.3
fasteners==0.15
numcodecs==0.6.4
numcodecs==0.7.2
msgpack-python==0.5.6
setuptools-scm==3.3.3
# test requirements
Expand Down
1 change: 1 addition & 0 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ commands =
py35,py36,py37-npylatest,py38: coverage report -m
# run doctests in the tutorial and spec
py38: python -m doctest -o NORMALIZE_WHITESPACE -o ELLIPSIS docs/tutorial.rst docs/spec/v2.rst
py38_test: pytest {posargs}
# pep8 checks
py38: flake8 zarr
# print environment for debugging
Expand Down
39 changes: 31 additions & 8 deletions zarr/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@

from zarr.attrs import Attributes
from zarr.codecs import AsType, get_codec
from zarr.errors import ArrayNotFoundError, ReadOnlyError
from zarr.errors import ArrayNotFoundError, ReadOnlyError, ArrayIndexError
from zarr.indexing import (BasicIndexer, CoordinateIndexer, MaskIndexer,
OIndex, OrthogonalIndexer, VIndex, check_fields,
OIndex, OrthogonalIndexer, VIndex, PartialChunkIterator,
check_fields,
check_no_multi_fields, ensure_tuple,
err_too_many_indices, is_contiguous_selection,
is_scalar, pop_fields)
Expand Down Expand Up @@ -1579,7 +1580,6 @@ def _process_chunk(self, out, cdata, chunk_selection, drop_axes,
# optimization: we want the whole chunk, and the destination is
# contiguous, so we can decompress directly from the chunk
# into the destination array

if self._compressor:
self._compressor.decode(cdata, dest)
else:
Expand All @@ -1589,16 +1589,34 @@ def _process_chunk(self, out, cdata, chunk_selection, drop_axes,
return

# decode chunk
try:
if self._compressor and self._compressor.codec_id == 'blosc' \
and not fields and self.dtype != object:
tmp = np.empty(self._chunks, dtype=self.dtype)
index_selection = PartialChunkIterator(chunk_selection, self.chunks)
for start, nitems, partial_out_selection in index_selection:
expected_shape = [
len(range(*partial_out_selection[i].indices(self.chunks[0]+1)))
if i < len(partial_out_selection) else dim
for i, dim in enumerate(self.chunks)]
chunk_partial = self._decode_chunk(
cdata, start=start, nitems=nitems,
expected_shape=expected_shape)
tmp[partial_out_selection] = chunk_partial
out[out_selection] = tmp[chunk_selection]
return
except ArrayIndexError:
pass
chunk = self._decode_chunk(cdata)

# select data from chunk
# select data from chunk
if fields:
chunk = chunk[fields]
tmp = chunk[chunk_selection]
if drop_axes:
tmp = np.squeeze(tmp, axis=drop_axes)

# store selected data in output
# store selected data in output
out[out_selection] = tmp

def _chunk_getitem(self, chunk_coords, chunk_selection, out, out_selection,
Expand Down Expand Up @@ -1770,11 +1788,16 @@ def _chunk_setitem_nosync(self, chunk_coords, chunk_selection, value, fields=Non
def _chunk_key(self, chunk_coords):
    """Return the store key for the chunk at the given grid coordinates."""
    coord_str = '.'.join(str(c) for c in chunk_coords)
    return self._key_prefix + coord_str

def _decode_chunk(self, cdata):
def _decode_chunk(self, cdata, start=None, nitems=None, expected_shape=None):

# decompress
if self._compressor:
chunk = self._compressor.decode(cdata)
# only decode requested items
if (all([x is not None for x in [start, nitems]])
and self._compressor.codec_id == 'blosc') and hasattr(self._compressor, 'decode_partial'):
chunk = self._compressor.decode_partial(cdata, start, nitems)
else:
chunk = self._compressor.decode(cdata)
else:
chunk = cdata

Expand All @@ -1799,7 +1822,7 @@ def _decode_chunk(self, cdata):

# ensure correct chunk shape
chunk = chunk.reshape(-1, order='A')
chunk = chunk.reshape(self._chunks, order=self._order)
chunk = chunk.reshape(expected_shape or self._chunks, order=self._order)

return chunk

Expand Down
5 changes: 5 additions & 0 deletions zarr/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,14 @@ class CopyError(RuntimeError):
pass


class ArrayIndexError(IndexError):
    """IndexError subclass raised internally when a selection cannot be
    expressed as slices for partial chunk decompression.
    """


class _BaseZarrError(ValueError):
    # Subclasses define ``_msg`` as a ``str.format`` template; positional
    # constructor arguments are substituted into it to build the message.
    _msg = ""

    def __init__(self, *args):
        message = self._msg.format(*args)
        super().__init__(message)

Expand Down
85 changes: 84 additions & 1 deletion zarr/indexing.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,17 @@

import numpy as np


from zarr.errors import (
ArrayIndexError,
NegativeStepError,
err_too_many_indices,
VindexInvalidSelectionError,
BoundsCheckError,
BoundsCheckError
)



def is_integer(x):
    """Return True when *x* is an integral number (``numbers.Integral``)."""
    integral_type = numbers.Integral
    return isinstance(x, integral_type)

Expand Down Expand Up @@ -825,3 +828,83 @@ def pop_fields(selection):
selection = tuple(s for s in selection if not isinstance(s, str))
selection = selection[0] if len(selection) == 1 else selection
return fields, selection


def int_to_slice(dim_selection):
    """Return the length-1 slice selecting exactly index *dim_selection*."""
    start = dim_selection
    return slice(start, start + 1, 1)

def make_slice_selection(selection):
    """Return *selection* with integers and length-1 integer arrays replaced
    by the equivalent length-1 slices.

    Raises ArrayIndexError for array selections longer than one element,
    which cannot be expressed as a single contiguous slice.
    """
    normalized = []
    for dim_sel in selection:
        if is_integer(dim_sel):
            normalized.append(int_to_slice(dim_sel))
        elif isinstance(dim_sel, np.ndarray):
            if len(dim_sel) != 1:
                raise ArrayIndexError()
            normalized.append(int_to_slice(dim_sel[0]))
        else:
            normalized.append(dim_sel)
    return normalized


class PartialChunkIterator(object):
    """Iterator to retrieve the specific coordinates of requested data
    from within a compressed chunk.

    Parameters
    ----------
    selection : tuple
        tuple of slice objects to take from the chunk
    arr_shape : tuple
        shape of chunk to select data from

    Attributes
    ----------
    arr_shape
    selection

    Yields
    ------
    (start, nitems, chunk_selection) triples, where ``start`` is the offset
    (in items) of a contiguous run inside the flattened chunk, ``nitems``
    is the run length, and ``chunk_selection`` locates that run within the
    chunk as a tuple of slices.
    """

    def __init__(self, selection, arr_shape):
        self.selection = make_slice_selection(selection)
        self.arr_shape = arr_shape

        # number of selection dimensions can't be greater than the number
        # of chunk dimensions
        if len(self.selection) > len(self.arr_shape):
            raise ValueError(
                'Selection has more dimensions than the array:\n'
                'selection dimensions = {}\n'
                'array dimensions = {}'.format(len(self.selection),
                                               len(self.arr_shape)))

        # any selection can not be out of the range of the chunk
        # NOTE: index with a tuple -- indexing an ndarray with a *list* of
        # slices is deprecated/removed in recent numpy versions
        self.selection_shape = np.empty(self.arr_shape)[tuple(self.selection)].shape
        if any(selection_dim < 0 or selection_dim > arr_dim
               for selection_dim, arr_dim
               in zip(self.selection_shape, self.arr_shape)):
            raise IndexError('a selection index is out of range for the dimension')

        # Drop trailing dimensions where the selection covers the whole
        # dimension: this maximises nitems per contiguous read and so
        # minimises the number of partial decompressions performed.
        for i, dim_size in enumerate(self.arr_shape[::-1]):
            index = len(self.arr_shape) - (i + 1)
            if index <= len(self.selection) - 1:
                slice_size = self.selection_shape[index]
                if slice_size == dim_size and index > 0:
                    self.selection.pop()
                else:
                    break

        # A unit-step (or implicit-step) final slice is one contiguous run
        # and can be kept whole; a strided final slice must be expanded
        # per element like the other dimensions.
        chunk_loc_slices = []
        final_step = self.selection[-1].step or 1  # step None means 1
        last_dim_slice = None if final_step > 1 else self.selection.pop()
        for dim, sl in enumerate(self.selection):
            dim_chunk_loc_slices = []
            for x in slice_to_range(sl, arr_shape[dim]):
                dim_chunk_loc_slices.append(slice(x, x + 1, 1))
            chunk_loc_slices.append(dim_chunk_loc_slices)
        if last_dim_slice:
            chunk_loc_slices.append([last_dim_slice])
        self.chunk_loc_slices = list(itertools.product(*chunk_loc_slices))

    def __iter__(self):
        # all runs have the same length: last-slice extent times the size
        # of any trailing (fully-selected) dimensions
        chunk1 = self.chunk_loc_slices[0]
        nitems = ((chunk1[-1].stop - chunk1[-1].start) *
                  np.prod(self.arr_shape[len(chunk1):]))
        for chunk_selection in self.chunk_loc_slices:
            # flatten the multi-dimensional start position into an item offset
            start = 0
            for i, sl in enumerate(chunk_selection):
                start += sl.start * np.prod(self.arr_shape[i + 1:])
            yield int(start), int(nitems), chunk_selection
1 change: 1 addition & 0 deletions zarr/tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ def test_array_1d_selections(self):
ix = [99, 100, 101]
bix = np.zeros_like(a, dtype=bool)
bix[ix] = True
print(a[ix])
assert_array_equal(a[ix], z.get_orthogonal_selection(ix))
assert_array_equal(a[ix], z.oindex[ix])
assert_array_equal(a[ix], z.get_coordinate_selection(ix))
Expand Down
60 changes: 59 additions & 1 deletion zarr/tests/test_indexing.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import zarr
from zarr.indexing import (normalize_integer_selection, oindex, oindex_set,
replace_ellipsis)
replace_ellipsis, PartialChunkIterator)


def test_normalize_integer_selection():
Expand Down Expand Up @@ -1288,3 +1288,61 @@ def test_set_selections_with_fields():
a[key][ix] = v[key][ix]
z.set_mask_selection(ix, v[key][ix], fields=fields)
assert_array_equal(a, z[:])


@pytest.mark.parametrize('selection, arr, expected', [
((slice(5, 8, 1), slice(2, 4, 1), slice(0, 100, 1)),
np.arange(2, 100_002).reshape((100, 10, 100)),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Python 3.5 won't like those... we can argue to drop Python 3.5 I think

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The underscore in the number you mean? I can also just take them out too. I didn't realize they weren't supported in all the python3 versions

[(5200, 200, (slice(5, 6, 1), slice(2, 4, 1))),
(6200, 200, (slice(6, 7, 1), slice(2, 4, 1))),
(7200, 200, (slice(7, 8, 1), slice(2, 4, 1)))]),
((slice(5, 8, 1), slice(2, 4, 1), slice(0, 5, 1)),
np.arange(2, 100_002).reshape((100, 10, 100)),
[(5200.0, 5.0, (slice(5, 6, 1), slice(2, 3, 1), slice(0, 5, 1))),
(5300.0, 5.0, (slice(5, 6, 1), slice(3, 4, 1), slice(0, 5, 1))),
(6200.0, 5.0, (slice(6, 7, 1), slice(2, 3, 1), slice(0, 5, 1))),
(6300.0, 5.0, (slice(6, 7, 1), slice(3, 4, 1), slice(0, 5, 1))),
(7200.0, 5.0, (slice(7, 8, 1), slice(2, 3, 1), slice(0, 5, 1))),
(7300.0, 5.0, (slice(7, 8, 1), slice(3, 4, 1), slice(0, 5, 1)))]),
((slice(5, 8, 1), slice(2, 4, 1), slice(0, 5, 1)),
np.asfortranarray(np.arange(2, 100_002).reshape((100, 10, 100))),
[(5200.0, 5.0, (slice(5, 6, 1), slice(2, 3, 1), slice(0, 5, 1))),
(5300.0, 5.0, (slice(5, 6, 1), slice(3, 4, 1), slice(0, 5, 1))),
(6200.0, 5.0, (slice(6, 7, 1), slice(2, 3, 1), slice(0, 5, 1))),
(6300.0, 5.0, (slice(6, 7, 1), slice(3, 4, 1), slice(0, 5, 1))),
(7200.0, 5.0, (slice(7, 8, 1), slice(2, 3, 1), slice(0, 5, 1))),
(7300.0, 5.0, (slice(7, 8, 1), slice(3, 4, 1), slice(0, 5, 1)))]),
((slice(5, 8, 1), slice(2, 4, 1)),
np.arange(2, 100_002).reshape((100, 10, 100)),
[(5200, 200, (slice(5, 6, 1), slice(2, 4, 1))),
(6200, 200, (slice(6, 7, 1), slice(2, 4, 1))),
(7200, 200, (slice(7, 8, 1), slice(2, 4, 1)))]),
((slice(0, 10, 1),),
np.arange(0, 10).reshape((10)),
[(0, 10, (slice(0, 10, 1),))]),
((0,),
np.arange(0, 100).reshape((10, 10)),
[(0, 10, (slice(0, 1, 1),))]),
((0,0,),
np.arange(0, 100).reshape((10, 10)),
[(0, 1, (slice(0, 1, 1), slice(0, 1, 1)))]),
((0,),
np.arange(0, 10).reshape((10)),
[(0, 1, (slice(0, 1, 1),))]),
pytest.param((slice(5, 8, 1), slice(2, 4, 1), slice(0, 5, 1)),
np.arange(2, 100002).reshape((10, 1, 10000)),
None,
marks=[pytest.mark.xfail(reason='slice 2 is out of range')]
),
pytest.param((slice(5, 8, 1), slice(2, 4, 1), slice(0, 5, 1)),
np.arange(2, 100_002).reshape((10, 10_000)),
None,
marks=[pytest.mark.xfail(reason='slice 2 is out of range')]
),
])
def test_PartialChunkIterator(selection, arr, expected):
    """Check that iterating PartialChunkIterator over *selection* against
    *arr*'s shape yields exactly the expected (start, nitems, chunk
    selection) triples."""
    PCI = PartialChunkIterator(selection, arr.shape)
    results = list(PCI)
    assert results == expected