[Data] Add local aggregation fast-path for small datasets (#61016) #61402
kaveti wants to merge 3 commits into ray-project:master
…t#61016)

For small datasets (below a configurable threshold, default 10 MiB), groupby/aggregate now executes entirely on the driver using existing map/reduce primitives instead of spawning a distributed actor pool. This eliminates actor startup and coordination overhead that caused ~350x slowdown vs pandas on 1M-row single-node workloads.

The fast-path is controlled by:
- DataContext.small_dataset_agg_threshold_bytes (default: 10 MiB)
- Env var: RAY_DATA_SMALL_DATASET_AGG_THRESHOLD_BYTES

Set threshold to 0 to always use distributed aggregation.

Fixes ray-project#61016

Signed-off-by: rkaveti <kavetiraviteja1992@gmail.com>
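The threshold rule described above can be sketched in plain Python. This is an illustration only, not the PR's code: the env var name and the 10 MiB default come from the description, while `resolve_threshold` and `use_local_fast_path` are hypothetical helpers. The sketch assumes "below" means strictly less than.

```python
import os

# Default and env var name are taken from the PR description.
DEFAULT_THRESHOLD_BYTES = 10 * 1024 * 1024  # 10 MiB
ENV_VAR = "RAY_DATA_SMALL_DATASET_AGG_THRESHOLD_BYTES"


def resolve_threshold(env=None):
    """Hypothetical helper: read the threshold from the environment, else use the default."""
    env = os.environ if env is None else env
    raw = env.get(ENV_VAR)
    return int(raw) if raw is not None else DEFAULT_THRESHOLD_BYTES


def use_local_fast_path(total_input_bytes, threshold_bytes):
    """Hypothetical helper: True when the input is small enough to aggregate on the driver."""
    # A threshold of 0 disables the fast-path, so aggregation stays distributed.
    if threshold_bytes <= 0:
        return False
    return total_input_bytes < threshold_bytes
```

With the default threshold, a 1 KiB input would take the local path and a 20 MiB input would not. Setting the env var to `0` turns the fast-path off for every input size.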
46363c8 to 9c76ece
Code Review
This pull request introduces a valuable optimization for groupby/aggregate operations on small datasets by adding a local aggregation fast-path. The implementation is clean, well-contained, and effectively reuses existing map/reduce primitives to avoid the overhead of a distributed actor pool. The new feature is controlled by a configurable threshold, which is a good design choice. I have one minor suggestion to improve memory efficiency.
The previous implementation placed the small-dataset fast-path inside generate_aggregate_fn, but the default shuffle strategy (HASH_SHUFFLE) bypasses that function entirely via plan_all_to_all_op.py, making the fast-path unreachable in the common case.

Fix:
- plan_all_to_all_op: when threshold > 0 and HASH_SHUFFLE is set, fall through to generate_aggregate_fn (AllToAllOperator) instead of immediately returning HashAggregateOperator. Small data gets the local fast-path; large data falls back to sort-based distributed agg. Set threshold=0 to always use HashAggregateOperator unchanged.
- aggregate.py: remove the sort-strategy-only assert and default the large-data scheduler to pull-based (covers HASH_SHUFFLE fallback).

Signed-off-by: rkaveti <kavetiraviteja1992@gmail.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Signed-off-by: rkaveti <kavetiraviteja1992@gmail.com>
    This avoids the overhead of spawning a distributed actor pool when the
    total input data size is below the configured threshold.
    """
    blocks = ray.get(ref for bundle in refs for ref in bundle.block_refs)
Generator expression passed to ray.get causes ValueError
High Severity
ray.get() is called with a bare generator expression instead of a list. Internally, ray.get checks isinstance(object_refs, list) and raises a ValueError if the argument is not a list or ObjectRef. A generator expression is neither, so _local_aggregate will always crash at runtime, making the entire small-dataset fast-path non-functional.
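One possible fix is to materialize the flattened refs into a list before the call. The sketch below does not import Ray: `strict_get` is a stand-in that mimics only the argument check described in this comment, and `FakeBundle` is a hypothetical placeholder for the bundle type, with strings standing in for object refs.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class FakeBundle:
    """Hypothetical stand-in for a ref bundle; only block_refs matters here."""
    block_refs: List[str] = field(default_factory=list)


def strict_get(object_refs):
    """Stand-in for the type check described in the review (not Ray code)."""
    # Strings play the role of single object refs in this sketch.
    if not isinstance(object_refs, (list, str)):
        raise ValueError(
            f"expected a ref or a list of refs, got {type(object_refs).__name__}"
        )
    return object_refs


refs = [FakeBundle(["a", "b"]), FakeBundle(["c"])]

# Fix: build a list with a list comprehension instead of passing a bare generator.
block_refs = [ref for bundle in refs for ref in bundle.block_refs]
blocks = strict_get(block_refs)
```

In the PR itself, the equivalent change would presumably be to wrap the comprehension in square brackets inside the `ray.get(...)` call.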
    # Otherwise fall through to generate_aggregate_fn, which will run
    # local aggregation for small datasets and sort-based distributed
    # aggregation for larger ones. Users can set the threshold to 0 to
    # always use hash-shuffle aggregation.
Hash shuffle silently falls back to sort shuffle
Medium Severity
When shuffle_strategy is HASH_SHUFFLE and small_dataset_agg_threshold_bytes > 0 (the default), datasets exceeding the threshold silently fall through to generate_aggregate_fn, which uses PullBasedShuffleTaskScheduler instead of the HashAggregateOperator the user configured. This is a behavioral regression — users who explicitly chose HASH_SHUFFLE get sort-based aggregation for large datasets without any warning.
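One way to avoid the silent fallback is to take the local path only for inputs below the threshold, and otherwise keep the configured strategy. The sketch below is hypothetical and not taken from the PR: `ShuffleStrategy` and `choose_aggregation_path` are illustrative names, the function returns a label instead of a real operator, and it assumes the total input size is known when the path is chosen.

```python
from enum import Enum


class ShuffleStrategy(Enum):
    """Hypothetical stand-in for the configured shuffle strategy."""
    HASH_SHUFFLE = "hash_shuffle"
    SORT_SHUFFLE = "sort_shuffle"


def choose_aggregation_path(strategy, total_input_bytes, threshold_bytes):
    """Hypothetical planner decision; returns a label rather than an operator."""
    # Small inputs take the local fast-path unless the threshold is 0 (disabled).
    if threshold_bytes > 0 and total_input_bytes < threshold_bytes:
        return "local"
    # Above the threshold, keep the strategy the user explicitly configured.
    if strategy is ShuffleStrategy.HASH_SHUFFLE:
        return "hash_aggregate"
    return "sort_aggregate"
```

Under this rule, a HASH_SHUFFLE user with a 50 MiB input and the default 10 MiB threshold would still get hash-based aggregation. Only inputs below the threshold would change behavior.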


For small datasets (below a configurable threshold, default 10 MiB), groupby/aggregate now executes entirely on the driver using existing map/reduce primitives instead of spawning a distributed actor pool. This eliminates actor startup and coordination overhead that caused ~350x slowdown vs pandas on 1M-row single-node workloads.
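The fast-path is controlled by:
- DataContext.small_dataset_agg_threshold_bytes (default: 10 MiB)
- Env var: RAY_DATA_SMALL_DATASET_AGG_THRESHOLD_BYTES

Set threshold to 0 to always use distributed aggregation.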
Fixes #61016