Skip to content
Merged
Show file tree
Hide file tree
Changes from 37 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
626c87b
attempt #1: total progress only
kyuds Sep 28, 2025
225f4f2
format
kyuds Sep 28, 2025
c828f85
incorrect boolean fix
kyuds Sep 28, 2025
2b7e58b
introduce locks, custom time, fix bugs
kyuds Sep 28, 2025
d627f3e
format
kyuds Sep 28, 2025
bfc08d7
rich import check
kyuds Sep 28, 2025
27551e0
.
kyuds Sep 28, 2025
814228b
finish v1
kyuds Sep 29, 2025
fc04b23
fix bug from rebase
kyuds Sep 29, 2025
b794aca
add assertion statement from before
kyuds Sep 29, 2025
5b1c3f1
reflect review comments
kyuds Sep 29, 2025
a3dd050
rate calculation refinement
kyuds Sep 29, 2025
d043eec
reflect review comments
kyuds Sep 29, 2025
fb23fc6
non-inf initialization for total for progress
kyuds Sep 29, 2025
d8e1c49
add support for sub-progress bars
kyuds Oct 7, 2025
c5fdfa9
remove unnecessary function
kyuds Oct 7, 2025
e084a9f
better docstring
kyuds Oct 7, 2025
c57dfe4
fix invalid assertion
kyuds Oct 7, 2025
a9a5c52
revive summary_str to get tests to pass
kyuds Oct 7, 2025
b5893ab
nit
kyuds Oct 10, 2025
677e1f9
better completions
kyuds Oct 10, 2025
9c5483a
add support for None num_rows
kyuds Oct 10, 2025
fdec716
fix start time init
kyuds Oct 10, 2025
f236bb9
direct rich output to stderr same as tqdm
kyuds Oct 10, 2025
c932142
always finish when complete
kyuds Oct 10, 2025
62365c4
allow toggle between tqdm and rich progress
kyuds Oct 11, 2025
36783f3
remove outdated comment
kyuds Oct 11, 2025
55df375
cleanup
kyuds Oct 11, 2025
793bee0
fix typing errors
kyuds Oct 11, 2025
b2f8316
update docs
kyuds Oct 11, 2025
e2d5b96
trigger ci
kyuds Oct 11, 2025
d405f97
Merge branch 'master' into better-progress-bar
kyuds Oct 14, 2025
cd54f7b
reflect review comments
kyuds Oct 14, 2025
66e6347
sanity type improvement
kyuds Oct 14, 2025
e869e0b
add backpressure
kyuds Oct 14, 2025
e8a8167
change envvar name
kyuds Oct 14, 2025
46a6d73
Merge branch 'master' into better-progress-bar
kyuds Oct 14, 2025
01f00bd
remove locks
kyuds Oct 15, 2025
3d3ab72
correct rate string
kyuds Oct 15, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions doc/source/data/monitoring-your-workload.rst
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ For operator names longer than a threshold of 100 characters, Ray Data truncates
* To turn off this behavior and show the full operator name, set `DataContext.get_current().enable_progress_bar_name_truncation = False`.
* To change the threshold of truncating the name, update the constant `ray.data._internal.progress_bar.ProgressBar.MAX_NAME_LENGTH = 42`.

.. tip::
There is a new experimental console UI to show progress bars. Set `DataContext.get_current().enable_rich_progress_bars = True` or set the `RAY_DATA_ENABLE_RICH_PROGRESS_BARS=1` environment variable to enable.

.. _ray-data-dashboard:

Ray Data dashboard
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import abc
from abc import ABC, abstractmethod
from typing import List, Optional

from ray.data._internal.execution.interfaces import (
Expand All @@ -8,13 +8,14 @@
TaskContext,
)
from ray.data._internal.execution.interfaces.physical_operator import _create_sub_pb
from ray.data._internal.execution.operators.sub_progress import SubProgressBarMixin
from ray.data._internal.logical.interfaces import LogicalOperator
from ray.data._internal.stats import StatsDict
from ray.data.context import DataContext


class InternalQueueOperatorMixin(PhysicalOperator, abc.ABC):
@abc.abstractmethod
class InternalQueueOperatorMixin(PhysicalOperator, ABC):
@abstractmethod
def internal_queue_size(self) -> int:
"""Returns Operator's internal queue size"""
...
Expand Down Expand Up @@ -47,7 +48,9 @@ def input_dependency(self) -> PhysicalOperator:
return self.input_dependencies[0]


class AllToAllOperator(InternalQueueOperatorMixin, PhysicalOperator):
class AllToAllOperator(
InternalQueueOperatorMixin, SubProgressBarMixin, PhysicalOperator
):
"""A blocking operator that executes once its inputs are complete.
This operator implements distributed sort / shuffle operations, etc.
Expand Down Expand Up @@ -169,6 +172,15 @@ def close_sub_progress_bars(self):
for sub_bar in self._sub_progress_bar_dict.values():
sub_bar.close()

def get_sub_progress_bar_names(self) -> Optional[List[str]]:
return self._sub_progress_bar_names

def set_sub_progress_bar(self, name, pg):
# not type-checking due to circular imports
if self._sub_progress_bar_dict is None:
self._sub_progress_bar_dict = {}
self._sub_progress_bar_dict[name] = pg

def supports_fusion(self):
return True

Expand Down
53 changes: 39 additions & 14 deletions python/ray/data/_internal/execution/operators/hash_shuffle.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
_create_sub_pb,
estimate_total_num_of_blocks,
)
from ray.data._internal.execution.operators.sub_progress import SubProgressBarMixin
from ray.data._internal.logical.interfaces import LogicalOperator
from ray.data._internal.stats import OpRuntimeMetrics
from ray.data._internal.table_block import TableBlockAccessor
Expand Down Expand Up @@ -363,7 +364,7 @@ def combine(one: "_PartitionStats", other: "_PartitionStats") -> "_PartitionStat
)


class HashShuffleProgressBarMixin(abc.ABC):
class HashShuffleProgressBarMixin(SubProgressBarMixin):
@property
@abc.abstractmethod
def shuffle_name(self) -> str:
Expand All @@ -374,26 +375,29 @@ def shuffle_name(self) -> str:
def reduce_name(self) -> str:
...

def _validate_sub_progress_bar_names(self):
assert self.shuffle_name is not None, "shuffle_name should not be None"
assert self.reduce_name is not None, "reduce_name should not be None"

def initialize_sub_progress_bars(self, position: int) -> int:
"""Display all sub progres bars in the termainl, and return the number of bars."""
"""Display all sub progress bars in the termainl, and return the number of bars."""
self._validate_sub_progress_bar_names()

# shuffle
progress_bars_created = 0
self.shuffle_bar = None
if self.shuffle_name is not None:
self.shuffle_bar, position = _create_sub_pb(
self.shuffle_name, self.num_output_rows_total(), position
)
progress_bars_created += 1
self.shuffle_bar, position = _create_sub_pb(
Comment on lines 388 to +389
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's no need to set shuffle_bar to none and immediately overwrite it

self.shuffle_name, self.num_output_rows_total(), position
)
progress_bars_created += 1
self.shuffle_metrics = OpRuntimeMetrics(self)

# reduce
self.reduce_bar = None
if self.reduce_name is not None:
self.reduce_bar, position = _create_sub_pb(
self.reduce_name, self.num_output_rows_total(), position
)
progress_bars_created += 1
self.reduce_bar, position = _create_sub_pb(
self.reduce_name, self.num_output_rows_total(), position
)
progress_bars_created += 1
self.reduce_metrics = OpRuntimeMetrics(self)

return progress_bars_created
Expand All @@ -403,6 +407,27 @@ def close_sub_progress_bars(self):
self.shuffle_bar.close()
self.reduce_bar.close()

def get_sub_progress_bar_names(self) -> Optional[List[str]]:
self._validate_sub_progress_bar_names()

# shuffle
self.shuffle_bar = None
self.shuffle_metrics = OpRuntimeMetrics(self)
Copy link
Contributor

@alexeykudinkin alexeykudinkin Oct 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this reinitializing the metrics and resetting the bar?

Copy link
Member Author

@kyuds kyuds Oct 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is called once upon initializing progress manager, not called at all when using the tqdm progress bar implementation.

btw, when initializing the progress manager, the initialize_sub_progress_bars method is not called, so we need an alternative method of initializing the metrics, which is done in that function. Otherwise the program will crash entirely because metrics are not initialized at all.

However, I do understand the naming of the function may cause confusion. Would you have some suggestions for an alternative name?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

some ideas that come into mind: initialize_sub_progress_bar_infos, initialize_sub_progress_attrs
I'll submit a follow-up PR for this if any is ok with you. Any suggestions are, as always, very welcome


# reduce
self.reduce_bar = None
self.reduce_metrics = OpRuntimeMetrics(self)

return [self.shuffle_name, self.reduce_name]

def set_sub_progress_bar(self, name, pg):
# No type-hints due to circular imports. `name` should be a `str`
# and `pg` should be a `SubProgressBar`
if self.shuffle_name is not None and self.shuffle_name == name:
self.shuffle_bar = pg
elif self.reduce_name is not None and self.reduce_name == name:
self.reduce_bar = pg


def _derive_max_shuffle_aggregators(
total_cluster_resources: ExecutionResources,
Expand Down Expand Up @@ -705,7 +730,7 @@ def _on_partitioning_done(cur_shuffle_task_idx: int):
self.shuffle_metrics.on_task_finished(cur_shuffle_task_idx, None)

# Update Shuffle progress bar
self.shuffle_bar.update(i=input_block_metadata.num_rows)
self.shuffle_bar.update(increment=input_block_metadata.num_rows or 0)

# TODO update metrics
task = self._shuffling_tasks[input_index][
Expand Down Expand Up @@ -823,7 +848,7 @@ def _on_bundle_ready(partition_id: int, bundle: RefBundle):

# Update Finalize progress bar
self.reduce_bar.update(
i=bundle.num_rows(), total=self.num_output_rows_total()
increment=bundle.num_rows() or 0, total=self.num_output_rows_total()
)

def _on_aggregation_done(partition_id: int, exc: Optional[Exception]):
Expand Down
28 changes: 28 additions & 0 deletions python/ray/data/_internal/execution/operators/sub_progress.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from abc import ABC, abstractmethod
from typing import List, Optional


class SubProgressBarMixin(ABC):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So based on this implementation, we can only have 1 level of subprogress bars right?

OOC, how would we extend this implementation for this example (tree reduction case):

- RandomShuffle  
  *- Map
  *- Reduce
    **- Merge Round 1
    **- Merge Round 2
    **- ...
    **- Final Reduce

I know we don't implement this today, but we might for larger datasets.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

some potential approaches:
basis is to understand that sub-progress bars (specifically the SubProgressBar class) follows the same class definition as the original ProgressBar. Thus, utilizing the consuming functions like fetch_until_complete will be the same.

Therefore, the "levels" of progress bars are going to just involve adding more indents when adding progress bars. Currently, the indent is hardcoded, but making variable indents is trivial.

The big change would be in the SubProgressBarMixin, specifically get_sub_progress_bar_names. Instead of returning a List[str] (a list containing the names of subprogress bars in order), we could have a List[Tuple[str, int]], where the string is the name of the subprogress bar, and the integer being the indent level. This way, we will be able to achieve the levels of indentation that we desire without introducing much complexity in the codebase.

** note that this is assuming that the tree reduction case is implemented within a single operator (ie: single operator spawns the nested/tree subprogress bars), as discussed in offline

"""Abstract class for operators that support sub-progress bars"""

@abstractmethod
def get_sub_progress_bar_names(self) -> Optional[List[str]]:
"""
Returns list of sub-progress bar names
This is used to create the sub-progress bars in the progress manager.
Note that sub-progress bars will be created in the order returned by
this method.
"""
...

@abstractmethod
def set_sub_progress_bar(self, name, pg):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add typing

"""
Sets sub-progress bars
name: name of sub-progress bar
pg: SubProgressBar instance (progress_manager.py)
"""
# Skipping type-checking for circular imports
...
Loading