Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions doc/source/serve/advanced-guides/grpc-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,10 @@ about the request as well as setting response metadata such as code and details.
If the handler function is defined with a `grpc_context` argument, Serve will pass a
[RayServegRPCContext](../api/doc/ray.serve.grpc_util.RayServegRPCContext.rst) object
in for each request. Below is an example of how to set a custom status code,
details, and trailing metadata.
details, and trailing metadata. You can also set a status code before raising an
exception, and Serve will preserve that status code in the error response. This is
useful for returning meaningful status codes like `RESOURCE_EXHAUSTED` (retryable)
or `INVALID_ARGUMENT` (not retryable) instead of the generic `INTERNAL` error.

```{literalinclude} ../doc_code/grpc_proxy/grpc_guide.py
:start-after: __begin_grpc_context_define_app__
Expand All @@ -320,7 +323,10 @@ The client code is defined like the following to get those attributes.
```

:::{note}
If the handler raises an unhandled exception, Serve will return an `INTERNAL` error code
with the stacktrace in the details, regardless of what code and details
are set in the `RayServegRPCContext` object.
If the handler raises an unhandled exception without setting a status code on the
`RayServegRPCContext` object, Serve returns an `INTERNAL` error code with the
exception message in the details. However, if you set a status code on the context
before raising the exception, Serve preserves that status code in the response.
This allows you to return meaningful status codes like `INVALID_ARGUMENT` or
`RESOURCE_EXHAUSTED` even when raising exceptions.
:::
7 changes: 7 additions & 0 deletions doc/source/serve/doc_code/grpc_proxy/grpc_guide.py
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,13 @@ def __call__(
grpc_context.set_details(message)
grpc_context.set_trailing_metadata([("num", str(num))])

# You can also set a status code before raising an exception.
# The status code will be preserved in the response.
if user_message.name == "error":
grpc_context.set_code(grpc.StatusCode.RESOURCE_EXHAUSTED)
grpc_context.set_details("Resource exhausted, please retry later.")
raise RuntimeError("Simulated error")

user_response = UserDefinedResponse(
greeting=greeting,
num=num,
Expand Down
45 changes: 43 additions & 2 deletions python/ray/serve/_private/grpc_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,17 @@
)
from ray.serve._private.proxy_request_response import ResponseStatus
from ray.serve.config import gRPCOptions
from ray.serve.exceptions import BackPressureError, DeploymentUnavailableError
from ray.serve.exceptions import (
BackPressureError,
DeploymentUnavailableError,
gRPCStatusError,
)
from ray.serve.generated.serve_pb2_grpc import add_RayServeAPIServiceServicer_to_server

# Maximum length for gRPC status details to avoid hitting HTTP/2 trailer limits.
# gRPC default max metadata size is 8KB, so we use a conservative limit.
GRPC_MAX_STATUS_DETAILS_LENGTH = 4096

logger = logging.getLogger(SERVE_LOGGER_NAME)


Expand Down Expand Up @@ -110,6 +118,20 @@ async def start_grpc_server(
return event_loop.create_task(server.wait_for_termination())


def _truncate_message(
message: str, max_length: int = GRPC_MAX_STATUS_DETAILS_LENGTH
) -> str:
"""Truncate a message to avoid exceeding HTTP/2 trailer limits.

gRPC status details are sent as part of HTTP/2 trailers, which have a fixed size limit.
If the message (e.g., a stack trace) is too long, it can cause issues on the client side.
"""
if len(message) <= max_length:
return message
truncation_notice = "... [truncated]"
return message[: max_length - len(truncation_notice)] + truncation_notice


def get_grpc_response_status(
exc: BaseException, request_timeout_s: float, request_id: str
) -> ResponseStatus:
Expand Down Expand Up @@ -141,6 +163,25 @@ def get_grpc_response_status(
is_error=True,
message=exc.message,
)
elif isinstance(exc, gRPCStatusError):
# User set a gRPC status code before raising the exception.
# Respect the user's status code instead of returning INTERNAL.
original_exc = exc.original_exception
if isinstance(original_exc, (RayActorError, RayTaskError)):
logger.warning(
f"Request failed: {original_exc}", extra={"log_to_stderr": False}
)
else:
logger.exception(
f"Request failed with user-set gRPC status code {exc.grpc_code}."
)
# Use user-set details if provided, otherwise use the original exception message.
message = exc.grpc_details if exc.grpc_details else str(original_exc)
return ResponseStatus(
code=exc.grpc_code,
is_error=True,
message=_truncate_message(message),
)
else:
if isinstance(exc, (RayActorError, RayTaskError)):
logger.warning(f"Request failed: {exc}", extra={"log_to_stderr": False})
Expand All @@ -149,7 +190,7 @@ def get_grpc_response_status(
return ResponseStatus(
code=grpc.StatusCode.INTERNAL,
is_error=True,
message=str(exc),
message=_truncate_message(str(exc)),
)


Expand Down
110 changes: 71 additions & 39 deletions python/ray/serve/_private/replica.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@
BackPressureError,
DeploymentUnavailableError,
RayServeException,
gRPCStatusError,
)
from ray.serve.generated.serve_pb2 import (
ASGIRequest,
Expand Down Expand Up @@ -970,9 +971,13 @@ async def handle_request(
)
with self._wrap_request(request_metadata, ray_trace_ctx):
async with self._start_request(request_metadata):
return await self._user_callable_wrapper.call_user_method(
request_metadata, request_args, request_kwargs
)
try:
return await self._user_callable_wrapper.call_user_method(
request_metadata, request_args, request_kwargs
)
except Exception as e:
# For gRPC requests, wrap exception with user-set status code
raise self._maybe_wrap_grpc_exception(e, request_metadata) from e

async def handle_request_streaming(
self, request_metadata: RequestMetadata, *request_args, **request_kwargs
Expand All @@ -985,22 +990,45 @@ async def handle_request_streaming(
request_metadata, ray_trace_ctx
) as status_code_callback:
async with self._start_request(request_metadata):
if request_metadata.is_http_request:
scope, receive = request_args
async for msgs in self._user_callable_wrapper.call_http_entrypoint(
request_metadata,
status_code_callback,
scope,
receive,
):
yield pickle.dumps(msgs)
else:
async for result in self._user_callable_wrapper.call_user_generator(
request_metadata,
request_args,
request_kwargs,
):
yield result
try:
if request_metadata.is_http_request:
scope, receive = request_args
async for msgs in self._user_callable_wrapper.call_http_entrypoint(
request_metadata,
status_code_callback,
scope,
receive,
):
yield pickle.dumps(msgs)
else:
async for result in self._user_callable_wrapper.call_user_generator(
request_metadata,
request_args,
request_kwargs,
):
yield result
except Exception as e:
# For gRPC requests, wrap exception with user-set status code
raise self._maybe_wrap_grpc_exception(e, request_metadata) from e

def _maybe_wrap_grpc_exception(
self, e: BaseException, request_metadata: RequestMetadata
) -> BaseException:
"""Wrap exception with gRPCStatusError if user set a status code.

For gRPC requests, if the user set a status code on the grpc_context before
raising an exception, we wrap the exception with gRPCStatusError to preserve
the user's intended status code through the error handling path.
"""
if request_metadata.is_grpc_request:
grpc_context = request_metadata.grpc_context
if grpc_context and grpc_context.code():
return gRPCStatusError(
original_exception=e,
code=grpc_context.code(),
details=grpc_context.details(),
)
return e

async def handle_request_with_rejection(
self, request_metadata: RequestMetadata, *request_args, **request_kwargs
Expand Down Expand Up @@ -1030,26 +1058,30 @@ async def handle_request_with_rejection(
num_ongoing_requests=self.get_num_ongoing_requests(),
)

if request_metadata.is_http_request:
scope, receive = request_args
async for msgs in self._user_callable_wrapper.call_http_entrypoint(
request_metadata,
status_code_callback,
scope,
receive,
):
yield pickle.dumps(msgs)
elif request_metadata.is_streaming:
async for result in self._user_callable_wrapper.call_user_generator(
request_metadata,
request_args,
request_kwargs,
):
yield result
else:
yield await self._user_callable_wrapper.call_user_method(
request_metadata, request_args, request_kwargs
)
try:
if request_metadata.is_http_request:
scope, receive = request_args
async for msgs in self._user_callable_wrapper.call_http_entrypoint(
request_metadata,
status_code_callback,
scope,
receive,
):
yield pickle.dumps(msgs)
elif request_metadata.is_streaming:
async for result in self._user_callable_wrapper.call_user_generator(
request_metadata,
request_args,
request_kwargs,
):
yield result
else:
yield await self._user_callable_wrapper.call_user_method(
request_metadata, request_args, request_kwargs
)
except Exception as e:
# For gRPC requests, wrap exception with user-set status code
raise self._maybe_wrap_grpc_exception(e, request_metadata) from e

@abstractmethod
async def _on_initialized(self):
Expand Down
43 changes: 43 additions & 0 deletions python/ray/serve/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from typing import Optional

import grpc

from ray.exceptions import TaskCancelledError
from ray.serve._private.common import DeploymentID
from ray.util.annotations import PublicAPI
Expand All @@ -10,6 +12,47 @@ class RayServeException(Exception):
pass


class gRPCStatusError(RayServeException):
"""Internal exception that wraps an exception with user-set gRPC status code.

This is used to preserve user-set gRPC status codes when exceptions are raised
in deployments. When a user sets a status code on the gRPC context before raising
an exception, this wrapper carries that status code through the error handling
path so the proxy can return the user's intended status code instead of INTERNAL.
"""

def __init__(
self,
original_exception: BaseException,
code: Optional[grpc.StatusCode] = None,
details: Optional[str] = None,
):
# Store attributes with underscore prefix to avoid conflicts with
# Ray's exception handling (Ray uses 'cause' internally).
self._original_exception = original_exception
self._grpc_code = code
self._grpc_details = details
super().__init__(str(original_exception))

@property
def original_exception(self) -> BaseException:
"""The original exception that was raised."""
return self._original_exception

@property
def grpc_code(self) -> Optional[grpc.StatusCode]:
"""The user-set gRPC status code, if any."""
return self._grpc_code

@property
def grpc_details(self) -> Optional[str]:
"""The user-set gRPC status details, if any."""
return self._grpc_details

def __str__(self) -> str:
return str(self._original_exception)


@PublicAPI(stability="alpha")
class BackPressureError(RayServeException):
"""Raised when max_queued_requests is exceeded on a DeploymentHandle."""
Expand Down
66 changes: 64 additions & 2 deletions python/ray/serve/tests/test_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -465,8 +465,8 @@ def Streaming(
def test_using_grpc_context_exception(ray_instance, ray_shutdown, streaming: bool):
"""Test setting code on gRPC context then raised exception.

When the deployment in the gRPC context and then raised exception, the response
code should still be internal error instead of user defined error.
When the deployment sets a status code on the gRPC context and then raises an
exception, the user-defined status code should be preserved in the response.
"""
grpc_port = 9000
grpc_servicer_functions = [
Expand Down Expand Up @@ -516,6 +516,68 @@ def Streaming(
_ = stub.__call__(request=request)
rpc_error = exception_info.value

# User-defined status code should be preserved instead of INTERNAL
assert rpc_error.code() == user_defined_error_code
assert real_error_message in rpc_error.details()


@pytest.mark.parametrize("streaming", [False, True])
def test_exception_without_grpc_context_code(
ray_instance, ray_shutdown, streaming: bool
):
"""Test raising exception without setting gRPC status code.

When the deployment raises an exception without setting a status code on the
gRPC context, the response should be INTERNAL error.
"""
grpc_port = 9000
grpc_servicer_functions = [
"ray.serve.generated.serve_pb2_grpc.add_UserDefinedServiceServicer_to_server",
]

serve.start(
grpc_options=gRPCOptions(
port=grpc_port,
grpc_servicer_functions=grpc_servicer_functions,
),
)
real_error_message = "test error without status code"

@serve.deployment()
class HelloModel:
def __call__(
self,
user_message: serve_pb2.UserDefinedMessage,
grpc_context: RayServegRPCContext,
):
# Don't set any status code, just raise exception
raise RuntimeError(real_error_message)

def Streaming(
self,
user_message: serve_pb2.UserDefinedMessage,
grpc_context: RayServegRPCContext,
):
# Don't set any status code, just raise exception
raise RuntimeError(real_error_message)

model = HelloModel.bind()
app_name = "app1"
serve.run(model, name=app_name)

url = get_application_url("gRPC", app_name=app_name, use_localhost=True)
channel = grpc.insecure_channel(url)
stub = serve_pb2_grpc.UserDefinedServiceStub(channel)
request = serve_pb2.UserDefinedMessage(name="foo", num=30, foo="bar")

with pytest.raises(grpc.RpcError) as exception_info:
if streaming:
list(stub.Streaming(request=request))
else:
_ = stub.__call__(request=request)
rpc_error = exception_info.value

# Without user-defined status code, should be INTERNAL
assert rpc_error.code() == grpc.StatusCode.INTERNAL
assert real_error_message in rpc_error.details()

Expand Down
Loading