7  | 7  | import json
8  | 8  | import os
9  | 9  | import re
   | 10 | +import threading
10 | 11 | import time
11 | 12 | from typing import Dict, List
12 | 13 |
27 | 28 |
28 | 29 |
29 | 30 | class OrchestratorMetrics:
30 |    | -    # Because:
   | 31 | +    # Need an instance ID for metric prefix because:
   | 32 | +    # - Orchestrator instances are not named
31 | 33 |     # - CI creates several orchestrator instances
32 |    | -    # - Prometheus requires metrics to be singletons
33 |    | -    # - Oorchestror instances are not provided their own names
34 |    | -    # Metrics are class members with "megaservice" name prefix
35 |    | -    first_token_latency = Histogram("megaservice_first_token_latency", "First token latency (histogram)")
36 |    | -    inter_token_latency = Histogram("megaservice_inter_token_latency", "Inter-token latency (histogram)")
37 |    | -    request_latency = Histogram("megaservice_request_latency", "Whole request/reply latency (histogram)")
38 |    | -    request_pending = Gauge("megaservice_request_pending", "Count of currently pending requests (gauge)")
   | 34 | +    # - Prometheus requires metrics (their names) to be unique
   | 35 | +    _instance_id = 0
39 | 36 |
40 | 37 |     def __init__(self) -> None:
41 |    | -        pass
   | 38 | +        OrchestratorMetrics._instance_id += 1
   | 39 | +        if self._instance_id > 1:
   | 40 | +            self._prefix = f"megaservice{self._instance_id}"
   | 41 | +        else:
   | 42 | +            self._prefix = "megaservice"
   | 43 | +
   | 44 | +        self.request_pending = Gauge(f"{self._prefix}_request_pending", "Count of currently pending requests (gauge)")
   | 45 | +
   | 46 | +        # locking for latency metric creation / method change
   | 47 | +        self._lock = threading.Lock()
   | 48 | +
   | 49 | +        # Metrics related to token processing are created on demand,
   | 50 | +        # to avoid bogus ones for services that never handle tokens
   | 51 | +        self.first_token_latency = None
   | 52 | +        self.inter_token_latency = None
   | 53 | +        self.request_latency = None
   | 54 | +
   | 55 | +        # initial methods to create the metrics
   | 56 | +        self.token_update = self._token_update_create
   | 57 | +        self.request_update = self._request_update_create
   | 58 | +
   | 59 | +    def _token_update_create(self, token_start: float, is_first: bool) -> float:
   | 60 | +        with self._lock:
   | 61 | +            # in case another thread already got here
   | 62 | +            if self.token_update == self._token_update_create:
   | 63 | +                self.first_token_latency = Histogram(
   | 64 | +                    f"{self._prefix}_first_token_latency", "First token latency (histogram)"
   | 65 | +                )
   | 66 | +                self.inter_token_latency = Histogram(
   | 67 | +                    f"{self._prefix}_inter_token_latency", "Inter-token latency (histogram)"
   | 68 | +                )
   | 69 | +                self.token_update = self._token_update_real
   | 70 | +        return self.token_update(token_start, is_first)
   | 71 | +
   | 72 | +    def _request_update_create(self, req_start: float) -> None:
   | 73 | +        with self._lock:
   | 74 | +            # in case another thread already got here
   | 75 | +            if self.request_update == self._request_update_create:
   | 76 | +                self.request_latency = Histogram(
   | 77 | +                    f"{self._prefix}_request_latency", "Whole LLM request/reply latency (histogram)"
   | 78 | +                )
   | 79 | +                self.request_update = self._request_update_real
   | 80 | +        self.request_update(req_start)
42 | 81 |
43 |    | -    def token_update(self, token_start: float, is_first: bool) -> float:
   | 82 | +    def _token_update_real(self, token_start: float, is_first: bool) -> float:
44 | 83 |         now = time.time()
45 | 84 |         if is_first:
46 | 85 |             self.first_token_latency.observe(now - token_start)
47 | 86 |         else:
48 | 87 |             self.inter_token_latency.observe(now - token_start)
49 | 88 |         return now
50 | 89 |
51 |    | -    def request_update(self, req_start: float) -> None:
   | 90 | +    def _request_update_real(self, req_start: float) -> None:
52 | 91 |         self.request_latency.observe(time.time() - req_start)
53 | 92 |
54 | 93 |     def pending_update(self, increase: bool) -> None:
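
Why the per-instance prefix is needed, as a small standalone sketch (this is not part of the PR; it only assumes the `prometheus_client` package that provides `Gauge`/`Histogram` here, and the metric names mirror the diff): registering a second collector under an already-used name is rejected by the default registry, so a second orchestrator instance has to switch to a different prefix.

```python
# Minimal sketch: duplicate metric names are rejected by prometheus_client's
# default registry, which is why a second OrchestratorMetrics instance needs
# the "megaservice2" prefix.
from prometheus_client import Gauge

Gauge("megaservice_request_pending", "Count of currently pending requests (gauge)")
try:
    # Same metric name again in the same process/registry -> ValueError.
    Gauge("megaservice_request_pending", "Count of currently pending requests (gauge)")
except ValueError as err:
    print(f"duplicate registration rejected: {err}")

# A per-instance prefix sidesteps the collision.
Gauge("megaservice2_request_pending", "Count of currently pending requests (gauge)")
```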
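And a hedged usage sketch of the on-demand metric creation (only methods visible in this diff are called; the import of `OrchestratorMetrics` from its module is omitted on purpose):

```python
# Usage sketch, assuming OrchestratorMetrics has been imported from this module.
import time

metrics = OrchestratorMetrics()   # first instance -> plain "megaservice" prefix

req_start = time.time()
token_start = req_start

# First call runs _token_update_create: it registers the two token histograms
# under self._lock, rebinds token_update to _token_update_real, then records
# the first-token latency.
token_start = metrics.token_update(token_start, is_first=True)

# Later calls go straight to _token_update_real, without locking.
token_start = metrics.token_update(token_start, is_first=False)

# request_latency follows the same create-on-first-use pattern.
metrics.request_update(req_start)
```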