 class OrchestratorMetrics:
-    # Need an static class-level ID for metric prefix because:
-    # - Prometheus requires metrics (their names) to be unique
-    _instance_id = 0
-
     def __init__(self) -> None:
-        OrchestratorMetrics._instance_id += 1
-        if OrchestratorMetrics._instance_id > 1:
-            self._prefix = f"megaservice{self._instance_id}"
-        else:
-            self._prefix = "megaservice"
-
-        self.request_pending = Gauge(f"{self._prefix}_request_pending", "Count of currently pending requests (gauge)")
-
         # locking for latency metric creation / method change
         self._lock = threading.Lock()

@@ -50,20 +38,22 @@ def __init__(self) -> None:
         self.first_token_latency = None
         self.inter_token_latency = None
         self.request_latency = None
+        self.request_pending = None

         # initial methods to create the metrics
         self.token_update = self._token_update_create
         self.request_update = self._request_update_create
+        self.pending_update = self._pending_update_create

     def _token_update_create(self, token_start: float, is_first: bool) -> float:
         with self._lock:
             # in case another thread already got here
             if self.token_update == self._token_update_create:
                 self.first_token_latency = Histogram(
-                    f"{self._prefix}_first_token_latency", "First token latency (histogram)"
+                    "megaservice_first_token_latency", "First token latency (histogram)"
                 )
                 self.inter_token_latency = Histogram(
-                    f"{self._prefix}_inter_token_latency", "Inter-token latency (histogram)"
+                    "megaservice_inter_token_latency", "Inter-token latency (histogram)"
                 )
                 self.token_update = self._token_update_real
         return self.token_update(token_start, is_first)
@@ -73,11 +63,21 @@ def _request_update_create(self, req_start: float) -> None:
             # in case another thread already got here
             if self.request_update == self._request_update_create:
                 self.request_latency = Histogram(
-                    f"{self._prefix}_request_latency", "Whole LLM request/reply latency (histogram)"
+                    "megaservice_request_latency", "Whole LLM request/reply latency (histogram)"
                 )
                 self.request_update = self._request_update_real
         self.request_update(req_start)

+    def _pending_update_create(self, increase: bool) -> None:
+        with self._lock:
+            # in case another thread already got here
+            if self.pending_update == self._pending_update_create:
+                self.request_pending = Gauge(
+                    "megaservice_request_pending", "Count of currently pending requests (gauge)"
+                )
+                self.pending_update = self._pending_update_real
+        self.pending_update(increase)
+
     def _token_update_real(self, token_start: float, is_first: bool) -> float:
         now = time.time()
         if is_first:
@@ -89,18 +89,22 @@ def _token_update_real(self, token_start: float, is_first: bool) -> float:
     def _request_update_real(self, req_start: float) -> None:
         self.request_latency.observe(time.time() - req_start)

-    def pending_update(self, increase: bool) -> None:
+    def _pending_update_real(self, increase: bool) -> None:
         if increase:
             self.request_pending.inc()
         else:
             self.request_pending.dec()


+# Prometheus metrics need to be singletons, not per Orchestrator
+_metrics = OrchestratorMetrics()
+
+
 class ServiceOrchestrator(DAG):
     """Manage 1 or N micro services in a DAG through Python API."""

     def __init__(self) -> None:
-        self.metrics = OrchestratorMetrics()
+        self.metrics = _metrics
         self.services = {}  # all services, id -> service
         super().__init__()
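The diff relies on two ideas: the Prometheus client rejects duplicate metric names within one registry, so a single module-level OrchestratorMetrics instance is now shared by every ServiceOrchestrator instead of per-instance prefixed copies; and each metric is created lazily on first use under a lock, after which the bound method is swapped for a fast path that only records values, so the lock is never taken again. Below is a minimal, self-contained sketch of that create-then-swap pattern, assuming prometheus_client provides Gauge (the imports are outside this hunk); the class name LazyMetrics and the "demo_" metric prefix are illustrative only, not part of the PR.

import threading

from prometheus_client import Gauge  # assumed metrics backend; not shown in the diff


class LazyMetrics:
    """Illustrative stand-in for OrchestratorMetrics (gauge part only)."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self.request_pending = None
        # Start with the "create" method; it replaces itself on first call.
        self.pending_update = self._pending_update_create

    def _pending_update_create(self, increase: bool) -> None:
        with self._lock:
            # Another thread may have created the gauge while we waited for the lock.
            if self.pending_update == self._pending_update_create:
                self.request_pending = Gauge(
                    "demo_request_pending", "Count of currently pending requests (gauge)"
                )
                self.pending_update = self._pending_update_real
        self.pending_update(increase)

    def _pending_update_real(self, increase: bool) -> None:
        if increase:
            self.request_pending.inc()
        else:
            self.request_pending.dec()


# One shared instance, so the gauge is registered exactly once
# no matter how many orchestrators use it.
_metrics = LazyMetrics()

if __name__ == "__main__":
    _metrics.pending_update(True)   # first call creates the gauge, then increments it
    _metrics.pending_update(False)  # later calls go straight to _pending_update_real

Swapping the bound method keeps the common path lock-free while still guarding the one-time metric registration against concurrent first calls.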