Commit 78477de

abrarsheikh and peterxcli
authored and committed
[Serve][2/n] add batching metrics (ray-project#59232)
fixes ray-project#59218 ### Performance Delta ```python from ray import serve from typing import List @serve.deployment(max_ongoing_requests=1000) class MyDeployment: @serve.batch(max_batch_size=10, batch_wait_timeout_s=1) async def handle_batch(self, requests: List[int]) -> List[int]: return [request + 1 for request in requests] async def __call__(self) -> List[int]: return await self.handle_batch(1) app = MyDeployment.bind() ``` `ray start --head --metrics-export-port=8080` -> `serve run batch_test:app` locust 100 users Metric | With Change | Master | Δ (Master – With Change) -- | -- | -- | -- Requests | 32,033 | 33,541 | +1,508 Fails | 0 | 0 | 0 Median (ms) | 170 | 170 | 0 95%ile (ms) | 240 | 240 | 0 99%ile (ms) | 280 | 270 | –10 ms Average (ms) | 172.98 | 171.87 | –1.11 ms Min (ms) | 70 | 84 | +14 ms Max (ms) | 352 | 365 | +13 ms Average size (bytes) | 1 | 1 | 0 Current RPS | 581.9 | 604.1 | +22.2 Current Failures/s | 0 | 0 | 0 --------- Signed-off-by: abrar <abrar@anyscale.com> Signed-off-by: peterxcli <peterxcli@gmail.com>
1 parent d933787 commit 78477de

File tree

5 files changed: +290 −13 lines changed


doc/source/serve/monitoring.md

Lines changed: 23 additions & 2 deletions

```diff
@@ -493,15 +493,23 @@ You can customize these buckets using environment variables:
   - `ray_serve_multiplexed_model_load_latency_ms`
   - `ray_serve_multiplexed_model_unload_latency_ms`
 
-Set these as comma-separated values, for example: `RAY_SERVE_REQUEST_LATENCY_BUCKETS_MS="10,50,100,500,1000,5000"`.
+- **`RAY_SERVE_BATCH_UTILIZATION_BUCKETS_PERCENT`**: Controls bucket boundaries for batch utilization histogram. Default: `[5, 10, 20, 30, 40, 50, 60, 70, 80, 90, 95, 99, 100]` (percentage).
+  - `ray_serve_batch_utilization_percent`
+
+- **`RAY_SERVE_BATCH_SIZE_BUCKETS`**: Controls bucket boundaries for batch size histogram. Default: `[1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024]`.
+  - `ray_serve_actual_batch_size`
+
+Note: `ray_serve_batch_wait_time_ms` and `ray_serve_batch_execution_time_ms` use the same buckets as `RAY_SERVE_REQUEST_LATENCY_BUCKETS_MS`.
+
+Set these as comma-separated values, for example: `RAY_SERVE_REQUEST_LATENCY_BUCKETS_MS="10,50,100,500,1000,5000"` or `RAY_SERVE_BATCH_SIZE_BUCKETS="1,4,8,16,32,64"`.
 
 **Histogram accuracy considerations**
 
 Prometheus histograms aggregate data into predefined buckets, which can affect the accuracy of percentile calculations (e.g., p50, p95, p99) displayed on dashboards:
 
 - **Values outside bucket range**: If your latencies exceed the largest bucket boundary (default: 600,000ms / 10 minutes), they all fall into the `+Inf` bucket and percentile estimates become inaccurate.
 - **Sparse bucket coverage**: If your actual latencies cluster between two widely-spaced buckets, the calculated percentiles are interpolated and may not reflect true values.
-- **Bucket boundaries are fixed at startup**: Changes to `RAY_SERVE_REQUEST_LATENCY_BUCKETS_MS` or `RAY_SERVE_MODEL_LOAD_LATENCY_BUCKETS_MS` require restarting Serve actors to take effect.
+- **Bucket boundaries are fixed at startup**: Changes to bucket environment variables (such as `RAY_SERVE_REQUEST_LATENCY_BUCKETS_MS`, `RAY_SERVE_BATCH_SIZE_BUCKETS`, etc.) require restarting Serve actors to take effect.
 
 For accurate percentile calculations, configure bucket boundaries that closely match your expected latency distribution. For example, if most requests complete in 10-100ms, use finer-grained buckets in that range.
 :::
@@ -599,6 +607,19 @@ These metrics track request throughput, errors, and latency at the replica level
 | `ray_serve_deployment_processing_latency_ms` **[D]** | Histogram | `deployment`, `replica`, `route`, `application` | Histogram of request processing time in milliseconds (excludes queue wait time). |
 | `ray_serve_deployment_error_counter_total` **[D]** | Counter | `deployment`, `replica`, `route`, `application` | Total number of exceptions raised while processing requests. |
 
+### Batching metrics
+
+These metrics track request batching behavior for deployments using `@serve.batch`. Use them to tune batching parameters and debug latency issues.
+
+| Metric | Type | Tags | Description |
+|--------|------|------|-------------|
+| `ray_serve_batch_wait_time_ms` | Histogram | `deployment`, `replica`, `application`, `function_name` | Time requests waited for the batch to fill in milliseconds. High values indicate batch timeout may be too long. |
+| `ray_serve_batch_execution_time_ms` | Histogram | `deployment`, `replica`, `application`, `function_name` | Time to execute the batch function in milliseconds. |
+| `ray_serve_batch_queue_length` | Gauge | `deployment`, `replica`, `application`, `function_name` | Current number of requests waiting in the batch queue. High values indicate a batching bottleneck. |
+| `ray_serve_batch_utilization_percent` | Histogram | `deployment`, `replica`, `application`, `function_name` | Batch utilization as percentage (`computed_batch_size / max_batch_size * 100`). Low utilization suggests `batch_wait_timeout_s` is too aggressive or traffic is too low. |
+| `ray_serve_actual_batch_size` | Histogram | `deployment`, `replica`, `application`, `function_name` | The computed size of each batch. When `batch_size_fn` is configured, this reports the custom computed size (such as total tokens). Otherwise, it reports the number of requests. |
+| `ray_serve_batches_processed_total` | Counter | `deployment`, `replica`, `application`, `function_name` | Total number of batches executed. Compare with request counter to measure batching efficiency. |
+
 ### Replica lifecycle metrics
 
 These metrics track replica health and restarts.
```
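The docs above say bucket environment variables are set as comma-separated values with a built-in default. As an illustration only (this is a hedged sketch, not Ray's actual `parse_latency_buckets` implementation), parsing and validating such a value might look like:

```python
import os

def parse_buckets(env_var: str, default: list) -> list:
    """Parse a comma-separated bucket spec like "1,4,8,16,32,64".

    Falls back to `default` when the variable is unset or empty.
    Boundaries must be positive and strictly increasing, as required
    for histogram buckets.
    """
    raw = os.environ.get(env_var, "").strip()
    if not raw:
        return default
    buckets = [float(v) for v in raw.split(",")]
    if any(b <= 0 for b in buckets):
        raise ValueError(f"{env_var}: bucket boundaries must be positive")
    if buckets != sorted(buckets) or len(set(buckets)) != len(buckets):
        raise ValueError(f"{env_var}: bucket boundaries must be strictly increasing")
    return buckets

# Example: override the batch size buckets as the docs describe.
os.environ["RAY_SERVE_BATCH_SIZE_BUCKETS"] = "1,4,8,16,32,64"
print(parse_buckets("RAY_SERVE_BATCH_SIZE_BUCKETS", [1, 2, 4]))
```

The function name and validation details here are assumptions; the doc only specifies the comma-separated format and the defaults.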

python/ray/serve/_private/constants.py

Lines changed: 52 additions & 0 deletions

```diff
@@ -125,6 +125,58 @@
     DEFAULT_LATENCY_BUCKET_MS,
 )
 
+#: Histogram buckets for batch execution time in milliseconds.
+BATCH_EXECUTION_TIME_BUCKETS_MS = REQUEST_LATENCY_BUCKETS_MS
+
+#: Histogram buckets for batch wait time in milliseconds.
+BATCH_WAIT_TIME_BUCKETS_MS = REQUEST_LATENCY_BUCKETS_MS
+
+#: Histogram buckets for batch utilization percentage.
+DEFAULT_BATCH_UTILIZATION_BUCKETS_PERCENT = [
+    5,
+    10,
+    20,
+    30,
+    40,
+    50,
+    60,
+    70,
+    80,
+    90,
+    95,
+    99,
+    100,
+]
+BATCH_UTILIZATION_BUCKETS_PERCENT = parse_latency_buckets(
+    get_env_str(
+        "RAY_SERVE_BATCH_UTILIZATION_BUCKETS_PERCENT",
+        "",
+    ),
+    DEFAULT_BATCH_UTILIZATION_BUCKETS_PERCENT,
+)
+
+#: Histogram buckets for actual batch size.
+DEFAULT_BATCH_SIZE_BUCKETS = [
+    1,
+    2,
+    4,
+    8,
+    16,
+    32,
+    64,
+    128,
+    256,
+    512,
+    1024,
+]
+BATCH_SIZE_BUCKETS = parse_latency_buckets(
+    get_env_str(
+        "RAY_SERVE_BATCH_SIZE_BUCKETS",
+        "",
+    ),
+    DEFAULT_BATCH_SIZE_BUCKETS,
+)
+
 #: Name of deployment health check method implemented by user.
 HEALTH_CHECK_METHOD = "check_health"
```
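As a reminder of how these boundaries behave at observation time (a generic Prometheus-style sketch, not Ray's metric internals): an observed value is counted in every cumulative bucket whose upper bound is at least the value, and the smallest such bucket can be found with a binary search over the boundaries.

```python
import bisect

# Default boundaries added by this commit.
BATCH_SIZE_BUCKETS = [1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024]

def smallest_bucket(value: float, boundaries: list) -> float:
    """Return the smallest bucket upper bound the value falls under,
    or float("inf") if it exceeds every boundary (the +Inf bucket)."""
    i = bisect.bisect_left(boundaries, value)
    return boundaries[i] if i < len(boundaries) else float("inf")

print(smallest_bucket(10, BATCH_SIZE_BUCKETS))    # a batch of 10 lands in the 16 bucket
print(smallest_bucket(1024, BATCH_SIZE_BUCKETS))  # exact match on the largest boundary
print(smallest_bucket(5000, BATCH_SIZE_BUCKETS))  # beyond all boundaries -> +Inf bucket
```

This is why the docs warn about sparse coverage: with power-of-two boundaries, batch sizes of 9 through 16 are indistinguishable on a dashboard.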

python/ray/serve/batching.py

Lines changed: 108 additions & 10 deletions

```diff
@@ -27,9 +27,16 @@
 from ray import serve
 from ray._common.signature import extract_signature, flatten_args, recover_args
 from ray._common.utils import get_or_create_event_loop
-from ray.serve._private.constants import SERVE_LOGGER_NAME
+from ray.serve._private.constants import (
+    BATCH_EXECUTION_TIME_BUCKETS_MS,
+    BATCH_SIZE_BUCKETS,
+    BATCH_UTILIZATION_BUCKETS_PERCENT,
+    BATCH_WAIT_TIME_BUCKETS_MS,
+    SERVE_LOGGER_NAME,
+)
 from ray.serve._private.utils import extract_self_if_method_call
 from ray.serve.exceptions import RayServeException
+from ray.serve.metrics import Counter, Gauge, Histogram
 from ray.util.annotations import PublicAPI
 
 logger = logging.getLogger(SERVE_LOGGER_NAME)
@@ -148,6 +155,46 @@ def __init__(
         # Used for observability.
         self.curr_iteration_start_times: Dict[asyncio.Task, float] = {}
 
+        # Initialize batching metrics.
+        self._batch_wait_time_histogram = Histogram(
+            "serve_batch_wait_time_ms",
+            description="Time requests waited for batch to fill (in milliseconds).",
+            boundaries=BATCH_WAIT_TIME_BUCKETS_MS,
+            tag_keys=("function_name",),
+        )
+        self._batch_execution_time_histogram = Histogram(
+            "serve_batch_execution_time_ms",
+            description="Time to execute the batch function (in milliseconds).",
+            boundaries=BATCH_EXECUTION_TIME_BUCKETS_MS,
+            tag_keys=("function_name",),
+        )
+        self._batch_queue_length_gauge = Gauge(
+            "serve_batch_queue_length",
+            description="Number of requests waiting in the batch queue.",
+            tag_keys=("function_name",),
+        )
+        self._batch_utilization_histogram = Histogram(
+            "serve_batch_utilization_percent",
+            description="Batch utilization as percentage (actual_batch_size / max_batch_size * 100).",
+            boundaries=BATCH_UTILIZATION_BUCKETS_PERCENT,
+            tag_keys=("function_name",),
+        )
+        self._batch_size_histogram = Histogram(
+            "serve_actual_batch_size",
+            description="The actual number of requests in each batch.",
+            boundaries=BATCH_SIZE_BUCKETS,
+            tag_keys=("function_name",),
+        )
+        self._batches_processed_counter = Counter(
+            "serve_batches_processed",
+            description="Counter of batches executed.",
+            tag_keys=("function_name",),
+        )
+
+        self._function_name = (
+            handle_batch_func.__name__ if handle_batch_func is not None else "unknown"
+        )
+
         self._handle_batch_task = None
         self._loop = get_or_create_event_loop()
         if handle_batch_func is not None:
@@ -199,12 +246,13 @@ def _compute_batch_size(self, batch: List[_SingleRequest]) -> int:
 
         return self.batch_size_fn(items)
 
-    async def wait_for_batch(self) -> List[_SingleRequest]:
+    async def wait_for_batch(self) -> Tuple[List[_SingleRequest], int]:
         """Wait for batch respecting self.max_batch_size and self.timeout_s.
 
-        Returns a batch of up to self.max_batch_size items. Waits for up to
-        to self.timeout_s after receiving the first request that will be in
-        the next batch. After the timeout, returns as many items as are ready.
+        Returns a tuple of (batch, computed_batch_size) where batch contains
+        up to self.max_batch_size items. Waits for up to self.timeout_s after
+        receiving the first request that will be in the next batch. After the
+        timeout, returns as many items as are ready.
 
         Always returns a batch with at least one item - will block
         indefinitely until an item comes in.
@@ -228,13 +276,18 @@ async def wait_for_batch(self) -> List[_SingleRequest]:
             )
             # Set exception on the future so the caller receives it
             first_item.future.set_exception(exc)
-            return []
+            return [], 0
 
         batch.append(first_item)
 
         # Wait self.timeout_s seconds for new queue arrivals.
         batch_start_time = time.time()
         while True:
+            # Record queue length metric.
+            self._batch_queue_length_gauge.set(
+                self.queue.qsize(), tags={"function_name": self._function_name}
+            )
+
             remaining_batch_time_s = max(
                 batch_wait_timeout_s - (time.time() - batch_start_time), 0
             )
@@ -270,6 +323,9 @@ async def wait_for_batch(self) -> List[_SingleRequest]:
                 # so newer requests may be processed before it. Consider using
                 # asyncio.PriorityQueue if strict ordering is required.
                 self.queue.put_nowait(deferred_item)
+                # Compute final batch size before breaking (batch is now valid
+                # after popping the deferred item).
+                current_batch_size = self._compute_batch_size(batch)
                 # break the loop early because the deferred item is too large to fit in the batch
                 break
             else:
@@ -293,7 +349,13 @@ async def wait_for_batch(self) -> List[_SingleRequest]:
             ):
                 break
 
-        return batch
+        # Record batch wait time metric (time spent waiting for batch to fill).
+        batch_wait_time_ms = (time.time() - batch_start_time) * 1000
+        self._batch_wait_time_histogram.observe(
+            batch_wait_time_ms, tags={"function_name": self._function_name}
+        )
+
+        return batch, current_batch_size
 
     def _validate_results(
         self, results: Iterable[Any], input_batch_length: int
@@ -379,29 +441,57 @@ async def _process_batches(self, func: Callable) -> None:
         # So we unset the request context so the current context is not inherited by the task, _process_batch.
         serve.context._unset_request_context()
         while not self._loop.is_closed():
-            batch = await self.wait_for_batch()
-            promise = self._process_batch(func, batch)
+            batch, computed_batch_size = await self.wait_for_batch()
+            promise = self._process_batch(func, batch, computed_batch_size)
             task = asyncio.create_task(promise)
             self.tasks.add(task)
             self.curr_iteration_start_times[task] = time.time()
             task.add_done_callback(self._handle_completed_task)
 
-    async def _process_batch(self, func: Callable, batch: List[_SingleRequest]) -> None:
+    async def _process_batch(
+        self, func: Callable, batch: List[_SingleRequest], computed_batch_size: int
+    ) -> None:
         """Processes queued request batch."""
         # NOTE: this semaphore caps the number of concurrent batches specified by `max_concurrent_batches`
         async with self.semaphore:
             # Remove requests that have been cancelled from the batch. If
             # all requests have been cancelled, simply return and wait for
             # the next batch.
+            original_batch_len = len(batch)
             batch = [req for req in batch if not req.future.cancelled()]
             if len(batch) == 0:
                 return
 
+            # Record batch utilization metric.
+            # Use computed_batch_size from wait_for_batch for efficiency.
+            # If requests were cancelled, we need to recompute since the batch changed.
+            if len(batch) != original_batch_len:
+                computed_batch_size = self._compute_batch_size(batch)
+
+            # Calculate and record batch utilization percentage.
+            batch_utilization_percent = (
+                computed_batch_size / self.max_batch_size
+            ) * 100
+            self._batch_utilization_histogram.observe(
+                batch_utilization_percent, tags={"function_name": self._function_name}
+            )
+
+            # Record actual batch size (number of requests in the batch computed by the batch_size_fn).
+            self._batch_size_histogram.observe(
+                computed_batch_size, tags={"function_name": self._function_name}
+            )
+
+            # Increment batches processed counter.
+            self._batches_processed_counter.inc(
+                tags={"function_name": self._function_name}
+            )
+
            futures = [item.future for item in batch]
 
            # Most of the logic in the function should be wrapped in this try-
            # except block, so the futures' exceptions can be set if an exception
            # occurs. Otherwise, the futures' requests may hang indefinitely.
+            batch_execution_start_time = time.time()
             try:
                 self_arg = batch[0].self_arg
                 args, kwargs = _batch_args_kwargs(
@@ -436,6 +526,14 @@ async def _process_batch(self, func: Callable, batch: List[_SingleRequest]) -> N
 
                 for future in futures:
                     _set_exception_if_not_done(future, e)
+            finally:
+                # Record batch execution time.
+                batch_execution_time_ms = (
+                    time.time() - batch_execution_start_time
+                ) * 1000
+                self._batch_execution_time_histogram.observe(
+                    batch_execution_time_ms, tags={"function_name": self._function_name}
+                )
 
     def _handle_completed_task(self, task: asyncio.Task) -> None:
         self.tasks.remove(task)
```
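The utilization bookkeeping in `_process_batch` above boils down to a small pure function; a standalone sketch (illustrative only, with Ray identifiers simplified and the recompute hook passed in as a callable):

```python
def batch_utilization_percent(
    computed_batch_size: int,
    original_batch_len: int,
    surviving_batch_len: int,
    max_batch_size: int,
    recompute=None,
) -> float:
    """Mirror of the metric logic: if requests were cancelled after the
    batch size was computed, recompute the size for the surviving batch,
    then report size / max_batch_size * 100."""
    if surviving_batch_len != original_batch_len and recompute is not None:
        computed_batch_size = recompute()
    return computed_batch_size / max_batch_size * 100

# No cancellations: 7 requests with max_batch_size=10 -> 70% utilization.
print(batch_utilization_percent(7, 7, 7, 10))
# Two of seven requests cancelled; recompute gives 5 -> 50% utilization.
print(batch_utilization_percent(7, 7, 5, 10, recompute=lambda: 5))
```

Note that with a custom `batch_size_fn`, the "size" here is whatever that function returns (e.g., total tokens), so utilization can legitimately exceed the request count.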

python/ray/serve/tests/test_batching.py

Lines changed: 1 addition & 1 deletion

```diff
@@ -635,7 +635,7 @@ async def f(self, request: Request):
 
 
 def test_batch_size_fn_deferred_item_processed(serve_instance):
-    @serve.deployment
+    @serve.deployment(max_ongoing_requests=15)
     class DeferredItemBatcher:
         def __init__(self):
             self.batch_sizes = []
```
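The test above exercises `batch_size_fn` deferral. For intuition, a custom size function is just a callable over the batch items; a hypothetical token-counting example (the function name and tokenization are illustrative, not taken from the PR):

```python
from typing import List

def token_batch_size(prompts: List[str]) -> int:
    """Hypothetical batch_size_fn: measure a batch by total whitespace
    tokens rather than by number of requests, so one long prompt can
    fill a batch as much as several short ones."""
    return sum(len(p.split()) for p in prompts)

batch = ["hello world", "ray serve batches requests", "hi"]
print(token_batch_size(batch))  # 2 + 4 + 1 = 7 tokens
```

With such a function configured, `ray_serve_actual_batch_size` records this token total, which is why the docs describe it as the "computed" rather than literal batch size.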
