From 08633956b0026dc7d479f4e60f4e5c419548b769 Mon Sep 17 00:00:00 2001 From: xgui Date: Tue, 19 Aug 2025 23:14:30 +0000 Subject: [PATCH 1/9] add stats for the first block time Signed-off-by: xgui --- .../_internal/block_batching/iter_batches.py | 19 +++++++++++++++++-- python/ray/data/_internal/stats.py | 10 ++++++++++ python/ray/data/tests/test_stats.py | 4 ++++ 3 files changed, 31 insertions(+), 2 deletions(-) diff --git a/python/ray/data/_internal/block_batching/iter_batches.py b/python/ray/data/_internal/block_batching/iter_batches.py index f807ae2078dc..3347fb6bbae9 100644 --- a/python/ray/data/_internal/block_batching/iter_batches.py +++ b/python/ray/data/_internal/block_batching/iter_batches.py @@ -135,6 +135,7 @@ def __init__( if actor_prefetcher_enabled else WaitBlockPrefetcher() ) + self.waiting_for_first_batch = True def _prefetch_blocks( self, ref_bundles: Iterator[RefBundle] @@ -242,8 +243,22 @@ def after_epoch_end(self): @contextmanager def get_next_batch_context(self): - with self._stats.iter_total_blocked_s.timer() if self._stats else nullcontext(): - yield + if self._stats: + # Always track total blocked time + total_timer = self._stats.iter_total_blocked_s.timer() + # Also track first batch blocked time if this is the first batch + first_batch_timer = ( + self._stats.iter_first_batch_blocked_s.timer() + if self.waiting_for_first_batch + else nullcontext() + ) + with total_timer, first_batch_timer: + yield + self.waiting_for_first_batch = False + else: + with nullcontext(): + yield + self.waiting_for_first_batch = False @contextmanager def yield_batch_context(self, batch: Batch): diff --git a/python/ray/data/_internal/stats.py b/python/ray/data/_internal/stats.py index d00b45c89b8a..b26f0606b29d 100644 --- a/python/ray/data/_internal/stats.py +++ b/python/ray/data/_internal/stats.py @@ -948,6 +948,7 @@ def __init__( self.iter_format_batch_s: Timer = Timer() self.iter_collate_batch_s: Timer = Timer() self.iter_finalize_batch_s: Timer = Timer() + self.iter_first_batch_blocked_s: Timer = Timer() self.iter_total_blocked_s: Timer = Timer() self.iter_user_s: Timer = Timer() self.iter_initialize_s: Timer = Timer() @@ -1003,6 +1004,7 @@ def to_summary(self) -> "DatasetStatsSummary": self.iter_format_batch_s, self.iter_collate_batch_s, self.iter_finalize_batch_s, + self.iter_first_batch_blocked_s, self.iter_total_blocked_s, self.iter_user_s, self.iter_initialize_s, @@ -1642,6 +1644,8 @@ class IterStatsSummary: collate_time: Timer # Time spent in finalize_fn, in seconds finalize_batch_time: Timer + # Time user thread is blocked waiting for first batch + first_batch_block_time: Timer # Total time user thread is blocked by iter_batches block_time: Timer # Time spent in user code, in seconds @@ -1665,6 +1669,7 @@ def to_string(self) -> str: out = "" if ( self.block_time.get() + or self.first_batch_block_time.get() or self.total_time.get() or self.get_time.get() or self.next_time.get() @@ -1685,6 +1690,11 @@ def to_string(self) -> str: " * Total time user thread is blocked by Ray Data iter_batches: " "{}\n".format(fmt(self.block_time.get())) ) + if self.first_batch_block_time.get(): + out += ( + " * Total time user thread is blocked waiting for first batch: " + "{}\n".format(fmt(self.first_batch_block_time.get())) + ) if self.user_time.get(): out += " * Total execution time for user thread: {}\n".format( fmt(self.user_time.get()) diff --git a/python/ray/data/tests/test_stats.py b/python/ray/data/tests/test_stats.py index 0a3a32e9d63e..5b16c556b743 100644 --- a/python/ray/data/tests/test_stats.py +++ b/python/ray/data/tests/test_stats.py @@ -395,6 +395,7 @@ def test_streaming_split_stats(ray_start_regular_shared, restore_data_context): * Total time overall: T * Total time in Ray Data iterator initialization code: T * Total time user thread is blocked by Ray Data iter_batches: T + * Total time user thread is blocked waiting for first batch: T * Total execution time for user thread: T * Batch iteration time breakdown (summed across prefetch threads): * In ray.get(): T min, T max, T avg, T total @@ -577,6 +578,7 @@ def test_dataset_stats_basic( f"* Total time overall: T\n" f" * Total time in Ray Data iterator initialization code: T\n" f" * Total time user thread is blocked by Ray Data iter_batches: T\n" + f" * Total time user thread is blocked waiting for first batch: T\n" f" * Total execution time for user thread: T\n" f"* Batch iteration time breakdown (summed across prefetch threads):\n" f" * In ray.get(): T min, T max, T avg, T total\n" @@ -618,6 +620,7 @@ def test_block_location_nums(ray_start_regular_shared, restore_data_context): f"* Total time overall: T\n" f" * Total time in Ray Data iterator initialization code: T\n" f" * Total time user thread is blocked by Ray Data iter_batches: T\n" + f" * Total time user thread is blocked waiting for first batches: T\n" f" * Total execution time for user thread: T\n" f"* Batch iteration time breakdown (summed across prefetch threads):\n" f" * In ray.get(): T min, T max, T avg, T total\n" @@ -1363,6 +1366,7 @@ def test_streaming_stats_full(ray_start_regular_shared, restore_data_context): * Total time overall: T * Total time in Ray Data iterator initialization code: T * Total time user thread is blocked by Ray Data iter_batches: T + * Total time user thread is blocked waiting for first batches: T * Total execution time for user thread: T * Batch iteration time breakdown (summed across prefetch threads): * In ray.get(): T min, T max, T avg, T total From 8b52f749cc68f406ac1488b1e209876f4aaca076 Mon Sep 17 00:00:00 2001 From: xgui Date: Wed, 20 Aug 2025 00:08:50 +0000 Subject: [PATCH 2/9] fix unittest Signed-off-by: xgui --- python/ray/data/_internal/stats.py | 8 ++++++++ python/ray/data/tests/test_stats.py | 4 ++-- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/python/ray/data/_internal/stats.py b/python/ray/data/_internal/stats.py index b26f0606b29d..c10a9c921d50 100644 --- a/python/ray/data/_internal/stats.py +++ b/python/ray/data/_internal/stats.py @@ -280,6 +280,11 @@ def __init__(self, max_stats=1000): description="Seconds user thread is blocked by iter_batches()", tag_keys=iter_tag_keys, ) + self.iter_first_batch_blocked_s = Gauge( + "data_iter_first_batch_blocked_seconds", + description="Seconds user thread is blocked waiting for first batch", + tag_keys=iter_tag_keys, + ) self.iter_user_s = Gauge( "data_iter_user_seconds", description="Seconds spent in user code", @@ -469,6 +474,9 @@ def update_iteration_metrics( ): tags = self._create_tags(dataset_tag) self.iter_total_blocked_s.set(stats.iter_total_blocked_s.get(), tags) + self.iter_first_batch_blocked_s.set( + stats.iter_first_batch_blocked_s.get(), tags + ) self.iter_user_s.set(stats.iter_user_s.get(), tags) self.iter_initialize_s.set(stats.iter_initialize_s.get(), tags) diff --git a/python/ray/data/tests/test_stats.py b/python/ray/data/tests/test_stats.py index 5b16c556b743..5ed0a72eed28 100644 --- a/python/ray/data/tests/test_stats.py +++ b/python/ray/data/tests/test_stats.py @@ -620,7 +620,7 @@ def test_block_location_nums(ray_start_regular_shared, restore_data_context): f"* Total time overall: T\n" f" * Total time in Ray Data iterator initialization code: T\n" f" * Total time user thread is blocked by Ray Data iter_batches: T\n" - f" * Total time user thread is blocked waiting for first batches: T\n" + f" * Total time user thread is blocked waiting for first batch: T\n" f" * Total execution time for user thread: T\n" f"* Batch iteration time breakdown (summed across prefetch threads):\n" f" * In ray.get(): T min, T max, T avg, T total\n" @@ -1366,7 +1366,7 @@ def test_streaming_stats_full(ray_start_regular_shared, restore_data_context): * Total time overall: T * Total time in Ray Data iterator initialization code: T * Total time user thread is blocked by Ray Data iter_batches: T - * Total time user thread is blocked waiting for first batches: T + * Total time user thread is blocked waiting for first batch: T * Total execution time for user thread: T * Batch iteration time breakdown (summed across prefetch threads): * In ray.get(): T min, T max, T avg, T total From 3ab8f1566db9137b83b6121fcce31494ab9fccf1 Mon Sep 17 00:00:00 2001 From: Xinyuan <43737116+xinyuangui2@users.noreply.github.com> Date: Tue, 19 Aug 2025 17:25:08 -0700 Subject: [PATCH 3/9] Update python/ray/data/_internal/block_batching/iter_batches.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> Signed-off-by: Xinyuan <43737116+xinyuangui2@users.noreply.github.com> --- .../_internal/block_batching/iter_batches.py | 30 +++++++++---------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/python/ray/data/_internal/block_batching/iter_batches.py b/python/ray/data/_internal/block_batching/iter_batches.py index 3347fb6bbae9..9af4890a8c70 100644 --- a/python/ray/data/_internal/block_batching/iter_batches.py +++ b/python/ray/data/_internal/block_batching/iter_batches.py @@ -243,22 +243,22 @@ def after_epoch_end(self): @contextmanager def get_next_batch_context(self): - if self._stats: - # Always track total blocked time - total_timer = self._stats.iter_total_blocked_s.timer() - # Also track first batch blocked time if this is the first batch - first_batch_timer = ( - self._stats.iter_first_batch_blocked_s.timer() - if self.waiting_for_first_batch - else nullcontext() - ) - with total_timer, first_batch_timer: - yield - self.waiting_for_first_batch = False - else: - with nullcontext(): + try: + if self._stats: + # Always track total blocked time + total_timer = self._stats.iter_total_blocked_s.timer() + # Also track first batch blocked time if this is the first batch + first_batch_timer = ( + self._stats.iter_first_batch_blocked_s.timer() + if self.waiting_for_first_batch + else nullcontext() + ) + with total_timer, first_batch_timer: + yield + else: yield - self.waiting_for_first_batch = False + finally: + self.waiting_for_first_batch = False @contextmanager def yield_batch_context(self, batch: Batch): From 772a88dd589567dc7420b44dd199cdea4411b1c5 Mon Sep 17 00:00:00 2001 From: xgui Date: Thu, 21 Aug 2025 21:58:26 +0000 Subject: [PATCH 4/9] resolve comments Signed-off-by: xgui --- python/ray/data/_internal/block_batching/iter_batches.py | 8 ++++---- python/ray/data/_internal/stats.py | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/python/ray/data/_internal/block_batching/iter_batches.py b/python/ray/data/_internal/block_batching/iter_batches.py index 9af4890a8c70..ecbdf51c3202 100644 --- a/python/ray/data/_internal/block_batching/iter_batches.py +++ b/python/ray/data/_internal/block_batching/iter_batches.py @@ -135,7 +135,7 @@ def __init__( if actor_prefetcher_enabled else WaitBlockPrefetcher() ) - self.waiting_for_first_batch = True + self._yielded_first_batch = False def _prefetch_blocks( self, ref_bundles: Iterator[RefBundle] @@ -236,7 +236,7 @@ def __iter__(self) -> Iterator[DataBatch]: return self._iter_batches() def before_epoch_start(self): - pass + self._yielded_first_batch = False def after_epoch_end(self): StatsManager.clear_iteration_metrics(self._dataset_tag) @@ -250,7 +250,7 @@ def get_next_batch_context(self): # Also track first batch blocked time if this is the first batch first_batch_timer = ( self._stats.iter_first_batch_blocked_s.timer() - if self.waiting_for_first_batch + if not self._yielded_first_batch else nullcontext() ) with total_timer, first_batch_timer: @@ -258,7 +258,7 @@ def get_next_batch_context(self): else: yield finally: - self.waiting_for_first_batch = False + self._yielded_first_batch = True @contextmanager def yield_batch_context(self, batch: Batch): diff --git a/python/ray/data/_internal/stats.py b/python/ray/data/_internal/stats.py index c10a9c921d50..ce58764eeb5f 100644 --- a/python/ray/data/_internal/stats.py +++ b/python/ray/data/_internal/stats.py @@ -1653,7 +1653,7 @@ class IterStatsSummary: # Time spent in finalize_fn, in seconds finalize_batch_time: Timer # Time user thread is blocked waiting for first batch - first_batch_block_time: Timer + time_to_first_batch: Timer # Total time user thread is blocked by iter_batches block_time: Timer # Time spent in user code, in seconds @@ -1677,7 +1677,7 @@ def to_string(self) -> str: out = "" if ( self.block_time.get() - or self.first_batch_block_time.get() + or self.time_to_first_batch.get() or self.total_time.get() or self.get_time.get() or self.next_time.get() @@ -1698,10 +1698,10 @@ def to_string(self) -> str: " * Total time user thread is blocked by Ray Data iter_batches: " "{}\n".format(fmt(self.block_time.get())) ) - if self.first_batch_block_time.get(): + if self.time_to_first_batch.get(): out += ( " * Total time user thread is blocked waiting for first batch: " - "{}\n".format(fmt(self.first_batch_block_time.get())) + "{}\n".format(fmt(self.time_to_first_batch.get())) ) if self.user_time.get(): out += " * Total execution time for user thread: {}\n".format( From 2263be096d81ea1cc23a90969f0bf1eb2e16f088 Mon Sep 17 00:00:00 2001 From: Xinyuan <43737116+xinyuangui2@users.noreply.github.com> Date: Fri, 22 Aug 2025 15:25:03 -0700 Subject: [PATCH 5/9] Apply suggestion from @justinvyu Co-authored-by: Justin Yu Signed-off-by: Xinyuan <43737116+xinyuangui2@users.noreply.github.com> --- python/ray/data/_internal/stats.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/ray/data/_internal/stats.py b/python/ray/data/_internal/stats.py index ce58764eeb5f..4c67d7884d5e 100644 --- a/python/ray/data/_internal/stats.py +++ b/python/ray/data/_internal/stats.py @@ -280,9 +280,9 @@ def __init__(self, max_stats=1000): description="Seconds user thread is blocked by iter_batches()", tag_keys=iter_tag_keys, ) - self.iter_first_batch_blocked_s = Gauge( - "data_iter_first_batch_blocked_seconds", - description="Seconds user thread is blocked waiting for first batch", + self.iter_time_to_first_batch_s = Gauge( + "data_iter_time_to_first_batch_seconds", + description="Seconds from starting iteration until first batch is ready. This includes the dataset pipeline warmup time.", tag_keys=iter_tag_keys, ) self.iter_user_s = Gauge( From 702dab3fad8fb2a97c39768925b0a8d7878796f2 Mon Sep 17 00:00:00 2001 From: xgui Date: Fri, 22 Aug 2025 22:29:45 +0000 Subject: [PATCH 6/9] update log strings Signed-off-by: xgui --- python/ray/data/_internal/stats.py | 4 ++-- python/ray/data/tests/test_stats.py | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/python/ray/data/_internal/stats.py b/python/ray/data/_internal/stats.py index 4c67d7884d5e..8e2279372fcf 100644 --- a/python/ray/data/_internal/stats.py +++ b/python/ray/data/_internal/stats.py @@ -282,7 +282,7 @@ def __init__(self, max_stats=1000): ) self.iter_time_to_first_batch_s = Gauge( "data_iter_time_to_first_batch_seconds", - description="Seconds from starting iteration until first batch is ready. This includes the dataset pipeline warmup time.", + description="Seconds from starting iteration until the first batch is ready. This includes the dataset pipeline warmup time.", tag_keys=iter_tag_keys, ) self.iter_user_s = Gauge( @@ -1700,7 +1700,7 @@ def to_string(self) -> str: ) if self.time_to_first_batch.get(): out += ( - " * Total time user thread is blocked waiting for first batch: " + " * Seconds from starting iteration until the first batch is ready: " "{}\n".format(fmt(self.time_to_first_batch.get())) ) if self.user_time.get(): diff --git a/python/ray/data/tests/test_stats.py b/python/ray/data/tests/test_stats.py index 5ed0a72eed28..578c665fee59 100644 --- a/python/ray/data/tests/test_stats.py +++ b/python/ray/data/tests/test_stats.py @@ -395,7 +395,7 @@ def test_streaming_split_stats(ray_start_regular_shared, restore_data_context): * Total time overall: T * Total time in Ray Data iterator initialization code: T * Total time user thread is blocked by Ray Data iter_batches: T - * Total time user thread is blocked waiting for first batch: T + * Seconds from starting iteration until the first batch is ready: T * Total execution time for user thread: T * Batch iteration time breakdown (summed across prefetch threads): * In ray.get(): T min, T max, T avg, T total @@ -578,7 +578,7 @@ def test_dataset_stats_basic( f"* Total time overall: T\n" f" * Total time in Ray Data iterator initialization code: T\n" f" * Total time user thread is blocked by Ray Data iter_batches: T\n" - f" * Total time user thread is blocked waiting for first batch: T\n" + f" * Seconds from starting iteration until the first batch is ready: T\n" f" * Total execution time for user thread: T\n" f"* Batch iteration time breakdown (summed across prefetch threads):\n" f" * In ray.get(): T min, T max, T avg, T total\n" @@ -620,7 +620,7 @@ def test_block_location_nums(ray_start_regular_shared, restore_data_context): f"* Total time overall: T\n" f" * Total time in Ray Data iterator initialization code: T\n" f" * Total time user thread is blocked by Ray Data iter_batches: T\n" - f" * Total time user thread is blocked waiting for first batch: T\n" + f" * Seconds from starting iteration until the first batch is ready: T\n" f" * Total execution time for user thread: T\n" f"* Batch iteration time breakdown (summed across prefetch threads):\n" f" * In ray.get(): T min, T max, T avg, T total\n" @@ -1366,7 +1366,7 @@ def test_streaming_stats_full(ray_start_regular_shared, restore_data_context): * Total time overall: T * Total time in Ray Data iterator initialization code: T * Total time user thread is blocked by Ray Data iter_batches: T - * Total time user thread is blocked waiting for first batch: T + * Seconds from starting iteration until the first batch is ready: T * Total execution time for user thread: T * Batch iteration time breakdown (summed across prefetch threads): * In ray.get(): T min, T max, T avg, T total From 399ffb005e952689e83edaa81d3612d9b549e522 Mon Sep 17 00:00:00 2001 From: xgui Date: Sun, 24 Aug 2025 01:50:20 +0000 Subject: [PATCH 7/9] fix comments Signed-off-by: xgui --- python/ray/data/_internal/block_batching/iter_batches.py | 8 ++++---- python/ray/data/_internal/stats.py | 9 +++++---- python/ray/data/tests/test_stats.py | 8 ++++---- 3 files changed, 13 insertions(+), 12 deletions(-) diff --git a/python/ray/data/_internal/block_batching/iter_batches.py b/python/ray/data/_internal/block_batching/iter_batches.py index ecbdf51c3202..9dc052d12aaa 100644 --- a/python/ray/data/_internal/block_batching/iter_batches.py +++ b/python/ray/data/_internal/block_batching/iter_batches.py @@ -247,13 +247,13 @@ def get_next_batch_context(self): if self._stats: # Always track total blocked time total_timer = self._stats.iter_total_blocked_s.timer() - # Also track first batch blocked time if this is the first batch - first_batch_timer = ( - self._stats.iter_first_batch_blocked_s.timer() + # Also track the time until the first batch is ready + first_batch_ready_timer = ( + self._stats.iter_time_to_first_batch_s.timer() if not self._yielded_first_batch else nullcontext() ) - with total_timer, first_batch_timer: + with total_timer, first_batch_ready_timer: yield else: yield diff --git a/python/ray/data/_internal/stats.py b/python/ray/data/_internal/stats.py index 8e2279372fcf..c6747bad49be 100644 --- a/python/ray/data/_internal/stats.py +++ b/python/ray/data/_internal/stats.py @@ -280,9 +280,10 @@ def __init__(self, max_stats=1000): description="Seconds user thread is blocked by iter_batches()", tag_keys=iter_tag_keys, ) - self.iter_time_to_first_batch_s = Gauge( + self.time_to_first_batch_s = Gauge( "data_iter_time_to_first_batch_seconds", - description="Seconds from starting iteration until the first batch is ready. This includes the dataset pipeline warmup time.", + description="Total time spent waiting for the first batch after starting iteration. " + "This includes the dataset pipeline warmup time. This metrics is accumulated across different epochs.", tag_keys=iter_tag_keys, ) self.iter_user_s = Gauge( @@ -956,7 +957,7 @@ def __init__( self.iter_format_batch_s: Timer = Timer() self.iter_collate_batch_s: Timer = Timer() self.iter_finalize_batch_s: Timer = Timer() - self.iter_first_batch_blocked_s: Timer = Timer() + self.iter_time_to_first_batch_s: Timer = Timer() self.iter_total_blocked_s: Timer = Timer() self.iter_user_s: Timer = Timer() self.iter_initialize_s: Timer = Timer() @@ -1700,7 +1701,7 @@ def to_string(self) -> str: ) if self.time_to_first_batch.get(): out += ( - " * Seconds from starting iteration until the first batch is ready: " + " * Total time spent waiting for the first batch after starting iteration: " "{}\n".format(fmt(self.time_to_first_batch.get())) ) if self.user_time.get(): diff --git a/python/ray/data/tests/test_stats.py b/python/ray/data/tests/test_stats.py index 578c665fee59..cbe5b5dfa8ba 100644 --- a/python/ray/data/tests/test_stats.py +++ b/python/ray/data/tests/test_stats.py @@ -395,7 +395,7 @@ def test_streaming_split_stats(ray_start_regular_shared, restore_data_context): * Total time overall: T * Total time in Ray Data iterator initialization code: T * Total time user thread is blocked by Ray Data iter_batches: T - * Seconds from starting iteration until the first batch is ready: T + * Total time spent waiting for the first batch after starting iteration: T * Total execution time for user thread: T * Batch iteration time breakdown (summed across prefetch threads): * In ray.get(): T min, T max, T avg, T total @@ -578,7 +578,7 @@ def test_dataset_stats_basic( f"* Total time overall: T\n" f" * Total time in Ray Data iterator initialization code: T\n" f" * Total time user thread is blocked by Ray Data iter_batches: T\n" - f" * Seconds from starting iteration until the first batch is ready: T\n" + f" * Total time spent waiting for the first batch after starting iteration: T\n" f" * Total execution time for user thread: T\n" f"* Batch iteration time breakdown (summed across prefetch threads):\n" f" * In ray.get(): T min, T max, T avg, T total\n" @@ -620,7 +620,7 @@ def test_block_location_nums(ray_start_regular_shared, restore_data_context): f"* Total time overall: T\n" f" * Total time in Ray Data iterator initialization code: T\n" f" * Total time user thread is blocked by Ray Data iter_batches: T\n" - f" * Seconds from starting iteration until the first batch is ready: T\n" + f" * Total time spent waiting for the first batch after starting iteration: T\n" f" * Total execution time for user thread: T\n" f"* Batch iteration time breakdown (summed across prefetch threads):\n" f" * In ray.get(): T min, T max, T avg, T total\n" @@ -1366,7 +1366,7 @@ def test_streaming_stats_full(ray_start_regular_shared, restore_data_context): * Total time overall: T * Total time in Ray Data iterator initialization code: T * Total time user thread is blocked by Ray Data iter_batches: T - * Seconds from starting iteration until the first batch is ready: T + * Total time spent waiting for the first batch after starting iteration: T * Total execution time for user thread: T * Batch iteration time breakdown (summed across prefetch threads): * In ray.get(): T min, T max, T avg, T total From e55102a57fd9ca2a2039277df479c83df85e645d Mon Sep 17 00:00:00 2001 From: xgui Date: Sun, 24 Aug 2025 17:21:20 +0000 Subject: [PATCH 8/9] fix property names Signed-off-by: xgui --- python/ray/data/_internal/stats.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/python/ray/data/_internal/stats.py b/python/ray/data/_internal/stats.py index c6747bad49be..2c0fca2c1b6b 100644 --- a/python/ray/data/_internal/stats.py +++ b/python/ray/data/_internal/stats.py @@ -475,9 +475,7 @@ def update_iteration_metrics( ): tags = self._create_tags(dataset_tag) self.iter_total_blocked_s.set(stats.iter_total_blocked_s.get(), tags) - self.iter_first_batch_blocked_s.set( - stats.iter_first_batch_blocked_s.get(), tags - ) + self.time_to_first_batch_s.set(stats.iter_time_to_first_batch_s.get(), tags) self.iter_user_s.set(stats.iter_user_s.get(), tags) self.iter_initialize_s.set(stats.iter_initialize_s.get(), tags) @@ -1013,7 +1011,7 @@ def to_summary(self) -> "DatasetStatsSummary": self.iter_format_batch_s, self.iter_collate_batch_s, self.iter_finalize_batch_s, - self.iter_first_batch_blocked_s, + self.iter_time_to_first_batch_s, self.iter_total_blocked_s, self.iter_user_s, self.iter_initialize_s, From 9bbdfdc7c771ae51b8a3c959f52ddab5fe67ca68 Mon Sep 17 00:00:00 2001 From: Xinyuan <43737116+xinyuangui2@users.noreply.github.com> Date: Mon, 25 Aug 2025 12:02:22 -0700 Subject: [PATCH 9/9] Update python/ray/data/_internal/stats.py Co-authored-by: Justin Yu Signed-off-by: Xinyuan <43737116+xinyuangui2@users.noreply.github.com> --- python/ray/data/_internal/stats.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/data/_internal/stats.py b/python/ray/data/_internal/stats.py index 2c0fca2c1b6b..7f4222f68426 100644 --- a/python/ray/data/_internal/stats.py +++ b/python/ray/data/_internal/stats.py @@ -283,7 +283,7 @@ def __init__(self, max_stats=1000): self.time_to_first_batch_s = Gauge( "data_iter_time_to_first_batch_seconds", description="Total time spent waiting for the first batch after starting iteration. " - "This includes the dataset pipeline warmup time. This metrics is accumulated across different epochs.", + "This includes the dataset pipeline warmup time. This metric is accumulated across different epochs.", tag_keys=iter_tag_keys, ) self.iter_user_s = Gauge(