Adds documentation for sharding and shard shape in Array.info #2644

Merged: 8 commits, Jan 5, 2025
37 changes: 35 additions & 2 deletions docs/user-guide/arrays.rst
@@ -574,8 +574,41 @@ Any combination of integer and slice can be used for block indexing::
Sharding
--------

Coming soon.

Using small chunk shapes in very large arrays can lead to an enormous number of chunks.
This can become a performance issue for file systems and object storage.
With Zarr format 3, a new sharding feature has been added to address this issue.

With sharding, multiple chunks can be stored in a single storage object (e.g. a file).
Within a shard, chunks are compressed and serialized separately.
This allows individual chunks to be read independently.
However, when writing data, a full shard must be written in one go for optimal
performance and to avoid concurrency issues.
That means that shards are the units of writing and chunks are the units of reading.
Users need to configure the chunk and shard shapes accordingly.

Sharded arrays can be created by providing the ``shards`` parameter to :func:`zarr.create_array`.

>>> a = zarr.create_array('data/example-20.zarr', shape=(10000, 10000), shards=(1000, 1000), chunks=(100, 100), dtype='uint8')
>>> a[:] = (np.arange(10000 * 10000) % 256).astype('uint8').reshape(10000, 10000)
>>> a.info_complete()
Type : Array
Zarr format : 3
Data type : DataType.uint8
Shape : (10000, 10000)
Shard shape : (1000, 1000)
Chunk shape : (100, 100)
Order : C
Read-only : False
Store type : LocalStore
Codecs : [{'chunk_shape': (100, 100), 'codecs': ({'endian': <Endian.little: 'little'>}, {'level': 0, 'checksum': False}), 'index_codecs': ({'endian': <Endian.little: 'little'>}, {}), 'index_location': <ShardingCodecIndexLocation.end: 'end'>}]
No. bytes : 100000000 (95.4M)
No. bytes stored : 3981060
Storage ratio : 25.1
Chunks Initialized : 100

In this example, a shard shape of (1000, 1000) and a chunk shape of (100, 100) are used.
This means that 10*10 = 100 chunks are stored in each shard, and there are 10*10 = 100 shards in total.
Without the ``shards`` argument, there would be 10,000 chunks stored as individual files.
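The chunk and shard counts in this example can be verified with a few lines of plain Python (a standalone sketch, independent of the Zarr API):

```python
import math

shape = (10_000, 10_000)
shard_shape = (1_000, 1_000)
chunk_shape = (100, 100)

# Chunks per shard: product of the per-axis ratios.
chunks_per_shard = math.prod(s // c for s, c in zip(shard_shape, chunk_shape))
# Shards needed to cover the whole array.
n_shards = math.prod(math.ceil(a / s) for a, s in zip(shape, shard_shape))
# Chunks (i.e. individual files) if no sharding were used.
n_chunks = math.prod(math.ceil(a / c) for a, c in zip(shape, chunk_shape))

print(chunks_per_shard, n_shards, n_chunks)  # 100 100 10000
```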

Missing features in 3.0
-----------------------
Expand Down
39 changes: 39 additions & 0 deletions docs/user-guide/performance.rst
@@ -62,6 +62,45 @@ will be one single chunk for the array::
>>> z5.chunks
(10000, 10000)


Sharding
~~~~~~~~

If you have large arrays but need small chunks to efficiently access the data, you can
use sharding. Sharding provides a mechanism to store multiple chunks in a single
storage object or file. This can be useful because traditional file systems and object
storage systems may have performance issues storing and accessing many files.
Additionally, small files can be inefficient to store if they are smaller than the
block size of the file system.

Picking a good combination of chunk shape and shard shape is important for performance.
The chunk shape determines what unit of your data can be read independently, while the
shard shape determines what unit of your data can be written efficiently.

For example, consider a 100 GB array that you need to read in small chunks of 1 MB.
Without sharding, each chunk would be stored as a separate file, resulting in 100,000 files.
That alone can cause performance issues on some file systems.
With sharding, you could use a shard size of 1 GB. This would result in 1000 chunks per
file and 100 files in total, which seems manageable for most storage systems.
You would still be able to read each 1 MB chunk independently, but you would need to
write your data in 1 GB increments.
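The sizing in this example is simple arithmetic; here is a quick check in plain Python (the byte sizes are the assumptions stated above, using 1 GB = 1000 MB):

```python
array_size = 100 * 10**9  # 100 GB array
chunk_size = 1 * 10**6    # 1 MB read unit
shard_size = 1 * 10**9    # 1 GB write unit

files_without_sharding = array_size // chunk_size  # one file per chunk
chunks_per_shard = shard_size // chunk_size
files_with_sharding = array_size // shard_size     # one file per shard

print(files_without_sharding, chunks_per_shard, files_with_sharding)
# 100000 1000 100
```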

To use sharding, you need to specify the ``shards`` parameter when creating the array.

>>> z6 = zarr.create_array(store={}, shape=(10000, 10000, 1000), shards=(1000, 1000, 1000), chunks=(100, 100, 100), dtype='uint8')
>>> z6.info
Type : Array
Zarr format : 3
Data type : DataType.uint8
Shape : (10000, 10000, 1000)
Shard shape : (1000, 1000, 1000)
Chunk shape : (100, 100, 100)
Order : C
Read-only : False
Store type : MemoryStore
Codecs : [{'chunk_shape': (100, 100, 100), 'codecs': ({'endian': <Endian.little: 'little'>}, {'level': 0, 'checksum': False}), 'index_codecs': ({'endian': <Endian.little: 'little'>}, {}), 'index_location': <ShardingCodecIndexLocation.end: 'end'>}]
No. bytes : 100000000000 (93.1G)

.. _user-guide-chunks-order:

Chunk memory layout
Expand Down
9 changes: 8 additions & 1 deletion src/zarr/core/_info.py
@@ -80,6 +80,7 @@ class ArrayInfo:
_zarr_format: ZarrFormat
_data_type: np.dtype[Any] | DataType
_shape: tuple[int, ...]
_shard_shape: tuple[int, ...] | None = None
_chunk_shape: tuple[int, ...] | None = None
_order: Literal["C", "F"]
_read_only: bool
@@ -96,7 +97,13 @@ def __repr__(self) -> str:
Type : {_type}
Zarr format : {_zarr_format}
Data type : {_data_type}
Shape : {_shape}
Shape : {_shape}""")

if self._shard_shape is not None:
template += textwrap.dedent("""
Shard shape : {_shard_shape}""")

template += textwrap.dedent("""
Chunk shape : {_chunk_shape}
Order : {_order}
Read-only : {_read_only}
10 changes: 2 additions & 8 deletions src/zarr/core/array.py
@@ -1573,14 +1573,8 @@ def _info(
else:
kwargs["_codecs"] = self.metadata.codecs
kwargs["_data_type"] = self.metadata.data_type
# just regular?
chunk_grid = self.metadata.chunk_grid
if isinstance(chunk_grid, RegularChunkGrid):
kwargs["_chunk_shape"] = chunk_grid.chunk_shape
else:
raise NotImplementedError(
"'info' is not yet implemented for chunk grids of type {type(self.metadata.chunk_grid)}"
)
kwargs["_chunk_shape"] = self.chunks
kwargs["_shard_shape"] = self.shards

return ArrayInfo(
_zarr_format=self.metadata.zarr_format,
21 changes: 15 additions & 6 deletions src/zarr/core/codec_pipeline.py
@@ -332,12 +332,21 @@ async def write_batch(
drop_axes: tuple[int, ...] = (),
) -> None:
if self.supports_partial_encode:
await self.encode_partial_batch(
[
(byte_setter, value[out_selection], chunk_selection, chunk_spec)
for byte_setter, chunk_spec, chunk_selection, out_selection in batch_info
],
)
# Pass scalar values as is
if len(value.shape) == 0:
await self.encode_partial_batch(
[
(byte_setter, value, chunk_selection, chunk_spec)
for byte_setter, chunk_spec, chunk_selection, out_selection in batch_info
],
)
else:
await self.encode_partial_batch(
[
(byte_setter, value[out_selection], chunk_selection, chunk_spec)
for byte_setter, chunk_spec, chunk_selection, out_selection in batch_info
],
)

else:
# Read existing bytes if not total slice
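The scalar special case in this hunk exists because a 0-dimensional NumPy array cannot be indexed with a slice-based `out_selection`; passing it through unchanged avoids the error. A minimal standalone illustration (plain NumPy, no Zarr involved):

```python
import numpy as np

# A 0-d array, e.g. the scalar value in a fill like `arr[:] = 10`.
value = np.asarray(10, dtype="uint8")
out_selection = (slice(0, 2), slice(0, 2))

try:
    value[out_selection]  # what the array-valued code path would do
except IndexError:
    # 0-d arrays have no axes to slice, so the selection must be skipped
    print("0-d arrays cannot be indexed with slices")
```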
136 changes: 92 additions & 44 deletions tests/test_array.py
@@ -20,6 +20,7 @@
VLenUTF8Codec,
ZstdCodec,
)
from zarr.codecs.sharding import ShardingCodec
from zarr.core._info import ArrayInfo
from zarr.core.array import (
CompressorsLike,
@@ -478,121 +479,168 @@ def test_update_attrs(zarr_format: ZarrFormat) -> None:
assert arr2.attrs["foo"] == "bar"


@pytest.mark.parametrize(("chunks", "shards"), [((2, 2), None), ((2, 2), (4, 4))])
class TestInfo:
def test_info_v2(self) -> None:
arr = zarr.create(shape=(4, 4), chunks=(2, 2), zarr_format=2)
def test_info_v2(self, chunks: tuple[int, int], shards: tuple[int, int] | None) -> None:
arr = zarr.create_array(store={}, shape=(8, 8), dtype="f8", chunks=chunks, zarr_format=2)
result = arr.info
expected = ArrayInfo(
_zarr_format=2,
_data_type=np.dtype("float64"),
_shape=(4, 4),
_chunk_shape=(2, 2),
_shape=(8, 8),
_chunk_shape=chunks,
_shard_shape=None,
_order="C",
_read_only=False,
_store_type="MemoryStore",
_count_bytes=128,
_count_bytes=512,
_compressor=numcodecs.Zstd(),
)
assert result == expected

def test_info_v3(self) -> None:
arr = zarr.create(shape=(4, 4), chunks=(2, 2), zarr_format=3)
def test_info_v3(self, chunks: tuple[int, int], shards: tuple[int, int] | None) -> None:
arr = zarr.create_array(store={}, shape=(8, 8), dtype="f8", chunks=chunks, shards=shards)
result = arr.info
expected = ArrayInfo(
_zarr_format=3,
_data_type=DataType.parse("float64"),
_shape=(4, 4),
_chunk_shape=(2, 2),
_shape=(8, 8),
_chunk_shape=chunks,
_shard_shape=shards,
_order="C",
_read_only=False,
_store_type="MemoryStore",
_codecs=[BytesCodec(), ZstdCodec()],
_count_bytes=128,
_codecs=[BytesCodec(), ZstdCodec()]
if shards is None
else [ShardingCodec(chunk_shape=chunks, codecs=[BytesCodec(), ZstdCodec()])],
_count_bytes=512,
)
assert result == expected

def test_info_complete(self) -> None:
arr = zarr.create(shape=(4, 4), chunks=(2, 2), zarr_format=3, codecs=[BytesCodec()])
def test_info_complete(self, chunks: tuple[int, int], shards: tuple[int, int] | None) -> None:
arr = zarr.create_array(
store={},
shape=(8, 8),
dtype="f8",
chunks=chunks,
shards=shards,
compressors=(),
)
result = arr.info_complete()
expected = ArrayInfo(
_zarr_format=3,
_data_type=DataType.parse("float64"),
_shape=(4, 4),
_chunk_shape=(2, 2),
_shape=(8, 8),
_chunk_shape=chunks,
_shard_shape=shards,
_order="C",
_read_only=False,
_store_type="MemoryStore",
_codecs=[BytesCodec()],
_count_bytes=128,
_codecs=[BytesCodec()] if shards is None else [ShardingCodec(chunk_shape=chunks)],
_count_bytes=512,
_count_chunks_initialized=0,
_count_bytes_stored=373, # the metadata?
_count_bytes_stored=373 if shards is None else 578, # the metadata?
)
assert result == expected

arr[:2, :2] = 10
arr[:4, :4] = 10
result = arr.info_complete()
expected = dataclasses.replace(
expected, _count_chunks_initialized=1, _count_bytes_stored=405
)
if shards is None:
expected = dataclasses.replace(
expected, _count_chunks_initialized=4, _count_bytes_stored=501
)
else:
expected = dataclasses.replace(
expected, _count_chunks_initialized=1, _count_bytes_stored=774
)
assert result == expected

async def test_info_v2_async(self) -> None:
arr = await zarr.api.asynchronous.create(shape=(4, 4), chunks=(2, 2), zarr_format=2)
async def test_info_v2_async(
self, chunks: tuple[int, int], shards: tuple[int, int] | None
) -> None:
arr = await zarr.api.asynchronous.create_array(
store={}, shape=(8, 8), dtype="f8", chunks=chunks, zarr_format=2
)
result = arr.info
expected = ArrayInfo(
_zarr_format=2,
_data_type=np.dtype("float64"),
_shape=(4, 4),
_shape=(8, 8),
_chunk_shape=(2, 2),
_shard_shape=None,
_order="C",
_read_only=False,
_store_type="MemoryStore",
_count_bytes=128,
_count_bytes=512,
_compressor=numcodecs.Zstd(),
)
assert result == expected

async def test_info_v3_async(self) -> None:
arr = await zarr.api.asynchronous.create(shape=(4, 4), chunks=(2, 2), zarr_format=3)
async def test_info_v3_async(
self, chunks: tuple[int, int], shards: tuple[int, int] | None
) -> None:
arr = await zarr.api.asynchronous.create_array(
store={},
shape=(8, 8),
dtype="f8",
chunks=chunks,
shards=shards,
)
result = arr.info
expected = ArrayInfo(
_zarr_format=3,
_data_type=DataType.parse("float64"),
_shape=(4, 4),
_chunk_shape=(2, 2),
_shape=(8, 8),
_chunk_shape=chunks,
_shard_shape=shards,
_order="C",
_read_only=False,
_store_type="MemoryStore",
_codecs=[BytesCodec(), ZstdCodec()],
_count_bytes=128,
_codecs=[BytesCodec(), ZstdCodec()]
if shards is None
else [ShardingCodec(chunk_shape=chunks, codecs=[BytesCodec(), ZstdCodec()])],
_count_bytes=512,
)
assert result == expected

async def test_info_complete_async(self) -> None:
arr = await zarr.api.asynchronous.create(
shape=(4, 4), chunks=(2, 2), zarr_format=3, codecs=[BytesCodec()]
async def test_info_complete_async(
self, chunks: tuple[int, int], shards: tuple[int, int] | None
) -> None:
arr = await zarr.api.asynchronous.create_array(
store={},
dtype="f8",
shape=(8, 8),
chunks=chunks,
shards=shards,
compressors=None,
)
result = await arr.info_complete()
expected = ArrayInfo(
_zarr_format=3,
_data_type=DataType.parse("float64"),
_shape=(4, 4),
_chunk_shape=(2, 2),
_shape=(8, 8),
_chunk_shape=chunks,
_shard_shape=shards,
_order="C",
_read_only=False,
_store_type="MemoryStore",
_codecs=[BytesCodec()],
_count_bytes=128,
_codecs=[BytesCodec()] if shards is None else [ShardingCodec(chunk_shape=chunks)],
_count_bytes=512,
_count_chunks_initialized=0,
_count_bytes_stored=373, # the metadata?
_count_bytes_stored=373 if shards is None else 578, # the metadata?
)
assert result == expected

await arr.setitem((slice(2), slice(2)), 10)
await arr.setitem((slice(4), slice(4)), 10)
result = await arr.info_complete()
expected = dataclasses.replace(
expected, _count_chunks_initialized=1, _count_bytes_stored=405
)
if shards is None:
expected = dataclasses.replace(
expected, _count_chunks_initialized=4, _count_bytes_stored=501
)
else:
expected = dataclasses.replace(
expected, _count_chunks_initialized=1, _count_bytes_stored=774
)
assert result == expected

