Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 13 additions & 7 deletions python/ray/data/_internal/execution/resource_manager.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import logging
import math
import os
import time
from abc import ABC, abstractmethod
from collections import defaultdict
from typing import TYPE_CHECKING, Callable, Dict, Iterable, List, Optional

from ray._private.ray_constants import env_float
from ray._private.ray_constants import env_bool, env_float
from ray.data._internal.execution.interfaces.execution_options import (
ExecutionOptions,
ExecutionResources,
Expand All @@ -33,7 +32,12 @@


logger = logging.getLogger(__name__)
DEBUG_RESOURCE_MANAGER = os.environ.get("RAY_DATA_DEBUG_RESOURCE_MANAGER", "0") == "1"


LOG_DEBUG_TELEMETRY_FOR_RESOURCE_MANAGER_OVERRIDE: Optional[bool] = env_bool(
"RAY_DATA_DEBUG_RESOURCE_MANAGER", None
)


# These are physical operators that must receive all inputs before they start
# processing data.
Expand Down Expand Up @@ -83,8 +87,6 @@ def __init__(
# the operator, including the external output buffer in OpState, and the
# input buffers of the downstream operators.
self._mem_op_outputs: Dict[PhysicalOperator, int] = defaultdict(int)
# Whether to print debug information.
self._debug = DEBUG_RESOURCE_MANAGER

self._op_resource_allocator: Optional["OpResourceAllocator"] = None

Expand Down Expand Up @@ -266,7 +268,7 @@ def get_op_usage(self, op: PhysicalOperator) -> ExecutionResources:
"""Return the resource usage of the given operator at the current time."""
return self._op_usages[op]

def get_op_usage_str(self, op: PhysicalOperator) -> str:
def get_op_usage_str(self, op: PhysicalOperator, *, verbose: bool) -> str:
"""Return a human-readable string representation of the resource usage of
the given operator."""
usage_str = f"{self._op_running_usages[op].cpu:.1f} CPU"
Expand All @@ -276,7 +278,11 @@ def get_op_usage_str(self, op: PhysicalOperator) -> str:
f", {self._op_running_usages[op].object_store_memory_str()} object store"
)

if self._debug:
# NOTE: Config can override requested verbosity level
if LOG_DEBUG_TELEMETRY_FOR_RESOURCE_MANAGER_OVERRIDE is not None:
verbose = LOG_DEBUG_TELEMETRY_FOR_RESOURCE_MANAGER_OVERRIDE

if verbose:
Comment on lines +282 to +285
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

While the logic is correct, reassigning the verbose parameter can be a bit confusing. It's generally better to avoid modifying input parameters to improve code clarity and prevent unexpected side effects.

Consider using a local variable to hold the effective verbosity level, which makes the override logic more explicit.

        is_verbose = verbose
        if LOG_DEBUG_TELEMETRY_FOR_RESOURCE_MANAGER_OVERRIDE is not None:
            is_verbose = LOG_DEBUG_TELEMETRY_FOR_RESOURCE_MANAGER_OVERRIDE

        if is_verbose:

usage_str += (
f" (in={memory_string(self._mem_op_internal[op])},"
f"out={memory_string(self._mem_op_outputs[op])})"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -734,7 +734,7 @@ def _debug_dump_topology(topology: Topology, resource_manager: ResourceManager)
for i, (op, state) in enumerate(topology.items()):
state.update_display_metrics(resource_manager)
logger.debug(
f"{i}: {state.summary_str(resource_manager)}, "
f"{i}: {state.summary_str(resource_manager, verbose=True)}, "
f"Blocks Outputted: {state.num_completed_tasks}/{op.num_outputs_total()}, "
f"Metrics: {state.op_display_metrics.display_str()}"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,9 @@ def refresh_progress_bar(self, resource_manager: ResourceManager) -> None:
self.progress_bar.set_description(self.summary_str(resource_manager))
self.progress_bar.refresh()

def summary_str(self, resource_manager: ResourceManager) -> str:
def summary_str(
self, resource_manager: ResourceManager, verbose: bool = False
) -> str:
# Active tasks
active = self.op.num_active_tasks()
desc = f"- {self.op.name}: Tasks: {active}"
Expand All @@ -409,7 +411,7 @@ def summary_str(self, resource_manager: ResourceManager) -> str:

# Queued blocks
desc += f"; Queued blocks: {self.total_enqueued_input_blocks()} ({memory_string(self.total_enqueued_input_blocks_bytes())})"
desc += f"; Resources: {resource_manager.get_op_usage_str(self.op)}"
desc += f"; Resources: {resource_manager.get_op_usage_str(self.op, verbose=verbose)}"

# Any additional operator specific information.
suffix = self.op.progress_str()
Expand Down