
Commit 7b21b94

readers: added use_pyarrow kwarg + tests
1 parent 2f518da commit 7b21b94

3 files changed: +189 -8 lines changed


dlt/common/storages/fsspec_filesystem.py

Lines changed: 2 additions & 0 deletions
@@ -298,6 +298,8 @@ def open( # noqa: A003
                 bytes_io,
                 **text_kwargs,
             )
+        # `FileItemDict` kwarg `fsspec` is `Optional`. If `fsspec=None` this code branch
+        # will fail.
         else:
             if "file" in self.fsspec.protocol:
                 # use native local file path to open file:// uris
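For context, a minimal sketch of the situation the new comment points at, assuming `FileItemDict.fsspec` simply holds the value passed to the constructor and that `open()` defaults to binary mode; the file item below is hypothetical and mirrors the new test helpers further down, it is not part of this commit:

from dlt.common import pendulum
from dlt.common.storages.fsspec_filesystem import FileItem
from dlt.sources.filesystem import FileItemDict

file_item = FileItem(
    file_name="data.parquet",
    relative_path="data.parquet",
    file_url="file:///tmp/data.parquet",  # hypothetical local file
    mime_type="application/parquet",
    modification_date=pendulum.DateTime(2025, 1, 1, tzinfo=pendulum.Timezone("UTC")),
    size_in_bytes=111,
)

# Per the comment added above, a FileItemDict built with fsspec=None reaches
# `self.fsspec.protocol` in the non-text branch of open() and fails
# (assumption restating the NOTE, not verified against every dlt version).
item = FileItemDict(mapping=file_item, fsspec=None)
item.open(mode="rb")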

dlt/sources/filesystem/readers.py

Lines changed: 17 additions & 8 deletions
@@ -1,4 +1,4 @@
-from typing import TYPE_CHECKING, Any, Dict, Iterator, Optional
+from typing import TYPE_CHECKING, Any, Iterable, Iterator, Optional
 
 from dlt.common import json
 from dlt.common.typing import copy_sig_any
@@ -10,8 +10,10 @@
 __source_name__ = "filesystem"
 
 
+# NOTE inconsistent kwarg convention across readers `chunk_size` vs. `chunksize`
+# snakecased `chunk_size` is the more appropriate Python convention
 def _read_csv(
-    items: Iterator[FileItemDict], chunksize: int = 10000, **pandas_kwargs: Any
+    items: Iterable[FileItemDict], chunksize: int = 10000, **pandas_kwargs: Any
 ) -> Iterator[TDataItems]:
     """Reads csv file with Pandas chunk by chunk.
 
@@ -34,7 +36,9 @@ def _read_csv(
         yield df.to_dict(orient="records")
 
 
-def _read_jsonl(items: Iterator[FileItemDict], chunksize: int = 1000) -> Iterator[TDataItems]:
+# NOTE inconsistent kwarg convention across readers `chunk_size` vs. `chunksize`
+# snakecased `chunk_size` is the more appropriate Python convention
+def _read_jsonl(items: Iterable[FileItemDict], chunksize: int = 1000) -> Iterator[TDataItems]:
     """Reads jsonl file content and extract the data.
 
     Args:
@@ -55,9 +59,12 @@ def _read_jsonl(items: Iterator[FileItemDict], chunksize: int = 1000) -> Iterato
         yield lines_chunk
 
 
+# NOTE inconsistent kwarg convention across readers `chunk_size` vs. `chunksize`
+# snakecased `chunk_size` is the more appropriate Python convention
 def _read_parquet(
-    items: Iterator[FileItemDict],
+    items: Iterable[FileItemDict],
     chunksize: int = 1000,
+    use_pyarrow: bool = False,
 ) -> Iterator[TDataItems]:
     """Reads parquet file content and extract the data.
 
@@ -72,12 +79,14 @@ def _read_csv(
     for file_obj in items:
         with file_obj.open() as f:
             parquet_file = pq.ParquetFile(f)
-            for rows in parquet_file.iter_batches(batch_size=chunksize):
-                yield rows.to_pylist()
+            for batch in parquet_file.iter_batches(batch_size=chunksize):
+                yield batch if use_pyarrow else batch.to_pylist()
 
 
+# NOTE inconsistent kwarg convention across readers `chunk_size` vs. `chunksize`
+# snakecased `chunk_size` is the more appropriate Python convention
 def _read_csv_duckdb(
-    items: Iterator[FileItemDict],
+    items: Iterable[FileItemDict],
     chunk_size: Optional[int] = 5000,
     use_pyarrow: bool = False,
     **duckdb_kwargs: Any,
@@ -87,7 +96,7 @@ def _read_csv_duckdb(
     Uses DuckDB engine to import and cast CSV data.
 
     Args:
-        items (Iterator[FileItemDict]): CSV files to read.
+        items (Iterable[FileItemDict]): CSV files to read.
         chunk_size (Optional[int]):
             The number of rows to read at once. Defaults to 5000.
         use_pyarrow (bool):
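To illustrate the new kwarg, a minimal usage sketch of `_read_parquet` as changed above; `file_item` is assumed to be a `FileItemDict` pointing at a local parquet file, built the way the new test helpers below build it:

from dlt.sources.filesystem.readers import _read_parquet

# Default behaviour: each yielded batch is a list of dicts (RecordBatch.to_pylist()).
for batch in _read_parquet([file_item], chunksize=1000):
    print(type(batch))  # <class 'list'>

# With use_pyarrow=True the pyarrow.RecordBatch is yielded as-is, skipping the
# conversion to Python objects.
for batch in _read_parquet([file_item], chunksize=1000, use_pyarrow=True):
    print(type(batch))  # <class 'pyarrow.lib.RecordBatch'>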
Lines changed: 170 additions & 0 deletions
@@ -0,0 +1,170 @@
+import pathlib
+from typing import Any, Iterator
+
+import pytest
+import pandas as pd
+import pyarrow
+from fsspec import AbstractFileSystem
+
+from dlt.common import pendulum, json
+from dlt.common.storages import fsspec_filesystem
+from dlt.common.storages.fsspec_filesystem import FileItem
+from dlt.sources.filesystem import FileItemDict
+from dlt.sources.filesystem.readers import _read_csv, _read_csv_duckdb, _read_jsonl, _read_parquet
+
+
+@pytest.fixture(scope="module")
+def data() -> list[dict[str, Any]]:
+    return [
+        {"id": 1, "name": "Al"},
+        {"id": 2, "name": "Bob"},
+        {"id": 3, "name": "Charle"},
+        {"id": 4, "name": "Dave"},
+        {"id": 5, "name": "Eve"},
+    ]
+
+
+def _fsspec_client(tmp_path: pathlib.Path) -> AbstractFileSystem:
+    client, _ = fsspec_filesystem(
+        protocol=str(tmp_path), credentials=None, kwargs={}, client_kwargs={}
+    )
+    return client
+
+
+def _create_parquet_file(data: list[dict[str, Any]], tmp_path: pathlib.Path) -> FileItemDict:
+    file_name = "data.parquet"
+    full_file_path = tmp_path / file_name
+
+    df = pd.DataFrame(data)
+    df.to_parquet(full_file_path, engine="pyarrow")
+
+    file_item = FileItem(
+        file_name=file_name,
+        relative_path=file_name,
+        file_url=full_file_path.as_uri(),
+        mime_type="application/parquet",
+        modification_date=pendulum.DateTime(
+            2025, 1, 1, 0, 0, 0, 0, tzinfo=pendulum.Timezone("UTC")
+        ),
+        size_in_bytes=111,
+    )
+
+    return FileItemDict(mapping=file_item, fsspec=_fsspec_client(tmp_path))
+
+
+def _create_csv_file(data: list[dict[str, Any]], tmp_path: pathlib.Path) -> FileItemDict:
+    file_name = "data.csv"
+    full_file_path = tmp_path / file_name
+
+    df = pd.DataFrame(data)
+    df.to_csv(full_file_path, index=False)
+
+    file_item = FileItem(
+        file_name=file_name,
+        relative_path=file_name,
+        file_url=full_file_path.as_uri(),
+        mime_type="text/csv",
+        modification_date=pendulum.DateTime(
+            2025, 1, 1, 0, 0, 0, 0, tzinfo=pendulum.Timezone("UTC")
+        ),
+        size_in_bytes=111,
+    )
+    return FileItemDict(mapping=file_item, fsspec=_fsspec_client(tmp_path))
+
+
+def _create_jsonl_file(data: list[dict[str, Any]], tmp_path: pathlib.Path) -> FileItemDict:
+    file_name = "data.jsonl"
+    full_file_path = tmp_path / file_name
+
+    with open(full_file_path, "w", encoding="utf-8") as f:
+        for item in data:
+            f.write(json.dumps(item) + "\n")
+
+    file_item = FileItem(
+        file_name=file_name,
+        relative_path=file_name,
+        file_url=full_file_path.as_uri(),
+        mime_type="text/jsonl",
+        modification_date=pendulum.DateTime(
+            2025, 1, 1, 0, 0, 0, 0, tzinfo=pendulum.Timezone("UTC")
+        ),
+        size_in_bytes=111,
+    )
+
+    return FileItemDict(mapping=file_item, fsspec=_fsspec_client(tmp_path))
+
+
+# TODO rewrite the following tests as a parameterized test once `read_` functions
+# have a unified interface
+# see discussion for ibis: https://github.com/ibis-project/ibis/issues/11459
+# see discussion for narwhals: https://github.com/narwhals-dev/narwhals/issues/2930
+def test_read_parquet(tmp_path: pathlib.Path, data: list[dict[str, Any]]) -> None:
+    file_ = _create_parquet_file(data=data, tmp_path=tmp_path)
+    iterator = _read_parquet([file_])
+    read_data = list(iterator)
+
+    assert isinstance(iterator, Iterator)
+    assert isinstance(read_data, list)  # list of batches
+    assert isinstance(read_data[0], list)  # batch of records
+    assert isinstance(read_data[0][0], dict)  # record
+    assert read_data == [data]
+
+
+def test_read_parquet_use_pyarrow(tmp_path: pathlib.Path, data: list[dict[str, Any]]) -> None:
+    file_ = _create_parquet_file(data=data, tmp_path=tmp_path)
+    iterator = _read_parquet([file_], use_pyarrow=True)
+    read_data = list(iterator)
+
+    assert isinstance(iterator, Iterator)
+    assert isinstance(read_data, list)  # list of batches
+    assert isinstance(read_data[0], pyarrow.RecordBatch)  # batch of records
+    assert isinstance(read_data[0][0], pyarrow.Array)  # column
+    assert read_data == [pyarrow.RecordBatch.from_pylist(data)]
+
+
+def test_read_csv(tmp_path: pathlib.Path, data: list[dict[str, Any]]) -> None:
+    file_ = _create_csv_file(data=data, tmp_path=tmp_path)
+    iterator = _read_csv([file_])
+    read_data = list(iterator)
+
+    assert isinstance(iterator, Iterator)
+    assert isinstance(read_data, list)  # list of batches
+    assert isinstance(read_data[0], list)  # batch of records
+    assert isinstance(read_data[0][0], dict)  # record
+    assert read_data == [data]
+
+
+def test_read_jsonl(tmp_path: pathlib.Path, data: list[dict[str, Any]]) -> None:
+    file_ = _create_jsonl_file(data=data, tmp_path=tmp_path)
+    iterator = _read_jsonl([file_])
+    read_data = list(iterator)
+
+    assert isinstance(iterator, Iterator)
+    assert isinstance(read_data, list)  # list of batches
+    assert isinstance(read_data[0], list)  # batch of records
+    assert isinstance(read_data[0][0], dict)  # record
+    assert read_data == [data]
+
+
+def test_read_csv_duckdb(tmp_path: pathlib.Path, data: list[dict[str, Any]]) -> None:
+    file_ = _create_csv_file(data=data, tmp_path=tmp_path)
+    iterator = _read_csv_duckdb([file_])
+    read_data = list(iterator)
+
+    assert isinstance(iterator, Iterator)
+    assert isinstance(read_data, list)  # list of batches
+    assert isinstance(read_data[0], list)  # batch of records
+    assert isinstance(read_data[0][0], dict)  # record
+    assert read_data == [data]
+
+
+def test_read_csv_duckdb_use_pyarrow(tmp_path: pathlib.Path, data: list[dict[str, Any]]) -> None:
+    file_ = _create_csv_file(data=data, tmp_path=tmp_path)
+    iterator = _read_csv_duckdb([file_], use_pyarrow=True)
+    read_data = list(iterator)
+
+    assert isinstance(iterator, Iterator)
+    assert isinstance(read_data, list)  # list of batches
+    assert isinstance(read_data[0], pyarrow.RecordBatch)  # batch of records
+    assert isinstance(read_data[0][0], pyarrow.Array)  # column
+    assert read_data == [pyarrow.RecordBatch.from_pylist(data)]
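Beyond the unit tests, the flag would typically be consumed through the public `read_parquet` transformer that wraps `_read_parquet` (note the `copy_sig_any` import in readers.py). A minimal pipeline sketch, assuming that transformer forwards `use_pyarrow` unchanged; the bucket URL, pipeline name, and destination are placeholders:

import dlt
from dlt.sources.filesystem import filesystem, read_parquet

# List parquet files and yield pyarrow.RecordBatch objects straight to the pipeline,
# which dlt can load without first converting rows to Python dicts.
files = filesystem(bucket_url="file:///data/lake", file_glob="**/*.parquet")
rows = (files | read_parquet(use_pyarrow=True)).with_name("parquet_rows")

pipeline = dlt.pipeline(pipeline_name="filesystem_parquet", destination="duckdb")
print(pipeline.run(rows))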
