Skip to content

Commit 221a193

Browse files
authored
[Serve] Preserve user-set gRPC status codes when exceptions are raised (#60482)
Fixes #58851 ### Changes 1. **New `gRPCStatusError` exception class** - Wraps exceptions with user-set gRPC status codes so they flow through Ray's error handling path. 2. **Exception wrapping in replica methods** - `handle_request`, `handle_request_streaming`, and `handle_request_with_rejection` now wrap exceptions with `gRPCStatusError` when the user has set a status code on the gRPC context. 3. **Status code preservation in proxy** - `get_grpc_response_status()` now detects `gRPCStatusError` and returns the user's intended status code instead of `INTERNAL`. 4. **Message truncation** - Added `_truncate_message()` to limit error details to 4KB, avoiding HTTP/2 trailer size limits. 5. **Documentation updates** - Updated the gRPC guide to document the new behavior. --------- Signed-off-by: abrar <abrar@anyscale.com>
1 parent 829c85c commit 221a193

File tree

8 files changed

+325
-48
lines changed

8 files changed

+325
-48
lines changed

doc/source/serve/advanced-guides/grpc-guide.md

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,10 @@ about the request as well as setting response metadata such as code and details.
304304
If the handler function is defined with a `grpc_context` argument, Serve will pass a
305305
[RayServegRPCContext](../api/doc/ray.serve.grpc_util.RayServegRPCContext.rst) object
306306
in for each request. Below is an example of how to set a custom status code,
307-
details, and trailing metadata.
307+
details, and trailing metadata. You can also set a status code before raising an
308+
exception, and Serve will preserve that status code in the error response. This is
309+
useful for returning meaningful status codes like `RESOURCE_EXHAUSTED` (retryable)
310+
or `INVALID_ARGUMENT` (not retryable) instead of the generic `INTERNAL` error.
308311

309312
```{literalinclude} ../doc_code/grpc_proxy/grpc_guide.py
310313
:start-after: __begin_grpc_context_define_app__
@@ -320,7 +323,10 @@ The client code is defined like the following to get those attributes.
320323
```
321324

322325
:::{note}
323-
If the handler raises an unhandled exception, Serve will return an `INTERNAL` error code
324-
with the stacktrace in the details, regardless of what code and details
325-
are set in the `RayServegRPCContext` object.
326+
If the handler raises an unhandled exception without setting a status code on the
327+
`RayServegRPCContext` object, Serve returns an `INTERNAL` error code with the
328+
exception message in the details. However, if you set a status code on the context
329+
before raising the exception, Serve preserves that status code in the response.
330+
This allows you to return meaningful status codes like `INVALID_ARGUMENT` or
331+
`RESOURCE_EXHAUSTED` even when raising exceptions.
326332
:::

doc/source/serve/api/index.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ See the [model composition guide](serve-model-composition) for how to update cod
143143
serve.exceptions.BackPressureError
144144
serve.exceptions.RayServeException
145145
serve.exceptions.RequestCancelledError
146+
serve.exceptions.gRPCStatusError
146147
serve.exceptions.DeploymentUnavailableError
147148
```
148149

doc/source/serve/doc_code/grpc_proxy/grpc_guide.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,13 @@ def __call__(
349349
grpc_context.set_details(message)
350350
grpc_context.set_trailing_metadata([("num", str(num))])
351351

352+
# You can also set a status code before raising an exception.
353+
# The status code will be preserved in the response.
354+
if user_message.name == "error":
355+
grpc_context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
356+
grpc_context.set_details("Resource exhausted, please retry later.")
357+
raise RuntimeError("Simulated error")
358+
352359
user_response = UserDefinedResponse(
353360
greeting=greeting,
354361
num=num,

python/ray/serve/_private/grpc_util.py

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,17 @@
1515
)
1616
from ray.serve._private.proxy_request_response import ResponseStatus
1717
from ray.serve.config import gRPCOptions
18-
from ray.serve.exceptions import BackPressureError, DeploymentUnavailableError
18+
from ray.serve.exceptions import (
19+
BackPressureError,
20+
DeploymentUnavailableError,
21+
gRPCStatusError,
22+
)
1923
from ray.serve.generated.serve_pb2_grpc import add_RayServeAPIServiceServicer_to_server
2024

25+
# Maximum length for gRPC status details to avoid hitting HTTP/2 trailer limits.
26+
# gRPC default max metadata size is 8KB, so we use a conservative limit.
27+
GRPC_MAX_STATUS_DETAILS_LENGTH = 4096
28+
2129
logger = logging.getLogger(SERVE_LOGGER_NAME)
2230

2331

@@ -110,6 +118,20 @@ async def start_grpc_server(
110118
return event_loop.create_task(server.wait_for_termination())
111119

112120

121+
def _truncate_message(
122+
message: str, max_length: int = GRPC_MAX_STATUS_DETAILS_LENGTH
123+
) -> str:
124+
"""Truncate a message to avoid exceeding HTTP/2 trailer limits.
125+
126+
gRPC status details are sent as part of HTTP/2 trailers, which have a fixed size limit.
127+
If the message (e.g., a stack trace) is too long, it can cause issues on the client side.
128+
"""
129+
if len(message) <= max_length:
130+
return message
131+
truncation_notice = "... [truncated]"
132+
return message[: max_length - len(truncation_notice)] + truncation_notice
133+
134+
113135
def get_grpc_response_status(
114136
exc: BaseException, request_timeout_s: float, request_id: str
115137
) -> ResponseStatus:
@@ -141,6 +163,25 @@ def get_grpc_response_status(
141163
is_error=True,
142164
message=exc.message,
143165
)
166+
elif isinstance(exc, gRPCStatusError):
167+
# User set a gRPC status code before raising the exception.
168+
# Respect the user's status code instead of returning INTERNAL.
169+
original_exc = exc.original_exception
170+
if isinstance(original_exc, (RayActorError, RayTaskError)):
171+
logger.warning(
172+
f"Request failed: {original_exc}", extra={"log_to_stderr": False}
173+
)
174+
else:
175+
logger.exception(
176+
f"Request failed with user-set gRPC status code {exc.grpc_code}."
177+
)
178+
# Use user-set details if provided, otherwise use the original exception message.
179+
message = exc.grpc_details if exc.grpc_details else str(original_exc)
180+
return ResponseStatus(
181+
code=exc.grpc_code,
182+
is_error=True,
183+
message=_truncate_message(message),
184+
)
144185
else:
145186
if isinstance(exc, (RayActorError, RayTaskError)):
146187
logger.warning(f"Request failed: {exc}", extra={"log_to_stderr": False})
@@ -149,7 +190,7 @@ def get_grpc_response_status(
149190
return ResponseStatus(
150191
code=grpc.StatusCode.INTERNAL,
151192
is_error=True,
152-
message=str(exc),
193+
message=_truncate_message(str(exc)),
153194
)
154195

155196

python/ray/serve/_private/replica.py

Lines changed: 71 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@
120120
BackPressureError,
121121
DeploymentUnavailableError,
122122
RayServeException,
123+
gRPCStatusError,
123124
)
124125
from ray.serve.generated.serve_pb2 import (
125126
ASGIRequest,
@@ -972,9 +973,13 @@ async def handle_request(
972973
)
973974
with self._wrap_request(request_metadata, ray_trace_ctx):
974975
async with self._start_request(request_metadata):
975-
return await self._user_callable_wrapper.call_user_method(
976-
request_metadata, request_args, request_kwargs
977-
)
976+
try:
977+
return await self._user_callable_wrapper.call_user_method(
978+
request_metadata, request_args, request_kwargs
979+
)
980+
except Exception as e:
981+
# For gRPC requests, wrap exception with user-set status code
982+
raise self._maybe_wrap_grpc_exception(e, request_metadata) from e
978983

979984
async def handle_request_streaming(
980985
self, request_metadata: RequestMetadata, *request_args, **request_kwargs
@@ -987,22 +992,45 @@ async def handle_request_streaming(
987992
request_metadata, ray_trace_ctx
988993
) as status_code_callback:
989994
async with self._start_request(request_metadata):
990-
if request_metadata.is_http_request:
991-
scope, receive = request_args
992-
async for msgs in self._user_callable_wrapper.call_http_entrypoint(
993-
request_metadata,
994-
status_code_callback,
995-
scope,
996-
receive,
997-
):
998-
yield pickle.dumps(msgs)
999-
else:
1000-
async for result in self._user_callable_wrapper.call_user_generator(
1001-
request_metadata,
1002-
request_args,
1003-
request_kwargs,
1004-
):
1005-
yield result
995+
try:
996+
if request_metadata.is_http_request:
997+
scope, receive = request_args
998+
async for msgs in self._user_callable_wrapper.call_http_entrypoint(
999+
request_metadata,
1000+
status_code_callback,
1001+
scope,
1002+
receive,
1003+
):
1004+
yield pickle.dumps(msgs)
1005+
else:
1006+
async for result in self._user_callable_wrapper.call_user_generator(
1007+
request_metadata,
1008+
request_args,
1009+
request_kwargs,
1010+
):
1011+
yield result
1012+
except Exception as e:
1013+
# For gRPC requests, wrap exception with user-set status code
1014+
raise self._maybe_wrap_grpc_exception(e, request_metadata) from e
1015+
1016+
def _maybe_wrap_grpc_exception(
1017+
self, e: BaseException, request_metadata: RequestMetadata
1018+
) -> BaseException:
1019+
"""Wrap exception with gRPCStatusError if user set a status code.
1020+
1021+
For gRPC requests, if the user set a status code on the grpc_context before
1022+
raising an exception, we wrap the exception with gRPCStatusError to preserve
1023+
the user's intended status code through the error handling path.
1024+
"""
1025+
if request_metadata.is_grpc_request:
1026+
grpc_context = request_metadata.grpc_context
1027+
if grpc_context and grpc_context.code():
1028+
return gRPCStatusError(
1029+
original_exception=e,
1030+
code=grpc_context.code(),
1031+
details=grpc_context.details(),
1032+
)
1033+
return e
10061034

10071035
async def handle_request_with_rejection(
10081036
self, request_metadata: RequestMetadata, *request_args, **request_kwargs
@@ -1032,26 +1060,30 @@ async def handle_request_with_rejection(
10321060
num_ongoing_requests=self.get_num_ongoing_requests(),
10331061
)
10341062

1035-
if request_metadata.is_http_request:
1036-
scope, receive = request_args
1037-
async for msgs in self._user_callable_wrapper.call_http_entrypoint(
1038-
request_metadata,
1039-
status_code_callback,
1040-
scope,
1041-
receive,
1042-
):
1043-
yield pickle.dumps(msgs)
1044-
elif request_metadata.is_streaming:
1045-
async for result in self._user_callable_wrapper.call_user_generator(
1046-
request_metadata,
1047-
request_args,
1048-
request_kwargs,
1049-
):
1050-
yield result
1051-
else:
1052-
yield await self._user_callable_wrapper.call_user_method(
1053-
request_metadata, request_args, request_kwargs
1054-
)
1063+
try:
1064+
if request_metadata.is_http_request:
1065+
scope, receive = request_args
1066+
async for msgs in self._user_callable_wrapper.call_http_entrypoint(
1067+
request_metadata,
1068+
status_code_callback,
1069+
scope,
1070+
receive,
1071+
):
1072+
yield pickle.dumps(msgs)
1073+
elif request_metadata.is_streaming:
1074+
async for result in self._user_callable_wrapper.call_user_generator(
1075+
request_metadata,
1076+
request_args,
1077+
request_kwargs,
1078+
):
1079+
yield result
1080+
else:
1081+
yield await self._user_callable_wrapper.call_user_method(
1082+
request_metadata, request_args, request_kwargs
1083+
)
1084+
except Exception as e:
1085+
# For gRPC requests, wrap exception with user-set status code
1086+
raise self._maybe_wrap_grpc_exception(e, request_metadata) from e
10551087

10561088
@abstractmethod
10571089
async def _on_initialized(self):

python/ray/serve/exceptions.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
from typing import Optional
22

3+
import grpc
4+
35
from ray.exceptions import TaskCancelledError
46
from ray.serve._private.common import DeploymentID
57
from ray.util.annotations import PublicAPI
@@ -10,6 +12,48 @@ class RayServeException(Exception):
1012
pass
1113

1214

15+
@PublicAPI(stability="stable")
16+
class gRPCStatusError(RayServeException):
17+
"""Internal exception that wraps an exception with user-set gRPC status code.
18+
19+
This is used to preserve user-set gRPC status codes when exceptions are raised
20+
in deployments. When a user sets a status code on the gRPC context before raising
21+
an exception, this wrapper carries that status code through the error handling
22+
path so the proxy can return the user's intended status code instead of INTERNAL.
23+
"""
24+
25+
def __init__(
26+
self,
27+
original_exception: BaseException,
28+
code: Optional[grpc.StatusCode] = None,
29+
details: Optional[str] = None,
30+
):
31+
# Store attributes with underscore prefix to avoid conflicts with
32+
# Ray's exception handling (Ray uses 'cause' internally).
33+
self._original_exception = original_exception
34+
self._grpc_code = code
35+
self._grpc_details = details
36+
super().__init__(str(original_exception))
37+
38+
@property
39+
def original_exception(self) -> BaseException:
40+
"""The original exception that was raised."""
41+
return self._original_exception
42+
43+
@property
44+
def grpc_code(self) -> Optional[grpc.StatusCode]:
45+
"""The user-set gRPC status code, if any."""
46+
return self._grpc_code
47+
48+
@property
49+
def grpc_details(self) -> Optional[str]:
50+
"""The user-set gRPC status details, if any."""
51+
return self._grpc_details
52+
53+
def __str__(self) -> str:
54+
return str(self._original_exception)
55+
56+
1357
@PublicAPI(stability="alpha")
1458
class BackPressureError(RayServeException):
1559
"""Raised when max_queued_requests is exceeded on a DeploymentHandle."""

python/ray/serve/tests/test_grpc.py

Lines changed: 64 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -465,8 +465,8 @@ def Streaming(
465465
def test_using_grpc_context_exception(ray_instance, ray_shutdown, streaming: bool):
466466
"""Test setting code on gRPC context then raised exception.
467467
468-
When the deployment in the gRPC context and then raised exception, the response
469-
code should still be internal error instead of user defined error.
468+
When the deployment sets a status code on the gRPC context and then raises an
469+
exception, the user-defined status code should be preserved in the response.
470470
"""
471471
grpc_port = 9000
472472
grpc_servicer_functions = [
@@ -516,6 +516,68 @@ def Streaming(
516516
_ = stub.__call__(request=request)
517517
rpc_error = exception_info.value
518518

519+
# User-defined status code should be preserved instead of INTERNAL
520+
assert rpc_error.code() == user_defined_error_code
521+
assert real_error_message in rpc_error.details()
522+
523+
524+
@pytest.mark.parametrize("streaming", [False, True])
525+
def test_exception_without_grpc_context_code(
526+
ray_instance, ray_shutdown, streaming: bool
527+
):
528+
"""Test raising exception without setting gRPC status code.
529+
530+
When the deployment raises an exception without setting a status code on the
531+
gRPC context, the response should be INTERNAL error.
532+
"""
533+
grpc_port = 9000
534+
grpc_servicer_functions = [
535+
"ray.serve.generated.serve_pb2_grpc.add_UserDefinedServiceServicer_to_server",
536+
]
537+
538+
serve.start(
539+
grpc_options=gRPCOptions(
540+
port=grpc_port,
541+
grpc_servicer_functions=grpc_servicer_functions,
542+
),
543+
)
544+
real_error_message = "test error without status code"
545+
546+
@serve.deployment()
547+
class HelloModel:
548+
def __call__(
549+
self,
550+
user_message: serve_pb2.UserDefinedMessage,
551+
grpc_context: RayServegRPCContext,
552+
):
553+
# Don't set any status code, just raise exception
554+
raise RuntimeError(real_error_message)
555+
556+
def Streaming(
557+
self,
558+
user_message: serve_pb2.UserDefinedMessage,
559+
grpc_context: RayServegRPCContext,
560+
):
561+
# Don't set any status code, just raise exception
562+
raise RuntimeError(real_error_message)
563+
564+
model = HelloModel.bind()
565+
app_name = "app1"
566+
serve.run(model, name=app_name)
567+
568+
url = get_application_url("gRPC", app_name=app_name, use_localhost=True)
569+
channel = grpc.insecure_channel(url)
570+
stub = serve_pb2_grpc.UserDefinedServiceStub(channel)
571+
request = serve_pb2.UserDefinedMessage(name="foo", num=30, foo="bar")
572+
573+
with pytest.raises(grpc.RpcError) as exception_info:
574+
if streaming:
575+
list(stub.Streaming(request=request))
576+
else:
577+
_ = stub.__call__(request=request)
578+
rpc_error = exception_info.value
579+
580+
# Without user-defined status code, should be INTERNAL
519581
assert rpc_error.code() == grpc.StatusCode.INTERNAL
520582
assert real_error_message in rpc_error.details()
521583

0 commit comments

Comments
 (0)