8
8
9
9
import requests
10
10
from fastapi .responses import StreamingResponse
11
+ from prometheus_client import start_http_server
11
12
12
- from comps import (
13
- MicroService ,
14
- ServiceOrchestrator ,
15
- ServiceRoleType ,
16
- ServiceType ,
17
- TextDoc ,
18
- opea_microservices ,
19
- register_microservice ,
20
- )
21
- from comps .cores .proto .api_protocol import ChatCompletionRequest , ChatCompletionResponse
13
+ from comps import ServiceOrchestrator , ServiceType , TextDoc , opea_microservices , register_microservice
22
14
23
- _MEGA_PORT = 8888
15
+ _METRIC_PORT = 8000
24
16
25
17
26
18
@register_microservice (name = "s1" , host = "0.0.0.0" , port = 8083 , endpoint = "/v1/add" )
@@ -58,19 +50,13 @@ def setUpClass(cls):
58
50
59
51
cls .service_builder = ServiceOrchestrator ()
60
52
61
- cls .service_builder .add (cls .s0 ).add (cls .s1 )
53
+ # simpler?: cls.service_builder.add(cls.s0).add(cls.s1)
54
+ cls .service_builder .add (opea_microservices ["s0" ]).add (opea_microservices ["s1" ])
62
55
cls .service_builder .flow_to (cls .s0 , cls .s1 )
63
56
64
- cls .service = MicroService (
65
- "megaservice" ,
66
- service_role = ServiceRoleType .MEGASERVICE ,
67
- host = "0.0.0.0" ,
68
- port = _MEGA_PORT ,
69
- endpoint = "/whatever" ,
70
- input_datatype = ChatCompletionRequest ,
71
- output_datatype = ChatCompletionResponse ,
72
- )
73
- cls .service .start ()
57
+ print (f"serving metrics from: http://localhost:{ _METRIC_PORT } /metrics" )
58
+ # requires prometheus_client >= 0.20.0 (earlier versions return None)
59
+ cls .server , cls .thread = start_http_server (_METRIC_PORT )
74
60
75
61
@classmethod
76
62
def tearDownClass (cls ):
@@ -79,6 +65,9 @@ def tearDownClass(cls):
79
65
cls .process1 .terminate ()
80
66
cls .process2 .terminate ()
81
67
68
+ cls .server .shutdown ()
69
+ cls .thread .join ()
70
+
82
71
async def test_schedule (self ):
83
72
result_dict , _ = await self .service_builder .schedule (initial_inputs = {"text" : "hello, " })
84
73
response = result_dict ["s1/MicroService" ]
@@ -87,9 +76,10 @@ async def test_schedule(self):
87
76
async for k in response .__reduce__ ()[2 ]["body_iterator" ]:
88
77
self .assertEqual (self .service_builder .extract_chunk_str (k ).strip (), res_expected [idx ])
89
78
idx += 1
90
- # check the metric increases from above
91
- r = requests .get (f"http://localhost:{ _MEGA_PORT } /metrics" , timeout = 10 )
79
+ # check ServiceOrchestrator "megaservice_*" metrics resulting from above processing
80
+ r = await requests .get (f"http://localhost:{ _METRIC_PORT } /metrics" , timeout = 5 )
92
81
self .assertEqual (r .status_code , 200 )
82
+ # TODO: verify that the expected metrics are present
93
83
print (r .text )
94
84
95
85
def test_extract_chunk_str (self ):
0 commit comments