Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions doc/source/serve/advanced-guides/asyncio-best-practices.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,26 @@ Important differences:
- FastAPI always dispatches `def` endpoints to a threadpool.
- In pure Serve, `def` methods run on the event loop unless you opt into threadpool behavior.

## Threadpool sizing and overrides

Serve sets a default threadpool size for user code that mirrors Python's
`ThreadPoolExecutor` defaults while respecting `ray_actor_options["num_cpus"]`.

In most cases, the default is fine. If you need to tune it, you can override the default
executor inside your deployment:

```{literalinclude} ../doc_code/asyncio_best_practices.py
:start-after: __threadpool_override_begin__
:end-before: __threadpool_override_end__
:language: python
```

Guidance for choosing a size:

- Start with the default; override it only if profiling shows requests queuing behind a saturated threadpool.
- For I/O-bound blocking code (network or disk waits), a threadpool larger than `num_cpus` can improve throughput because waiting threads don't occupy a CPU.
- For GIL-releasing compute (NumPy/Pandas/SciPy, etc.), keep the threadpool at or below `num_cpus` to avoid oversubscribing CPU cores.

## Blocking versus non-blocking in practice

Blocking code keeps the event loop from processing other work. Non-blocking code yields control back to the loop when it's waiting on something.
Expand Down
23 changes: 21 additions & 2 deletions doc/source/serve/doc_code/asyncio_best_practices.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,19 @@ def fetch():
return await asyncio.to_thread(fetch)
# __threaded_http_end__

# __threadpool_override_begin__
from concurrent.futures import ThreadPoolExecutor

@serve.deployment
class CustomThreadPool:
    """Example deployment that installs a custom default executor.

    Overriding the loop's default executor means `asyncio.to_thread` (and
    `loop.run_in_executor(None, ...)`) dispatch work to this pool instead of
    the implicit asyncio-created one.
    """

    def __init__(self):
        # The constructor runs on the replica's event loop, so a running
        # loop is available here.
        event_loop = asyncio.get_running_loop()
        executor = ThreadPoolExecutor(max_workers=16)
        event_loop.set_default_executor(executor)

    async def __call__(self, request):
        def respond():
            return "ok"

        # Runs on the 16-worker pool installed above.
        return await asyncio.to_thread(respond)
# __threadpool_override_end__


# __numpy_deployment_begin__
@serve.deployment
Expand Down Expand Up @@ -331,7 +344,13 @@ async def __call__(self, request):
result = cpu_threadpool_handle.remote(None).result()
print(f"CPUWithThreadpool result: {result}")
assert result == "ok"


# Smoke-test the CustomThreadPool example: deploy it, issue one request,
# and verify the handler's "ok" response round-trips through the handle.
print("\nTesting CustomThreadPool deployment...")
custom_threadpool_handle = serve.run(CustomThreadPool.bind())
result = custom_threadpool_handle.remote(None).result()
print(f"CustomThreadPool result: {result}")
assert result == "ok"

print("\nTesting BlockingStream deployment...")
# Test BlockingStream - just verify it can be created and called
blocking_stream_handle = serve.run(BlockingStream.bind())
Expand Down Expand Up @@ -389,7 +408,7 @@ async def __call__(self, request):
print("✅ ThreadedHTTP test passed")
except Exception as e:
print(f"⚠️ ThreadedHTTP test failed (expected): {type(e).__name__}: {e}")

print("\nTesting OffloadIO deployment...")
try:
offload_io_handle = serve.run(OffloadIO.bind())
Expand Down
1 change: 1 addition & 0 deletions python/ray/serve/_private/local_testing_mode.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ def make_local_deployment_handle(
local_testing_mode=True,
deployment_config=deployment._deployment_config,
actor_id="local",
ray_actor_options=deployment.ray_actor_options,
)
try:
logger.info(f"Initializing local replica class for {deployment_id}.")
Expand Down
28 changes: 28 additions & 0 deletions python/ray/serve/_private/replica.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import functools
import inspect
import logging
import math
import os
import pickle
import threading
Expand Down Expand Up @@ -635,6 +636,7 @@ def __init__(
local_testing_mode=False,
deployment_config=deployment_config,
actor_id=actor_id,
ray_actor_options=self._version.ray_actor_options,
)
self._semaphore = Semaphore(lambda: self.max_ongoing_requests)

Expand Down Expand Up @@ -1692,6 +1694,7 @@ def __init__(
local_testing_mode: bool,
deployment_config: DeploymentConfig,
actor_id: str,
ray_actor_options: Optional[Dict] = None,
):
if not (inspect.isfunction(deployment_def) or inspect.isclass(deployment_def)):
raise TypeError(
Expand All @@ -1715,6 +1718,10 @@ def __init__(
# Will be populated in `initialize_callable`.
self._callable = None
self._deployment_config = deployment_config
self._ray_actor_options = ray_actor_options or {}
self._user_code_threadpool: Optional[
concurrent.futures.ThreadPoolExecutor
] = None

if self._run_user_code_in_separate_thread:
# All interactions with user code run on this loop to avoid blocking the
Expand All @@ -1740,6 +1747,7 @@ def _run_user_code_event_loop():
# Required so that calls to get the current running event loop work
# properly in user code.
asyncio.set_event_loop(self._user_code_event_loop)
self._configure_user_code_threadpool()
# Start monitoring before run_forever so the task is scheduled.
self._user_code_loop_monitor.start(self._user_code_event_loop)
self._user_code_event_loop.run_forever()
Expand All @@ -1752,11 +1760,28 @@ def _run_user_code_event_loop():
else:
self._user_code_event_loop = asyncio.get_running_loop()
self._user_code_loop_monitor = None
self._configure_user_code_threadpool()

@property
def event_loop(self) -> asyncio.AbstractEventLoop:
return self._user_code_event_loop

def _get_user_code_threadpool_max_workers(self) -> Optional[int]:
num_cpus = self._ray_actor_options.get("num_cpus")
if num_cpus is None:
return None
# Mirror ThreadPoolExecutor default behavior while respecting num_cpus.
return min(32, max(1, int(math.ceil(num_cpus))) + 4)

def _configure_user_code_threadpool(self) -> None:
max_workers = self._get_user_code_threadpool_max_workers()
if max_workers is None:
return
self._user_code_threadpool = concurrent.futures.ThreadPoolExecutor(
max_workers=max_workers
)
self._user_code_event_loop.set_default_executor(self._user_code_threadpool)

def _run_user_code(f: Callable) -> Callable:
"""Decorator to run a coroutine method on the user code event loop.

Expand Down Expand Up @@ -2477,3 +2502,6 @@ async def call_destructor(self):

except Exception as e:
logger.exception(f"Exception during graceful shutdown of replica: {e}")
finally:
if self._user_code_threadpool is not None:
self._user_code_threadpool.shutdown()
22 changes: 22 additions & 0 deletions python/ray/serve/tests/test_replica_sync_methods.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,5 +126,27 @@ async def __call__(self):
assert h.remote().result() == 10


@pytest.mark.skipif(
    not RAY_SERVE_RUN_SYNC_IN_THREADPOOL,
    reason="Run sync method in threadpool FF disabled.",
)
@pytest.mark.parametrize(
    ("num_cpus", "expected_workers"),
    [(0, 5), (2.2, 7), (30, 32)],
)
def test_asyncio_default_executor_limited_by_num_cpus(
    serve_instance, num_cpus, expected_workers
):
    """Verify the replica loop's default executor is sized from num_cpus."""

    @serve.deployment(ray_actor_options={"num_cpus": num_cpus})
    class D:
        async def __call__(self):
            # Reach into the loop's private default-executor slot to report
            # the pool size installed by the replica (None if never set).
            default_executor = asyncio.get_running_loop()._default_executor
            if default_executor is None:
                return None
            return default_executor._max_workers

    handle = serve.run(D.bind())
    assert handle.remote().result() == expected_workers


# Allow running this test module directly; the process exit code mirrors
# pytest's result.
if __name__ == "__main__":
    sys.exit(pytest.main(["-v", "-s", __file__]))
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ def _make_user_callable_wrapper(
local_testing_mode=False,
deployment_config=DeploymentConfig(max_ongoing_requests=100),
actor_id="test-actor-id",
ray_actor_options={},
)


Expand Down