Merged
34 changes: 34 additions & 0 deletions doc/source/serve/monitoring.md
@@ -687,6 +687,40 @@ These metrics track the Serve controller's performance. Useful for debugging con
| `ray_serve_long_poll_latency_ms` **[†]** | Histogram | `namespace` | Time for updates to propagate from controller to clients in milliseconds. `namespace` is the long poll namespace such as `ROUTE_TABLE`, `DEPLOYMENT_CONFIG`, or `DEPLOYMENT_TARGETS`. Debug slow config propagation; impacts autoscaling response time. |
| `ray_serve_long_poll_pending_clients` **[†]** | Gauge | `namespace` | Number of clients waiting for updates. `namespace` is the long poll namespace such as `ROUTE_TABLE`, `DEPLOYMENT_CONFIG`, or `DEPLOYMENT_TARGETS`. Identify backpressure in notification system. |

### Event loop monitoring metrics

These metrics track the health of the asyncio event loops in Serve components. High scheduling latency indicates that the event loop is blocked, which directly increases request latency. Use these metrics to detect blocking code in handlers or system bottlenecks.

| Metric | Type | Tags | Description |
|--------|------|------|-------------|
| `ray_serve_event_loop_scheduling_latency_ms` **[†]** | Histogram | `component`, `loop_type`, `actor_id`, `deployment`*, `application`* | Event loop scheduling delay in milliseconds. Measures how long the loop was blocked beyond the expected sleep interval. Values close to zero indicate a healthy loop; high values indicate either blocking code or a large number of tasks queued on the event loop. |
| `ray_serve_event_loop_monitoring_iterations_total` **[†]** | Counter | `component`, `loop_type`, `actor_id`, `deployment`*, `application`* | Number of event loop monitoring iterations. Acts as a heartbeat; a stalled counter indicates the loop is completely blocked. |
| `ray_serve_event_loop_tasks` **[†]** | Gauge | `component`, `loop_type`, `actor_id`, `deployment`*, `application`* | Number of pending asyncio tasks on the event loop. High values may indicate task accumulation. |

*\* `deployment` and `application` tags are only present for replica `main` and `user_code` loops, not for proxy or router loops.*

**Tag values:**

- `component`: The Serve component type.
- `proxy`: HTTP/gRPC proxy actor
- `replica`: Deployment replica actor
- `unknown`: When using `DeploymentHandle.remote()`
- `loop_type`: The type of event loop being monitored.
- `main`: Main event loop for the actor (always present)
- `user_code`: Separate event loop for user handler code (replicas only, when `RAY_SERVE_RUN_USER_CODE_IN_SEPARATE_THREAD=1`, which is the default)
- `router`: Separate event loop for request routing (replicas only, when `RAY_SERVE_RUN_ROUTER_IN_SEPARATE_LOOP=1`, which is the default)
- `actor_id`: The Ray actor ID of the proxy or replica
- `deployment`: The deployment name (replicas only, for `main` and `user_code` loops)
- `application`: The application name (replicas only, for `main` and `user_code` loops)

**Interpreting scheduling latency:**

- **< 10ms**: Healthy event loop
- **10-50ms**: Acceptable under load
- **50-100ms**: Concerning; investigate for blocking code
- **100-500ms**: Problematic; likely blocking I/O or CPU-bound code in async handlers
- **> 500ms**: Severe blocking; definitely impacting request latency
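As an illustration of what pushes scheduling latency into the higher ranges, the following sketch (not part of Serve; all names are hypothetical) measures how late a concurrent coroutine wakes up while a handler runs, first with synchronous blocking work and then with the same work offloaded via `asyncio.to_thread`:

```python
import asyncio
import time


# Hypothetical handler that blocks the event loop with synchronous work.
async def blocking_handler() -> str:
    time.sleep(0.2)  # stalls every coroutine sharing this loop for ~200ms
    return "done"


# Same work offloaded to a worker thread, leaving the loop responsive.
async def offloaded_handler() -> str:
    await asyncio.to_thread(time.sleep, 0.2)
    return "done"


async def measure_lag(handler, interval_s: float = 0.05) -> float:
    """Return how many ms late a concurrent sleep wakes up while the
    handler runs -- a rough stand-in for scheduling latency."""
    task = asyncio.create_task(handler())
    start = time.monotonic()
    await asyncio.sleep(interval_s)
    lag_ms = (time.monotonic() - start - interval_s) * 1000
    await task
    return lag_ms
```

The blocking variant delays every coroutine sharing the loop by roughly the full `time.sleep` duration, which is exactly the kind of stall the scheduling-latency histogram surfaces.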

To see this in action, first run the following command to start Ray and set up the metrics export port:

```bash
Expand Down
29 changes: 29 additions & 0 deletions python/ray/serve/_private/constants.py
@@ -606,3 +606,32 @@
)
# Key for the decision counters in default autoscaling policy state
SERVE_AUTOSCALING_DECISION_COUNTERS_KEY = "__decision_counters"

# Event loop monitoring interval in seconds.
# This is how often the event loop lag is measured.
RAY_SERVE_EVENT_LOOP_MONITORING_INTERVAL_S = get_env_float_positive(
    "RAY_SERVE_EVENT_LOOP_MONITORING_INTERVAL_S", 5.0
)

# Histogram buckets for event loop scheduling latency in milliseconds.
# These are tuned for detecting event loop blocking:
# - < 10ms: healthy
# - 10-50ms: acceptable under load
# - 50-100ms: concerning, investigate
# - 100-500ms: problematic, likely blocking code
# - > 500ms: severe, definitely blocking
# - > 5s: catastrophic
SERVE_EVENT_LOOP_LATENCY_HISTOGRAM_BOUNDARIES_MS = [
    1,  # 1ms
    5,  # 5ms
    10,  # 10ms
    25,  # 25ms
    50,  # 50ms
    100,  # 100ms
    250,  # 250ms
    500,  # 500ms
    1000,  # 1s
    2500,  # 2.5s
    5000,  # 5s
    10000,  # 10s
]
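Assuming Prometheus-style buckets with `le` (less-than-or-equal) semantics, the bucket a latency sample falls into can be sketched with `bisect` (illustration only, not Serve code):

```python
import bisect

# Mirrors SERVE_EVENT_LOOP_LATENCY_HISTOGRAM_BOUNDARIES_MS above.
BOUNDARIES_MS = [1, 5, 10, 25, 50, 100, 250, 500, 1000, 2500, 5000, 10000]


def bucket_upper_bound(latency_ms: float):
    """Return the upper bound of the bucket a sample falls into,
    or None for the implicit +Inf bucket."""
    i = bisect.bisect_left(BOUNDARIES_MS, latency_ms)
    return BOUNDARIES_MS[i] if i < len(BOUNDARIES_MS) else None
```

For example, a 750ms stall lands in the 1000ms bucket, squarely in the "problematic" range described in the comment above.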
193 changes: 193 additions & 0 deletions python/ray/serve/_private/event_loop_monitoring.py
@@ -0,0 +1,193 @@
import asyncio
import logging
import time
from typing import Dict, Optional

from ray.serve._private.constants import (
    RAY_SERVE_EVENT_LOOP_MONITORING_INTERVAL_S,
    SERVE_EVENT_LOOP_LATENCY_HISTOGRAM_BOUNDARIES_MS,
    SERVE_LOGGER_NAME,
)
from ray.util import metrics

logger = logging.getLogger(SERVE_LOGGER_NAME)


def setup_event_loop_monitoring(
    loop: asyncio.AbstractEventLoop,
    scheduling_latency: metrics.Histogram,
    iterations: metrics.Counter,
    tasks: metrics.Gauge,
    tags: Dict[str, str],
    interval_s: Optional[float] = None,
) -> asyncio.Task:
    """Start monitoring an event loop and recording metrics.

    This function creates a background task that periodically measures:
    - How long it takes for the event loop to wake up after sleeping
      (scheduling latency / event loop lag)
    - The number of pending asyncio tasks

    Args:
        loop: The asyncio event loop to monitor.
        scheduling_latency: Histogram metric to record scheduling latency.
        iterations: Counter metric to track monitoring iterations.
        tasks: Gauge metric to track number of pending tasks.
        tags: Dictionary of tags to apply to all metrics.
        interval_s: Optional override for the monitoring interval.
            Defaults to RAY_SERVE_EVENT_LOOP_MONITORING_INTERVAL_S.

    Returns:
        The asyncio Task running the monitoring loop.
    """
    if interval_s is None:
        interval_s = RAY_SERVE_EVENT_LOOP_MONITORING_INTERVAL_S

    return loop.create_task(
        _run_monitoring_loop(
            loop=loop,
            schedule_latency=scheduling_latency,
            iterations=iterations,
            task_gauge=tasks,
            tags=tags,
            interval_s=interval_s,
        ),
        name="serve_event_loop_monitoring",
    )


async def _run_monitoring_loop(
    loop: asyncio.AbstractEventLoop,
    schedule_latency: metrics.Histogram,
    iterations: metrics.Counter,
    task_gauge: metrics.Gauge,
    tags: Dict[str, str],
    interval_s: float,
) -> None:
    """Internal monitoring loop that runs until the event loop stops.

    The scheduling latency is measured by comparing the actual elapsed time
    after sleeping to the expected sleep duration. In an ideal scenario
    with no blocking, the latency should be close to zero.
    """
    while loop.is_running():
        iterations.inc(1, tags)
        num_tasks = len(asyncio.all_tasks(loop))
        task_gauge.set(num_tasks, tags)
        yield_time = time.monotonic()
        await asyncio.sleep(interval_s)
        elapsed_time = time.monotonic() - yield_time

        # Historically, Ray's histogram implementation has been extremely finicky
        # with non-positive values (https://github.com/ray-project/ray/issues/26698).
        # Technically it shouldn't be possible for this to be negative, but clamp
        # with max() just to be safe.
        # Convert to milliseconds for the metric.
        latency_ms = max(0.0, (elapsed_time - interval_s) * 1000)
        schedule_latency.observe(latency_ms, tags)
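The core measurement above can be reproduced without any Ray dependencies; this standalone sketch records lag samples into a plain list instead of a Histogram (names are illustrative):

```python
import asyncio
import time


async def sample_loop_lag(interval_s: float, samples: int, out: list) -> None:
    # Same technique as _run_monitoring_loop: sleep for interval_s and record
    # how far past the deadline the loop actually resumed us, in milliseconds.
    for _ in range(samples):
        start = time.monotonic()
        await asyncio.sleep(interval_s)
        out.append(max(0.0, (time.monotonic() - start - interval_s) * 1000))


lags = []
asyncio.run(sample_loop_lag(0.01, 5, lags))
```

On an idle loop the recorded values stay near zero; any synchronous work scheduled alongside the sampler inflates them.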


class EventLoopMonitor:
    TAG_KEY_COMPONENT = "component"
    TAG_KEY_LOOP_TYPE = "loop_type"
    TAG_KEY_ACTOR_ID = "actor_id"

    # Component types
    COMPONENT_PROXY = "proxy"
    COMPONENT_REPLICA = "replica"
    COMPONENT_UNKNOWN = "unknown"

    # Loop types
    LOOP_TYPE_MAIN = "main"
    LOOP_TYPE_USER_CODE = "user_code"
    LOOP_TYPE_ROUTER = "router"

    def __init__(
        self,
        component: str,
        loop_type: str,
        actor_id: str,
        interval_s: float = RAY_SERVE_EVENT_LOOP_MONITORING_INTERVAL_S,
        extra_tags: Optional[Dict[str, str]] = None,
    ):
        """Initialize the event loop monitor.

        Args:
            component: The component type ("proxy" or "replica").
            loop_type: The type of event loop ("main", "user_code", or "router").
            actor_id: The ID of the actor where this event loop runs.
            interval_s: Optional override for the monitoring interval.
            extra_tags: Optional dictionary of additional tags to include in metrics.
        """
        self._interval_s = interval_s
        self._tags = {
            self.TAG_KEY_COMPONENT: component,
            self.TAG_KEY_LOOP_TYPE: loop_type,
            self.TAG_KEY_ACTOR_ID: actor_id,
        }
        if extra_tags:
            self._tags.update(extra_tags)
        self._tag_keys = tuple(self._tags.keys())

        # Create metrics
        self._scheduling_latency = metrics.Histogram(
            "serve_event_loop_scheduling_latency_ms",
            description=(
                "Latency of getting yielded control on the event loop in milliseconds. "
                "High values indicate the event loop is blocked."
            ),
            boundaries=SERVE_EVENT_LOOP_LATENCY_HISTOGRAM_BOUNDARIES_MS,
            tag_keys=self._tag_keys,
        )
        self._scheduling_latency.set_default_tags(self._tags)

        self._iterations = metrics.Counter(
            "serve_event_loop_monitoring_iterations",
            description=(
                "Number of times the event loop monitoring task has run. "
                "Can be used as a heartbeat."
            ),
            tag_keys=self._tag_keys,
        )
        self._iterations.set_default_tags(self._tags)

        self._tasks = metrics.Gauge(
            "serve_event_loop_tasks",
            description="Number of pending asyncio tasks on the event loop.",
            tag_keys=self._tag_keys,
        )
        self._tasks.set_default_tags(self._tags)

        self._monitoring_task: Optional[asyncio.Task] = None

    def start(self, loop: asyncio.AbstractEventLoop) -> asyncio.Task:
        """Start monitoring the given event loop.

        Args:
            loop: The asyncio event loop to monitor.

        Returns:
            The asyncio Task running the monitoring loop.
        """
        self._monitoring_task = setup_event_loop_monitoring(
            loop=loop,
            scheduling_latency=self._scheduling_latency,
            iterations=self._iterations,
            tasks=self._tasks,
            tags=self._tags,
            interval_s=self._interval_s,
        )
        logger.debug(
            f"Started event loop monitoring for {self._tags[self.TAG_KEY_COMPONENT]} "
            f"({self._tags[self.TAG_KEY_LOOP_TYPE]}) actor {self._tags[self.TAG_KEY_ACTOR_ID]}"
        )
        return self._monitoring_task

    def stop(self):
        if self._monitoring_task is not None and not self._monitoring_task.done():
            self._monitoring_task.cancel()
        self._monitoring_task = None

    @property
    def tags(self) -> Dict[str, str]:
        return self._tags.copy()
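`stop()` relies on ordinary asyncio task cancellation; a minimal sketch of that lifecycle (hypothetical names, no Ray involved):

```python
import asyncio


async def monitor_forever() -> None:
    # Stand-in for the monitoring coroutine: loops until cancelled.
    while True:
        await asyncio.sleep(0.01)


async def main() -> bool:
    task = asyncio.create_task(monitor_forever(), name="monitor")
    await asyncio.sleep(0.03)
    task.cancel()  # mirrors EventLoopMonitor.stop()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return task.cancelled()


cancelled = asyncio.run(main())
```

The `cancel()` call raises `CancelledError` inside the coroutine at its next `await`, which is why a monitoring loop that only sleeps and records metrics shuts down promptly.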
1 change: 1 addition & 0 deletions python/ray/serve/_private/local_testing_mode.py
@@ -73,6 +73,7 @@ def make_local_deployment_handle(
        run_user_code_in_separate_thread=True,
        local_testing_mode=True,
        deployment_config=deployment._deployment_config,
        actor_id="local",
    )
    try:
        logger.info(f"Initializing local replica class for {deployment_id}.")
9 changes: 9 additions & 0 deletions python/ray/serve/_private/proxy.py
@@ -43,6 +43,7 @@
    SERVE_NAMESPACE,
)
from ray.serve._private.default_impl import get_proxy_handle
from ray.serve._private.event_loop_monitoring import EventLoopMonitor
from ray.serve._private.grpc_util import (
    get_grpc_response_status,
    set_grpc_code_and_details,
@@ -1277,6 +1278,14 @@ def __init__(

        _configure_gc_options()

        # Start event loop monitoring for the proxy's main event loop.
        self._event_loop_monitor = EventLoopMonitor(
            component=EventLoopMonitor.COMPONENT_PROXY,
            loop_type=EventLoopMonitor.LOOP_TYPE_MAIN,
            actor_id=ray.get_runtime_context().get_actor_id(),
        )
        self._event_loop_monitor.start(event_loop)

    def _update_routes_in_proxies(self, endpoints: Dict[DeploymentID, EndpointInfo]):
        self.proxy_router.update_routes(endpoints)

32 changes: 32 additions & 0 deletions python/ray/serve/_private/replica.py
@@ -81,6 +81,7 @@
    create_replica_impl,
    create_replica_metrics_manager,
)
from ray.serve._private.event_loop_monitoring import EventLoopMonitor
from ray.serve._private.http_util import (
    ASGIAppReplicaWrapper,
    ASGIArgs,
@@ -529,6 +530,7 @@ def __init__(
        self._configure_logger_and_profilers(self._deployment_config.logging_config)
        self._event_loop = get_or_create_event_loop()

        actor_id = ray.get_runtime_context().get_actor_id()
        self._user_callable_wrapper = UserCallableWrapper(
            deployment_def,
            init_args,
@@ -538,6 +540,7 @@
            run_user_code_in_separate_thread=RAY_SERVE_RUN_USER_CODE_IN_SEPARATE_THREAD,
            local_testing_mode=False,
            deployment_config=deployment_config,
            actor_id=actor_id,
        )
        self._semaphore = Semaphore(lambda: self.max_ongoing_requests)

@@ -571,6 +574,18 @@ def __init__(
            ingress=ingress,
        )

        # Start event loop monitoring for the replica's main event loop.
        self._main_loop_monitor = EventLoopMonitor(
            component=EventLoopMonitor.COMPONENT_REPLICA,
            loop_type=EventLoopMonitor.LOOP_TYPE_MAIN,
            actor_id=actor_id,
            extra_tags={
                "deployment": self._deployment_id.name,
                "application": self._deployment_id.app_name,
            },
        )
        self._main_loop_monitor.start(self._event_loop)

        self._internal_grpc_port: Optional[int] = None
        self._docs_path: Optional[str] = None
        self._http_port: Optional[int] = None
@@ -1430,6 +1445,7 @@ def __init__(
        run_user_code_in_separate_thread: bool,
        local_testing_mode: bool,
        deployment_config: DeploymentConfig,
        actor_id: str,
    ):
        if not (inspect.isfunction(deployment_def) or inspect.isclass(deployment_def)):
            raise TypeError(
@@ -1461,10 +1477,25 @@
                asyncio.new_event_loop()
            )

            # Start event loop monitoring for the user code event loop.
            # We create the monitor here but start it inside the thread function
            # so the task is created on the correct thread.
            self._user_code_loop_monitor = EventLoopMonitor(
                component=EventLoopMonitor.COMPONENT_REPLICA,
                loop_type=EventLoopMonitor.LOOP_TYPE_USER_CODE,
                actor_id=actor_id,
                extra_tags={
                    "deployment": self._deployment_id.name,
                    "application": self._deployment_id.app_name,
                },
            )

            def _run_user_code_event_loop():
                # Required so that calls to get the current running event loop work
                # properly in user code.
                asyncio.set_event_loop(self._user_code_event_loop)
                # Start monitoring before run_forever so the task is scheduled.
                self._user_code_loop_monitor.start(self._user_code_event_loop)
                self._user_code_event_loop.run_forever()

            self._user_code_event_loop_thread = threading.Thread(
@@ -1474,6 +1505,7 @@ def _run_user_code_event_loop():
            self._user_code_event_loop_thread.start()
        else:
            self._user_code_event_loop = asyncio.get_running_loop()
            self._user_code_loop_monitor = None

    @property
    def event_loop(self) -> asyncio.AbstractEventLoop:
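The separate-thread pattern above (create the loop, schedule work from inside the thread, then `run_forever`) can be sketched in isolation; everything here is illustrative and independent of Serve:

```python
import asyncio
import threading

loop = asyncio.new_event_loop()
results = []


async def record() -> None:
    # Runs on the dedicated loop's thread.
    results.append(threading.current_thread().name)
    loop.stop()  # let run_forever() return so the thread can exit


def run_loop() -> None:
    # Required so event-loop lookups behave in code running on this thread.
    asyncio.set_event_loop(loop)
    # Scheduling before run_forever is safe: call_soon works on a loop that
    # is not yet running, and the task executes once run_forever starts.
    loop.create_task(record())
    loop.run_forever()


t = threading.Thread(target=run_loop, name="user_code_loop", daemon=True)
t.start()
t.join(timeout=5)
```

Creating the task from inside the thread function, as the replica does for its monitor, avoids the thread-affinity checks that would apply if a running loop were fed from another thread.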