Commit bdadd43

Dataset export methods

1 parent e6f1e1f commit bdadd43

File tree

8 files changed: +158 -158 lines

src/crawlee/_cli.py

Lines changed: 1 addition & 1 deletion
@@ -22,7 +22,7 @@
 cli = typer.Typer(no_args_is_help=True)
 
 template_directory = importlib.resources.files('crawlee') / 'project_template'
-with open(str(template_directory / 'cookiecutter.json')) as f:
+with (template_directory / 'cookiecutter.json').open() as f:
     cookiecutter_json = json.load(f)
 
 crawler_choices = cookiecutter_json['crawler_type']
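The point of this change, as I read it, is that `importlib.resources.files()` returns a `Traversable`, which need not map to a real filesystem path, so calling `.open()` on it is safer than wrapping it in `str()` and `open()`. A minimal standard-library sketch of the same pattern; the package and file names below are illustrative, not from the diff:

import importlib.resources
import json

# Traversable.open() works whether the package data lives on disk or inside a
# zip/wheel; building a path string and calling open() only works on disk.
resource = importlib.resources.files('mypackage') / 'config.json'  # hypothetical names
with resource.open() as f:
    config = json.load(f)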

src/crawlee/_utils/file.py

Lines changed: 43 additions & 1 deletion
@@ -2,17 +2,26 @@
 
 import asyncio
 import contextlib
+import csv
 import json
 import mimetypes
 import os
 import re
 import shutil
 from enum import Enum
+from logging import getLogger
 from typing import TYPE_CHECKING
 
 if TYPE_CHECKING:
+    from collections.abc import AsyncIterator
     from pathlib import Path
-    from typing import Any
+    from typing import Any, TextIO
+
+    from typing_extensions import Unpack
+
+    from crawlee.storages._types import ExportDataCsvKwargs, ExportDataJsonKwargs
+
+logger = getLogger(__name__)
 
 
 class ContentType(Enum):

@@ -92,3 +101,36 @@ async def json_dumps(obj: Any) -> str:
         A string containing the JSON representation of the input object.
     """
     return await asyncio.to_thread(json.dumps, obj, ensure_ascii=False, indent=2, default=str)
+
+
+async def export_json_to_stream(
+    iterator: AsyncIterator[dict],
+    dst: TextIO,
+    **kwargs: Unpack[ExportDataJsonKwargs],
+) -> None:
+    items = [item async for item in iterator]
+
+    if items:
+        json.dump(items, dst, **kwargs)
+    else:
+        logger.warning('Attempting to export an empty dataset - no file will be created')
+
+
+async def export_csv_to_stream(
+    iterator: AsyncIterator[dict],
+    dst: TextIO,
+    **kwargs: Unpack[ExportDataCsvKwargs],
+) -> None:
+    writer = csv.writer(dst, **kwargs)
+    write_header = True
+
+    # Iterate over the dataset and write to CSV.
+    async for item in iterator:
+        if not item:
+            continue
+
+        if write_header:
+            writer.writerow(item.keys())
+            write_header = False
+
+        writer.writerow(item.values())
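To make the intent of these helpers concrete, here is a minimal usage sketch. The async generator below is a hypothetical stand-in for `Dataset.iterate()`; only the two helpers and their signatures come from this diff:

import asyncio
from pathlib import Path

from crawlee._utils.file import export_csv_to_stream, export_json_to_stream


async def items():
    # Hypothetical stand-in for Dataset.iterate().
    yield {'url': 'https://example.com', 'title': 'Example Domain'}
    yield {'url': 'https://example.org', 'title': 'Another Page'}


async def main() -> None:
    # CSV: the header row comes from the keys of the first non-empty item.
    with Path('items.csv').open('w', newline='') as dst:
        await export_csv_to_stream(items(), dst)

    # JSON: all items are collected into a list and dumped in one call.
    with Path('items.json').open('w') as dst:
        await export_json_to_stream(items(), dst)


asyncio.run(main())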

src/crawlee/crawlers/_basic/_basic_crawler.py

Lines changed: 16 additions & 59 deletions
@@ -32,6 +32,7 @@
     SendRequestFunction,
 )
 from crawlee._utils.docs import docs_group
+from crawlee._utils.file import export_csv_to_stream, export_json_to_stream
 from crawlee._utils.urls import convert_to_absolute_url, is_url_absolute
 from crawlee._utils.wait import wait_for
 from crawlee._utils.web import is_status_code_client_error, is_status_code_server_error

@@ -57,7 +58,7 @@
     import re
     from contextlib import AbstractAsyncContextManager
 
-    from crawlee._types import ConcurrencySettings, HttpMethod, JsonSerializable
+    from crawlee._types import ConcurrencySettings, HttpMethod, JsonSerializable, PushDataKwargs
     from crawlee.configuration import Configuration
     from crawlee.events import EventManager
     from crawlee.http_clients import HttpClient, HttpResponse

@@ -67,7 +68,7 @@
     from crawlee.statistics import FinalStatistics
     from crawlee.storage_clients import StorageClient
     from crawlee.storage_clients.models import DatasetItemsListPage
-    from crawlee.storages._dataset import ExportDataCsvKwargs, ExportDataJsonKwargs, GetDataKwargs, PushDataKwargs
+    from crawlee.storages._types import GetDataKwargs
 
 TCrawlingContext = TypeVar('TCrawlingContext', bound=BasicCrawlingContext, default=BasicCrawlingContext)
 TStatisticsState = TypeVar('TStatisticsState', bound=StatisticsState, default=StatisticsState)

@@ -655,13 +656,18 @@ async def add_requests(
             wait_for_all_requests_to_be_added_timeout=wait_for_all_requests_to_be_added_timeout,
         )
 
-    async def _use_state(self, default_value: dict[str, JsonSerializable] | None = None) -> dict[str, JsonSerializable]:
-        store = await self.get_key_value_store()
-        return await store.get_auto_saved_value(self._CRAWLEE_STATE_KEY, default_value)
+    async def _use_state(
+        self,
+        default_value: dict[str, JsonSerializable] | None = None,
+    ) -> dict[str, JsonSerializable]:
+        kvs = await self.get_key_value_store()
+        # TODO:
+        # return some kvs value
 
     async def _save_crawler_state(self) -> None:
-        store = await self.get_key_value_store()
-        await store.persist_autosaved_values()
+        kvs = await self.get_key_value_store()
+        # TODO:
+        # some kvs call
 
     async def get_data(
         self,

@@ -705,64 +711,15 @@ async def export_data(
         dataset = await self.get_dataset(id=dataset_id, name=dataset_name)
 
         path = path if isinstance(path, Path) else Path(path)
-        destination = path.open('w', newline='')
+        dst = path.open('w', newline='')
 
         if path.suffix == '.csv':
-            await dataset.write_to_csv(destination)
+            await export_csv_to_stream(dataset.iterate(), dst)
         elif path.suffix == '.json':
-            await dataset.write_to_json(destination)
+            await export_json_to_stream(dataset.iterate(), dst)
         else:
             raise ValueError(f'Unsupported file extension: {path.suffix}')
 
-    async def export_data_csv(
-        self,
-        path: str | Path,
-        *,
-        dataset_id: str | None = None,
-        dataset_name: str | None = None,
-        **kwargs: Unpack[ExportDataCsvKwargs],
-    ) -> None:
-        """Export data from a `Dataset` to a CSV file.
-
-        This helper method simplifies the process of exporting data from a `Dataset` in csv format. It opens
-        the specified one and then exports the data based on the provided parameters.
-
-        Args:
-            path: The destination path.
-            content_type: The output format.
-            dataset_id: The ID of the `Dataset`.
-            dataset_name: The name of the `Dataset`.
-            kwargs: Extra configurations for dumping/writing in csv format.
-        """
-        dataset = await self.get_dataset(id=dataset_id, name=dataset_name)
-        path = path if isinstance(path, Path) else Path(path)
-
-        return await dataset.write_to_csv(path.open('w', newline=''), **kwargs)
-
-    async def export_data_json(
-        self,
-        path: str | Path,
-        *,
-        dataset_id: str | None = None,
-        dataset_name: str | None = None,
-        **kwargs: Unpack[ExportDataJsonKwargs],
-    ) -> None:
-        """Export data from a `Dataset` to a JSON file.
-
-        This helper method simplifies the process of exporting data from a `Dataset` in json format. It opens the
-        specified one and then exports the data based on the provided parameters.
-
-        Args:
-            path: The destination path
-            dataset_id: The ID of the `Dataset`.
-            dataset_name: The name of the `Dataset`.
-            kwargs: Extra configurations for dumping/writing in json format.
-        """
-        dataset = await self.get_dataset(id=dataset_id, name=dataset_name)
-        path = path if isinstance(path, Path) else Path(path)
-
-        return await dataset.write_to_json(path.open('w', newline=''), **kwargs)
-
     async def _push_data(
         self,
         data: JsonSerializable,
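From the caller's side the behavior of `export_data` is unchanged: the file extension picks the format. A small usage sketch, assuming the public import paths at this point in the refactor (they may still be in flux on this branch):

import asyncio

from crawlee.crawlers import BasicCrawler, BasicCrawlingContext


async def main() -> None:
    crawler = BasicCrawler()

    @crawler.router.default_handler
    async def handler(context: BasicCrawlingContext) -> None:
        # Items pushed here end up in the default dataset.
        await context.push_data({'url': context.request.url})

    await crawler.run(['https://example.com'])

    # The suffix selects the exporter; any other extension raises ValueError.
    await crawler.export_data('results.csv')
    await crawler.export_data('results.json')


asyncio.run(main())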

src/crawlee/storage_clients/_base/_dataset_client.py

Lines changed: 16 additions & 0 deletions
@@ -14,6 +14,22 @@
     from crawlee.storage_clients.models import DatasetItemsListPage
 
 
+# Properties:
+# - id
+# - name
+# - created_at
+# - accessed_at
+# - modified_at
+# - item_count
+
+# Methods:
+# - open
+# - drop
+# - push_data
+# - get_data
+# - iterate
+
+
 @docs_group('Abstract classes')
 class DatasetClient(ABC):
     """An abstract class for dataset resource clients.

src/crawlee/storage_clients/_file_system/_dataset_client.py

Lines changed: 1 addition & 1 deletion
@@ -219,7 +219,7 @@ async def get_data(
         invalid = [arg for arg in unsupported_args if arg not in (False, None)]
         if invalid:
             logger.warning(
-                f'The arguments {invalid} of iterate_items are not supported by the {self.__class__.__name__} client.'
+                f'The arguments {invalid} of get_data are not supported by the {self.__class__.__name__} client.'
             )
 
         # If the dataset directory does not exist, log a warning and return an empty page.

src/crawlee/storage_clients/_memory/_dataset_client.py

Lines changed: 2 additions & 2 deletions
@@ -135,7 +135,7 @@ async def get_data(
         invalid = [arg for arg in unsupported_args if arg not in (False, None)]
         if invalid:
             logger.warning(
-                f'The arguments {invalid} of iterate_items are not supported by the {self.__class__.__name__} client.'
+                f'The arguments {invalid} of get_data are not supported by the {self.__class__.__name__} client.'
             )
 
         total = len(self._records)

@@ -172,7 +172,7 @@ async def iterate(
         invalid = [arg for arg in unsupported_args if arg not in (False, None)]
         if invalid:
             logger.warning(
-                f'The arguments {invalid} of iterate_items are not supported by the {self.__class__.__name__} client.'
+                f'The arguments {invalid} of iterate are not supported by the {self.__class__.__name__} client.'
             )
 
         items = self._records.copy()
