adds partial_decompress capabilities #584

Closed
Changes from 1 commit

17 changes: 14 additions & 3 deletions zarr/core.py
@@ -1626,12 +1626,18 @@ def _chunk_getitem(self, chunk_coords, chunk_selection, out, out_selection,
return

# decode chunk
print(is_contiguous_selection(chunk_selection))
print(self.chunks)
print(chunk_selection)
if self._compressor.codec_id == 'blosc':
pass
chunk = self._decode_chunk(cdata)

# select data from chunk
if fields:
chunk = chunk[fields]
tmp = chunk[chunk_selection]
print(tmp)
if drop_axes:
tmp = np.squeeze(tmp, axis=drop_axes)

@@ -1731,11 +1737,16 @@ def _chunk_setitem_nosync(self, chunk_coords, chunk_selection, value, fields=None):
def _chunk_key(self, chunk_coords):
return self._key_prefix + '.'.join(map(str, chunk_coords))
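    # Illustrative note (not part of the diff): for an array stored under
    # path 'foo', chunk_coords (0, 1) maps to the store key 'foo/0.1'.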

def _decode_chunk(self, cdata):
def _decode_chunk(self, cdata, start=None, nitems=None):

# decompress
if self._compressor:
chunk = self._compressor.decode(cdata)
# only decode requested items
if (all([x is not None for x in [start, nitems]])
and self._compressor.codec_id == 'blosc'):
chunk = self._compressor.decode_partial(cdata, start, nitems)
else:
chunk = self._compressor.decode(cdata)
else:
chunk = cdata

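For reference, the dispatch this commit adds to _decode_chunk can be restated as a standalone sketch (the helper name is hypothetical; decode_partial is assumed to be provided by the companion numcodecs change):

def _decode_maybe_partial(compressor, cdata, start=None, nitems=None):
    # Use partial decompression only when both start and nitems are given
    # and the codec supports it (blosc); otherwise fall back to a full decode.
    if start is not None and nitems is not None and compressor.codec_id == 'blosc':
        return compressor.decode_partial(cdata, start, nitems)
    return compressor.decode(cdata)
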
59 changes: 59 additions & 0 deletions zarr/indexing.py
@@ -822,3 +822,62 @@ def pop_fields(selection):
selection = tuple(s for s in selection if not isinstance(s, str))
selection = selection[0] if len(selection) == 1 else selection
return fields, selection


def selection_size(selection, arr):
    if len(selection) > len(arr.shape):
        raise ValueError(f'dimensions in selection cannot be greater than dimensions of array: '
                         f'{len(selection)} > {len(arr.shape)}')
    selection_shape = []
    for i, size in enumerate(arr.shape):
        selection_slice = selection[i] if i < len(selection) else None
        if selection_slice:
            selection_slice_size = len(range(*selection_slice.indices(size)))
            selection_shape.append(selection_slice_size)
        else:
            selection_shape.append(size)
    return tuple(selection_shape)
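# Illustrative usage (not from the diff): for arr of shape (100, 10, 100),
# selection_size((slice(5, 8, 1), slice(2, 4, 1)), arr) == (3, 2, 100);
# dimensions beyond the selection keep their full size.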


class PartialChunkIterator(object):

def __init__(self, selection, arr):
self.arr = arr
self.selection = list(selection)

        for i, dim_shape in enumerate(self.arr.shape[::-1]):
index = len(self.arr.shape) - (i+1)
if index <= len(selection)-1:
slice_nitems = len(range(*selection[index].indices(len(self.arr))))
if slice_nitems == dim_shape:
self.selection.pop()
else:
break
Contributor:

If I understand this block of code correctly: here we seem to be looking for
dimensions where we select the whole thing. We start from the end of the
selection and the shape, and while there is no selection or the slice covers
100% of the dimension, we pop.

Is that correct?

Contributor Author:

That's right. I do this to maximize nitems and minimize the number of partial decompression calls. This logic helps get to the 200 nitems in the test example.
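
As a standalone illustration of that pruning (simplified relative to the diff, which walks the reversed shape with explicit indices):

shape = (100, 10, 100)
selection = [slice(5, 8, 1), slice(2, 4, 1), slice(0, 100, 1)]
# pop trailing slices that span their whole dimension
while selection and len(range(*selection[-1].indices(shape[len(selection) - 1]))) == shape[len(selection) - 1]:
    selection.pop()
print(selection)  # [slice(5, 8, 1), slice(2, 4, 1)]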


out_slices = []
chunk_loc_slices = []

last_dim_slice = None if self.selection[-1].step > 1 else self.selection.pop()
for sl in self.selection:
dim_out_slices = []
dim_chunk_loc_slices = []
for i, x in enumerate(range(*sl.indices(len(self.arr)))):
dim_out_slices.append(slice(i, i+1, 1))
dim_chunk_loc_slices.append(slice(x, x+1, 1))
Contributor:

IIUC, here you are computing each individual one-element-wide selection across each axis, both in the original chunk coordinate system and in the output version, right?

Would this be simpler if the step of the sl is 1? Because in that case you have a contiguous sl, right? Or did I misunderstand?

Not a request to change; it can be an optimisation for later.

Contributor Author:

That's right, except for the slice of the last dimension when that slice has a step of 1. If that slice has a step of more than one, then it is included here, though if that's the last dimension of the chunk it would be really inefficient, since nitems would be 1. If the slice of the last dimension has a step of one, then the slice covers more than one item and the output slice is calculated from stop - start.
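
A tiny standalone illustration of that distinction:

# A step-1 slice on the last axis is peeled off and kept as one contiguous
# run; a step > 1 slice falls through to the per-element expansion, where
# each element costs its own partial read with nitems == 1.
for sl in (slice(0, 100, 1), slice(0, 100, 2)):
    last_dim_slice = None if sl.step > 1 else sl
    print(sl, 'contiguous run' if last_dim_slice else 'expanded element by element')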

out_slices.append(dim_out_slices)
chunk_loc_slices.append(dim_chunk_loc_slices)
if last_dim_slice:
out_slices.append(
[slice(0, last_dim_slice.stop - last_dim_slice.start, 1)])
chunk_loc_slices.append([last_dim_slice])

self.out_slices = itertools.product(*out_slices)
self.chunk_loc_slices = itertools.product(*chunk_loc_slices)
Contributor:

And for myself: out_slices are the slices to assign elements in the output array; chunk_loc_slices are the indices of where to read the original chunk.

Contributor Author:

That's right.
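
Concretely, for the first test case below (selection (slice(5, 8, 1), slice(2, 4, 1), slice(0, 100, 1)) against a (100, 10, 100) chunk), the pairing works out as follows; derived by hand from the logic above, not copied from the PR:

# chunk_loc_slices                      out_slices
# (slice(5, 6, 1), slice(2, 4, 1))  ->  (slice(0, 1, 1), slice(0, 2, 1))
# (slice(6, 7, 1), slice(2, 4, 1))  ->  (slice(1, 2, 1), slice(0, 2, 1))
# (slice(7, 8, 1), slice(2, 4, 1))  ->  (slice(2, 3, 1), slice(0, 2, 1))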


def __iter__(self):
for out_selection, chunk_selection in zip(self.out_slices, self.chunk_loc_slices):
start = 0
for i, sl in enumerate(chunk_selection):
start += sl.start * np.prod(self.arr.shape[i+1:])
nitems = (chunk_selection[-1].stop - chunk_selection[-1].start) * np.prod(self.arr.shape[len(chunk_selection):])
yield start, nitems, out_selection
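
Putting the pieces together, a minimal usage sketch (assuming this branch's zarr.indexing is importable; the printed values mirror the first parametrized case in the test file below):

import numpy as np

from zarr.indexing import PartialChunkIterator

chunk = np.arange(2, 100002).reshape(100, 10, 100)
selection = (slice(5, 8, 1), slice(2, 4, 1), slice(0, 100, 1))
for start, nitems, out_selection in PartialChunkIterator(selection, chunk):
    print(start, nitems, out_selection)
# 5200 200 (slice(0, 1, 1), slice(0, 2, 1))
# 6200 200 (slice(1, 2, 1), slice(0, 2, 1))
# 7200 200 (slice(2, 3, 1), slice(0, 2, 1))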

2 changes: 2 additions & 0 deletions zarr/tests/test_core.py
@@ -184,6 +184,7 @@ def test_array_1d(self):
# noinspection PyTypeChecker
assert_array_equal(a, z[slice(None)])
assert_array_equal(a[:10], z[:10])
assert False
assert_array_equal(a[10:20], z[10:20])
assert_array_equal(a[-10:], z[-10:])
assert_array_equal(a[:10, ...], z[:10, ...])
@@ -374,6 +375,7 @@ def test_array_2d(self):
# slicing across chunk boundaries
assert_array_equal(a[:110], z[:110])
assert_array_equal(a[190:310], z[190:310])
assert False
assert_array_equal(a[-110:], z[-110:])
assert_array_equal(a[:110, :], z[:110, :])
assert_array_equal(a[190:310, :], z[190:310, :])
28 changes: 27 additions & 1 deletion zarr/tests/test_indexing.py
@@ -5,7 +5,7 @@

import zarr
from zarr.indexing import (normalize_integer_selection, oindex, oindex_set,
replace_ellipsis)
replace_ellipsis, PartialChunkIterator)


def test_normalize_integer_selection():
@@ -1289,3 +1289,29 @@ def test_set_selections_with_fields():
a[key][ix] = v[key][ix]
z.set_mask_selection(ix, v[key][ix], fields=fields)
assert_array_equal(a, z[:])


@pytest.mark.parametrize('selection, expected', [
((slice(5, 8, 1), slice(2, 4, 1), slice(0, 100, 1)),
[(5200, 200, (slice(0, 1, 1), slice(0, 2, 1))),
(6200, 200, (slice(1, 2, 1), slice(0, 2, 1))),
(7200, 200, (slice(2, 3, 1), slice(0, 2, 1)))]),
Contributor:

So, trying to understand this:

arr[5:8, 2:4, 0:100]

means we need to read 200 items at positions 5200, 6200, and 7200?

Contributor Author:

Yeah, really it's taking 1 item from the first dimension and 2 items from the second dimension, each a run of 100 along the last dimension. So I wrote the code so that it takes 200 items, since they are consecutive in the compressed buffer anyway. The data just has to be reshaped before it is put into the chunk output array.
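
The offsets follow from row-major (C order) arithmetic on the (100, 10, 100) chunk:

# start  = 5 * (10 * 100) + 2 * 100 = 5200  (first selected item of row 5)
# nitems = 2 * 100 = 200                    (two full runs of the last axis)
# and likewise 6 * 1000 + 200 = 6200 and 7 * 1000 + 200 = 7200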

((slice(5, 8, 1), slice(2, 4, 1), slice(0, 5, 1)),
[(5200.0, 5.0, (slice(0, 1, 1), slice(0, 1, 1), slice(0, 5, 1))),
(5300.0, 5.0, (slice(0, 1, 1), slice(1, 2, 1), slice(0, 5, 1))),
(6200.0, 5.0, (slice(1, 2, 1), slice(0, 1, 1), slice(0, 5, 1))),
(6300.0, 5.0, (slice(1, 2, 1), slice(1, 2, 1), slice(0, 5, 1))),
(7200.0, 5.0, (slice(2, 3, 1), slice(0, 1, 1), slice(0, 5, 1))),
(7300.0, 5.0, (slice(2, 3, 1), slice(1, 2, 1), slice(0, 5, 1)))]),
Contributor:

Floats are weird; the output should only contain ints, no?

Contributor Author:

Yeah, I think the np.prod call used to generate nitems is making that a float. I'll update that.
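
That matches NumPy's behaviour, since the product over an empty shape tuple is a float; a quick check:

import numpy as np

print(np.prod(()))       # 1.0, the empty product defaults to float64
print(int(np.prod(())))  # 1, casting restores an integer count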

((slice(5, 8, 1), slice(2, 4, 1)),
[(5200, 200, (slice(0, 1, 1), slice(0, 2, 1))),
(6200, 200, (slice(1, 2, 1), slice(0, 2, 1))),
(7200, 200, (slice(2, 3, 1), slice(0, 2, 1)))])
])
def test_PartialChunkIterator(selection, expected):
arr = np.arange(2, 100002).reshape((100, 10, 100))
print(selection)
PCI = PartialChunkIterator(selection, arr)
results = list(PCI)
    assert results == expected
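
A quick way to run just this test against the branch (standard pytest usage):

pytest zarr/tests/test_indexing.py -k test_PartialChunkIterator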