Skip to content

Commit b8ae509

Browse files
committed
I/O: Improve CSV file import, with suggestions from CodeRabbit
1 parent beab5de commit b8ae509

File tree

5 files changed

+72
-23
lines changed

5 files changed

+72
-23
lines changed

cratedb_toolkit/io/file/csv.py

Lines changed: 41 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ class CsvFileAddress:
3131
url: URL
3232
location: str
3333
pipeline: Optional[List[str]] = dataclasses.field(default_factory=list)
34-
batch_size: Optional[int] = DEFAULT_BATCH_SIZE
34+
batch_size: int = DEFAULT_BATCH_SIZE
3535
# TODO: What about other parameters? See `polars.io.csv.functions`.
3636
separator: Optional[str] = DEFAULT_SEPARATOR
3737
quote_char: Optional[str] = DEFAULT_QUOTE_CHAR
@@ -47,11 +47,15 @@ def from_url(cls, url: str) -> "CsvFileAddress":
4747
https://guided-path.s3.us-east-1.amazonaws.com/demo_climate_data_export.csv
4848
"""
4949
url_obj, location = parse_uri(url, "csv")
50+
try:
51+
batch_size = int(url_obj.query_params.get("batch-size", DEFAULT_BATCH_SIZE))
52+
except ValueError as ex:
53+
raise ValueError("Invalid value for batch size") from ex
5054
return cls(
5155
url=url_obj,
5256
location=location,
5357
pipeline=url_obj.query_params.getlist("pipe"),
54-
batch_size=int(url_obj.query_params.get("batch-size", DEFAULT_BATCH_SIZE)),
58+
batch_size=batch_size,
5559
separator=url_obj.query_params.get("separator", DEFAULT_SEPARATOR),
5660
quote_char=url_obj.query_params.get("quote-char", DEFAULT_QUOTE_CHAR),
5761
)
@@ -81,18 +85,22 @@ def collect_properties(query_params: Dict, prefixes: List) -> Dict[str, str]:
8185
break
8286
return opts
8387

84-
def load_table(self) -> pl.LazyFrame:
88+
def load_table(self, lazy: bool = True) -> pl.LazyFrame:
8589
"""
8690
Load the CSV file as a Polars LazyFrame.
8791
"""
8892

8993
# Read from data source.
90-
lf = pl.scan_csv(
91-
self.location,
92-
separator=self.separator,
93-
quote_char=self.quote_char,
94-
storage_options=self.storage_options,
95-
)
94+
kwargs = {
95+
"separator": self.separator,
96+
"quote_char": self.quote_char,
97+
"storage_options": self.storage_options,
98+
}
99+
# Note: Type checker ignores are only for Python 3.9.
100+
if lazy:
101+
lf = pl.scan_csv(self.location, **kwargs) # ty: ignore[invalid-argument-type]
102+
else:
103+
lf = pl.read_csv(self.location, **kwargs).lazy() # ty: ignore[invalid-argument-type]
96104

97105
# Optionally apply transformations.
98106
if self.pipeline:
@@ -118,8 +126,27 @@ def from_csv(source_url, target_url, progress: bool = False) -> bool:
118126
"""
119127
source = CsvFileAddress.from_url(source_url)
120128
logger.info(f"File address: {source.location}")
121-
return polars_to_cratedb(
122-
frame=source.load_table(),
123-
target_url=target_url,
124-
chunk_size=source.batch_size,
125-
)
129+
130+
try:
131+
return polars_to_cratedb(
132+
frame=source.load_table(),
133+
target_url=target_url,
134+
chunk_size=source.batch_size or DEFAULT_BATCH_SIZE,
135+
)
136+
137+
# OSError: object-store error: Generic S3 error: Error performing PUT http://169.254.169.254/latest/api/token
138+
# in 218.979617ms, after 2 retries, max_retries: 2, retry_timeout: 10s - HTTP error:
139+
# error sending request (path: s3://guided-path/demo_climate_data_export.csv)
140+
except OSError as ex:
141+
msg = str(ex)
142+
if "Generic S3 error" in msg and "/api/token" in msg:
143+
logger.warning(
144+
"Storage backend authentication is required for streaming reads but failed. "
145+
"Falling back to non-streaming mode: This may result in inefficient reads."
146+
)
147+
return polars_to_cratedb(
148+
frame=source.load_table(lazy=False),
149+
target_url=target_url,
150+
chunk_size=source.batch_size,
151+
)
152+
raise OSError(f"Loading data from CSV failed: {source_url}: {msg}") from ex

cratedb_toolkit/io/router.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ def load_table(
116116

117117
adjusted_url = str(source_url_obj)
118118
if source_url_obj.scheme.startswith("csv"):
119-
adjusted_url = str(source_url_obj.path)
119+
source_url_obj.scheme = None
120120

121121
return from_csv(adjusted_url, target_url)
122122

cratedb_toolkit/model.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,8 +206,12 @@ def schema(self) -> t.Union[str, None]:
206206
def with_table_address(self, table_address: "TableAddress") -> "DatabaseAddress":
207207
cp = deepcopy(self)
208208
cp.uri.path = f"/{table_address.schema}/{table_address.table}"
209+
# Use `if-exists` from table address.
209210
if table_address.if_exists:
210211
cp.uri.query_params["if-exists"] = table_address.if_exists
212+
# When not supplied, don't let existing spots leak.
213+
else:
214+
cp.uri.query_params.pop("if-exists", None)
211215
return cp
212216

213217

tests/cluster/test_import.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ def test_parquet_import_remote(cloud_environment, caplog):
5959
assert result.exit_code == 0, f"ERROR: {result.output}"
6060

6161
assert "Loading data." in caplog.text
62-
assert "target=TableAddress(schema=None, table='basic')" in caplog.text
62+
assert "target=TableAddress(schema=None, table='basic'" in caplog.text
6363
assert "Import succeeded (status: SUCCEEDED)" in caplog.text
6464

6565
with ManagedCluster.from_env() as cluster:

tests/io/file/test_csv.py

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,12 @@
1111
climate_json_json = (
1212
str(data_folder / "climate_json_json.csv") + "?quote-char='&pipe=json_array_to_wkt_point:geo_location"
1313
)
14-
climate_json_python = (
14+
climate_json_python_local = (
1515
str(data_folder / "climate_json_python.csv")
1616
+ '?quote-char="&pipe=json_array_to_wkt_point:geo_location&pipe=python_to_json:data'
1717
)
1818
climate_wkt_json = str(data_folder / "climate_wkt_json.csv") + "?quote-char='"
19+
climate_json_python_s3 = "https://guided-path.s3.us-east-1.amazonaws.com/demo_climate_data_export.csv?pipe=json_array_to_wkt_point:geo_location&pipe=python_to_json:data"
1920

2021
table_address = TableAddress(schema=TESTDRIVE_DATA_SCHEMA, table="climate_data", if_exists="append")
2122

@@ -25,22 +26,39 @@ def provision_ddl(cratedb_synchronized) -> None:
2526
cratedb_synchronized.database.run_sql(ddl)
2627

2728

28-
def test_load_csv_json_json(cratedb_synchronized, provision_ddl):
29+
def test_load_csv_wkt_json(cratedb_synchronized, provision_ddl):
30+
"""Load a CSV file that does not need any geo transformations."""
2931
cluster = DatabaseCluster.create(cluster_url=cratedb_synchronized.database.dburi)
30-
cluster.load_table(InputOutputResource(climate_json_json), target=table_address)
32+
cluster.load_table(InputOutputResource(climate_wkt_json), target=table_address)
3133
cluster.adapter.refresh_table(table_address.fullname)
3234
assert cluster.adapter.count_records(table_address.fullname) == 3, "Wrong number of records returned"
3335

3436

35-
def test_load_csv_json_python(cratedb_synchronized, provision_ddl):
37+
def test_load_geo_csv_json_json(cratedb_synchronized, provision_ddl):
38+
"""Load a CSV file that needs geo transformations."""
39+
pytest.importorskip("polars_st", reason="CSV import needs geo transformations")
3640
cluster = DatabaseCluster.create(cluster_url=cratedb_synchronized.database.dburi)
37-
cluster.load_table(InputOutputResource(climate_json_python), target=table_address)
41+
cluster.load_table(InputOutputResource(climate_json_json), target=table_address)
3842
cluster.adapter.refresh_table(table_address.fullname)
3943
assert cluster.adapter.count_records(table_address.fullname) == 3, "Wrong number of records returned"
4044

4145

42-
def test_load_csv_wkt_json(cratedb_synchronized, provision_ddl):
46+
def test_load_geo_csv_json_python_local(cratedb_synchronized, provision_ddl):
47+
"""Load a CSV file that needs geo transformations."""
48+
pytest.importorskip("polars_st", reason="CSV import needs geo transformations")
4349
cluster = DatabaseCluster.create(cluster_url=cratedb_synchronized.database.dburi)
44-
cluster.load_table(InputOutputResource(climate_wkt_json), target=table_address)
50+
cluster.load_table(InputOutputResource(climate_json_python_local), target=table_address)
4551
cluster.adapter.refresh_table(table_address.fullname)
4652
assert cluster.adapter.count_records(table_address.fullname) == 3, "Wrong number of records returned"
53+
54+
55+
@pytest.mark.skip(
56+
"Test takes too long to complete. When aiming to test a remote data source, please use a smaller dataset."
57+
)
58+
def test_load_geo_csv_json_python_s3(cratedb_synchronized, provision_ddl):
59+
"""Load a CSV file that needs geo transformations."""
60+
pytest.importorskip("polars_st", reason="CSV import needs geo transformations")
61+
cluster = DatabaseCluster.create(cluster_url=cratedb_synchronized.database.dburi)
62+
cluster.load_table(InputOutputResource(climate_json_python_s3), target=table_address)
63+
cluster.adapter.refresh_table(table_address.fullname)
64+
assert cluster.adapter.count_records(table_address.fullname) == 22650, "Wrong number of records returned"

0 commit comments

Comments
 (0)