
[data] Add DownstreamCapacityBackpressurePolicy based on downstream processing capacity#55463

Merged
alexeykudinkin merged 1 commit into ray-project:master from dragongu:feature/output-based-backpressure
Aug 26, 2025

Conversation

@dragongu
Contributor

@dragongu dragongu commented Aug 10, 2025

Summary

Implement a backpressure mechanism based on downstream processing capacity to address the stability and performance issues caused by unbalanced processing speeds across pipeline operators, which arise from user misconfiguration, instance preemption, and cluster resource scaling.

Problem Statement

Current Ray Data pipelines face several critical challenges:

1. Performance & Stability Issues

  • Large numbers of objects accumulate in memory because downstream operators cannot consume them in a timely manner
  • Wasted memory and potential object spilling lead to significant performance degradation
  • Memory pressure destabilizes the pipeline

2. Resource Waste in Dynamic Environments

  • Preemption makes the situation worse: large numbers of objects are repeatedly rebuilt when workers are preempted
  • Inefficient resource utilization due to upstream-downstream speed mismatch
  • Wasted compute resources on processing data that cannot be consumed

3. Complex Configuration Requirements

  • Users find it difficult to configure reasonable parallelism ratios
  • Inappropriate configurations lead to resource waste or insufficient throughput
  • Especially challenging on elastic resources where capacity changes dynamically

Solution

This PR introduces DownstreamCapacityBackpressurePolicy that provides:

1. Simplified User Configuration with Adaptive Concurrency

  • Ray Data automatically adjusts parallelism based on actual pipeline performance
  • When upstream operators are blocked by backpressure, their resources are released, allowing downstream operators to scale up
  • Self-adaptive mechanism reduces the need for manual tuning and complex configuration

2. Consistent Pipeline Throughput

  • Objects output by upstream operators are consumed by downstream as quickly as possible
  • Ensures stability, saves memory resources, and avoids unnecessary object rebuilding risks
  • Maintains balanced flow throughout the entire pipeline
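The decision rule combines the two DataContext thresholds described in the Configuration section below. As an illustrative sketch only (the function and parameter names here are hypothetical, not the actual Ray Data internals):

```python
import sys

def should_backpressure(
    queued_bundles: int,
    downstream_task_slots: int,
    ratio: float = float("inf"),            # default downstream_capacity_backpressure_ratio
    max_queued_bundles: int = sys.maxsize,  # default downstream_capacity_backpressure_max_queued_bundles
) -> bool:
    """Return True if the upstream operator should stop launching new tasks."""
    # Absolute threshold: too many bundles queued, regardless of downstream capacity.
    if queued_bundles > max_queued_bundles:
        return True
    # Ratio threshold: the queue is large relative to downstream processing capacity.
    if downstream_task_slots > 0 and queued_bundles > ratio * downstream_task_slots:
        return True
    return False
```

With the default thresholds neither branch can fire, which matches the disabled-by-default behavior noted under Configuration.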

Key Benefits

🚀 Performance Improvements

  • Prevents memory bloat and reduces object spilling
  • Maintains optimal memory utilization across the pipeline
  • Eliminates performance degradation from memory pressure

🛡️ Enhanced Stability

  • Handles instance preemptions gracefully
  • Reduces object rebuilding in dynamic environments
  • Maintains pipeline stability under varying cluster conditions

⚙️ Simplified Operations

  • Reduces complex configuration requirements
  • Provides self-adaptive parallelism adjustment
  • Works effectively on elastic resources

💰 Resource Efficiency

  • Prevents resource waste from unbalanced processing
  • Optimizes resource allocation across pipeline stages
  • Reduces unnecessary compute overhead

Configuration

Users can configure the backpressure behavior via DataContext:

```python
ctx = ray.data.DataContext.get_current()

# Set ratio threshold (default: inf, disabled)
ctx.downstream_capacity_backpressure_ratio = 2.0

# Set absolute threshold (default: sys.maxsize, disabled)
ctx.downstream_capacity_backpressure_max_queued_bundles = 4000
```

Default Behavior

  • By default, backpressure is disabled (the ratio threshold defaults to infinity and the absolute threshold to sys.maxsize) to maintain backward compatibility
  • Users can enable it by setting appropriate threshold values
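To make the disabled-by-default behavior concrete, here is a small self-contained check (illustrative only; the variable names mirror the DataContext fields rather than calling into Ray):

```python
import sys

# Default thresholds: ratio = inf, absolute cap = sys.maxsize.
ratio = float("inf")
max_queued_bundles = sys.maxsize

# Even a very large queue never trips either condition.
queued, slots = 1_000_000, 8
triggered = queued > max_queued_bundles or queued > ratio * slots
print(triggered)  # False: the policy is a no-op until a threshold is lowered
```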

Impact & Results

This implementation successfully addresses the core challenges:

  • Performance & Stability: Eliminates memory pressure and spilling issues
  • Resource Efficiency: Prevents waste in preemption scenarios and dynamic environments
  • Configuration Simplicity: Reduces complex user configuration requirements
  • Adaptive Throughput: Maintains consistent pipeline performance

The solution provides a foundation for more intelligent, self-adaptive Ray Data pipelines that can handle dynamic cluster conditions while maintaining optimal performance and resource utilization.


Related issue number

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@dragongu dragongu requested a review from a team as a code owner August 10, 2025 12:38
Contributor

@gemini-code-assist gemini-code-assist bot left a comment


Summary of Changes

Hello @dragongu, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

I've implemented a new OutputSizeBackpressurePolicy for Ray Data pipelines. This policy aims to improve pipeline stability and performance by preventing memory bloat and resource waste caused by speed mismatches between upstream and downstream operators. It achieves this by adaptively adjusting parallelism based on actual downstream processing capacity, reducing the need for manual configuration and ensuring consistent throughput.

Highlights

  • Introduced OutputSizeBackpressurePolicy: This new backpressure mechanism monitors the output queue size and the number of active tasks in downstream operators to dynamically apply backpressure.
  • Adaptive Concurrency: The policy uses both a ratio threshold (relative to active tasks) and an absolute threshold to determine when to trigger backpressure, allowing Ray Data to automatically adjust parallelism.
  • Enhanced Metrics: Added num_input_blocks to OpRuntimeMetrics to track the number of blocks waiting for an operator, which is crucial for the new backpressure policy.
  • Improved Actor Pool Metrics: Added num_task_slots to AutoscalingActorPool to accurately reflect the processing capacity of actor-based operators.
  • Configurable Behavior: Users can enable and configure the backpressure policy via DataContext by setting backpressure_output_ratio_threshold and backpressure_output_absolute_threshold. By default, it's disabled for backward compatibility.

Contributor

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces a new OutputSizeBackpressurePolicy to Ray Data, which aims to improve pipeline stability and performance by applying backpressure based on the processing capacity of downstream operators. The changes include the policy implementation, its integration into the backpressure framework, and corresponding tests.

My review focuses on the correctness of the new policy's logic and its documentation. I've identified a discrepancy between the documented behavior and the implementation, as well as a potential ZeroDivisionError in the debug logging. I've also suggested an additional test case to cover an important edge case. Overall, this is a valuable addition to Ray Data's execution engine.

@dragongu dragongu force-pushed the feature/output-based-backpressure branch 5 times, most recently from c36eba1 to 59961ba on August 11, 2025 08:21
@ray-gardener ray-gardener bot added the community-contribution (Contributed by the community) and data (Ray Data-related issues) labels Aug 12, 2025
@dragongu dragongu force-pushed the feature/output-based-backpressure branch from 59961ba to a8fb024 on August 12, 2025 06:49
@dragongu dragongu changed the title from "[data] Add OutputSizeBackpressurePolicy based on downstream processing capacity" to "[data] Add DownstreamCapacityBackpressurePolicy based on downstream processing capacity" Aug 12, 2025
@alexeykudinkin alexeykudinkin added the go (add ONLY when ready to merge, run all tests) label Aug 21, 2025
@dragongu dragongu force-pushed the feature/output-based-backpressure branch 5 times, most recently from 08f8cec to 4750475 on August 22, 2025 23:51
@dragongu dragongu force-pushed the feature/output-based-backpressure branch 6 times, most recently from 777a8c1 to 6b9af44 on August 25, 2025 22:48
@dragongu dragongu force-pushed the feature/output-based-backpressure branch from 6b9af44 to 034628b on August 26, 2025 02:36
@dragongu dragongu force-pushed the feature/output-based-backpressure branch from 034628b to 2cd035b on August 26, 2025 02:44
…rocessing capacity

Signed-off-by: dragongu <andrewgu@vip.qq.com>
@dragongu dragongu force-pushed the feature/output-based-backpressure branch from 2cd035b to b6b5874 on August 26, 2025 09:22
@alexeykudinkin alexeykudinkin merged commit a6e9955 into ray-project:master Aug 26, 2025
4 of 5 checks passed
liulehui pushed a commit to liulehui/ray that referenced this pull request Aug 26, 2025
tohtana pushed a commit to tohtana/ray that referenced this pull request Aug 29, 2025
tohtana pushed a commit to tohtana/ray that referenced this pull request Aug 29, 2025
jugalshah291 pushed a commit to jugalshah291/ray_fork that referenced this pull request Sep 11, 2025
dstrodtman pushed a commit that referenced this pull request Oct 6, 2025
…rocessing capacity (#55463)

## Summary

Implement a downstream processing capacity-based backpressure mechanism
to address stability and performance issues caused by unbalanced
processing speeds across pipeline operators due to user
misconfigurations, instance preemptions, and cluster resource scaling.

## Problem Statement

Current Ray Data pipelines face several critical challenges:

### 1. Performance & Stability Issues
- Large amounts of objects accumulate in memory while downstream
operators cannot consume them timely
- Memory resource waste and potential spilling leads to significant
performance degradation
- Pipeline instability due to memory pressure

### 2. Resource Waste in Dynamic Environments
- In preemption scenarios, the situation becomes worse as large amounts
of objects are repeatedly rebuilt when workers are preempted
- Inefficient resource utilization due to upstream-downstream speed
mismatch
- Wasted compute resources on processing data that cannot be consumed

### 3. Complex Configuration Requirements
- Users find it difficult to configure reasonable parallelism ratios
- Inappropriate configurations lead to resource waste or insufficient
throughput
- Especially challenging on elastic resources where capacity changes
dynamically

## Solution

This PR introduces `DownstreamCapacityBackpressurePolicy`, which provides:

### 1. Simplified User Configuration with Adaptive Concurrency
- Ray Data automatically adjusts parallelism based on actual pipeline
performance
- When upstream is blocked by backpressure, its resources are released,
allowing downstream operators to scale up
- The self-adaptive mechanism reduces the need for manual tuning and
complex configuration

### 2. Consistent Pipeline Throughput
- Objects output by upstream operators are consumed downstream as
quickly as possible
- Ensures stability, saves memory, and avoids the risk of unnecessary
object rebuilding
- Maintains balanced flow throughout the entire pipeline
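
As a rough sketch of the idea (a hypothetical helper, not the actual Ray Data internals), the policy's core decision can be expressed as a check of the upstream output queue against downstream capacity, using the two thresholds described in the Configuration section below:

```python
import sys


def should_backpressure(
    queued_bundles: int,
    downstream_active_tasks: int,
    ratio_threshold: float = float("inf"),
    max_queued_bundles: int = sys.maxsize,
) -> bool:
    """Hypothetical sketch: decide whether to block an upstream operator.

    Backpressure kicks in when the operator's output queue grows well
    beyond what downstream can currently absorb, either relative to
    downstream concurrency (ratio) or in absolute terms (cap).
    """
    # Ratio check: queued output relative to downstream concurrency.
    if (
        downstream_active_tasks > 0
        and queued_bundles / downstream_active_tasks > ratio_threshold
    ):
        return True
    # Absolute cap on queued bundles.
    return queued_bundles > max_queued_bundles


# With the defaults (inf / sys.maxsize), the policy never triggers,
# matching the backward-compatible disabled behavior.
print(should_backpressure(10**9, 1))                            # disabled
print(should_backpressure(100, 10, ratio_threshold=2.0))        # ratio hit
print(should_backpressure(5000, 10, max_queued_bundles=4000))   # cap hit
```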

## Key Benefits

### 🚀 Performance Improvements
- Prevents memory bloat and reduces object spilling
- Maintains optimal memory utilization across the pipeline
- Eliminates performance degradation from memory pressure

### 🛡️ Enhanced Stability
- Handles instance preemptions gracefully
- Reduces object rebuilding in dynamic environments
- Maintains pipeline stability under varying cluster conditions

### ⚙️ Simplified Operations
- Reduces complex configuration requirements
- Provides self-adaptive parallelism adjustment
- Works effectively on elastic resources

### 💰 Resource Efficiency
- Prevents resource waste from unbalanced processing
- Optimizes resource allocation across pipeline stages
- Reduces unnecessary compute overhead

## Configuration

Users can configure the backpressure behavior via `DataContext`:

```python
ctx = ray.data.DataContext.get_current()

# Set ratio threshold (default: inf, disabled)
ctx.downstream_capacity_backpressure_ratio = 2.0

# Set absolute threshold (default: sys.maxsize, disabled)
ctx.downstream_capacity_backpressure_max_queued_bundles = 4000
```

### Default Behavior
- By default, backpressure is disabled (both thresholds default to
effectively unbounded values) to maintain backward compatibility
- Users can enable it by setting appropriate threshold values

## Impact & Results

This implementation successfully addresses the core challenges:

✅ **Performance & Stability**: Eliminates memory pressure and spilling
issues
✅ **Resource Efficiency**: Prevents waste in preemption scenarios and
dynamic environments
✅ **Configuration Simplicity**: Reduces complex user configuration
requirements
✅ **Adaptive Throughput**: Maintains consistent pipeline performance

The solution provides a foundation for more intelligent, self-adaptive
Ray Data pipelines that can handle dynamic cluster conditions while
maintaining optimal performance and resource utilization.


Signed-off-by: dragongu <andrewgu@vip.qq.com>
Signed-off-by: Douglas Strodtman <douglas@anyscale.com>
landscapepainter pushed a commit to landscapepainter/ray that referenced this pull request Nov 17, 2025
…rocessing capacity (ray-project#55463)
