From 62a57467d686993106b7960caaab0ff723f0e6ab Mon Sep 17 00:00:00 2001 From: Yeesian Ng Date: Tue, 27 May 2025 15:06:13 -0700 Subject: [PATCH] feat: Add Resource to tracer provider in Agent Engine templates PiperOrigin-RevId: 763962855 --- setup.py | 6 +-- vertexai/agent_engines/_utils.py | 28 +++++++++++ vertexai/agent_engines/templates/ag2.py | 38 +++++++++----- vertexai/agent_engines/templates/langchain.py | 38 +++++++++----- vertexai/agent_engines/templates/langgraph.py | 38 +++++++++----- .../reasoning_engines/templates/adk.py | 38 +++++++++----- .../templates/llama_index.py | 49 +++++++++++-------- 7 files changed, 163 insertions(+), 72 deletions(-) diff --git a/setup.py b/setup.py index af6e50ca78..442805eb90 100644 --- a/setup.py +++ b/setup.py @@ -146,9 +146,8 @@ reasoning_engine_extra_require = [ "cloudpickle >= 3.0, < 4.0", - "google-cloud-trace < 2", "opentelemetry-sdk < 2", - "opentelemetry-exporter-gcp-trace < 2", + "opentelemetry-exporter-otlp < 2", "pydantic >= 2.11.1, < 3", "typing_extensions", ] @@ -156,10 +155,9 @@ agent_engines_extra_require = [ "packaging >= 24.0", "cloudpickle >= 3.0, < 4.0", - "google-cloud-trace < 2", "google-cloud-logging < 4", "opentelemetry-sdk < 2", - "opentelemetry-exporter-gcp-trace < 2", + "opentelemetry-exporter-otlp < 2", "pydantic >= 2.11.1, < 3", "typing_extensions", ] diff --git a/vertexai/agent_engines/_utils.py b/vertexai/agent_engines/_utils.py index e5363992aa..605d3626f1 100644 --- a/vertexai/agent_engines/_utils.py +++ b/vertexai/agent_engines/_utils.py @@ -685,6 +685,20 @@ def _import_opentelemetry_or_warn() -> Optional[types.ModuleType]: return None +def _import_opentelemetry_sdk_resources_or_warn() -> Optional[types.ModuleType]: + """Tries to import the opentelemetry.sdk.trace module.""" + try: + import opentelemetry.sdk.resources # noqa:F401 + + return opentelemetry.sdk.resources + except ImportError: + LOGGER.warning( + "Failed to import opentelemetry.sdk.resources. Please call " + "'pip install google-cloud-aiplatform[agent_engines]'." + ) + return None + + def _import_opentelemetry_sdk_trace_or_warn() -> Optional[types.ModuleType]: """Tries to import the opentelemetry.sdk.trace module.""" try: @@ -713,6 +727,20 @@ def _import_cloud_trace_v2_or_warn() -> Optional[types.ModuleType]: return None +def _import_otlp_trace_exporter_or_warn() -> Optional[types.ModuleType]: + """Tries to import the google.cloud.trace_v2 module.""" + try: + import opentelemetry.exporter.otlp.proto.http.trace_exporter + + return opentelemetry.exporter.otlp.proto.http.trace_exporter + except ImportError: + LOGGER.warning( + "opentelemetry-exporter-otlp is not installed. Please " + "call 'pip install google-cloud-aiplatform[agent_engines]'." + ) + return None + + def _import_cloud_trace_exporter_or_warn() -> Optional[types.ModuleType]: """Tries to import the opentelemetry.exporter.cloud_trace module.""" try: diff --git a/vertexai/agent_engines/templates/ag2.py b/vertexai/agent_engines/templates/ag2.py index a7261a7d5f..fe6d223173 100644 --- a/vertexai/agent_engines/templates/ag2.py +++ b/vertexai/agent_engines/templates/ag2.py @@ -91,34 +91,44 @@ def _default_runnable_builder( def _default_instrumentor_builder(project_id: str): from vertexai.agent_engines import _utils - cloud_trace_exporter = _utils._import_cloud_trace_exporter_or_warn() - cloud_trace_v2 = _utils._import_cloud_trace_v2_or_warn() + otlp_trace_exporter = _utils._import_otlp_trace_exporter_or_warn() openinference_autogen = _utils._import_openinference_autogen_or_warn() opentelemetry = _utils._import_opentelemetry_or_warn() opentelemetry_sdk_trace = _utils._import_opentelemetry_sdk_trace_or_warn() + opentelemetry_sdk_resources = _utils._import_opentelemetry_sdk_resources_or_warn() if all( ( - cloud_trace_exporter, - cloud_trace_v2, + otlp_trace_exporter, openinference_autogen, opentelemetry, opentelemetry_sdk_trace, + opentelemetry_sdk_resources, ) ): import google.auth + import os + + SERVICE_INSTANCE_ID = opentelemetry_sdk_resources.SERVICE_INSTANCE_ID + SERVICE_NAME = opentelemetry_sdk_resources.SERVICE_NAME + AGENT_ENGINE_ID = os.environ.get("GOOGLE_CLOUD_AGENT_ENGINE_ID", "") credentials, _ = google.auth.default() - span_exporter = cloud_trace_exporter.CloudTraceSpanExporter( - project_id=project_id, - client=cloud_trace_v2.TraceServiceClient( - credentials=credentials.with_quota_project(project_id), - ), + span_exporter = otlp_trace_exporter.OTLPSpanExporter( + session=google.auth.transport.requests.AuthorizedSession(credentials), + endpoint="https://telemetry.googleapis.com/v1/traces", ) span_processor: SpanProcessor = ( opentelemetry_sdk_trace.export.SimpleSpanProcessor( span_exporter=span_exporter, ) ) + resource = opentelemetry_sdk_trace.Resource.create( + attributes={ + "gcp.project_id": project_id, + SERVICE_NAME: "aiplatform.googleapis.com/ReasoningEngine", + SERVICE_INSTANCE_ID: AGENT_ENGINE_ID, + } + ) tracer_provider: TracerProvider = opentelemetry.trace.get_tracer_provider() # Get the appropriate tracer provider: # 1. If _TRACER_PROVIDER is already set, use that. @@ -127,7 +137,7 @@ def _default_instrumentor_builder(project_id: str): # 3. As a final fallback, use _PROXY_TRACER_PROVIDER. # If none of the above is set, we log a warning, and # create a tracer provider. - if not tracer_provider: + if AGENT_ENGINE_ID or not tracer_provider: from google.cloud.aiplatform import base _LOGGER = base.Logger(__name__) @@ -137,13 +147,17 @@ def _default_instrumentor_builder(project_id: str): "OTEL_PYTHON_TRACER_PROVIDER, _TRACER_PROVIDER, " "or _PROXY_TRACER_PROVIDER." ) - tracer_provider = opentelemetry_sdk_trace.TracerProvider() + tracer_provider = opentelemetry_sdk_trace.TracerProvider( + resource=resource, + ) opentelemetry.trace.set_tracer_provider(tracer_provider) # Avoids AttributeError: # 'ProxyTracerProvider' and 'NoOpTracerProvider' objects has no # attribute 'add_span_processor'. if _utils.is_noop_or_proxy_tracer_provider(tracer_provider): - tracer_provider = opentelemetry_sdk_trace.TracerProvider() + tracer_provider = opentelemetry_sdk_trace.TracerProvider( + resource=resource, + ) opentelemetry.trace.set_tracer_provider(tracer_provider) # Avoids OpenTelemetry client already exists error. _override_active_span_processor( diff --git a/vertexai/agent_engines/templates/langchain.py b/vertexai/agent_engines/templates/langchain.py index 717fa10396..845a2487c5 100644 --- a/vertexai/agent_engines/templates/langchain.py +++ b/vertexai/agent_engines/templates/langchain.py @@ -170,34 +170,44 @@ def _default_runnable_builder( def _default_instrumentor_builder(project_id: str): from vertexai.agent_engines import _utils - cloud_trace_exporter = _utils._import_cloud_trace_exporter_or_warn() - cloud_trace_v2 = _utils._import_cloud_trace_v2_or_warn() + otlp_trace_exporter = _utils._import_otlp_trace_exporter_or_warn() openinference_langchain = _utils._import_openinference_langchain_or_warn() opentelemetry = _utils._import_opentelemetry_or_warn() opentelemetry_sdk_trace = _utils._import_opentelemetry_sdk_trace_or_warn() + opentelemetry_sdk_resources = _utils._import_opentelemetry_sdk_resources_or_warn() if all( ( - cloud_trace_exporter, - cloud_trace_v2, + otlp_trace_exporter, openinference_langchain, opentelemetry, opentelemetry_sdk_trace, + opentelemetry_sdk_resources, ) ): import google.auth + import os + + SERVICE_INSTANCE_ID = opentelemetry_sdk_resources.SERVICE_INSTANCE_ID + SERVICE_NAME = opentelemetry_sdk_resources.SERVICE_NAME + AGENT_ENGINE_ID = os.environ.get("GOOGLE_CLOUD_AGENT_ENGINE_ID", "") credentials, _ = google.auth.default() - span_exporter = cloud_trace_exporter.CloudTraceSpanExporter( - project_id=project_id, - client=cloud_trace_v2.TraceServiceClient( - credentials=credentials.with_quota_project(project_id), - ), + span_exporter = otlp_trace_exporter.OTLPSpanExporter( + session=google.auth.transport.requests.AuthorizedSession(credentials), + endpoint="https://telemetry.googleapis.com/v1/traces", ) span_processor: SpanProcessor = ( opentelemetry_sdk_trace.export.SimpleSpanProcessor( span_exporter=span_exporter, ) ) + resource = opentelemetry_sdk_trace.Resource.create( + attributes={ + "gcp.project_id": project_id, + SERVICE_NAME: "aiplatform.googleapis.com/ReasoningEngine", + SERVICE_INSTANCE_ID: AGENT_ENGINE_ID, + } + ) tracer_provider: TracerProvider = opentelemetry.trace.get_tracer_provider() # Get the appropriate tracer provider: # 1. If _TRACER_PROVIDER is already set, use that. @@ -206,7 +216,7 @@ def _default_instrumentor_builder(project_id: str): # 3. As a final fallback, use _PROXY_TRACER_PROVIDER. # If none of the above is set, we log a warning, and # create a tracer provider. - if not tracer_provider: + if AGENT_ENGINE_ID or not tracer_provider: from google.cloud.aiplatform import base _LOGGER = base.Logger(__name__) @@ -216,13 +226,17 @@ def _default_instrumentor_builder(project_id: str): "OTEL_PYTHON_TRACER_PROVIDER, _TRACER_PROVIDER, " "or _PROXY_TRACER_PROVIDER." ) - tracer_provider = opentelemetry_sdk_trace.TracerProvider() + tracer_provider = opentelemetry_sdk_trace.TracerProvider( + resource=resource, + ) opentelemetry.trace.set_tracer_provider(tracer_provider) # Avoids AttributeError: # 'ProxyTracerProvider' and 'NoOpTracerProvider' objects has no # attribute 'add_span_processor'. if _utils.is_noop_or_proxy_tracer_provider(tracer_provider): - tracer_provider = opentelemetry_sdk_trace.TracerProvider() + tracer_provider = opentelemetry_sdk_trace.TracerProvider( + resource=resource, + ) opentelemetry.trace.set_tracer_provider(tracer_provider) # Avoids OpenTelemetry client already exists error. _override_active_span_processor( diff --git a/vertexai/agent_engines/templates/langgraph.py b/vertexai/agent_engines/templates/langgraph.py index cab4a56836..fd6a89b2ea 100644 --- a/vertexai/agent_engines/templates/langgraph.py +++ b/vertexai/agent_engines/templates/langgraph.py @@ -161,34 +161,44 @@ def _default_runnable_builder( def _default_instrumentor_builder(project_id: str): from vertexai.agent_engines import _utils - cloud_trace_exporter = _utils._import_cloud_trace_exporter_or_warn() - cloud_trace_v2 = _utils._import_cloud_trace_v2_or_warn() + otlp_trace_exporter = _utils._import_otlp_trace_exporter_or_warn() openinference_langchain = _utils._import_openinference_langchain_or_warn() opentelemetry = _utils._import_opentelemetry_or_warn() opentelemetry_sdk_trace = _utils._import_opentelemetry_sdk_trace_or_warn() + opentelemetry_sdk_resources = _utils._import_opentelemetry_sdk_resources_or_warn() if all( ( - cloud_trace_exporter, - cloud_trace_v2, + otlp_trace_exporter, openinference_langchain, opentelemetry, opentelemetry_sdk_trace, + opentelemetry_sdk_resources, ) ): import google.auth + import os + + SERVICE_INSTANCE_ID = opentelemetry_sdk_resources.SERVICE_INSTANCE_ID + SERVICE_NAME = opentelemetry_sdk_resources.SERVICE_NAME + AGENT_ENGINE_ID = os.environ.get("GOOGLE_CLOUD_AGENT_ENGINE_ID", "") credentials, _ = google.auth.default() - span_exporter = cloud_trace_exporter.CloudTraceSpanExporter( - project_id=project_id, - client=cloud_trace_v2.TraceServiceClient( - credentials=credentials.with_quota_project(project_id), - ), + span_exporter = otlp_trace_exporter.OTLPSpanExporter( + session=google.auth.transport.requests.AuthorizedSession(credentials), + endpoint="https://telemetry.googleapis.com/v1/traces", ) span_processor: SpanProcessor = ( opentelemetry_sdk_trace.export.SimpleSpanProcessor( span_exporter=span_exporter, ) ) + resource = opentelemetry_sdk_trace.Resource.create( + attributes={ + "gcp.project_id": project_id, + SERVICE_NAME: "aiplatform.googleapis.com/ReasoningEngine", + SERVICE_INSTANCE_ID: AGENT_ENGINE_ID, + } + ) tracer_provider: TracerProvider = opentelemetry.trace.get_tracer_provider() # Get the appropriate tracer provider: # 1. If _TRACER_PROVIDER is already set, use that. @@ -197,7 +207,7 @@ def _default_instrumentor_builder(project_id: str): # 3. As a final fallback, use _PROXY_TRACER_PROVIDER. # If none of the above is set, we log a warning, and # create a tracer provider. - if not tracer_provider: + if AGENT_ENGINE_ID or not tracer_provider: from google.cloud.aiplatform import base base.Logger(__name__).warning( @@ -206,13 +216,17 @@ def _default_instrumentor_builder(project_id: str): "OTEL_PYTHON_TRACER_PROVIDER, _TRACER_PROVIDER, " "or _PROXY_TRACER_PROVIDER." ) - tracer_provider = opentelemetry_sdk_trace.TracerProvider() + tracer_provider = opentelemetry_sdk_trace.TracerProvider( + resource=resource, + ) opentelemetry.trace.set_tracer_provider(tracer_provider) # Avoids AttributeError: # 'ProxyTracerProvider' and 'NoOpTracerProvider' objects has no # attribute 'add_span_processor'. if _utils.is_noop_or_proxy_tracer_provider(tracer_provider): - tracer_provider = opentelemetry_sdk_trace.TracerProvider() + tracer_provider = opentelemetry_sdk_trace.TracerProvider( + resource=resource, + ) opentelemetry.trace.set_tracer_provider(tracer_provider) # Avoids OpenTelemetry client already exists error. _override_active_span_processor( diff --git a/vertexai/preview/reasoning_engines/templates/adk.py b/vertexai/preview/reasoning_engines/templates/adk.py index 06a7ff01ef..57a6f68ddf 100644 --- a/vertexai/preview/reasoning_engines/templates/adk.py +++ b/vertexai/preview/reasoning_engines/templates/adk.py @@ -172,30 +172,40 @@ def dump(self) -> Dict[str, Any]: def _default_instrumentor_builder(project_id: str): from vertexai.agent_engines import _utils - cloud_trace_exporter = _utils._import_cloud_trace_exporter_or_warn() - cloud_trace_v2 = _utils._import_cloud_trace_v2_or_warn() + otlp_trace_exporter = _utils._import_otlp_trace_exporter_or_warn() opentelemetry = _utils._import_opentelemetry_or_warn() opentelemetry_sdk_trace = _utils._import_opentelemetry_sdk_trace_or_warn() + opentelemetry_sdk_resources = _utils._import_opentelemetry_sdk_resources_or_warn() if all( ( - cloud_trace_exporter, - cloud_trace_v2, + otlp_trace_exporter, opentelemetry, opentelemetry_sdk_trace, + opentelemetry_sdk_resources, ) ): import google.auth + import os + + SERVICE_INSTANCE_ID = opentelemetry_sdk_resources.SERVICE_INSTANCE_ID + SERVICE_NAME = opentelemetry_sdk_resources.SERVICE_NAME + AGENT_ENGINE_ID = os.environ.get("GOOGLE_CLOUD_AGENT_ENGINE_ID", "") credentials, _ = google.auth.default() - span_exporter = cloud_trace_exporter.CloudTraceSpanExporter( - project_id=project_id, - client=cloud_trace_v2.TraceServiceClient( - credentials=credentials.with_quota_project(project_id), - ), + span_exporter = otlp_trace_exporter.OTLPSpanExporter( + session=google.auth.transport.requests.AuthorizedSession(credentials), + endpoint="https://telemetry.googleapis.com/v1/traces", ) span_processor = opentelemetry_sdk_trace.export.BatchSpanProcessor( span_exporter=span_exporter, ) + resource = opentelemetry_sdk_trace.Resource.create( + attributes={ + "gcp.project_id": project_id, + SERVICE_NAME: "aiplatform.googleapis.com/ReasoningEngine", + SERVICE_INSTANCE_ID: AGENT_ENGINE_ID, + } + ) tracer_provider = opentelemetry.trace.get_tracer_provider() # Get the appropriate tracer provider: # 1. If _TRACER_PROVIDER is already set, use that. @@ -204,7 +214,7 @@ def _default_instrumentor_builder(project_id: str): # 3. As a final fallback, use _PROXY_TRACER_PROVIDER. # If none of the above is set, we log a warning, and # create a tracer provider. - if not tracer_provider: + if AGENT_ENGINE_ID or not tracer_provider: from google.cloud.aiplatform import base _LOGGER = base.Logger(__name__) @@ -214,13 +224,17 @@ def _default_instrumentor_builder(project_id: str): "OTEL_PYTHON_TRACER_PROVIDER, _TRACER_PROVIDER, " "or _PROXY_TRACER_PROVIDER." ) - tracer_provider = opentelemetry_sdk_trace.TracerProvider() + tracer_provider = opentelemetry_sdk_trace.TracerProvider( + resource=resource, + ) opentelemetry.trace.set_tracer_provider(tracer_provider) # Avoids AttributeError: # 'ProxyTracerProvider' and 'NoOpTracerProvider' objects has no # attribute 'add_span_processor'. if _utils.is_noop_or_proxy_tracer_provider(tracer_provider): - tracer_provider = opentelemetry_sdk_trace.TracerProvider() + tracer_provider = opentelemetry_sdk_trace.TracerProvider( + resource=resource, + ) opentelemetry.trace.set_tracer_provider(tracer_provider) # Avoids OpenTelemetry client already exists error. _override_active_span_processor( diff --git a/vertexai/preview/reasoning_engines/templates/llama_index.py b/vertexai/preview/reasoning_engines/templates/llama_index.py index 93acd90073..70e20954ac 100644 --- a/vertexai/preview/reasoning_engines/templates/llama_index.py +++ b/vertexai/preview/reasoning_engines/templates/llama_index.py @@ -384,49 +384,54 @@ def set_up(self): that can not be serialized. """ if self._enable_tracing: - from vertexai.reasoning_engines import _utils + from vertexai.agent_engines import _utils - cloud_trace_exporter = _utils._import_cloud_trace_exporter_or_warn() - cloud_trace_v2 = _utils._import_cloud_trace_v2_or_warn() + otlp_trace_exporter = _utils._import_otlp_trace_exporter_or_warn() openinference_llama_index = ( _utils._import_openinference_llama_index_or_warn() ) opentelemetry = _utils._import_opentelemetry_or_warn() opentelemetry_sdk_trace = _utils._import_opentelemetry_sdk_trace_or_warn() + opentelemetry_sdk_resources = ( + _utils._import_opentelemetry_sdk_resources_or_warn() + ) if all( ( - cloud_trace_exporter, - cloud_trace_v2, + otlp_trace_exporter, openinference_llama_index, opentelemetry, opentelemetry_sdk_trace, + opentelemetry_sdk_resources, ) ): import google.auth + import os + + SERVICE_INSTANCE_ID = opentelemetry_sdk_resources.SERVICE_INSTANCE_ID + SERVICE_NAME = opentelemetry_sdk_resources.SERVICE_NAME + AGENT_ENGINE_ID = os.environ.get("GOOGLE_CLOUD_AGENT_ENGINE_ID", "") credentials, _ = google.auth.default() - span_exporter = cloud_trace_exporter.CloudTraceSpanExporter( - project_id=self._project, - client=cloud_trace_v2.TraceServiceClient( - credentials=credentials.with_quota_project(self._project), - ), + span_exporter = otlp_trace_exporter.OTLPSpanExporter( + session=google.auth.transport.requests.AuthorizedSession(credentials), + endpoint="https://telemetry.googleapis.com/v1/traces", ) span_processor: SpanProcessor = ( opentelemetry_sdk_trace.export.SimpleSpanProcessor( span_exporter=span_exporter, ) ) + resource = opentelemetry_sdk_trace.Resource.create( + attributes={ + "gcp.project_id": self._project, + SERVICE_NAME: "aiplatform.googleapis.com/ReasoningEngine", + SERVICE_INSTANCE_ID: AGENT_ENGINE_ID, + } + ) tracer_provider: TracerProvider = ( opentelemetry.trace.get_tracer_provider() ) - # Get the appropriate tracer provider: - # 1. If _TRACER_PROVIDER is already set, use that. - # 2. Otherwise, if the OTEL_PYTHON_TRACER_PROVIDER environment - # variable is set, use that. - # 3. As a final fallback, use _PROXY_TRACER_PROVIDER. - # If none of the above is set, we log a warning, and - # create a tracer provider. - if not tracer_provider: + if AGENT_ENGINE_ID or not tracer_provider: from google.cloud.aiplatform import base _LOGGER = base.Logger(__name__) @@ -436,13 +441,17 @@ def set_up(self): "OTEL_PYTHON_TRACER_PROVIDER, _TRACER_PROVIDER, " "or _PROXY_TRACER_PROVIDER." ) - tracer_provider = opentelemetry_sdk_trace.TracerProvider() + tracer_provider = opentelemetry_sdk_trace.TracerProvider( + resource=resource, + ) opentelemetry.trace.set_tracer_provider(tracer_provider) # Avoids AttributeError: # 'ProxyTracerProvider' and 'NoOpTracerProvider' objects has no # attribute 'add_span_processor'. if _utils.is_noop_or_proxy_tracer_provider(tracer_provider): - tracer_provider = opentelemetry_sdk_trace.TracerProvider() + tracer_provider = opentelemetry_sdk_trace.TracerProvider( + resource=resource, + ) opentelemetry.trace.set_tracer_provider(tracer_provider) # Avoids OpenTelemetry client already exists error. _override_active_span_processor(