diff --git a/src/apify_client/_logging.py b/src/apify_client/_logging.py index e2f08b6f..55d9de8a 100644 --- a/src/apify_client/_logging.py +++ b/src/apify_client/_logging.py @@ -164,5 +164,5 @@ def format(self, record: logging.LogRecord) -> str: Returns: Formated log message. """ - formated_logger_name = f'{Fore.CYAN}[{record.name}]{Style.RESET_ALL} ' - return f'{formated_logger_name}-> {record.msg}' + formated_logger_name = f'{Fore.CYAN}[{record.name}]{Style.RESET_ALL}' + return f'{formated_logger_name} -> {record.msg}' diff --git a/src/apify_client/clients/resource_clients/actor.py b/src/apify_client/clients/resource_clients/actor.py index 8adcd1ab..a2473699 100644 --- a/src/apify_client/clients/resource_clients/actor.py +++ b/src/apify_client/clients/resource_clients/actor.py @@ -317,7 +317,8 @@ def call( waits indefinitely. logger: Logger used to redirect logs from the Actor run. Using "default" literal means that a predefined default logger will be used. Setting `None` will disable any log propagation. Passing custom logger - will redirect logs to the provided logger. + will redirect logs to the provided logger. The logger is also used to capture status and status message + of the other Actor run. Returns: The run object. @@ -336,12 +337,11 @@ def call( return self.root_client.run(started_run['id']).wait_for_finish(wait_secs=wait_secs) run_client = self.root_client.run(run_id=started_run['id']) + if logger == 'default': - log_context = run_client.get_streamed_log() - else: - log_context = run_client.get_streamed_log(to_logger=logger) + logger = None - with log_context: + with run_client.get_status_message_watcher(to_logger=logger), run_client.get_streamed_log(to_logger=logger): return self.root_client.run(started_run['id']).wait_for_finish(wait_secs=wait_secs) def build( @@ -722,7 +722,8 @@ async def call( waits indefinitely. logger: Logger used to redirect logs from the Actor run. Using "default" literal means that a predefined default logger will be used. 
Setting `None` will disable any log propagation. Passing custom logger - will redirect logs to the provided logger. + will redirect logs to the provided logger. The logger is also used to capture status and status message + of the other Actor run. Returns: The run object. @@ -742,12 +743,14 @@ async def call( return await self.root_client.run(started_run['id']).wait_for_finish(wait_secs=wait_secs) run_client = self.root_client.run(run_id=started_run['id']) + if logger == 'default': - log_context = await run_client.get_streamed_log() - else: - log_context = await run_client.get_streamed_log(to_logger=logger) + logger = None + + status_redirector = await run_client.get_status_message_watcher(to_logger=logger) + streamed_log = await run_client.get_streamed_log(to_logger=logger) - async with log_context: + async with status_redirector, streamed_log: return await self.root_client.run(started_run['id']).wait_for_finish(wait_secs=wait_secs) async def build( diff --git a/src/apify_client/clients/resource_clients/log.py b/src/apify_client/clients/resource_clients/log.py index 3b671d7a..bca8b07a 100644 --- a/src/apify_client/clients/resource_clients/log.py +++ b/src/apify_client/clients/resource_clients/log.py @@ -4,9 +4,10 @@ import logging import re import threading +import time from asyncio import Task from contextlib import asynccontextmanager, contextmanager -from datetime import datetime, timezone +from datetime import datetime, timedelta, timezone from threading import Thread from typing import TYPE_CHECKING, Any, cast @@ -23,6 +24,8 @@ import httpx from typing_extensions import Self + from apify_client.clients import RunClient, RunClientAsync + class LogClient(ResourceClient): """Sub-client for manipulating logs.""" @@ -228,9 +231,9 @@ def __init__(self, to_logger: logging.Logger, *, from_start: bool = True) -> Non logs for long-running actors in stand-by. 
class StatusMessageWatcher:
    """Utility class for logging status messages from another Actor run.

    Status message is logged at fixed time intervals, and there is no guarantee that all messages will be logged,
    especially in cases of frequent status message changes.
    """

    # When set to `True`, the target logger is forced to propagate records to its ancestors.
    _force_propagate = False
    # This is final sleep time to try to get the last status and status message of finished Actor run.
    # The status and status message can get set on the Actor run with a delay. Sleep time does not guarantee that the
    # final message will be captured, but increases the chances of that.
    _final_sleep_time_s = 6

    def __init__(self, *, to_logger: logging.Logger, check_period: timedelta = timedelta(seconds=5)) -> None:
        """Initialize `StatusMessageWatcher`.

        Args:
            to_logger: The logger to which the status message will be redirected.
            check_period: The period with which the status message will be polled.
        """
        if self._force_propagate:
            to_logger.propagate = True
        self._to_logger = to_logger
        # Stored as plain seconds so that it can be passed directly to the sleep functions.
        self._check_period = check_period.total_seconds()
        # Last line that was logged; used to suppress repeated identical status lines.
        self._last_status_message = ''

    def _log_run_data(self, run_data: dict[str, Any] | None) -> bool:
        """Get relevant run data, log them if changed and return `True` if more data is expected.

        Args:
            run_data: The dictionary that contains the run data, or `None` when no data is available.

        Returns:
            `True` if more data is expected, `False` otherwise.
        """
        if run_data is None:
            # Nothing to log yet, keep polling.
            return True

        # `or` (instead of a `.get` default) also covers keys that are present but hold `None`,
        # so the log line never contains the literal text "None".
        status = run_data.get('status') or 'Unknown status'
        status_message = run_data.get('statusMessage') or ''
        new_status_message = f'Status: {status}, Message: {status_message}'

        if new_status_message != self._last_status_message:
            self._last_status_message = new_status_message
            self._to_logger.info(new_status_message)

        # A terminal status message means that no further changes are expected.
        return not bool(run_data.get('isStatusMessageTerminal', False))
async def stop(self) -> None:
    """Stop the logging task.

    Cancels the task, waits for it to finish and clears the stored task handle so that the watcher
    can be started again.

    Raises:
        RuntimeError: If no logging task is active.
    """
    if not self._logging_task:
        raise RuntimeError('Logging task is not active')

    self._logging_task.cancel()
    try:
        await self._logging_task
    except asyncio.CancelledError:
        # Expected outcome when the task was still running at the time of cancellation.
        pass
    finally:
        # Always clear the handle. The task may already have finished on its own, in which case
        # awaiting it does not raise `CancelledError` and the handle would otherwise stay set,
        # making a later `start()` fail with "already active".
        self._logging_task = None
def stop(self) -> None:
    """Signal the _logging_thread thread to stop logging and wait for it to finish.

    The thread is first given up to `_final_sleep_time_s` seconds to finish on its own, so that a
    late final status message can still be picked up. Only after that grace period is the thread
    asked to stop.

    Raises:
        RuntimeError: If no logging thread is active.
    """
    if not self._logging_thread:
        raise RuntimeError('Logging thread is not active')

    try:
        # Grace period: `join` with a timeout returns as soon as the thread ends, so no time is
        # wasted when the thread has already finished.
        self._logging_thread.join(timeout=self._final_sleep_time_s)
        if self._logging_thread.is_alive():
            self._stop_logging = True
            self._logging_thread.join()
    finally:
        # Reset the state even if joining is interrupted, so the watcher can be started again.
        self._logging_thread = None
        self._stop_logging = False
Exiting the context will cancel the logging task.""" + self.start() + return self + + def __exit__( + self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None + ) -> None: + """Cancel the logging task.""" + self.stop() + + def _log_changed_status_message(self) -> None: + while True: + if not self._log_run_data(self._run_client.get()): + break + if self._stop_logging: + break + time.sleep(self._check_period) diff --git a/src/apify_client/clients/resource_clients/run.py b/src/apify_client/clients/resource_clients/run.py index 3297c36f..2b51aaf7 100644 --- a/src/apify_client/clients/resource_clients/run.py +++ b/src/apify_client/clients/resource_clients/run.py @@ -1,9 +1,11 @@ from __future__ import annotations import json +import logging import random import string import time +from datetime import timedelta from typing import TYPE_CHECKING, Any from apify_shared.utils import filter_out_none_values_recursively, ignore_docs, parse_date_fields @@ -16,6 +18,8 @@ from apify_client.clients.resource_clients.log import ( LogClient, LogClientAsync, + StatusMessageWatcherAsync, + StatusMessageWatcherSync, StreamedLogAsync, StreamedLogSync, ) @@ -258,7 +262,7 @@ def log(self) -> LogClient: def get_streamed_log(self, to_logger: logging.Logger | None = None, *, from_start: bool = True) -> StreamedLogSync: """Get `StreamedLog` instance that can be used to redirect logs. - `StreamedLog` can be directly called or used as a context manager. + `StreamedLog` can be explicitly started and stopped or used as a context manager. Args: to_logger: `Logger` used for logging the redirected messages. If not provided, a new logger is created @@ -270,14 +274,14 @@ def get_streamed_log(self, to_logger: logging.Logger | None = None, *, from_star `StreamedLog` instance for redirected logs. 
def get_status_message_watcher(
    self, to_logger: logging.Logger | None = None, check_period: timedelta = timedelta(seconds=1)
) -> StatusMessageWatcherSync:
    """Get `StatusMessageWatcher` instance that can be used to redirect status and status messages to logs.

    `StatusMessageWatcher` can be explicitly started and stopped or used as a context manager.

    Args:
        to_logger: `Logger` used for logging the status and status messages. If not provided, a new logger is
            created.
        check_period: The period with which the status message will be polled.

    Returns:
        `StatusMessageWatcher` instance.
    """
    if to_logger is None:
        # The run and Actor details are needed only to build the name of the default logger,
        # so they are fetched only when the caller did not supply a logger.
        name_parts: list[str] = []
        run_data = self.get()
        if run_data:
            actor_id = run_data.get('actId', '')
            # Skip the Actor lookup when the run does not reference any Actor.
            actor_data = (self.root_client.actor(actor_id=actor_id).get() or {}) if actor_id else {}
            name_parts = [actor_data.get('name', ''), f'runId:{run_data.get("id", "")}']

        name = ' '.join(part for part in name_parts if part)
        to_logger = create_redirect_logger(f'apify.{name}')

    return StatusMessageWatcherSync(run_client=self, to_logger=to_logger, check_period=check_period)
async def get_status_message_watcher(
    self,
    to_logger: logging.Logger | None = None,
    check_period: timedelta = timedelta(seconds=1),
) -> StatusMessageWatcherAsync:
    """Get `StatusMessageWatcher` instance that can be used to redirect status and status messages to logs.

    `StatusMessageWatcher` can be explicitly started and stopped or used as a context manager.

    Args:
        to_logger: `Logger` used for logging the status and status messages. If not provided, a new logger is
            created.
        check_period: The period with which the status message will be polled.

    Returns:
        `StatusMessageWatcher` instance.
    """
    if to_logger is None:
        # The run and Actor details are needed only to build the name of the default logger,
        # so they are fetched only when the caller did not supply a logger.
        name_parts: list[str] = []
        run_data = await self.get()
        if run_data:
            actor_id = run_data.get('actId', '')
            actor_data: dict = {}
            # Skip the Actor lookup when the run does not reference any Actor.
            if actor_id:
                actor_data = await self.root_client.actor(actor_id=actor_id).get() or {}
            name_parts = [actor_data.get('name', ''), f'runId:{run_data.get("id", "")}']

        name = ' '.join(part for part in name_parts if part)
        to_logger = create_redirect_logger(f'apify.{name}')

    return StatusMessageWatcherAsync(run_client=self, to_logger=to_logger, check_period=check_period)
[apify.redirect-logger-4U1oAnKau6jpzjUuA] -> 2025-05-13T07:27:14.132Z ACTOR: Pulling', + '2025-05-13T07:28:14.132Z [apify.redirect-logger runId:4U1oAnKau6jpzjUuA] -> ' + '2025-05-13T07:27:14.132Z ACTOR:...', logging.INFO, ), ) +_EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES = ( + ('Status: RUNNING, Message: Initial message', logging.INFO), + *_EXPECTED_MESSAGES_AND_LEVELS, + ('Status: RUNNING, Message: Another message', logging.INFO), + ('Status: SUCCEEDED, Message: Final message', logging.INFO), +) + @pytest.fixture def mock_api() -> None: - actor_runs_responses = iter( - ( - httpx.Response( + def get_responses() -> Generator[httpx.Response, None, None]: + """Simulate actor run that changes status 3 times.""" + for _ in range(5): + yield httpx.Response( content=json.dumps( - {'data': {'id': _MOCKED_RUN_ID, 'actId': _MOCKED_ACTOR_ID, 'status': ActorJobStatus.RUNNING}} + { + 'data': { + 'id': _MOCKED_RUN_ID, + 'actId': _MOCKED_ACTOR_ID, + 'status': ActorJobStatus.RUNNING, + 'statusMessage': 'Initial message', + 'isStatusMessageTerminal': False, + } + } ), status_code=200, - ), - httpx.Response( + ) + for _ in range(5): + yield httpx.Response( content=json.dumps( - {'data': {'id': _MOCKED_RUN_ID, 'actId': _MOCKED_ACTOR_ID, 'status': ActorJobStatus.RUNNING}} + { + 'data': { + 'id': _MOCKED_RUN_ID, + 'actId': _MOCKED_ACTOR_ID, + 'status': ActorJobStatus.RUNNING, + 'statusMessage': 'Another message', + 'isStatusMessageTerminal': False, + } + } ), status_code=200, - ), - httpx.Response( + ) + while True: + yield httpx.Response( content=json.dumps( - {'data': {'id': _MOCKED_RUN_ID, 'actId': _MOCKED_ACTOR_ID, 'status': ActorJobStatus.SUCCEEDED}} + { + 'data': { + 'id': _MOCKED_RUN_ID, + 'actId': _MOCKED_ACTOR_ID, + 'status': ActorJobStatus.SUCCEEDED, + 'statusMessage': 'Final message', + 'isStatusMessageTerminal': True, + } + } ), status_code=200, - ), - ) - ) + ) + + responses = get_responses() def actor_runs_side_effect(_: httpx.Request) -> httpx.Response: 
@pytest.fixture
def reduce_final_timeout_for_status_message_redirector(monkeypatch: pytest.MonkeyPatch) -> None:
    """Reduce timeout used by the `StatusMessageWatcher`.

    This timeout makes sense on the platform, but in tests it is better to reduce it to speed up the tests.
    """
    # `monkeypatch` restores the original class attribute after the test, so the shortened timeout
    # does not leak into tests that did not request this fixture.
    monkeypatch.setattr(StatusMessageWatcher, '_final_sleep_time_s', 2)
time.sleep(2) assert len(caplog.records) == expected_log_count @@ -215,24 +261,25 @@ async def test_actor_call_redirect_logs_to_default_logger_async( caplog: LogCaptureFixture, mock_api_async: None, # noqa: ARG001, fixture propagate_stream_logs: None, # noqa: ARG001, fixture + reduce_final_timeout_for_status_message_redirector: None, # noqa: ARG001, fixture ) -> None: """Test that logs are redirected correctly to the default logger. Caplog contains logs before formatting, so formatting is not included in the test expectations.""" - logger_name = f'apify.{_MOCKED_ACTOR_NAME}-{_MOCKED_RUN_ID}' + logger_name = f'apify.{_MOCKED_ACTOR_NAME} runId:{_MOCKED_RUN_ID}' logger = logging.getLogger(logger_name) - run_client = ApifyClientAsync(token='mocked_token', api_url=_MOCKED_API_URL).actor(actor_id=_MOCKED_ACTOR_ID) + actor_client = ApifyClientAsync(token='mocked_token', api_url=_MOCKED_API_URL).actor(actor_id=_MOCKED_ACTOR_ID) with caplog.at_level(logging.DEBUG, logger=logger_name): - await run_client.call() + await actor_client.call() # Ensure expected handler and formater assert isinstance(logger.handlers[0].formatter, RedirectLogFormatter) assert isinstance(logger.handlers[0], logging.StreamHandler) # Ensure logs are propagated - assert len(caplog.records) == len(_EXPECTED_MESSAGES_AND_LEVELS) - for expected_message_and_level, record in zip(_EXPECTED_MESSAGES_AND_LEVELS, caplog.records): + assert len(caplog.records) == len(_EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES) + for expected_message_and_level, record in zip(_EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES, caplog.records): assert expected_message_and_level[0] == record.message assert expected_message_and_level[1] == record.levelno @@ -242,24 +289,25 @@ def test_actor_call_redirect_logs_to_default_logger_sync( caplog: LogCaptureFixture, mock_api_sync: None, # noqa: ARG001, fixture propagate_stream_logs: None, # noqa: ARG001, fixture + reduce_final_timeout_for_status_message_redirector: None, # noqa: 
ARG001, fixture ) -> None: """Test that logs are redirected correctly to the default logger. Caplog contains logs before formatting, so formatting is not included in the test expectations.""" - logger_name = f'apify.{_MOCKED_ACTOR_NAME}-{_MOCKED_RUN_ID}' + logger_name = f'apify.{_MOCKED_ACTOR_NAME} runId:{_MOCKED_RUN_ID}' logger = logging.getLogger(logger_name) - run_client = ApifyClient(token='mocked_token', api_url=_MOCKED_API_URL).actor(actor_id=_MOCKED_ACTOR_ID) + actor_client = ApifyClient(token='mocked_token', api_url=_MOCKED_API_URL).actor(actor_id=_MOCKED_ACTOR_ID) with caplog.at_level(logging.DEBUG, logger=logger_name): - run_client.call() + actor_client.call() # Ensure expected handler and formater assert isinstance(logger.handlers[0].formatter, RedirectLogFormatter) assert isinstance(logger.handlers[0], logging.StreamHandler) # Ensure logs are propagated - assert len(caplog.records) == len(_EXPECTED_MESSAGES_AND_LEVELS) - for expected_message_and_level, record in zip(_EXPECTED_MESSAGES_AND_LEVELS, caplog.records): + assert len(caplog.records) == len(_EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES) + for expected_message_and_level, record in zip(_EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES, caplog.records): assert expected_message_and_level[0] == record.message assert expected_message_and_level[1] == record.levelno @@ -271,10 +319,10 @@ async def test_actor_call_no_redirect_logs_async( propagate_stream_logs: None, # noqa: ARG001, fixture ) -> None: logger_name = f'apify.{_MOCKED_ACTOR_NAME}-{_MOCKED_RUN_ID}' - run_client = ApifyClientAsync(token='mocked_token', api_url=_MOCKED_API_URL).actor(actor_id=_MOCKED_ACTOR_ID) + actor_client = ApifyClientAsync(token='mocked_token', api_url=_MOCKED_API_URL).actor(actor_id=_MOCKED_ACTOR_ID) with caplog.at_level(logging.DEBUG, logger=logger_name): - await run_client.call(logger=None) + await actor_client.call(logger=None) assert len(caplog.records) == 0 @@ -286,10 +334,10 @@ def 
test_actor_call_no_redirect_logs_sync( propagate_stream_logs: None, # noqa: ARG001, fixture ) -> None: logger_name = f'apify.{_MOCKED_ACTOR_NAME}-{_MOCKED_RUN_ID}' - run_client = ApifyClient(token='mocked_token', api_url=_MOCKED_API_URL).actor(actor_id=_MOCKED_ACTOR_ID) + actor_client = ApifyClient(token='mocked_token', api_url=_MOCKED_API_URL).actor(actor_id=_MOCKED_ACTOR_ID) with caplog.at_level(logging.DEBUG, logger=logger_name): - run_client.call(logger=None) + actor_client.call(logger=None) assert len(caplog.records) == 0 @@ -299,17 +347,18 @@ async def test_actor_call_redirect_logs_to_custom_logger_async( caplog: LogCaptureFixture, mock_api_async: None, # noqa: ARG001, fixture propagate_stream_logs: None, # noqa: ARG001, fixture + reduce_final_timeout_for_status_message_redirector: None, # noqa: ARG001, fixture ) -> None: """Test that logs are redirected correctly to the custom logger.""" logger_name = 'custom_logger' logger = logging.getLogger(logger_name) - run_client = ApifyClientAsync(token='mocked_token', api_url=_MOCKED_API_URL).actor(actor_id=_MOCKED_ACTOR_ID) + actor_client = ApifyClientAsync(token='mocked_token', api_url=_MOCKED_API_URL).actor(actor_id=_MOCKED_ACTOR_ID) with caplog.at_level(logging.DEBUG, logger=logger_name): - await run_client.call(logger=logger) + await actor_client.call(logger=logger) - assert len(caplog.records) == len(_EXPECTED_MESSAGES_AND_LEVELS) - for expected_message_and_level, record in zip(_EXPECTED_MESSAGES_AND_LEVELS, caplog.records): + assert len(caplog.records) == len(_EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES) + for expected_message_and_level, record in zip(_EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES, caplog.records): assert expected_message_and_level[0] == record.message assert expected_message_and_level[1] == record.levelno @@ -319,16 +368,66 @@ def test_actor_call_redirect_logs_to_custom_logger_sync( caplog: LogCaptureFixture, mock_api_sync: None, # noqa: ARG001, fixture propagate_stream_logs: None, 
# noqa: ARG001, fixture + reduce_final_timeout_for_status_message_redirector: None, # noqa: ARG001, fixture ) -> None: """Test that logs are redirected correctly to the custom logger.""" logger_name = 'custom_logger' logger = logging.getLogger(logger_name) - run_client = ApifyClient(token='mocked_token', api_url=_MOCKED_API_URL).actor(actor_id=_MOCKED_ACTOR_ID) + actor_client = ApifyClient(token='mocked_token', api_url=_MOCKED_API_URL).actor(actor_id=_MOCKED_ACTOR_ID) with caplog.at_level(logging.DEBUG, logger=logger_name): - run_client.call(logger=logger) + actor_client.call(logger=logger) - assert len(caplog.records) == len(_EXPECTED_MESSAGES_AND_LEVELS) - for expected_message_and_level, record in zip(_EXPECTED_MESSAGES_AND_LEVELS, caplog.records): + assert len(caplog.records) == len(_EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES) + for expected_message_and_level, record in zip(_EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES, caplog.records): assert expected_message_and_level[0] == record.message assert expected_message_and_level[1] == record.levelno + + +@respx.mock +async def test_redirect_status_message_async( + *, + caplog: LogCaptureFixture, + mock_api: None, # noqa: ARG001, fixture + propagate_stream_logs: None, # noqa: ARG001, fixture + reduce_final_timeout_for_status_message_redirector: None, # noqa: ARG001, fixture +) -> None: + """Test redirected status and status messages.""" + + run_client = ApifyClientAsync(token='mocked_token', api_url=_MOCKED_API_URL).run(run_id=_MOCKED_RUN_ID) + + logger_name = f'apify.{_MOCKED_ACTOR_NAME}-{_MOCKED_RUN_ID}' + + status_message_redirector = await run_client.get_status_message_watcher(check_period=timedelta(seconds=0)) + with caplog.at_level(logging.DEBUG, logger=logger_name): + async with status_message_redirector: + # Do stuff while the status from the other Actor is being redirected to the logs. 
+ await asyncio.sleep(3) + + assert caplog.records[0].message == 'Status: RUNNING, Message: Initial message' + assert caplog.records[1].message == 'Status: RUNNING, Message: Another message' + assert caplog.records[2].message == 'Status: SUCCEEDED, Message: Final message' + + +@respx.mock +def test_redirect_status_message_sync( + *, + caplog: LogCaptureFixture, + mock_api: None, # noqa: ARG001, fixture + propagate_stream_logs: None, # noqa: ARG001, fixture + reduce_final_timeout_for_status_message_redirector: None, # noqa: ARG001, fixture +) -> None: + """Test redirected status and status messages.""" + + run_client = ApifyClient(token='mocked_token', api_url=_MOCKED_API_URL).run(run_id=_MOCKED_RUN_ID) + + logger_name = f'apify.{_MOCKED_ACTOR_NAME}-{_MOCKED_RUN_ID}' + + status_message_redirector = run_client.get_status_message_watcher(check_period=timedelta(seconds=0)) + with caplog.at_level(logging.DEBUG, logger=logger_name), status_message_redirector: + # Do stuff while the status from the other Actor is being redirected to the logs. + time.sleep(3) + + assert caplog.records[0].message == 'Status: RUNNING, Message: Initial message' + assert caplog.records[1].message == 'Status: RUNNING, Message: Another message' + assert caplog.records[2].message == 'Status: SUCCEEDED, Message: Final message'