
Commit 89d9045

dragongu authored and tohtana committed
[data] Add DownstreamCapacityBackpressurePolicy based on downstream processing capacity (ray-project#55463)
## Summary

Implement a backpressure mechanism based on downstream processing capacity to address stability and performance issues caused by unbalanced processing speeds across pipeline operators due to user misconfigurations, instance preemptions, and cluster resource scaling.

## Problem Statement

Current Ray Data pipelines face several critical challenges:

### 1. Performance & Stability Issues

- Large numbers of objects accumulate in memory while downstream operators cannot consume them in time
- Wasted memory and potential spilling lead to significant performance degradation
- Pipeline instability due to memory pressure

### 2. Resource Waste in Dynamic Environments

- Preemption makes the situation worse: large numbers of objects are repeatedly rebuilt when workers are preempted
- Inefficient resource utilization due to upstream-downstream speed mismatch
- Wasted compute on producing data that cannot be consumed

### 3. Complex Configuration Requirements

- Users find it difficult to configure reasonable parallelism ratios
- Inappropriate configurations lead to resource waste or insufficient throughput
- Elastic resources, where capacity changes dynamically, are especially challenging

## Solution

This PR introduces `DownstreamCapacityBackpressurePolicy`, which provides:

### 1. Simplified User Configuration with Adaptive Concurrency

- Ray Data automatically adjusts parallelism based on actual pipeline performance
- When upstream is blocked by backpressure, resources are released so downstream can scale up
- The self-adaptive mechanism reduces the need for manual tuning and complex configuration

### 2. Consistent Pipeline Throughput

- Objects output by upstream operators are consumed by downstream operators as quickly as possible
- Ensures stability, saves memory, and avoids unnecessary object rebuilds
- Maintains balanced flow throughout the entire pipeline

## Key Benefits

### 🚀 Performance Improvements
- Prevents memory bloat and reduces object spilling
- Maintains optimal memory utilization across the pipeline
- Eliminates performance degradation from memory pressure

### 🛡️ Enhanced Stability
- Handles instance preemptions gracefully
- Reduces object rebuilding in dynamic environments
- Maintains pipeline stability under varying cluster conditions

### ⚙️ Simplified Operations
- Reduces complex configuration requirements
- Provides self-adaptive parallelism adjustment
- Works effectively on elastic resources

### 💰 Resource Efficiency
- Prevents resource waste from unbalanced processing
- Optimizes resource allocation across pipeline stages
- Reduces unnecessary compute overhead

## Configuration

Users can configure the backpressure behavior via `DataContext`:

```python
ctx = ray.data.DataContext.get_current()

# Set the ratio threshold (default: None, disabled)
ctx.downstream_capacity_backpressure_ratio = 2.0

# Set the absolute threshold (default: None, disabled)
ctx.downstream_capacity_backpressure_max_queued_bundles = 4000
```

### Default Behavior

- By default, backpressure is disabled (both thresholds are unset) to maintain backward compatibility
- Users can enable it by setting appropriate threshold values

## Impact & Results

This implementation addresses the core challenges:

✅ **Performance & Stability**: Eliminates memory pressure and spilling issues
✅ **Resource Efficiency**: Prevents waste in preemption scenarios and dynamic environments
✅ **Configuration Simplicity**: Reduces complex user configuration requirements
✅ **Adaptive Throughput**: Maintains consistent pipeline performance

The solution lays a foundation for more intelligent, self-adaptive Ray Data pipelines that can handle dynamic cluster conditions while maintaining optimal performance and resource utilization.

---

## Related issue number

## Checks

- [ ] I've signed off every commit (by using the -s flag, i.e., `git commit -s`) in this PR.
- [ ] I've run `scripts/format.sh` to lint the changes in this PR.
- [ ] I've included any doc changes needed for https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.
- [ ] I've made sure the tests are passing. Note that there might be a few flaky tests; see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
  - [ ] Unit tests
  - [ ] Release tests
  - [ ] This PR is not tested :(

Signed-off-by: dragongu <andrewgu@vip.qq.com>
Signed-off-by: Masahiro Tanaka <mtanaka@anyscale.com>
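The two thresholds combine into a single AND condition: upstream is throttled only when the queue exceeds the absolute bundle limit *and* the implied downstream work exceeds the ratio of downstream capacity. A minimal standalone sketch of that decision (hypothetical helper name, mirroring the policy described in the summary):

```python
def should_backpressure(
    enqueued_bundles: int,
    avg_inputs_per_task: float,
    downstream_max_concurrent_tasks: int,
    ratio: float = 2.0,
    max_queued_bundles: int = 4000,
) -> bool:
    """Throttle upstream only when BOTH guards trip: the absolute queue
    threshold AND the outstanding-work-to-capacity ratio threshold."""
    # Estimate how many downstream tasks the queued bundles represent.
    outstanding_tasks = enqueued_bundles / max(avg_inputs_per_task, 1)
    max_allowed_outstanding = downstream_max_concurrent_tasks * ratio
    return (
        enqueued_bundles > max_queued_bundles
        and outstanding_tasks > max_allowed_outstanding
    )

# 5000 queued bundles at ~10 inputs/task vs. 100 concurrent downstream tasks:
# 500 outstanding > 200 allowed, and 5000 > 4000, so upstream is throttled.
print(should_backpressure(5000, 10, 100))  # True
print(should_backpressure(100, 10, 100))   # False: queue is below both limits
```

Requiring both guards means a deep queue alone (e.g. ample downstream capacity) or a high ratio alone (e.g. a short queue) never throttles, which keeps the policy conservative.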
1 parent d9d84d1 commit 89d9045

File tree

6 files changed (+246, -0 lines)

python/ray/data/BUILD

Lines changed: 14 additions & 0 deletions
```diff
@@ -1323,6 +1323,20 @@ py_test(
     ],
 )

+py_test(
+    name = "test_downstream_capacity_backpressure_policy",
+    size = "medium",
+    srcs = ["tests/test_downstream_capacity_backpressure_policy.py"],
+    tags = [
+        "exclusive",
+        "team:data",
+    ],
+    deps = [
+        ":conftest",
+        "//:ray_lib",
+    ],
+)
+
 py_test(
     name = "test_backpressure_e2e",
     size = "large",
```

python/ray/data/_internal/actor_autoscaler/autoscaling_actor_pool.py

Lines changed: 3 additions & 0 deletions
```diff
@@ -109,3 +109,6 @@ def per_actor_resource_usage(self) -> ExecutionResources:
     def get_pool_util(self) -> float:
         """Calculate the utilization of the given actor pool."""
         ...
+
+    def max_concurrent_tasks(self) -> int:
+        return self.max_actor_concurrency() * self.num_running_actors()
```
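For intuition, the new helper's capacity estimate can be reproduced in isolation: per-actor concurrency times the number of running actors gives the pool's total concurrent-task capacity (toy numbers below, not real pool state):

```python
# Standalone mimic of AutoscalingActorPool.max_concurrent_tasks:
# per-actor concurrency times running actors gives the pool's task capacity.
def max_concurrent_tasks(max_actor_concurrency: int, num_running_actors: int) -> int:
    return max_actor_concurrency * num_running_actors

# 8 running actors, each allowed 4 concurrent tasks -> 32 concurrent tasks.
print(max_concurrent_tasks(4, 8))  # 32
```

Because it counts *running* actors rather than the configured pool size, the estimate shrinks automatically when actors are preempted or still starting.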

python/ray/data/_internal/execution/backpressure_policy/__init__.py

Lines changed: 5 additions & 0 deletions
```diff
@@ -2,6 +2,9 @@

 from .backpressure_policy import BackpressurePolicy
 from .concurrency_cap_backpressure_policy import ConcurrencyCapBackpressurePolicy
+from .downstream_capacity_backpressure_policy import (
+    DownstreamCapacityBackpressurePolicy,
+)
 from .resource_budget_backpressure_policy import ResourceBudgetBackpressurePolicy
 from ray.data.context import DataContext

@@ -14,6 +17,7 @@
 ENABLED_BACKPRESSURE_POLICIES = [
     ConcurrencyCapBackpressurePolicy,
     ResourceBudgetBackpressurePolicy,
+    DownstreamCapacityBackpressurePolicy,
 ]
 ENABLED_BACKPRESSURE_POLICIES_CONFIG_KEY = "backpressure_policies.enabled"

@@ -33,6 +37,7 @@ def get_backpressure_policies(
 __all__ = [
     "BackpressurePolicy",
     "ConcurrencyCapBackpressurePolicy",
+    "DownstreamCapacityBackpressurePolicy",
     "ENABLED_BACKPRESSURE_POLICIES_CONFIG_KEY",
     "get_backpressure_policies",
 ]
```
python/ray/data/_internal/execution/backpressure_policy/downstream_capacity_backpressure_policy.py

Lines changed: 94 additions & 0 deletions

```diff
@@ -0,0 +1,94 @@
+import logging
+from typing import TYPE_CHECKING
+
+from .backpressure_policy import BackpressurePolicy
+from ray.data._internal.execution.operators.actor_pool_map_operator import (
+    ActorPoolMapOperator,
+)
+from ray.data.context import DataContext
+
+if TYPE_CHECKING:
+    from ray.data._internal.execution.interfaces.physical_operator import (
+        PhysicalOperator,
+    )
+    from ray.data._internal.execution.resource_manager import ResourceManager
+    from ray.data._internal.execution.streaming_executor_state import Topology
+
+logger = logging.getLogger(__name__)
+
+
+class DownstreamCapacityBackpressurePolicy(BackpressurePolicy):
+    """Backpressure policy based on downstream processing capacity.
+
+    This policy triggers backpressure when the output bundles size exceeds both:
+    1. A ratio threshold multiplied by the number of running tasks in
+       downstream operators
+    2. An absolute threshold for the output bundles size
+
+    The policy monitors actual downstream processing capacity by tracking the
+    number of currently running tasks rather than configured parallelism. This
+    approach ensures effective backpressure even when cluster resources are
+    insufficient or scaling is slow, preventing memory pressure and maintaining
+    pipeline stability.
+
+    Key benefits:
+    - Prevents memory bloat from unprocessed output objects
+    - Adapts to actual cluster conditions and resource availability
+    - Maintains balanced throughput across pipeline operators
+    - Reduces object spilling and unnecessary rebuilds
+    """
+
+    def __init__(
+        self,
+        data_context: DataContext,
+        topology: "Topology",
+        resource_manager: "ResourceManager",
+    ):
+        super().__init__(data_context, topology, resource_manager)
+        self._backpressure_concurrency_ratio = (
+            self._data_context.downstream_capacity_backpressure_ratio
+        )
+        self._backpressure_max_queued_bundles = (
+            self._data_context.downstream_capacity_backpressure_max_queued_bundles
+        )
+        self._backpressure_disabled = (
+            self._backpressure_concurrency_ratio is None
+            or self._backpressure_max_queued_bundles is None
+        )
+
+    def _max_concurrent_tasks(self, op: "PhysicalOperator") -> int:
+        if isinstance(op, ActorPoolMapOperator):
+            return sum(
+                [
+                    actor_pool.max_concurrent_tasks()
+                    for actor_pool in op.get_autoscaling_actor_pools()
+                ]
+            )
+        return op.num_active_tasks()
+
+    def can_add_input(self, op: "PhysicalOperator") -> bool:
+        """Determine if we can add input to the operator based on downstream capacity."""
+        if self._backpressure_disabled:
+            return True
+        for output_dependency in op.output_dependencies:
+            total_enqueued_input_bundles = self._topology[
+                output_dependency
+            ].total_enqueued_input_bundles()
+
+            avg_inputs_per_task = (
+                output_dependency.metrics.num_task_inputs_processed
+                / max(output_dependency.metrics.num_tasks_finished, 1)
+            )
+            outstanding_tasks = total_enqueued_input_bundles / max(
+                avg_inputs_per_task, 1
+            )
+            max_allowed_outstanding = (
+                self._max_concurrent_tasks(output_dependency)
+                * self._backpressure_concurrency_ratio
+            )
+
+            if (
+                total_enqueued_input_bundles > self._backpressure_max_queued_bundles
+                and outstanding_tasks > max_allowed_outstanding
+            ):
+                return False
+
+        return True
```
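One subtlety in `can_add_input` is the cold-start path: before any downstream task finishes, both metrics are zero, and the two `max(..., 1)` guards avoid a division by zero by treating each queued bundle as one outstanding task. A small sketch of just that estimate (plain function with hypothetical values, mirroring the arithmetic above):

```python
def estimate_outstanding_tasks(
    enqueued_bundles: int, inputs_processed: int, tasks_finished: int
) -> float:
    """Mirror the policy's estimate of downstream tasks implied by the queue."""
    # Guard against zero finished tasks (cold start) ...
    avg_inputs_per_task = inputs_processed / max(tasks_finished, 1)
    # ... and against an average below one input per task.
    return enqueued_bundles / max(avg_inputs_per_task, 1)

# Cold start: no finished tasks yet, so each bundle counts as one task.
print(estimate_outstanding_tasks(100, 0, 0))      # 100.0
# Warmed up: 100 inputs over 10 tasks -> ~10 inputs per task.
print(estimate_outstanding_tasks(5000, 100, 10))  # 500.0
```

This is why the test case labeled `zero_inputs_protection` passes: with zero processed inputs, the estimate stays finite and small queues never trigger backpressure.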

python/ray/data/context.py

Lines changed: 3 additions & 0 deletions
```diff
@@ -534,6 +534,9 @@ class DataContext:
         default_factory=_issue_detectors_config_factory
     )

+    downstream_capacity_backpressure_ratio: float = None
+    downstream_capacity_backpressure_max_queued_bundles: int = None
+
     def __post_init__(self):
         # The additonal ray remote args that should be added to
         # the task-pool-based data tasks.
```
python/ray/data/tests/test_downstream_capacity_backpressure_policy.py

Lines changed: 127 additions & 0 deletions

```diff
@@ -0,0 +1,127 @@
+from unittest.mock import MagicMock
+
+import pytest
+
+from ray.data._internal.execution.backpressure_policy.downstream_capacity_backpressure_policy import (
+    DownstreamCapacityBackpressurePolicy,
+)
+from ray.data._internal.execution.interfaces.physical_operator import (
+    OpRuntimeMetrics,
+    PhysicalOperator,
+)
+from ray.data._internal.execution.operators.actor_pool_map_operator import (
+    ActorPoolMapOperator,
+)
+from ray.data._internal.execution.streaming_executor_state import OpState, Topology
+from ray.data.context import DataContext
+
+
+class TestDownstreamCapacityBackpressurePolicy:
+    def _mock_operator(
+        self,
+        op_class: PhysicalOperator = PhysicalOperator,
+        num_enqueued_input_bundles: int = 0,
+        num_task_inputs_processed: int = 0,
+        num_tasks_finished: int = 0,
+        max_concurrent_tasks: int = 100,
+    ):
+        """Helper method to create mock operator."""
+        mock_operator = MagicMock(spec=op_class)
+        mock_operator.metrics = MagicMock(spec=OpRuntimeMetrics)
+        mock_operator.metrics.num_task_inputs_processed = num_task_inputs_processed
+        mock_operator.metrics.num_tasks_finished = num_tasks_finished
+        mock_operator.num_active_tasks.return_value = max_concurrent_tasks
+
+        op_state = MagicMock(spec=OpState)
+        op_state.total_enqueued_input_bundles.return_value = num_enqueued_input_bundles
+        return mock_operator, op_state
+
+    def _mock_actor_pool_map_operator(
+        self,
+        num_enqueued_input_bundles: int,
+        num_task_inputs_processed: int,
+        num_tasks_finished: int,
+        max_concurrent_tasks: int = 100,
+    ):
+        """Helper method to create mock actor pool map operator."""
+        op, op_state = self._mock_operator(
+            ActorPoolMapOperator,
+            num_enqueued_input_bundles,
+            num_task_inputs_processed,
+            num_tasks_finished,
+            max_concurrent_tasks,
+        )
+        actor_pool = MagicMock(
+            spec="ray.data._internal.execution.operators.actor_pool_map_operator._ActorPool"
+        )
+        actor_pool.max_concurrent_tasks = MagicMock(return_value=max_concurrent_tasks)
+        op.get_autoscaling_actor_pools.return_value = [actor_pool]
+        return op, op_state
+
+    def _create_policy(
+        self, data_context: DataContext = None, topology: Topology = None
+    ):
+        """Helper method to create policy instance."""
+        context = data_context or self.context
+        return DownstreamCapacityBackpressurePolicy(
+            data_context=context,
+            topology=topology,
+            resource_manager=MagicMock(),
+        )
+
+    @pytest.mark.parametrize(
+        "mock_method",
+        [
+            (_mock_operator),
+            (_mock_actor_pool_map_operator),
+        ],
+    )
+    @pytest.mark.parametrize(
+        "num_enqueued, num_task_inputs_processed, num_tasks_finished, backpressure_ratio, max_queued_bundles, expected_result, test_name",
+        [
+            (100, 100, 10, 2, 4000, True, "no_backpressure_low_queue"),
+            (5000, 100, 10, 2, 4000, False, "high_queue_pressure"),
+            (100, 0, 0, 2, 400, True, "zero_inputs_protection"),
+            (1000000, 1, 1, None, None, True, "default disabled"),
+        ],
+    )
+    def test_backpressure_conditions(
+        self,
+        mock_method,
+        num_enqueued,
+        num_task_inputs_processed,
+        num_tasks_finished,
+        backpressure_ratio,
+        max_queued_bundles,
+        expected_result,
+        test_name,
+    ):
+        """Parameterized test covering various backpressure conditions."""
+        context = DataContext()
+        context.downstream_capacity_backpressure_ratio = backpressure_ratio
+        context.downstream_capacity_backpressure_max_queued_bundles = max_queued_bundles
+
+        op, op_state = self._mock_operator(PhysicalOperator)
+        op_output_dep, op_output_state = mock_method(
+            self,
+            num_enqueued_input_bundles=num_enqueued,
+            num_task_inputs_processed=num_task_inputs_processed,
+            num_tasks_finished=num_tasks_finished,
+        )
+        op.output_dependencies = [op_output_dep]
+
+        policy = self._create_policy(
+            context, topology={op: op_state, op_output_dep: op_output_state}
+        )
+        result = policy.can_add_input(op)
+
+        assert result == expected_result, test_name
+        assert (
+            backpressure_ratio is None or max_queued_bundles is None
+        ) == policy._backpressure_disabled, test_name
+
+
+if __name__ == "__main__":
+    import sys
+
+    sys.exit(pytest.main(["-v", __file__]))
```
