Skip to content

Commit c9aa1b3

Browse files
authored
[Serve] set last scale up/down time on autoscaling context (#59057)
fixes #59001 --------- Signed-off-by: abrar <abrar@anyscale.com>
1 parent b242160 commit c9aa1b3

File tree

3 files changed

+216
-2
lines changed

3 files changed

+216
-2
lines changed

python/ray/serve/_private/autoscaling_state.py

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,9 @@ def __init__(self, deployment_id: DeploymentID):
5656
self._running_replicas: List[ReplicaID] = []
5757
self._target_capacity: Optional[float] = None
5858
self._target_capacity_direction: Optional[TargetCapacityDirection] = None
59+
# Track timestamps of last scale up and scale down events
60+
self._last_scale_up_time: Optional[float] = None
61+
self._last_scale_down_time: Optional[float] = None
5962

6063
def register(self, info: DeploymentInfo, curr_target_num_replicas: int) -> int:
6164
"""Registers an autoscaling deployment's info.
@@ -121,6 +124,14 @@ def update_running_replica_ids(self, running_replicas: List[ReplicaID]):
121124
"""Update cached set of running replica IDs for this deployment."""
122125
self._running_replicas = running_replicas
123126

127+
def record_scale_up(self):
128+
"""Record a scale up event by updating the timestamp."""
129+
self._last_scale_up_time = time.time()
130+
131+
def record_scale_down(self):
132+
"""Record a scale down event by updating the timestamp."""
133+
self._last_scale_down_time = time.time()
134+
124135
def is_within_bounds(self, num_replicas_running_at_target_version: int):
125136
"""Whether or not this deployment is within the autoscaling bounds.
126137
@@ -253,8 +264,8 @@ def get_autoscaling_context(self, curr_target_num_replicas) -> AutoscalingContex
253264
total_queued_requests=self._get_queued_requests,
254265
aggregated_metrics=self._get_aggregated_custom_metrics,
255266
raw_metrics=self._get_raw_custom_metrics,
256-
last_scale_up_time=None,
257-
last_scale_down_time=None,
267+
last_scale_up_time=self._last_scale_up_time,
268+
last_scale_down_time=self._last_scale_down_time,
258269
)
259270

260271
def _collect_replica_running_requests(self) -> List[TimeSeries]:
@@ -792,6 +803,16 @@ def update_running_replica_ids(
792803
running_replicas
793804
)
794805

806+
def record_scale_up(self, deployment_id: DeploymentID):
807+
"""Record a scale up event for a deployment."""
808+
if deployment_id in self._deployment_autoscaling_states:
809+
self._deployment_autoscaling_states[deployment_id].record_scale_up()
810+
811+
def record_scale_down(self, deployment_id: DeploymentID):
812+
"""Record a scale down event for a deployment."""
813+
if deployment_id in self._deployment_autoscaling_states:
814+
self._deployment_autoscaling_states[deployment_id].record_scale_down()
815+
795816
def on_replica_stopped(self, replica_id: ReplicaID):
796817
dep_id = replica_id.deployment_id
797818
if dep_id in self._deployment_autoscaling_states:
@@ -943,6 +964,26 @@ def update_running_replica_ids(
943964
if app_state:
944965
app_state.update_running_replica_ids(deployment_id, running_replicas)
945966

967+
def record_scale_up(self, deployment_id: DeploymentID):
968+
"""Record a scale up event for a deployment.
969+
970+
Args:
971+
deployment_id: The ID of the deployment being scaled up.
972+
"""
973+
app_state = self._app_autoscaling_states.get(deployment_id.app_name)
974+
if app_state:
975+
app_state.record_scale_up(deployment_id)
976+
977+
def record_scale_down(self, deployment_id: DeploymentID):
978+
"""Record a scale down event for a deployment.
979+
980+
Args:
981+
deployment_id: The ID of the deployment being scaled down.
982+
"""
983+
app_state = self._app_autoscaling_states.get(deployment_id.app_name)
984+
if app_state:
985+
app_state.record_scale_down(deployment_id)
986+
946987
def on_replica_stopped(self, replica_id: ReplicaID):
947988
app_state = self._app_autoscaling_states.get(replica_id.deployment_id.app_name)
948989
if app_state:

python/ray/serve/_private/deployment_state.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2361,6 +2361,7 @@ def autoscale(self, decision_num_replicas: int) -> bool:
23612361
trigger=DeploymentStatusInternalTrigger.AUTOSCALE_UP,
23622362
message=f"Upscaling from {old_num} to {new_num} replicas.",
23632363
)
2364+
self._autoscaling_state_manager.record_scale_up(self._id)
23642365
elif new_num < old_num:
23652366
logger.info(
23662367
f"Downscaling {self._id} from {old_num} to {new_num} replicas. "
@@ -2370,6 +2371,8 @@ def autoscale(self, decision_num_replicas: int) -> bool:
23702371
trigger=DeploymentStatusInternalTrigger.AUTOSCALE_DOWN,
23712372
message=f"Downscaling from {old_num} to {new_num} replicas.",
23722373
)
2374+
self._autoscaling_state_manager.record_scale_down(self._id)
2375+
23732376
return True
23742377

23752378
def delete(self) -> bool:

python/ray/serve/tests/unit/test_deployment_state.py

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3785,6 +3785,176 @@ def test_handle_metrics_on_dead_serve_actor(self, mock_deployment_state_manager)
37853785
dsm.update()
37863786
check_counts(ds1, total=2, by_state=[(ReplicaState.STOPPING, 2, None)])
37873787

3788+
def test_autoscaling_timestamps(self, mock_deployment_state_manager):
3789+
"""Test that last_scale_up_time and last_scale_down_time are properly tracked.
3790+
3791+
This test verifies that:
3792+
1. Timestamps are None initially
3793+
2. last_scale_up_time is set after a scale-up event
3794+
3. last_scale_down_time is set after a scale-down event
3795+
4. Timestamps are available in AutoscalingContext
3796+
"""
3797+
# Create deployment state manager
3798+
create_dsm, timer, _, asm = mock_deployment_state_manager
3799+
dsm: DeploymentStateManager = create_dsm()
3800+
asm: AutoscalingStateManager = asm
3801+
3802+
# Deploy deployment with autoscaling
3803+
info, _ = deployment_info(
3804+
autoscaling_config={
3805+
"target_ongoing_requests": 1,
3806+
"min_replicas": 1,
3807+
"max_replicas": 5,
3808+
"initial_replicas": 2,
3809+
"upscale_delay_s": 0,
3810+
"downscale_delay_s": 0,
3811+
"metrics_interval_s": 100,
3812+
}
3813+
)
3814+
dsm.deploy(TEST_DEPLOYMENT_ID, info)
3815+
ds: DeploymentState = dsm._deployment_states[TEST_DEPLOYMENT_ID]
3816+
3817+
# Make replicas ready
3818+
dsm.update()
3819+
for replica in ds._replicas.get():
3820+
replica._actor.set_ready()
3821+
dsm.update()
3822+
3823+
# Get autoscaling state
3824+
app_state = asm._app_autoscaling_states[TEST_DEPLOYMENT_ID.app_name]
3825+
dep_autoscaling_state = app_state._deployment_autoscaling_states[
3826+
TEST_DEPLOYMENT_ID
3827+
]
3828+
3829+
# Initially, timestamps should be None
3830+
ctx = dep_autoscaling_state.get_autoscaling_context(2)
3831+
assert ctx.last_scale_up_time is None
3832+
assert ctx.last_scale_down_time is None
3833+
3834+
# Trigger scale-up by setting high request metrics
3835+
replicas = ds._replicas.get()
3836+
req_per_replica = 5 # High load to trigger scale-up
3837+
3838+
if RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE:
3839+
handle_metric_report = HandleMetricReport(
3840+
deployment_id=TEST_DEPLOYMENT_ID,
3841+
handle_id="test_handle",
3842+
actor_id="test_actor",
3843+
handle_source=DeploymentHandleSource.UNKNOWN,
3844+
queued_requests=[TimeStampedValue(timer.time() - 0.1, 0)],
3845+
aggregated_queued_requests=0,
3846+
aggregated_metrics={
3847+
RUNNING_REQUESTS_KEY: {
3848+
replica._actor.replica_id: req_per_replica
3849+
for replica in replicas
3850+
}
3851+
},
3852+
metrics={
3853+
RUNNING_REQUESTS_KEY: {
3854+
replica._actor.replica_id: [
3855+
TimeStampedValue(timer.time() - 0.1, req_per_replica)
3856+
]
3857+
for replica in replicas
3858+
}
3859+
},
3860+
timestamp=timer.time(),
3861+
)
3862+
asm.record_request_metrics_for_handle(handle_metric_report)
3863+
else:
3864+
for replica in replicas:
3865+
replica_metric_report = ReplicaMetricReport(
3866+
replica_id=replica._actor.replica_id,
3867+
aggregated_metrics={RUNNING_REQUESTS_KEY: req_per_replica},
3868+
metrics={
3869+
RUNNING_REQUESTS_KEY: [
3870+
TimeStampedValue(timer.time() - 0.1, req_per_replica)
3871+
]
3872+
},
3873+
timestamp=timer.time(),
3874+
)
3875+
asm.record_request_metrics_for_replica(replica_metric_report)
3876+
3877+
# Record time before scale-up
3878+
time_before_scale_up = timer.time()
3879+
3880+
# Trigger autoscaling decision
3881+
self.scale(dsm, asm, [TEST_DEPLOYMENT_ID])
3882+
3883+
# After scale-up, last_scale_up_time should be set and at or after the recorded time
3884+
ctx_after_scale_up = dep_autoscaling_state.get_autoscaling_context(5)
3885+
assert ctx_after_scale_up.last_scale_up_time is not None
3886+
assert ctx_after_scale_up.last_scale_up_time >= time_before_scale_up
3887+
assert ctx_after_scale_up.last_scale_down_time is None
3888+
3889+
scale_up_time = ctx_after_scale_up.last_scale_up_time
3890+
3891+
# Advance timer to simulate time passing
3892+
timer.advance(10)
3893+
3894+
# Set replicas ready
3895+
dsm.update()
3896+
for replica in ds._replicas.get():
3897+
replica._actor.set_ready()
3898+
dsm.update()
3899+
3900+
# Now trigger scale-down by setting low request metrics
3901+
replicas = ds._replicas.get()
3902+
req_per_replica = 0 # No load to trigger scale-down
3903+
3904+
if RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE:
3905+
handle_metric_report = HandleMetricReport(
3906+
deployment_id=TEST_DEPLOYMENT_ID,
3907+
handle_id="test_handle",
3908+
actor_id="test_actor",
3909+
handle_source=DeploymentHandleSource.UNKNOWN,
3910+
queued_requests=[TimeStampedValue(timer.time() - 0.1, 0)],
3911+
aggregated_queued_requests=0,
3912+
aggregated_metrics={
3913+
RUNNING_REQUESTS_KEY: {
3914+
replica._actor.replica_id: req_per_replica
3915+
for replica in replicas
3916+
}
3917+
},
3918+
metrics={
3919+
RUNNING_REQUESTS_KEY: {
3920+
replica._actor.replica_id: [
3921+
TimeStampedValue(timer.time() - 0.1, req_per_replica)
3922+
]
3923+
for replica in replicas
3924+
}
3925+
},
3926+
timestamp=timer.time(),
3927+
)
3928+
asm.record_request_metrics_for_handle(handle_metric_report)
3929+
else:
3930+
for replica in replicas:
3931+
replica_metric_report = ReplicaMetricReport(
3932+
replica_id=replica._actor.replica_id,
3933+
aggregated_metrics={RUNNING_REQUESTS_KEY: req_per_replica},
3934+
metrics={
3935+
RUNNING_REQUESTS_KEY: [
3936+
TimeStampedValue(timer.time() - 0.1, req_per_replica)
3937+
]
3938+
},
3939+
timestamp=timer.time(),
3940+
)
3941+
asm.record_request_metrics_for_replica(replica_metric_report)
3942+
3943+
# Record time before scale-down
3944+
time_before_scale_down = timer.time()
3945+
3946+
# Trigger autoscaling decision for scale-down
3947+
self.scale(dsm, asm, [TEST_DEPLOYMENT_ID])
3948+
3949+
# After scale-down, last_scale_down_time should be set and at or after the recorded time
3950+
ctx_after_scale_down = dep_autoscaling_state.get_autoscaling_context(1)
3951+
assert (
3952+
ctx_after_scale_down.last_scale_up_time == scale_up_time
3953+
) # Should remain unchanged
3954+
assert ctx_after_scale_down.last_scale_down_time is not None
3955+
assert ctx_after_scale_down.last_scale_down_time >= time_before_scale_down
3956+
assert ctx_after_scale_down.last_scale_down_time > scale_up_time
3957+
37883958

37893959
class TestTargetCapacity:
37903960
"""

0 commit comments

Comments
 (0)