Skip to content

Commit 8777449

Browse files
sunlei1024Jiang-Jia-Jun
authored andcommitted
perf: Optimize task queue communication from engine to worker (PaddlePaddle#4531)
* perf: Optimize task queue communication from engine to worker * perf: get_tasks to numpy * perf: get_tasks remove to_numpy * fix: request & replace ENV * remove test_e2w_perf.py * fix code style --------- Co-authored-by: Jiang-Jia-Jun <163579578+Jiang-Jia-Jun@users.noreply.github.com>
1 parent 24cda3a commit 8777449

3 files changed

Lines changed: 214 additions & 5 deletions

File tree

fastdeploy/engine/request.py

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import numpy as np
2525
from typing_extensions import TypeVar
2626

27+
from fastdeploy import envs
2728
from fastdeploy.engine.pooling_params import PoolingParams
2829
from fastdeploy.engine.sampling_params import SamplingParams
2930
from fastdeploy.entrypoints.openai.protocol import ToolCall
@@ -291,11 +292,20 @@ def set(self, key, value):
291292
setattr(self, key, value)
292293

293294
def __repr__(self) -> str:
294-
non_none_fields = []
295-
for attr, value in vars(self).items():
296-
if value is not None and not attr.startswith("_"):
297-
non_none_fields.append(f"{attr}={value!r}")
298-
return f"Request({', '.join(non_none_fields)})"
295+
"""Safe string representation that ignores private and None fields."""
296+
try:
297+
if not envs.FD_DEBUG:
298+
return f"Request(request_id={self.request_id})"
299+
else:
300+
attrs_snapshot = dict(vars(self))
301+
non_none_fields = [
302+
f"{attr}={value!r}"
303+
for attr, value in attrs_snapshot.items()
304+
if value is not None and not attr.startswith("_")
305+
]
306+
return f"Request({', '.join(non_none_fields)})"
307+
except Exception as e:
308+
return f"<{self.__class__.__name__} repr failed: {e}>"
299309

300310

301311
@dataclass(slots=True)

fastdeploy/inter_communicator/engine_worker_queue.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@
2727
from typing import Any, List, Tuple
2828

2929
import numpy as np
30+
import paddle
3031

32+
from fastdeploy import envs
3133
from fastdeploy.utils import llm_logger
3234

3335

@@ -294,6 +296,49 @@ def _connect_with_retry(self, max_retries: int = 5, interval: int = 3) -> None:
294296
time.sleep(interval)
295297
raise ConnectionError(f"TaskQueue cannot connect {self.address}")
296298

299+
@staticmethod
300+
def to_tensor(tasks):
301+
"""
302+
Convert NumPy arrays in multimodal inputs to PaddlePaddle tensors.
303+
304+
Args:
305+
tasks: List of tasks containing multimodal inputs.
306+
"""
307+
try:
308+
if envs.FD_ENABLE_MAX_PREFILL:
309+
llm_logger.debug(f"Convert image to tensor, type: {type(tasks)}")
310+
batch_tasks, _ = tasks
311+
for task in batch_tasks:
312+
if not hasattr(task, "multimodal_inputs"):
313+
continue
314+
images = task.multimodal_inputs["images"]
315+
if isinstance(images, np.ndarray):
316+
llm_logger.debug(f"Convert image to tensor, shape: {images.shape}")
317+
task.multimodal_inputs["images"] = paddle.to_tensor(images)
318+
except Exception as e:
319+
llm_logger.warning(f"Failed to convert to tensor: {e}")
320+
321+
@staticmethod
322+
def to_numpy(tasks):
323+
"""
324+
Convert PaddlePaddle tensors in multimodal inputs to NumPy arrays.
325+
326+
Args:
327+
tasks: List of tasks containing multimodal inputs.
328+
"""
329+
try:
330+
if envs.FD_ENABLE_MAX_PREFILL:
331+
for batch_tasks, _ in tasks:
332+
for task in batch_tasks:
333+
if not hasattr(task, "multimodal_inputs"):
334+
continue
335+
images = task.multimodal_inputs.get("images", None)
336+
if isinstance(images, paddle.Tensor):
337+
llm_logger.debug(f"Convert image to numpy, shape: {images.shape}")
338+
task.multimodal_inputs["images"] = images.numpy()
339+
except Exception as e:
340+
llm_logger.warning(f"Failed to convert to numpy: {e}")
341+
297342
def put_tasks(self, tasks: List[Any]) -> None:
298343
"""
299344
Add tasks to the shared queue in a thread-safe manner.
@@ -308,6 +353,9 @@ def put_tasks(self, tasks: List[Any]) -> None:
308353
time.sleep(0.001)
309354
self.lock.acquire()
310355

356+
# 多模态输入转换为张量
357+
EngineWorkerQueue.to_tensor(tasks)
358+
311359
self.tasks[:] = list()
312360
self.client_read_flag[:] = [0] * self.num_client
313361
self.tasks.append(tasks)
@@ -322,7 +370,11 @@ def get_tasks(self) -> Tuple[List[Any], bool]:
322370
"""
323371
tasks: List[Any] = list()
324372
self.lock.acquire()
373+
325374
tasks.extend(self.tasks)
375+
# 多模态输入转换为numpy
376+
# EngineWorkerQueue.to_numpy(tasks)
377+
326378
self.client_read_flag[self.client_id] = 1
327379
all_client_read: bool = np.sum(self.client_read_flag) == self.num_client
328380
if all_client_read:
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
"""
2+
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License"
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
"""
16+
17+
import unittest
18+
19+
import numpy as np
20+
import paddle
21+
22+
from fastdeploy import envs
23+
from fastdeploy.inter_communicator.engine_worker_queue import EngineWorkerQueue
24+
25+
26+
class DummyTask:
27+
def __init__(self, images):
28+
self.multimodal_inputs = {"images": images}
29+
30+
31+
class TestEngineWorkerQueue(unittest.TestCase):
32+
def test_to_tensor_success(self):
33+
envs.FD_ENABLE_MAX_PREFILL = 1
34+
# 模拟 numpy 数组输入(使用 paddle 转 numpy)
35+
np_images = paddle.randn([2, 3, 224, 224]).numpy()
36+
task = DummyTask(np_images)
37+
tasks = ([task], 1)
38+
39+
EngineWorkerQueue.to_tensor(tasks)
40+
41+
# 验证已转换为tensor
42+
self.assertIsInstance(task.multimodal_inputs["images"], paddle.Tensor)
43+
44+
def test_to_tensor_disabled(self):
45+
envs.FD_ENABLE_MAX_PREFILL = 0
46+
# 模拟 numpy 数组输入(使用 paddle 转 numpy)
47+
np_images = paddle.randn([2, 3, 224, 224]).numpy()
48+
task = DummyTask(np_images)
49+
tasks = ([task], 1)
50+
51+
EngineWorkerQueue.to_tensor(tasks)
52+
53+
# 验证已转换为tensor
54+
self.assertIsInstance(task.multimodal_inputs["images"], np.ndarray)
55+
56+
def test_to_tensor_no_multimodal_inputs(self):
57+
class NoMMTask:
58+
pass
59+
60+
task = NoMMTask()
61+
tasks = ([task], 1)
62+
63+
# 不应抛异常
64+
try:
65+
EngineWorkerQueue.to_tensor(tasks)
66+
except Exception as e:
67+
self.fail(f"Unexpected exception raised: {e}")
68+
69+
def test_to_tensor_exception_handling(self):
70+
bad_task = DummyTask(images="not an array")
71+
bad_tasks = ([bad_task], 1)
72+
73+
try:
74+
EngineWorkerQueue.to_tensor(bad_tasks)
75+
except Exception as e:
76+
self.fail(f"Exception should be handled internally, but got: {e}")
77+
78+
def test_to_numpy_success(self):
79+
envs.FD_ENABLE_MAX_PREFILL = 1
80+
# 构造 paddle.Tensor 输入
81+
tensor_images = paddle.randn([2, 3, 224, 224])
82+
task = DummyTask(tensor_images)
83+
tasks = [([task], 1)]
84+
85+
EngineWorkerQueue.to_numpy(tasks)
86+
87+
# 验证转换为 numpy.ndarray
88+
self.assertIsInstance(task.multimodal_inputs["images"], np.ndarray)
89+
90+
def test_to_numpy_disabled(self):
91+
# 禁用张量转换开关
92+
envs.FD_ENABLE_MAX_PREFILL = 0
93+
# 创建随机张量作为测试输入
94+
tensor_images = paddle.randn([2, 3, 224, 224])
95+
# 创建模拟任务
96+
task = DummyTask(tensor_images)
97+
tasks = [([task], 1)]
98+
99+
# 调用转换方法(预期不会转换)
100+
EngineWorkerQueue.to_numpy(tasks)
101+
102+
# 因为开关关闭,应仍为 Tensor
103+
self.assertIsInstance(task.multimodal_inputs["images"], paddle.Tensor)
104+
105+
def test_to_numpy_no_multimodal_inputs(self):
106+
class NoMMTask:
107+
pass
108+
109+
task = NoMMTask()
110+
tasks = [([task], 1)]
111+
112+
# 不应抛异常
113+
try:
114+
EngineWorkerQueue.to_numpy(tasks)
115+
except Exception as e:
116+
self.fail(f"Unexpected exception raised: {e}")
117+
118+
def test_to_numpy_non_tensor_input(self):
119+
envs.FD_ENABLE_MAX_PREFILL = 1
120+
np_images = np.random.randn(2, 3, 224, 224)
121+
task = DummyTask(np_images)
122+
tasks = [([task], 1)]
123+
124+
EngineWorkerQueue.to_numpy(tasks)
125+
126+
# 非 Tensor 输入应保持为 numpy 数组
127+
self.assertIsInstance(task.multimodal_inputs["images"], np.ndarray)
128+
129+
def test_to_numpy_exception_handling(self):
130+
envs.FD_ENABLE_MAX_PREFILL = 1
131+
132+
# 构造错误输入(让 .numpy() 抛异常)
133+
class BadTensor:
134+
def numpy(self):
135+
raise RuntimeError("mock error")
136+
137+
bad_task = DummyTask(images=BadTensor())
138+
bad_tasks = [([bad_task], 1)]
139+
140+
try:
141+
EngineWorkerQueue.to_numpy(bad_tasks)
142+
except Exception as e:
143+
self.fail(f"Exception should be handled internally, but got: {e}")
144+
145+
146+
if __name__ == "__main__":
147+
unittest.main()

0 commit comments

Comments
 (0)