Skip to content

Commit 1ac0158

Browse files
DylanRussellxrmx
andauthored
Prevent recursive logging issue in SimpleLogRecordProcessor.on_emit (#4799)
* Initial commit * Make changes * Make changes to approach * Make more changes * windows is failing but not sure why * Fix bug on windows.. * Commit changes * Update opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py Co-authored-by: Riccardo Magliocchetti <[email protected]> * Respond to comment * remove duplicate filter from propagate false logger * clarify comment * make changes * Apply suggestion from @xrmx Co-authored-by: Riccardo Magliocchetti <[email protected]> * Update opentelemetry-sdk/tests/logs/test_export.py * merge --------- Co-authored-by: Riccardo Magliocchetti <[email protected]>
1 parent 5e3b057 commit 1ac0158

File tree

6 files changed

+89
-49
lines changed

6 files changed

+89
-49
lines changed

CHANGELOG.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1313
## Unreleased
1414

1515
- `opentelemetry-sdk`: Fix the type hint of the `_metrics_data` property to allow `None`
16-
([#4837](https://github.com/open-telemetry/opentelemetry-python/pull/4837)
16+
([#4837](https://github.com/open-telemetry/opentelemetry-python/pull/4837)).
1717
- Regenerate opentelemetry-proto code with v1.9.0 release
1818
([#4840](https://github.com/open-telemetry/opentelemetry-python/pull/4840))
1919
- Add python 3.14 support
2020
([#4798](https://github.com/open-telemetry/opentelemetry-python/pull/4798))
2121
- Silence events API warnings for internal users
2222
([#4847](https://github.com/open-telemetry/opentelemetry-python/pull/4847))
23+
- Prevent possible endless recursion from happening in `SimpleLogRecordProcessor.on_emit`,
24+
([#4799](https://github.com/open-telemetry/opentelemetry-python/pull/4799)).
2325

2426
## Version 1.39.0/0.60b0 (2025-12-03)
2527

opentelemetry-api/test-requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,5 @@ wrapt==1.16.0
1212
zipp==3.20.2
1313
-e opentelemetry-sdk
1414
-e opentelemetry-semantic-conventions
15-
-e tests/opentelemetry-test-utils
1615
-e opentelemetry-api
16+
-e tests/opentelemetry-test-utils

opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import enum
1818
import logging
1919
import sys
20+
import traceback
2021
from os import environ, linesep
2122
from typing import IO, Callable, Optional, Sequence
2223

@@ -52,6 +53,9 @@
5253
_logger = logging.getLogger(__name__)
5354
_logger.addFilter(DuplicateFilter())
5455

56+
_propagate_false_logger = logging.getLogger(__name__ + ".propagate.false")
57+
_propagate_false_logger.propagate = False
58+
5559

5660
class LogRecordExportResult(enum.Enum):
5761
SUCCESS = 0
@@ -145,11 +149,33 @@ def __init__(self, exporter: LogRecordExporter):
145149
self._shutdown = False
146150

147151
def on_emit(self, log_record: ReadWriteLogRecord):
148-
if self._shutdown:
149-
_logger.warning("Processor is already shutdown, ignoring call")
152+
# Prevent entering a recursive loop.
153+
if (
154+
sum(
155+
item.name == "on_emit"
156+
and (
157+
item.filename.endswith("export/__init__.py")
158+
or item.filename.endswith(
159+
r"export\__init__.py"
160+
) # backward slash on windows..
161+
)
162+
for item in traceback.extract_stack()
163+
)
164+
# Recursive depth of 3 is sort of arbitrary. It's possible that an Exporter.export call
165+
# emits a log which returns us to this function, but when we call Exporter.export again the log
166+
# is no longer emitted and we exit this recursive loop naturally, a depth of >3 allows 3
167+
# recursive log calls but exits after because it's likely endless.
168+
> 3
169+
):
170+
_propagate_false_logger.warning(
171+
"SimpleLogRecordProcessor.on_emit has entered a recursive loop. Dropping log and exiting the loop."
172+
)
150173
return
151174
token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
152175
try:
176+
if self._shutdown:
177+
_logger.warning("Processor is already shutdown, ignoring call")
178+
return
153179
# Convert ReadWriteLogRecord to ReadableLogRecord before exporting
154180
# Note: resource should not be None at this point as it's set during Logger.emit()
155181
resource = (
@@ -166,7 +192,8 @@ def on_emit(self, log_record: ReadWriteLogRecord):
166192
self._exporter.export((readable_log_record,))
167193
except Exception: # pylint: disable=broad-exception-caught
168194
_logger.exception("Exception while exporting logs.")
169-
detach(token)
195+
finally:
196+
detach(token)
170197

171198
def shutdown(self):
172199
self._shutdown = True

opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@
4242
class DuplicateFilter(logging.Filter):
4343
"""Filter that can be applied to internal `logger`'s.
4444
45-
Currently applied to `logger`s on the export logs path that could otherwise cause endless logging of errors or a
46-
recursion depth exceeded issue in cases where logging itself results in an exception."""
45+
Currently applied to `logger`s on the export logs path to prevent endlessly logging the same log
46+
in cases where logging itself is failing."""
4747

4848
def filter(self, record):
4949
current_log = (
@@ -81,6 +81,10 @@ def shutdown(self):
8181
raise NotImplementedError
8282

8383

84+
_logger = logging.getLogger(__name__)
85+
_logger.addFilter(DuplicateFilter())
86+
87+
8488
class BatchProcessor(Generic[Telemetry]):
8589
"""This class can be used with exporter's that implement the above
8690
Exporter interface to buffer and send telemetry in batch through
@@ -111,8 +115,6 @@ def __init__(
111115
target=self.worker,
112116
daemon=True,
113117
)
114-
self._logger = logging.getLogger(__name__)
115-
self._logger.addFilter(DuplicateFilter())
116118
self._exporting = exporting
117119

118120
self._shutdown = False
@@ -189,20 +191,20 @@ def _export(self, batch_strategy: BatchExportStrategy) -> None:
189191
]
190192
)
191193
except Exception: # pylint: disable=broad-exception-caught
192-
self._logger.exception(
194+
_logger.exception(
193195
"Exception while exporting %s.", self._exporting
194196
)
195197
detach(token)
196198

197-
# Do not add any logging.log statements to this function, they can be being routed back to this `emit` function,
198-
# resulting in endless recursive calls that crash the program.
199-
# See https://github.com/open-telemetry/opentelemetry-python/issues/4261
200199
def emit(self, data: Telemetry) -> None:
201200
if self._shutdown:
201+
_logger.info("Shutdown called, ignoring %s.", self._exporting)
202202
return
203203
if self._pid != os.getpid():
204204
self._bsp_reset_once.do_once(self._at_fork_reinit)
205-
# This will drop a log from the right side if the queue is at _max_queue_length.
205+
if len(self._queue) == self._max_queue_size:
206+
_logger.warning("Queue full, dropping %s.", self._exporting)
207+
# This will drop a log from the right side if the queue is at _max_queue_size.
206208
self._queue.appendleft(data)
207209
if len(self._queue) >= self._max_export_batch_size:
208210
self._worker_awaken.set()

opentelemetry-sdk/test-requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ tomli==2.0.1
1111
typing_extensions==4.10.0
1212
wrapt==1.16.0
1313
zipp==3.19.2
14-
-e tests/opentelemetry-test-utils
1514
-e opentelemetry-api
15+
-e tests/opentelemetry-test-utils
1616
-e opentelemetry-semantic-conventions
1717
-e opentelemetry-sdk

opentelemetry-sdk/tests/logs/test_export.py

Lines changed: 43 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,13 @@
1515
# pylint: disable=protected-access
1616
import logging
1717
import os
18+
import sys
1819
import time
1920
import unittest
2021
from concurrent.futures import ( # pylint: disable=no-name-in-module
2122
ThreadPoolExecutor,
2223
)
23-
from sys import version_info
24+
from typing import Sequence
2425
from unittest.mock import Mock, patch
2526

2627
from pytest import mark
@@ -38,6 +39,7 @@
3839
BatchLogRecordProcessor,
3940
ConsoleLogRecordExporter,
4041
InMemoryLogRecordExporter,
42+
LogRecordExporter,
4143
SimpleLogRecordProcessor,
4244
)
4345
from opentelemetry.sdk.environment_variables import (
@@ -63,6 +65,46 @@
6365

6466

6567
class TestSimpleLogRecordProcessor(unittest.TestCase):
68+
@mark.skipif(
69+
(3, 13, 0) <= sys.version_info <= (3, 13, 5),
70+
reason="This will fail on 3.13.5 due to https://github.com/python/cpython/pull/131812 which prevents recursive log messages but was rolled back in 3.13.6.",
71+
)
72+
def test_simple_log_record_processor_doesnt_enter_recursive_loop(self):
73+
class Exporter(LogRecordExporter):
74+
def shutdown(self):
75+
pass
76+
77+
def export(self, batch: Sequence[ReadableLogRecord]):
78+
logger = logging.getLogger("any logger..")
79+
logger.warning("Something happened.")
80+
81+
exporter = Exporter()
82+
logger_provider = LoggerProvider()
83+
logger_provider.add_log_record_processor(
84+
SimpleLogRecordProcessor(exporter)
85+
)
86+
root_logger = logging.getLogger()
87+
# Add the OTLP handler to the root logger like is done in auto instrumentation.
88+
# This causes logs generated from within SimpleLogRecordProcessor.on_emit (such as the above log in export)
89+
# to be sent back to SimpleLogRecordProcessor.on_emit
90+
handler = LoggingHandler(
91+
level=logging.DEBUG, logger_provider=logger_provider
92+
)
93+
root_logger.addHandler(handler)
94+
propagate_false_logger = logging.getLogger(
95+
"opentelemetry.sdk._logs._internal.export.propagate.false"
96+
)
97+
# This would cause a max recursion depth exceeded error..
98+
try:
99+
with self.assertLogs(propagate_false_logger) as cm:
100+
root_logger.warning("hello!")
101+
assert (
102+
"SimpleLogRecordProcessor.on_emit has entered a recursive loop"
103+
in cm.output[0]
104+
)
105+
finally:
106+
root_logger.removeHandler(handler)
107+
66108
def test_simple_log_record_processor_default_level(self):
67109
exporter = InMemoryLogRecordExporter()
68110
logger_provider = LoggerProvider()
@@ -406,39 +448,6 @@ def bulk_emit(num_emit):
406448
time.sleep(2)
407449
assert len(exporter.get_finished_logs()) == total_expected_logs
408450

409-
@mark.skipif(
410-
version_info < (3, 10),
411-
reason="assertNoLogs only exists in python 3.10+.",
412-
)
413-
def test_logging_lib_not_invoked_in_batch_log_record_emit(self): # pylint: disable=no-self-use
414-
# See https://github.com/open-telemetry/opentelemetry-python/issues/4261
415-
exporter = Mock()
416-
processor = BatchLogRecordProcessor(exporter)
417-
logger_provider = LoggerProvider(
418-
resource=SDKResource.create(
419-
{
420-
"service.name": "shoppingcart",
421-
"service.instance.id": "instance-12",
422-
}
423-
),
424-
)
425-
logger_provider.add_log_record_processor(processor)
426-
handler = LoggingHandler(
427-
level=logging.INFO, logger_provider=logger_provider
428-
)
429-
sdk_logger = logging.getLogger("opentelemetry.sdk")
430-
# Attach OTLP handler to SDK logger
431-
sdk_logger.addHandler(handler)
432-
# If `emit` calls logging.log then this test will throw a maximum recursion depth exceeded exception and fail.
433-
try:
434-
with self.assertNoLogs(sdk_logger, logging.NOTSET):
435-
processor.on_emit(EMPTY_LOG)
436-
processor.shutdown()
437-
with self.assertNoLogs(sdk_logger, logging.NOTSET):
438-
processor.on_emit(EMPTY_LOG)
439-
finally:
440-
sdk_logger.removeHandler(handler)
441-
442451
def test_args(self):
443452
exporter = InMemoryLogRecordExporter()
444453
log_record_processor = BatchLogRecordProcessor(

0 commit comments

Comments
 (0)