Skip to content
Merged
Show file tree
Hide file tree
Changes from 55 commits
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
7e39adb
update
owenowenisme Oct 22, 2025
ad81683
Merge remote-tracking branch 'upstream/master' into data/use-map-op-f…
owenowenisme Oct 22, 2025
02a27c5
rename _schedule_task_input
owenowenisme Oct 22, 2025
1a28369
update
owenowenisme Oct 22, 2025
344c0c7
make a task can output multiple blocks
owenowenisme Oct 23, 2025
8c63cd0
remove new term chunk
owenowenisme Oct 23, 2025
6f47d8a
resolve comment
owenowenisme Oct 23, 2025
6796f01
merge _build_task_from_single_block_full_blocks & _build_single_outpu…
owenowenisme Oct 24, 2025
0c2e9a1
Merge branch 'master' into data/use-map-op-for-streaming-repartition
owenowenisme Oct 25, 2025
ff71cbe
Merge remote-tracking branch 'upstream/master' into data/use-map-op-f…
owenowenisme Oct 31, 2025
05c5e61
Merge remote-tracking branch 'upstream/master' into data/use-map-op-f…
owenowenisme Oct 31, 2025
13d2280
remove _TaskInput
owenowenisme Oct 31, 2025
a60fbe6
make StreamingRepartition default preserve order
owenowenisme Oct 31, 2025
25be194
rename slice_rows to num_rows_in_slice
owenowenisme Oct 31, 2025
3c0b02e
unify the interface for block_ref_bundler
owenowenisme Oct 31, 2025
e09ce6d
make enforce_target_num_rows_per_block True
owenowenisme Nov 1, 2025
4612f71
fix
owenowenisme Nov 1, 2025
944f920
Merge remote-tracking branch 'upstream/master' into data/use-map-op-f…
owenowenisme Nov 1, 2025
7fd7c49
rename StreamingRepartitionTaskBuilder
owenowenisme Nov 1, 2025
ba7f5f0
remove set_block_ref_bundler and put the logic into constructor
owenowenisme Nov 1, 2025
37a51a4
keep track on ref bundle fully consumed
owenowenisme Nov 2, 2025
1bdf310
remove input_bundle from task_context
owenowenisme Nov 3, 2025
f1dc0f7
Merge branch 'master' into data/use-map-op-for-streaming-repartition
owenowenisme Nov 3, 2025
c5f3532
Merge remote-tracking branch 'upstream/master' into data/use-map-op-f…
owenowenisme Nov 6, 2025
7577cef
remove num_rows_in_slice property
owenowenisme Nov 6, 2025
4e83486
rename task_kwargs into task_kwargs_for_bundle
owenowenisme Nov 6, 2025
8c2e4cf
make slice explicit in map op
owenowenisme Nov 7, 2025
d51180a
update
owenowenisme Nov 7, 2025
75e3435
update
owenowenisme Nov 7, 2025
6231a20
remove preserve order of streaming repartition
owenowenisme Nov 7, 2025
98493fa
add class description for BaseRefBundler
owenowenisme Nov 7, 2025
3106805
remove streaming_repartition_block_fn
owenowenisme Nov 7, 2025
f0c0bd3
make block slice in ref bundle
owenowenisme Nov 7, 2025
59d419c
update
owenowenisme Nov 7, 2025
130fd44
remove block_index & output_index from BlockSlice
owenowenisme Nov 8, 2025
ea7a3dc
update
owenowenisme Nov 8, 2025
0ab949e
added slice row and bytes calculation in ref bundle with unit test
owenowenisme Nov 8, 2025
96ffe1a
update
owenowenisme Nov 8, 2025
e1bbb0e
add ref bundler func and unit test
owenowenisme Nov 8, 2025
6e17feb
refactor
owenowenisme Nov 8, 2025
6e1ed9f
update to track consumed input ref
owenowenisme Nov 8, 2025
a08fe95
update
owenowenisme Nov 8, 2025
ca02719
refine
owenowenisme Nov 9, 2025
bb8fdfd
Merge branch 'master' into data/use-map-op-for-streaming-repartition
owenowenisme Nov 10, 2025
d60d898
Merge remote-tracking branch 'upstream/master' into data/use-map-op-f…
owenowenisme Nov 10, 2025
e14c832
rename consumed_bundle to sliced bundle
owenowenisme Nov 11, 2025
fd15bff
use len function in sr num_blocks
owenowenisme Nov 11, 2025
75bed79
refactor _try_build_ready_bundle
owenowenisme Nov 12, 2025
98aa170
make slice a method of Refbundle
owenowenisme Nov 12, 2025
e23874e
make merge_ref_bundles classmethod of ref_bundle
owenowenisme Nov 12, 2025
9bf1a09
update
owenowenisme Nov 12, 2025
c22c97f
use None to represent full block
owenowenisme Nov 12, 2025
c9b66e0
add more test for test_slice_ref_bundle_invalid_rows
owenowenisme Nov 12, 2025
dfe11ab
add __str__ method
owenowenisme Nov 12, 2025
82ab74b
check 0 of num_rows
owenowenisme Nov 12, 2025
589b97f
Merge branch 'master' into data/use-map-op-for-streaming-repartition
owenowenisme Nov 13, 2025
5b23b18
fix logic of row_need_from_last_block and add test for ref_bundle met…
owenowenisme Nov 13, 2025
970b551
add bundler testing
owenowenisme Nov 13, 2025
a4390f9
add assertion to rows_needed_from_last_bundle
owenowenisme Nov 13, 2025
042312f
update
owenowenisme Nov 13, 2025
2868f9a
Merge branch 'master' into data/use-map-op-for-streaming-repartition
owenowenisme Nov 13, 2025
2c5cc47
make test streaming repartition bundler unit test
owenowenisme Nov 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from .execution_options import ExecutionOptions, ExecutionResources
from .executor import Executor, OutputIterator
from .physical_operator import PhysicalOperator, ReportsExtraResourceUsage
from .ref_bundle import RefBundle
from .ref_bundle import BlockSlice, RefBundle
from .task_context import TaskContext
from .transform_fn import AllToAllTransformFn

Expand All @@ -15,6 +15,7 @@
"OutputIterator",
"PhysicalOperator",
"RefBundle",
"BlockSlice",
"ReportsExtraResourceUsage",
"TaskContext",
]
234 changes: 227 additions & 7 deletions python/ray/data/_internal/execution/interfaces/ref_bundle.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,31 @@
import itertools
import math
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, Iterator, List, Optional, Tuple
from typing import Dict, Iterable, Iterator, List, Optional, Tuple

import ray
from .common import NodeIdStr
from ray.data._internal.memory_tracing import trace_deallocation
from ray.data.block import Block, BlockMetadata, Schema
from ray.data.block import Block, BlockAccessor, BlockMetadata, Schema
from ray.data.context import DataContext
from ray.types import ObjectRef


@dataclass
class BlockSlice:
    """A half-open row range ``[start_offset, end_offset)`` within a block."""

    # Starting row offset (inclusive) within the block.
    start_offset: int
    # Ending row offset (exclusive) within the block.
    end_offset: int

    def __post_init__(self):
        # Validate eagerly so a malformed range fails at construction time
        # rather than when the slice is first consumed.
        assert (
            0 <= self.start_offset <= self.end_offset
        ), f"Invalid BlockSlice range: [{self.start_offset}, {self.end_offset})"

    @property
    def num_rows(self) -> int:
        """Number of rows covered by this slice."""
        return self.end_offset - self.start_offset


@dataclass
class RefBundle:
"""A group of data block references and their metadata.
Expand Down Expand Up @@ -38,6 +54,12 @@ class RefBundle:
# Whether we own the blocks (can safely destroy them).
owns_blocks: bool

# The slices of the blocks in this bundle. After __post_init__, this is always
# a list with length equal to len(blocks). Individual entries can be None to
# represent a full block (equivalent to BlockSlice(0, num_rows)).
# Pass None during construction to initialize all slices as None (full blocks).
slices: Optional[List[Optional[BlockSlice]]] = None

# This attribute is used by the split() operator to assign bundles to logical
# output splits. It is otherwise None.
output_split_idx: Optional[int] = None
Expand All @@ -53,6 +75,14 @@ class RefBundle:
def __post_init__(self):
if not isinstance(self.blocks, tuple):
object.__setattr__(self, "blocks", tuple(self.blocks))

if self.slices is None:
object.__setattr__(self, "slices", [None] * len(self.blocks))
else:
assert len(self.blocks) == len(
self.slices
), "Number of blocks and slices must match"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's also validate the slices have valid ranges.


for b in self.blocks:
assert isinstance(b, tuple), b
assert len(b) == 2, b
Expand Down Expand Up @@ -81,16 +111,32 @@ def metadata(self) -> List[BlockMetadata]:
def num_rows(self) -> Optional[int]:
    """Number of rows present in this bundle, if known.

    A ``None`` entry in ``self.slices`` denotes a full block, whose row count
    comes from the block's metadata; an explicit ``BlockSlice`` carries its
    own exact row count.

    Returns:
        The total row count, or ``None`` if any full block has an unknown
        row count.
    """
    total = 0
    for metadata, block_slice in zip(self.metadata, self.slices):
        if block_slice is None:
            # Full block: fall back to the metadata row count.
            if metadata.num_rows is None:
                return None
            total += metadata.num_rows
        else:
            # Sliced block: the slice knows exactly how many rows it spans.
            total += block_slice.num_rows
    return total

def size_bytes(self) -> int:
    """Size of the blocks of this bundle in bytes.

    For sliced blocks the size is estimated pro-rata from the block
    metadata, since only the block owner knows the true sliced size.
    """
    total = 0
    for (_, metadata), block_slice in zip(self.blocks, self.slices):
        if block_slice is None:
            # Full block: use the exact metadata size.
            total += metadata.size_bytes
        elif metadata.num_rows is None or metadata.num_rows == 0:
            # Unknown or zero row count: we cannot pro-rate, so fall back to
            # the full metadata size as a conservative estimate.
            total += metadata.size_bytes
        elif metadata.num_rows != block_slice.num_rows:
            if block_slice.num_rows == 0:
                # An empty slice contributes no bytes. Without this guard the
                # max(1, ...) below would wrongly report 1 byte for 0 rows.
                continue
            # Partial block: estimate size proportionally to the sliced rows,
            # rounding up and reporting at least 1 byte for a non-empty slice.
            per_row = metadata.size_bytes / metadata.num_rows
            total += max(1, int(math.ceil(per_row * block_slice.num_rows)))
        else:
            total += metadata.size_bytes
    return total

def destroy_if_owned(self) -> int:
"""Clears the object store memory for these blocks if owned.
Expand Down Expand Up @@ -143,6 +189,108 @@ def _get_cached_metadata(self) -> Dict[ObjectRef, "_ObjectMetadata"]:

return self._cached_object_meta

def slice(self, needed_rows: int) -> Tuple["RefBundle", "RefBundle"]:
    """Slice a Ref Bundle into the first bundle containing the first `needed_rows` rows and the remaining bundle containing the remaining rows.

    Args:
        needed_rows: Number of rows to take from the head of the bundle.

    Returns:
        A tuple of (sliced_bundle, remaining_bundle). The needed rows must be less than the number of rows in the bundle.
    """
    assert needed_rows > 0, "needed_rows must be positive."
    assert (
        self.num_rows() is not None
    ), "Cannot slice a RefBundle with unknown number of rows."
    assert (
        needed_rows < self.num_rows()
    ), f"To slice a RefBundle, the number of requested rows must be less than the number of rows in the bundle. Requested {needed_rows} rows but bundle only has {self.num_rows()} rows."

    # Normalize: replace None (full-block) entries with explicit BlockSlice
    # ranges so the split loop below can treat every block uniformly.
    block_slices = []
    for metadata, block_slice in zip(self.metadata, self.slices):
        if block_slice is None:
            # None represents a full block, convert to explicit BlockSlice
            assert (
                metadata.num_rows is not None
            ), "Cannot derive block slice for a RefBundle with unknown block row counts."
            block_slices.append(
                BlockSlice(start_offset=0, end_offset=metadata.num_rows)
            )
        else:
            block_slices.append(block_slice)

    consumed_blocks: List[Tuple[ObjectRef[Block], BlockMetadata]] = []
    consumed_slices: List[BlockSlice] = []
    remaining_blocks: List[Tuple[ObjectRef[Block], BlockMetadata]] = []
    remaining_slices: List[BlockSlice] = []

    # Rows still owed to the head (consumed) bundle.
    rows_to_take = needed_rows

    for (block_ref, metadata), block_slice in zip(self.blocks, block_slices):
        block_rows = block_slice.num_rows
        if rows_to_take >= block_rows:
            # The whole (possibly already-sliced) block is consumed.
            # NOTE(review): a zero-row block would reach this branch and trip
            # the positivity assert in _slice_block_metadata — confirm bundles
            # never contain empty blocks.
            consumed_blocks.append(
                (block_ref, _slice_block_metadata(metadata, block_rows))
            )
            consumed_slices.append(block_slice)
            rows_to_take -= block_rows
        else:
            if rows_to_take == 0:
                # Head already satisfied: this and later blocks are remainder.
                remaining_blocks.append((block_ref, metadata))
                remaining_slices.append(block_slice)
                continue
            # Split this block: the head portion goes to the consumed bundle.
            consume_slice = BlockSlice(
                start_offset=block_slice.start_offset,
                end_offset=block_slice.start_offset + rows_to_take,
            )
            consumed_blocks.append(
                (block_ref, _slice_block_metadata(metadata, rows_to_take))
            )
            consumed_slices.append(consume_slice)

            # The tail portion (if any) goes to the remaining bundle.
            leftover_rows = block_rows - rows_to_take
            if leftover_rows > 0:
                remainder_slice = BlockSlice(
                    start_offset=consume_slice.end_offset,
                    end_offset=block_slice.end_offset,
                )
                remaining_blocks.append(
                    (block_ref, _slice_block_metadata(metadata, leftover_rows))
                )
                remaining_slices.append(remainder_slice)

            rows_to_take = 0

    # Neither output claims ownership: destruction rights stay with the
    # original bundle, so slicing never frees shared blocks.
    sliced_bundle = RefBundle(
        blocks=tuple(consumed_blocks),
        schema=self.schema,
        owns_blocks=False,
        slices=consumed_slices if consumed_slices else None,
    )

    remaining_bundle = RefBundle(
        blocks=tuple(remaining_blocks),
        schema=self.schema,
        owns_blocks=False,
        slices=remaining_slices if remaining_slices else None,
    )

    return sliced_bundle, remaining_bundle

@classmethod
def merge_ref_bundles(cls, bundles: List["RefBundle"]) -> "RefBundle":
    """Concatenate several bundles into one, preserving block order.

    Args:
        bundles: Non-empty list of bundles to merge. All bundles are assumed
            to share the same schema.

    Returns:
        A new bundle containing every block (and its corresponding slice
        entry) of the inputs, in order.
    """
    assert bundles, "Cannot merge an empty list of RefBundles."
    merged_blocks = list(
        itertools.chain.from_iterable(bundle.blocks for bundle in bundles)
    )
    merged_slices = list(
        itertools.chain.from_iterable(bundle.slices for bundle in bundles)
    )
    return cls(
        blocks=tuple(merged_blocks),
        schema=bundles[0].schema,  # Assume all bundles have the same schema
        # Only claim ownership if *every* input owns its blocks; otherwise
        # destroying the merged bundle could free blocks we don't own.
        owns_blocks=all(bundle.owns_blocks for bundle in bundles),
        slices=merged_slices,
    )

def __eq__(self, other) -> bool:
    # Identity comparison: together with __hash__, this lets bundles be used
    # as dict/set keys without deep-comparing block refs and metadata.
    return self is other

Expand All @@ -152,6 +300,38 @@ def __hash__(self) -> int:
def __len__(self) -> int:
    # A bundle's length is its number of blocks (not its number of rows).
    return len(self.blocks)

def __str__(self) -> str:
    """Multi-line, human-readable summary of the bundle and each block.

    Shows the block count, total rows, schema and ownership, followed by one
    line per block with its row count, byte size, and slice (``None`` when
    the full block is referenced).
    """
    lines = [
        f"RefBundle({len(self.blocks)} blocks,",
        f" {self.num_rows()} rows,",
        f" schema={self.schema},",
        f" owns_blocks={self.owns_blocks},",
        " blocks=(",
    ]

    # Loop through each block and show details
    for i, ((block_ref, metadata), block_slice) in enumerate(
        zip(self.blocks, self.slices)
    ):
        row_str = (
            f"{metadata.num_rows} rows"
            if metadata.num_rows is not None
            else "unknown rows"
        )
        bytes_str = f"{metadata.size_bytes} bytes"
        slice_str = (
            f"slice={block_slice}"
            if block_slice is not None
            else "slice=None (full block)"
        )

        lines.append(f" {i}: {row_str}, {bytes_str}, {slice_str}")

    lines.append(" )")
    lines.append(")")

    return "\n".join(lines)


@dataclass
class _ObjectMetadata:
Expand All @@ -170,3 +350,43 @@ def _ref_bundles_iterator_to_block_refs_list(
return [
block_ref for ref_bundle in ref_bundles for block_ref in ref_bundle.block_refs
]


def _slice_block_metadata(
metadata: BlockMetadata, num_rows_in_slice: int
) -> BlockMetadata:
assert (
num_rows_in_slice > 0
), "num_rows_in_slice must be positive for slicing block metadata."
size_bytes = metadata.size_bytes
if metadata.size_bytes is not None and metadata.num_rows:
per_row = metadata.size_bytes / metadata.num_rows
size_bytes = max(1, int(math.ceil(per_row * num_rows_in_slice)))
return BlockMetadata(
num_rows=num_rows_in_slice if metadata.num_rows is not None else None,
size_bytes=size_bytes,
exec_stats=None,
input_files=list(metadata.input_files),
)


def _iter_sliced_blocks(
blocks: Iterable[Block],
slices: List[Optional[BlockSlice]],
) -> Iterator[Block]:
blocks_list = list(blocks)
for block, block_slice in zip(blocks_list, slices):
if block_slice is None:
# None represents a full block - yield it as is
yield block
else:
accessor = BlockAccessor.for_block(block)
start = block_slice.start_offset
end = block_slice.end_offset
assert start <= end, "start must be less than end"
assert start >= 0, "start must be non-negative"
assert (
end <= accessor.num_rows()
), "end must be less than or equal to the number of rows in the block"

yield accessor.slice(start, end, copy=False)
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from ray.data._internal.execution.bundle_queue import create_bundle_queue
from ray.data._internal.execution.bundle_queue.bundle_queue import BundleQueue
from ray.data._internal.execution.interfaces import (
BlockSlice,
ExecutionOptions,
ExecutionResources,
NodeIdStr,
Expand All @@ -32,7 +33,11 @@
ActorLocationTracker,
get_or_create_actor_location_tracker,
)
from ray.data._internal.execution.operators.map_operator import MapOperator, _map_task
from ray.data._internal.execution.operators.map_operator import (
BaseRefBundler,
MapOperator,
_map_task,
)
from ray.data._internal.execution.operators.map_transformer import MapTransformer
from ray.data._internal.execution.util import locality_string
from ray.data._internal.remote_fn import _add_system_error_to_retry_exceptions
Expand Down Expand Up @@ -71,6 +76,7 @@ def __init__(
compute_strategy: ActorPoolStrategy,
name: str = "ActorPoolMap",
min_rows_per_bundle: Optional[int] = None,
ref_bundler: Optional[BaseRefBundler] = None,
supports_fusion: bool = True,
map_task_kwargs: Optional[Dict[str, Any]] = None,
ray_remote_args_fn: Optional[Callable[[], Dict[str, Any]]] = None,
Expand All @@ -91,6 +97,7 @@ def __init__(
transform_fn, or None to use the block size. Setting the batch size is
important for the performance of GPU-accelerated transform functions.
The actual rows passed may be less if the dataset is small.
ref_bundler: The ref bundler to use for this operator.
supports_fusion: Whether this operator supports fusion with other operators.
map_task_kwargs: A dictionary of kwargs to pass to the map task. You can
access these kwargs through the `TaskContext.kwargs` dictionary.
Expand All @@ -113,6 +120,7 @@ def __init__(
name,
target_max_block_size_override,
min_rows_per_bundle,
ref_bundler,
supports_fusion,
map_task_kwargs,
ray_remote_args_fn,
Expand Down Expand Up @@ -330,6 +338,7 @@ def _dispatch_tasks(self):
self.data_context,
ctx,
*input_blocks,
slices=bundle.slices,
**self.get_map_task_kwargs(),
)

Expand Down Expand Up @@ -571,13 +580,15 @@ def submit(
data_context: DataContext,
ctx: TaskContext,
*blocks: Block,
slices: Optional[List[BlockSlice]] = None,
**kwargs: Dict[str, Any],
) -> Iterator[Union[Block, List[BlockMetadata]]]:
yield from _map_task(
self._map_transformer,
data_context,
ctx,
*blocks,
slices=slices,
**kwargs,
)

Expand Down
Loading