docs: improve API docs of all crawlers #624

Merged · 4 commits · Oct 29, 2024
src/crawlee/_request.py (4 additions, 0 deletions)

@@ -243,7 +243,11 @@ class Request(BaseRequestData):
The recommended way to create a new instance is by using the `Request.from_url` constructor, which automatically
generates a unique key and identifier based on the URL and request parameters.

### Usage

```python
from crawlee import Request

request = Request.from_url('https://crawlee.dev')
```
"""
src/crawlee/basic_crawler/_basic_crawler.py (67 additions, 62 deletions)

@@ -88,14 +88,27 @@ class BasicCrawlerOptions(TypedDict, Generic[TCrawlingContext]):


class BasicCrawler(Generic[TCrawlingContext]):
"""Provides a simple framework for parallel crawling of web pages.

The URLs to crawl are fed either from a static list of URLs or from a dynamic queue of URLs enabling recursive
crawling of websites.

`BasicCrawler` is a low-level tool that requires the user to implement the page download and data extraction
functionality themselves. If we want a crawler that already facilitates this functionality, we should consider using
one of its subclasses.
"""A basic web crawler providing a framework for crawling websites.

The `BasicCrawler` provides low-level functionality for crawling websites, allowing users to define their
own page download and data extraction logic. It is designed mostly to be subclassed by crawlers with specific
purposes. In most cases, you will want to use a more specialized crawler, such as `HttpCrawler`,
`BeautifulSoupCrawler`, `ParselCrawler`, or `PlaywrightCrawler`. If you are an advanced user and want full
control over the crawling process, you can subclass the `BasicCrawler` and implement the request-handling logic
yourself.

The crawling process begins with URLs provided by a `RequestProvider` instance. Each request is then
handled by a user-defined `request_handler` function, which processes the page and extracts the data.

The `BasicCrawler` includes several common features for crawling, such as:
- automatic scaling based on the system resources,
- retries for failed requests,
- session management,
- statistics tracking,
- request routing via labels,
- proxy rotation,
- direct storage interaction helpers,
- and more.
"""

def __init__(
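For orientation, here is a minimal sketch of how the class described by the new docstring is typically driven. It assumes the `crawlee.basic_crawler` import path used by this version of the package, plus context helpers such as `send_request` and `push_data` and a `status_code` attribute on the response; exact names may differ in other releases:

```python
import asyncio

from crawlee.basic_crawler import BasicCrawler, BasicCrawlingContext  # assumed import path


async def main() -> None:
    crawler = BasicCrawler(max_requests_per_crawl=10)

    # BasicCrawler leaves page download and data extraction to the user-defined handler.
    @crawler.router.default_handler
    async def request_handler(context: BasicCrawlingContext) -> None:
        context.log.info(f'Processing {context.request.url} ...')
        response = await context.send_request(context.request.url)
        await context.push_data({'url': context.request.url, 'status': response.status_code})

    await crawler.run(['https://crawlee.dev'])


if __name__ == '__main__':
    asyncio.run(main())
```

For most projects the docstring's advice holds: a specialized subclass such as `BeautifulSoupCrawler`, `ParselCrawler`, or `PlaywrightCrawler` provides the download and parsing steps out of the box.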
@@ -517,40 +530,38 @@ async def _push_data(
dataset = await self.get_dataset(id=dataset_id, name=dataset_name)
await dataset.push_data(data, **kwargs)

def _should_retry_request(self, crawling_context: BasicCrawlingContext, error: Exception) -> bool:
if crawling_context.request.no_retry:
def _should_retry_request(self, context: BasicCrawlingContext, error: Exception) -> bool:
if context.request.no_retry:
return False

# Do not retry on client errors.
if isinstance(error, HttpStatusCodeError) and is_status_code_client_error(error.status_code):
return False

if isinstance(error, SessionError):
return ((crawling_context.request.session_rotation_count or 0) + 1) < self._max_session_rotations
return ((context.request.session_rotation_count or 0) + 1) < self._max_session_rotations

max_request_retries = crawling_context.request.max_retries
max_request_retries = context.request.max_retries
if max_request_retries is None:
max_request_retries = self._max_request_retries

return (crawling_context.request.retry_count + 1) < max_request_retries
return (context.request.retry_count + 1) < max_request_retries

async def _check_url_after_redirects(
self, crawling_context: TCrawlingContext
) -> AsyncGenerator[TCrawlingContext, None]:
async def _check_url_after_redirects(self, context: TCrawlingContext) -> AsyncGenerator[TCrawlingContext, None]:
"""Invoked at the end of the context pipeline to make sure that the `loaded_url` still matches enqueue_strategy.

This is done to filter out links that redirect outside of the crawled domain.
"""
if crawling_context.request.loaded_url is not None and not self._check_enqueue_strategy(
crawling_context.request.enqueue_strategy,
origin_url=urlparse(crawling_context.request.url),
target_url=urlparse(crawling_context.request.loaded_url),
if context.request.loaded_url is not None and not self._check_enqueue_strategy(
context.request.enqueue_strategy,
origin_url=urlparse(context.request.url),
target_url=urlparse(context.request.loaded_url),
):
raise ContextPipelineInterruptedError(
f'Skipping URL {crawling_context.request.loaded_url} (redirected from {crawling_context.request.url})'
f'Skipping URL {context.request.loaded_url} (redirected from {context.request.url})'
)

yield crawling_context
yield context

def _check_enqueue_strategy(
self,
@@ -611,19 +622,19 @@ def _check_url_patterns(

async def _handle_request_retries(
self,
crawling_context: TCrawlingContext | BasicCrawlingContext,
context: TCrawlingContext | BasicCrawlingContext,
error: Exception,
) -> None:
request_provider = await self.get_request_provider()
request = crawling_context.request
request = context.request

if self._should_retry_request(crawling_context, error):
if self._should_retry_request(context, error):
request.retry_count += 1
self._statistics.error_tracker.add(error)

if self._error_handler:
try:
new_request = await self._error_handler(crawling_context, error)
new_request = await self._error_handler(context, error)
except Exception as e:
raise UserDefinedErrorHandlerError('Exception thrown in user-defined request error handler') from e
else:
@@ -633,57 +644,51 @@ async def _handle_request_retries(
await request_provider.reclaim_request(request)
else:
await wait_for(
lambda: request_provider.mark_request_as_handled(crawling_context.request),
lambda: request_provider.mark_request_as_handled(context.request),
timeout=self._internal_timeout,
timeout_message='Marking request as handled timed out after '
f'{self._internal_timeout.total_seconds()} seconds',
logger=self._logger,
max_retries=3,
)
await self._handle_failed_request(crawling_context, error)
await self._handle_failed_request(context, error)
self._statistics.record_request_processing_failure(request.id or request.unique_key)

async def _handle_request_error(
self,
crawling_context: TCrawlingContext | BasicCrawlingContext,
error: Exception,
) -> None:
async def _handle_request_error(self, context: TCrawlingContext | BasicCrawlingContext, error: Exception) -> None:
try:
crawling_context.request.state = RequestState.ERROR_HANDLER
context.request.state = RequestState.ERROR_HANDLER

await wait_for(
partial(self._handle_request_retries, crawling_context, error),
partial(self._handle_request_retries, context, error),
timeout=self._internal_timeout,
timeout_message='Handling request failure timed out after '
f'{self._internal_timeout.total_seconds()} seconds',
logger=self._logger,
)

crawling_context.request.state = RequestState.DONE
context.request.state = RequestState.DONE
except UserDefinedErrorHandlerError:
crawling_context.request.state = RequestState.ERROR
context.request.state = RequestState.ERROR
raise
except Exception as secondary_error:
self._logger.exception(
'An exception occurred during handling of failed request. This places the crawler '
'and its underlying storages into an unknown state and crawling will be terminated.',
exc_info=secondary_error,
)
crawling_context.request.state = RequestState.ERROR
context.request.state = RequestState.ERROR
raise

if crawling_context.session:
crawling_context.session.mark_bad()
if context.session:
context.session.mark_bad()

async def _handle_failed_request(
self, crawling_context: TCrawlingContext | BasicCrawlingContext, error: Exception
) -> None:
async def _handle_failed_request(self, context: TCrawlingContext | BasicCrawlingContext, error: Exception) -> None:
self._logger.exception('Request failed and reached maximum retries', exc_info=error)
self._statistics.error_tracker.add(error)

if self._failed_request_handler:
try:
await self._failed_request_handler(crawling_context, error)
await self._failed_request_handler(context, error)
except Exception as e:
raise UserDefinedErrorHandlerError('Exception thrown in user-defined failed request handler') from e
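These two code paths correspond to the hooks users can register on the crawler. A sketch of that registration follows, under the assumption that the decorator names and handler signatures match the public API:

```python
from crawlee.basic_crawler import BasicCrawler, BasicCrawlingContext  # assumed import path

crawler = BasicCrawler()


@crawler.error_handler
async def on_error(context: BasicCrawlingContext, error: Exception) -> None:
    # Invoked by _handle_request_retries before a failed request is reclaimed for a retry.
    context.log.warning(f'Retrying {context.request.url} after error: {error}')


@crawler.failed_request_handler
async def on_failed(context: BasicCrawlingContext, error: Exception) -> None:
    # Invoked by _handle_failed_request once the request has exhausted its retries.
    context.log.error(f'Giving up on {context.request.url}: {error}')
```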

@@ -803,7 +808,7 @@ async def __run_task_function(self) -> None:
proxy_info = await self._get_proxy_info(request, session)
result = RequestHandlerRunResult(key_value_store_getter=self.get_key_value_store)

crawling_context = BasicCrawlingContext(
context = BasicCrawlingContext(
request=request,
session=session,
proxy_info=proxy_info,
@@ -821,17 +826,17 @@ async def __run_task_function(self) -> None:
request.state = RequestState.REQUEST_HANDLER

await wait_for(
lambda: self.__run_request_handler(crawling_context),
lambda: self.__run_request_handler(context),
timeout=self._request_handler_timeout,
timeout_message='Request handler timed out after '
f'{self._request_handler_timeout.total_seconds()} seconds',
logger=self._logger,
)

await self._commit_request_handler_result(crawling_context, result)
await self._commit_request_handler_result(context, result)

await wait_for(
lambda: request_provider.mark_request_as_handled(crawling_context.request),
lambda: request_provider.mark_request_as_handled(context.request),
timeout=self._internal_timeout,
timeout_message='Marking request as handled timed out after '
f'{self._internal_timeout.total_seconds()} seconds',
@@ -841,8 +846,8 @@ async def __run_task_function(self) -> None:

request.state = RequestState.DONE

if crawling_context.session:
crawling_context.session.mark_good()
if context.session:
context.session.mark_good()

self._statistics.record_request_processing_finish(statistics_id)
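The request handler invocation above is bounded by the crawler's `request_handler_timeout`, while internal bookkeeping such as marking requests handled uses `_internal_timeout`. The user-facing knob can be set at construction time; a small sketch, assuming the constructor parameter is named `request_handler_timeout`:

```python
from datetime import timedelta

from crawlee.basic_crawler import BasicCrawler  # assumed import path

# Give each request handler up to 30 seconds before wait_for() raises a timeout.
crawler = BasicCrawler(request_handler_timeout=timedelta(seconds=30))
```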

@@ -858,28 +863,28 @@ async def __run_task_function(self) -> None:
await self._handle_request_error(primary_error.crawling_context, primary_error.wrapped_exception)

except SessionError as session_error:
if not crawling_context.session:
if not context.session:
raise RuntimeError('SessionError raised in a crawling context without a session') from session_error

if self._error_handler:
await self._error_handler(crawling_context, session_error)
await self._error_handler(context, session_error)

if self._should_retry_request(crawling_context, session_error):
if self._should_retry_request(context, session_error):
self._logger.warning('Encountered a session error, rotating session and retrying')

crawling_context.session.retire()
context.session.retire()

if crawling_context.request.session_rotation_count is None:
crawling_context.request.session_rotation_count = 0
crawling_context.request.session_rotation_count += 1
if context.request.session_rotation_count is None:
context.request.session_rotation_count = 0
context.request.session_rotation_count += 1

await request_provider.reclaim_request(request)
self._statistics.error_tracker_retry.add(session_error)
else:
self._logger.exception('Request failed and reached maximum retries', exc_info=session_error)

await wait_for(
lambda: request_provider.mark_request_as_handled(crawling_context.request),
lambda: request_provider.mark_request_as_handled(context.request),
timeout=self._internal_timeout,
timeout_message='Marking request as handled timed out after '
f'{self._internal_timeout.total_seconds()} seconds',
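Session rotation and retry counts in this branch are driven by crawler-level limits and per-request flags. A sketch of configuring them, assuming the constructor parameters are named `max_request_retries` and `max_session_rotations`:

```python
from crawlee import Request
from crawlee.basic_crawler import BasicCrawler  # assumed import path

# Crawler-wide limits consulted by _should_retry_request.
crawler = BasicCrawler(
    max_request_retries=5,     # upper bound on retries per request
    max_session_rotations=3,   # SessionError-driven rotations before giving up
)

# Per-request override: a request flagged no_retry is never retried.
request = Request.from_url('https://crawlee.dev')
request.no_retry = True
```

A per-request `max_retries` value, when present on the request, takes precedence over the crawler-wide limit, as `_should_retry_request` above shows.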
@@ -894,7 +899,7 @@ async def __run_task_function(self) -> None:
self._logger.debug('The context pipeline was interrupted', exc_info=interrupted_error)

await wait_for(
lambda: request_provider.mark_request_as_handled(crawling_context.request),
lambda: request_provider.mark_request_as_handled(context.request),
timeout=self._internal_timeout,
timeout_message='Marking request as handled timed out after '
f'{self._internal_timeout.total_seconds()} seconds',
@@ -907,7 +912,7 @@ async def __run_task_function(self) -> None:
'An exception occurred during the initialization of crawling context',
exc_info=initialization_error,
)
await self._handle_request_error(crawling_context, initialization_error.wrapped_exception)
await self._handle_request_error(context, initialization_error.wrapped_exception)

except Exception as internal_error:
self._logger.exception(
@@ -917,5 +922,5 @@ async def __run_task_function(self) -> None:
)
raise

async def __run_request_handler(self, crawling_context: BasicCrawlingContext) -> None:
await self._context_pipeline(crawling_context, self.router)
async def __run_request_handler(self, context: BasicCrawlingContext) -> None:
await self._context_pipeline(context, self.router)