[Data] Concurrency Cap Backpressure tuning#58163
Conversation
Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
BenchmarkBaseline vs After |
python/ray/data/_internal/execution/backpressure_policy/concurrency_cap_backpressure_policy.py
Outdated
Show resolved
Hide resolved
python/ray/data/_internal/execution/backpressure_policy/concurrency_cap_backpressure_policy.py
Outdated
Show resolved
Hide resolved
python/ray/data/_internal/execution/backpressure_policy/concurrency_cap_backpressure_policy.py
Outdated
Show resolved
Hide resolved
python/ray/data/_internal/execution/backpressure_policy/concurrency_cap_backpressure_policy.py
Outdated
Show resolved
Hide resolved
python/ray/data/_internal/execution/backpressure_policy/concurrency_cap_backpressure_policy.py
Outdated
Show resolved
Hide resolved
python/ray/data/_internal/execution/backpressure_policy/concurrency_cap_backpressure_policy.py
Outdated
Show resolved
Hide resolved
| op_budget.object_store_memory / op_usage.object_store_memory | ||
| > self.OBJECT_STORE_USAGE_RATIO | ||
| ): | ||
| return running < self._concurrency_caps[op] |
There was a problem hiding this comment.
Hold on, what's this for?
python/ray/data/_internal/execution/backpressure_policy/concurrency_cap_backpressure_policy.py
Outdated
Show resolved
Hide resolved
python/ray/data/_internal/execution/backpressure_policy/concurrency_cap_backpressure_policy.py
Show resolved
Hide resolved
Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
python/ray/data/_internal/execution/backpressure_policy/concurrency_cap_backpressure_policy.py
Show resolved
Hide resolved
Signed-off-by: Srinath Krishnamachari <68668616+srinathk10@users.noreply.github.com>
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request refactors the ConcurrencyCapBackpressurePolicy to use a simpler deadband-based algorithm for adjusting concurrency, which improves clarity and maintainability. The changes also make the policy's parameters configurable via environment variables.
My review focuses on the correctness of the new algorithm and its implementation. I've found a high-severity issue where the object store memory pressure check appears to be logically inverted, which could lead to incorrect backpressure behavior. I've also pointed out a related confusing comment and a minor typo in the docstring. Additionally, the corresponding unit test for the memory pressure check will need to be updated once the main logic is fixed.
Overall, this is a positive change that simplifies the system, and with the suggested fixes, it will be a solid improvement.
python/ray/data/_internal/execution/backpressure_policy/concurrency_cap_backpressure_policy.py
Show resolved
Hide resolved
python/ray/data/_internal/execution/backpressure_policy/concurrency_cap_backpressure_policy.py
Outdated
Show resolved
Hide resolved
python/ray/data/_internal/execution/backpressure_policy/concurrency_cap_backpressure_policy.py
Outdated
Show resolved
Hide resolved
…rency_cap_backpressure_policy.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> Signed-off-by: Srinath Krishnamachari <68668616+srinathk10@users.noreply.github.com>
python/ray/data/_internal/execution/backpressure_policy/concurrency_cap_backpressure_policy.py
Outdated
Show resolved
Hide resolved
Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
python/ray/data/_internal/execution/backpressure_policy/concurrency_cap_backpressure_policy.py
Show resolved
Hide resolved
raulchen
left a comment
There was a problem hiding this comment.
Approving as this change per se makes sense.
But I think we need to revisit the whole policy again, after we fix all the accounting issues, e.g., prefetched data.
Because when that is done, this should no longer be needed to prevent spilling.
But this may still be useful for smoothing out sudden jumps.
So I'd suggest keeping it experimental for now and do more experiments.
Also, we should separate out the smoothing logic as standalone backpressure policy.
Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
> Thank you for contributing to Ray! 🚀 > Please review the [Ray Contribution Guide](https://docs.ray.io/en/master/ray-contribute/getting-involved.html) before opening a pull request. >⚠️ Remove these instructions before submitting your PR. > 💡 Tip: Mark as draft if you want early feedback, or ready for review when it's complete. ## Description > Briefly describe what this PR accomplishes and why it's needed. ### [Data] Concurrency Cap Backpressure tuning - Maintain asymmetric EWMA of total queued bytes (this op + downstream) as the typical level: level. - Maintain asymmetric EWMA of absolute residual vs the previous level as a scale proxy: dev = EWMA(|q - level_prev|). - Define deadband: [lower, upper] = [level - K_DEVdev, level + K_DEVdev]. If q > upper -> target cap = running - BACKOFF_FACTOR (back off) If q < lower -> target cap = running + RAMPUP_FACTOR (ramp up) Else -> target cap = running (hold) - Clamp to [1, configured_cap], admit iff running < target cap. ## Related issues > Link related issues: "Fixes ray-project#1234", "Closes ray-project#1234", or "Related to ray-project#1234". ## Additional information > Optional: Add implementation details, API changes, usage examples, screenshots, etc. --------- Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com> Signed-off-by: Srinath Krishnamachari <68668616+srinathk10@users.noreply.github.com> Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
> Thank you for contributing to Ray! 🚀 > Please review the [Ray Contribution Guide](https://docs.ray.io/en/master/ray-contribute/getting-involved.html) before opening a pull request. >⚠️ Remove these instructions before submitting your PR. > 💡 Tip: Mark as draft if you want early feedback, or ready for review when it's complete. ## Description > Briefly describe what this PR accomplishes and why it's needed. ### [Data] Concurrency Cap Backpressure tuning - Maintain asymmetric EWMA of total queued bytes (this op + downstream) as the typical level: level. - Maintain asymmetric EWMA of absolute residual vs the previous level as a scale proxy: dev = EWMA(|q - level_prev|). - Define deadband: [lower, upper] = [level - K_DEVdev, level + K_DEVdev]. If q > upper -> target cap = running - BACKOFF_FACTOR (back off) If q < lower -> target cap = running + RAMPUP_FACTOR (ramp up) Else -> target cap = running (hold) - Clamp to [1, configured_cap], admit iff running < target cap. ## Related issues > Link related issues: "Fixes ray-project#1234", "Closes ray-project#1234", or "Related to ray-project#1234". ## Additional information > Optional: Add implementation details, API changes, usage examples, screenshots, etc. --------- Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com> Signed-off-by: Srinath Krishnamachari <68668616+srinathk10@users.noreply.github.com> Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> Signed-off-by: Aydin Abiar <aydin@anyscale.com>
> Thank you for contributing to Ray! 🚀 > Please review the [Ray Contribution Guide](https://docs.ray.io/en/master/ray-contribute/getting-involved.html) before opening a pull request. >⚠️ Remove these instructions before submitting your PR. > 💡 Tip: Mark as draft if you want early feedback, or ready for review when it's complete. ## Description > Briefly describe what this PR accomplishes and why it's needed. ### [Data] Concurrency Cap Backpressure tuning - Maintain asymmetric EWMA of total queued bytes (this op + downstream) as the typical level: level. - Maintain asymmetric EWMA of absolute residual vs the previous level as a scale proxy: dev = EWMA(|q - level_prev|). - Define deadband: [lower, upper] = [level - K_DEVdev, level + K_DEVdev]. If q > upper -> target cap = running - BACKOFF_FACTOR (back off) If q < lower -> target cap = running + RAMPUP_FACTOR (ramp up) Else -> target cap = running (hold) - Clamp to [1, configured_cap], admit iff running < target cap. ## Related issues > Link related issues: "Fixes ray-project#1234", "Closes ray-project#1234", or "Related to ray-project#1234". ## Additional information > Optional: Add implementation details, API changes, usage examples, screenshots, etc. --------- Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com> Signed-off-by: Srinath Krishnamachari <68668616+srinathk10@users.noreply.github.com> Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> Signed-off-by: YK <1811651+ykdojo@users.noreply.github.com>
> Thank you for contributing to Ray! 🚀 > Please review the [Ray Contribution Guide](https://docs.ray.io/en/master/ray-contribute/getting-involved.html) before opening a pull request. >⚠️ Remove these instructions before submitting your PR. > 💡 Tip: Mark as draft if you want early feedback, or ready for review when it's complete. ## Description > Briefly describe what this PR accomplishes and why it's needed. ### [Data] Concurrency Cap Backpressure tuning - Maintain asymmetric EWMA of total queued bytes (this op + downstream) as the typical level: level. - Maintain asymmetric EWMA of absolute residual vs the previous level as a scale proxy: dev = EWMA(|q - level_prev|). - Define deadband: [lower, upper] = [level - K_DEVdev, level + K_DEVdev]. If q > upper -> target cap = running - BACKOFF_FACTOR (back off) If q < lower -> target cap = running + RAMPUP_FACTOR (ramp up) Else -> target cap = running (hold) - Clamp to [1, configured_cap], admit iff running < target cap. ## Related issues > Link related issues: "Fixes ray-project#1234", "Closes ray-project#1234", or "Related to ray-project#1234". ## Additional information > Optional: Add implementation details, API changes, usage examples, screenshots, etc. --------- Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com> Signed-off-by: Srinath Krishnamachari <68668616+srinathk10@users.noreply.github.com> Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
> Thank you for contributing to Ray! 🚀 > Please review the [Ray Contribution Guide](https://docs.ray.io/en/master/ray-contribute/getting-involved.html) before opening a pull request. >⚠️ Remove these instructions before submitting your PR. > 💡 Tip: Mark as draft if you want early feedback, or ready for review when it's complete. ## Description > Briefly describe what this PR accomplishes and why it's needed. ### [Data] Concurrency Cap Backpressure tuning - Maintain asymmetric EWMA of total queued bytes (this op + downstream) as the typical level: level. - Maintain asymmetric EWMA of absolute residual vs the previous level as a scale proxy: dev = EWMA(|q - level_prev|). - Define deadband: [lower, upper] = [level - K_DEVdev, level + K_DEVdev]. If q > upper -> target cap = running - BACKOFF_FACTOR (back off) If q < lower -> target cap = running + RAMPUP_FACTOR (ramp up) Else -> target cap = running (hold) - Clamp to [1, configured_cap], admit iff running < target cap. ## Related issues > Link related issues: "Fixes ray-project#1234", "Closes ray-project#1234", or "Related to ray-project#1234". ## Additional information > Optional: Add implementation details, API changes, usage examples, screenshots, etc. --------- Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com> Signed-off-by: Srinath Krishnamachari <68668616+srinathk10@users.noreply.github.com> Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> Signed-off-by: Future-Outlier <eric901201@gmail.com>
> Thank you for contributing to Ray! 🚀 > Please review the [Ray Contribution Guide](https://docs.ray.io/en/master/ray-contribute/getting-involved.html) before opening a pull request. >⚠️ Remove these instructions before submitting your PR. > 💡 Tip: Mark as draft if you want early feedback, or ready for review when it's complete. ## Description > Briefly describe what this PR accomplishes and why it's needed. ### [Data] Concurrency Cap Backpressure tuning - Maintain asymmetric EWMA of total queued bytes (this op + downstream) as the typical level: level. - Maintain asymmetric EWMA of absolute residual vs the previous level as a scale proxy: dev = EWMA(|q - level_prev|). - Define deadband: [lower, upper] = [level - K_DEVdev, level + K_DEVdev]. If q > upper -> target cap = running - BACKOFF_FACTOR (back off) If q < lower -> target cap = running + RAMPUP_FACTOR (ramp up) Else -> target cap = running (hold) - Clamp to [1, configured_cap], admit iff running < target cap. ## Related issues > Link related issues: "Fixes ray-project#1234", "Closes ray-project#1234", or "Related to ray-project#1234". ## Additional information > Optional: Add implementation details, API changes, usage examples, screenshots, etc. --------- Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com> Signed-off-by: Srinath Krishnamachari <68668616+srinathk10@users.noreply.github.com> Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> Signed-off-by: peterxcli <peterxcli@gmail.com>
Description
[Data] Concurrency Cap Backpressure tuning
If q > upper -> target cap = running - BACKOFF_FACTOR (back off)
If q < lower -> target cap = running + RAMPUP_FACTOR (ramp up)
Else -> target cap = running (hold)
Related issues
Additional information