Skip to content

chore(telemetry): ensure instrumentation telemetry is compatible with python 3.12 #6859

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 42 commits into from
Sep 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
a1b4707
Drop support for Python < 3.7
Yun-Kim Aug 8, 2023
eb6006d
Add 3.12 testing support, upgrade riot
Yun-Kim Aug 10, 2023
22501bf
Fix test suites
Yun-Kim Aug 10, 2023
9b9e5ea
fix: start telemetry writer early
majorgreys Aug 24, 2023
11e7a5f
skip inject hooks
majorgreys Aug 30, 2023
0d91f94
disable dynamic instrumentation
majorgreys Aug 30, 2023
94ab049
fix telemetry writer after fork
majorgreys Aug 31, 2023
1771613
fix forking test
majorgreys Aug 31, 2023
c98489c
fix telemetry request tests
majorgreys Aug 31, 2023
0039294
Remove uwsgi dependency for 3.12 profiler testing
Yun-Kim Aug 31, 2023
9869ad0
appsec iast is not supported
Yun-Kim Aug 31, 2023
a3ffa24
Update profiler tests to use latest gevent
Yun-Kim Aug 31, 2023
70a812e
fix profiler tests
P403n1x87 Sep 4, 2023
72d9b1d
Merge remote-tracking branch 'upstream/1.x' into yunkim/add-py-312
P403n1x87 Sep 5, 2023
a2ddd44
Revert "disable dynamic instrumentation", still enable for 3.12
Yun-Kim Sep 5, 2023
a11af21
Drop support for Python < 3.7
Yun-Kim Aug 8, 2023
83be219
re-enable bytecode-based tests
P403n1x87 Sep 5, 2023
ed63b6a
Merge branch '2.x-dev' into yunkim/add-py-312
Yun-Kim Sep 5, 2023
0e38228
re-add removed pyXY files as they are still applicable to
Yun-Kim Sep 5, 2023
8a706d2
Fix riot lockfiles, remove debug print line
Yun-Kim Sep 5, 2023
919f560
Remove indentations in release note
Yun-Kim Sep 5, 2023
fdc19cc
profiling merge fix
Yun-Kim Sep 5, 2023
146b009
chore: remove deprecated items (#6580)
Yun-Kim Sep 5, 2023
140165f
Merge branch '2.x-dev' into yunkim/add-py-312
Yun-Kim Sep 5, 2023
9ffee96
skip iast test
majorgreys Sep 6, 2023
fcb6845
cleanup py35 and py36 tests
majorgreys Sep 6, 2023
aea8492
skip failing appsec iast test
emmettbutler Sep 7, 2023
3a4c4f4
fix unmerged riot requirements files
emmettbutler Sep 7, 2023
4bf573c
merge previous pyXY tests into parent test files
Yun-Kim Sep 7, 2023
81a608c
include setuptools in 3.12 deps
emmettbutler Sep 7, 2023
5768e82
Revert "include setuptools in 3.12 deps"
emmettbutler Sep 7, 2023
608f05a
Fix merging pyXY files to parent test files (aiopg, debugger, internal)
Yun-Kim Sep 7, 2023
998ac2f
add setuptools to pyproject
majorgreys Sep 7, 2023
d4400c0
skip test that uses dis in 3.12
emmettbutler Sep 7, 2023
a6f7ede
Revert setuptools being added as manual dep in debugger venv
Yun-Kim Sep 7, 2023
423620e
disable for bytecode
majorgreys Sep 7, 2023
653fc27
Revert "disable for bytecode"
majorgreys Sep 7, 2023
c9bcb33
add basic 3.12 support releasenote
ZStriker19 Sep 7, 2023
678abc9
Revert "fix: start telemetry writer early"
mabdinur Sep 7, 2023
98c3dd3
chore(telemetry): start telemetry as early as possible and delay send…
mabdinur Sep 8, 2023
b8e4221
fix fork errors in py3.12
mabdinur Sep 8, 2023
6c86ea3
Merge branch 'yunkim/add-py-312' into munir/fix-telemetry-2.x
majorgreys Sep 8, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,4 @@ jobs:
workflows:
setup:
jobs:
- setup
- setup
5 changes: 5 additions & 0 deletions ddtrace/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@


telemetry.install_excepthook()
# In order to support 3.12, we start the writer upon initialization.
# See https://github.com/python/cpython/pull/104826.
# Telemetry events will only be sent after the `app-started` is queued.
# This will occur when the agent writer starts.
telemetry.telemetry_writer.enable()

from ._monkey import patch # noqa: E402
from ._monkey import patch_all # noqa: E402
Expand Down
7 changes: 7 additions & 0 deletions ddtrace/internal/telemetry/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ def _excepthook(tp, value, root_traceback):
error_msg = "{}:{} {}".format(filename, lineno, str(value))
telemetry_writer.add_integration(integration_name, True, error_msg=error_msg)

if telemetry_writer.started is False:
telemetry_writer._app_started_event(False)
telemetry_writer._app_dependencies_loaded_event()

telemetry_writer.app_shutdown()
telemetry_writer.disable()

return _ORIGINAL_EXCEPTHOOK(tp, value, root_traceback)


Expand Down
6 changes: 3 additions & 3 deletions ddtrace/internal/telemetry/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
from typing import List
from typing import Tuple

import ddtrace
from ddtrace.internal.compat import PY3
from ddtrace.internal.constants import DEFAULT_SERVICE_NAME
from ddtrace.internal.packages import get_distributions
from ddtrace.internal.runtime.container import get_container_info
from ddtrace.internal.utils.cache import cached
from ddtrace.version import get_version

from ...settings import _config as config
from ..hostname import get_hostname
Expand Down Expand Up @@ -63,7 +63,7 @@ def _get_application(key):
"env": env or "",
"language_name": "python",
"language_version": _format_version_info(sys.version_info),
"tracer_version": ddtrace.__version__,
"tracer_version": get_version(),
"runtime_name": platform.python_implementation(),
"runtime_version": _format_version_info(sys.implementation.version) if PY3 else "",
"products": _get_products(),
Expand All @@ -88,7 +88,7 @@ def get_application(service, version, env):
def _get_products():
# type: () -> Dict
return {
"appsec": {"version": ddtrace.__version__, "enabled": config._appsec_enabled},
"appsec": {"version": get_version(), "enabled": config._appsec_enabled},
}


Expand Down
21 changes: 7 additions & 14 deletions ddtrace/internal/telemetry/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,6 @@ def enable(self):

if self._is_periodic:
self.start()
atexit.register(self.app_shutdown)
return True

self.status = ServiceStatus.RUNNING
Expand Down Expand Up @@ -281,14 +280,18 @@ def add_error(self, code, msg, filename, line_number):
msg = "%s:%s: %s" % (filename, line_number, msg)
self._error = (code, msg)

def _app_started_event(self):
# type: () -> None
def _app_started_event(self, register_app_shutdown=True):
# type: (bool) -> None
"""Sent when TelemetryWriter is enabled or forks"""
if self._forked:
# app-started events should only be sent by the main process
return
# List of configurations to be collected

self.started = True
if register_app_shutdown:
atexit.register(self.app_shutdown)

self.add_configurations(
[
(TELEMETRY_TRACING_ENABLED, config._tracing_enabled, "unknown"),
Expand Down Expand Up @@ -575,15 +578,6 @@ def periodic(self, force_flush=False):
for telemetry_event in telemetry_events:
self._client.send_event(telemetry_event)

def start(self, *args, **kwargs):
# type: (...) -> None
super(TelemetryWriter, self).start(*args, **kwargs)
# Queue app-started event after the telemetry worker thread is running
if self.started is False:
self._app_started_event()
self._app_dependencies_loaded_event()
self.started = True

def app_shutdown(self):
self._app_closing_event()
self.periodic(force_flush=True)
Expand Down Expand Up @@ -616,8 +610,7 @@ def _fork_writer(self):

# Enable writer service in child process to avoid interpreter shutdown
# error in Python 3.12
if sys.version_info >= (3, 12):
self.enable()
self.enable()

def _restart_sequence(self):
self._sequence = itertools.count(1)
Expand Down
3 changes: 2 additions & 1 deletion ddtrace/internal/writer/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,8 @@ def _send_payload(self, payload, count, client):
def start(self):
super(AgentWriter, self).start()
try:
telemetry_writer.enable()
telemetry_writer._app_started_event()
telemetry_writer._app_dependencies_loaded_event()

# appsec remote config should be enabled/started after the global tracer and configs
# are initialized
Expand Down
82 changes: 45 additions & 37 deletions tests/telemetry/test_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@
def test_enable(test_agent_session, run_python_code_in_subprocess):
code = """
from ddtrace.internal.telemetry import telemetry_writer
from ddtrace.internal.service import ServiceStatus

telemetry_writer.enable()

assert telemetry_writer.status == ServiceStatus.RUNNING
assert telemetry_writer._worker is not None
"""

stdout, stderr, status, _ = run_python_code_in_subprocess(code)
Expand All @@ -17,26 +22,10 @@ def test_enable(test_agent_session, run_python_code_in_subprocess):
assert stdout == b"", stderr
assert stderr == b""

events = test_agent_session.get_events()
assert len(events) == 3

# Same runtime id is used
assert events[0]["runtime_id"] == events[1]["runtime_id"]
assert events[0]["request_type"] == "app-closing"
assert events[1]["request_type"] == "app-dependencies-loaded"
assert events[2]["request_type"] == "app-started"
assert events[2]["payload"]["error"] == {"code": 0, "message": ""}


@pytest.mark.snapshot
def test_telemetry_enabled_on_first_tracer_flush(test_agent_session, ddtrace_run_python_code_in_subprocess):
"""assert telemetry events are generated after the first trace is flushed to the agent"""
# Using ddtrace-run and/or importing ddtrace alone should not enable telemetry
# Telemetry data should only be sent after the first trace to the agent
_, stderr, status, _ = ddtrace_run_python_code_in_subprocess("import ddtrace")
assert status == 0, stderr
# No trace and No Telemetry
assert len(test_agent_session.get_events()) == 0

# Submit a trace to the agent in a subprocess
code = 'from ddtrace import tracer; span = tracer.trace("test-telemetry"); span.finish()'
Expand All @@ -58,13 +47,19 @@ def test_telemetry_enabled_on_first_tracer_flush(test_agent_session, ddtrace_run
def test_enable_fork(test_agent_session, run_python_code_in_subprocess):
"""assert app-started/app-closing events are only sent in parent process"""
code = """
import warnings
# This test logs the following warning in py3.12:
# This process (pid=402) is multi-threaded, use of fork() may lead to deadlocks in the child
warnings.filterwarnings("ignore", category=DeprecationWarning)

import os

from ddtrace.internal.runtime import get_runtime_id
from ddtrace.internal.telemetry import telemetry_writer

# We have to start before forking since fork hooks are not enabled until after enabling
telemetry_writer.enable()
telemetry_writer._app_started_event()

if os.fork() == 0:
# Send multiple started events to confirm none get sent
Expand All @@ -78,27 +73,29 @@ def test_enable_fork(test_agent_session, run_python_code_in_subprocess):

stdout, stderr, status, _ = run_python_code_in_subprocess(code)
assert status == 0, stderr
assert stderr == b""
assert stderr == b"", stderr

runtime_id = stdout.strip().decode("utf-8")

requests = test_agent_session.get_requests()

# We expect 2 events from the parent process to get sent, but none from the child process
assert len(requests) == 3
assert len(requests) == 2
# Validate that the runtime id sent for every event is the parent processes runtime id
assert requests[0]["body"]["runtime_id"] == runtime_id
assert requests[0]["body"]["request_type"] == "app-closing"
assert requests[1]["body"]["runtime_id"] == runtime_id
assert requests[1]["body"]["request_type"] == "app-dependencies-loaded"
assert requests[1]["body"]["runtime_id"] == runtime_id
assert requests[2]["body"]["request_type"] == "app-started"
assert requests[2]["body"]["runtime_id"] == runtime_id
assert requests[1]["body"]["request_type"] == "app-started"


def test_enable_fork_heartbeat(test_agent_session, run_python_code_in_subprocess):
"""assert app-heartbeat events are only sent in parent process when no other events are queued"""
code = """
import warnings
# This test logs the following warning in py3.12:
# This process (pid=402) is multi-threaded, use of fork() may lead to deadlocks in the child
warnings.filterwarnings("ignore", category=DeprecationWarning)

import os

from ddtrace.internal.runtime import get_runtime_id
Expand All @@ -120,7 +117,7 @@ def test_enable_fork_heartbeat(test_agent_session, run_python_code_in_subprocess

stdout, stderr, status, _ = run_python_code_in_subprocess(code)
assert status == 0, stderr
assert stderr == b""
assert stderr == b"", stderr

runtime_id = stdout.strip().decode("utf-8")

Expand All @@ -138,6 +135,11 @@ def test_heartbeat_interval_configuration(run_python_code_in_subprocess):
env = os.environ.copy()
env["DD_TELEMETRY_HEARTBEAT_INTERVAL"] = "61"
code = """
import warnings
# This test logs the following warning in py3.12:
# This process (pid=402) is multi-threaded, use of fork() may lead to deadlocks in the child
warnings.filterwarnings("ignore", category=DeprecationWarning)

from ddtrace import config
assert config._telemetry_heartbeat_interval == 61

Expand All @@ -156,6 +158,11 @@ def test_logs_after_fork(run_python_code_in_subprocess):
# Regression test: telemetry writer should not log an error when a process forks
_, err, status, _ = run_python_code_in_subprocess(
"""
import warnings
# This test logs the following warning in py3.12:
# This process (pid=402) is multi-threaded, use of fork() may lead to deadlocks in the child
warnings.filterwarnings("ignore", category=DeprecationWarning)

import ddtrace
import logging
import os
Expand All @@ -167,7 +174,7 @@ def test_logs_after_fork(run_python_code_in_subprocess):
)

assert status == 0, err
assert err == b""
assert err == b"", err


def test_app_started_error_handled_exception(test_agent_session, run_python_code_in_subprocess):
Expand Down Expand Up @@ -250,6 +257,9 @@ def test_handled_integration_error(test_agent_session, run_python_code_in_subpro

from ddtrace import patch, tracer
patch(raise_errors=False, sqlite3=True)

# Create a span to start the telemetry writer
tracer.trace("hi").finish()
"""

_, stderr, status, _ = run_python_code_in_subprocess(code)
Expand All @@ -260,15 +270,11 @@ def test_handled_integration_error(test_agent_session, run_python_code_in_subpro

events = test_agent_session.get_events()

assert len(events) == 5
# Same runtime id is used
assert (
events[0]["runtime_id"]
== events[1]["runtime_id"]
== events[2]["runtime_id"]
== events[3]["runtime_id"]
== events[4]["runtime_id"]
)
assert len(events) > 1
for event in events:
# Same runtime id is used
assert event["runtime_id"] == events[0]["runtime_id"]

integrations_events = [event for event in events if event["request_type"] == "app-integrations-change"]

assert len(integrations_events) == 1
Expand All @@ -277,12 +283,14 @@ def test_handled_integration_error(test_agent_session, run_python_code_in_subpro
== "failed to import ddtrace module 'ddtrace.contrib.sqlite3' when patching on import"
)

metric_events = [event for event in events if event["request_type"] == "generate-metrics"]

metric_events = [
event
for event in events
if event["request_type"] == "generate-metrics"
and event["payload"]["series"][0]["metric"] == "integration_errors"
]
assert len(metric_events) == 1
assert metric_events[0]["payload"]["namespace"] == "tracers"
assert len(metric_events[0]["payload"]["series"]) == 1
assert metric_events[0]["payload"]["series"][0]["metric"] == "integration_errors"
assert metric_events[0]["payload"]["series"][0]["type"] == "count"
assert len(metric_events[0]["payload"]["series"][0]["points"]) == 1
assert metric_events[0]["payload"]["series"][0]["points"][0][1] == 1
Expand Down
11 changes: 2 additions & 9 deletions tests/telemetry/test_telemetry_metrics_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import os
import subprocess
import sys
import time

import pytest

Expand All @@ -28,8 +27,6 @@ def _build_env():
def gunicorn_server(telemetry_metrics_enabled="true", token=None):
cmd = ["ddtrace-run", "gunicorn", "-w", "1", "-b", "0.0.0.0:8000", "tests.telemetry.app:app"]
env = _build_env()
env["DD_TELEMETRY_METRICS_ENABLED"] = telemetry_metrics_enabled
env["DD_TELEMETRY_HEARTBEAT_INTERVAL"] = "1.0"
env["_DD_TRACE_WRITER_ADDITIONAL_HEADERS"] = "X-Datadog-Test-Session-Token:{}".format(token)
env["DD_TRACE_AGENT_URL"] = os.environ.get("DD_TRACE_AGENT_URL", "")
env["DD_TRACE_DEBUG"] = "true"
Expand Down Expand Up @@ -90,19 +87,15 @@ def test_telemetry_metrics_enabled_on_gunicorn_child_process(test_agent_session)
gunicorn_client.get("/count_metric")
response = gunicorn_client.get("/count_metric")
assert response.status_code == 200
# DD_TELEMETRY_HEARTBEAT_INTERVAL is set to 1 second
time.sleep(1)
gunicorn_client.get("/count_metric")
response = gunicorn_client.get("/count_metric")
assert response.status_code == 200

events = test_agent_session.get_events()
metrics = list(filter(lambda event: event["request_type"] == "generate-metrics", events))
assert len(metrics) == 2
assert len(metrics) == 1
assert metrics[0]["payload"]["series"][0]["metric"] == "test_metric"
assert metrics[0]["payload"]["series"][0]["points"][0][1] == 2.0
assert metrics[1]["payload"]["series"][0]["metric"] == "test_metric"
assert metrics[1]["payload"]["series"][0]["points"][0][1] == 3.0
assert metrics[0]["payload"]["series"][0]["points"][0][1] == 5


def test_span_creation_and_finished_metrics_datadog(test_agent_session, ddtrace_run_python_code_in_subprocess):
Expand Down
11 changes: 5 additions & 6 deletions tests/telemetry/test_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,8 +350,8 @@ def test_send_failing_request(mock_status, telemetry_writer):
with httpretty.enabled():
httpretty.register_uri(httpretty.POST, telemetry_writer._client.url, status=mock_status)
with mock.patch("ddtrace.internal.telemetry.writer.log") as log:
# sends failing app-closing event
telemetry_writer.app_shutdown()
# sends failing app-heartbeat event
telemetry_writer.periodic()
# asserts unsuccessful status code was logged
log.debug.assert_called_with(
"failed to send telemetry to the Datadog Agent at %s. response: %s",
Expand All @@ -370,13 +370,11 @@ def test_telemetry_graceful_shutdown(telemetry_writer, test_agent_session, mock_
telemetry_writer.app_shutdown()

events = test_agent_session.get_events()
assert len(events) == 3
assert len(events) == 1

# Reverse chronological order
assert events[0]["request_type"] == "app-closing"
assert events[0] == _get_request_body({}, "app-closing", 3)
assert events[1]["request_type"] == "app-dependencies-loaded"
assert events[2]["request_type"] == "app-started"
assert events[0] == _get_request_body({}, "app-closing", 1)


def test_app_heartbeat_event_periodic(mock_time, telemetry_writer, test_agent_session):
Expand All @@ -385,6 +383,7 @@ def test_app_heartbeat_event_periodic(mock_time, telemetry_writer, test_agent_se

# Ensure telemetry writer is initialized to send periodic events
telemetry_writer._is_periodic = True
telemetry_writer.started = True
# Assert default telemetry interval is 10 seconds and the expected periodic threshold and counts are set
assert telemetry_writer.interval == 10
assert telemetry_writer._periodic_threshold == 5
Expand Down