Skip to content

Commit 28fb6c3

Browse files
[Data] Switched default shuffle strategy from sort-based to hash-based (#55510)
<!-- Thank you for your contribution! Please review https://github.com/ray-project/ray/blob/master/CONTRIBUTING.rst before opening a pull request. --> <!-- Please add a reviewer to the assignee section when you create a PR. If you don't have the access to it, we will shortly find a reviewer and assign them to your PR. --> ## Why are these changes needed? Hash-based shuffle has been around for some time now, bringing clear performance advantages in our internal benchmarks and tests. Therefore, we're switching the default shuffle strategy from the existing (legacy) range-sort-based one to hash-shuffle. ## Related issue number <!-- For example: "Closes #1234" --> ## Checks - [ ] I've signed off every commit (by using the -s flag, i.e., `git commit -s`) in this PR. - [ ] I've run `scripts/format.sh` to lint the changes in this PR. - [ ] I've included any doc changes needed for https://docs.ray.io/en/master/. - [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file. - [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/ - Testing Strategy - [ ] Unit tests - [ ] Release tests - [ ] This PR is not tested :( --------- Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
1 parent b32f0d9 commit 28fb6c3

24 files changed

+925
-376
lines changed

python/ray/data/BUILD.bazel

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -588,6 +588,20 @@ py_test(
588588
],
589589
)
590590

591+
py_test(
592+
name = "test_hash_shuffle",
593+
size = "small",
594+
srcs = ["tests/test_hash_shuffle.py"],
595+
tags = [
596+
"exclusive",
597+
"team:data",
598+
],
599+
deps = [
600+
":conftest",
601+
"//:ray_lib",
602+
],
603+
)
604+
591605
py_test(
592606
name = "test_hudi",
593607
size = "medium",

python/ray/data/_internal/execution/operators/hash_aggregate.py

Lines changed: 21 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
HashShufflingOperatorBase,
1010
StatefulShuffleAggregation,
1111
)
12-
from ray.data._internal.util import GiB
12+
from ray.data._internal.util import GiB, MiB
1313
from ray.data.aggregate import AggregateFn
1414
from ray.data.block import Block, BlockAccessor
1515
from ray.data.context import DataContext
@@ -111,15 +111,13 @@ def __init__(
111111
key_columns: Tuple[str],
112112
aggregation_fns: Tuple[AggregateFn],
113113
*,
114-
num_partitions: int,
114+
num_partitions: Optional[int] = None,
115115
aggregator_ray_remote_args_override: Optional[Dict[str, Any]] = None,
116116
):
117117
super().__init__(
118-
name=(
119-
f"HashAggregate("
120-
f"num_partitions={num_partitions}, "
121-
f"key_columns={key_columns}"
122-
f")"
118+
name_factory=(
119+
lambda num_partitions: f"HashAggregate(key_columns={key_columns}, "
120+
f"num_partitions={num_partitions})"
123121
),
124122
input_ops=[input_op],
125123
data_context=data_context,
@@ -147,39 +145,25 @@ def __init__(
147145
finalize_progress_bar_name="Aggregation",
148146
)
149147

150-
def _get_default_num_cpus_per_partition(self) -> int:
151-
"""
152-
CPU allocation for aggregating actors of Aggregate operator is calculated as:
153-
num_cpus (per partition) = CPU budget / # partitions
154-
155-
Assuming:
156-
- Default number of partitions: 200
157-
- Total operator's CPU budget with default settings: 2 cores
158-
- Number of CPUs per partition: 2 / 200 = 0.01
159-
160-
These CPU budgets are derived such that Ray Data pipeline could run on a
161-
single node (using the default settings).
162-
"""
163-
return 0.01
164-
165-
def _get_operator_num_cpus_per_partition_override(self) -> int:
166-
return (
167-
self.data_context.hash_aggregate_operator_actor_num_cpus_per_partition_override
168-
)
148+
def _get_operator_num_cpus_override(self) -> float:
149+
return self.data_context.hash_aggregate_operator_actor_num_cpus_override
169150

170151
@classmethod
171152
def _estimate_aggregator_memory_allocation(
172153
cls,
173154
*,
174155
num_aggregators: int,
175156
num_partitions: int,
176-
partition_byte_size_estimate: int,
157+
estimated_dataset_bytes: int,
177158
) -> int:
178-
dataset_size = num_partitions * partition_byte_size_estimate
159+
partition_byte_size_estimate = math.ceil(
160+
estimated_dataset_bytes / num_partitions
161+
)
162+
179163
# Estimate of object store memory required to accommodate all partitions
180164
# handled by a single aggregator
181165
aggregator_shuffle_object_store_memory_required: int = math.ceil(
182-
dataset_size / num_aggregators
166+
estimated_dataset_bytes / num_aggregators
183167
)
184168
# Estimate of memory required to accommodate single partition as an output
185169
# (inside Object Store)
@@ -193,12 +177,14 @@ def _estimate_aggregator_memory_allocation(
193177
output_object_store_memory_required
194178
)
195179

196-
logger.debug(
197-
f"Estimated memory requirement for aggregating operator "
198-
f"(partitions={num_partitions}, aggregators={num_aggregators}): "
199-
f"shuffle={aggregator_shuffle_object_store_memory_required / GiB:.2f}GiB, "
200-
f"output={output_object_store_memory_required / GiB:.2f}GiB, "
201-
f"total={aggregator_total_memory_required / GiB:.2f}GiB, "
180+
logger.info(
181+
f"Estimated memory requirement for aggregating aggregator "
182+
f"(partitions={num_partitions}, "
183+
f"aggregators={num_aggregators}, "
184+
f"dataset (estimate)={estimated_dataset_bytes / GiB:.1f}GiB): "
185+
f"shuffle={aggregator_shuffle_object_store_memory_required / MiB:.1f}MiB, "
186+
f"output={output_object_store_memory_required / MiB:.1f}MiB, "
187+
f"total={aggregator_total_memory_required / MiB:.1f}MiB, "
202188
)
203189

204190
return aggregator_total_memory_required

0 commit comments

Comments
 (0)