Skip to content

Commit dca6332

Browse files
committed
I/O: Improve CSV file import
1 parent 01ed104 commit dca6332

File tree

3 files changed

+56
-18
lines changed

3 files changed

+56
-18
lines changed

cratedb_toolkit/io/file/csv.py

Lines changed: 41 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ class CsvFileAddress:
3131
url: URL
3232
location: str
3333
pipeline: Optional[List[str]] = dataclasses.field(default_factory=list)
34-
batch_size: Optional[int] = DEFAULT_BATCH_SIZE
34+
batch_size: int = DEFAULT_BATCH_SIZE
3535
# TODO: What about other parameters? See `polars.io.csv.functions`.
3636
separator: Optional[str] = DEFAULT_SEPARATOR
3737
quote_char: Optional[str] = DEFAULT_QUOTE_CHAR
@@ -47,11 +47,15 @@ def from_url(cls, url: str) -> "CsvFileAddress":
4747
https://guided-path.s3.us-east-1.amazonaws.com/demo_climate_data_export.csv
4848
"""
4949
url_obj, location = parse_uri(url, "csv")
50+
try:
51+
batch_size = int(url_obj.query_params.get("batch-size", DEFAULT_BATCH_SIZE))
52+
except ValueError as ex:
53+
raise ValueError("Invalid value for batch size") from ex
5054
return cls(
5155
url=url_obj,
5256
location=location,
5357
pipeline=url_obj.query_params.getlist("pipe"),
54-
batch_size=int(url_obj.query_params.get("batch-size", DEFAULT_BATCH_SIZE)),
58+
batch_size=batch_size,
5559
separator=url_obj.query_params.get("separator", DEFAULT_SEPARATOR),
5660
quote_char=url_obj.query_params.get("quote-char", DEFAULT_QUOTE_CHAR),
5761
)
@@ -81,18 +85,21 @@ def collect_properties(query_params: Dict, prefixes: List) -> Dict[str, str]:
8185
break
8286
return opts
8387

84-
def load_table(self) -> pl.LazyFrame:
88+
def load_table(self, lazy: bool = True) -> pl.LazyFrame:
8589
"""
8690
Load the CSV file as a Polars LazyFrame.
8791
"""
8892

8993
# Read from data source.
90-
lf = pl.scan_csv(
91-
self.location,
92-
separator=self.separator,
93-
quote_char=self.quote_char,
94-
storage_options=self.storage_options,
95-
)
94+
kwargs = {
95+
"separator": self.separator,
96+
"quote_char": self.quote_char,
97+
"storage_options": self.storage_options,
98+
}
99+
if lazy:
100+
lf = pl.scan_csv(self.location, **kwargs)
101+
else:
102+
lf = pl.read_csv(self.location, **kwargs).lazy()
96103

97104
# Optionally apply transformations.
98105
if self.pipeline:
@@ -118,8 +125,28 @@ def from_csv(source_url, target_url, progress: bool = False) -> bool:
118125
"""
119126
source = CsvFileAddress.from_url(source_url)
120127
logger.info(f"File address: {source.location}")
121-
return polars_to_cratedb(
122-
frame=source.load_table(),
123-
target_url=target_url,
124-
chunk_size=source.batch_size,
125-
)
128+
129+
try:
130+
return polars_to_cratedb(
131+
frame=source.load_table(),
132+
target_url=target_url,
133+
chunk_size=source.batch_size or DEFAULT_BATCH_SIZE,
134+
)
135+
136+
# OSError: object-store error: Generic S3 error: Error performing PUT http://169.254.169.254/latest/api/token
137+
# in 218.979617ms, after 2 retries, max_retries: 2, retry_timeout: 10s - HTTP error:
138+
# error sending request (path: s3://guided-path/demo_climate_data_export.csv)
139+
except OSError as e:
140+
msg = str(e)
141+
if "Generic S3 error" in msg and "/api/token" in msg:
142+
logger.warning(
143+
"Authentication with the storage backend is required for lazy reading, but failed. "
144+
"Falling back to complete reading: This may exhaust your system memory."
145+
)
146+
return polars_to_cratedb(
147+
frame=source.load_table(lazy=False),
148+
target_url=target_url,
149+
chunk_size=source.batch_size,
150+
)
151+
152+
raise RuntimeError(f"Loading data from CSV failed (unknown error): {source_url}")

cratedb_toolkit/io/router.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ def load_table(
116116

117117
adjusted_url = str(source_url_obj)
118118
if source_url_obj.scheme.startswith("csv"):
119-
adjusted_url = str(source_url_obj.path)
119+
source_url_obj.scheme = None
120120

121121
return from_csv(adjusted_url, target_url)
122122

tests/io/file/test_csv.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,12 @@
1111
climate_json_json = (
1212
str(data_folder / "climate_json_json.csv") + "?quote-char='&pipe=json_array_to_wkt_point:geo_location"
1313
)
14-
climate_json_python = (
14+
climate_json_python_local = (
1515
str(data_folder / "climate_json_python.csv")
1616
+ '?quote-char="&pipe=json_array_to_wkt_point:geo_location&pipe=python_to_json:data'
1717
)
1818
climate_wkt_json = str(data_folder / "climate_wkt_json.csv") + "?quote-char='"
19+
climate_json_python_s3 = "https://guided-path.s3.us-east-1.amazonaws.com/demo_climate_data_export.csv?pipe=json_array_to_wkt_point:geo_location&pipe=python_to_json:data"
1920

2021
table_address = TableAddress(schema=TESTDRIVE_DATA_SCHEMA, table="climate_data", if_exists="append")
2122

@@ -32,13 +33,23 @@ def test_load_csv_json_json(cratedb_synchronized, provision_ddl):
3233
assert cluster.adapter.count_records(table_address.fullname) == 3, "Wrong number of records returned"
3334

3435

35-
def test_load_csv_json_python(cratedb_synchronized, provision_ddl):
36+
def test_load_csv_json_python_local(cratedb_synchronized, provision_ddl):
3637
cluster = DatabaseCluster.create(cluster_url=cratedb_synchronized.database.dburi)
37-
cluster.load_table(InputOutputResource(climate_json_python), target=table_address)
38+
cluster.load_table(InputOutputResource(climate_json_python_local), target=table_address)
3839
cluster.adapter.refresh_table(table_address.fullname)
3940
assert cluster.adapter.count_records(table_address.fullname) == 3, "Wrong number of records returned"
4041

4142

43+
@pytest.mark.skip(
44+
"Test takes too long to complete. When aiming to test a remote data source, please use a smaller dataset."
45+
)
46+
def test_load_csv_json_python_s3(cratedb_synchronized, provision_ddl):
47+
cluster = DatabaseCluster.create(cluster_url=cratedb_synchronized.database.dburi)
48+
cluster.load_table(InputOutputResource(climate_json_python_s3), target=table_address)
49+
cluster.adapter.refresh_table(table_address.fullname)
50+
assert cluster.adapter.count_records(table_address.fullname) == 22650, "Wrong number of records returned"
51+
52+
4253
def test_load_csv_wkt_json(cratedb_synchronized, provision_ddl):
4354
cluster = DatabaseCluster.create(cluster_url=cratedb_synchronized.database.dburi)
4455
cluster.load_table(InputOutputResource(climate_wkt_json), target=table_address)

0 commit comments

Comments (0)