Skip to content

Commit 927e4e1

Browse files
alexeykudinkin and YoussefEssDS
authored and committed
[Data] Dump verbose ResourceManager telemetry into ray-data.log (ray-project#58261)
> Thank you for contributing to Ray! 🚀 > Please review the [Ray Contribution Guide](https://docs.ray.io/en/master/ray-contribute/getting-involved.html) before opening a pull request. > ⚠️ Remove these instructions before submitting your PR. > 💡 Tip: Mark as draft if you want early feedback, or ready for review when it's complete. ## Description This change makes Ray Data dump verbose telemetry for `ResourceManager` into `ray-data.log` by default. ## Related issues > Link related issues: "Fixes ray-project#1234", "Closes ray-project#1234", or "Related to ray-project#1234". ## Additional information > Optional: Add implementation details, API changes, usage examples, screenshots, etc. --------- Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
1 parent 7e48795 commit 927e4e1

File tree

3 files changed

+18
-10
lines changed

3 files changed

+18
-10
lines changed

python/ray/data/_internal/execution/resource_manager.py

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
import logging
22
import math
3-
import os
43
import time
54
from abc import ABC, abstractmethod
65
from collections import defaultdict
76
from typing import TYPE_CHECKING, Callable, Dict, Iterable, List, Optional
87

9-
from ray._private.ray_constants import env_float
8+
from ray._private.ray_constants import env_bool, env_float
109
from ray.data._internal.execution.interfaces.execution_options import (
1110
ExecutionOptions,
1211
ExecutionResources,
@@ -33,7 +32,12 @@
3332

3433

3534
logger = logging.getLogger(__name__)
36-
DEBUG_RESOURCE_MANAGER = os.environ.get("RAY_DATA_DEBUG_RESOURCE_MANAGER", "0") == "1"
35+
36+
37+
LOG_DEBUG_TELEMETRY_FOR_RESOURCE_MANAGER_OVERRIDE: Optional[bool] = env_bool(
38+
"RAY_DATA_DEBUG_RESOURCE_MANAGER", None
39+
)
40+
3741

3842
# These are physical operators that must receive all inputs before they start
3943
# processing data.
@@ -83,8 +87,6 @@ def __init__(
8387
# the operator, including the external output buffer in OpState, and the
8488
# input buffers of the downstream operators.
8589
self._mem_op_outputs: Dict[PhysicalOperator, int] = defaultdict(int)
86-
# Whether to print debug information.
87-
self._debug = DEBUG_RESOURCE_MANAGER
8890

8991
self._op_resource_allocator: Optional["OpResourceAllocator"] = None
9092

@@ -266,7 +268,7 @@ def get_op_usage(self, op: PhysicalOperator) -> ExecutionResources:
266268
"""Return the resource usage of the given operator at the current time."""
267269
return self._op_usages[op]
268270

269-
def get_op_usage_str(self, op: PhysicalOperator) -> str:
271+
def get_op_usage_str(self, op: PhysicalOperator, *, verbose: bool) -> str:
270272
"""Return a human-readable string representation of the resource usage of
271273
the given operator."""
272274
usage_str = f"{self._op_running_usages[op].cpu:.1f} CPU"
@@ -276,7 +278,11 @@ def get_op_usage_str(self, op: PhysicalOperator) -> str:
276278
f", {self._op_running_usages[op].object_store_memory_str()} object store"
277279
)
278280

279-
if self._debug:
281+
# NOTE: Config can override requested verbosity level
282+
if LOG_DEBUG_TELEMETRY_FOR_RESOURCE_MANAGER_OVERRIDE is not None:
283+
verbose = LOG_DEBUG_TELEMETRY_FOR_RESOURCE_MANAGER_OVERRIDE
284+
285+
if verbose:
280286
usage_str += (
281287
f" (in={memory_string(self._mem_op_internal[op])},"
282288
f"out={memory_string(self._mem_op_outputs[op])})"

python/ray/data/_internal/execution/streaming_executor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -734,7 +734,7 @@ def _debug_dump_topology(topology: Topology, resource_manager: ResourceManager)
734734
for i, (op, state) in enumerate(topology.items()):
735735
state.update_display_metrics(resource_manager)
736736
logger.debug(
737-
f"{i}: {state.summary_str(resource_manager)}, "
737+
f"{i}: {state.summary_str(resource_manager, verbose=True)}, "
738738
f"Blocks Outputted: {state.num_completed_tasks}/{op.num_outputs_total()}, "
739739
f"Metrics: {state.op_display_metrics.display_str()}"
740740
)

python/ray/data/_internal/execution/streaming_executor_state.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -387,7 +387,9 @@ def refresh_progress_bar(self, resource_manager: ResourceManager) -> None:
387387
self.progress_bar.set_description(self.summary_str(resource_manager))
388388
self.progress_bar.refresh()
389389

390-
def summary_str(self, resource_manager: ResourceManager) -> str:
390+
def summary_str(
391+
self, resource_manager: ResourceManager, verbose: bool = False
392+
) -> str:
391393
# Active tasks
392394
active = self.op.num_active_tasks()
393395
desc = f"- {self.op.name}: Tasks: {active}"
@@ -409,7 +411,7 @@ def summary_str(self, resource_manager: ResourceManager) -> str:
409411

410412
# Queued blocks
411413
desc += f"; Queued blocks: {self.total_enqueued_input_blocks()} ({memory_string(self.total_enqueued_input_blocks_bytes())})"
412-
desc += f"; Resources: {resource_manager.get_op_usage_str(self.op)}"
414+
desc += f"; Resources: {resource_manager.get_op_usage_str(self.op, verbose=verbose)}"
413415

414416
# Any additional operator specific information.
415417
suffix = self.op.progress_str()

0 commit comments

Comments
 (0)