Skip to content

Commit 29462a4

Browse files
kyudslandscapepainter
authored andcommitted
[data] Improve execution progress rendering (ray-project#56992)
<!-- Thank you for your contribution! Please review https://github.com/ray-project/ray/blob/master/CONTRIBUTING.rst before opening a pull request. --> <!-- Please add a reviewer to the assignee section when you create a PR. If you don't have the access to it, we will shortly find a reviewer and assign them to your PR. --> ## Why are these changes needed? <img width="1263" height="859" alt="Screenshot 2025-10-07 at 9 46 09 PM" src="https://github.com/user-attachments/assets/45249e77-af49-4e3d-a758-608a51d15e10" /> [Link to video recording](https://drive.google.com/file/d/13Erjd_K4OXmn1r_7iMS97u8cgUdBgTvG/view?usp=sharing) Currently, by default, the original `tqdm` based progress is used. To enable `rich` progress reporting as shown in the screenshot, set: ``` ray.data.DataContext.get_current().enable_rich_progress_bars = True ``` or set the envvar: ``` export RAY_DATA_ENABLE_RICH_PROGRESS_BARS=1 ``` <!-- Please give a short summary of the change and the problem this solves. --> ## Related issue number Fixes ray-project#52505 <!-- For example: "Closes ray-project#1234" --> ## Checks - [x] I've signed off every commit(by using the -s flag, i.e., `git commit -s`) in this PR. - [x] I've run `scripts/format.sh` to lint the changes in this PR. - [x] I've included any doc changes needed for https://docs.ray.io/en/master/. - [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file. - [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/ - Testing Strategy - [ ] Unit tests - [ ] Release tests - [ ] This PR is not tested :( <!-- CURSOR_SUMMARY --> --- > [!NOTE] > Replaces legacy per-operator/global progress bars with a Rich-based progress manager that tracks global/operator progress and resources, refactors topology/progress APIs, and updates tests. > > - **Execution Progress (Rich-based)**: > - Introduces `progress_manager.py` with `RichExecutionProgressManager` for global/operator progress, rates, elapsed/remaining time, and live resource usage. > - Streaming executor integrates manager (start/refresh/close, finishing messages), updates on row/output and resources, and periodic refresh via `PROGRESS_MANAGER_UPDATE_INTERVAL`. > - **Operator/State Refactor**: > - `OpState` gains `OpDisplayMetrics`, `progress_manager_uuid`, `output_row_count`, and `update_display_metrics`; removes legacy progress bar handling and summary methods. > - `_debug_dump_topology` now logs `op_display_metrics.display_str()`. > - Minor: add TODOs on sub-progress-bar helpers in `AllToAllOperator` and `HashShuffleProgressBarMixin`. > - **Topology API**: > - `build_streaming_topology(...)` now returns only `Topology` (no progress bar count); all call sites and tests updated. > - **Iterator/Reporting**: > - `_ClosingIterator` updates total progress via manager. > - Resource reporting moved to progress manager. > - **Tests**: > - Adjust unit tests to new topology return type and removed progress bar expectations. > > <sup>Written by [Cursor Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit 935f3c3. This will update automatically on new commits. Configure [here](https://cursor.com/dashboard?tab=bugbot).</sup> <!-- /CURSOR_SUMMARY --> --------- Signed-off-by: Daniel Shin <kyuseung1016@gmail.com> Signed-off-by: kyuds <kyuseung1016@gmail.com> Signed-off-by: kyuds <kyuds@everspin.co.kr>
1 parent a56c1d8 commit 29462a4

File tree

12 files changed

+905
-194
lines changed

12 files changed

+905
-194
lines changed

doc/source/data/monitoring-your-workload.rst

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ For operator names longer than a threshold of 100 characters, Ray Data truncates
4040
* To turn off this behavior and show the full operator name, set `DataContext.get_current().enable_progress_bar_name_truncation = False`.
4141
* To change the threshold of truncating the name, update the constant `ray.data._internal.progress_bar.ProgressBar.MAX_NAME_LENGTH = 42`.
4242

43+
.. tip::
44+
There is a new experimental console UI to show progress bars. Set `DataContext.get_current().enable_rich_progress_bars = True` or set the `RAY_DATA_ENABLE_RICH_PROGRESS_BARS=1` environment variable to enable.
45+
4346
.. _ray-data-dashboard:
4447

4548
Ray Data dashboard

python/ray/data/_internal/execution/operators/base_physical_operator.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import abc
1+
from abc import ABC, abstractmethod
22
from typing import List, Optional
33

44
from ray.data._internal.execution.interfaces import (
@@ -8,13 +8,14 @@
88
TaskContext,
99
)
1010
from ray.data._internal.execution.interfaces.physical_operator import _create_sub_pb
11+
from ray.data._internal.execution.operators.sub_progress import SubProgressBarMixin
1112
from ray.data._internal.logical.interfaces import LogicalOperator
1213
from ray.data._internal.stats import StatsDict
1314
from ray.data.context import DataContext
1415

1516

16-
class InternalQueueOperatorMixin(PhysicalOperator, abc.ABC):
17-
@abc.abstractmethod
17+
class InternalQueueOperatorMixin(PhysicalOperator, ABC):
18+
@abstractmethod
1819
def internal_queue_size(self) -> int:
1920
"""Returns Operator's internal queue size"""
2021
...
@@ -47,7 +48,9 @@ def input_dependency(self) -> PhysicalOperator:
4748
return self.input_dependencies[0]
4849

4950

50-
class AllToAllOperator(InternalQueueOperatorMixin, PhysicalOperator):
51+
class AllToAllOperator(
52+
InternalQueueOperatorMixin, SubProgressBarMixin, PhysicalOperator
53+
):
5154
"""A blocking operator that executes once its inputs are complete.
5255
5356
This operator implements distributed sort / shuffle operations, etc.
@@ -169,6 +172,15 @@ def close_sub_progress_bars(self):
169172
for sub_bar in self._sub_progress_bar_dict.values():
170173
sub_bar.close()
171174

175+
def get_sub_progress_bar_names(self) -> Optional[List[str]]:
176+
return self._sub_progress_bar_names
177+
178+
def set_sub_progress_bar(self, name, pg):
179+
# not type-checking due to circular imports
180+
if self._sub_progress_bar_dict is None:
181+
self._sub_progress_bar_dict = {}
182+
self._sub_progress_bar_dict[name] = pg
183+
172184
def supports_fusion(self):
173185
return True
174186

python/ray/data/_internal/execution/operators/hash_shuffle.py

Lines changed: 39 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
_create_sub_pb,
4646
estimate_total_num_of_blocks,
4747
)
48+
from ray.data._internal.execution.operators.sub_progress import SubProgressBarMixin
4849
from ray.data._internal.logical.interfaces import LogicalOperator
4950
from ray.data._internal.stats import OpRuntimeMetrics
5051
from ray.data._internal.table_block import TableBlockAccessor
@@ -363,7 +364,7 @@ def combine(one: "_PartitionStats", other: "_PartitionStats") -> "_PartitionStat
363364
)
364365

365366

366-
class HashShuffleProgressBarMixin(abc.ABC):
367+
class HashShuffleProgressBarMixin(SubProgressBarMixin):
367368
@property
368369
@abc.abstractmethod
369370
def shuffle_name(self) -> str:
@@ -374,26 +375,29 @@ def shuffle_name(self) -> str:
374375
def reduce_name(self) -> str:
375376
...
376377

378+
def _validate_sub_progress_bar_names(self):
379+
assert self.shuffle_name is not None, "shuffle_name should not be None"
380+
assert self.reduce_name is not None, "reduce_name should not be None"
381+
377382
def initialize_sub_progress_bars(self, position: int) -> int:
378-
"""Display all sub progres bars in the termainl, and return the number of bars."""
383+
"""Display all sub progress bars in the termainl, and return the number of bars."""
384+
self._validate_sub_progress_bar_names()
379385

380386
# shuffle
381387
progress_bars_created = 0
382388
self.shuffle_bar = None
383-
if self.shuffle_name is not None:
384-
self.shuffle_bar, position = _create_sub_pb(
385-
self.shuffle_name, self.num_output_rows_total(), position
386-
)
387-
progress_bars_created += 1
389+
self.shuffle_bar, position = _create_sub_pb(
390+
self.shuffle_name, self.num_output_rows_total(), position
391+
)
392+
progress_bars_created += 1
388393
self.shuffle_metrics = OpRuntimeMetrics(self)
389394

390395
# reduce
391396
self.reduce_bar = None
392-
if self.reduce_name is not None:
393-
self.reduce_bar, position = _create_sub_pb(
394-
self.reduce_name, self.num_output_rows_total(), position
395-
)
396-
progress_bars_created += 1
397+
self.reduce_bar, position = _create_sub_pb(
398+
self.reduce_name, self.num_output_rows_total(), position
399+
)
400+
progress_bars_created += 1
397401
self.reduce_metrics = OpRuntimeMetrics(self)
398402

399403
return progress_bars_created
@@ -403,6 +407,27 @@ def close_sub_progress_bars(self):
403407
self.shuffle_bar.close()
404408
self.reduce_bar.close()
405409

410+
def get_sub_progress_bar_names(self) -> Optional[List[str]]:
411+
self._validate_sub_progress_bar_names()
412+
413+
# shuffle
414+
self.shuffle_bar = None
415+
self.shuffle_metrics = OpRuntimeMetrics(self)
416+
417+
# reduce
418+
self.reduce_bar = None
419+
self.reduce_metrics = OpRuntimeMetrics(self)
420+
421+
return [self.shuffle_name, self.reduce_name]
422+
423+
def set_sub_progress_bar(self, name, pg):
424+
# No type-hints due to circular imports. `name` should be a `str`
425+
# and `pg` should be a `SubProgressBar`
426+
if self.shuffle_name is not None and self.shuffle_name == name:
427+
self.shuffle_bar = pg
428+
elif self.reduce_name is not None and self.reduce_name == name:
429+
self.reduce_bar = pg
430+
406431

407432
def _derive_max_shuffle_aggregators(
408433
total_cluster_resources: ExecutionResources,
@@ -705,7 +730,7 @@ def _on_partitioning_done(cur_shuffle_task_idx: int):
705730
self.shuffle_metrics.on_task_finished(cur_shuffle_task_idx, None)
706731

707732
# Update Shuffle progress bar
708-
self.shuffle_bar.update(i=input_block_metadata.num_rows)
733+
self.shuffle_bar.update(increment=input_block_metadata.num_rows or 0)
709734

710735
# TODO update metrics
711736
task = self._shuffling_tasks[input_index][
@@ -823,7 +848,7 @@ def _on_bundle_ready(partition_id: int, bundle: RefBundle):
823848

824849
# Update Finalize progress bar
825850
self.reduce_bar.update(
826-
i=bundle.num_rows(), total=self.num_output_rows_total()
851+
increment=bundle.num_rows() or 0, total=self.num_output_rows_total()
827852
)
828853

829854
def _on_aggregation_done(partition_id: int, exc: Optional[Exception]):
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
from abc import ABC, abstractmethod
2+
from typing import List, Optional
3+
4+
5+
class SubProgressBarMixin(ABC):
6+
"""Abstract class for operators that support sub-progress bars"""
7+
8+
@abstractmethod
9+
def get_sub_progress_bar_names(self) -> Optional[List[str]]:
10+
"""
11+
Returns list of sub-progress bar names
12+
13+
This is used to create the sub-progress bars in the progress manager.
14+
Note that sub-progress bars will be created in the order returned by
15+
this method.
16+
"""
17+
...
18+
19+
@abstractmethod
20+
def set_sub_progress_bar(self, name, pg):
21+
"""
22+
Sets sub-progress bars
23+
24+
name: name of sub-progress bar
25+
pg: SubProgressBar instance (progress_manager.py)
26+
"""
27+
# Skipping type-checking for circular imports
28+
...

0 commit comments

Comments
 (0)