Skip to content

Commit 766fe85

Browse files
abrarsheikh and peterxcli
authored and committed
[Serve] Fix replicas hanging forever when requests are stuck draining in direct ingress mode (ray-project#60754)
- In direct ingress mode, replicas waiting for requests to drain were never force-killed, causing them to hang indefinitely if requests got stuck - Replicas are now force-killed after `max(graceful_shutdown_timeout_s, RAY_SERVE_DIRECT_INGRESS_MIN_DRAINING_PERIOD_S)` ## Before | Scenario | Behavior | |----------|----------| | Requests drain normally | Wait forever for 30s min drain period | | Requests stuck | Hang forever | ## After | Scenario | Behavior | |----------|----------| | Requests drain normally | Force-kill after `max(timeout, 30s)` | | Requests stuck | Force-kill after `max(timeout, 30s)` | ## Test Plan - [x] `python/ray/serve/tests/unit/test_deployment_state.py` - [x] `python/ray/serve/tests/test_direct_ingress.py` <!-- BUGBOT_STATUS --><sup><a href="https://cursor.com/dashboard?tab=bugbot">Cursor Bugbot</a> reviewed your changes and found no issues for commit <u>6460607</u></sup><!-- /BUGBOT_STATUS --> --------- Signed-off-by: abrar <abrar@anyscale.com> Signed-off-by: peterxcli <peterxcli@gmail.com>
1 parent d1561ce commit 766fe85

File tree

5 files changed

+132
-70
lines changed

5 files changed

+132
-70
lines changed

python/ray/serve/_private/constants.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -590,6 +590,8 @@
590590
os.environ.get("RAY_SERVE_DIRECT_INGRESS_PORT_RETRY_COUNT", "100")
591591
)
592592
# The minimum drain period for a HTTP proxy.
593+
# If RAY_SERVE_FORCE_STOP_UNHEALTHY_REPLICAS is set to 1,
594+
# then the minimum draining period is 0.
593595
RAY_SERVE_DIRECT_INGRESS_MIN_DRAINING_PERIOD_S = float(
594596
os.environ.get("RAY_SERVE_DIRECT_INGRESS_MIN_DRAINING_PERIOD_S", "30")
595597
)

python/ray/serve/_private/deployment_state.py

Lines changed: 28 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
from ray.serve._private.constants import (
3838
DEFAULT_LATENCY_BUCKET_MS,
3939
MAX_PER_REPLICA_RETRY_COUNT,
40+
RAY_SERVE_DIRECT_INGRESS_MIN_DRAINING_PERIOD_S,
41+
RAY_SERVE_ENABLE_DIRECT_INGRESS,
4042
RAY_SERVE_ENABLE_TASK_EVENTS,
4143
RAY_SERVE_FAIL_ON_RANK_ERROR,
4244
RAY_SERVE_FORCE_STOP_UNHEALTHY_REPLICAS,
@@ -251,11 +253,6 @@ class DeploymentStateUpdateResult:
251253

252254
ALL_REPLICA_STATES = list(ReplicaState)
253255
_SCALING_LOG_ENABLED = os.environ.get("SERVE_ENABLE_SCALING_LOG", "0") != "0"
254-
# Feature flag to disable forcibly shutting down replicas.
255-
RAY_SERVE_DISABLE_SHUTTING_DOWN_INGRESS_REPLICAS_FORCEFULLY = (
256-
os.environ.get("RAY_SERVE_DISABLE_SHUTTING_DOWN_INGRESS_REPLICAS_FORCEFULLY", "0")
257-
== "1"
258-
)
259256

260257

261258
def print_verbose_scaling_log():
@@ -758,7 +755,7 @@ def reconfigure(self, version: DeploymentVersion, rank: ReplicaRank) -> bool:
758755
self._rank = rank
759756
return updating
760757

761-
def recover(self) -> bool:
758+
def recover(self, ingress: bool = False) -> bool:
762759
"""Recover replica version from a live replica actor.
763760
764761
When controller dies, the deployment state loses the info on the version that's
@@ -767,12 +764,17 @@ def recover(self) -> bool:
767764
768765
Also confirm that actor is allocated and initialized before marking as running.
769766
770-
Returns: False if the replica actor is no longer alive; the
767+
Args:
768+
ingress: Whether this replica is an ingress replica.
769+
770+
Returns:
771+
False if the replica actor is no longer alive; the
771772
actor could have been killed in the time between when the
772773
controller fetching all Serve actors in the cluster and when
773774
the controller tries to recover it. Otherwise, return True.
774775
"""
775776
logger.info(f"Recovering {self.replica_id}.")
777+
self._ingress = ingress
776778
try:
777779
self._actor_handle = ray.get_actor(
778780
self._actor_name, namespace=SERVE_NAMESPACE
@@ -1189,20 +1191,8 @@ def get_routing_stats(self) -> Dict[str, Any]:
11891191

11901192
return self._routing_stats
11911193

1192-
def force_stop(self, log_shutdown_message: bool = False):
1194+
def force_stop(self):
11931195
"""Force the actor to exit without shutting down gracefully."""
1194-
if (
1195-
self._ingress
1196-
and RAY_SERVE_DISABLE_SHUTTING_DOWN_INGRESS_REPLICAS_FORCEFULLY
1197-
):
1198-
if log_shutdown_message:
1199-
logger.info(
1200-
f"{self.replica_id} did not shut down because it had not finished draining requests. "
1201-
"Going to wait until the draining is complete. You can force-stop the replica by "
1202-
"setting RAY_SERVE_DISABLE_SHUTTING_DOWN_INGRESS_REPLICAS_FORCEFULLY to 0."
1203-
)
1204-
return
1205-
12061196
try:
12071197
ray.kill(ray.get_actor(self._actor_name, namespace=SERVE_NAMESPACE))
12081198
except ValueError:
@@ -1235,7 +1225,6 @@ def __init__(
12351225
)
12361226
self._multiplexed_model_ids: List[str] = []
12371227
self._routing_stats: Dict[str, Any] = {}
1238-
self._logged_shutdown_message = False
12391228

12401229
def get_running_replica_info(
12411230
self, cluster_node_info_cache: ClusterNodeInfoCache
@@ -1366,7 +1355,6 @@ def start(
13661355
deployment_info, assign_rank_callback=assign_rank_callback
13671356
)
13681357
self._start_time = time.time()
1369-
self._logged_shutdown_message = False
13701358
self.update_actor_details(start_time_s=self._start_time)
13711359
return replica_scheduling_request
13721360

@@ -1383,16 +1371,20 @@ def reconfigure(
13831371
"""
13841372
return self._actor.reconfigure(version, rank=rank)
13851373

1386-
def recover(self) -> bool:
1374+
def recover(self, deployment_info: DeploymentInfo) -> bool:
13871375
"""
13881376
Recover states in DeploymentReplica instance by fetching running actor
13891377
status
13901378
1391-
Returns: False if the replica is no longer alive at the time
1392-
when this method is called.
1379+
Args:
1380+
deployment_info: The deployment info for this replica.
1381+
1382+
Returns:
1383+
True if the replica actor is alive and recovered successfully.
1384+
False if the replica actor is no longer alive.
13931385
"""
13941386
# If replica is no longer alive
1395-
if not self._actor.recover():
1387+
if not self._actor.recover(ingress=deployment_info.ingress):
13961388
return False
13971389

13981390
self._start_time = time.time()
@@ -1442,6 +1434,11 @@ def stop(self, graceful: bool = True) -> None:
14421434
timeout_s = self._actor.graceful_stop()
14431435
if not graceful:
14441436
timeout_s = 0
1437+
elif self._actor._ingress and RAY_SERVE_ENABLE_DIRECT_INGRESS:
1438+
# In direct ingress mode, ensure we wait at least
1439+
# RAY_SERVE_DIRECT_INGRESS_MIN_DRAINING_PERIOD_S to give external
1440+
# load balancers (e.g., ALB) time to deregister the replica.
1441+
timeout_s = max(timeout_s, RAY_SERVE_DIRECT_INGRESS_MIN_DRAINING_PERIOD_S)
14451442
self._shutdown_deadline = time.time() + timeout_s
14461443

14471444
def check_stopped(self) -> bool:
@@ -1451,19 +1448,11 @@ def check_stopped(self) -> bool:
14511448

14521449
timeout_passed = time.time() >= self._shutdown_deadline
14531450
if timeout_passed:
1454-
if (
1455-
not self._logged_shutdown_message
1456-
and not RAY_SERVE_DISABLE_SHUTTING_DOWN_INGRESS_REPLICAS_FORCEFULLY
1457-
):
1458-
logger.info(
1459-
f"{self.replica_id} did not shut down after grace "
1460-
"period, force-killing it. "
1461-
)
1462-
1463-
self._actor.force_stop(
1464-
log_shutdown_message=not self._logged_shutdown_message
1451+
logger.info(
1452+
f"{self.replica_id} did not shut down after grace "
1453+
"period, force-killing it."
14651454
)
1466-
self._logged_shutdown_message = True
1455+
self._actor.force_stop()
14671456
return False
14681457

14691458
def check_health(self) -> bool:
@@ -2342,7 +2331,7 @@ def recover_current_state_from_replica_actor_names(
23422331
)
23432332
# If replica is no longer alive, simply don't add it to the
23442333
# deployment state manager to track.
2345-
if not new_deployment_replica.recover():
2334+
if not new_deployment_replica.recover(self._target_state.info):
23462335
logger.warning(f"{replica_id} died before controller could recover it.")
23472336
continue
23482337

python/ray/serve/_private/replica.py

Lines changed: 13 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -2311,38 +2311,23 @@ async def call_asgi():
23112311
raise asyncio.CancelledError
23122312

23132313
async def perform_graceful_shutdown(self):
2314-
if not RAY_SERVE_ENABLE_DIRECT_INGRESS or not self._ingress:
2315-
# if direct ingress is not enabled or the replica is not an ingress replica,
2316-
# we can just call the super method to perform the graceful shutdown.
2317-
await super().perform_graceful_shutdown()
2318-
return
2319-
2320-
# set the shutting down flag to True to signal ALBs with failing health checks
2321-
# to stop sending traffic to this replica.
2322-
self._shutting_down = True
2323-
2324-
# If the replica was never initialized it never served traffic, so we
2325-
# can skip the wait period.
2326-
if self._user_callable_initialized:
2327-
# in order to gracefully shutdown the replica, we need to wait for the
2328-
# requests to drain and for PROXY_MIN_DRAINING_PERIOD_S to pass.
2329-
# this is necessary because we want to give ALB time to update its
2330-
# target group to remove the replica from it and to mark this replica
2331-
# as unhealthy.
2332-
# TODO(abrar): the code below assumes that once ALB marks a replica target
2333-
# as unhealthy, it will not send traffic to it. This is not true because
2334-
# ALB can send traffic to a replica if all targets are unhealthy.
2335-
# The correct way to handle is this we start the cooldown period since
2336-
# the last request finished and wait for the cooldown period to pass.
2314+
if (
2315+
RAY_SERVE_ENABLE_DIRECT_INGRESS
2316+
and self._ingress
2317+
and self._user_callable_initialized
2318+
):
2319+
# In direct ingress mode, we need to wait at least
2320+
# RAY_SERVE_DIRECT_INGRESS_MIN_DRAINING_PERIOD_S to give external load
2321+
# balancers (e.g., ALB) time to deregister the replica, in addition to
2322+
# waiting for requests to drain.
23372323
await asyncio.gather(
23382324
asyncio.sleep(RAY_SERVE_DIRECT_INGRESS_MIN_DRAINING_PERIOD_S),
2339-
self._drain_ongoing_requests(),
2340-
)
2341-
logger.info(
2342-
f"Replica {self._replica_id} successfully drained ongoing requests."
2325+
super().perform_graceful_shutdown(),
23432326
)
2327+
else:
2328+
await super().perform_graceful_shutdown()
23442329

2345-
await self.shutdown()
2330+
# Cancel direct ingress HTTP/gRPC server tasks if they exist.
23462331
if self._direct_ingress_http_server_task:
23472332
self._direct_ingress_http_server_task.cancel()
23482333
if self._direct_ingress_grpc_server_task:

python/ray/serve/tests/test_direct_ingress.py

Lines changed: 86 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2037,8 +2037,9 @@ def test_shutdown_replica_only_after_draining_requests(
20372037
"""Test that the replica is shutdown correctly when the deployment is shutdown."""
20382038
signal = SignalActor.remote()
20392039

2040-
# Increase graceful_shutdown_timeout_s to ensure replicas aren't force-killed
2041-
# before requests complete when RAY_SERVE_DISABLE_SHUTTING_DOWN_INGRESS_REPLICAS_FORCEFULLY=0
2040+
# In direct ingress mode, graceful_shutdown_timeout_s is automatically bumped to
2041+
# max(graceful_shutdown_timeout_s, RAY_SERVE_DIRECT_INGRESS_MIN_DRAINING_PERIOD_S)
2042+
# to give external load balancers time to deregister the replica.
20422043
@serve.deployment(name="replica-shutdown-deployment", graceful_shutdown_timeout_s=5)
20432044
class ReplicaShutdownTest:
20442045
async def __call__(self):
@@ -2439,5 +2440,88 @@ class App:
24392440
assert isinstance(deployment_config, DeploymentConfig)
24402441

24412442

2443+
def test_stuck_requests_are_force_killed(_skip_if_ff_not_enabled, serve_instance):
2444+
"""This test is really slow, because it waits for the ports to be released from TIME_WAIT state.
2445+
The ports are in TIME_WAIT state because the replicas are force-killed and the ports are not
2446+
released immediately."""
2447+
import socket
2448+
2449+
def _can_bind_to_port(port):
2450+
"""Check if we can bind to the port (not just if nothing is listening)."""
2451+
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
2452+
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
2453+
try:
2454+
sock.bind(("0.0.0.0", port))
2455+
sock.close()
2456+
return True
2457+
except OSError:
2458+
sock.close()
2459+
return False
2460+
2461+
signal = SignalActor.remote()
2462+
2463+
@serve.deployment(
2464+
name="stuck-requests-deployment",
2465+
graceful_shutdown_timeout_s=1,
2466+
)
2467+
class StuckRequestsTest:
2468+
async def __call__(self):
2469+
# This request will never complete - it waits forever
2470+
await signal.wait.remote()
2471+
return "ok"
2472+
2473+
serve.run(
2474+
StuckRequestsTest.bind(),
2475+
name="stuck-requests-deployment",
2476+
route_prefix="/stuck-requests-deployment",
2477+
)
2478+
2479+
# Collect all ports used by the application before deleting it
2480+
http_ports = get_http_ports(route_prefix="/stuck-requests-deployment")
2481+
grpc_ports = get_grpc_ports(route_prefix="/stuck-requests-deployment")
2482+
2483+
http_url = get_application_url("HTTP", app_name="stuck-requests-deployment")
2484+
2485+
with ThreadPoolExecutor() as executor:
2486+
# Send requests that will hang forever (signal is never sent)
2487+
futures = [executor.submit(httpx.get, http_url, timeout=60) for _ in range(2)]
2488+
2489+
# Wait for requests to be received by the replica
2490+
wait_for_condition(
2491+
lambda: ray.get(signal.cur_num_waiters.remote()) == 2, timeout=10
2492+
)
2493+
2494+
# Delete the deployment - requests are still stuck
2495+
serve.delete("stuck-requests-deployment", _blocking=False)
2496+
2497+
# Verify the application is eventually deleted (replica was force-killed).
2498+
# This should complete within graceful_shutdown_timeout_s (35s) + buffer.
2499+
wait_for_condition(
2500+
lambda: "stuck-requests-deployment" not in serve.status().applications,
2501+
timeout=10,
2502+
)
2503+
2504+
# The stuck requests should fail (connection closed or similar)
2505+
for future in futures:
2506+
try:
2507+
result = future.result(timeout=5)
2508+
# If we get a response, it should be an error (not 200)
2509+
assert result.status_code != 200
2510+
except Exception:
2511+
# Expected - request failed due to force-kill
2512+
pass
2513+
2514+
# Wait until all ports can be bound (not just until nothing is listening).
2515+
# This ensures the ports are fully released from TIME_WAIT state.
2516+
def all_ports_can_be_bound():
2517+
for port in http_ports + grpc_ports:
2518+
if not _can_bind_to_port(port):
2519+
return False
2520+
return True
2521+
2522+
# TIME_WAIT can last up to 60s on Linux, so use a generous timeout
2523+
wait_for_condition(all_ports_can_be_bound, timeout=120)
2524+
2525+
24422526
if __name__ == "__main__":
24432527
sys.exit(pytest.main(["-v", "-s", __file__]))

python/ray/serve/tests/unit/test_deployment_state.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ def __init__(
109109
self._docs_path = None
110110
self._rank = replica_rank_context.get(replica_id.unique_id, None)
111111
self._assign_rank_callback = None
112+
self._ingress = False
112113

113114
@property
114115
def is_cross_language(self) -> bool:
@@ -280,10 +281,11 @@ def reconfigure(
280281
replica_rank_context[self._replica_id.unique_id] = rank
281282
return updating
282283

283-
def recover(self):
284+
def recover(self, ingress: bool = False):
284285
if self.replica_id in dead_replicas_context:
285286
return False
286287

288+
self._ingress = ingress
287289
self.recovering = True
288290
self.started = False
289291
self._rank = replica_rank_context.get(self._replica_id.unique_id, None)

0 commit comments

Comments
 (0)