Skip to content

feat: Add StatusMessageWatcher #407

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 29 commits into from
Jun 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
a71ae41
TODO: Figure out how to mock response with stream
Pijukatel May 12, 2025
69ff84c
WIP
Pijukatel May 12, 2025
862cacc
Polish splitting of messages and setting the log level
Pijukatel May 13, 2025
753427a
Draft with async implementation and example tests
Pijukatel May 13, 2025
cc0d944
Add `raw=True`
Pijukatel May 14, 2025
cbcabd3
Add chunk processing
Pijukatel May 14, 2025
81577e8
Merge remote-tracking branch 'origin/master' into redirected-actor-logs
Pijukatel May 14, 2025
b9bc44d
Add sync version of the logging.
Pijukatel May 14, 2025
9720327
Finalize, update comments
Pijukatel May 14, 2025
85ead2f
Add `from_start` argument for streaming from stand-by actors
Pijukatel May 15, 2025
4ad39fa
Skip first logs based on datetime of the marker
Pijukatel May 15, 2025
74595f9
Self review.
Pijukatel May 15, 2025
cba571f
Handle bytestream edgecase of chunk containing only half of the multi…
Pijukatel May 15, 2025
02a1eb2
Review comments
Pijukatel May 15, 2025
2674cf2
Remove unnecessary `actor_name` argument
Pijukatel May 16, 2025
2a6f2ec
Update split pattern to deal with multiple times redirected log
Pijukatel May 16, 2025
1263450
Review comment
Pijukatel May 16, 2025
b1338f1
Regenerate `uv.lock` with new version of `uv`
Pijukatel May 16, 2025
669a749
Test data time alignment.
Pijukatel May 16, 2025
737cde9
Add status redirector
Pijukatel May 19, 2025
2914e50
TODO: Finalize tests
Pijukatel May 19, 2025
8fbbffa
Finalize tests.
Pijukatel May 20, 2025
8e70e59
Merge remote-tracking branch 'origin/master' into redirect-status-mes…
Pijukatel May 20, 2025
a3a629e
Update syntax to avoid https://github.com/PyCQA/redbaron/issues/212
Pijukatel May 21, 2025
18f4f51
Update client names in tests to match their type
Pijukatel May 21, 2025
268e568
Review comments
Pijukatel May 28, 2025
335b8c3
Properly set _force_propagate
Pijukatel May 28, 2025
1e5e976
Use whitespace in default redirect logger name instead of `-`
Pijukatel Jun 2, 2025
350fc67
Review comments
Pijukatel Jun 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/apify_client/_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,5 +164,5 @@ def format(self, record: logging.LogRecord) -> str:
Returns:
Formated log message.
"""
formated_logger_name = f'{Fore.CYAN}[{record.name}]{Style.RESET_ALL} '
return f'{formated_logger_name}-> {record.msg}'
formated_logger_name = f'{Fore.CYAN}[{record.name}]{Style.RESET_ALL}'
return f'{formated_logger_name} -> {record.msg}'
23 changes: 13 additions & 10 deletions src/apify_client/clients/resource_clients/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,8 @@ def call(
waits indefinitely.
logger: Logger used to redirect logs from the Actor run. Using "default" literal means that a predefined
default logger will be used. Setting `None` will disable any log propagation. Passing custom logger
will redirect logs to the provided logger.
will redirect logs to the provided logger. The logger is also used to capture status and status message
of the other Actor run.
Returns:
The run object.
Expand All @@ -336,12 +337,11 @@ def call(
return self.root_client.run(started_run['id']).wait_for_finish(wait_secs=wait_secs)

run_client = self.root_client.run(run_id=started_run['id'])

if logger == 'default':
log_context = run_client.get_streamed_log()
else:
log_context = run_client.get_streamed_log(to_logger=logger)
logger = None

with log_context:
with run_client.get_status_message_watcher(to_logger=logger), run_client.get_streamed_log(to_logger=logger):
return self.root_client.run(started_run['id']).wait_for_finish(wait_secs=wait_secs)

def build(
Expand Down Expand Up @@ -722,7 +722,8 @@ async def call(
waits indefinitely.
logger: Logger used to redirect logs from the Actor run. Using "default" literal means that a predefined
default logger will be used. Setting `None` will disable any log propagation. Passing custom logger
will redirect logs to the provided logger.
will redirect logs to the provided logger. The logger is also used to capture status and status message
of the other Actor run.
Returns:
The run object.
Expand All @@ -742,12 +743,14 @@ async def call(
return await self.root_client.run(started_run['id']).wait_for_finish(wait_secs=wait_secs)

run_client = self.root_client.run(run_id=started_run['id'])

if logger == 'default':
log_context = await run_client.get_streamed_log()
else:
log_context = await run_client.get_streamed_log(to_logger=logger)
logger = None

status_redirector = await run_client.get_status_message_watcher(to_logger=logger)
streamed_log = await run_client.get_streamed_log(to_logger=logger)

async with log_context:
async with status_redirector, streamed_log:
return await self.root_client.run(started_run['id']).wait_for_finish(wait_secs=wait_secs)

async def build(
Expand Down
176 changes: 171 additions & 5 deletions src/apify_client/clients/resource_clients/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
import logging
import re
import threading
import time
from asyncio import Task
from contextlib import asynccontextmanager, contextmanager
from datetime import datetime, timezone
from datetime import datetime, timedelta, timezone
from threading import Thread
from typing import TYPE_CHECKING, Any, cast

Expand All @@ -23,6 +24,8 @@
import httpx
from typing_extensions import Self

from apify_client.clients import RunClient, RunClientAsync


class LogClient(ResourceClient):
"""Sub-client for manipulating logs."""
Expand Down Expand Up @@ -228,9 +231,9 @@ def __init__(self, to_logger: logging.Logger, *, from_start: bool = True) -> Non
logs for long-running actors in stand-by.

"""
self._to_logger = to_logger
if self._force_propagate:
to_logger.propagate = True
self._to_logger = to_logger
self._stream_buffer = list[bytes]()
self._split_marker = re.compile(rb'(?:\n|^)(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z)')
self._relevancy_time_limit: datetime | None = None if from_start else datetime.now(tz=timezone.utc)
Expand Down Expand Up @@ -350,13 +353,16 @@ def start(self) -> Task:
self._streaming_task = asyncio.create_task(self._stream_log())
return self._streaming_task

def stop(self) -> None:
async def stop(self) -> None:
"""Stop the streaming task."""
if not self._streaming_task:
raise RuntimeError('Streaming task is not active')

self._streaming_task.cancel()
self._streaming_task = None
try:
await self._streaming_task
except asyncio.CancelledError:
self._streaming_task = None

async def __aenter__(self) -> Self:
"""Start the streaming task within the context. Exiting the context will cancel the streaming task."""
Expand All @@ -367,7 +373,7 @@ async def __aexit__(
self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None
) -> None:
"""Cancel the streaming task."""
self.stop()
await self.stop()

async def _stream_log(self) -> None:
async with self._log_client.stream(raw=True) as log_stream:
Expand All @@ -378,3 +384,163 @@ async def _stream_log(self) -> None:

# If the stream is finished, then the last part will be also processed.
self._log_buffer_content(include_last_part=True)


class StatusMessageWatcher:
    """Base utility for forwarding the status and status message of another Actor run to a logger.

    The run is polled at a fixed period, so a status message that is replaced between two polls is never seen;
    there is no guarantee that every message gets logged, especially when messages change frequently.

    Instances are usually obtained through the `get_status_message_watcher` helper of a run client, but the
    watcher classes are part of the public surface: callers start and stop the concrete variants themselves or
    use them as context managers.
    """

    _force_propagate = False
    # Extra time given to a finished run before the watcher is stopped. The final status and status message can be
    # set on the Actor run with a delay; waiting does not guarantee that they are captured, it only makes it more
    # likely.
    _final_sleep_time_s = 6

    def __init__(self, *, to_logger: logging.Logger, check_period: timedelta = timedelta(seconds=5)) -> None:
        """Initialize `StatusMessageWatcher`.

        Args:
            to_logger: Logger that receives the status lines.
            check_period: Interval between two polls of the run; stored internally as seconds.
        """
        # Nothing has been logged yet, so the first polled status always differs from this placeholder.
        self._last_status_message = ''
        self._check_period = check_period.total_seconds()
        if self._force_propagate:
            to_logger.propagate = True
        self._to_logger = to_logger

    def _log_run_data(self, run_data: dict[str, Any] | None) -> bool:
        """Log the status line of the run if it changed and tell whether polling should continue.

        Args:
            run_data: Run dictionary, or `None` when no run data is available.

        Returns:
            `False` once the run reports a terminal status message, `True` otherwise (including when `run_data`
            is `None`).
        """
        if run_data is None:
            return True

        current_status = run_data.get('status', 'Unknown status')
        current_message = run_data.get('statusMessage', '')
        summary = f'Status: {current_status}, Message: {current_message}'

        # Only emit a line when something actually changed since the last poll.
        if summary != self._last_status_message:
            self._last_status_message = summary
            self._to_logger.info(summary)

        is_terminal = run_data.get('isStatusMessageTerminal', False)
        return not is_terminal


class StatusMessageWatcherAsync(StatusMessageWatcher):
    """Async variant of `StatusMessageWatcher` that is logging in task."""

    def __init__(
        self, *, run_client: RunClientAsync, to_logger: logging.Logger, check_period: timedelta = timedelta(seconds=1)
    ) -> None:
        """Initialize `StatusMessageWatcherAsync`.

        Args:
            run_client: The client for run that will be used to get a status and message.
            to_logger: The logger to which the status message will be redirected.
            check_period: The period with which the status message will be polled.
        """
        super().__init__(to_logger=to_logger, check_period=check_period)
        self._run_client = run_client
        self._logging_task: Task | None = None

    def start(self) -> Task:
        """Start the logging task. The caller has to handle any cleanup by manually calling the `stop` method.

        Returns:
            The task that polls the run and logs changed status messages.

        Raises:
            RuntimeError: If a logging task was already started and not stopped yet.
        """
        if self._logging_task:
            raise RuntimeError('Logging task already active')
        self._logging_task = asyncio.create_task(self._log_changed_status_message())
        return self._logging_task

    async def stop(self) -> None:
        """Stop the logging task and wait until it is finished.

        Raises:
            RuntimeError: If no logging task is active.
        """
        if not self._logging_task:
            raise RuntimeError('Logging task is not active')

        logging_task = self._logging_task
        logging_task.cancel()
        try:
            # Await the cancelled task so that it is fully finalized; a task that is cancelled but never awaited
            # may produce warnings when it is garbage collected.
            await logging_task
        except asyncio.CancelledError:
            # Expected outcome of cancelling a task that was still polling.
            pass
        finally:
            # Always drop the handle. The task may have already finished by itself (terminal status message was
            # logged) or failed, in which case no `CancelledError` is raised; the watcher must still become
            # restartable.
            self._logging_task = None

    async def __aenter__(self) -> Self:
        """Start the logging task within the context. Exiting the context will cancel the logging task."""
        self.start()
        return self

    async def __aexit__(
        self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None
    ) -> None:
        """Give the logging task a grace period to capture the final status message and then stop it."""
        try:
            logging_task = self._logging_task
            if logging_task is not None and not logging_task.done():
                # Wait at most `_final_sleep_time_s` for the last status message, but return as soon as the task
                # finishes by itself. `asyncio.wait` neither cancels the task nor raises on timeout.
                await asyncio.wait({logging_task}, timeout=self._final_sleep_time_s)
        finally:
            # Stop the task even if the grace period itself was interrupted, so it does not outlive the context.
            await self.stop()

    async def _log_changed_status_message(self) -> None:
        """Poll the run until a terminal status message is logged, sleeping `check_period` between polls."""
        while True:
            run_data = await self._run_client.get()
            if not self._log_run_data(run_data):
                break
            await asyncio.sleep(self._check_period)


class StatusMessageWatcherSync(StatusMessageWatcher):
    """Sync variant of `StatusMessageWatcher` that is logging in thread."""

    def __init__(
        self, *, run_client: RunClient, to_logger: logging.Logger, check_period: timedelta = timedelta(seconds=1)
    ) -> None:
        """Initialize `StatusMessageWatcherSync`.

        Args:
            run_client: The client for run that will be used to get a status and message.
            to_logger: The logger to which the status message will be redirected.
            check_period: The period with which the status message will be polled.
        """
        super().__init__(to_logger=to_logger, check_period=check_period)
        self._run_client = run_client
        self._logging_thread: Thread | None = None
        # An event instead of a plain flag, so that the thread waiting between two polls can be woken up
        # immediately when `stop` is called.
        self._stop_event = threading.Event()

    def start(self) -> Thread:
        """Start the logging thread. The caller has to handle any cleanup by manually calling the `stop` method.

        Returns:
            The thread that polls the run and logs changed status messages.

        Raises:
            RuntimeError: If a logging thread was already started and not stopped yet.
        """
        if self._logging_thread:
            raise RuntimeError('Logging thread already active')
        self._stop_event.clear()
        self._logging_thread = threading.Thread(target=self._log_changed_status_message)
        self._logging_thread.start()
        return self._logging_thread

    def stop(self) -> None:
        """Signal the logging thread to stop logging and wait for it to finish.

        Raises:
            RuntimeError: If no logging thread is active.
        """
        if not self._logging_thread:
            raise RuntimeError('Logging thread is not active')

        logging_thread = self._logging_thread
        # Grace period for capturing the final status message. Joining with a timeout returns early when the
        # thread has already finished by itself (terminal status message was logged).
        logging_thread.join(timeout=self._final_sleep_time_s)
        self._stop_event.set()
        try:
            logging_thread.join()
        finally:
            self._logging_thread = None
            self._stop_event.clear()

    def __enter__(self) -> Self:
        """Start the logging thread within the context. Exiting the context will stop the logging thread."""
        self.start()
        return self

    def __exit__(
        self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None
    ) -> None:
        """Stop the logging thread."""
        self.stop()

    def _log_changed_status_message(self) -> None:
        """Poll the run until a terminal status message is logged or a stop is requested."""
        while True:
            if not self._log_run_data(self._run_client.get()):
                break
            # Checked after polling on purpose: one last poll still happens after the stop was requested.
            if self._stop_event.is_set():
                break
            # Interruptible sleep; returns immediately once `stop` sets the event.
            self._stop_event.wait(self._check_period)
Loading