Skip to content
Merged
Show file tree
Hide file tree
Changes from 37 commits
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
7e39adb
update
owenowenisme Oct 22, 2025
ad81683
Merge remote-tracking branch 'upstream/master' into data/use-map-op-f…
owenowenisme Oct 22, 2025
02a27c5
rename _schedule_task_input
owenowenisme Oct 22, 2025
1a28369
update
owenowenisme Oct 22, 2025
344c0c7
make a task can output multiple blocks
owenowenisme Oct 23, 2025
8c63cd0
remove new term chunk
owenowenisme Oct 23, 2025
6f47d8a
resolve comment
owenowenisme Oct 23, 2025
6796f01
merge _build_task_from_single_block_full_blocks & _build_single_outpu…
owenowenisme Oct 24, 2025
0c2e9a1
Merge branch 'master' into data/use-map-op-for-streaming-repartition
owenowenisme Oct 25, 2025
ff71cbe
Merge remote-tracking branch 'upstream/master' into data/use-map-op-f…
owenowenisme Oct 31, 2025
05c5e61
Merge remote-tracking branch 'upstream/master' into data/use-map-op-f…
owenowenisme Oct 31, 2025
13d2280
remove _TaskInput
owenowenisme Oct 31, 2025
a60fbe6
make StreamingRepartition default preserve order
owenowenisme Oct 31, 2025
25be194
rename slice_rows to num_rows_in_slice
owenowenisme Oct 31, 2025
3c0b02e
unify the interface for block_ref_bundler
owenowenisme Oct 31, 2025
e09ce6d
make enforce_target_num_rows_per_block True
owenowenisme Nov 1, 2025
4612f71
fix
owenowenisme Nov 1, 2025
944f920
Merge remote-tracking branch 'upstream/master' into data/use-map-op-f…
owenowenisme Nov 1, 2025
7fd7c49
rename StreamingRepartitionTaskBuilder
owenowenisme Nov 1, 2025
ba7f5f0
remove set_block_ref_bundler and put the logic into constructor
owenowenisme Nov 1, 2025
37a51a4
keep track of ref bundle fully consumed
owenowenisme Nov 2, 2025
1bdf310
remove input_bundle from task_context
owenowenisme Nov 3, 2025
f1dc0f7
Merge branch 'master' into data/use-map-op-for-streaming-repartition
owenowenisme Nov 3, 2025
c5f3532
Merge remote-tracking branch 'upstream/master' into data/use-map-op-f…
owenowenisme Nov 6, 2025
7577cef
remove num_rows_in_slice property
owenowenisme Nov 6, 2025
4e83486
rename task_kwargs into task_kwargs_for_bundle
owenowenisme Nov 6, 2025
8c2e4cf
make slice explicit in map op
owenowenisme Nov 7, 2025
d51180a
update
owenowenisme Nov 7, 2025
75e3435
update
owenowenisme Nov 7, 2025
6231a20
remove preserve order of streaming repartition
owenowenisme Nov 7, 2025
98493fa
add class description for BaseRefBundler
owenowenisme Nov 7, 2025
3106805
remove streaming_repartition_block_fn
owenowenisme Nov 7, 2025
f0c0bd3
make block slice in ref bundle
owenowenisme Nov 7, 2025
59d419c
update
owenowenisme Nov 7, 2025
130fd44
remove block_index & output_index from BlockSlice
owenowenisme Nov 8, 2025
ea7a3dc
update
owenowenisme Nov 8, 2025
0ab949e
added slice row and bytes calculation in ref bundle with unit test
owenowenisme Nov 8, 2025
96ffe1a
update
owenowenisme Nov 8, 2025
e1bbb0e
add ref bundler func and unit test
owenowenisme Nov 8, 2025
6e17feb
refactor
owenowenisme Nov 8, 2025
6e1ed9f
update to track consumed input ref
owenowenisme Nov 8, 2025
a08fe95
update
owenowenisme Nov 8, 2025
ca02719
refine
owenowenisme Nov 9, 2025
bb8fdfd
Merge branch 'master' into data/use-map-op-for-streaming-repartition
owenowenisme Nov 10, 2025
d60d898
Merge remote-tracking branch 'upstream/master' into data/use-map-op-f…
owenowenisme Nov 10, 2025
e14c832
rename consumed_bundle to sliced bundle
owenowenisme Nov 11, 2025
fd15bff
use len function in sr num_blocks
owenowenisme Nov 11, 2025
75bed79
refactor _try_build_ready_bundle
owenowenisme Nov 12, 2025
98aa170
make slice a method of Refbundle
owenowenisme Nov 12, 2025
e23874e
make merge_ref_bundles classmethod of ref_bundle
owenowenisme Nov 12, 2025
9bf1a09
update
owenowenisme Nov 12, 2025
c22c97f
use None to represent full block
owenowenisme Nov 12, 2025
c9b66e0
add more test for test_slice_ref_bundle_invalid_rows
owenowenisme Nov 12, 2025
dfe11ab
add __str__ method
owenowenisme Nov 12, 2025
82ab74b
check 0 of num_rows
owenowenisme Nov 12, 2025
589b97f
Merge branch 'master' into data/use-map-op-for-streaming-repartition
owenowenisme Nov 13, 2025
5b23b18
fix logic of row_need_from_last_block and add test for ref_bundle met…
owenowenisme Nov 13, 2025
970b551
add bundler testing
owenowenisme Nov 13, 2025
a4390f9
add assertion to rows_needed_from_last_bundle
owenowenisme Nov 13, 2025
042312f
update
owenowenisme Nov 13, 2025
2868f9a
Merge branch 'master' into data/use-map-op-for-streaming-repartition
owenowenisme Nov 13, 2025
2c5cc47
make test streaming repartition bundler unit test
owenowenisme Nov 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from .execution_options import ExecutionOptions, ExecutionResources
from .executor import Executor, OutputIterator
from .physical_operator import PhysicalOperator, ReportsExtraResourceUsage
from .ref_bundle import RefBundle
from .ref_bundle import BlockSlice, RefBundle
from .task_context import TaskContext
from .transform_fn import AllToAllTransformFn

Expand All @@ -15,6 +15,7 @@
"OutputIterator",
"PhysicalOperator",
"RefBundle",
"BlockSlice",
"ReportsExtraResourceUsage",
"TaskContext",
]
83 changes: 79 additions & 4 deletions python/ray/data/_internal/execution/interfaces/ref_bundle.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,31 @@
import math
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, Iterator, List, Optional, Tuple
from typing import Dict, Iterable, Iterator, List, Optional, Tuple

import ray
from .common import NodeIdStr
from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder
from ray.data._internal.memory_tracing import trace_deallocation
from ray.data.block import Block, BlockMetadata, Schema
from ray.data.block import Block, BlockAccessor, BlockMetadata, Schema
from ray.data.context import DataContext
from ray.types import ObjectRef


@dataclass
class BlockSlice:
    """A half-open row range ``[start_offset, end_offset)`` within a block.

    Raises:
        ValueError: If the offsets do not form a valid non-negative range.
    """

    # Starting row offset (inclusive) within the block.
    start_offset: int
    # Ending row offset (exclusive) within the block.
    end_offset: int

    def __post_init__(self) -> None:
        # Validate eagerly so an invalid range fails at construction time
        # rather than when the slice is consumed (requested in review:
        # "validate the slices have valid ranges").
        if self.start_offset < 0:
            raise ValueError(
                f"start_offset must be non-negative, got {self.start_offset}"
            )
        if self.end_offset < self.start_offset:
            raise ValueError(
                f"end_offset ({self.end_offset}) must be >= "
                f"start_offset ({self.start_offset})"
            )

    @property
    def num_rows(self) -> int:
        """Number of rows covered by this slice."""
        return self.end_offset - self.start_offset


@dataclass
class RefBundle:
"""A group of data block references and their metadata.
Expand Down Expand Up @@ -38,6 +54,10 @@ class RefBundle:
# Whether we own the blocks (can safely destroy them).
owns_blocks: bool

# The slices of the blocks in this bundle. This is optional, and may be None
# if the blocks are not sliced.
slices: Optional[List[BlockSlice]] = None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be List[Optional[BlockSlice]] so that we can only slice certain blocks in the bundle.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to Optional[List[Optional[BlockSlice]]] = None and normalized this in __post_init__


# This attribute is used by the split() operator to assign bundles to logical
# output splits. It is otherwise None.
output_split_idx: Optional[int] = None
Expand All @@ -53,6 +73,10 @@ class RefBundle:
def __post_init__(self):
if not isinstance(self.blocks, tuple):
object.__setattr__(self, "blocks", tuple(self.blocks))
if self.slices is not None:
assert len(self.blocks) == len(
self.slices
), "Number of blocks and slices must match"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's also validate the slices have valid ranges.

for b in self.blocks:
assert isinstance(b, tuple), b
assert len(b) == 2, b
Expand Down Expand Up @@ -80,16 +104,29 @@ def metadata(self) -> List[BlockMetadata]:

def num_rows(self) -> Optional[int]:
    """Number of rows present in this bundle, if known.

    When ``slices`` is set, the logical row count comes from the slice
    ranges (a ``None`` entry means "the whole block" and falls back to
    that block's metadata). Otherwise it is summed from block metadata.

    Returns:
        The total row count, or ``None`` if any required count is unknown.
    """
    if self.slices is not None:
        total = 0
        for block_slice, m in zip(self.slices, self.metadata):
            if block_slice is not None:
                total += block_slice.num_rows
            elif m.num_rows is not None:
                # A None slice entry covers the whole block; use the
                # block's own metadata row count.
                total += m.num_rows
            else:
                return None
        return total

    # Fix: the previous version returned `slice_total` (None) here even
    # when `slices` was None, making this metadata fallback unreachable.
    total = 0
    for m in self.metadata:
        if m.num_rows is None:
            return None
        total += m.num_rows
    return total

def size_bytes(self) -> int:
    """Size of the blocks of this bundle in bytes.

    For sliced blocks the size is estimated pro-rata from the block's
    average row size. An unsliced bundle — or a ``None`` slice entry,
    which means "the whole block" — contributes its full metadata size.
    """
    if self.slices is not None:
        total = 0
        for (_, metadata), block_slice in zip(self.blocks, self.slices):
            # Guard against None entries: per the review thread, `slices`
            # may be List[Optional[BlockSlice]]; the previous code raised
            # AttributeError on `block_slice.num_rows` for None entries.
            if (
                block_slice is not None
                and metadata.num_rows
                and metadata.num_rows != block_slice.num_rows
            ):
                # Estimate: average bytes-per-row scaled by the slice's
                # row count, floored at 1 byte.
                per_row = metadata.size_bytes / metadata.num_rows
                total += max(1, int(math.ceil(per_row * block_slice.num_rows)))
            else:
                total += metadata.size_bytes
        return total
    return sum(m.size_bytes for m in self.metadata)

def destroy_if_owned(self) -> int:
Expand Down Expand Up @@ -170,3 +207,41 @@ def _ref_bundles_iterator_to_block_refs_list(
return [
block_ref for ref_bundle in ref_bundles for block_ref in ref_bundle.block_refs
]


def _slice_block_metadata(
    metadata: BlockMetadata, num_rows_in_slice: int
) -> BlockMetadata:
    """Build metadata describing a ``num_rows_in_slice``-row slice of a block.

    The slice's byte size is estimated pro-rata from the source block's
    average row size; execution stats do not carry over to the slice.
    """
    assert (
        num_rows_in_slice > 0
    ), "num_rows_in_slice must be positive for slicing block metadata."

    estimated_size = metadata.size_bytes
    if metadata.size_bytes is not None and metadata.num_rows:
        # Scale the average row size to the slice length, floored at 1 byte.
        bytes_per_row = metadata.size_bytes / metadata.num_rows
        estimated_size = max(1, int(math.ceil(bytes_per_row * num_rows_in_slice)))

    # Only report a row count when the source block's count was known.
    sliced_num_rows = num_rows_in_slice if metadata.num_rows is not None else None
    return BlockMetadata(
        num_rows=sliced_num_rows,
        size_bytes=estimated_size,
        exec_stats=None,
        input_files=list(metadata.input_files),
    )


def _iter_sliced_blocks(
    blocks: Iterable[Block],
    slices: List[BlockSlice],
) -> Iterator[Block]:
    """Yield one block combining the requested slices of ``blocks``.

    A slice covering an entire block (or a ``None`` entry, meaning the
    whole block) is added without copying; partial slices use a
    zero-copy block view.
    """
    builder = DelegatingBlockBuilder()
    # Iterate lazily; the previous `list(blocks)` materialization was
    # unnecessary since zip() consumes iterables directly.
    for block, block_slice in zip(blocks, slices):
        if block_slice is None:
            # None means "the whole block" — no slicing needed.
            builder.add_block(block)
            continue
        accessor = BlockAccessor.for_block(block)
        start = block_slice.start_offset
        end = block_slice.end_offset
        if start == 0 and end >= accessor.num_rows():
            # The slice spans the entire block; avoid a copy.
            builder.add_block(block)
        else:
            builder.add_block(accessor.slice(start, end, copy=False))
    # `builder` is always constructed, so build unconditionally — the
    # previous `if builder is not None` guard could never be false.
    yield builder.build()
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from ray.data._internal.execution.bundle_queue import create_bundle_queue
from ray.data._internal.execution.bundle_queue.bundle_queue import BundleQueue
from ray.data._internal.execution.interfaces import (
BlockSlice,
ExecutionOptions,
ExecutionResources,
NodeIdStr,
Expand All @@ -32,7 +33,11 @@
ActorLocationTracker,
get_or_create_actor_location_tracker,
)
from ray.data._internal.execution.operators.map_operator import MapOperator, _map_task
from ray.data._internal.execution.operators.map_operator import (
BaseRefBundler,
MapOperator,
_map_task,
)
from ray.data._internal.execution.operators.map_transformer import MapTransformer
from ray.data._internal.execution.util import locality_string
from ray.data._internal.remote_fn import _add_system_error_to_retry_exceptions
Expand Down Expand Up @@ -71,6 +76,7 @@ def __init__(
compute_strategy: ActorPoolStrategy,
name: str = "ActorPoolMap",
min_rows_per_bundle: Optional[int] = None,
ref_bundler: Optional[BaseRefBundler] = None,
supports_fusion: bool = True,
map_task_kwargs: Optional[Dict[str, Any]] = None,
ray_remote_args_fn: Optional[Callable[[], Dict[str, Any]]] = None,
Expand All @@ -91,6 +97,7 @@ def __init__(
transform_fn, or None to use the block size. Setting the batch size is
important for the performance of GPU-accelerated transform functions.
The actual rows passed may be less if the dataset is small.
ref_bundler: The ref bundler to use for this operator.
supports_fusion: Whether this operator supports fusion with other operators.
map_task_kwargs: A dictionary of kwargs to pass to the map task. You can
access these kwargs through the `TaskContext.kwargs` dictionary.
Expand All @@ -113,6 +120,7 @@ def __init__(
name,
target_max_block_size_override,
min_rows_per_bundle,
ref_bundler,
supports_fusion,
map_task_kwargs,
ray_remote_args_fn,
Expand Down Expand Up @@ -296,13 +304,15 @@ def _task_done_callback(res_ref):
)
return actor, res_ref

def _add_bundled_input(
    self, bundle: RefBundle, slices: Optional[List[BlockSlice]] = None
):
    """Queue a finalized input bundle and try to dispatch tasks.

    Args:
        bundle: The bundle of input blocks to enqueue.
        slices: Optional per-block slices forwarded to task dispatch.
    """
    # Note: the diff-merged text showed the stale zero-arg signature
    # alongside this one; only the updated signature is kept.
    self._bundle_queue.add(bundle)
    self._metrics.on_input_queued(bundle)
    # Try to dispatch all bundles in the queue, including this new bundle.
    self._dispatch_tasks(slices)

def _dispatch_tasks(self):
def _dispatch_tasks(self, slices: Optional[List[BlockSlice]] = None):
"""Try to dispatch tasks from the bundle buffer to the actor pool.

This is called when:
Expand Down Expand Up @@ -330,6 +340,7 @@ def _dispatch_tasks(self):
self.data_context,
ctx,
*input_blocks,
slices=bundle.slices,
**self.get_map_task_kwargs(),
)

Expand Down Expand Up @@ -571,13 +582,15 @@ def submit(
data_context: DataContext,
ctx: TaskContext,
*blocks: Block,
slices: Optional[List[BlockSlice]] = None,
**kwargs: Dict[str, Any],
) -> Iterator[Union[Block, List[BlockMetadata]]]:
yield from _map_task(
self._map_transformer,
data_context,
ctx,
*blocks,
slices=slices,
**kwargs,
)

Expand Down
Loading