Skip to content

Commit cba571f

Browse files
committed
Handle bytestream edgecase of chunk containing only half of the multibyte character
1 parent 74595f9 commit cba571f

File tree

2 files changed

+26
-17
lines changed

2 files changed

+26
-17
lines changed

src/apify_client/clients/resource_clients/log.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -231,12 +231,12 @@ def __init__(self, to_logger: logging.Logger, *, from_start: bool = True) -> Non
231231
self._to_logger = to_logger
232232
if self._force_propagate:
233233
to_logger.propagate = True
234-
self._stream_buffer = list[str]()
235-
self._split_marker = re.compile(r'(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z)')
234+
self._stream_buffer = list[bytes]()
235+
self._split_marker = re.compile(rb'(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z)')
236236
self._relevancy_time_limit: datetime | None = None if from_start else datetime.now(tz=timezone.utc)
237237

238238
def _process_new_data(self, data: bytes) -> None:
239-
new_chunk = data.decode('utf-8')
239+
new_chunk = data
240240
self._stream_buffer.append(new_chunk)
241241
if re.findall(self._split_marker, new_chunk):
242242
# If complete split marker was found in new chunk, then log the buffer.
@@ -248,7 +248,7 @@ def _log_buffer_content(self, *, include_last_part: bool = False) -> None:
248248
Log the messages created from the split parts and remove them from buffer.
249249
The last part could be incomplete, and so it can be left unprocessed in the buffer until later.
250250
"""
251-
all_parts = re.split(self._split_marker, ''.join(self._stream_buffer))[1:] # First split is empty string
251+
all_parts = re.split(self._split_marker, b''.join(self._stream_buffer))[1:] # The First split is empty
252252
if include_last_part:
253253
message_markers = all_parts[0::2]
254254
message_contents = all_parts[1::2]
@@ -260,12 +260,14 @@ def _log_buffer_content(self, *, include_last_part: bool = False) -> None:
260260
self._stream_buffer = all_parts[-2:]
261261

262262
for marker, content in zip(message_markers, message_contents):
263+
decoded_marker = marker.decode('utf-8')
264+
decoded_content = content.decode('utf-8')
263265
if self._relevancy_time_limit:
264-
log_time = datetime.fromisoformat(marker.replace('Z', '+00:00'))
266+
log_time = datetime.fromisoformat(decoded_marker.replace('Z', '+00:00'))
265267
if log_time < self._relevancy_time_limit:
266268
# Skip irrelevant logs
267269
continue
268-
message = marker + content
270+
message = decoded_marker + decoded_content
269271
self._to_logger.log(level=self._guess_log_level_from_message(message), msg=message.strip())
270272

271273
@staticmethod

tests/unit/test_logging.py

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,20 +23,23 @@
2323
_MOCKED_ACTOR_LOGS = (
2424
b'2025-05-13T07:24:12.588Z ACTOR: Pulling Docker image of build.\n'
2525
b'2025-05-13T07:24:12.686Z ACTOR: Creating Docker container.\n'
26-
b'2025-05-13T07:24:12.745Z ACTOR: Starting Docker container.', # Several logs merged into one message
26+
b'2025-05-13T07:24:12.745Z ACTOR: Starting Docker container.', # Several logs merged into one chunk
27+
b'2025-05-13T07:26:14.132Z [apify] DEBUG \xc3', # Chunked log split in the middle of the multibyte character
28+
b'\xa1', # part 2
2729
b'2025-05-13T07:24:14.132Z [apify] INFO multiline \n log',
2830
b'2025-05-13T07:25:14.132Z [apify] WARNING some warning',
2931
b'2025-05-13T07:26:14.132Z [apify] DEBUG c',
30-
b'2025-05-13T0', # Chunked log that got split in the marker, part 1
31-
b'7:26:14.132Z [apify] DEBUG d' # Chunked log that got split in the marker, part 2
32-
b'2025-05-13T07:26:14.132Z [apify] DEB', # Chunked log that got split outside of marker, part 1
33-
b'UG e', # Chunked log that got split outside of marker, part 1
32+
b'2025-05-13T0', # Chunked log that got split in the marker
33+
b'7:26:14.132Z [apify] DEBUG d' # part 2
34+
b'2025-05-13T07:26:14.132Z [apify] DEB', # Chunked log that got split outside of marker
35+
b'UG e', # part 2
3436
)
3537

3638
_EXPECTED_MESSAGES_AND_LEVELS = (
3739
('2025-05-13T07:24:12.588Z ACTOR: Pulling Docker image of build.', logging.INFO),
3840
('2025-05-13T07:24:12.686Z ACTOR: Creating Docker container.', logging.INFO),
3941
('2025-05-13T07:24:12.745Z ACTOR: Starting Docker container.', logging.INFO),
42+
('2025-05-13T07:26:14.132Z [apify] DEBUG á', logging.DEBUG),
4043
('2025-05-13T07:24:14.132Z [apify] INFO multiline \n log', logging.INFO),
4144
('2025-05-13T07:25:14.132Z [apify] WARNING some warning', logging.WARNING),
4245
('2025-05-13T07:26:14.132Z [apify] DEBUG c', logging.DEBUG),
@@ -115,7 +118,9 @@ def propagate_stream_logs() -> None:
115118
logging.getLogger(f'apify.{_MOCKED_ACTOR_NAME}-{_MOCKED_RUN_ID}').setLevel(logging.DEBUG)
116119

117120

118-
@pytest.mark.parametrize(('log_from_start', 'expected_log_count'), [(True, 8), (False, 5)])
121+
@pytest.mark.parametrize(
122+
('log_from_start', 'expected_log_count'), [(True, len(_EXPECTED_MESSAGES_AND_LEVELS)), (False, 6)]
123+
)
119124
@respx.mock
120125
async def test_redirected_logs_async(
121126
*,
@@ -148,7 +153,9 @@ async def test_redirected_logs_async(
148153
assert expected_message_and_level[1] == record.levelno
149154

150155

151-
@pytest.mark.parametrize(('log_from_start', 'expected_log_count'), [(True, 8), (False, 5)])
156+
@pytest.mark.parametrize(
157+
('log_from_start', 'expected_log_count'), [(True, len(_EXPECTED_MESSAGES_AND_LEVELS)), (False, 6)]
158+
)
152159
@respx.mock
153160
def test_redirected_logs_sync(
154161
*,
@@ -201,7 +208,7 @@ async def test_actor_call_redirect_logs_to_default_logger_async(
201208
assert isinstance(logger.handlers[0], logging.StreamHandler)
202209

203210
# Ensure logs are propagated
204-
assert len(caplog.records) == 8
211+
assert len(caplog.records) == len(_EXPECTED_MESSAGES_AND_LEVELS)
205212
for expected_message_and_level, record in zip(_EXPECTED_MESSAGES_AND_LEVELS, caplog.records):
206213
assert expected_message_and_level[0] == record.message
207214
assert expected_message_and_level[1] == record.levelno
@@ -228,7 +235,7 @@ def test_actor_call_redirect_logs_to_default_logger_sync(
228235
assert isinstance(logger.handlers[0], logging.StreamHandler)
229236

230237
# Ensure logs are propagated
231-
assert len(caplog.records) == 8
238+
assert len(caplog.records) == len(_EXPECTED_MESSAGES_AND_LEVELS)
232239
for expected_message_and_level, record in zip(_EXPECTED_MESSAGES_AND_LEVELS, caplog.records):
233240
assert expected_message_and_level[0] == record.message
234241
assert expected_message_and_level[1] == record.levelno
@@ -278,7 +285,7 @@ async def test_actor_call_redirect_logs_to_custom_logger_async(
278285
with caplog.at_level(logging.DEBUG, logger=logger_name):
279286
await run_client.call(logger=logger)
280287

281-
assert len(caplog.records) == 8
288+
assert len(caplog.records) == len(_EXPECTED_MESSAGES_AND_LEVELS)
282289
for expected_message_and_level, record in zip(_EXPECTED_MESSAGES_AND_LEVELS, caplog.records):
283290
assert expected_message_and_level[0] == record.message
284291
assert expected_message_and_level[1] == record.levelno
@@ -298,7 +305,7 @@ def test_actor_call_redirect_logs_to_custom_logger_sync(
298305
with caplog.at_level(logging.DEBUG, logger=logger_name):
299306
run_client.call(logger=logger)
300307

301-
assert len(caplog.records) == 8
308+
assert len(caplog.records) == len(_EXPECTED_MESSAGES_AND_LEVELS)
302309
for expected_message_and_level, record in zip(_EXPECTED_MESSAGES_AND_LEVELS, caplog.records):
303310
assert expected_message_and_level[0] == record.message
304311
assert expected_message_and_level[1] == record.levelno

0 commit comments

Comments
 (0)