Commit 48ecebf

chore(DSM): fix test flake for dsm processor shutdown (#15821)
## Description

This is a smaller version of #15758, so that the root fix can be merged now without interfering with any changes that would come from #15715.

The tests were flaky because of a race during teardown: processor.shutdown() could be called twice (once by the test fixture and once by global tracer teardown), raising a ServiceStatusError. The new try/except ignores that expected case.
1 parent 3ad335e commit 48ecebf
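
In fixture form, the guard looks roughly like the sketch below (condensed from the diffs that follow; the tracer fixture and the abridged fixture body are illustrative, while ServiceStatus, ServiceStatusError, shutdown(timeout=5), and current_status are taken from the change itself). Shutdown is attempted once during teardown, the error raised when the processor was already stopped elsewhere is swallowed, and an error raised while the processor still reports RUNNING is re-raised, since that would indicate a genuine failure.

import pytest

from ddtrace.internal.service import ServiceStatus
from ddtrace.internal.service import ServiceStatusError


@pytest.fixture
def dsm_processor(tracer):
    processor = tracer.data_streams_processor
    yield processor
    try:
        # May race with global tracer teardown, which can stop the processor first.
        processor.shutdown(timeout=5)
    except ServiceStatusError as e:
        # Expected when the processor was already stopped; re-raise anything else.
        if e.current_status == ServiceStatus.RUNNING:
            raise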

File tree

2 files changed (+24, -30 lines)


tests/contrib/aiokafka/test_aiokafka_dsm.py

Lines changed: 9 additions & 1 deletion
@@ -9,6 +9,8 @@
 from ddtrace.internal.datastreams.processor import DataStreamsCtx
 from ddtrace.internal.datastreams.processor import PartitionKey
 from ddtrace.internal.native import DDSketch
+from ddtrace.internal.service import ServiceStatus
+from ddtrace.internal.service import ServiceStatusError
 from tests.utils import DummyTracer
 from tests.utils import override_global_tracer
 
@@ -48,7 +50,13 @@ def dsm_processor(tracer):
 
     with mock.patch("ddtrace.internal.datastreams.data_streams_processor", return_value=processor):
         yield processor
-    processor.shutdown(timeout=5)
+
+    try:
+        processor.shutdown(timeout=5)
+    except ServiceStatusError as e:
+        # Expected: processor already stopped by tracer shutdown during test teardown
+        if e.current_status == ServiceStatus.RUNNING:
+            raise
 
 
 @pytest.mark.asyncio

tests/contrib/kafka/test_kafka_dsm.py

Lines changed: 15 additions & 29 deletions
@@ -11,6 +11,8 @@
 from ddtrace.internal.datastreams.processor import DataStreamsCtx
 from ddtrace.internal.datastreams.processor import PartitionKey
 from ddtrace.internal.native import DDSketch
+from ddtrace.internal.service import ServiceStatus
+from ddtrace.internal.service import ServiceStatusError
 from tests.datastreams.test_public_api import MockedTracer
 
 
@@ -24,11 +26,24 @@ class CustomError(Exception):
 @pytest.fixture
 def dsm_processor(tracer):
     processor = tracer.data_streams_processor
+    # Clean up any existing context to prevent test pollution
+    try:
+        del processor._current_context.value
+    except AttributeError:
+        pass
+
     with mock.patch("ddtrace.internal.datastreams.data_streams_processor", return_value=processor):
         yield processor
     # flush buckets for the next test run
     processor.periodic()
 
+    try:
+        processor.shutdown(timeout=5)
+    except ServiceStatusError as e:
+        # Expected: processor already stopped by tracer shutdown during test teardown
+        if e.current_status == ServiceStatus.RUNNING:
+            raise
+
 
 @pytest.mark.parametrize("payload_and_length", [("test", 4), ("你".encode("utf-8"), 3), (b"test2", 5)])
 @pytest.mark.parametrize("key_and_length", [("test-key", 8), ("你".encode("utf-8"), 3), (b"t2", 2)])
@@ -44,11 +59,6 @@ def test_data_streams_payload_size(dsm_processor, consumer, producer, kafka_topi
     expected_payload_size += len(PROPAGATION_KEY_BASE_64)  # Add in header key length
     expected_payload_size += DSM_TEST_PATH_HEADER_SIZE  # to account for path header we add
 
-    try:
-        del dsm_processor._current_context.value
-    except AttributeError:
-        pass
-
     producer.produce(kafka_topic, payload, key=key, headers=test_headers)
     producer.flush()
     consumer.poll()
@@ -65,10 +75,6 @@ def test_data_streams_payload_size(dsm_processor, consumer, producer, kafka_topi
 
 def test_data_streams_kafka_serializing(dsm_processor, deserializing_consumer, serializing_producer, kafka_topic):
     PAYLOAD = bytes("data streams", encoding="utf-8")
-    try:
-        del dsm_processor._current_context.value
-    except AttributeError:
-        pass
     serializing_producer.produce(kafka_topic, value=PAYLOAD, key="test_key_2")
     serializing_producer.flush()
     message = None
@@ -80,10 +86,6 @@ def test_data_streams_kafka_serializing(dsm_processor, deserializing_consumer, s
 
 def test_data_streams_kafka(dsm_processor, consumer, producer, kafka_topic):
     PAYLOAD = bytes("data streams", encoding="utf-8")
-    try:
-        del dsm_processor._current_context.value
-    except AttributeError:
-        pass
     producer.produce(kafka_topic, PAYLOAD, key="test_key_1")
     producer.produce(kafka_topic, PAYLOAD, key="test_key_2")
     producer.flush()
@@ -127,10 +129,6 @@ def _read_single_message(consumer):
 
     PAYLOAD = bytes("data streams", encoding="utf-8")
     consumer = non_auto_commit_consumer
-    try:
-        del dsm_processor._current_context.value
-    except AttributeError:
-        pass
     buckets = dsm_processor._buckets
     producer.produce(kafka_topic, PAYLOAD, key="test_key_1")
     producer.produce(kafka_topic, PAYLOAD, key="test_key_2")
@@ -170,10 +168,6 @@ def _read_single_message(consumer):
 
     consumer = non_auto_commit_consumer
     PAYLOAD = bytes("data streams", encoding="utf-8")
-    try:
-        del dsm_processor._current_context.value
-    except AttributeError:
-        pass
     producer.produce(kafka_topic, PAYLOAD, key="test_key_1")
     producer.produce(kafka_topic, PAYLOAD, key="test_key_2")
     producer.flush()
@@ -207,10 +201,6 @@ def _read_single_message(consumer):
         return message
 
     PAYLOAD = bytes("data streams", encoding="utf-8")
-    try:
-        del dsm_processor._current_context.value
-    except AttributeError:
-        pass
     producer.produce(kafka_topic, PAYLOAD, key="test_key_1")
     producer.produce(kafka_topic, PAYLOAD, key="test_key_2")
     producer.flush()
@@ -236,10 +226,6 @@ def test_data_streams_kafka_produce_api_compatibility(dsm_processor, consumer, p
     kafka_topic = empty_kafka_topic
 
     PAYLOAD = bytes("data streams", encoding="utf-8")
-    try:
-        del dsm_processor._current_context.value
-    except AttributeError:
-        pass
 
     # All of these should work
     producer.produce(kafka_topic)
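
For reference, a condensed sketch of the kafka fixture after this change, assembled from the hunks above (mock, pytest, and the tracer fixture come from the test module; only the fixture body is shown). The per-test cleanup of dsm_processor._current_context now happens once during fixture setup, and teardown tolerates a processor that tracer shutdown already stopped:

@pytest.fixture
def dsm_processor(tracer):
    processor = tracer.data_streams_processor
    # Setup: clear any context left over from a previous test to prevent pollution.
    try:
        del processor._current_context.value
    except AttributeError:
        pass

    with mock.patch("ddtrace.internal.datastreams.data_streams_processor", return_value=processor):
        yield processor
    # Teardown: flush buckets for the next test run, then shut down,
    # ignoring the error raised if the processor was already stopped.
    processor.periodic()
    try:
        processor.shutdown(timeout=5)
    except ServiceStatusError as e:
        if e.current_status == ServiceStatus.RUNNING:
            raise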
