[Serve] Fix replicas hanging forever when requests are stuck draining… #60788

aslonnie merged 1 commit into releases/2.54.0
Conversation
… in direct ingress mode (#60754)

- In direct ingress mode, replicas waiting for requests to drain were never force-killed, causing them to hang indefinitely if requests got stuck
- Replicas are now force-killed after `max(graceful_shutdown_timeout_s, RAY_SERVE_DIRECT_INGRESS_MIN_DRAINING_PERIOD_S)`

## Before

| Scenario | Behavior |
|----------|----------|
| Requests drain normally | Wait for 30s min drain period |
| Requests stuck | Hang forever |

## After

| Scenario | Behavior |
|----------|----------|
| Requests drain normally | Force-kill after `max(timeout, 30s)` |
| Requests stuck | Force-kill after `max(timeout, 30s)` |

## Test Plan

- [x] `python/ray/serve/tests/unit/test_deployment_state.py`
- [x] `python/ray/serve/tests/test_direct_ingress.py`

<sup><a href="https://cursor.com/dashboard?tab=bugbot">Cursor Bugbot</a> reviewed your changes and found no issues for commit <u>6460607</u></sup>

Signed-off-by: abrar <abrar@anyscale.com>
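The new force-kill deadline described above can be sketched as follows. The constant value (30s) and the parameter name `graceful_shutdown_timeout_s` come from the PR text; the helper function itself is illustrative, not the actual Ray Serve implementation.

```python
# Illustrative sketch of the force-kill deadline from the PR description:
# replicas are force-killed after
# max(graceful_shutdown_timeout_s, RAY_SERVE_DIRECT_INGRESS_MIN_DRAINING_PERIOD_S).
RAY_SERVE_DIRECT_INGRESS_MIN_DRAINING_PERIOD_S = 30.0  # 30s default per the PR tables


def force_kill_deadline_s(graceful_shutdown_timeout_s: float) -> float:
    """Seconds after which a draining replica is force-killed."""
    return max(
        graceful_shutdown_timeout_s,
        RAY_SERVE_DIRECT_INGRESS_MIN_DRAINING_PERIOD_S,
    )


print(force_kill_deadline_s(10.0))  # min draining period dominates -> 30.0
print(force_kill_deadline_s(60.0))  # graceful timeout dominates -> 60.0
```

Either way, the deadline is now finite, which is what converts both "Before" rows above into the bounded "After" behavior.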
Code Review
This pull request addresses an issue where replicas could hang indefinitely during draining. The fix removes the `RAY_SERVE_DISABLE_SHUTTING_DOWN_INGRESS_REPLICAS_FORCEFULLY` flag and ensures that replicas are unconditionally force-killed after their graceful shutdown timeout expires, which makes the shutdown process more robust. A test case has also been added to validate that stuck requests are handled correctly and replicas are force-killed as expected. The changes look solid and directly address the problem. I have one suggestion for a minor refactoring to improve code clarity.
```diff
 if (
     RAY_SERVE_ENABLE_DIRECT_INGRESS
     and self._ingress
     and self._user_callable_initialized
 ):
     # In direct ingress mode, we need to wait at least
     # RAY_SERVE_DIRECT_INGRESS_MIN_DRAINING_PERIOD_S to give external load
     # balancers (e.g., ALB) time to deregister the replica, in addition to
     # waiting for requests to drain.
     await asyncio.gather(
         asyncio.sleep(RAY_SERVE_DIRECT_INGRESS_MIN_DRAINING_PERIOD_S),
-        self._drain_ongoing_requests(),
-    )
-    logger.info(
-        f"Replica {self._replica_id} successfully drained ongoing requests."
+        super().perform_graceful_shutdown(),
     )
 else:
     await super().perform_graceful_shutdown()
```
The call to `super().perform_graceful_shutdown()` is duplicated in the `if` and `else` blocks. This can be refactored to reduce code duplication and improve maintainability.
```diff
-if (
-    RAY_SERVE_ENABLE_DIRECT_INGRESS
-    and self._ingress
-    and self._user_callable_initialized
-):
-    # In direct ingress mode, we need to wait at least
-    # RAY_SERVE_DIRECT_INGRESS_MIN_DRAINING_PERIOD_S to give external load
-    # balancers (e.g., ALB) time to deregister the replica, in addition to
-    # waiting for requests to drain.
-    await asyncio.gather(
-        asyncio.sleep(RAY_SERVE_DIRECT_INGRESS_MIN_DRAINING_PERIOD_S),
-        super().perform_graceful_shutdown(),
-    )
-else:
-    await super().perform_graceful_shutdown()
+tasks = [super().perform_graceful_shutdown()]
+if (
+    RAY_SERVE_ENABLE_DIRECT_INGRESS
+    and self._ingress
+    and self._user_callable_initialized
+):
+    # In direct ingress mode, we need to wait at least
+    # RAY_SERVE_DIRECT_INGRESS_MIN_DRAINING_PERIOD_S to give external load
+    # balancers (e.g., ALB) time to deregister the replica, in addition to
+    # waiting for requests to drain.
+    tasks.append(
+        asyncio.sleep(RAY_SERVE_DIRECT_INGRESS_MIN_DRAINING_PERIOD_S)
+    )
+await asyncio.gather(*tasks)
```
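The "collect coroutines in a list, then `asyncio.gather(*tasks)`" shape of the suggested refactor can be demonstrated with a minimal, self-contained sketch. The names here (`base_shutdown`, `graceful_shutdown`, `MIN_DRAINING_PERIOD_S`) are illustrative stand-ins, not the actual Ray Serve code:

```python
import asyncio

# Stand-in for RAY_SERVE_DIRECT_INGRESS_MIN_DRAINING_PERIOD_S (shortened
# so the example runs quickly).
MIN_DRAINING_PERIOD_S = 0.05


async def base_shutdown(events: list) -> None:
    # Stand-in for super().perform_graceful_shutdown().
    events.append("shutdown")


async def graceful_shutdown(direct_ingress: bool, events: list) -> None:
    # Always perform the base shutdown; in direct ingress mode, also hold
    # the replica up for the minimum draining period. Both run concurrently.
    tasks = [base_shutdown(events)]
    if direct_ingress:
        tasks.append(asyncio.sleep(MIN_DRAINING_PERIOD_S))
    await asyncio.gather(*tasks)
    events.append("done")


events: list = []
asyncio.run(graceful_shutdown(True, events))
print(events)  # ['shutdown', 'done']
```

Because `gather` awaits every task, the single call site replaces the duplicated `await super().perform_graceful_shutdown()` in both branches while preserving the concurrent sleep in direct ingress mode.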
```python
if timeout_passed:
    if (
        not self._logged_shutdown_message
        and not RAY_SERVE_DISABLE_SHUTTING_DOWN_INGRESS_REPLICAS_FORCEFULLY
```
Repeated log messages when replica shutdown times out
Low Severity
Removing the `_logged_shutdown_message` tracking causes the "did not shut down after grace period, force-killing it" log message to be printed repeatedly: when `check_stopped()` runs in the control loop after the timeout has passed, it logs and calls `force_stop()` on every iteration until the actor fully terminates. The old code used `_logged_shutdown_message` to ensure the message was logged only once. This can cause significant log spam while a stuck replica shuts down.
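The "log once, retry the kill every iteration" pattern the old code used can be sketched as follows. The names (`_logged_shutdown_message`, `check_stopped`, `force_stop`) mirror the review comment, but this class is illustrative, not Ray Serve's actual implementation:

```python
import logging

logger = logging.getLogger("replica")


class ReplicaStopper:
    """Illustrative sketch of a one-shot shutdown log guard."""

    def __init__(self) -> None:
        self._logged_shutdown_message = False
        self.force_stop_calls = 0

    def force_stop(self) -> None:
        # Force-killing is idempotent, so retrying each iteration is safe.
        self.force_stop_calls += 1

    def check_stopped(self, timeout_passed: bool) -> None:
        if timeout_passed:
            if not self._logged_shutdown_message:
                logger.warning(
                    "Replica did not shut down after grace period, "
                    "force-killing it."
                )
                self._logged_shutdown_message = True
            self.force_stop()


stopper = ReplicaStopper()
for _ in range(5):  # the control loop keeps polling until the actor dies
    stopper.check_stopped(timeout_passed=True)
# force_stop is retried on every iteration, but the warning fires only once.
```

The key point of the bug report is that only the log call needs the guard; the repeated `force_stop()` calls themselves are harmless.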

