Skip to content

Commit 74c3089

Browse files
abrarsheikh authored and
ArturNiederfahrenhorst committed
[2/n] [Serve] poll outbound deployments into deployment state (ray-project#58350)
Fetch outbound deployments from all replicas at initialization. Follow-up PR: ray-project#58355. --------- Signed-off-by: abrar <abrar@anyscale.com>
1 parent a48c7ed commit 74c3089

File tree

4 files changed

+236
-41
lines changed

4 files changed

+236
-41
lines changed

python/ray/serve/_private/deployment_state.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,9 @@ def __init__(
281281
self._last_record_routing_stats_time: float = 0.0
282282
self._ingress: bool = False
283283

284+
# Outbound deployments polling state
285+
self._outbound_deployments: Optional[List[DeploymentID]] = None
286+
284287
@property
285288
def replica_id(self) -> str:
286289
return self._replica_id
@@ -771,6 +774,7 @@ def check_ready(self) -> Tuple[ReplicaStartupStatus, Optional[str]]:
771774
self._grpc_port,
772775
self._rank,
773776
self._route_patterns,
777+
self._outbound_deployments,
774778
) = ray.get(self._ready_obj_ref)
775779
except RayTaskError as e:
776780
logger.exception(
@@ -1043,6 +1047,9 @@ def force_stop(self, log_shutdown_message: bool = False):
10431047
except ValueError:
10441048
pass
10451049

1050+
def get_outbound_deployments(self) -> Optional[List[DeploymentID]]:
1051+
return self._outbound_deployments
1052+
10461053

10471054
class DeploymentReplica:
10481055
"""Manages state transitions for deployment replicas.
@@ -1323,6 +1330,9 @@ def resource_requirements(self) -> Tuple[str, str]:
13231330
# https://github.com/ray-project/ray/issues/26210 for the issue.
13241331
return json.dumps(required), json.dumps(available)
13251332

1333+
def get_outbound_deployments(self) -> Optional[List[DeploymentID]]:
1334+
return self._actor.get_outbound_deployments()
1335+
13261336

13271337
class ReplicaStateContainer:
13281338
"""Container for mapping ReplicaStates to lists of DeploymentReplicas."""
@@ -3088,6 +3098,27 @@ def _stop_one_running_replica_for_testing(self):
30883098
def is_ingress(self) -> bool:
30893099
return self._target_state.info.ingress
30903100

3101+
def get_outbound_deployments(self) -> Optional[List[DeploymentID]]:
    """Aggregate the outbound deployments reported by running replicas.

    Only RUNNING replicas whose version equals the target version are
    considered. The deployment IDs they report are unioned together.

    Returns:
        Deployment IDs that this deployment calls, sorted by
        (name, app_name). None if no considered replica has reported
        its outbound deployments yet.
    """
    result: Set[DeploymentID] = set()
    has_outbound_deployments = False
    for replica in self._replicas.get([ReplicaState.RUNNING]):
        if replica.version != self._target_state.version:
            # Only consider replicas of the target version.
            continue
        outbound_deployments = replica.get_outbound_deployments()
        if outbound_deployments is None:
            # This replica has not reported anything yet.
            continue
        result.update(outbound_deployments)
        has_outbound_deployments = True
    if not has_outbound_deployments:
        return None
    # `result` is a set, so its iteration order is arbitrary. Sorting on
    # name alone would leave IDs that share a name (but live in different
    # apps) in an unstable relative order; app_name breaks the tie while
    # keeping the primary ordering by name.
    return sorted(result, key=lambda d: (d.name, d.app_name))
3121+
30913122

30923123
class DeploymentStateManager:
30933124
"""Manages all state for deployments in the system.
@@ -3696,3 +3727,21 @@ def _get_replica_ranks_mapping(self, deployment_id: DeploymentID) -> Dict[str, i
36963727
return {}
36973728

36983729
return deployment_state._get_replica_ranks_mapping()
3730+
3731+
def get_deployment_outbound_deployments(
    self, deployment_id: DeploymentID
) -> Optional[List[DeploymentID]]:
    """Look up the outbound deployments recorded for one deployment.

    Args:
        deployment_id: ID of the deployment to look up.

    Returns:
        The value reported by that deployment's state via
        get_outbound_deployments(), or None when no state is tracked
        for deployment_id.
    """
    state = self._deployment_states.get(deployment_id)
    return None if state is None else state.get_outbound_deployments()

python/ray/serve/_private/replica.py

Lines changed: 44 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@
131131
int,
132132
ReplicaRank, # rank
133133
Optional[List[str]], # route_patterns
134+
Optional[List[DeploymentID]], # outbound_deployments
134135
]
135136

136137

@@ -604,11 +605,51 @@ def get_metadata(self) -> ReplicaMetadata:
604605
self._grpc_port,
605606
current_rank,
606607
route_patterns,
608+
self.list_outbound_deployments(),
607609
)
608610

609611
def get_dynamically_created_handles(self) -> Set[DeploymentID]:
610612
return self._dynamically_created_handles
611613

614+
def list_outbound_deployments(self) -> List[DeploymentID]:
    """Collect the IDs of every deployment this replica holds a handle to.

    Two sources are combined:
    - Handles created dynamically (get_deployment_handle()).
    - DeploymentHandle objects found anywhere inside the init args/kwargs
      passed to the deployment constructor.

    The result describes which deployments are reachable from this
    replica. It can change over time, since new handles can be created at
    runtime, and it is not guaranteed to be identical across replicas
    because it depends on user code.

    Returns:
        A list of the distinct DeploymentIDs this replica calls into, in
        no particular order.
    """
    # Start from a copy of the dynamically created handles.
    outbound_ids: Set[DeploymentID] = set(self.get_dynamically_created_handles())

    # Handles may also have been passed in as constructor arguments.
    ctor_args = self._user_callable_wrapper._init_args
    ctor_kwargs = self._user_callable_wrapper._init_kwargs

    # _PyObjScanner finds every DeploymentHandle nested in the arguments.
    scanner = _PyObjScanner(source_type=DeploymentHandle)
    try:
        found_handles = scanner.find_nodes((ctor_args, ctor_kwargs))
        outbound_ids.update(handle.deployment_id for handle in found_handles)
    finally:
        # Always release the scanner, even if scanning raised.
        scanner.clear()

    return list(outbound_ids)
652+
612653
def _set_internal_replica_context(
613654
self, *, servable_object: Callable = None, rank: ReplicaRank = None
614655
):
@@ -1219,45 +1260,6 @@ def get_num_ongoing_requests(self) -> int:
12191260
"""
12201261
return self._replica_impl.get_num_ongoing_requests()
12211262

1222-
def list_outbound_deployments(self) -> List[DeploymentID]:
1223-
"""List all outbound deployment IDs this replica calls into.
1224-
1225-
This includes:
1226-
- Handles created via get_deployment_handle()
1227-
- Handles passed as init args/kwargs to the deployment constructor
1228-
1229-
This is used to determine which deployments are reachable from this replica.
1230-
The list of DeploymentIDs can change over time as new handles can be created at runtime.
1231-
Also its not guaranteed that the list of DeploymentIDs are identical across replicas
1232-
because it depends on user code.
1233-
1234-
Returns:
1235-
A list of DeploymentIDs that this replica calls into.
1236-
"""
1237-
seen_deployment_ids: Set[DeploymentID] = set()
1238-
1239-
# First, collect dynamically created handles
1240-
for deployment_id in self._replica_impl.get_dynamically_created_handles():
1241-
seen_deployment_ids.add(deployment_id)
1242-
1243-
# Get the init args/kwargs
1244-
init_args = self._replica_impl._user_callable_wrapper._init_args
1245-
init_kwargs = self._replica_impl._user_callable_wrapper._init_kwargs
1246-
1247-
# Use _PyObjScanner to find all DeploymentHandle objects in:
1248-
# The init_args and init_kwargs (handles might be passed as init args)
1249-
scanner = _PyObjScanner(source_type=DeploymentHandle)
1250-
try:
1251-
handles = scanner.find_nodes((init_args, init_kwargs))
1252-
1253-
for handle in handles:
1254-
deployment_id = handle.deployment_id
1255-
seen_deployment_ids.add(deployment_id)
1256-
finally:
1257-
scanner.clear()
1258-
1259-
return list(seen_deployment_ids)
1260-
12611263
async def is_allocated(self) -> str:
12621264
"""poke the replica to check whether it's alive.
12631265
@@ -1281,6 +1283,9 @@ async def is_allocated(self) -> str:
12811283
get_component_logger_file_path(),
12821284
)
12831285

1286+
def list_outbound_deployments(self) -> Optional[List[DeploymentID]]:
1287+
return self._replica_impl.list_outbound_deployments()
1288+
12841289
async def initialize_and_get_metadata(
12851290
self, deployment_config: DeploymentConfig = None, _after: Optional[Any] = None
12861291
) -> ReplicaMetadata:

python/ray/serve/tests/test_controller_recovery.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ def __call__(self, *args):
6565
replica_version_hash = None
6666
for replica in deployment_dict[id]:
6767
ref = replica.get_actor_handle().initialize_and_get_metadata.remote()
68-
_, version, _, _, _, _, _, _, _ = ray.get(ref)
68+
_, version, _, _, _, _, _, _, _, _ = ray.get(ref)
6969
if replica_version_hash is None:
7070
replica_version_hash = hash(version)
7171
assert replica_version_hash == hash(version), (
@@ -118,7 +118,7 @@ def __call__(self, *args):
118118
for replica_name in recovered_replica_names:
119119
actor_handle = ray.get_actor(replica_name, namespace=SERVE_NAMESPACE)
120120
ref = actor_handle.initialize_and_get_metadata.remote()
121-
_, version, _, _, _, _, _, _, _ = ray.get(ref)
121+
_, version, _, _, _, _, _, _, _, _ = ray.get(ref)
122122
assert replica_version_hash == hash(
123123
version
124124
), "Replica version hash should be the same after recover from actor names"

python/ray/serve/tests/unit/test_deployment_state.py

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,9 @@ def check_health(self):
311311
def get_routing_stats(self) -> Dict[str, Any]:
312312
return {}
313313

314+
def get_outbound_deployments(self) -> Optional[List[DeploymentID]]:
315+
return getattr(self, "_outbound_deployments", None)
316+
314317
@property
315318
def route_patterns(self) -> Optional[List[str]]:
316319
return None
@@ -5609,5 +5612,143 @@ def test_rank_assignment_with_replica_failures(self, mock_deployment_state_manag
56095612
}, f"Expected ranks [0, 1, 2], got {ranks_mapping.values()}"
56105613

56115614

5615+
class TestGetOutboundDeployments:
    """Tests for aggregating replica-reported outbound deployments."""

    def test_basic_outbound_deployments(self, mock_deployment_state_manager):
        """A running replica's outbound deployments are surfaced."""
        create_dsm, *_ = mock_deployment_state_manager
        dsm: DeploymentStateManager = create_dsm()

        deployment_id = DeploymentID(name="test_deployment", app_name="test_app")
        info, _ = deployment_info(num_replicas=1)
        dsm.deploy(deployment_id, info)

        # Drive the single replica STARTING -> RUNNING.
        ds = dsm._deployment_states[deployment_id]
        dsm.update()
        for starting in ds._replicas.get([ReplicaState.STARTING]):
            starting._actor.set_ready()
        dsm.update()

        # Attach outbound deployments to the mock actor of the running replica.
        running = ds._replicas.get([ReplicaState.RUNNING])
        assert len(running) == 1
        d1 = DeploymentID(name="dep1", app_name="test_app")
        d2 = DeploymentID(name="dep2", app_name="test_app")
        running[0]._actor._outbound_deployments = [d1, d2]

        assert ds.get_outbound_deployments() == [d1, d2]

        # The manager-level accessor must report the same thing.
        assert dsm.get_deployment_outbound_deployments(deployment_id) == [
            d1,
            d2,
        ]

    def test_deployment_state_manager_returns_none_for_nonexistent_deployment(
        self, mock_deployment_state_manager
    ):
        """The manager returns None for a deployment it does not track."""
        (
            create_dsm,
            timer,
            cluster_node_info_cache,
            autoscaling_state_manager,
        ) = mock_deployment_state_manager
        dsm = create_dsm()

        unknown_id = DeploymentID(name="nonexistent", app_name="test_app")
        assert dsm.get_deployment_outbound_deployments(unknown_id) is None

    def test_returns_none_if_replicas_are_not_running(
        self, mock_deployment_state_manager
    ):
        """None is returned until at least one replica is RUNNING."""
        create_dsm, *_ = mock_deployment_state_manager
        dsm: DeploymentStateManager = create_dsm()

        deployment_id = DeploymentID(name="test_deployment", app_name="test_app")
        info, _ = deployment_info(num_replicas=2)
        dsm.deploy(deployment_id, info)
        ds = dsm._deployment_states[deployment_id]
        dsm.update()

        replicas = ds._replicas.get([ReplicaState.STARTING])
        assert len(replicas) == 2
        d1 = DeploymentID(name="dep1", app_name="test_app")
        d2 = DeploymentID(name="dep2", app_name="test_app")
        d3 = DeploymentID(name="dep3", app_name="test_app")
        d4 = DeploymentID(name="dep4", app_name="test_app")
        replicas[0]._actor._outbound_deployments = [d1, d2]
        replicas[1]._actor._outbound_deployments = [d3, d4]
        dsm.update()

        # Both replicas are still starting, so nothing is reported.
        assert ds.get_outbound_deployments() is None

        # Once the first replica is ready, only its deployments show up.
        replicas[0]._actor.set_ready()
        dsm.update()
        assert ds.get_outbound_deployments() == [d1, d2]

    def test_only_considers_replicas_matching_target_version(
        self, mock_deployment_state_manager
    ):
        """Replicas of a non-target version are ignored.

        After a new version is deployed, replicas of the old version must
        not contribute to the outbound deployments result.
        """
        create_dsm, *_ = mock_deployment_state_manager
        dsm: DeploymentStateManager = create_dsm()

        # Roll out version 1.
        info_v1, v1 = deployment_info(version="1")
        dsm.deploy(TEST_DEPLOYMENT_ID, info_v1)
        ds = dsm._deployment_states[TEST_DEPLOYMENT_ID]
        dsm.update()

        # Bring the v1 replica to RUNNING.
        ds._replicas.get()[0]._actor.set_ready()
        dsm.update()

        # Give the v1 replica some outbound deployments.
        d1 = DeploymentID(name="dep1", app_name="test_app")
        d2 = DeploymentID(name="dep2", app_name="test_app")
        ds._replicas.get()[0]._actor._outbound_deployments = [d1, d2]

        # While v1 is the target version, its deployments are returned.
        assert ds.get_outbound_deployments() == [d1, d2]

        # Roll out version 2, which triggers a rolling update.
        info_v2, v2 = deployment_info(version="2")
        dsm.deploy(TEST_DEPLOYMENT_ID, info_v2)
        dsm.update()

        # v1 is now stopping and v2 is starting.
        check_counts(
            ds,
            total=2,
            by_state=[(ReplicaState.STOPPING, 1, v1), (ReplicaState.STARTING, 1, v2)],
        )

        # Key check: the v1 replica still exists (stopping) but must be
        # excluded because the target version is v2. v2 is not RUNNING
        # yet, so the result is None.
        assert ds.get_outbound_deployments() is None

        # Give the v2 replica its outbound deployments and mark it ready.
        d3 = DeploymentID(name="dep3", app_name="test_app")
        v2_replica = ds._replicas.get(states=[ReplicaState.STARTING])[0]
        v2_replica._actor._outbound_deployments = [d3]
        v2_replica._actor.set_ready()
        dsm.update()

        # v2 is running: only its deployments (d3) are reported, never
        # those of v1 (d1, d2).
        assert ds.get_outbound_deployments() == [d3]
5752+
56125753
if __name__ == "__main__":
56135754
sys.exit(pytest.main(["-v", "-s", __file__]))

0 commit comments

Comments
 (0)