From 01fab74e8e632daf171c88e5d780cb0bd271d07f Mon Sep 17 00:00:00 2001 From: Shawn Yang Date: Fri, 27 Jun 2025 10:55:10 -0700 Subject: [PATCH] feat: add logging for agent engine creation PiperOrigin-RevId: 776644637 --- .../unit/vertexai/genai/test_agent_engines.py | 93 +++++++++++++------ vertexai/_genai/agent_engines.py | 23 ++++- vertexai/agent_engines/_agent_engines.py | 49 +++++++--- vertexai/agent_engines/_utils.py | 9 +- 4 files changed, 128 insertions(+), 46 deletions(-) diff --git a/tests/unit/vertexai/genai/test_agent_engines.py b/tests/unit/vertexai/genai/test_agent_engines.py index c99747bfe4..dbacb2de57 100644 --- a/tests/unit/vertexai/genai/test_agent_engines.py +++ b/tests/unit/vertexai/genai/test_agent_engines.py @@ -521,16 +521,16 @@ def register_operations(self) -> Dict[str, List[str]]: OperationRegistrableEngine().custom_method, schema_name=_TEST_CUSTOM_METHOD_NAME, ) -_TEST_AGENT_ENGINE_CUSTOM_METHOD_SCHEMA[ - _TEST_MODE_KEY_IN_SCHEMA -] = _TEST_STANDARD_API_MODE +_TEST_AGENT_ENGINE_CUSTOM_METHOD_SCHEMA[_TEST_MODE_KEY_IN_SCHEMA] = ( + _TEST_STANDARD_API_MODE +) _TEST_AGENT_ENGINE_ASYNC_CUSTOM_METHOD_SCHEMA = _utils.generate_schema( OperationRegistrableEngine().custom_async_method, schema_name=_TEST_CUSTOM_ASYNC_METHOD_NAME, ) -_TEST_AGENT_ENGINE_ASYNC_CUSTOM_METHOD_SCHEMA[ - _TEST_MODE_KEY_IN_SCHEMA -] = _TEST_ASYNC_API_MODE +_TEST_AGENT_ENGINE_ASYNC_CUSTOM_METHOD_SCHEMA[_TEST_MODE_KEY_IN_SCHEMA] = ( + _TEST_ASYNC_API_MODE +) _TEST_AGENT_ENGINE_STREAM_QUERY_SCHEMA = _utils.generate_schema( StreamQueryEngine().stream_query, schema_name=_TEST_DEFAULT_STREAM_METHOD_NAME, @@ -540,23 +540,23 @@ def register_operations(self) -> Dict[str, List[str]]: OperationRegistrableEngine().custom_stream_method, schema_name=_TEST_CUSTOM_STREAM_METHOD_NAME, ) -_TEST_AGENT_ENGINE_CUSTOM_STREAM_QUERY_SCHEMA[ - _TEST_MODE_KEY_IN_SCHEMA -] = _TEST_STREAM_API_MODE +_TEST_AGENT_ENGINE_CUSTOM_STREAM_QUERY_SCHEMA[_TEST_MODE_KEY_IN_SCHEMA] = ( + _TEST_STREAM_API_MODE +) _TEST_AGENT_ENGINE_ASYNC_STREAM_QUERY_SCHEMA = _utils.generate_schema( AsyncStreamQueryEngine().async_stream_query, schema_name=_TEST_DEFAULT_ASYNC_STREAM_METHOD_NAME, ) -_TEST_AGENT_ENGINE_ASYNC_STREAM_QUERY_SCHEMA[ - _TEST_MODE_KEY_IN_SCHEMA -] = _TEST_ASYNC_STREAM_API_MODE +_TEST_AGENT_ENGINE_ASYNC_STREAM_QUERY_SCHEMA[_TEST_MODE_KEY_IN_SCHEMA] = ( + _TEST_ASYNC_STREAM_API_MODE +) _TEST_AGENT_ENGINE_CUSTOM_ASYNC_STREAM_QUERY_SCHEMA = _utils.generate_schema( OperationRegistrableEngine().custom_async_stream_method, schema_name=_TEST_CUSTOM_ASYNC_STREAM_METHOD_NAME, ) -_TEST_AGENT_ENGINE_CUSTOM_ASYNC_STREAM_QUERY_SCHEMA[ - _TEST_MODE_KEY_IN_SCHEMA -] = _TEST_ASYNC_STREAM_API_MODE +_TEST_AGENT_ENGINE_CUSTOM_ASYNC_STREAM_QUERY_SCHEMA[_TEST_MODE_KEY_IN_SCHEMA] = ( + _TEST_ASYNC_STREAM_API_MODE +) _TEST_OPERATION_REGISTRABLE_SCHEMAS = [ _TEST_AGENT_ENGINE_QUERY_SCHEMA, _TEST_AGENT_ENGINE_CUSTOM_METHOD_SCHEMA, @@ -581,9 +581,9 @@ def register_operations(self) -> Dict[str, List[str]]: MethodToBeUnregisteredEngine().method_to_be_unregistered, schema_name=_TEST_METHOD_TO_BE_UNREGISTERED_NAME, ) -_TEST_METHOD_TO_BE_UNREGISTERED_SCHEMA[ - _TEST_MODE_KEY_IN_SCHEMA -] = _TEST_STANDARD_API_MODE +_TEST_METHOD_TO_BE_UNREGISTERED_SCHEMA[_TEST_MODE_KEY_IN_SCHEMA] = ( + _TEST_STANDARD_API_MODE +) _TEST_ASYNC_QUERY_SCHEMAS = [_TEST_AGENT_ENGINE_ASYNC_METHOD_SCHEMA] _TEST_STREAM_QUERY_SCHEMAS = [ _TEST_AGENT_ENGINE_STREAM_QUERY_SCHEMA, @@ -963,10 +963,13 @@ def test_list_agent_engine(self): None, ) + @pytest.mark.usefixtures("caplog") @mock.patch.object(_agent_engines, "_prepare") @mock.patch.object(agent_engines.AgentEngines, "_await_operation") - def test_create_agent_engine(self, mock_await_operation, mock_prepare): - mock_await_operation.return_value = _genai_types.AgentEngineOperation() + def test_create_agent_engine(self, mock_await_operation, mock_prepare, caplog): + mock_await_operation.return_value = _genai_types.AgentEngineOperation( + response=_genai_types.ReasoningEngine(name=_TEST_AGENT_ENGINE_RESOURCE_NAME) + ) with mock.patch.object( self.client.agent_engines._api_client, "request" ) as request_mock: @@ -1004,19 +1007,29 @@ def test_create_agent_engine(self, mock_await_operation, mock_prepare): }, None, ) + assert "View progress and logs at" in caplog.text + assert "Agent Engine created. To use it in another session:" in caplog.text + assert ( + f"agent_engine = client.agent_engines.get(" + f"'{_TEST_AGENT_ENGINE_RESOURCE_NAME}')" in caplog.text + ) + @pytest.mark.usefixtures("caplog") @mock.patch.object(agent_engines.AgentEngines, "_create_config") @mock.patch.object(agent_engines.AgentEngines, "_await_operation") def test_create_agent_engine_lightweight( self, mock_await_operation, mock_create_config, + caplog, ): mock_create_config.return_value = _genai_types.CreateAgentEngineConfig( display_name=_TEST_AGENT_ENGINE_DISPLAY_NAME, description=_TEST_AGENT_ENGINE_DESCRIPTION, ) - mock_await_operation.return_value = _genai_types.AgentEngineOperation() + mock_await_operation.return_value = _genai_types.AgentEngineOperation( + response=_genai_types.ReasoningEngine(name=_TEST_AGENT_ENGINE_RESOURCE_NAME) + ) with mock.patch.object( self.client.agent_engines._api_client, "request" ) as request_mock: @@ -1040,13 +1053,21 @@ def test_create_agent_engine_lightweight( }, None, ) + assert "View progress and logs at" in caplog.text + assert "Agent Engine created. To use it in another session:" in caplog.text + assert ( + f"agent_engine = client.agent_engines.get(" + f"'{_TEST_AGENT_ENGINE_RESOURCE_NAME}')" in caplog.text + ) + @pytest.mark.usefixtures("caplog") @mock.patch.object(agent_engines.AgentEngines, "_create_config") @mock.patch.object(agent_engines.AgentEngines, "_await_operation") def test_create_agent_engine_with_env_vars_dict( self, mock_await_operation, mock_create_config, + caplog, ): mock_create_config.return_value = { "display_name": _TEST_AGENT_ENGINE_DISPLAY_NAME, @@ -1061,7 +1082,9 @@ def test_create_agent_engine_with_env_vars_dict( "agent_framework": _TEST_AGENT_ENGINE_FRAMEWORK, }, } - mock_await_operation.return_value = _genai_types.AgentEngineOperation() + mock_await_operation.return_value = _genai_types.AgentEngineOperation( + response=_genai_types.ReasoningEngine(name=_TEST_AGENT_ENGINE_RESOURCE_NAME) + ) with mock.patch.object( self.client.agent_engines._api_client, "request" ) as request_mock: @@ -1105,11 +1128,19 @@ def test_create_agent_engine_with_env_vars_dict( }, None, ) + assert "View progress and logs at" in caplog.text + assert "Agent Engine created. To use it in another session:" in caplog.text + assert ( + f"agent_engine = client.agent_engines.get(" + f"'{_TEST_AGENT_ENGINE_RESOURCE_NAME}')" in caplog.text + ) @mock.patch.object(_agent_engines, "_prepare") @mock.patch.object(agent_engines.AgentEngines, "_await_operation") def test_update_agent_engine_requirements(self, mock_await_operation, mock_prepare): - mock_await_operation.return_value = _genai_types.AgentEngineOperation() + mock_await_operation.return_value = _genai_types.AgentEngineOperation( + response=_genai_types.ReasoningEngine(name=_TEST_AGENT_ENGINE_RESOURCE_NAME) + ) with mock.patch.object( self.client.agent_engines._api_client, "request" ) as request_mock: @@ -1155,7 +1186,9 @@ def test_update_agent_engine_requirements(self, mock_await_operation, mock_prepa def test_update_agent_engine_extra_packages( self, mock_await_operation, mock_prepare ): - mock_await_operation.return_value = _genai_types.AgentEngineOperation() + mock_await_operation.return_value = _genai_types.AgentEngineOperation( + response=_genai_types.ReasoningEngine(name=_TEST_AGENT_ENGINE_RESOURCE_NAME) + ) with mock.patch.object( self.client.agent_engines._api_client, "request" ) as request_mock: @@ -1202,7 +1235,9 @@ def test_update_agent_engine_extra_packages( @mock.patch.object(_agent_engines, "_prepare") @mock.patch.object(agent_engines.AgentEngines, "_await_operation") def test_update_agent_engine_env_vars(self, mock_await_operation, mock_prepare): - mock_await_operation.return_value = _genai_types.AgentEngineOperation() + mock_await_operation.return_value = _genai_types.AgentEngineOperation( + response=_genai_types.ReasoningEngine(name=_TEST_AGENT_ENGINE_RESOURCE_NAME) + ) with mock.patch.object( self.client.agent_engines._api_client, "request" ) as request_mock: @@ -1251,7 +1286,9 @@ def test_update_agent_engine_env_vars(self, mock_await_operation, mock_prepare): @mock.patch.object(agent_engines.AgentEngines, "_await_operation") def test_update_agent_engine_display_name(self, mock_await_operation): - mock_await_operation.return_value = _genai_types.AgentEngineOperation() + mock_await_operation.return_value = _genai_types.AgentEngineOperation( + response=_genai_types.ReasoningEngine(name=_TEST_AGENT_ENGINE_RESOURCE_NAME) + ) with mock.patch.object( self.client.agent_engines._api_client, "request" ) as request_mock: @@ -1275,7 +1312,9 @@ def test_update_agent_engine_display_name(self, mock_await_operation): @mock.patch.object(agent_engines.AgentEngines, "_await_operation") def test_update_agent_engine_description(self, mock_await_operation): - mock_await_operation.return_value = _genai_types.AgentEngineOperation() + mock_await_operation.return_value = _genai_types.AgentEngineOperation( + response=_genai_types.ReasoningEngine(name=_TEST_AGENT_ENGINE_RESOURCE_NAME) + ) with mock.patch.object( self.client.agent_engines._api_client, "request" ) as request_mock: diff --git a/vertexai/_genai/agent_engines.py b/vertexai/_genai/agent_engines.py index 61023f26bb..2dcb0cc416 100644 --- a/vertexai/_genai/agent_engines.py +++ b/vertexai/_genai/agent_engines.py @@ -1988,6 +1988,10 @@ def create( env_vars=config.env_vars, ) operation = self._create(config=api_config) + logger.info( + "View progress and logs at" + f" https://console.cloud.google.com/logs/query?project={self._api_client.project}." + ) if agent_engine is None: poll_interval_seconds = 1 # Lightweight agent engine resource creation. else: @@ -2002,6 +2006,10 @@ def create( api_async_client=AsyncAgentEngines(api_client_=self._api_client), api_resource=operation.response, ) + logger.info("Agent Engine created. To use it in another session:") + logger.info( + "agent_engine =" f" client.agent_engines.get('{agent.api_resource.name}')" + ) if agent_engine is not None: # If the user did not provide an agent_engine (e.g. lightweight # provisioning), it will not have any API methods registered. @@ -2043,11 +2051,14 @@ def _create_config( if agent_engine is not None: sys_version = f"{sys.version_info.major}.{sys.version_info.minor}" gcs_dir_name = gcs_dir_name or _agent_engines._DEFAULT_GCS_DIR_NAME - agent_engine = _agent_engines._validate_agent_engine_or_raise(agent_engine) + agent_engine = _agent_engines._validate_agent_engine_or_raise( + agent_engine=agent_engine, logger=logger + ) _agent_engines._validate_staging_bucket_or_raise(staging_bucket) requirements = _agent_engines._validate_requirements_or_raise( agent_engine=agent_engine, requirements=requirements, + logger=logger, ) extra_packages = _agent_engines._validate_extra_packages_or_raise( extra_packages @@ -2063,6 +2074,7 @@ def _create_config( staging_bucket=staging_bucket, gcs_dir_name=gcs_dir_name, extra_packages=extra_packages, + logger=logger, ) # Update the package spec. update_masks.append("spec.package_spec.pickle_object_gcs_uri") @@ -2099,6 +2111,7 @@ def _create_config( class_methods = _agent_engines._generate_class_methods_spec_or_raise( agent_engine=agent_engine, operations=_agent_engines._get_registered_operations(agent_engine), + logger=logger, ) agent_engine_spec["class_methods"] = [ _utils.to_dict(class_method) for class_method in class_methods @@ -2305,12 +2318,20 @@ def update( env_vars=config.env_vars, ) operation = self._update(name=name, config=api_config) + logger.info( + "View progress and logs at" + f" https://console.cloud.google.com/logs/query?project={self._api_client.project}." + ) operation = self._await_operation(operation_name=operation.name) agent = types.AgentEngine( api_client=self, api_async_client=AsyncAgentEngines(api_client_=self._api_client), api_resource=operation.response, ) + logger.info("Agent Engine updated. To use it in another session:") + logger.info( + "agent_engine =" f" client.agent_engines.get('{agent.api_resource.name}')" + ) return self._register_api_methods(agent=agent) def _stream_query( diff --git a/vertexai/agent_engines/_agent_engines.py b/vertexai/agent_engines/_agent_engines.py index 4f3b096cea..cabf85407a 100644 --- a/vertexai/agent_engines/_agent_engines.py +++ b/vertexai/agent_engines/_agent_engines.py @@ -788,6 +788,7 @@ def _validate_staging_bucket_or_raise(staging_bucket: str) -> str: def _validate_agent_engine_or_raise( agent_engine: _AgentEngineInterface, + logger: Any = _LOGGER, ) -> _AgentEngineInterface: """Tries to validate the agent engine. @@ -813,7 +814,7 @@ def _validate_agent_engine_or_raise( from google.adk.agents import BaseAgent if isinstance(agent_engine, BaseAgent): - _LOGGER.info("Deploying google.adk.agents.Agent as an application.") + logger.info("Deploying google.adk.agents.Agent as an application.") from vertexai.preview import reasoning_engines agent_engine = reasoning_engines.AdkApp(agent=agent_engine) @@ -903,20 +904,25 @@ def _validate_requirements_or_raise( *, agent_engine: _AgentEngineInterface, requirements: Optional[Sequence[str]] = None, + logger: Any = _LOGGER, ) -> Sequence[str]: """Tries to validate the requirements.""" if requirements is None: requirements = [] elif isinstance(requirements, str): try: - _LOGGER.info(f"Reading requirements from {requirements=}") + logger.info(f"Reading requirements from {requirements=}") with open(requirements) as f: requirements = f.read().splitlines() - _LOGGER.info(f"Read the following lines: {requirements}") + logger.info(f"Read the following lines: {requirements}") except IOError as err: raise IOError(f"Failed to read requirements from {requirements=}") from err - requirements = _utils.validate_requirements_or_warn(agent_engine, requirements) - _LOGGER.info(f"The final list of requirements: {requirements}") + requirements = _utils.validate_requirements_or_warn( + obj=agent_engine, + requirements=requirements, + logger=logger, + ) + logger.info(f"The final list of requirements: {requirements}") return requirements @@ -940,7 +946,11 @@ def _validate_extra_packages_or_raise( def _get_gcs_bucket( - *, project: str, location: str, staging_bucket: str + *, + project: str, + location: str, + staging_bucket: str, + logger: Any, ) -> storage.Bucket: """Gets or creates the GCS bucket.""" storage = _utils._import_cloud_storage_or_raise() @@ -948,11 +958,11 @@ def _get_gcs_bucket( staging_bucket = staging_bucket.replace("gs://", "") try: gcs_bucket = storage_client.get_bucket(staging_bucket) - _LOGGER.info(f"Using bucket {staging_bucket}") + logger.info(f"Using bucket {staging_bucket}") except exceptions.NotFound: new_bucket = storage_client.bucket(staging_bucket) gcs_bucket = storage_client.create_bucket(new_bucket, location=location) - _LOGGER.info(f"Creating bucket {staging_bucket} in {location=}") + logger.info(f"Creating bucket {staging_bucket} in {location=}") return gcs_bucket @@ -961,6 +971,7 @@ def _upload_agent_engine( agent_engine: _AgentEngineInterface, gcs_bucket: storage.Bucket, gcs_dir_name: str, + logger: Any, ) -> None: """Uploads the agent engine to GCS.""" cloudpickle = _utils._import_cloudpickle_or_raise() @@ -979,7 +990,7 @@ def _upload_agent_engine( except Exception as e: raise TypeError("Agent engine serialized to an invalid format") from e dir_name = f"gs://{gcs_bucket.name}/{gcs_dir_name}" - _LOGGER.info(f"Wrote to {dir_name}/{_BLOB_FILENAME}") + logger.info(f"Wrote to {dir_name}/{_BLOB_FILENAME}") def _upload_requirements( @@ -987,12 +998,13 @@ def _upload_requirements( requirements: Sequence[str], gcs_bucket: storage.Bucket, gcs_dir_name: str, + logger: Any, ) -> None: """Uploads the requirements file to GCS.""" blob = gcs_bucket.blob(f"{gcs_dir_name}/{_REQUIREMENTS_FILE}") blob.upload_from_string("\n".join(requirements)) dir_name = f"gs://{gcs_bucket.name}/{gcs_dir_name}" - _LOGGER.info(f"Writing to {dir_name}/{_REQUIREMENTS_FILE}") + logger.info(f"Writing to {dir_name}/{_REQUIREMENTS_FILE}") def _upload_extra_packages( @@ -1000,9 +1012,10 @@ def _upload_extra_packages( extra_packages: Sequence[str], gcs_bucket: storage.Bucket, gcs_dir_name: str, + logger: Any, ) -> None: """Uploads extra packages to GCS.""" - _LOGGER.info("Creating in-memory tarfile of extra_packages") + logger.info("Creating in-memory tarfile of extra_packages") tar_fileobj = io.BytesIO() with tarfile.open(fileobj=tar_fileobj, mode="w|gz") as tar: for file in extra_packages: @@ -1011,7 +1024,7 @@ def _upload_extra_packages( blob = gcs_bucket.blob(f"{gcs_dir_name}/{_EXTRA_PACKAGES_FILE}") blob.upload_from_string(tar_fileobj.read()) dir_name = f"gs://{gcs_bucket.name}/{gcs_dir_name}" - _LOGGER.info(f"Writing to {dir_name}/{_EXTRA_PACKAGES_FILE}") + logger.info(f"Writing to {dir_name}/{_EXTRA_PACKAGES_FILE}") def _prepare( @@ -1022,6 +1035,7 @@ def _prepare( location: str, staging_bucket: str, gcs_dir_name: str, + logger: Any = _LOGGER, ) -> None: """Prepares the agent engine for creation or updates in Vertex AI. @@ -1047,23 +1061,27 @@ def _prepare( project=project, location=location, staging_bucket=staging_bucket, + logger=logger, ) _upload_agent_engine( agent_engine=agent_engine, gcs_bucket=gcs_bucket, gcs_dir_name=gcs_dir_name, + logger=logger, ) if requirements is not None: _upload_requirements( requirements=requirements, gcs_bucket=gcs_bucket, gcs_dir_name=gcs_dir_name, + logger=logger, ) if extra_packages is not None: _upload_extra_packages( extra_packages=extra_packages, gcs_bucket=gcs_bucket, gcs_dir_name=gcs_dir_name, + logger=logger, ) @@ -1474,7 +1492,10 @@ def _get_registered_operations( def _generate_class_methods_spec_or_raise( - *, agent_engine: _AgentEngineInterface, operations: Dict[str, List[str]] + *, + agent_engine: _AgentEngineInterface, + operations: Dict[str, List[str]], + logger: Any = _LOGGER, ) -> List[proto.Message]: """Generates a ReasoningEngineSpec based on the registered operations. @@ -1512,7 +1533,7 @@ def _generate_class_methods_spec_or_raise( try: schema_dict = _utils.generate_schema(method, schema_name=method_name) except Exception as e: - _LOGGER.warning(f"failed to generate schema for {method_name}: {e}") + logger.warning(f"failed to generate schema for {method_name}: {e}") continue class_method = _utils.to_proto(schema_dict) diff --git a/vertexai/agent_engines/_utils.py b/vertexai/agent_engines/_utils.py index 4cea417fe3..34d9b7fdd7 100644 --- a/vertexai/agent_engines/_utils.py +++ b/vertexai/agent_engines/_utils.py @@ -362,28 +362,29 @@ def parse_constraints( def validate_requirements_or_warn( obj: Any, requirements: List[str], + logger: Any = LOGGER, ) -> Mapping[str, str]: """Compiles the requirements into a list of requirements.""" requirements = requirements.copy() try: current_requirements = scan_requirements(obj) - LOGGER.info(f"Identified the following requirements: {current_requirements}") + logger.info(f"Identified the following requirements: {current_requirements}") constraints = parse_constraints(requirements) missing_requirements = compare_requirements(current_requirements, constraints) for warning_type, warnings in missing_requirements.get( _WARNINGS_KEY, {} ).items(): if warnings: - LOGGER.warning( + logger.warning( f"The following requirements are {warning_type}: {warnings}" ) for action_type, actions in missing_requirements.get(_ACTIONS_KEY, {}).items(): if actions and action_type == _ACTION_APPEND: for action in actions: requirements.append(action) - LOGGER.info(f"The following requirements are appended: {actions}") + logger.info(f"The following requirements are appended: {actions}") except Exception as e: - LOGGER.warning(f"Failed to compile requirements: {e}") + logger.warning(f"Failed to compile requirements: {e}") return requirements