-
Notifications
You must be signed in to change notification settings - Fork 7.3k
[Data] Streaming Partition enforce row_num per block #57984
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 50 commits
7e39adb
ad81683
02a27c5
1a28369
344c0c7
8c63cd0
6f47d8a
6796f01
0c2e9a1
ff71cbe
05c5e61
13d2280
a60fbe6
25be194
3c0b02e
e09ce6d
4612f71
944f920
7fd7c49
ba7f5f0
37a51a4
1bdf310
f1dc0f7
c5f3532
7577cef
4e83486
8c2e4cf
d51180a
75e3435
6231a20
98493fa
3106805
f0c0bd3
59d419c
130fd44
ea7a3dc
0ab949e
96ffe1a
e1bbb0e
6e17feb
6e1ed9f
a08fe95
ca02719
bb8fdfd
d60d898
e14c832
fd15bff
75bed79
98aa170
e23874e
9bf1a09
c22c97f
c9b66e0
dfe11ab
82ab74b
589b97f
5b23b18
970b551
a4390f9
042312f
2868f9a
2c5cc47
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,15 +1,32 @@ | ||
| import itertools | ||
| import math | ||
| from collections import defaultdict | ||
| from dataclasses import dataclass | ||
| from typing import Dict, Iterator, List, Optional, Tuple | ||
| from typing import Dict, Iterable, Iterator, List, Optional, Tuple | ||
|
|
||
| import ray | ||
| from .common import NodeIdStr | ||
| from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder | ||
| from ray.data._internal.memory_tracing import trace_deallocation | ||
| from ray.data.block import Block, BlockMetadata, Schema | ||
| from ray.data.block import Block, BlockAccessor, BlockMetadata, Schema | ||
| from ray.data.context import DataContext | ||
| from ray.types import ObjectRef | ||
|
|
||
|
|
||
@dataclass
class BlockSlice:
    """A contiguous, half-open row range ``[start_offset, end_offset)`` of a block.

    Raises:
        ValueError: If ``start_offset`` is negative, or if ``end_offset`` is
            smaller than ``start_offset``.
    """

    # Starting row offset (inclusive) within the block.
    start_offset: int
    # Ending row offset (exclusive) within the block.
    end_offset: int

    def __post_init__(self):
        # Reject malformed ranges at construction time so that ``num_rows``
        # can never be negative. An empty range (start == end) is allowed.
        if self.start_offset < 0:
            raise ValueError(
                f"start_offset must be non-negative, got {self.start_offset}."
            )
        if self.end_offset < self.start_offset:
            raise ValueError(
                "end_offset must be greater than or equal to start_offset, "
                f"got start_offset={self.start_offset}, "
                f"end_offset={self.end_offset}."
            )

    @property
    def num_rows(self) -> int:
        """Number of rows covered by this slice."""
        return self.end_offset - self.start_offset
|
|
||
|
|
||
| @dataclass | ||
| class RefBundle: | ||
| """A group of data block references and their metadata. | ||
|
|
@@ -38,6 +55,10 @@ class RefBundle: | |
| # Whether we own the blocks (can safely destroy them). | ||
| owns_blocks: bool | ||
|
|
||
| # The slices of the blocks in this bundle. This is optional, and may be None | ||
| # if the blocks are not sliced. | ||
| slices: Optional[List[BlockSlice]] = None | ||
|
||
|
|
||
| # This attribute is used by the split() operator to assign bundles to logical | ||
| # output splits. It is otherwise None. | ||
| output_split_idx: Optional[int] = None | ||
|
|
@@ -53,6 +74,10 @@ class RefBundle: | |
| def __post_init__(self): | ||
| if not isinstance(self.blocks, tuple): | ||
| object.__setattr__(self, "blocks", tuple(self.blocks)) | ||
| if self.slices is not None: | ||
| assert len(self.blocks) == len( | ||
| self.slices | ||
| ), "Number of blocks and slices must match" | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Let's also validate that the slices have valid ranges. |
||
| for b in self.blocks: | ||
| assert isinstance(b, tuple), b | ||
| assert len(b) == 2, b | ||
|
|
@@ -80,16 +105,29 @@ def metadata(self) -> List[BlockMetadata]: | |
|
|
||
def num_rows(self) -> Optional[int]:
    """Number of rows present in this bundle, if known.

    When the bundle carries slices, the result is the sum of the slice row
    counts. Otherwise it is the sum of the per-block metadata row counts.

    Returns:
        The total row count, or None if the bundle has no slices and any
        block's metadata has an unknown (None) row count.
    """
    if self.slices is not None:
        return sum(block_slice.num_rows for block_slice in self.slices)

    total = 0
    for m in self.metadata:
        if m.num_rows is None:
            return None
        # Each block is counted exactly once.
        total += m.num_rows
    return total
|
|
||
def size_bytes(self) -> int:
    """Size of the blocks of this bundle in bytes.

    Without slices this is the sum of the metadata sizes. With slices, a
    block whose slice row count differs from its metadata row count is
    charged a proportional share of its size (rounded up, at least 1 byte);
    all other blocks are charged their full metadata size.
    """
    if self.slices is None:
        return sum(meta.size_bytes for meta in self.metadata)

    def _charged_bytes(meta, piece):
        full_rows = meta.num_rows
        # Unknown or zero row count, or a slice spanning every row: use the
        # recorded size as is.
        if not full_rows or full_rows == piece.num_rows:
            return meta.size_bytes
        bytes_per_row = meta.size_bytes / full_rows
        return max(1, int(math.ceil(bytes_per_row * piece.num_rows)))

    return sum(
        _charged_bytes(meta, piece)
        for (_, meta), piece in zip(self.blocks, self.slices)
    )
|
|
||
| def destroy_if_owned(self) -> int: | ||
|
|
@@ -143,6 +181,108 @@ def _get_cached_metadata(self) -> Dict[ObjectRef, "_ObjectMetadata"]: | |
|
|
||
| return self._cached_object_meta | ||
|
|
||
def slice(self, needed_rows: int) -> Tuple["RefBundle", "RefBundle"]:
    """Split this bundle into a head of ``needed_rows`` rows and the rest.

    Blocks are walked in order. Whole blocks are assigned to the head until
    the next block would overshoot; that block is then split into two
    ``BlockSlice`` ranges that share the same block reference. Only
    references, metadata and slice ranges are rewritten here.

    Args:
        needed_rows: Number of rows to take from the head of the bundle.
            Must be positive and strictly less than ``self.num_rows()``.

    Returns:
        A tuple of (sliced_bundle, remaining_bundle). Both bundles are
        created with ``owns_blocks=False``.
    """
    assert needed_rows > 0, "needed_rows must be positive."
    total_rows = self.num_rows()
    assert (
        total_rows is not None
    ), "Cannot slice a RefBundle with unknown number of rows."
    assert (
        needed_rows < total_rows
    ), f"To slice a RefBundle, the number of requested rows must be less than the number of rows in the bundle. Requested {needed_rows} rows but bundle only has {total_rows} rows."

    if self.slices is not None:
        block_slices = list(self.slices)
    else:
        # No explicit slices: treat every block as a full-range slice.
        block_slices = []
        for metadata in self.metadata:
            assert (
                metadata.num_rows is not None
            ), "Cannot derive block slice for a RefBundle with unknown block row counts."
            block_slices.append(
                BlockSlice(start_offset=0, end_offset=metadata.num_rows)
            )

    consumed_blocks: List[Tuple[ObjectRef[Block], BlockMetadata]] = []
    consumed_slices: List[BlockSlice] = []
    remaining_blocks: List[Tuple[ObjectRef[Block], BlockMetadata]] = []
    remaining_slices: List[BlockSlice] = []

    rows_to_take = needed_rows

    for (block_ref, metadata), block_slice in zip(self.blocks, block_slices):
        block_rows = block_slice.num_rows

        if rows_to_take == 0:
            # The head is complete; everything that follows belongs to the
            # remainder, with its metadata passed through unchanged.
            remaining_blocks.append((block_ref, metadata))
            remaining_slices.append(block_slice)
        elif block_rows == 0:
            # A zero-row block contributes nothing to the head. It must not
            # go through `_slice_block_metadata`, which rejects zero-row
            # slices, so its metadata is kept as is.
            consumed_blocks.append((block_ref, metadata))
            consumed_slices.append(block_slice)
        elif rows_to_take >= block_rows:
            # The whole block (or existing slice) fits into the head.
            consumed_blocks.append(
                (block_ref, _slice_block_metadata(metadata, block_rows))
            )
            consumed_slices.append(block_slice)
            rows_to_take -= block_rows
        else:
            # This block straddles the cut: split its range in two. Both
            # halves are non-empty because 0 < rows_to_take < block_rows.
            consume_slice = BlockSlice(
                start_offset=block_slice.start_offset,
                end_offset=block_slice.start_offset + rows_to_take,
            )
            consumed_blocks.append(
                (block_ref, _slice_block_metadata(metadata, rows_to_take))
            )
            consumed_slices.append(consume_slice)

            remainder_slice = BlockSlice(
                start_offset=consume_slice.end_offset,
                end_offset=block_slice.end_offset,
            )
            remaining_blocks.append(
                (
                    block_ref,
                    _slice_block_metadata(metadata, block_rows - rows_to_take),
                )
            )
            remaining_slices.append(remainder_slice)

            rows_to_take = 0

    sliced_bundle = RefBundle(
        blocks=tuple(consumed_blocks),
        schema=self.schema,
        owns_blocks=False,
        slices=consumed_slices if consumed_slices else None,
    )

    remaining_bundle = RefBundle(
        blocks=tuple(remaining_blocks),
        schema=self.schema,
        owns_blocks=False,
        slices=remaining_slices if remaining_slices else None,
    )

    return sliced_bundle, remaining_bundle
|
|
||
@classmethod
def merge_ref_bundles(cls, bundles: List["RefBundle"]) -> "RefBundle":
    """Concatenate sliced bundles into a single bundle.

    Blocks and slices are concatenated in the order of ``bundles``. The
    schema is taken from the first bundle, and the merged bundle is created
    with ``owns_blocks=False``.

    Args:
        bundles: Non-empty list of bundles, each of which must have slices.

    Returns:
        The merged bundle.
    """
    assert bundles, "Cannot merge an empty list of RefBundles."
    assert not any(
        bundle.slices is None for bundle in bundles
    ), "All bundles must have slices."

    all_blocks = []
    all_slices = []
    for bundle in bundles:
        all_blocks.extend(bundle.blocks)
        all_slices.extend(bundle.slices)

    first_schema = bundles[0].schema
    return cls(
        blocks=tuple(all_blocks),
        schema=first_schema,
        owns_blocks=False,
        slices=all_slices,
    )
|
|
||
def __eq__(self, other) -> bool:
    """Compare by identity: a bundle is equal only to itself."""
    return other is self
|
|
||
|
|
@@ -170,3 +310,40 @@ def _ref_bundles_iterator_to_block_refs_list( | |
| return [ | ||
| block_ref for ref_bundle in ref_bundles for block_ref in ref_bundle.block_refs | ||
| ] | ||
|
|
||
|
|
||
def _slice_block_metadata(
    metadata: BlockMetadata, num_rows_in_slice: int
) -> BlockMetadata:
    """Build metadata describing a ``num_rows_in_slice``-row slice of a block.

    The byte size is scaled from the block's recorded bytes-per-row (rounded
    up, at least 1) when both the size and a non-zero row count are known;
    otherwise the recorded size is carried over unchanged. Execution stats
    are dropped and the input file list is copied.

    Args:
        metadata: Metadata of the block being sliced.
        num_rows_in_slice: Number of rows in the slice; must be positive.

    Returns:
        A new ``BlockMetadata`` for the slice.
    """
    assert (
        num_rows_in_slice > 0
    ), "num_rows_in_slice must be positive for slicing block metadata."

    full_bytes = metadata.size_bytes
    full_rows = metadata.num_rows

    if full_bytes is None or not full_rows:
        # Nothing to scale from: keep whatever size was recorded.
        estimated_bytes = full_bytes
    else:
        bytes_per_row = full_bytes / full_rows
        estimated_bytes = max(1, int(math.ceil(bytes_per_row * num_rows_in_slice)))

    # An unknown row count on the source block stays unknown on the slice.
    sliced_rows = None if full_rows is None else num_rows_in_slice

    return BlockMetadata(
        num_rows=sliced_rows,
        size_bytes=estimated_bytes,
        exec_stats=None,
        input_files=list(metadata.input_files),
    )
|
|
||
|
|
||
def _iter_sliced_blocks(
    blocks: Iterable[Block],
    slices: List[BlockSlice],
) -> Iterator[Block]:
    """Yield one block built from the requested row ranges of ``blocks``.

    Blocks and slices are paired positionally. A block whose slice starts at
    row 0 and reaches at least its last row is added to the builder as is;
    any other block is sliced with ``copy=False`` first. The builder's output
    is yielded once, after all pairs have been added.

    Args:
        blocks: The source blocks.
        slices: The row range to take from each block, in the same order.

    Yields:
        The block produced by the ``DelegatingBlockBuilder``.
    """
    materialized = list(blocks)
    combined = DelegatingBlockBuilder()

    for source_block, row_range in zip(materialized, slices):
        accessor = BlockAccessor.for_block(source_block)
        lo = row_range.start_offset
        hi = row_range.end_offset

        covers_whole_block = lo == 0 and hi >= accessor.num_rows()
        if not covers_whole_block:
            combined.add_block(accessor.slice(lo, hi, copy=False))
        else:
            combined.add_block(source_block)

    yield combined.build()
Uh oh!
There was an error while loading. Please reload this page.