Skip to content
Merged
Show file tree
Hide file tree
Changes from 84 commits
Commits
Show all changes
90 commits
Select commit Hold shift + click to select a range
e9e8469
Swapped default shuffle strategy from sort-based to hash-based
alexeykudinkin Aug 12, 2025
153ef43
Revisited resource allocation for hash-shuffle operation to fallback …
alexeykudinkin Aug 12, 2025
4a5d010
Recalibrated CPU allocations for hash-shuffle
alexeykudinkin Aug 12, 2025
0ca7342
Fixing tests
alexeykudinkin Aug 12, 2025
061a7cf
`lint`
alexeykudinkin Aug 12, 2025
1aae379
Cap max number of aggregators to not exceed total # of CPUs x 4
alexeykudinkin Aug 13, 2025
ad674e2
Reduced overprovisioning factor to 2x
alexeykudinkin Aug 13, 2025
e73ed76
`lint`
alexeykudinkin Aug 13, 2025
75a18c4
Updated tests
alexeykudinkin Aug 13, 2025
7714d49
Revisited max shuffle aggregators cap computation
alexeykudinkin Aug 14, 2025
3bdc03f
Revisited shuffle-aggregator allocations to not exceed 5% of CPUs
alexeykudinkin Aug 14, 2025
dca3994
Cap max_concurrency at 8
alexeykudinkin Aug 14, 2025
33e1895
Tidying up;
alexeykudinkin Aug 14, 2025
2d171ea
Cleaned up join tests
alexeykudinkin Aug 14, 2025
f226c71
Abstracted common util tests of hash-shuffle base
alexeykudinkin Aug 14, 2025
6a83435
Fixing refs
alexeykudinkin Aug 14, 2025
f266f5c
Bumped max shuffle aggregators to 128
alexeykudinkin Aug 14, 2025
5cfc409
`lint`
alexeykudinkin Aug 14, 2025
28e8d65
Fallback to `estimated_num_outputs`
alexeykudinkin Aug 14, 2025
1cb0824
Cleaning up dead code
alexeykudinkin Aug 14, 2025
3de39d0
Make `num_partitions` an optional param for hash-shuffle ops
alexeykudinkin Aug 14, 2025
28046ad
Tidying up
alexeykudinkin Aug 14, 2025
0567b5f
Fixing copy
alexeykudinkin Sep 19, 2025
adbc0c1
Deleted dead args
alexeykudinkin Sep 19, 2025
b878f12
Implemented `estimate_num_outputs` for `Read` op
alexeykudinkin Sep 19, 2025
954e1f7
Tidying up
alexeykudinkin Sep 19, 2025
7730ec8
Rebased `HashShufflingOperatorBase` to estimate number of input op bl…
alexeykudinkin Sep 19, 2025
80109f4
Remove fallback
alexeykudinkin Sep 19, 2025
0e01f61
Fixed import
alexeykudinkin Sep 19, 2025
28cdaef
Missing clean up of dead args
alexeykudinkin Sep 19, 2025
48ab514
Deleted useless test
alexeykudinkin Sep 19, 2025
2b73135
Invalid ref
alexeykudinkin Sep 19, 2025
719c5f1
Fixing tests
alexeykudinkin Sep 19, 2025
cb18cf4
Fixed ref
alexeykudinkin Sep 19, 2025
e43e5bd
Bumped tests up to run on 2 CPUs
alexeykudinkin Sep 19, 2025
4585612
`lint`
alexeykudinkin Sep 19, 2025
36ea0da
Added missing bazel def
alexeykudinkin Sep 19, 2025
8e38dc7
Updated baseline
alexeykudinkin Sep 19, 2025
90b1219
Tidying up
alexeykudinkin Sep 19, 2025
c0a74d3
Fix handling of the joins having multiple inputs
alexeykudinkin Sep 19, 2025
1a99010
Fixed aggregator memory estimation to fallback to DEFAULT_TARGET_MAX_…
alexeykudinkin Sep 19, 2025
235190f
Added estimation for partition size hint
alexeykudinkin Sep 19, 2025
1598705
Tidying up
alexeykudinkin Sep 19, 2025
6dbed1b
Revisited HA aggregators num_cpus estimation
alexeykudinkin Sep 19, 2025
d556195
Cleaned up assertion
alexeykudinkin Sep 19, 2025
d0a7add
Fixed HS ops to bind name to actual, derived number of partitions
alexeykudinkin Sep 19, 2025
b2b5a37
`lint`
alexeykudinkin Sep 19, 2025
e2a058c
Restore `num_outputs` ctor param
alexeykudinkin Sep 19, 2025
8eedbbf
Tidying up
alexeykudinkin Sep 19, 2025
56a696b
Updated baseline
alexeykudinkin Sep 19, 2025
030b5a0
Fixed invalid ref
alexeykudinkin Sep 19, 2025
19c81c3
Rebased partition size estimate w/ dataset size estimate
alexeykudinkin Sep 19, 2025
e11d3a9
Log input block size estimate
alexeykudinkin Sep 19, 2025
9ffd52c
Revisited test to make it E2E
alexeykudinkin Sep 19, 2025
a204408
Fixed Join op memory estimation
alexeykudinkin Sep 19, 2025
5b0d932
Expanded the test
alexeykudinkin Sep 19, 2025
a978346
Streamline dataset size estimation
alexeykudinkin Sep 20, 2025
96b6f71
Updated test fixtures
alexeykudinkin Sep 20, 2025
458ff91
Fixed num_cpus derivation from memory
alexeykudinkin Sep 20, 2025
a34775a
Added test
alexeykudinkin Sep 20, 2025
9463ecc
Added test covering absent dataset size estimates
alexeykudinkin Sep 20, 2025
3c7e1cf
Relocated to test_join.py
alexeykudinkin Sep 20, 2025
96bd44d
Fixed HashShuffle/Aggregate utils
alexeykudinkin Sep 20, 2025
b2b2b7a
Added tests for HS/HA;
alexeykudinkin Sep 20, 2025
ce9c243
Missing imports
alexeykudinkin Sep 20, 2025
b880347
Updated fixtures
alexeykudinkin Sep 20, 2025
25e700b
Round up to 2 decimals
alexeykudinkin Sep 20, 2025
5837502
Fixing more tests
alexeykudinkin Sep 20, 2025
3786209
Fixed mocking
alexeykudinkin Sep 20, 2025
e0cc34f
`lint`
alexeykudinkin Sep 20, 2025
dcb1807
Fix handling empty lists
alexeykudinkin Sep 20, 2025
9fee679
`lint`
alexeykudinkin Sep 20, 2025
20a6c92
Reverting changes
alexeykudinkin Sep 20, 2025
6beb1fd
`lint`
alexeykudinkin Sep 20, 2025
1a65577
Fallback to 2Gb memory request per aggregator, when can't accurately …
alexeykudinkin Sep 20, 2025
6318cca
Tidying up
alexeykudinkin Sep 20, 2025
58a7ef9
Updated fixtures
alexeykudinkin Sep 20, 2025
90c6926
Fixing test
alexeykudinkin Sep 20, 2025
3752240
`lint`
alexeykudinkin Sep 20, 2025
31cef95
Updated test fixture
alexeykudinkin Sep 20, 2025
da42fa6
Fixed derivation of max num of hash-shuffle aggregators to be bounded…
alexeykudinkin Sep 20, 2025
16d6f16
Improved dataset estimation in cases when number of outputs of the pr…
alexeykudinkin Sep 20, 2025
53ac624
Added new test;
alexeykudinkin Sep 20, 2025
3073e6b
`lint`
alexeykudinkin Sep 20, 2025
f902b2f
Reverted estimation based on the number of outputs
alexeykudinkin Sep 21, 2025
33c47d4
Improving logging
alexeykudinkin Sep 21, 2025
515e01e
Lowered default fallback to 1Gb
alexeykudinkin Sep 21, 2025
dadfa44
Revised shuffle task memory allocation
alexeykudinkin Sep 21, 2025
f2df2c5
`lint`
alexeykudinkin Sep 21, 2025
087ec07
Updated fixtures
alexeykudinkin Sep 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions python/ray/data/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,20 @@ py_test(
],
)

py_test(
name = "test_hash_shuffle",
size = "small",
srcs = ["tests/test_hash_shuffle.py"],
tags = [
"exclusive",
"team:data",
],
deps = [
":conftest",
"//:ray_lib",
],
)

py_test(
name = "test_hudi",
size = "medium",
Expand Down
56 changes: 21 additions & 35 deletions python/ray/data/_internal/execution/operators/hash_aggregate.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
HashShufflingOperatorBase,
StatefulShuffleAggregation,
)
from ray.data._internal.util import GiB
from ray.data._internal.util import GiB, MiB
from ray.data.aggregate import AggregateFn
from ray.data.block import Block, BlockAccessor
from ray.data.context import DataContext
Expand Down Expand Up @@ -111,15 +111,13 @@ def __init__(
key_columns: Tuple[str],
aggregation_fns: Tuple[AggregateFn],
*,
num_partitions: int,
num_partitions: Optional[int] = None,
aggregator_ray_remote_args_override: Optional[Dict[str, Any]] = None,
):
super().__init__(
name=(
f"HashAggregate("
f"num_partitions={num_partitions}, "
f"key_columns={key_columns}"
f")"
name_factory=(
lambda num_partitions: f"HashAggregate(key_columns={key_columns}, "
f"num_partitions={num_partitions})"
),
input_ops=[input_op],
data_context=data_context,
Expand Down Expand Up @@ -147,39 +145,25 @@ def __init__(
finalize_progress_bar_name="Aggregation",
)

def _get_default_num_cpus_per_partition(self) -> int:
"""
CPU allocation for aggregating actors of Aggregate operator is calculated as:
num_cpus (per partition) = CPU budget / # partitions

Assuming:
- Default number of partitions: 200
- Total operator's CPU budget with default settings: 2 cores
- Number of CPUs per partition: 2 / 200 = 0.01

These CPU budgets are derived such that Ray Data pipeline could run on a
single node (using the default settings).
"""
return 0.01

def _get_operator_num_cpus_per_partition_override(self) -> int:
return (
self.data_context.hash_aggregate_operator_actor_num_cpus_per_partition_override
)
def _get_operator_num_cpus_override(self) -> float:
return self.data_context.hash_aggregate_operator_actor_num_cpus_override

@classmethod
def _estimate_aggregator_memory_allocation(
cls,
*,
num_aggregators: int,
num_partitions: int,
partition_byte_size_estimate: int,
estimated_dataset_bytes: int,
) -> int:
dataset_size = num_partitions * partition_byte_size_estimate
partition_byte_size_estimate = math.ceil(
estimated_dataset_bytes / num_partitions
)

# Estimate of object store memory required to accommodate all partitions
# handled by a single aggregator
aggregator_shuffle_object_store_memory_required: int = math.ceil(
dataset_size / num_aggregators
estimated_dataset_bytes / num_aggregators
)
# Estimate of memory required to accommodate single partition as an output
# (inside Object Store)
Expand All @@ -193,12 +177,14 @@ def _estimate_aggregator_memory_allocation(
output_object_store_memory_required
)

logger.debug(
f"Estimated memory requirement for aggregating operator "
f"(partitions={num_partitions}, aggregators={num_aggregators}): "
f"shuffle={aggregator_shuffle_object_store_memory_required / GiB:.2f}GiB, "
f"output={output_object_store_memory_required / GiB:.2f}GiB, "
f"total={aggregator_total_memory_required / GiB:.2f}GiB, "
logger.info(
f"Estimated memory requirement for aggregating aggregator "
f"(partitions={num_partitions}, "
f"aggregators={num_aggregators}, "
f"dataset (estimate)={estimated_dataset_bytes / GiB:.1f}GiB): "
f"shuffle={aggregator_shuffle_object_store_memory_required / MiB:.1f}MiB, "
f"output={output_object_store_memory_required / MiB:.1f}MiB, "
f"total={aggregator_total_memory_required / MiB:.1f}MiB, "
)

return aggregator_total_memory_required
Expand Down
Loading