Skip to content

Commit e8c0931

Browse files
committed
[LLM] support multi node deploy
1 parent f4c99de commit e8c0931

File tree

5 files changed

+55
-10
lines changed

5 files changed

+55
-10
lines changed

fastdeploy/engine/config.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -582,7 +582,7 @@ def __init__(
582582
self.max_capture_batch_size = max_capture_batch_size
583583
self.guided_decoding_backend = guided_decoding_backend
584584
self.disable_any_whitespace = disable_any_whitespace
585-
585+
self.is_master = True
586586
self._str_to_list("innode_prefill_ports", int)
587587
self._str_to_list("pod_ips", str)
588588

@@ -641,6 +641,8 @@ def postprocess(self):
641641

642642
if self.pod_ips is None:
643643
self.pod_ips = ["0.0.0.0"]
644+
elif self.host_ip != self.pod_ips[0]:
645+
self.is_master = False
644646

645647
import paddle
646648
self.paddle_commit_id = paddle.version.commit
@@ -812,6 +814,9 @@ def reset_value(cls, value_name, key):
812814
"return_full_hidden_states")
813815
reset_value(self.cache_config, "cache_dtype", "infer_model_dtype")
814816

817+
def _check_master(self):
818+
return self.is_master
819+
815820
def _str_to_list(self, attr_name, default_type):
816821
if hasattr(self, attr_name):
817822
val = getattr(self, attr_name)

fastdeploy/entrypoints/llm.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,10 +85,16 @@ def __init__(
8585

8686
self.mutex = threading.Lock()
8787
self.req_output = dict()
88-
88+
self.master_node_ip = self.llm_engine.config.pod_ips[0]
8989
self._receive_output_thread = threading.Thread(
9090
target=self._receive_output, daemon=True)
9191
self._receive_output_thread.start()
92+
93+
def _check_master(self):
94+
"""
95+
Check if the current node is the master node.
96+
"""
97+
return self.llm_engine.config._check_master()
9298

9399
def _receive_output(self):
94100
"""
@@ -130,6 +136,10 @@ def generate(
130136
Union[str, list[str]]: The generated response.
131137
"""
132138

139+
if not self._check_master():
140+
err_msg = f"Only master node can accept completion request, please send request to master node: {self.master_node_ip}"
141+
raise ValueError(err_msg)
142+
133143
if sampling_params is None:
134144
sampling_params = self.default_sampling_params
135145

@@ -182,6 +192,11 @@ def chat(
182192
Returns:
183193
Union[str, list[str]]: The generated response.
184194
"""
195+
196+
if not self._check_master():
197+
err_msg = f"Only master node can accept completion request, please send request to master node: {self.master_node_ip}"
198+
raise ValueError(err_msg)
199+
185200
if sampling_params is None:
186201
sampling_params = self.default_sampling_params
187202

fastdeploy/entrypoints/openai/api_server.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,8 +120,8 @@ async def lifespan(app: FastAPI):
120120
args.mm_processor_kwargs, args.enable_mm,
121121
args.reasoning_parser)
122122
app.state.dynamic_load_weight = args.dynamic_load_weight
123-
chat_handler = OpenAIServingChat(engine_client, pid)
124-
completion_handler = OpenAIServingCompletion(engine_client, pid)
123+
chat_handler = OpenAIServingChat(engine_client, pid, args.pod_ips)
124+
completion_handler = OpenAIServingCompletion(engine_client, pid, args.pod_ips)
125125
engine_client.create_zmq_client(model=pid, mode=zmq.PUSH)
126126
engine_client.pid = pid
127127
app.state.engine_client = engine_client

fastdeploy/entrypoints/openai/serving_chat.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,7 @@
3838
ErrorResponse,
3939
)
4040
from fastdeploy.metrics.work_metrics import work_process_metrics
41-
42-
from fastdeploy.utils import api_server_logger
43-
41+
from fastdeploy.utils import api_server_logger, get_host_ip
4442
from fastdeploy.engine.request import RequestOutput
4543

4644

@@ -50,9 +48,18 @@ class OpenAIServingChat:
5048
OpenAI-style chat completions serving
5149
"""
5250

53-
def __init__(self, engine_client, pid):
51+
def __init__(self, engine_client, pid, pod_ips):
5452
self.engine_client = engine_client
5553
self.pid = pid
54+
self.pod_ips = pod_ips
55+
self.host_ip = get_host_ip()
56+
57+
def _check_master(self):
58+
if self.pod_ips is None:
59+
return True
60+
if self.host_ip == self.pod_ips[0]:
61+
return True
62+
return False
5663

5764
async def create_chat_completion(
5865
self,
@@ -61,6 +68,11 @@ async def create_chat_completion(
6168
"""
6269
Create a new chat completion using the specified parameters.
6370
"""
71+
72+
if not self._check_master():
73+
err_msg = f"Only master node can accept completion request, please send request to master node: {self.pod_ips[0]}"
74+
api_server_logger.error(err_msg)
75+
return ErrorResponse(message=err_msg, code=400)
6476
if request.user is not None:
6577
request_id = f"chatcmpl-{request.user}-{uuid.uuid4()}"
6678
else:

fastdeploy/entrypoints/openai/serving_completion.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,19 +39,32 @@
3939
ToolCall,
4040
FunctionCall
4141
)
42-
from fastdeploy.utils import api_server_logger
42+
from fastdeploy.utils import api_server_logger, get_host_ip
4343
from fastdeploy.engine.request import RequestOutput
4444

4545

4646
class OpenAIServingCompletion:
47-
def __init__(self, engine_client, pid):
47+
def __init__(self, engine_client, pid, pod_ips):
4848
self.engine_client = engine_client
4949
self.pid = pid
50+
self.pod_ips = pod_ips
51+
self.host_ip = get_host_ip()
52+
53+
def _check_master(self):
54+
if self.pod_ips is None:
55+
return True
56+
if self.host_ip == self.pod_ips[0]:
57+
return True
58+
return False
5059

5160
async def create_completion(self, request: CompletionRequest):
5261
"""
5362
Create a completion for the given prompt.
5463
"""
64+
if not self._check_master():
65+
err_msg = f"Only master node can accept completion request, please send request to master node: {self.pod_ips[0]}"
66+
api_server_logger.error(err_msg)
67+
return ErrorResponse(message=err_msg, code=400)
5568
created_time = int(time.time())
5669
if request.user is not None:
5770
request_id = f"cmpl-{request.user}-{uuid.uuid4()}"

0 commit comments

Comments
 (0)