Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
9c98194
add external scaler enabled flag
harshit-anyscale Oct 15, 2025
a685eb7
fix tests
harshit-anyscale Oct 16, 2025
7c1ef8f
review changes
harshit-anyscale Oct 27, 2025
0f6b1b0
merge master
harshit-anyscale Oct 27, 2025
826b69c
merge master
harshit-anyscale Oct 27, 2025
60beb9f
review changes
harshit-anyscale Oct 27, 2025
dee83f1
fix bug
harshit-anyscale Oct 27, 2025
e4d315d
Merge branch 'master' into add-external-scaler-c3
harshit-anyscale Oct 27, 2025
9229b97
review changes
harshit-anyscale Oct 28, 2025
4d30ab8
review changes
harshit-anyscale Oct 28, 2025
0492aed
Merge branch 'master' into add-external-scaler-c3
harshit-anyscale Oct 28, 2025
c92bb27
fix tests
harshit-anyscale Oct 28, 2025
83fd9d1
changes in java side code
harshit-anyscale Oct 29, 2025
cf556db
fix java tests
harshit-anyscale Nov 8, 2025
209a1fd
review changes
harshit-anyscale Nov 8, 2025
c87d182
review changes
harshit-anyscale Nov 8, 2025
f7e2f94
add more tests
harshit-anyscale Nov 11, 2025
2fed452
add more tests
harshit-anyscale Nov 11, 2025
ebeb0d2
lint changes
harshit-anyscale Nov 11, 2025
fda546e
Merge branch 'master' into add-external-scaler-c3
harshit-anyscale Nov 11, 2025
539d486
indentation changes
harshit-anyscale Nov 11, 2025
dbe3c0f
fix tests
harshit-anyscale Nov 12, 2025
8a4b589
Merge branch 'master' into add-external-scaler-c3
harshit-anyscale Nov 12, 2025
80e8cef
remove setting default value
harshit-anyscale Nov 17, 2025
c6c1b4c
Merge branch 'master' into add-external-scaler-c3
harshit-anyscale Nov 17, 2025
49eb96a
review changes
harshit-anyscale Nov 24, 2025
810fbcf
merge master
harshit-anyscale Nov 24, 2025
c0aa403
lint changes
harshit-anyscale Nov 24, 2025
7838218
fix tests
harshit-anyscale Nov 24, 2025
ad52517
review changes
harshit-anyscale Nov 24, 2025
f7ef18f
Merge branch 'master' into add-external-scaler-c3
harshit-anyscale Nov 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions python/ray/dashboard/modules/serve/serve_head.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,10 @@ def _create_json_response(self, data, status: int) -> Response:
@validate_endpoint()
async def scale_deployment(self, req: Request) -> Response:
from ray.serve._private.common import DeploymentID
from ray.serve._private.exceptions import DeploymentIsBeingDeletedError
from ray.serve._private.exceptions import (
DeploymentIsBeingDeletedError,
ExternalScalerNotEnabledError,
)
from ray.serve.schema import ScaleDeploymentRequest

# Extract path parameters
Expand Down Expand Up @@ -250,11 +253,11 @@ async def scale_deployment(self, req: Request) -> Response:
200,
)
except Exception as e:
if isinstance(e.cause, DeploymentIsBeingDeletedError):
if isinstance(
e.cause, (ExternalScalerNotEnabledError, DeploymentIsBeingDeletedError)
):
return self._create_json_response(
# From the customer's viewpoint the deployment is already deleted (rather than
# still in the process of being deleted), since they must have already executed the delete command
{"error": "Deployment is deleted"},
{"error": str(e)},
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Unhandled Local Exceptions in Ray Task Error Handling

The exception handler attempts to access e.cause without checking if e is a RayTaskError. Only RayTaskError exceptions have a cause attribute. If a local exception occurs before or during the remote call (e.g., during JSON parsing or request validation), accessing e.cause will raise an AttributeError. The code should check if e is an instance of RayTaskError before accessing its .cause attribute, or use getattr(e, 'cause', None) to safely access it.

Fix in Cursor Fix in Web

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed it now.

412,
)
if isinstance(e, ValueError) and "not found" in str(e):
Expand Down
28 changes: 15 additions & 13 deletions python/ray/dashboard/modules/serve/tests/test_serve_dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
- name: test_app
route_prefix: /
import_path: ray.dashboard.modules.serve.tests.test_serve_dashboard.deployment_app
external_scaler_enabled: True
deployments:
- name: hello_world
num_replicas: 1
Expand Down Expand Up @@ -693,7 +694,7 @@ def _verify_deployment_details(
return True

def test_scale_deployment_endpoint_comprehensive(self, ray_start_stop):
serve.run(DeploymentClass.bind(), name="test_app")
serve.run(DeploymentClass.bind(), name="test_app", external_scaler_enabled=True)

wait_for_condition(
lambda: self._get_deployment_details().status == DeploymentStatus.HEALTHY
Expand All @@ -716,6 +717,7 @@ def test_scale_deployment_during_application_startup(self, ray_start_stop):
DeploymentClassWithBlockingInit.bind(semaphore),
name="test_app",
_blocking=False,
external_scaler_enabled=True,
)

wait_for_condition(
Expand Down Expand Up @@ -749,7 +751,12 @@ def test_scale_deployment_during_application_startup(self, ray_start_stop):
def test_scale_deployment_during_application_upgrade(self, ray_start_stop):
semaphore = Semaphore.remote(value=1)

serve._run(DeploymentClass.bind(), name="test_app", _blocking=False)
serve._run(
DeploymentClass.bind(),
name="test_app",
_blocking=False,
external_scaler_enabled=True,
)

wait_for_condition(
self._verify_deployment_details,
Expand All @@ -763,6 +770,7 @@ def test_scale_deployment_during_application_upgrade(self, ray_start_stop):
DeploymentClassWithBlockingInit.bind(semaphore),
name="test_app",
_blocking=False,
external_scaler_enabled=True,
)

wait_for_condition(
Expand Down Expand Up @@ -818,6 +826,7 @@ def __call__(self):
DeploymentClassWithBlockingDel.bind(signal_actor),
name="test_app",
_blocking=False,
external_scaler_enabled=True,
)

wait_for_condition(
Expand All @@ -837,7 +846,10 @@ def __call__(self):
)

assert response.status_code == 412
assert "Deployment is deleted" in response.json()["error"]
assert (
"is being deleted. Scaling operations are not allowed."
in response.json()["error"]
)

ray.get(signal_actor.send.remote())

Expand Down Expand Up @@ -939,16 +951,6 @@ def test_error_case(self, ray_start_stop):
assert error_response.status_code == 400
assert "not found" in error_response.json()["error"].lower()

error_response = requests.post(
SERVE_HEAD_DEPLOYMENT_SCALE_URL.format(
app_name="test_app", deployment_name="nonexistent"
),
json={"target_num_replicas": 2},
timeout=30,
)
assert error_response.status_code == 400
assert "not found" in error_response.json()["error"].lower()

error_response = requests.post(
SERVE_HEAD_DEPLOYMENT_SCALE_URL.format(
app_name="test_app", deployment_name="hello_world"
Expand Down
54 changes: 52 additions & 2 deletions python/ray/serve/_private/application_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,8 @@ class ApplicationTargetState:
target_capacity_direction: the scale direction to use when
running the Serve autoscaler.
deleting: whether the application is being deleted.
external_scaler_enabled: whether external autoscaling is enabled for
this application.
"""

deployment_infos: Optional[Dict[str, DeploymentInfo]]
Expand All @@ -214,6 +216,7 @@ class ApplicationTargetState:
target_capacity_direction: Optional[TargetCapacityDirection]
deleting: bool
api_type: APIType
external_scaler_enabled: bool = False


class ApplicationState:
Expand All @@ -226,6 +229,7 @@ def __init__(
autoscaling_state_manager: AutoscalingStateManager,
endpoint_state: EndpointState,
logging_config: LoggingConfig,
external_scaler_enabled: bool = False,
):
"""
Initialize an ApplicationState instance.
Expand All @@ -236,6 +240,8 @@ def __init__(
autoscaling_state_manager: Manages autoscaling decisions in the cluster.
endpoint_state: Manages endpoints in the system.
logging_config: Logging configuration schema.
external_scaler_enabled: Whether external autoscaling is enabled for
this application.
"""

self._name = name
Expand All @@ -260,13 +266,18 @@ def __init__(
target_capacity_direction=None,
deleting=False,
api_type=APIType.UNKNOWN,
external_scaler_enabled=external_scaler_enabled,
)
self._logging_config = logging_config

@property
def route_prefix(self) -> Optional[str]:
    """Return the route prefix stored on this application state.

    Returns:
        The value of ``self._route_prefix``; may be ``None``.
    """
    return self._route_prefix

@property
def external_scaler_enabled(self) -> bool:
    """Return the external-scaler flag recorded on the current target state.

    Returns:
        The value of ``self._target_state.external_scaler_enabled``.
    """
    return self._target_state.external_scaler_enabled

@property
def docs_path(self) -> Optional[str]:
# get the docs path from the running deployments
Expand Down Expand Up @@ -325,6 +336,7 @@ def recover_target_state_from_checkpoint(
target_capacity=checkpoint_data.target_capacity,
target_capacity_direction=checkpoint_data.target_capacity_direction,
deleting=checkpoint_data.deleting,
external_scaler_enabled=checkpoint_data.external_scaler_enabled,
)

# Restore route prefix and docs path from checkpointed deployments when
Expand All @@ -342,6 +354,7 @@ def _set_target_state(
target_capacity: Optional[float] = None,
target_capacity_direction: Optional[TargetCapacityDirection] = None,
deleting: bool = False,
external_scaler_enabled: bool = False,
):
"""Set application target state.

Expand Down Expand Up @@ -372,6 +385,7 @@ def _set_target_state(
target_capacity_direction,
deleting,
api_type=api_type,
external_scaler_enabled=external_scaler_enabled,
)

self._target_state = target_state
Expand All @@ -387,6 +401,7 @@ def _set_target_state_deleting(self):
code_version=None,
target_config=None,
deleting=True,
external_scaler_enabled=self.external_scaler_enabled,
)

def _clear_target_state_and_store_config(
Expand All @@ -404,6 +419,9 @@ def _clear_target_state_and_store_config(
code_version=None,
target_config=target_config,
deleting=False,
external_scaler_enabled=target_config.external_scaler_enabled
if target_config
else False,
)

def _delete_deployment(self, name: str) -> bool:
Expand Down Expand Up @@ -534,7 +552,11 @@ def apply_deployment_info(

return target_state_changed

def deploy_app(self, deployment_infos: Dict[str, DeploymentInfo]):
def deploy_app(
self,
deployment_infos: Dict[str, DeploymentInfo],
external_scaler_enabled: bool = False,
):
"""(Re-)deploy the application from list of deployment infos.

This function should only be called to deploy an app from an
Expand All @@ -554,6 +576,7 @@ def deploy_app(self, deployment_infos: Dict[str, DeploymentInfo]):
target_config=None,
target_capacity=None,
target_capacity_direction=None,
external_scaler_enabled=external_scaler_enabled,
)

def apply_app_config(
Expand Down Expand Up @@ -593,6 +616,7 @@ def apply_app_config(
target_config=config,
target_capacity=target_capacity,
target_capacity_direction=target_capacity_direction,
external_scaler_enabled=config.external_scaler_enabled,
)
except (TypeError, ValueError, RayServeException):
self._clear_target_state_and_store_config(config)
Expand Down Expand Up @@ -879,6 +903,7 @@ def update(self) -> Tuple[bool, bool]:
target_capacity_direction=(
self._build_app_task_info.target_capacity_direction
),
external_scaler_enabled=self._target_state.external_scaler_enabled,
)
elif task_status == BuildAppStatus.FAILED:
self._update_status(ApplicationStatus.DEPLOY_FAILED, msg)
Expand Down Expand Up @@ -1024,13 +1049,18 @@ def deploy_apps(self, name_to_deployment_args: Dict[str, List[Dict]]) -> None:
# against during this batch operation.
live_route_prefixes[deploy_app_prefix] = name

external_scaler_enabled = deployment_args.get(
"external_scaler_enabled", False
)

if name not in self._application_states:
self._application_states[name] = ApplicationState(
name,
self._deployment_state_manager,
self._autoscaling_state_manager,
self._endpoint_state,
self._logging_config,
external_scaler_enabled,
)
ServeUsageTag.NUM_APPS.record(str(len(self._application_states)))

Expand All @@ -1040,7 +1070,9 @@ def deploy_apps(self, name_to_deployment_args: Dict[str, List[Dict]]) -> None:
)
for params in deployment_args
}
self._application_states[name].deploy_app(deployment_infos)
self._application_states[name].deploy_app(
deployment_infos, external_scaler_enabled
)

def deploy_app(self, name: str, deployment_args: List[Dict]) -> None:
"""Deploy the specified app to the list of deployment arguments.
Expand Down Expand Up @@ -1082,6 +1114,7 @@ def apply_app_configs(
self._autoscaling_state_manager,
endpoint_state=self._endpoint_state,
logging_config=self._logging_config,
external_scaler_enabled=app_config.external_scaler_enabled,
)

self._application_states[app_config.name].apply_app_config(
Expand Down Expand Up @@ -1122,6 +1155,9 @@ def get_app_status(self, name: str) -> ApplicationStatus:

return self._application_states[name].status

def does_app_exist(self, name: str) -> bool:
    """Check whether an application is tracked by this manager.

    Args:
        name: Name of the application.

    Returns:
        True if ``name`` is present in ``self._application_states``,
        False otherwise.
    """
    return name in self._application_states

def get_app_status_info(self, name: str) -> ApplicationStatusInfo:
if name not in self._application_states:
return ApplicationStatusInfo(
Expand All @@ -1146,6 +1182,20 @@ def get_ingress_deployment_name(self, name: str) -> Optional[str]:
def get_app_source(self, name: str) -> APIType:
    """Return the API type recorded for the named application.

    Args:
        name: Name of the application.

    Returns:
        The ``api_type`` attribute of the application's state.

    Note:
        The lookup is not guarded by an existence check, so an unknown
        ``name`` propagates whatever error ``self._application_states``
        raises for a missing key.
    """
    return self._application_states[name].api_type

def is_external_scaler_enabled(self, app_name: str) -> bool:
    """Check if external scaler is enabled for the application.

    Args:
        app_name: Name of the application.

    Returns:
        True if external_scaler_enabled is set for the application,
        False otherwise. An application that is not tracked in
        ``self._application_states`` is reported as False.
    """
    # Unknown applications are treated as "not enabled" instead of raising.
    if app_name not in self._application_states:
        return False

    return self._application_states[app_name].external_scaler_enabled

def list_app_statuses(
self, source: Optional[APIType] = None
) -> Dict[str, ApplicationStatusInfo]:
Expand Down
3 changes: 3 additions & 0 deletions python/ray/serve/_private/build_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class BuiltApplication:
# Dict[name, DeploymentHandle] mapping deployment names to the handles that replaced
# them in other deployments' init args/kwargs.
deployment_handles: Dict[str, DeploymentHandle]
external_scaler_enabled: bool


def _make_deployment_handle_default(
Expand All @@ -78,6 +79,7 @@ def build_app(
make_deployment_handle: Optional[
Callable[[Deployment, str], DeploymentHandle]
] = None,
external_scaler_enabled: bool = False,
) -> BuiltApplication:
"""Builds the application into a list of finalized deployments.

Expand Down Expand Up @@ -111,6 +113,7 @@ def build_app(
deployment_handles={
deployment_names[app]: handle for app, handle in handles.items()
},
external_scaler_enabled=external_scaler_enabled,
)


Expand Down
4 changes: 4 additions & 0 deletions python/ray/serve/_private/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ def deploy_applications(
deployment_config=deployment._deployment_config,
version=deployment._version or get_random_string(),
route_prefix=app.route_prefix if is_ingress else None,
external_scaler_enabled=app.external_scaler_enabled,
)

deployment_args_proto = DeploymentArgs()
Expand All @@ -329,6 +330,9 @@ def deploy_applications(
if deployment_args["route_prefix"]:
deployment_args_proto.route_prefix = deployment_args["route_prefix"]
deployment_args_proto.ingress = deployment_args["ingress"]
deployment_args_proto.external_scaler_enabled = deployment_args[
"external_scaler_enabled"
]

deployment_args_list.append(deployment_args_proto.SerializeToString())

Expand Down
22 changes: 22 additions & 0 deletions python/ray/serve/_private/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
from ray.serve._private.deployment_info import DeploymentInfo
from ray.serve._private.deployment_state import DeploymentStateManager
from ray.serve._private.endpoint_state import EndpointState
from ray.serve._private.exceptions import ExternalScalerNotEnabledError
from ray.serve._private.grpc_util import set_proxy_default_grpc_options
from ray.serve._private.http_util import (
configure_http_options_with_defaults,
Expand Down Expand Up @@ -794,6 +795,7 @@ def deploy_applications(
"route_prefix": (
args.route_prefix if args.HasField("route_prefix") else None
),
"external_scaler_enabled": args.external_scaler_enabled,
}
)
name_to_deployment_args[name] = deployment_args_deserialized
Expand Down Expand Up @@ -945,7 +947,27 @@ def update_deployment_replicas(
Args:
deployment_id: The deployment to update.
target_num_replicas: The new target number of replicas.

Raises:
ExternalScalerNotEnabledError: If external_scaler_enabled is not set to True
for the application.
"""

# Check if external scaler is enabled for this application
app_name = deployment_id.app_name
if not self.application_state_manager.does_app_exist(app_name):
raise ValueError(f"Application '{app_name}' not found")

if not self.application_state_manager.is_external_scaler_enabled(app_name):
raise ExternalScalerNotEnabledError(
f"Cannot update replicas for deployment '{deployment_id.name}' in "
f"application '{app_name}'. The external scaling API can only be used "
f"when 'external_scaler_enabled' is set to true in the application "
f"configuration. Current value: external_scaler_enabled=false. "
f"To use this API, redeploy your application with "
f"'external_scaler_enabled: true' in the config."
)

self.deployment_state_manager.set_target_num_replicas(
deployment_id, target_num_replicas
)
Expand Down
Loading