Skip to content

Commit 9f515d0

Browse files
mabdinurgithub-actions[bot]
authored andcommitted
fix(telemetry): support python3.12 (#7043)
## Description This PR ensures the instrumentation telemetry client is compatible with python3.12's threading module by: - Removing `telemetry_writer.add_count_metric` from `SpanAggregator.shutdown`. This ensures telemetry events are not queued on tracer shutdown. - Ensuring the telemetry writer thread is only started once per application. - Ensures the telemetry writer thread is disabled when an application is shutdown. ## Motivation The following change failed to support the telemetry client in python3.12: #6859. This PR will hopefully fix this 🤞. ### Reproduction for python3.12 runtime errors ``` docker run --rm -it python:3.12.0rc3 bash root@a1f3c1d307ec:/# pip install ddtrace==2.0.0rc2 root@a1f3c1d307ec:/# python -c "import ddtrace; _ = ddtrace.tracer.trace('foo'); raise Exception" ``` ### Output ``` Traceback (most recent call last): File "<string>", line 1, in <module> Exception Exception ignored in atexit callback: <bound method Tracer._atexit of <ddtrace.tracer.Tracer object at 0xffffbd967260>> Traceback (most recent call last): File "/usr/local/lib/python3.12/site-packages/ddtrace/tracer.py", line 293, in _atexit self.shutdown(timeout=self.SHUTDOWN_TIMEOUT) File "/usr/local/lib/python3.12/site-packages/ddtrace/tracer.py", line 1024, in shutdown processor.shutdown(timeout) File "/usr/local/lib/python3.12/site-packages/ddtrace/internal/processor/trace.py", line 270, in shutdown self._queue_span_count_metrics("spans_created", "integration_name", None) File "/usr/local/lib/python3.12/site-packages/ddtrace/internal/processor/trace.py", line 291, in _queue_span_count_metrics telemetry_writer.add_count_metric( File "/usr/local/lib/python3.12/site-packages/ddtrace/internal/telemetry/writer.py", line 514, in add_count_metric if self.status == ServiceStatus.RUNNING or self.enable(): ^^^^^^^^^^^^^ File "/usr/local/lib/python3.12/site-packages/ddtrace/internal/telemetry/writer.py", line 218, in enable self.start() File "/usr/local/lib/python3.12/site-packages/ddtrace/internal/service.py", line 58, in start self._start_service(*args, **kwargs) File "/usr/local/lib/python3.12/site-packages/ddtrace/internal/periodic.py", line 135, in _start_service self._worker.start() File "/usr/local/lib/python3.12/threading.py", line 971, in start _start_new_thread(self._bootstrap, ()) RuntimeError: can't create new thread at interpreter shutdown ``` ## Risk This PR reverts an optimization that ensured telemetry span creation metrics were queued in batches of 100. Without this optimization we can expect a 2-5% increase to span creation and span finish. ## Checklist - [x] Change(s) are motivated and described in the PR description. - [x] Testing strategy is described if automated tests are not included in the PR. - [x] Risk is outlined (performance impact, potential for breakage, maintainability, etc). - [x] Change is maintainable (easy to change, telemetry, documentation). - [x] [Library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html) are followed. If no release note is required, add label `changelog/no-changelog`. - [x] Documentation is included (in-code, generated user docs, [public corp docs](https://github.com/DataDog/documentation/)). - [x] Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)) ## Reviewer Checklist - [x] Title is accurate. - [x] No unnecessary changes are introduced. - [x] Description motivates each change. - [x] Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes unless absolutely necessary. - [x] Testing strategy adequately addresses listed risk(s). - [x] Change is maintainable (easy to change, telemetry, documentation). - [x] Release note makes sense to a user of the library. - [x] Reviewer has explicitly acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment. - [x] Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting) - [x] If this PR touches code that signs or publishes builds or packages, or handles credentials of any kind, I've requested a review from `@DataDog/security-design-and-guidance`. - [x] This PR doesn't touch any of that. (cherry picked from commit 065784b)
1 parent 5e48006 commit 9f515d0

File tree

8 files changed

+71
-42
lines changed

8 files changed

+71
-42
lines changed

ddtrace/internal/processor/trace.py

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -265,15 +265,21 @@ def shutdown(self, timeout):
265265
:type timeout: :obj:`int` | :obj:`float` | :obj:`None`
266266
"""
267267
if self._span_metrics["spans_created"] or self._span_metrics["spans_finished"]:
268-
# on_span_start queue span created counts in batches of 100. This ensures all remaining counts are sent
269-
# before the tracer is shutdown.
270-
self._queue_span_count_metrics("spans_created", "integration_name", None)
271-
# on_span_finish(...) queues span finish metrics in batches of 100.
272-
# This ensures all remaining counts are sent before the tracer is shutdown.
273-
self._queue_span_count_metrics("spans_finished", "integration_name", None)
274-
# The telemetry metrics writer can be shutdown before the tracer.
275-
# This ensures all tracer metrics always sent.
276-
telemetry_writer.periodic(True)
268+
if config._telemetry_enabled:
269+
# Telemetry writer is disabled when a process shutsdown. This is to support py3.12.
270+
# Here we submit the remanining span creation metrics without restarting the periodic thread.
271+
# Note - Due to how atexit hooks are registered the telemetry writer is shutdown before the tracer.
272+
telemetry_writer._is_periodic = False
273+
telemetry_writer._enabled = True
274+
# on_span_start queue span created counts in batches of 100. This ensures all remaining counts are sent
275+
# before the tracer is shutdown.
276+
self._queue_span_count_metrics("spans_created", "integration_name", None)
277+
# on_span_finish(...) queues span finish metrics in batches of 100.
278+
# This ensures all remaining counts are sent before the tracer is shutdown.
279+
self._queue_span_count_metrics("spans_finished", "integration_name", None)
280+
telemetry_writer.periodic(True)
281+
# Disable the telemetry writer so no events/metrics/logs are queued during process shutdown
282+
telemetry_writer.disable()
277283

278284
try:
279285
self._writer.stop(timeout)

ddtrace/internal/telemetry/__init__.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,12 +49,11 @@ def _excepthook(tp, value, root_traceback):
4949
error_msg = "{}:{} {}".format(filename, lineno, str(value))
5050
telemetry_writer.add_integration(integration_name, True, error_msg=error_msg)
5151

52-
if telemetry_writer.started is False:
52+
if not telemetry_writer.started:
5353
telemetry_writer._app_started_event(False)
5454
telemetry_writer._app_dependencies_loaded_event()
5555

5656
telemetry_writer.app_shutdown()
57-
telemetry_writer.disable()
5857

5958
return _ORIGINAL_EXCEPTHOOK(tp, value, root_traceback)
6059

ddtrace/internal/telemetry/writer.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ def __init__(self, is_periodic=True):
189189
self._error = (0, "") # type: Tuple[int, str]
190190
self._namespace = MetricNamespace()
191191
self._logs = set() # type: Set[Dict[str, Any]]
192-
self._disabled = False
192+
self._enabled = config._telemetry_enabled
193193
self._forked = False # type: bool
194194
self._events_queue = [] # type: List[Dict]
195195
self._configuration_queue = {} # type: Dict[str, Dict]
@@ -208,7 +208,7 @@ def enable(self):
208208
Enable the instrumentation telemetry collection service. If the service has already been
209209
activated before, this method does nothing. Use ``disable`` to turn off the telemetry collection service.
210210
"""
211-
if not config._telemetry_enabled:
211+
if not self._enabled:
212212
return False
213213

214214
if self.status == ServiceStatus.RUNNING:
@@ -227,7 +227,7 @@ def disable(self):
227227
Disable the telemetry collection service and drop the existing integrations and events
228228
Once disabled, telemetry collection can not be re-enabled.
229229
"""
230-
self._disabled = True
230+
self._enabled = False
231231
self.reset_queues()
232232
if self._is_periodic and self.status is ServiceStatus.RUNNING:
233233
self.stop()
@@ -243,7 +243,7 @@ def add_event(self, payload, payload_type):
243243
:param str payload_type: The payload_type denotes the type of telmetery request.
244244
Payload types accepted by telemetry/proxy v2: app-started, app-closing, app-integrations-change
245245
"""
246-
if not self._disabled and self.enable():
246+
if self.enable():
247247
event = {
248248
"tracer_time": int(time.time()),
249249
"runtime_id": get_runtime_id(),
@@ -292,7 +292,7 @@ def add_error(self, code, msg, filename, line_number):
292292
def _app_started_event(self, register_app_shutdown=True):
293293
# type: (bool) -> None
294294
"""Sent when TelemetryWriter is enabled or forks"""
295-
if self._forked:
295+
if self._forked or self.started:
296296
# app-started events should only be sent by the main process
297297
return
298298
# List of configurations to be collected
@@ -599,6 +599,7 @@ def periodic(self, force_flush=False):
599599
def app_shutdown(self):
600600
self._app_closing_event()
601601
self.periodic(force_flush=True)
602+
self.disable()
602603

603604
def reset_queues(self):
604605
# type: () -> None
@@ -624,7 +625,8 @@ def _fork_writer(self):
624625
if self.status == ServiceStatus.STOPPED:
625626
return
626627

627-
self.stop(join=False)
628+
if self._is_periodic:
629+
self.stop(join=False)
628630

629631
# Enable writer service in child process to avoid interpreter shutdown
630632
# error in Python 3.12

ddtrace/internal/writer/writer.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -622,8 +622,9 @@ def _send_payload(self, payload, count, client):
622622
def start(self):
623623
super(AgentWriter, self).start()
624624
try:
625-
telemetry_writer._app_started_event()
626-
telemetry_writer._app_dependencies_loaded_event()
625+
if not telemetry_writer.started:
626+
telemetry_writer._app_started_event()
627+
telemetry_writer._app_dependencies_loaded_event()
627628

628629
# appsec remote config should be enabled/started after the global tracer and configs
629630
# are initialized

tests/appsec/conftest.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@
88

99
@pytest.fixture
1010
def mock_telemetry_lifecycle_writer():
11-
telemetry_writer.disable()
1211
telemetry_writer.enable()
12+
telemetry_writer.reset_queues()
1313
metrics_result = telemetry_writer._namespace._metrics_data
1414
assert len(metrics_result[TELEMETRY_TYPE_GENERATE_METRICS][TELEMETRY_NAMESPACE_TAG_APPSEC]) == 0
1515
assert len(metrics_result[TELEMETRY_TYPE_DISTRIBUTION][TELEMETRY_NAMESPACE_TAG_APPSEC]) == 0
@@ -19,8 +19,8 @@ def mock_telemetry_lifecycle_writer():
1919

2020
@pytest.fixture
2121
def mock_logs_telemetry_lifecycle_writer():
22-
telemetry_writer.disable()
2322
telemetry_writer.enable()
23+
telemetry_writer.reset_queues()
2424
metrics_result = telemetry_writer._logs
2525
assert len(metrics_result) == 0
2626
yield telemetry_writer

tests/telemetry/app.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,14 @@ def index():
1212
return "OK", 200
1313

1414

15+
@app.route("/start_application")
16+
def starting_app_view():
17+
# We must call app-started before telemetry events can be sent to the agent.
18+
# This endpoint mocks the behavior of the agent writer.
19+
telemetry_writer._app_started_event()
20+
return "OK", 200
21+
22+
1523
@app.route("/count_metric")
1624
def metrics_view():
1725
telemetry_writer.add_count_metric(

tests/telemetry/test_telemetry.py

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,22 @@ def test_app_started_error_unhandled_exception(test_agent_session, run_python_co
245245
assert "Unable to parse DD_SPAN_SAMPLING_RULES='invalid_rules'" in events[2]["payload"]["error"]["message"]
246246

247247

248+
def test_telemetry_with_raised_exception(test_agent_session, run_python_code_in_subprocess):
249+
env = os.environ.copy()
250+
_, stderr, status, _ = run_python_code_in_subprocess(
251+
"import ddtrace; ddtrace.tracer.trace('moon').finish(); raise Exception('bad_code')", env=env
252+
)
253+
assert status == 1, stderr
254+
assert b"bad_code" in stderr
255+
# Regression test for python3.12 support
256+
assert b"RuntimeError: can't create new thread at interpreter shutdown" not in stderr
257+
258+
# Ensure the expected telemetry events are sent
259+
events = test_agent_session.get_events()
260+
event_types = [event["request_type"] for event in events]
261+
assert event_types == ["generate-metrics", "app-closing", "app-dependencies-loaded", "app-started"]
262+
263+
248264
def test_handled_integration_error(test_agent_session, run_python_code_in_subprocess):
249265
code = """
250266
import logging
@@ -283,17 +299,20 @@ def test_handled_integration_error(test_agent_session, run_python_code_in_subpro
283299
== "failed to import ddtrace module 'ddtrace.contrib.sqlite3' when patching on import"
284300
)
285301

286-
metric_events = [
287-
event
288-
for event in events
289-
if event["request_type"] == "generate-metrics"
290-
and event["payload"]["series"][0]["metric"] == "integration_errors"
291-
]
292-
assert len(metric_events) == 1
293-
assert len(metric_events[0]["payload"]["series"]) == 1
294-
assert metric_events[0]["payload"]["series"][0]["type"] == "count"
295-
assert len(metric_events[0]["payload"]["series"][0]["points"]) == 1
296-
assert metric_events[0]["payload"]["series"][0]["points"][0][1] == 1
302+
# Get metric containing the integration error
303+
integration_error = {}
304+
for event in events:
305+
if event["request_type"] == "generate-metrics":
306+
for metric in event["payload"]["series"]:
307+
if metric["metric"] == "integration_errors":
308+
integration_error = metric
309+
break
310+
311+
# assert the integration metric has the correct type, count, and tags
312+
assert integration_error
313+
assert integration_error["type"] == "count"
314+
assert integration_error["points"][0][1] == 1
315+
assert integration_error["tags"] == ["integration_name:sqlite3", "error_type:attributeerror"]
297316

298317

299318
def test_unhandled_integration_error(test_agent_session, run_python_code_in_subprocess):

tests/telemetry/test_writer.py

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -143,13 +143,6 @@ def test_app_started_event_configuration_override(test_agent_session, run_python
143143
logging.basicConfig()
144144
145145
import ddtrace.auto
146-
147-
from ddtrace.internal.telemetry import telemetry_writer
148-
telemetry_writer.enable()
149-
telemetry_writer.reset_queues()
150-
telemetry_writer._app_started_event()
151-
telemetry_writer.periodic(force_flush=True)
152-
telemetry_writer.disable()
153146
"""
154147

155148
env = os.environ.copy()
@@ -204,10 +197,11 @@ def test_app_started_event_configuration_override(test_agent_session, run_python
204197
assert status == 0, stderr
205198

206199
events = test_agent_session.get_events()
200+
app_started_events = [event for event in events if event["request_type"] == "app-started"]
201+
assert len(app_started_events) == 1
207202

208-
assert len(events) == 1
209-
events[0]["payload"]["configuration"].sort(key=lambda c: c["name"])
210-
assert events[0]["payload"]["configuration"] == [
203+
app_started_events[0]["payload"]["configuration"].sort(key=lambda c: c["name"])
204+
assert app_started_events[0]["payload"]["configuration"] == [
211205
{"name": "DD_AGENT_HOST", "origin": "unknown", "value": None},
212206
{"name": "DD_AGENT_PORT", "origin": "unknown", "value": None},
213207
{"name": "DD_APPSEC_ENABLED", "origin": "unknown", "value": False},

0 commit comments

Comments
 (0)