Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
6e7de8c
basic impl
raulchen Oct 13, 2023
7881f51
debug
raulchen Oct 13, 2023
b192a78
debug
raulchen Oct 16, 2023
c5a8453
refine
raulchen Oct 16, 2023
90afcfe
fix
raulchen Oct 16, 2023
18df710
fix on_waitable_ready
raulchen Oct 16, 2023
b746753
revert
raulchen Oct 16, 2023
c08d041
refine
raulchen Oct 17, 2023
75b6c23
comments
raulchen Oct 17, 2023
1fc504f
Merge branch 'master' into streaming-gen-backpressure
raulchen Oct 17, 2023
0ea025a
Merge branch 'master' into streaming-gen-backpressure
raulchen Oct 17, 2023
65d449d
comments
raulchen Oct 17, 2023
7a7b423
comment
raulchen Oct 17, 2023
87e719c
comment
raulchen Oct 17, 2023
dff7416
remove blank line
raulchen Oct 17, 2023
b946ccb
Sang's PR
raulchen Oct 17, 2023
2d142ed
debug
raulchen Oct 18, 2023
4b5bd0c
Merge branch 'master' into streaming-gen-backpressure
raulchen Oct 19, 2023
0667bd4
fix
raulchen Oct 19, 2023
3ce4a8a
Revert "Sang's PR"
raulchen Oct 19, 2023
3ac3564
resolve deadlock
raulchen Oct 19, 2023
0a8aeb3
ray core
raulchen Oct 19, 2023
009cb82
Revert "resolve deadlock"
raulchen Oct 19, 2023
ebe8e93
resolve deadlock
raulchen Oct 19, 2023
05e42db
use backpressure policy
raulchen Oct 19, 2023
222dc0b
test
raulchen Oct 19, 2023
584a699
lint
raulchen Oct 19, 2023
abf42f8
Revert "ray core"
raulchen Oct 19, 2023
6891f41
fix
raulchen Oct 19, 2023
9632e84
comments
raulchen Oct 19, 2023
8fa34ec
config based on number
raulchen Oct 19, 2023
259983c
ray core
raulchen Oct 19, 2023
af82f99
fix test
raulchen Oct 19, 2023
8e38323
comment
raulchen Oct 19, 2023
21b6d19
Revert "ray core"
raulchen Oct 19, 2023
84539ba
comments
raulchen Oct 19, 2023
a35f380
Merge branch 'master' into streaming-gen-backpressure
raulchen Oct 20, 2023
ebdc82f
refine
raulchen Oct 20, 2023
d0769fb
typo
raulchen Oct 20, 2023
7b71cc8
refine
raulchen Oct 20, 2023
5aaa08e
Merge branch 'master' into streaming-gen-backpressure
raulchen Oct 23, 2023
41951fb
fix ray core
raulchen Oct 23, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 109 additions & 9 deletions python/ray/data/_internal/execution/backpressure_policy.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import logging
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Dict

import ray

if TYPE_CHECKING:
from ray.data._internal.execution.interfaces.physical_operator import (
PhysicalOperator,
)
from ray.data._internal.execution.streaming_executor_state import Topology
from ray.data._internal.execution.streaming_executor_state import OpState, Topology

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -36,14 +36,35 @@ class BackpressurePolicy(ABC):
def __init__(self, topology: "Topology"):
...

@abstractmethod
def can_run(self, op: "PhysicalOperator") -> bool:
"""Called when StreamingExecutor selects an operator to run in
`streaming_executor_state.select_operator_to_run()`.
def calculate_max_blocks_to_read_per_op(
self, topology: "Topology"
) -> Dict["OpState", int]:
"""Determine how many blocks of data we can read from each operator.
The `DataOpTask`s of the operators will stop reading blocks when the limit is
reached. Then the execution of these tasks will be paused when the streaming
generator backpressure threshold is reached.
Used in `streaming_executor_state.py::process_completed_tasks()`.

Returns: A dict mapping from each operator's OpState to the desired number of
blocks to read. For operators that are not in the dict, all available blocks
will be read.

Note: Only one backpressure policy that implements this method can be enabled
at a time.
"""
return {}

def can_add_input(self, op: "PhysicalOperator") -> bool:
"""Determine if we can add a new input to the operator. If returns False, the
operator will be backpressured and will not be able to run new tasks.
Used in `streaming_executor_state.py::select_operator_to_run()`.

Returns: True if the operator can run, False otherwise.
Returns: True if we can add a new input to the operator, False otherwise.

Note, if multiple backpressure policies are enabled, the operator will be
backpressured if any of the policies returns False.
"""
...
return True


class ConcurrencyCapBackpressurePolicy(BackpressurePolicy):
Expand Down Expand Up @@ -103,7 +124,7 @@ def __init__(self, topology: "Topology"):
for op, _ in topology.items():
self._concurrency_caps[op] = self._init_cap

def can_run(self, op: "PhysicalOperator") -> bool:
def can_add_input(self, op: "PhysicalOperator") -> bool:
metrics = op.metrics
while metrics.num_tasks_finished >= (
self._concurrency_caps[op] * self._cap_multiply_threshold
Expand All @@ -113,3 +134,82 @@ def can_run(self, op: "PhysicalOperator") -> bool:
f"Concurrency cap for {op} increased to {self._concurrency_caps[op]}"
)
return metrics.num_tasks_running < self._concurrency_caps[op]


class StreamingOutputBackpressurePolicy(BackpressurePolicy):
    """A backpressure policy that throttles the streaming outputs of the `DataOpTask`s.

    There are 2 levels of configs to control the behavior:
    - At the Ray Core level, we use
      `MAX_BLOCKS_IN_GENERATOR_BUFFER` to limit the number of blocks buffered in
      the streaming generator of each `DataOpTask`. When it's reached, the task
      will be blocked at `yield` until the caller reads another `ObjectRef`.
    - At the Ray Data level, we use
      `MAX_BLOCKS_IN_OP_OUTPUT_QUEUE` to limit the number of blocks buffered in
      the output queue of each operator. When it's reached, we'll stop reading
      from the streaming generators of the op's tasks, and thus trigger
      backpressure at the Ray Core level.

    Thus, the total number of buffered blocks for each operator can be
    `MAX_BLOCKS_IN_GENERATOR_BUFFER * num_running_tasks +
    MAX_BLOCKS_IN_OP_OUTPUT_QUEUE`.
    """

    # The max number of blocks that can be buffered at the streaming generator
    # of each `DataOpTask`.
    MAX_BLOCKS_IN_GENERATOR_BUFFER = 10
    MAX_BLOCKS_IN_GENERATOR_BUFFER_CONFIG_KEY = (
        "backpressure_policies.streaming_output.max_blocks_in_generator_buffer"
    )
    # The max number of blocks that can be buffered at the operator output queue
    # (`OpState.outqueue`).
    MAX_BLOCKS_IN_OP_OUTPUT_QUEUE = 20
    MAX_BLOCKS_IN_OP_OUTPUT_QUEUE_CONFIG_KEY = (
        "backpressure_policies.streaming_output.max_blocks_in_op_output_queue"
    )

    def __init__(self, topology: "Topology"):
        data_context = ray.data.DataContext.get_current()
        self._max_num_blocks_in_streaming_gen_buffer = data_context.get_config(
            self.MAX_BLOCKS_IN_GENERATOR_BUFFER_CONFIG_KEY,
            self.MAX_BLOCKS_IN_GENERATOR_BUFFER,
        )
        assert self._max_num_blocks_in_streaming_gen_buffer > 0
        # The `_generator_backpressure_num_objects` parameter should be
        # `2 * self._max_num_blocks_in_streaming_gen_buffer` because we yield
        # 2 objects for each block: the block and the block metadata.
        data_context._task_pool_data_task_remote_args[
            "_generator_backpressure_num_objects"
        ] = (2 * self._max_num_blocks_in_streaming_gen_buffer)

        self._max_num_blocks_in_op_output_queue = data_context.get_config(
            self.MAX_BLOCKS_IN_OP_OUTPUT_QUEUE_CONFIG_KEY,
            self.MAX_BLOCKS_IN_OP_OUTPUT_QUEUE,
        )
        assert self._max_num_blocks_in_op_output_queue > 0

    def calculate_max_blocks_to_read_per_op(
        self, topology: "Topology"
    ) -> Dict["OpState", int]:
        max_blocks_to_read_per_op: Dict["OpState", int] = {}
        downstream_num_active_tasks = 0
        # Iterate from the most-downstream op to the most-upstream one, so that
        # `downstream_num_active_tasks` counts the active tasks of all ops
        # downstream of the current `op`.
        for op, state in list(topology.items())[::-1]:
            # Allow reading only as many blocks as still fit in the op's
            # output queue (may be <= 0 when the queue is already full).
            max_blocks_to_read_per_op[state] = (
                self._max_num_blocks_in_op_output_queue - state.outqueue_num_blocks()
            )
            if downstream_num_active_tasks == 0:
                # If all downstream operators are idle, it could be because no
                # resources are available. In this case, we'll make sure to read
                # at least one block to avoid deadlock.
                # TODO(hchen): `downstream_num_active_tasks == 0` doesn't necessarily
                # mean no enough resources. One false positive case is when the
                # upstream op hasn't produced any blocks for the downstream op to
                # consume. In this case, at least reading one block is fine.
                # If there are other false positive cases, we may want to make this
                # deadlock check more accurate by directly checking resources.
                max_blocks_to_read_per_op[state] = max(
                    max_blocks_to_read_per_op[state],
                    1,
                )
            downstream_num_active_tasks += len(op.get_active_tasks())
        return max_blocks_to_read_per_op
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,6 @@ def get_waitable(self) -> Waitable:
"""Return the ObjectRef or StreamingObjectRefGenerator to wait on."""
pass

@abstractmethod
def on_waitable_ready(self):
"""Called when the waitable is ready.

This method may get called multiple times if the waitable is a
streaming generator.
"""
pass


class DataOpTask(OpTask):
"""Represents an OpTask that handles Block data."""
Expand Down Expand Up @@ -65,18 +56,25 @@ def __init__(
def get_waitable(self) -> StreamingObjectRefGenerator:
    # The streaming generator serves as this task's waitable: it becomes
    # ready each time the task produces a new output (or stops).
    return self._streaming_gen

def on_waitable_ready(self):
# Handle all the available outputs of the streaming generator.
while True:
def on_data_ready(self, max_blocks_to_read: Optional[int]) -> int:
"""Callback when data is ready to be read from the streaming generator.

Args:
max_blocks_to_read: Max number of blocks to read. If None, all available
will be read.
Returns: The number of blocks read.
"""
num_blocks_read = 0
while max_blocks_to_read is None or num_blocks_read < max_blocks_to_read:
try:
block_ref = self._streaming_gen._next_sync(0)
if block_ref.is_nil():
# The generator currently doesn't have new output.
# And it's not stopped yet.
return
break
except StopIteration:
self._task_done_callback()
return
break

try:
meta = ray.get(next(self._streaming_gen))
Expand All @@ -93,6 +91,8 @@ def on_waitable_ready(self):
self._output_ready_callback(
RefBundle([(block_ref, meta)], owns_blocks=True)
)
num_blocks_read += 1
return num_blocks_read


class MetadataOpTask(OpTask):
Expand All @@ -112,7 +112,8 @@ def __init__(
def get_waitable(self) -> ray.ObjectRef:
    # A metadata task has a single result, so the waitable is the plain
    # ObjectRef of that result (not a streaming generator).
    return self._object_ref

def on_waitable_ready(self):
def on_task_finished(self):
    """Callback when the task is finished.

    Invokes the completion callback supplied at construction time.
    """
    self._task_done_callback()


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,18 @@ def _add_bundled_input(self, bundle: RefBundle):
# Submit the task as a normal Ray task.
map_task = cached_remote_fn(_map_task, num_returns="streaming")
input_blocks = [block for block, _ in bundle.blocks]

ctx = TaskContext(
task_idx=self._next_data_task_idx,
target_max_block_size=self.actual_target_max_block_size,
)
gen = map_task.options(
**self._get_runtime_ray_remote_args(input_bundle=bundle), name=self.name
).remote(
data_context = DataContext.get_current()
ray_remote_args = self._get_runtime_ray_remote_args(input_bundle=bundle)
ray_remote_args["name"] = self.name
ray_remote_args.update(data_context._task_pool_data_task_remote_args)

gen = map_task.options(**ray_remote_args).remote(
self._map_transformer_ref,
DataContext.get_current(),
data_context,
ctx,
*input_blocks,
)
Expand Down
4 changes: 2 additions & 2 deletions python/ray/data/_internal/execution/streaming_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def __init__(self, options: ExecutionOptions, dataset_tag: str = "unknown_datase
# generator `yield`s.
self._topology: Optional[Topology] = None
self._output_node: Optional[OpState] = None
self._backpressure_policies: Optional[List[BackpressurePolicy]] = None
self._backpressure_policies: List[BackpressurePolicy] = []

self._dataset_tag = dataset_tag

Expand Down Expand Up @@ -249,7 +249,7 @@ def _scheduling_loop_step(self, topology: Topology) -> bool:
# Note: calling process_completed_tasks() is expensive since it incurs
# ray.wait() overhead, so make sure to allow multiple dispatch per call for
# greater parallelism.
process_completed_tasks(topology)
process_completed_tasks(topology, self._backpressure_policies)

# Dispatch as many operators as we can for completed tasks.
limits = self._get_or_refresh_resource_limits()
Expand Down
56 changes: 47 additions & 9 deletions python/ray/data/_internal/execution/streaming_executor_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@
PhysicalOperator,
RefBundle,
)
from ray.data._internal.execution.interfaces.physical_operator import OpTask, Waitable
from ray.data._internal.execution.interfaces.physical_operator import (
DataOpTask,
MetadataOpTask,
OpTask,
Waitable,
)
from ray.data._internal.execution.operators.base_physical_operator import (
AllToAllOperator,
)
Expand Down Expand Up @@ -260,6 +265,18 @@ def _queue_memory_usage(self, queue: Deque[RefBundle]) -> int:
break # Concurrent pop from the outqueue by the consumer thread.
return object_store_memory

def outqueue_num_blocks(self) -> int:
    """Return the total number of blocks in this operator's outqueue.

    Sums `len(bundle.blocks)` over all `RefBundle`s currently queued; entries
    that are not `RefBundle`s are ignored. The outqueue may be concurrently
    popped by the consumer thread, hence the `IndexError` guard instead of
    snapshotting the queue.
    """
    num_blocks = 0
    for i in range(len(self.outqueue)):
        try:
            bundle = self.outqueue[i]
            if isinstance(bundle, RefBundle):
                num_blocks += len(bundle.blocks)
        except IndexError:
            break  # Concurrent pop from the outqueue by the consumer thread.
    # Bug fix: return the accumulated block count. The original returned
    # `len(self.outqueue)` (the number of queued *bundles*), which made the
    # per-bundle accumulation dead code and undercounted multi-block bundles.
    return num_blocks


def build_streaming_topology(
dag: PhysicalOperator, options: ExecutionOptions
Expand Down Expand Up @@ -311,16 +328,28 @@ def setup_state(op: PhysicalOperator) -> OpState:
return (topology, i)


def process_completed_tasks(topology: Topology) -> None:
def process_completed_tasks(
topology: Topology,
backpressure_policies: List[BackpressurePolicy],
) -> None:
"""Process any newly completed tasks. To update operator
states, call `update_operator_states()` afterwards."""

# Update active tasks.
active_tasks: Dict[Waitable, OpTask] = {}

for op in topology.keys():
# All active tasks, keyed by their waitables.
active_tasks: Dict[Waitable, Tuple[OpState, OpTask]] = {}
for op, state in topology.items():
for task in op.get_active_tasks():
active_tasks[task.get_waitable()] = task
active_tasks[task.get_waitable()] = (state, task)

max_blocks_to_read_per_op: Dict[OpState, int] = {}
for policy in backpressure_policies:
non_empty = len(max_blocks_to_read_per_op) > 0
max_blocks_to_read_per_op = policy.calculate_max_blocks_to_read_per_op(topology)
if non_empty and len(max_blocks_to_read_per_op) > 0:
raise ValueError(
"At most one backpressure policy that implements "
"calculate_max_blocks_to_read_per_op() can be used at a time."
)

# Process completed Ray tasks and notify operators.
if active_tasks:
Expand All @@ -331,7 +360,16 @@ def process_completed_tasks(topology: Topology) -> None:
timeout=0.1,
)
for ref in ready:
active_tasks[ref].on_waitable_ready()
state, task = active_tasks.pop(ref)
if isinstance(task, DataOpTask):
num_blocks_read = task.on_data_ready(
max_blocks_to_read_per_op.get(state, None)
)
if state in max_blocks_to_read_per_op:
max_blocks_to_read_per_op[state] -= num_blocks_read
else:
assert isinstance(task, MetadataOpTask)
task.on_task_finished()

# Pull any operator outputs into the streaming op state.
for op, op_state in topology.items():
Expand Down Expand Up @@ -408,7 +446,7 @@ def select_operator_to_run(
and op.should_add_input()
and under_resource_limits
and not op.completed()
and all(p.can_run(op) for p in backpressure_policies)
and all(p.can_add_input(op) for p in backpressure_policies)
):
ops.append(op)
# Update the op in all cases to enable internal autoscaling, etc.
Expand Down
3 changes: 3 additions & 0 deletions python/ray/data/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,9 @@ def __init__(
self.enable_get_object_locations_for_metrics = (
enable_get_object_locations_for_metrics
)
# The additional Ray remote args that should be added to
# the task-pool-based data tasks.
self._task_pool_data_task_remote_args: Dict[str, Any] = {}
# The extra key-value style configs.
# These configs are managed by individual components or plugins via
# `set_config`, `get_config` and `remove_config`.
Expand Down
Loading