Skip to content

Commit 16e16b4

Browse files
committed
finish v1
Signed-off-by: kyuds <kyuseung1016@gmail.com>
1 parent bc33fab commit 16e16b4

File tree

3 files changed

+212
-106
lines changed

3 files changed

+212
-106
lines changed

python/ray/data/_internal/execution/progress_manager.py

Lines changed: 196 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,28 @@
11
import logging
22
import math
3-
import time
43
import threading
4+
import time
5+
import uuid
56
from enum import Enum
6-
from typing import List, Optional, Tuple
7+
from typing import Optional, Tuple
78

8-
from ray.data._internal.execution.resource_manager import ResourceManager
9-
from ray.data._internal.execution.streaming_executor_state import Topology, OpState
109
from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer
10+
from ray.data._internal.execution.resource_manager import ResourceManager
11+
from ray.data._internal.execution.streaming_executor_state import OpState, Topology
1112
from ray.util.debug import log_once
1213

1314
try:
1415
import rich
1516
from rich.console import Console
1617
from rich.live import Live
1718
from rich.progress import (
19+
BarColumn,
1820
Progress,
1921
ProgressColumn,
20-
TaskID,
21-
BarColumn,
22-
TextColumn,
2322
SpinnerColumn,
23+
TextColumn,
2424
)
25-
from rich.table import Table, Column
25+
from rich.table import Column, Table
2626
from rich.text import Text
2727

2828
needs_rich_warning = False
@@ -32,10 +32,11 @@
3232

3333
logger = logging.getLogger(__name__)
3434

35-
_TREE_BRANCH = " ├─ "
36-
_TREE_VERTICAL = " │"
35+
_TREE_BRANCH = " ├─"
36+
_TREE_VERTICAL = "│"
37+
_TREE_VERTICAL_INDENT = f" {_TREE_VERTICAL} "
3738
_TOTAL_PROGRESS_TOTAL = 1.0
38-
_RESOURCE_REPORT_HEADER = f"{_TREE_VERTICAL} Active/total resources: "
39+
_RESOURCE_REPORT_HEADER = f" {_TREE_VERTICAL} Active/total resources: "
3940

4041

4142
class _ManagerMode(Enum):
@@ -83,24 +84,6 @@ def get_mode(cls) -> "_ManagerMode":
8384
return cls.ALL
8485

8586

86-
def _format_k(val: int) -> str:
87-
if val >= 1000:
88-
fval = val / 1000.0
89-
fval_str = f"{int(fval)}" if fval.is_integer() else f"{fval:.2f}"
90-
return fval_str + "k"
91-
return str(val)
92-
93-
94-
def _format_row_count(completed: int, total: Optional[int]) -> str:
95-
"""Formats row counts with k units."""
96-
cstr = _format_k(completed)
97-
if total is None or math.isinf(total):
98-
tstr = "?k" if cstr.endswith("k") else "?"
99-
else:
100-
tstr = _format_k(total)
101-
return f"{cstr}/{tstr}"
102-
103-
10487
if rich:
10588

10689
class CustomTimeColumn(ProgressColumn):
@@ -146,20 +129,54 @@ def __init__(self, dataset_id: str, topology: Topology):
146129
self._dataset_id = dataset_id
147130
self._lock = None
148131

149-
if self._mode.is_enabled():
150-
self._start_time: Optional[float] = None
151-
self._lock = threading.RLock()
132+
if not self._mode.is_enabled():
133+
self._live = None
134+
return
135+
136+
self._start_time: Optional[float] = None
137+
self._lock = threading.RLock()
138+
139+
# rich
140+
self._console = Console()
141+
self._total = Progress(
142+
TextColumn(" ", table_column=Column(no_wrap=True)),
143+
SpinnerColumn(finished_text="•"),
144+
TextColumn(
145+
"{task.description} {task.percentage:>3.0f}%",
146+
table_column=Column(no_wrap=True),
147+
),
148+
BarColumn(bar_width=15),
149+
TextColumn("{task.fields[count_str]}", table_column=Column(no_wrap=True)),
150+
TextColumn("["),
151+
CustomTimeColumn(),
152+
TextColumn(","),
153+
TextColumn("{task.fields[rate]}", table_column=Column(no_wrap=True)),
154+
TextColumn("]"),
155+
console=self._console,
156+
transient=False,
157+
expand=False,
158+
)
159+
self._current_count = 0
160+
self._total_resources = Text(
161+
f"{_RESOURCE_REPORT_HEADER}Initializing...", no_wrap=True
162+
)
163+
164+
self._op_display = {}
165+
166+
self._layout_table = Table.grid(padding=(0, 1, 0, 0), expand=True)
167+
self._layout_table.add_row(self._total)
168+
self._layout_table.add_row(self._total_resources)
169+
self._layout_table.add_row(Text(f" {_TREE_VERTICAL}", no_wrap=True))
152170

153-
# rich
154-
self._console = Console()
155-
self._total = Progress(
156-
TextColumn(" ", table_column=Column(no_wrap=True)),
171+
for state in topology.values():
172+
if isinstance(state.op, InputDataBuffer):
173+
continue
174+
uid = uuid.uuid4()
175+
progress = Progress(
176+
TextColumn(_TREE_BRANCH, table_column=Column(no_wrap=True)),
157177
SpinnerColumn(),
158-
TextColumn(
159-
"{task.description} {task.percentage:>3.0f}%",
160-
table_column=Column(no_wrap=True),
161-
),
162-
BarColumn(bar_width=15),
178+
TextColumn("{task.description}", table_column=Column(no_wrap=True)),
179+
BarColumn(bar_width=10),
163180
TextColumn(
164181
"{task.fields[count_str]}", table_column=Column(no_wrap=True)
165182
),
@@ -172,30 +189,34 @@ def __init__(self, dataset_id: str, topology: Topology):
172189
transient=False,
173190
expand=False,
174191
)
175-
self._total_resources = Text(
176-
f"{_RESOURCE_REPORT_HEADER}Initializing...", no_wrap=True
177-
)
178-
# TODO (kyuds): op rows
179-
180-
self._layout_table = Table.grid(padding=(0, 1, 0, 0), expand=True)
181-
self._layout_table.add_row(self._total)
182-
self._layout_table.add_row(self._total_resources)
183-
self._layout_table.add_row(Text(_TREE_VERTICAL, no_wrap=True))
184-
self._live = Live(
185-
self._layout_table,
186-
console=self._console,
187-
refresh_per_second=2,
188-
vertical_overflow="visible",
189-
)
190-
191-
self._total_task_id = self._total.add_task(
192-
f"Dataset {self._dataset_id} running:",
193-
total=_TOTAL_PROGRESS_TOTAL,
192+
stats = Text(f"{_TREE_VERTICAL_INDENT}Initializing...", no_wrap=True)
193+
total = state.op.num_output_rows_total()
194+
tid = progress.add_task(
195+
state.op.name,
196+
total=float(total) if total is not None else float("inf"),
197+
start=True,
194198
rate="? rows/s",
195-
count_str="0/?",
199+
count_str="?/?",
196200
)
197-
else:
198-
self._live = None
201+
self._layout_table.add_row(progress)
202+
self._layout_table.add_row(stats)
203+
state.progress_manager_uuid = uid
204+
self._op_display[uid] = (tid, progress, stats)
205+
# empty new line to prevent "packed" feeling
206+
self._layout_table.add_row(Text())
207+
self._live = Live(
208+
self._layout_table,
209+
console=self._console,
210+
refresh_per_second=2,
211+
vertical_overflow="visible",
212+
)
213+
214+
self._total_task_id = self._total.add_task(
215+
f"Dataset {self._dataset_id} running:",
216+
total=_TOTAL_PROGRESS_TOTAL,
217+
rate="? rows/s",
218+
count_str="0/?",
219+
)
199220

200221
# Management
201222
def start(self):
@@ -210,13 +231,27 @@ def refresh(self):
210231
if self._live.is_started:
211232
self._live.refresh()
212233

234+
def _close_no_lock(self):
235+
self.refresh()
236+
time.sleep(0.02)
237+
self._live.stop()
238+
213239
def close(self):
214240
if self._mode.is_enabled():
215241
with self._lock:
216242
if self._live.is_started:
217-
self.refresh()
218-
time.sleep(0.1)
219-
self._live.stop()
243+
self._close_no_lock()
244+
245+
def close_with_finishing_description(self, desc: str, success: bool):
246+
if self._mode.is_enabled():
247+
with self._lock:
248+
if self._live.is_started:
249+
kwargs = {}
250+
if success:
251+
kwargs["completed"] = 1.0
252+
kwargs["total"] = 1.0
253+
self._total.update(self._total_task_id, description=desc, **kwargs)
254+
self._close_no_lock()
220255

221256
# Total Progress
222257
def _can_update_total(self) -> bool:
@@ -234,34 +269,21 @@ def update_total_progress(self, total_rows: Optional[int], current_rows: int):
234269
self._update_total_progress_no_lock(total_rows, current_rows)
235270

236271
def _update_total_progress_no_lock(
237-
self, current_rows: Optional[int], total_rows: Optional[int]
272+
self, new_rows: Optional[int], total_rows: Optional[int]
238273
):
239274
if self._start_time is None:
240275
self._start_time = time.time()
241-
242-
completed = 0.0
243-
if current_rows is None and total_rows is None:
244-
rate_str = "? row/s"
245-
count_str = "?/?"
246-
elif current_rows is None:
247-
rate_str = "? row/s"
248-
count_str = f"?/{_format_k(total_rows)}"
249-
else:
250-
elapsed = time.time() - self._start_time
251-
rate_val = current_rows / elapsed if elapsed > 1 else 0
252-
rate_unit = "row/s"
253-
if rate_val >= 1000:
254-
rate_val /= 1000
255-
rate_unit = "k row/s"
256-
rate_str = f"{rate_val:.2f} {rate_unit}"
257-
if total_rows is not None and total_rows > 0:
258-
completed = min(1.0, current_rows / total_rows)
259-
count_str = _format_row_count(current_rows, total_rows)
276+
if new_rows is not None:
277+
self._current_count += new_rows
278+
c, t, rs, cs = _get_progress_metrics(
279+
self._start_time, self._current_count, total_rows
280+
)
260281
self._total.update(
261282
self._total_task_id,
262-
completed=completed,
263-
total=_TOTAL_PROGRESS_TOTAL,
264-
fields={"rate": rate_str, "count_str": count_str},
283+
completed=c,
284+
total=t,
285+
rate=rs,
286+
count_str=cs,
265287
)
266288

267289
def update_resource_status(self, resource_manager: ResourceManager):
@@ -302,11 +324,91 @@ def _update_resource_status_no_lock(self, resource_manager: ResourceManager):
302324

303325
self._total_resources.plain = resource_usage
304326

305-
def set_finishing_message(self, desc: str):
306-
if not self._can_update_total():
327+
def _can_update_operator(self, op_state: OpState) -> bool:
328+
if not self._mode.show_op():
329+
return False
330+
uid = op_state.progress_manager_uuid
331+
if uid is None or uid not in self._op_display:
332+
return False
333+
tid, progress, stats = self._op_display[uid]
334+
if tid is None or not progress or not stats or tid not in progress.task_ids:
335+
return False
336+
return True
337+
338+
def update_operator_progress(self, op_state: OpState):
339+
if not self._can_update_operator(op_state):
307340
return
308341
with self._lock:
309-
if self._live.is_started:
310-
self._total.update(self._total_task_id, description=desc, refresh=True)
342+
self._update_operator_progress_no_lock(op_state)
343+
344+
def _update_operator_progress_no_lock(self, op_state: OpState):
345+
if self._start_time is None:
346+
self._start_time = time.time()
347+
uid = op_state.progress_manager_uuid
348+
tid, progress, stats = self._op_display[uid]
349+
350+
# progress
351+
current_rows = op_state.output_row_count
352+
total_rows = op_state.op.num_output_rows_total()
353+
c, t, rs, cs = _get_progress_metrics(self._start_time, current_rows, total_rows)
354+
progress.update(
355+
tid,
356+
completed=c,
357+
total=t,
358+
rate=rs,
359+
count_str=cs,
360+
)
361+
# stats
362+
stats_str = op_state.op_display_metrics.display_str()
363+
stats.plain = f"{_TREE_VERTICAL_INDENT}{stats_str}"
311364

312-
# Op Progress
365+
366+
# utilities
367+
def _format_k(val: int) -> str:
368+
if val >= 1000:
369+
fval = val / 1000.0
370+
fval_str = f"{int(fval)}" if fval.is_integer() else f"{fval:.2f}"
371+
return fval_str + "k"
372+
return str(val)
373+
374+
375+
def _format_row_count(completed: int, total: Optional[int]) -> str:
376+
"""Formats row counts with k units."""
377+
cstr = _format_k(completed)
378+
if total is None or math.isinf(total):
379+
tstr = "?k" if cstr.endswith("k") else "?"
380+
else:
381+
tstr = _format_k(total)
382+
return f"{cstr}/{tstr}"
383+
384+
385+
def _get_progress_metrics(
386+
start_time: float, current_rows: int, total_rows: Optional[int]
387+
) -> Tuple[int, int, str, str]:
388+
"""
389+
Args:
390+
start_time: time when progress tracking started
391+
current_rows: current rows outputted (cumulative)
392+
total_rows: total rows expected (can be unknown)
393+
Returns:
394+
completed (int)
395+
total (int)
396+
rate (str)
397+
count (str)
398+
"""
399+
total = 1 if total_rows is None or total_rows < 1 else total_rows
400+
401+
if total_rows is None:
402+
rate_str = "? row/s"
403+
count_str = "?/?"
404+
else:
405+
elapsed = time.time() - start_time
406+
rate_val = current_rows / elapsed if elapsed > 1 else 0
407+
rate_unit = "row/s"
408+
if rate_val >= 1000:
409+
rate_val /= 1000
410+
rate_unit = "k row/s"
411+
rate_str = f"{rate_val:.2f} {rate_unit}"
412+
count_str = _format_row_count(current_rows, total_rows)
413+
414+
return current_rows, total, rate_str, count_str

0 commit comments

Comments
 (0)