Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions doc/source/serve/advanced-guides/advanced-autoscaling.md
Original file line number Diff line number Diff line change
Expand Up @@ -513,5 +513,3 @@ In your policy, access custom metrics via:
* **`ctx.raw_metrics[metric_name]`** — A mapping of replica IDs to lists of raw metric values.
The number of data points stored for each replica depends on the [`look_back_period_s`](../api/doc/ray.serve.config.AutoscalingConfig.look_back_period_s.rst) (the sliding window size) and `RAY_SERVE_REPLICA_AUTOSCALING_METRIC_RECORD_INTERVAL_S` (the metric recording interval).
* **`ctx.aggregated_metrics[metric_name]`** — A time-weighted average computed from the raw metric values for each replica.

> Today, aggregation is a time-weighted average. In future releases, additional aggregation options may be supported.
6 changes: 4 additions & 2 deletions doc/source/serve/doc_code/autoscaling_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ def custom_metrics_autoscaling_policy(
) -> tuple[int, Dict[str, Any]]:
cpu_usage_metric = ctx.aggregated_metrics.get("cpu_usage", {})
memory_usage_metric = ctx.aggregated_metrics.get("memory_usage", {})
max_cpu_usage = max(cpu_usage_metric.values())
max_memory_usage = max(memory_usage_metric.values())
max_cpu_usage = list(cpu_usage_metric.values())[-1] if cpu_usage_metric else 0
max_memory_usage = (
list(memory_usage_metric.values())[-1] if memory_usage_metric else 0
)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Autoscaling Policy Uses Incorrect Metrics

The autoscaling policy example now calculates CPU and memory usage based on a single replica's metrics instead of the maximum across all replicas. This change can lead to incorrect autoscaling decisions by not considering the highest resource utilization.

Fix in Cursor Fix in Web


if max_cpu_usage > 80 or max_memory_usage > 85:
return min(ctx.capacity_adjusted_max_replicas, ctx.current_num_replicas + 1), {}
Expand Down
15 changes: 8 additions & 7 deletions doc/source/serve/doc_code/custom_metrics_autoscaling.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
autoscaling_config={
"min_replicas": 1,
"max_replicas": 5,
"metrics_interval_s": 0.1,
"policy": {
"policy_function": "autoscaling_policy:custom_metrics_autoscaling_policy"
},
Expand All @@ -21,7 +22,7 @@ def __init__(self):
self.memory_usage = 60.0

def __call__(self) -> str:
time.sleep(0.1)
time.sleep(0.5)
self.cpu_usage = min(100, self.cpu_usage + 5)
self.memory_usage = min(100, self.memory_usage + 3)
return "Hello, world!"
Expand All @@ -39,10 +40,10 @@ def record_autoscaling_stats(self) -> Dict[str, float]:
app = CustomMetricsDeployment.bind()
# __serve_example_end__

# TODO: uncomment after autoscaling context is populated with all metrics
# if __name__ == "__main__":
# import requests # noqa
if __name__ == "__main__":
import requests # noqa

# serve.run(app)
# resp = requests.get("http://localhost:8000/")
# assert resp.text == "Hello, world!"
serve.run(app)
for _ in range(10):
resp = requests.get("http://localhost:8000/")
assert resp.text == "Hello, world!"
1 change: 1 addition & 0 deletions doc/source/serve/doc_code/scheduled_batch_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
policy=AutoscalingPolicy(
policy_function="autoscaling_policy:scheduled_batch_processing_policy"
),
metrics_interval_s=0.1,
),
max_ongoing_requests=3,
)
Expand Down
152 changes: 127 additions & 25 deletions python/ray/serve/_private/autoscaling_state.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import logging
import time
from collections import defaultdict
from typing import Any, Dict, List, Optional, Set
from typing import Dict, List, Optional, Set

from ray.serve._private.common import (
ONGOING_REQUESTS_KEY,
QUEUED_REQUESTS_KEY,
RUNNING_REQUESTS_KEY,
DeploymentID,
HandleMetricReport,
Expand Down Expand Up @@ -207,23 +208,28 @@ def get_decision_num_replicas(
`_skip_bound_check` is True, then the bounds are not applied.
"""

total_num_requests = self.get_total_num_requests()
total_queued_requests = self._get_queued_requests()
# NOTE: For non-additive aggregation functions, total_running_requests is not
# exact; treat it as an approximation.
total_running_requests = total_num_requests - total_queued_requests
autoscaling_context: AutoscalingContext = AutoscalingContext(
deployment_id=self._deployment_id,
deployment_name=self._deployment_id.name,
app_name=self._deployment_id.app_name,
current_num_replicas=len(self._running_replicas),
target_num_replicas=curr_target_num_replicas,
running_replicas=self._running_replicas,
total_num_requests=self.get_total_num_requests(),
total_num_requests=total_num_requests,
capacity_adjusted_min_replicas=self.get_num_replicas_lower_bound(),
capacity_adjusted_max_replicas=self.get_num_replicas_upper_bound(),
policy_state=self._policy_state.copy(),
current_time=time.time(),
config=self._config,
queued_requests=None,
requests_per_replica=None,
aggregated_metrics=None,
raw_metrics=None,
total_queued_requests=total_queued_requests,
total_running_requests=total_running_requests,
aggregated_metrics=self._get_aggregated_custom_metrics(),
raw_metrics=self._get_raw_custom_metrics(),
last_scale_up_time=None,
last_scale_down_time=None,
)
Expand Down Expand Up @@ -300,19 +306,22 @@ def _collect_handle_running_requests(self) -> List[List[TimeStampedValue]]:

return timeseries_list

def _aggregate_ongoing_requests(
self, metrics_timeseries_dicts: List[Dict[str, List[TimeStampedValue]]]
def _aggregate_timeseries_metric(
self,
metrics_timeseries_dicts: List[Dict[str, List[TimeStampedValue]]],
metric_key: str,
) -> float:
"""Aggregate and average ongoing requests from timeseries data using instantaneous merge.
"""Aggregate and average a metric from timeseries data using instantaneous merge.

Args:
metrics_timeseries_dicts: A list of dictionaries, each containing a key-value pair:
- The key is the name of the metric (ONGOING_REQUESTS_KEY)
- The key is the name of the metric (e.g., ONGOING_REQUESTS_KEY or custom metric name)
- The value is a list of TimeStampedValue objects, each representing a single measurement of the metric
this list is sorted by timestamp ascending
metric_key: The key to use when extracting the metric from the dictionaries

Returns:
The time-weighted average of the ongoing requests
The time-weighted average of the metric

Example:
If the metrics_timeseries_dicts is:
Expand All @@ -339,10 +348,10 @@ def _aggregate_ongoing_requests(

# Use instantaneous merge approach - no arbitrary windowing needed
aggregated_metrics = merge_timeseries_dicts(*metrics_timeseries_dicts)
ongoing_requests_timeseries = aggregated_metrics.get(ONGOING_REQUESTS_KEY, [])
if ongoing_requests_timeseries:
metric_timeseries = aggregated_metrics.get(metric_key, [])
if metric_timeseries:
# assume that the last recorded metric is valid for last_window_s seconds
last_metric_time = ongoing_requests_timeseries[-1].timestamp
last_metric_time = metric_timeseries[-1].timestamp
# We don't want to assume how long the last metric remains valid;
# we only treat it as valid for last_window_s seconds, i.e. the
# difference between the current time and the last recorded metric time.
Expand All @@ -351,16 +360,34 @@ def _aggregate_ongoing_requests(
# between replicas and controller. Also add a small epsilon to avoid division by zero
if last_window_s <= 0:
last_window_s = 1e-3
# Calculate the aggregated running requests
# Calculate the aggregated metric value
value = aggregate_timeseries(
ongoing_requests_timeseries,
metric_timeseries,
aggregation_function=self._config.aggregation_function,
last_window_s=last_window_s,
)
return value if value is not None else 0.0

return 0.0

def _aggregate_ongoing_requests(
    self, metrics_timeseries_dicts: List[Dict[str, List[TimeStampedValue]]]
) -> float:
    """Compute the time-weighted average of ongoing requests.

    Thin convenience wrapper: delegates to ``_aggregate_timeseries_metric``
    with ``ONGOING_REQUESTS_KEY`` as the metric key.

    Args:
        metrics_timeseries_dicts: Dictionaries mapping ONGOING_REQUESTS_KEY
            to timestamp-sorted timeseries data, one dict per source.

    Returns:
        The time-weighted average of the ongoing requests.
    """
    metric_key = ONGOING_REQUESTS_KEY
    return self._aggregate_timeseries_metric(metrics_timeseries_dicts, metric_key)

def _calculate_total_requests_aggregate_mode(self) -> float:
"""Calculate total requests using aggregate metrics mode with timeseries data.

Expand Down Expand Up @@ -541,11 +568,8 @@ def get_total_num_requests(self) -> float:
else:
return self._calculate_total_requests_simple_mode()

def get_replica_metrics(self, agg_func: str) -> Dict[ReplicaID, List[Any]]:
def get_replica_metrics(self) -> Dict[ReplicaID, List[TimeStampedValue]]:
"""Get the raw replica metrics dict."""
# arcyleung TODO: pass agg_func from autoscaling policy https://github.com/ray-project/ray/pull/51905
# Dummy implementation of mean agg_func across all values of the same metrics key

metric_values = defaultdict(list)
for id in self._running_replicas:
if id in self._replica_metrics and self._replica_metrics[id].metrics:
Expand All @@ -554,6 +578,86 @@ def get_replica_metrics(self, agg_func: str) -> Dict[ReplicaID, List[Any]]:

return metric_values

def _get_queued_requests(self) -> float:
    """Total number of queued requests across all handles.

    In simple mode each handle reports a pre-aggregated queue length and
    the values are summed directly. In aggregate mode (controller-side
    aggregation) the raw per-handle timeseries are merged and
    time-weight averaged instead.

    Returns:
        Sum of queued requests at all handles.
    """
    if not RAY_SERVE_AGGREGATE_METRICS_AT_CONTROLLER:
        # Simple mode: handles already aggregated their own queue lengths.
        return sum(
            metric_report.aggregated_queued_requests
            for metric_report in self._handle_requests.values()
        )

    # Aggregate mode: merge the per-handle timeseries on the controller.
    per_handle_timeseries = self._collect_handle_queued_requests()
    if not per_handle_timeseries:
        return 0.0
    wrapped = [{QUEUED_REQUESTS_KEY: ts} for ts in per_handle_timeseries]
    return self._aggregate_timeseries_metric(wrapped, QUEUED_REQUESTS_KEY)

def _get_aggregated_custom_metrics(self) -> Dict[str, Dict[ReplicaID, float]]:
    """Aggregate custom metrics from replica metric reports.

    Custom metrics are the metrics a deployment emits via its
    ``record_autoscaling_stats`` method. Each replica's raw timeseries is
    aggregated on the controller, mirroring how ongoing requests are
    aggregated.

    Returns:
        Mapping of metric name to {replica ID: aggregated metric value}.
    """
    result: Dict[str, Dict[ReplicaID, float]] = {}
    for rid in self._running_replicas:
        report = self._replica_metrics.get(rid)
        if report is None:
            continue
        for name, series in report.metrics.items():
            # Aggregate this single replica's timeseries under its own
            # metric name.
            value = self._aggregate_timeseries_metric([{name: series}], name)
            result.setdefault(name, {})[rid] = value
    return result

def _get_raw_custom_metrics(
    self,
) -> Dict[str, Dict[ReplicaID, List[TimeStampedValue]]]:
    """Collect raw custom-metric timeseries from replica metric reports.

    Custom metrics are the metrics a deployment emits via its
    ``record_autoscaling_stats`` method. No aggregation is applied; each
    replica's timeseries is returned exactly as reported.

    Returns:
        Mapping of metric name to {replica ID: list of timestamped values}.
    """
    raw: Dict[str, Dict[ReplicaID, List[TimeStampedValue]]] = {}
    for rid in self._running_replicas:
        report = self._replica_metrics.get(rid)
        if report is None:
            continue
        for name, series in report.metrics.items():
            raw.setdefault(name, {})[rid] = series
    return raw


class AutoscalingStateManager:
"""Manages all things autoscaling related.
Expand Down Expand Up @@ -602,12 +706,10 @@ def get_metrics(self) -> Dict[DeploymentID, float]:
}

def get_all_metrics(
self, agg_func="mean"
) -> Dict[DeploymentID, Dict[ReplicaID, List[Any]]]:
self,
) -> Dict[DeploymentID, Dict[ReplicaID, List[TimeStampedValue]]]:
return {
deployment_id: self._autoscaling_states[deployment_id].get_replica_metrics(
agg_func
)
deployment_id: self._autoscaling_states[deployment_id].get_replica_metrics()
for deployment_id in self._autoscaling_states
}

Expand Down
1 change: 1 addition & 0 deletions python/ray/serve/_private/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -757,6 +757,7 @@ class CreatePlacementGroupRequest:

RUNNING_REQUESTS_KEY = "running_requests"
ONGOING_REQUESTS_KEY = "ongoing_requests"
QUEUED_REQUESTS_KEY = "queued_requests"


@dataclass(order=True)
Expand Down
14 changes: 7 additions & 7 deletions python/ray/serve/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from ray._common.utils import import_attr

# Import types needed for AutoscalingContext
from ray.serve._private.common import DeploymentID, ReplicaID
from ray.serve._private.common import DeploymentID, ReplicaID, TimeStampedValue
from ray.serve._private.constants import (
DEFAULT_AUTOSCALING_POLICY_NAME,
DEFAULT_GRPC_PORT,
Expand Down Expand Up @@ -63,18 +63,18 @@ class AutoscalingContext:

# Built-in metrics
total_num_requests: float #: Total number of requests across all replicas.
queued_requests: Optional[float] #: Number of requests currently queued.
requests_per_replica: Dict[
ReplicaID, float
] #: Mapping of replica ID to number of requests.
total_queued_requests: Optional[float] #: Number of requests currently queued.
total_running_requests: Optional[
float
] #: Total number of requests currently running.

# Custom metrics
aggregated_metrics: Dict[
str, Dict[ReplicaID, float]
] #: Time-weighted averages of custom metrics per replica.
raw_metrics: Dict[
str, Dict[ReplicaID, List[float]]
] #: Raw custom metric values per replica.
str, Dict[ReplicaID, List[TimeStampedValue]]
] #: Raw custom metric timeseries per replica.

# Capacity and bounds
capacity_adjusted_min_replicas: int #: Minimum replicas adjusted for cluster capacity.
Expand Down
25 changes: 20 additions & 5 deletions python/ray/serve/tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,27 @@ py_test_module_list(
)

# Custom metrics tests.
py_test_module_list(
py_test_module_list_with_env_variants(
size = "small",
env = {
"RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE": "0",
"RAY_SERVE_REPLICA_AUTOSCALING_METRIC_RECORD_INTERVAL_S": "2",
"RAY_SERVE_RECORD_AUTOSCALING_STATS_TIMEOUT_S": "3",
env_variants = {
"metr_agg_at_controller": {
"env": {
"RAY_SERVE_AGGREGATE_METRICS_AT_CONTROLLER": "1",
"RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE": "0",
"RAY_SERVE_REPLICA_AUTOSCALING_METRIC_RECORD_INTERVAL_S": "0.1",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test also fails locally for me when I set "RAY_SERVE_REPLICA_AUTOSCALING_METRIC_RECORD_INTERVAL_S": "0.1" but passes with "0.5"

"RAY_SERVE_RECORD_AUTOSCALING_STATS_TIMEOUT_S": "3",
},
"name_suffix": "_metr_agg_at_controller",
},
"metr_agg_at_replicas": {
"env": {
"RAY_SERVE_AGGREGATE_METRICS_AT_CONTROLLER": "0",
"RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE": "0",
"RAY_SERVE_REPLICA_AUTOSCALING_METRIC_RECORD_INTERVAL_S": "0.1",
"RAY_SERVE_RECORD_AUTOSCALING_STATS_TIMEOUT_S": "3",
},
"name_suffix": "_metr_agg_at_replicas",
},
},
files = [
"test_custom_autoscaling_metrics.py",
Expand Down
Loading