Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
7e39adb
update
owenowenisme Oct 22, 2025
ad81683
Merge remote-tracking branch 'upstream/master' into data/use-map-op-f…
owenowenisme Oct 22, 2025
02a27c5
rename _schedule_task_input
owenowenisme Oct 22, 2025
1a28369
update
owenowenisme Oct 22, 2025
344c0c7
make a task can output multiple blocks
owenowenisme Oct 23, 2025
8c63cd0
remove new term chunk
owenowenisme Oct 23, 2025
6f47d8a
resolve comment
owenowenisme Oct 23, 2025
6796f01
merge _build_task_from_single_block_full_blocks & _build_single_outpu…
owenowenisme Oct 24, 2025
0c2e9a1
Merge branch 'master' into data/use-map-op-for-streaming-repartition
owenowenisme Oct 25, 2025
ff71cbe
Merge remote-tracking branch 'upstream/master' into data/use-map-op-f…
owenowenisme Oct 31, 2025
05c5e61
Merge remote-tracking branch 'upstream/master' into data/use-map-op-f…
owenowenisme Oct 31, 2025
13d2280
remove _TaskInput
owenowenisme Oct 31, 2025
a60fbe6
make StreamingRepartition default preserve order
owenowenisme Oct 31, 2025
25be194
rename slice_rows num_rows_in_slice
owenowenisme Oct 31, 2025
3c0b02e
unify the interface for block_ref_bundler
owenowenisme Oct 31, 2025
e09ce6d
make enforce_target_num_rows_per_block True
owenowenisme Nov 1, 2025
4612f71
fix
owenowenisme Nov 1, 2025
944f920
Merge remote-tracking branch 'upstream/master' into data/use-map-op-f…
owenowenisme Nov 1, 2025
7fd7c49
rename StreamingRepartitionTaskBuilder
owenowenisme Nov 1, 2025
ba7f5f0
remove set_block_ref_bundler and put the logic into constructor
owenowenisme Nov 1, 2025
37a51a4
keep track on ref bundle fully consumed
owenowenisme Nov 2, 2025
1bdf310
remove input_bundle from task_context
owenowenisme Nov 3, 2025
f1dc0f7
Merge branch 'master' into data/use-map-op-for-streaming-repartition
owenowenisme Nov 3, 2025
c5f3532
Merge remote-tracking branch 'upstream/master' into data/use-map-op-f…
owenowenisme Nov 6, 2025
7577cef
remove num_rows_in_slice property
owenowenisme Nov 6, 2025
4e83486
rename task_kwargs into task_kwargs_for_bundle
owenowenisme Nov 6, 2025
8c2e4cf
make slice explicit in map op
owenowenisme Nov 7, 2025
d51180a
update
owenowenisme Nov 7, 2025
75e3435
update
owenowenisme Nov 7, 2025
6231a20
remove preserve order or streaming repartition
owenowenisme Nov 7, 2025
98493fa
add class description for BaseRefBundler
owenowenisme Nov 7, 2025
3106805
remove streaming_repartition_block_fn
owenowenisme Nov 7, 2025
f0c0bd3
make block slice in ref bundle
owenowenisme Nov 7, 2025
59d419c
update
owenowenisme Nov 7, 2025
130fd44
remove block_index & output_index from BlockSlice
owenowenisme Nov 8, 2025
ea7a3dc
update
owenowenisme Nov 8, 2025
0ab949e
added slice row and bytes calculation in ref bundle with unit test
owenowenisme Nov 8, 2025
96ffe1a
update
owenowenisme Nov 8, 2025
e1bbb0e
add ref bundler func and unit test
owenowenisme Nov 8, 2025
6e17feb
refactor
owenowenisme Nov 8, 2025
6e1ed9f
update to track consumed input ref
owenowenisme Nov 8, 2025
a08fe95
update
owenowenisme Nov 8, 2025
ca02719
refine
owenowenisme Nov 9, 2025
bb8fdfd
Merge branch 'master' into data/use-map-op-for-streaming-repartition
owenowenisme Nov 10, 2025
d60d898
Merge remote-tracking branch 'upstream/master' into data/use-map-op-f…
owenowenisme Nov 10, 2025
e14c832
rename consumed_bundle to sliced bundle
owenowenisme Nov 11, 2025
fd15bff
use len function in sr num_blocks
owenowenisme Nov 11, 2025
75bed79
refactor _try_build_ready_bundle
owenowenisme Nov 12, 2025
98aa170
make slice a method of Refbundle
owenowenisme Nov 12, 2025
e23874e
make merge_ref_bundles classmethod of ref_bundle
owenowenisme Nov 12, 2025
9bf1a09
update
owenowenisme Nov 12, 2025
c22c97f
use None to represent full block
owenowenisme Nov 12, 2025
c9b66e0
add more test for test_slice_ref_bundle_invalid_rows
owenowenisme Nov 12, 2025
dfe11ab
add __str__ method
owenowenisme Nov 12, 2025
82ab74b
check 0 of num_rows
owenowenisme Nov 12, 2025
589b97f
Merge branch 'master' into data/use-map-op-for-streaming-repartition
owenowenisme Nov 13, 2025
5b23b18
fix logic of row_need_from_last_block and add test for ref_bundle met…
owenowenisme Nov 13, 2025
970b551
add bundler testing
owenowenisme Nov 13, 2025
a4390f9
add assertion to rows_needed_from_last_bundle
owenowenisme Nov 13, 2025
042312f
update
owenowenisme Nov 13, 2025
2868f9a
Merge branch 'master' into data/use-map-op-for-streaming-repartition
owenowenisme Nov 13, 2025
2c5cc47
make test streaming repartition bundler unit test
owenowenisme Nov 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 36 additions & 1 deletion python/ray/data/_internal/execution/operators/map_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import logging
from abc import ABC, abstractmethod
from collections import defaultdict, deque
from dataclasses import dataclass
from typing import (
Any,
Callable,
Expand Down Expand Up @@ -65,6 +66,12 @@
logger = logging.getLogger(__name__)


@dataclass
class _TaskInput:
    """A ready-to-run unit of work produced by a task input builder.

    Pairs the input ``RefBundle`` with optional per-task kwargs. The kwargs
    are merged over ``get_map_task_kwargs()`` at submission time and are
    reachable from the UDF through the ``TaskContext.kwargs`` dictionary.
    """

    # The fully-bundled input that will be submitted as one map task.
    bundle: RefBundle
    # Extra kwargs merged into the map task's kwargs at submission; None
    # means no per-task extras.
    task_kwargs: Optional[Dict[str, Any]] = None


class MapOperator(OneToOneOperator, InternalQueueOperatorMixin, ABC):
"""A streaming operator that maps input bundles 1:1 to output bundles.

Expand Down Expand Up @@ -117,6 +124,9 @@ def __init__(
# All active `MetadataOpTask`s.
self._metadata_tasks: Dict[int, MetadataOpTask] = {}
self._next_metadata_task_idx = 0
# Optional helper that can turn incoming ref bundles into ready-to-run
# task inputs (bundle + per-task kwargs).
self._task_input_builder = None
# Keep track of all finished streaming generators.
super().__init__(name, input_op, data_context, target_max_block_size_override)

Expand Down Expand Up @@ -153,6 +163,9 @@ def get_additional_split_factor(self) -> int:
def set_additional_split_factor(self, k: int):
self._additional_split_factor = k

def set_task_input_builder(self, builder: Optional[Any]) -> None:
    """Install an optional helper that turns incoming ref bundles into
    ready-to-run task inputs (bundle + per-task kwargs).

    When a builder is set, ``_add_input_inner`` bypasses the default
    block ref bundler and submits the builder's outputs directly.
    Passing None restores the default bundling path.

    Args:
        builder: The task input builder, or None to disable. NOTE(review):
            typed ``Any`` — presumably expects an object exposing
            ``add_input(refs)`` and ``finish()``; confirm against callers.
    """
    self._task_input_builder = builder

def internal_queue_size(self) -> int:
return self._block_ref_bundler.num_bundles()

Expand Down Expand Up @@ -328,6 +341,14 @@ def _warn_large_udf(self):
def _add_input_inner(self, refs: RefBundle, input_index: int):
assert input_index == 0, input_index

if self._task_input_builder is not None:
self._metrics.on_input_queued(refs)
task_inputs: List[_TaskInput] = self._task_input_builder.add_input(refs)
self._metrics.on_input_dequeued(refs)
for task_input in task_inputs:
self._submit_task_input(task_input)
return

# Add RefBundle to the bundler.
self._block_ref_bundler.add_bundle(refs)
self._metrics.on_input_queued(refs)
Expand Down Expand Up @@ -380,8 +401,14 @@ def _get_dynamic_ray_remote_args(
return self._ray_remote_args_factory_actor_locality(ray_remote_args)
return ray_remote_args

def _submit_task_input(self, task_input: _TaskInput) -> None:
    """Submit a ready-to-run task input produced by a task input builder.

    Unpacks the ``_TaskInput`` and forwards its bundle plus optional
    per-task kwargs to ``_add_bundled_input``, which launches the map task.

    Args:
        task_input: The pre-bundled input (and kwargs) to run as one task.
    """
    self._add_bundled_input(task_input.bundle, task_input.task_kwargs)

@abstractmethod
def _add_bundled_input(self, refs: RefBundle):
def _add_bundled_input(
self, refs: RefBundle, task_kwargs: Optional[Dict[str, Any]] = None
):
"""Add a pre-bundled upstream output to this operator.

Unlike the add_input() arg, this RefBundle has already been further bundled by
Expand All @@ -392,6 +419,8 @@ def _add_bundled_input(self, refs: RefBundle):

Args:
refs: The fully-bundled ref bundle that should be added as input.
task_kwargs: A dictionary of kwargs to pass to the map task. You can
access these kwargs through the `TaskContext.kwargs` dictionary.
"""
raise NotImplementedError

Expand Down Expand Up @@ -469,6 +498,12 @@ def get_active_tasks(self) -> List[OpTask]:
return list(self._metadata_tasks.values()) + list(self._data_tasks.values())

def all_inputs_done(self):
if self._task_input_builder is not None:
for task_input in self._task_input_builder.finish():
self._submit_task_input(task_input)
super().all_inputs_done()
return

self._block_ref_bundler.done_adding_bundles()
if self._block_ref_bundler.has_bundle():
# Handle any leftover bundles in the bundler.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ def __init__(

self._map_task = cached_remote_fn(_map_task, **ray_remote_static_args)

def _add_bundled_input(self, bundle: RefBundle):
def _add_bundled_input(
self, bundle: RefBundle, task_kwargs: Optional[Dict[str, Any]] = None
):
# Submit the task as a normal Ray task.
ctx = TaskContext(
task_idx=self._next_data_task_idx,
Expand All @@ -102,13 +104,16 @@ def _add_bundled_input(self, bundle: RefBundle):
)

data_context = self.data_context
per_task_kwargs = self.get_map_task_kwargs().copy()
if task_kwargs:
per_task_kwargs.update(task_kwargs)

gen = self._map_task.options(**dynamic_ray_remote_args).remote(
self._map_transformer_ref,
data_context,
ctx,
*bundle.block_refs,
**self.get_map_task_kwargs(),
**per_task_kwargs,
)
self._submit_data_task(gen, bundle)

Expand Down
7 changes: 7 additions & 0 deletions python/ray/data/_internal/logical/operators/map_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,19 +344,26 @@ class StreamingRepartition(AbstractMap):
Args:
target_num_rows_per_block: The target number of rows per block granularity for
streaming repartition.
enforce_target_num_rows_per_block: Whether to enforce the target number of rows per block. Default to False.
"""

def __init__(
self,
input_op: LogicalOperator,
target_num_rows_per_block: int,
enforce_target_num_rows_per_block: bool = False,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
enforce_target_num_rows_per_block: bool = False,
strict_target_num_rows_per_block: bool = False,

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed strict_target_num_rows_per_block since we always enable exact size blcok

):
super().__init__("StreamingRepartition", input_op)
self._target_num_rows_per_block = target_num_rows_per_block
self._enforce_target_num_rows_per_block = enforce_target_num_rows_per_block

@property
def target_num_rows_per_block(self) -> int:
    """The target number of rows per block for streaming repartition."""
    return self._target_num_rows_per_block

@property
def enforce_target_num_rows_per_block(self) -> bool:
    """Whether to enforce the target number of rows per block.

    Defaults to False (set in the constructor).
    """
    return self._enforce_target_num_rows_per_block

def can_modify_num_rows(self) -> bool:
    """Always returns False: repartitioning re-chunks blocks without
    changing the total row count (per the operator contract this method
    implements)."""
    return False
39 changes: 28 additions & 11 deletions python/ray/data/_internal/planner/plan_udf_map_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@
)
from ray.data._internal.numpy_support import _is_valid_column_values
from ray.data._internal.output_buffer import OutputBlockSizeOption
from ray.data._internal.streaming_repartition import (
StreamingRepartitionTaskBuilder,
streaming_repartition_block_fn,
)
from ray.data._internal.util import _truncated_repr
from ray.data.block import (
Block,
Expand Down Expand Up @@ -151,19 +155,25 @@ def plan_streaming_repartition_op(
input_physical_dag = physical_children[0]
compute = get_compute(op._compute)

# Create a no-op transform that is just coalescing/slicing the incoming
# blocks
transform_fn = BlockMapTransformFn(
lambda blocks, ctx: blocks,
output_block_size_option=OutputBlockSizeOption.of(
target_num_rows_per_block=op.target_num_rows_per_block
),
)

map_transformer = MapTransformer([transform_fn])
if op.enforce_target_num_rows_per_block:
transform_fn = BlockMapTransformFn(
streaming_repartition_block_fn,
disable_block_shaping=True,
)
map_transformer = MapTransformer([transform_fn])
else:
# Create a no-op transform that is just coalescing/slicing the incoming
# blocks
transform_fn = BlockMapTransformFn(
lambda blocks, ctx: blocks,
output_block_size_option=OutputBlockSizeOption.of(
target_num_rows_per_block=op.target_num_rows_per_block
),
)
map_transformer = MapTransformer([transform_fn])

# Disable fusion for streaming repartition with the downstream op.
return MapOperator.create(
operator = MapOperator.create(
map_transformer,
input_physical_dag,
data_context,
Expand All @@ -174,6 +184,13 @@ def plan_streaming_repartition_op(
supports_fusion=False,
)

if op.enforce_target_num_rows_per_block:
operator.set_task_input_builder(
StreamingRepartitionTaskBuilder(op.target_num_rows_per_block)
)

return operator


def plan_filter_op(
op: Filter,
Expand Down
Loading