Skip to content

Commit 0d8bbce

Browse files
alexeykudinkinharshit-anyscale
authored andcommitted
[Data] Fixed retry policy for hash-shuffle tasks (#57572)
<!-- Thank you for your contribution! Please review https://github.com/ray-project/ray/blob/master/CONTRIBUTING.rst before opening a pull request. --> <!-- Please add a reviewer to the assignee section when you create a PR. If you don't have the access to it, we will shortly find a reviewer and assign them to your PR. --> ## Why are these changes needed? Currently, Hash shuffle relies on default settings of `max_retries` for tasks which is incorrect. Instead, we explicitly configure it to be retrying indefinitely. ## Related issue number <!-- For example: "Closes #1234" --> ## Checks - [ ] I've signed off every commit(by using the -s flag, i.e., `git commit -s`) in this PR. - [ ] I've run pre-commit jobs to lint the changes in this PR. ([pre-commit setup](https://docs.ray.io/en/latest/ray-contribute/getting-involved.html#lint-and-formatting)) - [ ] I've included any doc changes needed for https://docs.ray.io/en/master/. - [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file. - [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/ - Testing Strategy - [ ] Unit tests - [ ] Release tests - [ ] This PR is not tested :( --------- Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
1 parent 3a90fa2 commit 0d8bbce

File tree

1 file changed

+7
-2
lines changed

1 file changed

+7
-2
lines changed

python/ray/data/_internal/execution/operators/hash_shuffle.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -657,6 +657,8 @@ def _do_add_input_inner(self, input_bundle: RefBundle, input_index: int):
657657
] = _shuffle_block.options(
658658
**shuffle_task_resource_bundle,
659659
num_returns=1,
660+
# Make sure tasks are retried indefinitely
661+
max_retries=-1,
660662
).remote(
661663
block_ref,
662664
input_index,
@@ -896,7 +898,7 @@ def _on_aggregation_done(partition_id: int, exc: Optional[Exception]):
896898

897899
# Request finalization of the partition
898900
block_gen = aggregator.finalize.options(
899-
**finalize_task_resource_bundle
901+
**finalize_task_resource_bundle,
900902
).remote(partition_id)
901903

902904
self._finalizing_tasks[partition_id] = DataOpTask(
@@ -1484,7 +1486,10 @@ def start_health_monitoring(self):
14841486
self._pending_aggregators_refs = None
14851487

14861488

1487-
@ray.remote
1489+
@ray.remote(
1490+
# Make sure tasks are retried indefinitely
1491+
max_task_retries=-1
1492+
)
14881493
class HashShuffleAggregator:
14891494
"""Actor handling of the assigned partitions during hash-shuffle operation
14901495

0 commit comments

Comments
 (0)