Skip to content

[PD] Add control to slow down a server #5572

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 17 commits into from
May 8, 2025
14 changes: 14 additions & 0 deletions python/sglang/srt/entrypoints/http_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
ResumeMemoryOccupationReqInput,
SeparateReasoningReqInput,
SetInternalStateReq,
SlowDownReqInput,
UpdateWeightFromDiskReqInput,
UpdateWeightsFromDistributedReqInput,
UpdateWeightsFromTensorReqInput,
Expand Down Expand Up @@ -494,6 +495,19 @@ async def resume_memory_occupation(
return _create_error_response(e)


@app.api_route("/slow_down", methods=["GET", "POST"])
async def slow_down(obj: SlowDownReqInput, request: Request):
"""Slow down the system deliberately. Only for testing. Example scenario:
when we want to test performance of D in large-scale PD disaggregation and have no enough nodes for P,
we can use this to slow down D to let it have enough running sequences, and then disable slowdown
to let it run in full batch size.
"""
try:
await _global_state.tokenizer_manager.slow_down(obj, request)
except Exception as e:
return _create_error_response(e)


@app.api_route("/open_session", methods=["GET", "POST"])
async def open_session(obj: OpenSessionReqInput, request: Request):
"""Open a session, and return its unique session id."""
Expand Down
10 changes: 10 additions & 0 deletions python/sglang/srt/managers/io_struct.py
Original file line number Diff line number Diff line change
Expand Up @@ -790,6 +790,16 @@ class ResumeMemoryOccupationReqOutput:
pass


@dataclass
class SlowDownReqInput:
forward_sleep_time: Optional[float]


@dataclass
class SlowDownReqOutput:
pass


@dataclass
class AbortReq:
# The request id
Expand Down
16 changes: 16 additions & 0 deletions python/sglang/srt/managers/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@
RpcReqOutput,
SetInternalStateReq,
SetInternalStateReqOutput,
SlowDownReqInput,
SlowDownReqOutput,
TokenizedEmbeddingReqInput,
TokenizedGenerateReqInput,
UpdateWeightFromDiskReqInput,
Expand Down Expand Up @@ -417,6 +419,8 @@ def __init__(
self.profiler_id: Optional[str] = None
self.profiler_target_forward_ct: Optional[int] = None

self.forward_sleep_time = None

# Init metrics stats
self.init_metrics()

Expand All @@ -439,6 +443,7 @@ def __init__(
(GetWeightsByNameReqInput, self.get_weights_by_name),
(ReleaseMemoryOccupationReqInput, self.release_memory_occupation),
(ResumeMemoryOccupationReqInput, self.resume_memory_occupation),
(SlowDownReqInput, self.slow_down),
(ProfileReq, self.profile),
(GetInternalStateReq, self.get_internal_state),
(SetInternalStateReq, self.set_internal_state),
Expand Down Expand Up @@ -1536,6 +1541,10 @@ def run_batch(
):
self.stop_profile()

if self.forward_sleep_time is not None:
logger.info(f"Scheduler.run_batch sleep {self.forward_sleep_time}s")
time.sleep(self.forward_sleep_time)

# Run forward
if self.is_generation:
if self.spec_algorithm.is_none():
Expand Down Expand Up @@ -2011,6 +2020,13 @@ def resume_memory_occupation(self, recv_req: ResumeMemoryOccupationReqInput):
del self.stashed_model_static_state
return ResumeMemoryOccupationReqOutput()

def slow_down(self, recv_req: SlowDownReqInput):
t = recv_req.forward_sleep_time
if t is not None and t <= 0:
t = None
self.forward_sleep_time = t
return SlowDownReqOutput()

def profile(self, recv_req: ProfileReq):
if recv_req.type == ProfileReqType.START_PROFILE:
return self.start_profile(
Expand Down
17 changes: 17 additions & 0 deletions python/sglang/srt/managers/tokenizer_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@
ResumeMemoryOccupationReqInput,
ResumeMemoryOccupationReqOutput,
SessionParams,
SlowDownReqInput,
SlowDownReqOutput,
TokenizedEmbeddingReqInput,
TokenizedGenerateReqInput,
UpdateWeightFromDiskReqInput,
Expand Down Expand Up @@ -269,6 +271,9 @@ def __init__(
self.resume_memory_occupation_communicator = _Communicator(
self.send_to_scheduler, server_args.dp_size
)
self.slow_down_communicator = _Communicator(
self.send_to_scheduler, server_args.dp_size
)
self.flush_cache_communicator = _Communicator(
self.send_to_scheduler, server_args.dp_size
)
Expand Down Expand Up @@ -322,6 +327,10 @@ def __init__(
ResumeMemoryOccupationReqOutput,
self.resume_memory_occupation_communicator.handle_recv,
),
(
SlowDownReqOutput,
self.slow_down_communicator.handle_recv,
),
(
FlushCacheReqOutput,
self.flush_cache_communicator.handle_recv,
Expand Down Expand Up @@ -880,6 +889,14 @@ async def resume_memory_occupation(
self.auto_create_handle_loop()
await self.resume_memory_occupation_communicator(obj)

async def slow_down(
self,
obj: SlowDownReqInput,
request: Optional[fastapi.Request] = None,
):
self.auto_create_handle_loop()
await self.slow_down_communicator(obj)

async def open_session(
self, obj: OpenSessionReqInput, request: Optional[fastapi.Request] = None
):
Expand Down
Loading