Skip to content
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion python/ray/serve/_private/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,9 @@ async def _resolve_deployment_responses(
_DeploymentResponseBase,
)

scanner = _PyObjScanner(source_type=_DeploymentResponseBase)
scanner = _PyObjScanner(
source_type=(_DeploymentResponseBase, ray.ObjectRef, ray.ObjectRefGenerator)
)

try:
responses = []
Expand All @@ -390,6 +392,12 @@ async def _resolve_deployment_responses(
elif isinstance(obj, DeploymentResponse):
responses.append(obj)

# This is no-op replacing the object with itself. The purpose is to make
# sure both object refs and object ref generator are not getting pinned
# to memory by the scanner and cause memory leak.
elif isinstance(obj, (ray.ObjectRef, ray.ObjectRefGenerator)):
replacement_table[obj] = obj

# Gather `DeploymentResponse` object refs concurrently.
if len(responses) > 0:
obj_refs = await asyncio.gather(
Expand Down
54 changes: 54 additions & 0 deletions python/ray/serve/tests/test_standalone_3.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import pytest
import requests
from starlette.requests import Request

import ray
import ray._private.state
Expand All @@ -19,6 +20,7 @@
from ray.serve._private.logging_utils import get_serve_logs_dir
from ray.serve._private.utils import get_head_node_id
from ray.serve.context import _get_global_client
from ray.serve.handle import DeploymentHandle
from ray.serve.schema import ServeInstanceDetails
from ray.tests.conftest import call_ray_stop_only # noqa: F401

Expand Down Expand Up @@ -672,5 +674,57 @@ def __call__(self):
assert all_serve_logs.count("Deleting application 'default'") == 1


def test_passing_object_ref_to_deployment_not_pinned_to_memory(
shutdown_ray, call_ray_stop_only # noqa: F811
):
"""Test passing object ref to deployment not pinned to memory and cause memory leak.

We had issue that passing object ref to a deployment will result in memory leak
due to _PyObjScanner/ cloudpickler pinning the object to memory. This test will
ensure the object ref is release after the request is done.

See: https://github.com/ray-project/ray/issues/43248
"""
cluster = Cluster()
cluster.add_node(num_cpus=5)
cluster.wait_for_nodes()
ray.init(address=cluster.address)

@serve.deployment
class Dep1:
def multiple_by_two(self, length: int):
return length * 2

@serve.deployment
class Gateway:
def __init__(self, dep1: DeploymentHandle):
self.dep1: DeploymentHandle = dep1

async def __call__(self, http_request: Request) -> str:
_length = int(http_request.query_params.get("length"))
length_ref = ray.put(_length)
obj_ref_hex = length_ref.hex()

# Object ref should be in the memory for downstream deployment to access.
assert obj_ref_hex in ray._private.internal_api.memory_summary()
return {
"result": await self.dep1.multiple_by_two.remote(length_ref),
"length": _length,
"obj_ref_hex": obj_ref_hex,
}

app = Gateway.bind(Dep1.bind())
serve.run(target=app)

length = 10
response = requests.get(f"http://localhost:8000?length={length}").json()
assert response["result"] == length * 2
assert response["length"] == length

# Ensure the object ref is not in the memory anymore.
assert response["obj_ref_hex"] not in ray._private.internal_api.memory_summary()
serve.shutdown()


if __name__ == "__main__":
sys.exit(pytest.main(["-v", "-s", __file__]))