
Commit d4e6068

Interrupt exporter retry backoff sleeps when shutdown is called. Update BatchSpan/LogRecordProcessor.shutdown to complete in 30 seconds (#4638)
* Initial commit to add timeout as a parm to export, make retries encompass timeout
* Fix lint issues
* Fix a bunch of failing style/lint/spellcheck checks
* Remove timeout param from the export calls.
* Fix flaky windows test ?
* Respond to review comments..
* Delete exponential backoff code that is now unused
* Add changelog and remove some unused imports..
* fix typo and unit test flaking on windows
* Refactor tests, HTTP exporters a bit
* Remove unneeded test reqs
* Remove gRPC retry config
* Tweak backoff calculation
* Lint and precommit
* Empty commit
* Another empty commit
* Calculate backoff in 1 place instead of 2
* Update changelog
* Update changelog
* Make new _common directory in the http exporter for shared code
* precommit
* Make many changes
* Reorder shutdown stuff
* Fix merging
* Don't join the thread in case we are stuck in an individual export call
* Add tests, changelog entry
* Update time assertions to satisfy windows.. Fix lint issues
* Skip test on windows
* Use threading Event instead of sleep loop.
* Respond to review comments..
* Pass remaining timeout to shutdown
* Run precommit
* Change variable names
* Switch timeout back to timeout_millis
* Update CHANGELOG.md Co-authored-by: Emídio Neto <[email protected]>
* Update CHANGELOG.md Co-authored-by: Emídio Neto <[email protected]>
* Rename variable
* Fix variable name

---------

Co-authored-by: Emídio Neto <[email protected]>
1 parent a28b0ca commit d4e6068

11 files changed: +280 −97 lines changed


CHANGELOG.md

Lines changed: 6 additions & 0 deletions
```diff
@@ -13,6 +13,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
   the OTLP `LogHandler` remains attached to the root logger. Fix a bug that
   can cause a deadlock to occur over `logging._lock` in some cases ([#4636](https://github.com/open-telemetry/opentelemetry-python/pull/4636)).
 
+- Update OTLP gRPC/HTTP exporters: calling shutdown will now interrupt exporters that are sleeping
+  before a retry attempt, and cause them to return failure immediately.
+  Update BatchSpan/LogRecordProcessors: shutdown will now complete after 30 seconds of trying to finish
+  exporting any buffered telemetry, instead of continuing to export until all telemetry was exported.
+  ([#4638](https://github.com/open-telemetry/opentelemetry-python/pull/4638)).
+
 ## Version 1.35.0/0.56b0 (2025-07-11)
 
 - Update OTLP proto to v1.7 [#4645](https://github.com/open-telemetry/opentelemetry-python/pull/4645).
```
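The mechanism behind this changelog entry: the exporters used to block in `time.sleep()` between retry attempts, so `shutdown()` had to wait the backoff out. The change swaps the sleep for a `threading.Event` wait that `shutdown()` can set. A minimal standalone sketch of that pattern (illustrative names, not the exporter's actual class):

```python
import threading
import time

shutdown_event = threading.Event()


def export_with_retries(max_retries: int = 6, timeout_sec: float = 10.0) -> bool:
    """Retry an export with exponential backoff that shutdown() can interrupt."""
    deadline = time.time() + timeout_sec
    for retry_num in range(max_retries):
        # ... attempt the export here and return True on success ...
        backoff = 2**retry_num  # nominal exponential backoff
        if backoff > deadline - time.time() or shutdown_event.is_set():
            return False  # out of time, or shutting down: give up immediately
        # Unlike time.sleep(), Event.wait() returns as soon as shutdown() sets
        # the event, so a shutdown no longer has to wait the backoff out.
        if shutdown_event.wait(backoff):
            return False
    return False


def shutdown() -> None:
    # Wake any export thread currently sleeping in its retry backoff.
    shutdown_event.set()
```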

exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py

Lines changed: 46 additions & 45 deletions
```diff
@@ -20,7 +20,7 @@
 from collections.abc import Sequence  # noqa: F401
 from logging import getLogger
 from os import environ
-from time import sleep, time
+from time import time
 from typing import (  # noqa: F401
     Any,
     Callable,
@@ -289,7 +289,7 @@ def __init__(
         )
         self._client = self._stub(self._channel)
 
-        self._export_lock = threading.Lock()
+        self._shutdown_in_progress = threading.Event()
         self._shutdown = False
 
     @abstractmethod
@@ -309,62 +309,63 @@ def _export(
         # FIXME remove this check if the export type for traces
         # gets updated to a class that represents the proto
         # TracesData and use the code below instead.
-        with self._export_lock:
-            deadline_sec = time() + self._timeout
-            for retry_num in range(_MAX_RETRYS):
-                try:
-                    self._client.Export(
-                        request=self._translate_data(data),
-                        metadata=self._headers,
-                        timeout=deadline_sec - time(),
-                    )
-                    return self._result.SUCCESS
-                except RpcError as error:
-                    retry_info_bin = dict(error.trailing_metadata()).get(
-                        "google.rpc.retryinfo-bin"
-                    )
-                    # multiplying by a random number between .8 and 1.2 introduces a +/20% jitter to each backoff.
-                    backoff_seconds = 2**retry_num * random.uniform(0.8, 1.2)
-                    if retry_info_bin is not None:
-                        retry_info = RetryInfo()
-                        retry_info.ParseFromString(retry_info_bin)
-                        backoff_seconds = (
-                            retry_info.retry_delay.seconds
-                            + retry_info.retry_delay.nanos / 1.0e9
-                        )
-                    if (
-                        error.code() not in _RETRYABLE_ERROR_CODES
-                        or retry_num + 1 == _MAX_RETRYS
-                        or backoff_seconds > (deadline_sec - time())
-                    ):
-                        logger.error(
-                            "Failed to export %s to %s, error code: %s",
-                            self._exporting,
-                            self._endpoint,
-                            error.code(),
-                            exc_info=error.code() == StatusCode.UNKNOWN,
-                        )
-                        return self._result.FAILURE
-                    logger.warning(
-                        "Transient error %s encountered while exporting %s to %s, retrying in %.2fs.",
-                        error.code(),
-                        self._exporting,
-                        self._endpoint,
-                        backoff_seconds,
-                    )
-                    sleep(backoff_seconds)
+        deadline_sec = time() + self._timeout
+        for retry_num in range(_MAX_RETRYS):
+            try:
+                self._client.Export(
+                    request=self._translate_data(data),
+                    metadata=self._headers,
+                    timeout=deadline_sec - time(),
+                )
+                return self._result.SUCCESS
+            except RpcError as error:
+                retry_info_bin = dict(error.trailing_metadata()).get(
+                    "google.rpc.retryinfo-bin"
+                )
+                # multiplying by a random number between .8 and 1.2 introduces a +/20% jitter to each backoff.
+                backoff_seconds = 2**retry_num * random.uniform(0.8, 1.2)
+                if retry_info_bin is not None:
+                    retry_info = RetryInfo()
+                    retry_info.ParseFromString(retry_info_bin)
+                    backoff_seconds = (
+                        retry_info.retry_delay.seconds
+                        + retry_info.retry_delay.nanos / 1.0e9
+                    )
+                if (
+                    error.code() not in _RETRYABLE_ERROR_CODES
+                    or retry_num + 1 == _MAX_RETRYS
+                    or backoff_seconds > (deadline_sec - time())
+                    or self._shutdown
+                ):
+                    logger.error(
+                        "Failed to export %s to %s, error code: %s",
+                        self._exporting,
+                        self._endpoint,
+                        error.code(),
+                        exc_info=error.code() == StatusCode.UNKNOWN,
+                    )
+                    return self._result.FAILURE
+                logger.warning(
+                    "Transient error %s encountered while exporting %s to %s, retrying in %.2fs.",
+                    error.code(),
+                    self._exporting,
+                    self._endpoint,
+                    backoff_seconds,
+                )
+                shutdown = self._shutdown_in_progress.wait(backoff_seconds)
+                if shutdown:
+                    logger.warning("Shutdown in progress, aborting retry.")
+                    break
         # Not possible to reach here but the linter is complaining.
         return self._result.FAILURE
 
     def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
         if self._shutdown:
             logger.warning("Exporter already shutdown, ignoring call")
             return
-        # wait for the last export if any
-        self._export_lock.acquire(timeout=timeout_millis / 1e3)
         self._shutdown = True
+        self._shutdown_in_progress.set()
         self._channel.close()
-        self._export_lock.release()
 
     @property
     @abstractmethod
```
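For reference, the backoff the retry loop above computes is `2**retry_num` seconds with ±20% jitter, optionally replaced by a server-supplied `RetryInfo` from the `google.rpc.retryinfo-bin` trailing metadata, and the loop gives up once the next wait would overshoot the overall deadline. A rough worked example of that schedule (the retry cap of 6 and the 10-second budget are assumptions for illustration; actual values vary with the jitter):

```python
import random

MAX_RETRIES = 6  # assumed cap, mirroring the exporter's _MAX_RETRYS constant
TIMEOUT_SEC = 10.0  # assumed overall deadline, taken from the exporter's timeout

remaining = TIMEOUT_SEC
for retry_num in range(MAX_RETRIES):
    # Nominal wait of 2**retry_num seconds, jittered by +/-20%.
    backoff = 2**retry_num * random.uniform(0.8, 1.2)
    if backoff > remaining:
        print(f"attempt {retry_num}: backoff {backoff:.2f}s exceeds remaining {remaining:.2f}s, giving up")
        break
    print(f"attempt {retry_num}: wait ~{backoff:.2f}s ({remaining - backoff:.2f}s of budget left afterwards)")
    remaining -= backoff
```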

exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py

Lines changed: 25 additions & 33 deletions
```diff
@@ -325,48 +325,40 @@ def test_shutdown(self):
             "Exporter already shutdown, ignoring batch",
         )
 
-    def test_shutdown_wait_last_export(self):
-        add_TraceServiceServicer_to_server(
-            TraceServiceServicerWithExportParams(
-                StatusCode.OK, optional_export_sleep=1
-            ),
-            self.server,
-        )
-
-        export_thread = ThreadWithReturnValue(
-            target=self.exporter.export, args=([self.span],)
-        )
-        export_thread.start()
-        # Wait a bit for exporter to get lock and make export call.
-        time.sleep(0.25)
-        # pylint: disable=protected-access
-        self.assertTrue(self.exporter._export_lock.locked())
-        self.exporter.shutdown(timeout_millis=3000)
-        # pylint: disable=protected-access
-        self.assertTrue(self.exporter._shutdown)
-        self.assertEqual(export_thread.join(), SpanExportResult.SUCCESS)
-
-    def test_shutdown_doesnot_wait_last_export(self):
+    @unittest.skipIf(
+        system() == "Windows",
+        "For gRPC + windows there's some added delay in the RPCs which breaks the assertion over amount of time passed.",
+    )
+    def test_shutdown_interrupts_export_retry_backoff(self):
         add_TraceServiceServicer_to_server(
             TraceServiceServicerWithExportParams(
-                StatusCode.OK, optional_export_sleep=3
+                StatusCode.UNAVAILABLE,
             ),
             self.server,
         )
 
         export_thread = ThreadWithReturnValue(
             target=self.exporter.export, args=([self.span],)
         )
-        export_thread.start()
-        # Wait for exporter to get lock and make export call.
-        time.sleep(0.25)
-        # pylint: disable=protected-access
-        self.assertTrue(self.exporter._export_lock.locked())
-        # Set to 1 seconds, so the 3 second server-side delay will not be reached.
-        self.exporter.shutdown(timeout_millis=1000)
-        # pylint: disable=protected-access
-        self.assertTrue(self.exporter._shutdown)
-        self.assertEqual(export_thread.join(), None)
+        with self.assertLogs(level=WARNING) as warning:
+            begin_wait = time.time()
+            export_thread.start()
+            # Wait a bit for export to fail and the backoff sleep to start
+            time.sleep(0.05)
+            # The code should now be in a 1 second backoff.
+            # pylint: disable=protected-access
+            self.assertFalse(self.exporter._shutdown_in_progress.is_set())
+            self.exporter.shutdown()
+            self.assertTrue(self.exporter._shutdown_in_progress.is_set())
+            export_result = export_thread.join()
+            end_wait = time.time()
+            self.assertEqual(export_result, SpanExportResult.FAILURE)
+            # Shutdown should have interrupted the sleep.
+            self.assertTrue(end_wait - begin_wait < 0.2)
+            self.assertEqual(
+                warning.records[1].message,
+                "Shutdown in progress, aborting retry.",
+            )
 
     def test_export_over_closed_grpc_channel(self):
         # pylint: disable=protected-access
```
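The test drives `export()` on a background thread via `ThreadWithReturnValue`, a helper defined elsewhere in this test module so that `join()` hands back the export result. A hedged sketch of what such a helper typically looks like (an assumption, not the project's actual implementation):

```python
import threading


class ThreadWithReturnValue(threading.Thread):
    """Thread whose join() returns the target's return value.

    Sketch of the helper used in the test above; the real one lives in the
    test module, so treat this as an assumption rather than the project's code.
    """

    def __init__(self, target=None, args=()):
        super().__init__()
        self._target_fn = target
        self._args = args
        self._return = None

    def run(self):
        # Capture the target's return value so join() can expose it.
        if self._target_fn is not None:
            self._return = self._target_fn(*self._args)

    def join(self, timeout=None):
        super().join(timeout)
        return self._return
```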

exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py

Lines changed: 10 additions & 4 deletions
```diff
@@ -15,10 +15,11 @@
 import gzip
 import logging
 import random
+import threading
 import zlib
 from io import BytesIO
 from os import environ
-from time import sleep, time
+from time import time
 from typing import Dict, Optional, Sequence
 
 import requests
@@ -77,6 +78,7 @@ def __init__(
         compression: Optional[Compression] = None,
         session: Optional[requests.Session] = None,
     ):
+        self._shutdown_is_occuring = threading.Event()
         self._endpoint = endpoint or environ.get(
             OTEL_EXPORTER_OTLP_LOGS_ENDPOINT,
             _append_logs_path(
@@ -173,6 +175,7 @@ def export(self, batch: Sequence[LogData]) -> LogExportResult:
                 not _is_retryable(resp)
                 or retry_num + 1 == _MAX_RETRYS
                 or backoff_seconds > (deadline_sec - time())
+                or self._shutdown
             ):
                 _logger.error(
                     "Failed to export logs batch code: %s, reason: %s",
@@ -185,8 +188,10 @@ def export(self, batch: Sequence[LogData]) -> LogExportResult:
                 resp.reason,
                 backoff_seconds,
             )
-            sleep(backoff_seconds)
-        # Not possible to reach here but the linter is complaining.
+            shutdown = self._shutdown_is_occuring.wait(backoff_seconds)
+            if shutdown:
+                _logger.warning("Shutdown in progress, aborting retry.")
+                break
         return LogExportResult.FAILURE
 
     def force_flush(self, timeout_millis: float = 10_000) -> bool:
@@ -197,8 +202,9 @@ def shutdown(self):
         if self._shutdown:
             _logger.warning("Exporter already shutdown, ignoring call")
             return
-        self._session.close()
         self._shutdown = True
+        self._shutdown_is_occuring.set()
+        self._session.close()
 
 
 def _compression_from_env() -> Compression:
```

exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py

Lines changed: 10 additions & 4 deletions
```diff
@@ -15,10 +15,11 @@
 import gzip
 import logging
 import random
+import threading
 import zlib
 from io import BytesIO
 from os import environ
-from time import sleep, time
+from time import time
 from typing import (  # noqa: F401
     Any,
     Callable,
@@ -120,6 +121,7 @@ def __init__(
         | None = None,
         preferred_aggregation: dict[type, Aggregation] | None = None,
     ):
+        self._shutdown_in_progress = threading.Event()
         self._endpoint = endpoint or environ.get(
             OTEL_EXPORTER_OTLP_METRICS_ENDPOINT,
             _append_metrics_path(
@@ -223,6 +225,7 @@ export(
                 not _is_retryable(resp)
                 or retry_num + 1 == _MAX_RETRYS
                 or backoff_seconds > (deadline_sec - time())
+                or self._shutdown
             ):
                 _logger.error(
                     "Failed to export metrics batch code: %s, reason: %s",
@@ -235,16 +238,19 @@ export(
                 resp.reason,
                 backoff_seconds,
             )
-            sleep(backoff_seconds)
-        # Not possible to reach here but the linter is complaining.
+            shutdown = self._shutdown_in_progress.wait(backoff_seconds)
+            if shutdown:
+                _logger.warning("Shutdown in progress, aborting retry.")
+                break
         return MetricExportResult.FAILURE
 
     def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
         if self._shutdown:
             _logger.warning("Exporter already shutdown, ignoring call")
             return
-        self._session.close()
         self._shutdown = True
+        self._shutdown_in_progress.set()
+        self._session.close()
 
     @property
     def _exporting(self) -> str:
```

exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py

Lines changed: 10 additions & 4 deletions
```diff
@@ -15,10 +15,11 @@
 import gzip
 import logging
 import random
+import threading
 import zlib
 from io import BytesIO
 from os import environ
-from time import sleep, time
+from time import time
 from typing import Dict, Optional, Sequence
 
 import requests
@@ -76,6 +77,7 @@ def __init__(
         compression: Optional[Compression] = None,
         session: Optional[requests.Session] = None,
     ):
+        self._shutdown_in_progress = threading.Event()
         self._endpoint = endpoint or environ.get(
             OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
             _append_trace_path(
@@ -171,6 +173,7 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
                 not _is_retryable(resp)
                 or retry_num + 1 == _MAX_RETRYS
                 or backoff_seconds > (deadline_sec - time())
+                or self._shutdown
             ):
                 _logger.error(
                     "Failed to export span batch code: %s, reason: %s",
@@ -183,16 +186,19 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
                 resp.reason,
                 backoff_seconds,
             )
-            sleep(backoff_seconds)
-        # Not possible to reach here but the linter is complaining.
+            shutdown = self._shutdown_in_progress.wait(backoff_seconds)
+            if shutdown:
+                _logger.warning("Shutdown in progress, aborting retry.")
+                break
         return SpanExportResult.FAILURE
 
     def shutdown(self):
         if self._shutdown:
             _logger.warning("Exporter already shutdown, ignoring call")
             return
-        self._session.close()
         self._shutdown = True
+        self._shutdown_in_progress.set()
+        self._session.close()
 
     def force_flush(self, timeout_millis: int = 30000) -> bool:
         """Nothing is buffered in this exporter, so this method does nothing."""
```
