diff --git a/src/sagemaker/jumpstart/model.py b/src/sagemaker/jumpstart/model.py
index 619af2f7a9..6f263d9a7e 100644
--- a/src/sagemaker/jumpstart/model.py
+++ b/src/sagemaker/jumpstart/model.py
@@ -14,7 +14,6 @@
 from __future__ import absolute_import
 
-from functools import lru_cache
 from typing import Dict, List, Optional, Any, Union
 
 import pandas as pd
 from botocore.exceptions import ClientError
@@ -48,6 +47,8 @@
     get_jumpstart_configs,
     get_metrics_from_deployment_configs,
     add_instance_rate_stats_to_benchmark_metrics,
+    deployment_config_response_data,
+    _deployment_config_lru_cache,
 )
 from sagemaker.jumpstart.constants import JUMPSTART_LOGGER
 from sagemaker.jumpstart.enums import JumpStartModelType
@@ -449,10 +450,12 @@ def deployment_config(self) -> Optional[Dict[str, Any]]:
         Returns:
             Optional[Dict[str, Any]]: Deployment config.
         """
-        deployment_config = self._retrieve_selected_deployment_config(
-            self.config_name, self.instance_type
-        )
-        return deployment_config.to_json() if deployment_config is not None else None
+        if self.config_name is None:
+            return None
+        for config in self.list_deployment_configs():
+            if config.get("DeploymentConfigName") == self.config_name:
+                return config
+        return None
 
     @property
     def benchmark_metrics(self) -> pd.DataFrame:
@@ -461,16 +464,14 @@ def benchmark_metrics(self) -> pd.DataFrame:
         Returns:
             Benchmark Metrics: Pandas DataFrame object.
         """
-        benchmark_metrics_data = self._get_deployment_configs_benchmarks_data(
-            self.config_name, self.instance_type
-        )
-        keys = list(benchmark_metrics_data.keys())
-        df = pd.DataFrame(benchmark_metrics_data).sort_values(by=[keys[0], keys[1]])
-        return df
+        df = pd.DataFrame(self._get_deployment_configs_benchmarks_data())
+        default_mask = df.apply(lambda row: any("Default" in str(val) for val in row), axis=1)
+        sorted_df = pd.concat([df[default_mask], df[~default_mask]])
+        return sorted_df
 
-    def display_benchmark_metrics(self) -> None:
+    def display_benchmark_metrics(self, *args, **kwargs) -> None:
         """Display deployment configs benchmark metrics."""
-        print(self.benchmark_metrics.to_markdown(index=False))
+        print(self.benchmark_metrics.to_markdown(index=False), *args, **kwargs)
 
     def list_deployment_configs(self) -> List[Dict[str, Any]]:
         """List deployment configs for ``This`` model.
@@ -478,12 +479,9 @@ def list_deployment_configs(self) -> List[Dict[str, Any]]:
         Returns:
             List[Dict[str, Any]]: A list of deployment configs.
         """
-        return [
-            deployment_config.to_json()
-            for deployment_config in self._get_deployment_configs(
-                self.config_name, self.instance_type
-            )
-        ]
+        return deployment_config_response_data(
+            self._get_deployment_configs(self.config_name, self.instance_type)
+        )
 
     def _create_sagemaker_model(
         self,
@@ -873,71 +871,46 @@ def register_deploy_wrapper(*args, **kwargs):
         return model_package
 
-    @lru_cache
-    def _get_deployment_configs_benchmarks_data(
-        self, config_name: str, instance_type: str
-    ) -> Dict[str, Any]:
+    @_deployment_config_lru_cache
+    def _get_deployment_configs_benchmarks_data(self) -> Dict[str, Any]:
         """Deployment configs benchmark metrics.
 
-        Args:
-            config_name (str): Name of selected deployment config.
-            instance_type (str): The selected Instance type.
         Returns:
             Dict[str, List[str]]: Deployment config benchmark data.
""" return get_metrics_from_deployment_configs( - self._get_deployment_configs(config_name, instance_type) + self._get_deployment_configs(None, None), ) - @lru_cache - def _retrieve_selected_deployment_config( - self, config_name: str, instance_type: str - ) -> Optional[DeploymentConfigMetadata]: - """Retrieve the deployment config to apply to `This` model. - - Args: - config_name (str): The name of the deployment config to retrieve. - instance_type (str): The instance type of the deployment config to retrieve. - Returns: - Optional[Dict[str, Any]]: The retrieved deployment config. - """ - if config_name is None: - return None - - for deployment_config in self._get_deployment_configs(config_name, instance_type): - if deployment_config.deployment_config_name == config_name: - return deployment_config - return None - - @lru_cache + @_deployment_config_lru_cache def _get_deployment_configs( - self, selected_config_name: str, selected_instance_type: str + self, selected_config_name: Optional[str], selected_instance_type: Optional[str] ) -> List[DeploymentConfigMetadata]: """Retrieve deployment configs metadata. Args: - selected_config_name (str): The name of the selected deployment config. - selected_instance_type (str): The selected instance type. + selected_config_name (Optional[str]): The name of the selected deployment config. + selected_instance_type (Optional[str]): The selected instance type. """ deployment_configs = [] - if self._metadata_configs is None: + if not self._metadata_configs: return deployment_configs err = None for config_name, metadata_config in self._metadata_configs.items(): - if err is None or "is not authorized to perform: pricing:GetProducts" not in err: - err, metadata_config.benchmark_metrics = ( - add_instance_rate_stats_to_benchmark_metrics( - self.region, metadata_config.benchmark_metrics - ) - ) - resolved_config = metadata_config.resolved_config if selected_config_name == config_name: instance_type_to_use = selected_instance_type else: instance_type_to_use = resolved_config.get("default_inference_instance_type") + if metadata_config.benchmark_metrics: + err, metadata_config.benchmark_metrics = ( + add_instance_rate_stats_to_benchmark_metrics( + self.region, metadata_config.benchmark_metrics + ) + ) + init_kwargs = get_init_kwargs( model_id=self.model_id, instance_type=instance_type_to_use, @@ -957,9 +930,9 @@ def _get_deployment_configs( ) deployment_configs.append(deployment_config_metadata) - if err is not None and "is not authorized to perform: pricing:GetProducts" in err: + if err and err["Code"] == "AccessDeniedException": error_message = "Instance rate metrics will be omitted. Reason: %s" - JUMPSTART_LOGGER.warning(error_message, err) + JUMPSTART_LOGGER.warning(error_message, err["Message"]) return deployment_configs diff --git a/src/sagemaker/jumpstart/types.py b/src/sagemaker/jumpstart/types.py index 0a586f60aa..f85f23c361 100644 --- a/src/sagemaker/jumpstart/types.py +++ b/src/sagemaker/jumpstart/types.py @@ -2255,6 +2255,8 @@ def _val_to_json(self, val: Any) -> Any: Any: The converted json value. 
""" if issubclass(type(val), JumpStartDataHolderType): + if isinstance(val, JumpStartBenchmarkStat): + val.name = val.name.replace("_", " ").title() return val.to_json() if isinstance(val, list): list_obj = [] diff --git a/src/sagemaker/jumpstart/utils.py b/src/sagemaker/jumpstart/utils.py index d2a0a396b5..44be0ea813 100644 --- a/src/sagemaker/jumpstart/utils.py +++ b/src/sagemaker/jumpstart/utils.py @@ -12,8 +12,10 @@ # language governing permissions and limitations under the License. """This module contains utilities related to SageMaker JumpStart.""" from __future__ import absolute_import + import logging import os +from functools import lru_cache, wraps from typing import Any, Dict, List, Set, Optional, Tuple, Union from urllib.parse import urlparse import boto3 @@ -1040,7 +1042,9 @@ def get_jumpstart_configs( raise ValueError(f"Unknown script scope: {scope}.") if not config_names: - config_names = metadata_configs.configs.keys() if metadata_configs else [] + config_names = ( + metadata_configs.config_rankings.get("overall").rankings if metadata_configs else [] + ) return ( {config_name: metadata_configs.configs[config_name] for config_name in config_names} @@ -1052,43 +1056,42 @@ def get_jumpstart_configs( def add_instance_rate_stats_to_benchmark_metrics( region: str, benchmark_metrics: Optional[Dict[str, List[JumpStartBenchmarkStat]]], -) -> Optional[Tuple[str, Dict[str, List[JumpStartBenchmarkStat]]]]: +) -> Optional[Tuple[Dict[str, str], Dict[str, List[JumpStartBenchmarkStat]]]]: """Adds instance types metric stats to the given benchmark_metrics dict. Args: region (str): AWS region. - benchmark_metrics (Dict[str, List[JumpStartBenchmarkStat]]): + benchmark_metrics (Optional[Dict[str, List[JumpStartBenchmarkStat]]]): Returns: - Tuple[str, Dict[str, List[JumpStartBenchmarkStat]]]: - Contains Error message and metrics dict. + Optional[Tuple[Dict[str, str], Dict[str, List[JumpStartBenchmarkStat]]]]: + Contains Error and metrics. """ - - if benchmark_metrics is None: + if not benchmark_metrics: return None - final_benchmark_metrics = {} - err_message = None + final_benchmark_metrics = {} for instance_type, benchmark_metric_stats in benchmark_metrics.items(): instance_type = instance_type if instance_type.startswith("ml.") else f"ml.{instance_type}" - if not has_instance_rate_stat(benchmark_metric_stats): + if not has_instance_rate_stat(benchmark_metric_stats) and not err_message: try: instance_type_rate = get_instance_rate_per_hour( instance_type=instance_type, region=region ) + if not benchmark_metric_stats: + benchmark_metric_stats = [] benchmark_metric_stats.append(JumpStartBenchmarkStat(instance_type_rate)) - final_benchmark_metrics[instance_type] = benchmark_metric_stats + final_benchmark_metrics[instance_type] = benchmark_metric_stats except ClientError as e: final_benchmark_metrics[instance_type] = benchmark_metric_stats - err_message = e.response["Error"]["Message"] + err_message = e.response["Error"] except Exception: # pylint: disable=W0703 final_benchmark_metrics[instance_type] = benchmark_metric_stats - err_message = ( - f"Unable to get instance rate per hour for instance type: {instance_type}." - ) + else: + final_benchmark_metrics[instance_type] = benchmark_metric_stats return err_message, final_benchmark_metrics @@ -1103,31 +1106,32 @@ def has_instance_rate_stat(benchmark_metric_stats: Optional[List[JumpStartBenchm bool: Whether the benchmark metric stats contains instance rate metric stat. 
""" if benchmark_metric_stats is None: - return False - + return True for benchmark_metric_stat in benchmark_metric_stats: if benchmark_metric_stat.name.lower() == "instance rate": return True - return False def get_metrics_from_deployment_configs( - deployment_configs: List[DeploymentConfigMetadata], + deployment_configs: Optional[List[DeploymentConfigMetadata]], ) -> Dict[str, List[str]]: """Extracts benchmark metrics from deployment configs metadata. Args: - deployment_configs (List[DeploymentConfigMetadata]): List of deployment configs metadata. + deployment_configs (Optional[List[DeploymentConfigMetadata]]): + List of deployment configs metadata. + Returns: + Dict[str, List[str]]: Deployment configs bench metrics dict. """ - data = {"Config Name": [], "Instance Type": []} - - for outer_index, deployment_config in enumerate(deployment_configs): - if deployment_config.deployment_args is None: - continue + if not deployment_configs: + return {} + data = {"Instance Type": [], "Config Name": []} + instance_rate_data = {} + for index, deployment_config in enumerate(deployment_configs): benchmark_metrics = deployment_config.benchmark_metrics - if benchmark_metrics is None: + if not deployment_config.deployment_args or not benchmark_metrics: continue for inner_index, current_instance_type in enumerate(benchmark_metrics): @@ -1136,23 +1140,108 @@ def get_metrics_from_deployment_configs( data["Config Name"].append(deployment_config.deployment_config_name) instance_type_to_display = ( f"{current_instance_type} (Default)" - if current_instance_type == deployment_config.deployment_args.default_instance_type + if index == 0 + and current_instance_type == deployment_config.deployment_args.default_instance_type else current_instance_type ) data["Instance Type"].append(instance_type_to_display) - if outer_index == 0 and inner_index == 0: - temp_data = {} - for metric in current_instance_type_metrics: - column_name = f"{metric.name.replace('_', ' ').title()} ({metric.unit})" - if metric.name.lower() == "instance rate": - data[column_name] = [] - else: - temp_data[column_name] = [] - data = {**data, **temp_data} - for metric in current_instance_type_metrics: - column_name = f"{metric.name.replace('_', ' ').title()} ({metric.unit})" - if column_name in data: + column_name = f"{metric.name} ({metric.unit})" + + if metric.name.lower() == "instance rate": + if column_name not in instance_rate_data: + instance_rate_data[column_name] = [] + instance_rate_data[column_name].append(metric.value) + else: + if column_name not in data: + data[column_name] = [] + for _ in range(len(data[column_name]), inner_index): + data[column_name].append(" - ") data[column_name].append(metric.value) + + data = {**data, **instance_rate_data} return data + + +def deployment_config_response_data( + deployment_configs: Optional[List[DeploymentConfigMetadata]], +) -> List[Dict[str, Any]]: + """Deployment config api response data. + + Args: + deployment_configs (Optional[List[DeploymentConfigMetadata]]): + List of deployment configs metadata. + Returns: + List[Dict[str, Any]]: List of deployment config api response data. 
+    """
+    configs = []
+    if not deployment_configs:
+        return configs
+
+    for deployment_config in deployment_configs:
+        deployment_config_json = deployment_config.to_json()
+        benchmark_metrics = deployment_config_json.get("BenchmarkMetrics")
+        if benchmark_metrics and deployment_config.deployment_args:
+            deployment_config_json["BenchmarkMetrics"] = {
+                deployment_config.deployment_args.instance_type: benchmark_metrics.get(
+                    deployment_config.deployment_args.instance_type
+                )
+            }
+
+        configs.append(deployment_config_json)
+    return configs
+
+
+def _deployment_config_lru_cache(_func=None, *, maxsize: int = 128, typed: bool = False):
+    """LRU cache for deployment configs."""
+
+    def has_instance_rate_metric(config: DeploymentConfigMetadata) -> bool:
+        """Determines whether metadata config contains instance rate metric stat.
+
+        Args:
+            config (DeploymentConfigMetadata): Deployment config metadata.
+        Returns:
+            bool: Whether the metadata config contains instance rate metric stat.
+        """
+        if config.benchmark_metrics is None:
+            return True
+        for benchmark_metric_stats in config.benchmark_metrics.values():
+            if not has_instance_rate_stat(benchmark_metric_stats):
+                return False
+        return True
+
+    def wrapper_cache(f):
+        f = lru_cache(maxsize=maxsize, typed=typed)(f)
+
+        @wraps(f)
+        def wrapped_f(*args, **kwargs):
+            res = f(*args, **kwargs)
+
+            # Clear the cache on the first call if the output does not contain
+            # instance rate metrics, as this is caused by a missing pricing policy.
+            if f.cache_info().hits == 0 and f.cache_info().misses == 1:
+                if isinstance(res, list):
+                    for item in res:
+                        if isinstance(
+                            item, DeploymentConfigMetadata
+                        ) and not has_instance_rate_metric(item):
+                            f.cache_clear()
+                            break
+                elif isinstance(res, dict):
+                    keys = list(res.keys())
+                    if "Instance Rate" not in keys[-1]:
+                        f.cache_clear()
+                    elif len(res[keys[1]]) > len(res[keys[-1]]):
+                        del res[keys[-1]]
+                        f.cache_clear()
+            return res
+
+        wrapped_f.cache_info = f.cache_info
+        wrapped_f.cache_clear = f.cache_clear
+        return wrapped_f
+
+    if _func is None:
+        return wrapper_cache
+    return wrapper_cache(_func)
diff --git a/src/sagemaker/serve/builder/jumpstart_builder.py b/src/sagemaker/serve/builder/jumpstart_builder.py
index ec987dd9fe..f6a4d165df 100644
--- a/src/sagemaker/serve/builder/jumpstart_builder.py
+++ b/src/sagemaker/serve/builder/jumpstart_builder.py
@@ -454,14 +454,14 @@ def get_deployment_config(self) -> Optional[Dict[str, Any]]:
             Optional[Dict[str, Any]]: Deployment config to apply to this model.
         """
         if not hasattr(self, "pysdk_model") or self.pysdk_model is None:
-            self.pysdk_model = self._create_pre_trained_js_model()
+            self._build_for_jumpstart()
 
         return self.pysdk_model.deployment_config
 
     def display_benchmark_metrics(self):
         """Display Markdown Benchmark Metrics for deployment configs."""
         if not hasattr(self, "pysdk_model") or self.pysdk_model is None:
-            self.pysdk_model = self._create_pre_trained_js_model()
+            self._build_for_jumpstart()
 
         self.pysdk_model.display_benchmark_metrics()
 
@@ -472,18 +472,20 @@ def list_deployment_configs(self) -> List[Dict[str, Any]]:
             List[Dict[str, Any]]: A list of deployment configs.
         """
         if not hasattr(self, "pysdk_model") or self.pysdk_model is None:
-            self.pysdk_model = self._create_pre_trained_js_model()
+            self._build_for_jumpstart()
 
         return self.pysdk_model.list_deployment_configs()
 
     def _build_for_jumpstart(self):
         """Placeholder docstring"""
+        if hasattr(self, "pysdk_model") and self.pysdk_model is not None:
+            return self.pysdk_model
+
         # we do not pickle for jumpstart. set to none
         self.secret_key = None
         self.jumpstart = True
 
-        if not hasattr(self, "pysdk_model") or self.pysdk_model is None:
-            self.pysdk_model = self._create_pre_trained_js_model()
+        self.pysdk_model = self._create_pre_trained_js_model()
 
         logger.info(
             "JumpStart ID %s is packaged with Image URI: %s", self.model, self.pysdk_model.image_uri
diff --git a/tests/unit/sagemaker/jumpstart/model/test_model.py b/tests/unit/sagemaker/jumpstart/model/test_model.py
index f32687bd99..75b3fd7300 100644
--- a/tests/unit/sagemaker/jumpstart/model/test_model.py
+++ b/tests/unit/sagemaker/jumpstart/model/test_model.py
@@ -1733,7 +1733,7 @@ def test_model_list_deployment_configs(
         mock_get_init_kwargs.side_effect = lambda *args, **kwargs: get_mock_init_kwargs(model_id)
 
         mock_verify_model_region_and_return_specs.side_effect = (
-            lambda *args, **kwargs: get_base_spec_with_prototype_configs()
+            lambda *args, **kwargs: get_base_spec_with_prototype_configs_with_missing_benchmarks()
         )
         mock_add_instance_rate_stats_to_benchmark_metrics.side_effect = lambda region, metrics: (
             None,
@@ -1750,7 +1750,7 @@ def test_model_list_deployment_configs(
         configs = model.list_deployment_configs()
 
-        self.assertEqual(configs, get_base_deployment_configs())
+        self.assertEqual(configs, get_base_deployment_configs(True))
 
     @mock.patch("sagemaker.jumpstart.utils.verify_model_region_and_return_specs")
     @mock.patch("sagemaker.jumpstart.accessors.JumpStartModelsAccessor._get_manifest")
@@ -1803,7 +1803,7 @@ def test_model_retrieve_deployment_config(
         model_id, _ = "pytorch-eqa-bert-base-cased", "*"
 
         mock_verify_model_region_and_return_specs.side_effect = (
-            lambda *args, **kwargs: get_base_spec_with_prototype_configs_with_missing_benchmarks()
+            lambda *args, **kwargs: get_base_spec_with_prototype_configs()
         )
         mock_add_instance_rate_stats_to_benchmark_metrics.side_effect = lambda region, metrics: (
             None,
@@ -1815,7 +1815,7 @@ def test_model_retrieve_deployment_config(
         )
         mock_model_deploy.return_value = default_predictor
 
-        expected = get_base_deployment_configs(True)[0]
+        expected = get_base_deployment_configs()[0]
         config_name = expected.get("DeploymentConfigName")
         instance_type = expected.get("InstanceType")
 
         mock_get_init_kwargs.side_effect = lambda *args, **kwargs: get_mock_init_kwargs(
diff --git a/tests/unit/sagemaker/jumpstart/test_utils.py b/tests/unit/sagemaker/jumpstart/test_utils.py
index 5b30a94dd6..e6ea212994 100644
--- a/tests/unit/sagemaker/jumpstart/test_utils.py
+++ b/tests/unit/sagemaker/jumpstart/test_utils.py
@@ -52,6 +52,7 @@
     get_special_model_spec,
     get_prototype_manifest,
     get_base_deployment_configs_metadata,
+    get_base_deployment_configs,
 )
 from mock import MagicMock
 
@@ -1869,29 +1870,15 @@ def test_add_instance_rate_stats_to_benchmark_metrics_client_ex(
     mock_get_instance_rate_per_hour,
 ):
     mock_get_instance_rate_per_hour.side_effect = ClientError(
-        {"Error": {"Message": "is not authorized to perform: pricing:GetProducts"}}, "GetProducts"
-    )
-
-    err, out = utils.add_instance_rate_stats_to_benchmark_metrics(
-        "us-west-2",
         {
-            "ml.p2.xlarge": [
-                JumpStartBenchmarkStat({"name": "Latency", "value": "100", "unit": "Tokens/S"})
-            ],
+            "Error": {
+                "Message": "is not authorized to perform: pricing:GetProducts",
+                "Code": "AccessDenied",
+            },
         },
+        "GetProducts",
     )
 
-    assert err == "is not authorized to perform: pricing:GetProducts"
-    for key in out:
-        assert len(out[key]) == 1
-
-
-@patch("sagemaker.jumpstart.utils.get_instance_rate_per_hour")
-def test_add_instance_rate_stats_to_benchmark_metrics_ex(
-    mock_get_instance_rate_per_hour,
-):
-    mock_get_instance_rate_per_hour.side_effect = Exception()
-
     err, out = utils.add_instance_rate_stats_to_benchmark_metrics(
         "us-west-2",
         {
@@ -1901,7 +1888,8 @@ def test_add_instance_rate_stats_to_benchmark_metrics_ex(
         },
     )
 
-    assert err == "Unable to get instance rate per hour for instance type: ml.p2.xlarge."
+    assert err["Message"] == "is not authorized to perform: pricing:GetProducts"
+    assert err["Code"] == "AccessDenied"
     for key in out:
         assert len(out[key]) == 1
 
@@ -1909,7 +1897,7 @@ def test_add_instance_rate_stats_to_benchmark_metrics_ex(
 @pytest.mark.parametrize(
     "stats, expected",
     [
-        (None, False),
+        (None, True),
         (
             [JumpStartBenchmarkStat({"name": "Instance Rate", "unit": "USD/Hrs", "value": "3.76"})],
             True,
@@ -1919,3 +1907,14 @@
 )
 def test_has_instance_rate_stat(stats, expected):
     assert utils.has_instance_rate_stat(stats) is expected
+
+
+@pytest.mark.parametrize(
+    "data, expected",
+    [(None, []), ([], []), (get_base_deployment_configs_metadata(), get_base_deployment_configs())],
+)
+def test_deployment_config_response_data(data, expected):
+    out = utils.deployment_config_response_data(data)
+
+    print(out)
+    assert out == expected
diff --git a/tests/unit/sagemaker/jumpstart/utils.py b/tests/unit/sagemaker/jumpstart/utils.py
index e8a93dff6c..63b964e16e 100644
--- a/tests/unit/sagemaker/jumpstart/utils.py
+++ b/tests/unit/sagemaker/jumpstart/utils.py
@@ -358,7 +358,8 @@ def get_base_deployment_configs_metadata(
         else get_base_spec_with_prototype_configs()
     )
     configs = []
-    for config_name, jumpstart_config in specs.inference_configs.configs.items():
+    for config_name in specs.inference_configs.config_rankings.get("overall").rankings:
+        jumpstart_config = specs.inference_configs.configs.get(config_name)
         benchmark_metrics = jumpstart_config.benchmark_metrics
 
         if benchmark_metrics:
@@ -388,9 +389,17 @@
 def get_base_deployment_configs(
     omit_benchmark_metrics: bool = False,
 ) -> List[Dict[str, Any]]:
-    return [
-        config.to_json() for config in get_base_deployment_configs_metadata(omit_benchmark_metrics)
-    ]
+    configs = []
+    for config in get_base_deployment_configs_metadata(omit_benchmark_metrics):
+        config_json = config.to_json()
+        if config_json["BenchmarkMetrics"]:
+            config_json["BenchmarkMetrics"] = {
+                config.deployment_args.instance_type: config_json["BenchmarkMetrics"].get(
+                    config.deployment_args.instance_type
+                )
+            }
+        configs.append(config_json)
+    return configs
 
 
 def append_instance_stat_metrics(
diff --git a/tests/unit/sagemaker/serve/builder/test_js_builder.py b/tests/unit/sagemaker/serve/builder/test_js_builder.py
index 56b01cd9e3..4ec96e88e3 100644
--- a/tests/unit/sagemaker/serve/builder/test_js_builder.py
+++ b/tests/unit/sagemaker/serve/builder/test_js_builder.py
@@ -866,6 +866,12 @@ def test_display_benchmark_metrics_initial(
             model="facebook/galactica-mock-model-id",
             schema_builder=mock_schema_builder,
         )
+
+        mock_pre_trained_model.return_value.image_uri = mock_tgi_image_uri
+        mock_pre_trained_model.return_value.list_deployment_configs.side_effect = (
+            lambda: DEPLOYMENT_CONFIGS
+        )
+
         builder.display_benchmark_metrics()
 
         mock_pre_trained_model.return_value.display_benchmark_metrics.assert_called_once()
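
Reviewer note, not part of the patch: a minimal usage sketch of the reworked deployment-config surface, restricted to calls that appear in this diff. The model ID below is an illustrative placeholder, and the exact keys and columns depend on the deployment-config metadata the chosen model actually ships.

    from sagemaker.jumpstart.model import JumpStartModel

    # Hypothetical model ID, for illustration only.
    model = JumpStartModel(model_id="example-jumpstart-model-id")

    # list_deployment_configs() now returns deployment_config_response_data(...):
    # each dict carries "DeploymentConfigName", "InstanceType", and "BenchmarkMetrics"
    # keyed by that config's instance type.
    for config in model.list_deployment_configs():
        print(config.get("DeploymentConfigName"), config.get("InstanceType"))

    # Benchmark table: rows marked "(Default)" are sorted to the top; instance rate
    # columns are omitted (with a warning) when the caller lacks the
    # pricing:GetProducts permission.
    model.display_benchmark_metrics()

    # deployment_config resolves the selected config by name from
    # list_deployment_configs() and returns None when no config_name is set.
    print(model.deployment_config)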