Skip to content

Commit 450eec7

Browse files
committed
Support db.statement, server and url attributes
1 parent 4c15f79 commit 450eec7

File tree

4 files changed

+157
-21
lines changed

4 files changed

+157
-21
lines changed

elastic_transport/_async_transport.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
from ._models import DEFAULT, DefaultType, HttpHeaders, NodeConfig, SniffOptions
4141
from ._node import AiohttpHttpNode, BaseAsyncNode
4242
from ._node_pool import NodePool, NodeSelector
43+
from ._otel import OpenTelemetrySpan
4344
from ._serializer import Serializer
4445
from ._transport import (
4546
DEFAULT_CLIENT_META_SERVICE,
@@ -220,7 +221,7 @@ async def perform_request( # type: ignore[override]
220221
method,
221222
endpoint_id=resolve_default(endpoint_id, None),
222223
path_parts=resolve_default(path_parts, {}),
223-
) as span:
224+
) as otel_span:
224225
response = await self._perform_request(
225226
method,
226227
target,
@@ -231,8 +232,9 @@ async def perform_request( # type: ignore[override]
231232
retry_on_timeout=retry_on_timeout,
232233
request_timeout=request_timeout,
233234
client_meta=client_meta,
235+
otel_span=otel_span,
234236
)
235-
span.set_elastic_cloud_metadata(response.meta.headers)
237+
otel_span.set_elastic_cloud_metadata(response.meta.headers)
236238
return response
237239

238240
async def _perform_request( # type: ignore[override,return]
@@ -247,6 +249,7 @@ async def _perform_request( # type: ignore[override,return]
247249
retry_on_timeout: Union[bool, DefaultType] = DEFAULT,
248250
request_timeout: Union[Optional[float], DefaultType] = DEFAULT,
249251
client_meta: Union[Tuple[Tuple[str, str], ...], DefaultType] = DEFAULT,
252+
otel_span: OpenTelemetrySpan,
250253
) -> TransportApiResponse:
251254
await self._async_call()
252255

@@ -275,6 +278,7 @@ async def _perform_request( # type: ignore[override,return]
275278
request_body = self.serializers.dumps(
276279
body, mimetype=request_headers["content-type"]
277280
)
281+
otel_span.set_db_statement(request_body)
278282
else:
279283
request_body = None
280284

@@ -293,6 +297,7 @@ async def _perform_request( # type: ignore[override,return]
293297
node: BaseAsyncNode = self.node_pool.get() # type: ignore[assignment]
294298
start_time = self._loop.time()
295299
try:
300+
otel_span.set_node_metadata(node.host, node.port, node.base_url, target)
296301
resp = await node.perform_request(
297302
method,
298303
target,

elastic_transport/_otel.py

Lines changed: 78 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
import contextlib
2121
import os
22-
from typing import Generator, Mapping, Optional
22+
from typing import Any, Generator, Mapping, Optional
2323

2424
try:
2525
from opentelemetry import trace
@@ -30,33 +30,97 @@
3030
_tracer = None
3131

3232

33+
# Valid values for the enabled config are 'true' and 'false'. Default is 'false'.
3334
ENABLED_ENV_VAR = "OTEL_PYTHON_INSTRUMENTATION_ELASTICSEARCH_ENABLED"
35+
# Describes how to handle search queries in the request body when assigned to
36+
# a span attribute.
37+
# Valid values are 'omit' and 'raw'.
38+
# Default is 'omit' as 'raw' has security implications.
39+
BODY_STRATEGY_ENV_VAR = "OTEL_PYTHON_INSTRUMENTATION_ELASTICSEARCH_CAPTURE_SEARCH_QUERY"
40+
DEFAULT_BODY_STRATEGY = "omit"
41+
42+
# A list of the Elasticsearch endpoints that qualify as "search" endpoints. The search query in
43+
# the request body may be captured for these endpoints, depending on the body capture strategy.
44+
SEARCH_ENDPOINTS = (
45+
"search",
46+
"async_search.submit",
47+
"msearch",
48+
"eql.search",
49+
"esql.query",
50+
"terms_enum",
51+
"search_template",
52+
"msearch_template",
53+
"render_search_template",
54+
)
3455

3556

3657
class OpenTelemetrySpan:
    """Wrapper around an optional OpenTelemetry span.

    Every setter returns immediately when ``otel_span`` is ``None``, so
    callers can record attributes without checking whether a span exists.
    """

    def __init__(
        self,
        otel_span: Optional["Span"],
        endpoint_id: Optional[str] = None,
        body_strategy: Optional[str] = None,
    ):
        """Store the wrapped span and the settings used by the setters.

        :param otel_span: The span to write attributes to, or ``None``.
        :param endpoint_id: Endpoint identifier of the request; compared
            against ``SEARCH_ENDPOINTS`` when capturing the request body.
        :param body_strategy: Body capture strategy (``'omit'`` or ``'raw'``).
        """
        self.otel_span = otel_span
        self.body_strategy = body_strategy
        self.endpoint_id = endpoint_id

    def set_node_metadata(
        self, host: str, port: int, base_url: str, target: str
    ) -> None:
        """Record ``url.full``, ``server.address`` and ``server.port``."""
        if self.otel_span is None:
            return

        # url.full does not contain auth info which is passed as headers
        self.otel_span.set_attribute("url.full", base_url + target)
        self.otel_span.set_attribute("server.address", host)
        self.otel_span.set_attribute("server.port", port)

    def set_elastic_cloud_metadata(self, headers: Mapping[str, str]) -> None:
        """Record cluster and node names from the ``X-Found-Handling-*`` headers.

        A header that is absent leaves the matching attribute unset.
        """
        if self.otel_span is None:
            return

        cluster_name = headers.get("X-Found-Handling-Cluster")
        if cluster_name is not None:
            self.otel_span.set_attribute("db.elasticsearch.cluster.name", cluster_name)
        node_name = headers.get("X-Found-Handling-Instance")
        if node_name is not None:
            self.otel_span.set_attribute("db.elasticsearch.node.name", node_name)

    def set_db_statement(self, serialized_body: bytes) -> None:
        """Record the request body as ``db.statement`` when capture is allowed.

        The body is only captured when the strategy is ``'raw'`` and the
        endpoint is listed in ``SEARCH_ENDPOINTS``; with any other strategy
        (including ``'omit'``) nothing is recorded.

        :param serialized_body: The serialized request body, decoded as UTF-8.
        """
        if self.otel_span is None:
            return

        # Never print or log the body here: it can hold sensitive data, which
        # is why capturing it is opt-in in the first place.
        if self.body_strategy == "raw" and self.endpoint_id in SEARCH_ENDPOINTS:
            self.otel_span.set_attribute(
                "db.statement", serialized_body.decode("utf-8")
            )
51103

52104

53105
class OpenTelemetry:
54-
def __init__(self, enabled: bool | None = None, tracer: trace.Tracer | None = None):
106+
def __init__(
    self,
    enabled: bool | None = None,
    tracer: trace.Tracer | None = None,
    body_strategy: str | None = None,
):
    """Resolve the tracing settings from arguments or the environment.

    :param enabled: Explicit on/off switch. When ``None``, the value of
        ``ENABLED_ENV_VAR`` is used; tracing is on unless that value is
        ``"false"`` (which is also the fallback when the variable is unset).
    :param tracer: Tracer to use; falls back to the module-level ``_tracer``.
    :param body_strategy: Body capture strategy. When ``None``, the value of
        ``BODY_STRATEGY_ENV_VAR`` is used, defaulting to
        ``DEFAULT_BODY_STRATEGY``.
    """
    if enabled is None:
        env_flag = os.environ.get(ENABLED_ENV_VAR, "false")
        enabled = env_flag != "false"
    self.tracer = tracer or _tracer
    # Without a tracer there is nothing to record, whatever the flag says.
    self.enabled = enabled and self.tracer is not None

    if body_strategy is None:
        body_strategy = os.environ.get(BODY_STRATEGY_ENV_VAR, DEFAULT_BODY_STRATEGY)
    self.body_strategy = body_strategy
123+
60124
@contextlib.contextmanager
61125
def span(
62126
self,
@@ -77,4 +141,9 @@ def span(
77141
otel_span.set_attribute("db.operation", endpoint_id)
78142
for key, value in path_parts.items():
79143
otel_span.set_attribute(f"db.elasticsearch.path_parts.{key}", value)
80-
yield OpenTelemetrySpan(otel_span)
144+
145+
yield OpenTelemetrySpan(
146+
otel_span,
147+
endpoint_id=endpoint_id,
148+
body_strategy=self.body_strategy,
149+
)

elastic_transport/_transport.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@
6060
Urllib3HttpNode,
6161
)
6262
from ._node_pool import NodePool, NodeSelector
63-
from ._otel import OpenTelemetry
63+
from ._otel import OpenTelemetry, OpenTelemetrySpan
6464
from ._serializer import DEFAULT_SERIALIZERS, Serializer, SerializerCollection
6565
from ._version import __version__
6666
from .client_utils import client_meta_version, resolve_default
@@ -303,8 +303,8 @@ def perform_request(
303303
method,
304304
endpoint_id=resolve_default(endpoint_id, None),
305305
path_parts=resolve_default(path_parts, {}),
306-
) as span:
307-
api_response = self._perform_request(
306+
) as otel_span:
307+
response = self._perform_request(
308308
method,
309309
target,
310310
body=body,
@@ -314,9 +314,10 @@ def perform_request(
314314
retry_on_timeout=retry_on_timeout,
315315
request_timeout=request_timeout,
316316
client_meta=client_meta,
317+
otel_span=otel_span,
317318
)
318-
span.set_elastic_cloud_metadata(api_response.meta.headers)
319-
return api_response
319+
otel_span.set_elastic_cloud_metadata(response.meta.headers)
320+
return response
320321

321322
def _perform_request( # type: ignore[return]
322323
self,
@@ -330,6 +331,7 @@ def _perform_request( # type: ignore[return]
330331
retry_on_timeout: Union[bool, DefaultType] = DEFAULT,
331332
request_timeout: Union[Optional[float], DefaultType] = DEFAULT,
332333
client_meta: Union[Tuple[Tuple[str, str], ...], DefaultType] = DEFAULT,
334+
otel_span: OpenTelemetrySpan,
333335
) -> TransportApiResponse:
334336
if headers is DEFAULT:
335337
request_headers = HttpHeaders()
@@ -356,6 +358,7 @@ def _perform_request( # type: ignore[return]
356358
request_body = self.serializers.dumps(
357359
body, mimetype=request_headers["content-type"]
358360
)
361+
otel_span.set_db_statement(request_body)
359362
else:
360363
request_body = None
361364

@@ -374,6 +377,7 @@ def _perform_request( # type: ignore[return]
374377
node = self.node_pool.get()
375378
start_time = time.time()
376379
try:
380+
otel_span.set_node_metadata(node.host, node.port, node.base_url, target)
377381
resp = node.perform_request(
378382
method,
379383
target,

tests/test_otel.py

Lines changed: 63 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,13 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717

18+
import os
1819

1920
from opentelemetry.sdk.trace import TracerProvider, export
2021
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
2122

22-
from elastic_transport._otel import OpenTelemetry
23+
from elastic_transport import JsonSerializer
24+
from elastic_transport._otel import ENABLED_ENV_VAR, OpenTelemetry
2325

2426

2527
def setup_tracing():
@@ -32,6 +34,34 @@ def setup_tracing():
3234
return tracer, memory_exporter
3335

3436

37+
def test_no_span():
    """With telemetry disabled, none of the span setters should raise."""
    disabled_otel = OpenTelemetry(enabled=False)
    request_body = JsonSerializer().dumps({"timeout": "1m"})
    cloud_headers = {
        "X-Found-Handling-Cluster": "e9106fc68e3044f0b1475b04bf4ffd5f",
        "X-Found-Handling-Instance": "instance-0000000001",
    }

    with disabled_otel.span(
        "GET", endpoint_id="ml.open_job", path_parts={"job_id": "my-job"}
    ) as span:
        span.set_db_statement(request_body)
        span.set_node_metadata(
            "localhost",
            9200,
            "http://localhost:9200/",
            "_ml/anomaly_detectors/my-job/_open",
        )
        span.set_elastic_cloud_metadata(cloud_headers)
58+
59+
60+
def test_enabled():
    """Without an explicit flag, ``enabled`` mirrors the environment variable."""
    otel = OpenTelemetry()
    expected = os.environ.get(ENABLED_ENV_VAR, "false") != "false"
    assert otel.enabled == expected
63+
64+
3565
def test_minimal_span():
3666
tracer, memory_exporter = setup_tracing()
3767

@@ -52,8 +82,17 @@ def test_detailed_span():
5282
tracer, memory_exporter = setup_tracing()
5383
otel = OpenTelemetry(enabled=True, tracer=tracer)
5484
with otel.span(
55-
"GET", endpoint_id="ml.close_job", path_parts={"job_id": "my-job", "foo": "bar"}
85+
"GET",
86+
endpoint_id="ml.open_job",
87+
path_parts={"job_id": "my-job"},
5688
) as span:
89+
span.set_db_statement(JsonSerializer().dumps({"timeout": "1m"}))
90+
span.set_node_metadata(
91+
"localhost",
92+
9200,
93+
"http://localhost:9200/",
94+
"_ml/anomaly_detectors/my-job/_open",
95+
)
5796
span.set_elastic_cloud_metadata(
5897
{
5998
"X-Found-Handling-Cluster": "e9106fc68e3044f0b1475b04bf4ffd5f",
@@ -63,13 +102,32 @@ def test_detailed_span():
63102

64103
spans = memory_exporter.get_finished_spans()
65104
assert len(spans) == 1
66-
assert spans[0].name == "ml.close_job"
105+
assert spans[0].name == "ml.open_job"
67106
assert spans[0].attributes == {
68107
"http.request.method": "GET",
108+
"url.full": "http://localhost:9200/_ml/anomaly_detectors/my-job/_open",
109+
"server.address": "localhost",
110+
"server.port": 9200,
69111
"db.system": "elasticsearch",
70-
"db.operation": "ml.close_job",
112+
"db.operation": "ml.open_job",
71113
"db.elasticsearch.path_parts.job_id": "my-job",
72-
"db.elasticsearch.path_parts.foo": "bar",
73114
"db.elasticsearch.cluster.name": "e9106fc68e3044f0b1475b04bf4ffd5f",
74115
"db.elasticsearch.node.name": "instance-0000000001",
75116
}
117+
118+
119+
def test_db_statement():
    """With the 'raw' strategy, a search body is recorded as ``db.statement``."""
    tracer, memory_exporter = setup_tracing()
    otel = OpenTelemetry(enabled=True, tracer=tracer, body_strategy="raw")
    query_body = JsonSerializer().dumps({"query": {"match_all": {}}})

    with otel.span("GET", endpoint_id="search", path_parts={}) as span:
        span.set_db_statement(query_body)

    finished = memory_exporter.get_finished_spans()
    assert len(finished) == 1
    recorded = finished[0]
    assert recorded.name == "search"
    assert recorded.attributes == {
        "http.request.method": "GET",
        "db.system": "elasticsearch",
        "db.operation": "search",
        "db.statement": '{"query":{"match_all":{}}}',
    }

0 commit comments

Comments
 (0)