Skip to content

Commit 0302c89

Browse files
ZhangYulong and gfreeliuzc
authored and committed
add CI cases (PaddlePaddle#3714)
1 parent 2ec9c77 commit 0302c89

File tree

2 files changed

+340
-0
lines changed

2 files changed

+340
-0
lines changed

tests/ci_use/EB_Lite/test_EB_Lite_serving.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
import os
1616
import re
17+
import shutil
1718
import signal
1819
import socket
1920
import subprocess
@@ -115,6 +116,9 @@ def setup_and_run_server():
115116
]
116117

117118
# Start subprocess in new process group
119+
# 清除log目录
120+
if os.path.exists("log"):
121+
shutil.rmtree("log")
118122
with open(log_path, "w") as logfile:
119123
process = subprocess.Popen(
120124
cmd,
Lines changed: 336 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,336 @@
1+
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import json
16+
import os
17+
import shutil
18+
import signal
19+
import socket
20+
import subprocess
21+
import sys
22+
import time
23+
24+
import pytest
25+
import requests
26+
27+
# Service ports used by the tests; overridable through environment variables,
# with fixed defaults when unset.
FD_API_PORT = int(os.getenv("FD_API_PORT", 8188))
FD_ENGINE_QUEUE_PORT = int(os.getenv("FD_ENGINE_QUEUE_PORT", 8133))
FD_METRICS_PORT = int(os.getenv("FD_METRICS_PORT", 8233))

# Every port that must be freed before the server starts and after tests end.
PORTS_TO_CLEAN = [FD_API_PORT, FD_ENGINE_QUEUE_PORT, FD_METRICS_PORT]
38+
39+
40+
def is_port_open(host: str, port: int, timeout=1.0):
    """
    Probe a TCP port by attempting a client connection.

    Returns True when a connection to (host, port) succeeds within
    *timeout* seconds, False on any failure (refused, unreachable,
    timed out, ...).
    """
    try:
        conn = socket.create_connection((host, port), timeout)
    except Exception:
        return False
    conn.close()
    return True
50+
51+
52+
def kill_process_on_port(port: int):
    """
    Force-kill every process listening on *port*.

    Looks PIDs up with `lsof` and sends SIGKILL, skipping this process
    and its parent so the test runner never terminates itself.
    """
    try:
        raw = subprocess.check_output(f"lsof -i:{port} -t", shell=True).decode().strip()
    except subprocess.CalledProcessError:
        # lsof exits non-zero when nothing listens on the port — nothing to do.
        return
    protected = (os.getpid(), os.getppid())
    for entry in raw.splitlines():
        pid = int(entry)
        if pid in protected:
            print(f"Skip killing current process (pid={pid}) on port {port}")
            continue
        os.kill(pid, signal.SIGKILL)
        print(f"Killed process on port {port}, pid={pid}")
70+
71+
72+
def clean_ports():
    """Free every port listed in PORTS_TO_CLEAN by killing its listeners."""
    for target in PORTS_TO_CLEAN:
        kill_process_on_port(target)
78+
79+
80+
@pytest.fixture(scope="session", autouse=True)
def setup_and_run_server():
    """
    Session-scoped, auto-used fixture that manages the API server lifecycle:
    - Cleans the test ports before anything starts
    - Launches the FastDeploy OpenAI-compatible API server as a subprocess
      (own process group, output redirected to server.log)
    - Waits up to 300 seconds for the API port to open, failing the session
      with RuntimeError on timeout
    - Tears the server down (SIGTERM to the process group) after all tests
    """
    print("Pre-test port cleanup...")
    clean_ports()

    # MODEL_PATH points at a directory containing the model weights;
    # fall back to a relative checkout when the variable is unset.
    base_path = os.getenv("MODEL_PATH")
    if base_path:
        model_path = os.path.join(base_path, "ernie-4_5-21b-a3b-bf16-paddle")
    else:
        model_path = "./ernie-4_5-21b-a3b-bf16-paddle"
    # MTP draft model for speculative decoding lives under the main model dir.
    mtp_model_path = os.path.join(model_path, "mtp")
    speculative_config = {"method": "mtp", "num_speculative_tokens": 1, "model": mtp_model_path}

    log_path = "server.log"
    cmd = [
        sys.executable,
        "-m",
        "fastdeploy.entrypoints.openai.api_server",
        "--model",
        model_path,
        "--port",
        str(FD_API_PORT),
        "--tensor-parallel-size",
        "2",
        "--engine-worker-queue-port",
        str(FD_ENGINE_QUEUE_PORT),
        "--metrics-port",
        str(FD_METRICS_PORT),
        "--max-model-len",
        "32768",
        "--max-num-seqs",
        "128",
        "--quantization",
        "wint4",
        "--speculative-config",
        json.dumps(speculative_config),
    ]

    # Remove any stale "log" directory so each run starts with fresh logs.
    if os.path.exists("log"):
        shutil.rmtree("log")
    # Start subprocess in new process group
    with open(log_path, "w") as logfile:
        process = subprocess.Popen(
            cmd,
            stdout=logfile,
            stderr=subprocess.STDOUT,
            start_new_session=True,  # Enables killing full group via os.killpg
        )

    # Wait up to 300 seconds for API server to be ready
    for _ in range(300):
        if is_port_open("127.0.0.1", FD_API_PORT):
            print(f"Server is up on port {FD_API_PORT}")
            break
        time.sleep(1)
    else:
        # for/else: reached only when the loop never hit `break` (timeout).
        print("[TIMEOUT] API server failed to start in 5 minutes. Cleaning up...")
        try:
            os.killpg(process.pid, signal.SIGTERM)
            clean_ports()
        except Exception as e:
            print(f"Failed to kill process group: {e}")
        raise RuntimeError(f"API server did not start on port {FD_API_PORT}")

    yield  # Run tests

    print("\n===== Post-test server cleanup... =====")
    try:
        os.killpg(process.pid, signal.SIGTERM)
        clean_ports()
        print(f"server (pid={process.pid}) terminated")
    except Exception as e:
        print(f"Failed to terminate API server: {e}")
161+
162+
163+
@pytest.fixture(scope="session")
def api_url(request):
    """URL of the chat-completions endpoint on the local API server."""
    endpoint = "/v1/chat/completions"
    return f"http://0.0.0.0:{FD_API_PORT}{endpoint}"
169+
170+
171+
@pytest.fixture(scope="session")
def metrics_url(request):
    """URL of the Prometheus metrics endpoint on the local API server."""
    endpoint = "/metrics"
    return f"http://0.0.0.0:{FD_METRICS_PORT}{endpoint}"
177+
178+
179+
@pytest.fixture
def headers():
    """Default JSON request headers shared by the HTTP tests."""
    common = {"Content-Type": "application/json"}
    return common
185+
186+
187+
def send_request(url, payload, timeout=600):
    """
    POST *payload* as JSON to *url* and return the Response object.

    Returns None (after printing a diagnostic) when the request times out
    or fails at the transport level.
    """
    request_headers = {
        "Content-Type": "application/json",
    }

    try:
        response = requests.post(url, headers=request_headers, json=payload, timeout=timeout)
    except requests.exceptions.Timeout:
        print(f"❌ 请求超时(超过 {timeout} 秒)")
        return None
    except requests.exceptions.RequestException as e:
        print(f"❌ 请求失败:{e}")
        return None
    print("🟢 接收响应中...\n")
    return response
205+
206+
207+
def get_stream_chunks(response):
    """
    Decode a streaming (SSE-style) response into a list of parsed JSON chunks.

    Strips the "data: " prefix from each non-empty line, stops at the
    "[DONE]" sentinel, and skips lines that fail to parse (with a message).
    On a non-200 response, prints the status/body and returns an empty list.
    """
    chunks = []

    if response.status_code != 200:
        print(f"请求失败,状态码: {response.status_code}")
        print("返回内容:", response.text)
        return chunks

    for raw in response.iter_lines(decode_unicode=True):
        if not raw:
            continue
        payload = raw[len("data: "):] if raw.startswith("data: ") else raw
        if payload.strip() == "[DONE]":
            break
        try:
            chunks.append(json.loads(payload))
        except Exception as e:
            print(f"解析失败: {e}, 行内容: {payload}")

    return chunks
230+
231+
232+
def test_chat_usage_stream(api_url):
    """Streaming chat completion must report self-consistent token usage."""
    payload = {
        "model": "default",
        "temperature": 0,
        "top_p": 0,
        "seed": 33,
        "messages": [
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": "牛顿的三大运动定律是什么?"},
        ],
        "max_tokens": 50,
        "stream": True,
        "stream_options": {"include_usage": True, "continuous_usage_stats": True},
        "metadata": {"min_tokens": 10},
    }

    chunks = get_stream_chunks(send_request(url=api_url, payload=payload))
    # The final chunk carries only usage; the generated text is in the rest.
    text = "".join(chunk["choices"][0]["delta"]["content"] for chunk in chunks[:-1])
    print("Prefill Response:", text)
    assert text != "", "结果为空"
    usage = chunks[-1]["usage"]
    expected_total = usage["completion_tokens"] + usage["prompt_tokens"]
    assert payload["max_tokens"] >= usage["completion_tokens"], "completion_tokens大于max_tokens"
    assert payload["metadata"]["min_tokens"] <= usage["completion_tokens"], "completion_tokens小于min_tokens"
    assert usage["total_tokens"] == expected_total, "total_tokens不等于prompt_tokens + completion_tokens"
259+
260+
261+
def test_chat_usage_non_stream(api_url):
    """Non-streaming chat completion must report self-consistent token usage."""
    payload = {
        "model": "default",
        "temperature": 0,
        "top_p": 0,
        "seed": 33,
        "messages": [
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": "牛顿的三大运动定律是什么?"},
        ],
        "max_tokens": 50,
        "stream": False,
        "metadata": {"min_tokens": 10},
    }

    body = send_request(url=api_url, payload=payload).json()
    usage = body["usage"]
    content = body["choices"][0]["message"]["content"]
    assert content != "", "结果为空"
    expected_total = usage["completion_tokens"] + usage["prompt_tokens"]
    assert payload["max_tokens"] >= usage["completion_tokens"], "completion_tokens大于max_tokens"
    assert payload["metadata"]["min_tokens"] <= usage["completion_tokens"], "completion_tokens小于min_tokens"
    assert usage["total_tokens"] == expected_total, "total_tokens不等于prompt_tokens + completion_tokens"
285+
286+
287+
def test_non_chat_usage_stream(api_url):
    """Streaming plain-completion request must report self-consistent usage."""
    payload = {
        "model": "default",
        "temperature": 0,
        "top_p": 0,
        "seed": 33,
        "prompt": "牛顿的三大运动定律是什么?",
        "max_tokens": 50,
        "stream": True,
        "stream_options": {"include_usage": True, "continuous_usage_stats": True},
        "metadata": {"min_tokens": 10},
    }
    # The fixture returns the chat endpoint; rewrite it to /v1/completions.
    completions_url = api_url.replace("chat/completions", "completions")

    chunks = get_stream_chunks(send_request(url=completions_url, payload=payload))
    # The final chunk carries only usage; the generated text is in the rest.
    text = "".join(chunk["choices"][0]["text"] for chunk in chunks[:-1])
    assert text != "", "结果为空"
    usage = chunks[-1]["usage"]
    expected_total = usage["completion_tokens"] + usage["prompt_tokens"]
    assert payload["max_tokens"] >= usage["completion_tokens"], "completion_tokens大于max_tokens"
    assert payload["metadata"]["min_tokens"] <= usage["completion_tokens"], "completion_tokens小于min_tokens"
    assert usage["total_tokens"] == expected_total, "total_tokens不等于prompt_tokens + completion_tokens"
312+
313+
314+
def test_non_chat_usage_non_stream(api_url):
    """Non-streaming plain-completion request must report self-consistent usage."""
    payload = {
        "model": "default",
        "temperature": 0,
        "top_p": 0,
        "seed": 33,
        "prompt": "牛顿的三大运动定律是什么?",
        "max_tokens": 50,
        "stream": False,
        "metadata": {"min_tokens": 10},
    }
    # The fixture returns the chat endpoint; rewrite it to /v1/completions.
    completions_url = api_url.replace("chat/completions", "completions")

    body = send_request(url=completions_url, payload=payload).json()
    usage = body["usage"]
    content = body["choices"][0]["text"]
    assert content != "", "结果为空"
    expected_total = usage["completion_tokens"] + usage["prompt_tokens"]
    assert payload["max_tokens"] >= usage["completion_tokens"], "completion_tokens大于max_tokens"
    assert payload["metadata"]["min_tokens"] <= usage["completion_tokens"], "completion_tokens小于min_tokens"
    assert usage["total_tokens"] == expected_total, "total_tokens不等于prompt_tokens + completion_tokens"

0 commit comments

Comments
 (0)