Skip to content

Commit a89997f

Browse files
committed
I/O: Add CSV file import, with transformations
1 parent 6c8c3e6 commit a89997f

File tree

14 files changed

+216
-2
lines changed

14 files changed

+216
-2
lines changed

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
- Kinesis: Added `ctk kinesis` CLI group with `list-checkpoints` and
66
`prune-checkpoints` commands for checkpoint table maintenance
77
- Dependencies: Permitted installation of click 8.3
8+
- I/O: Added CSV file import, with transformations
89

910
## 2026/03/16 v0.0.46
1011
- I/O: API improvements: `ctk {load,save} table` became `ctk {load,save}`

cratedb_toolkit/io/file/__init__.py

Whitespace-only changes.

cratedb_toolkit/io/file/csv.py

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
"""
2+
CSV file integration for CrateDB Toolkit.
3+
4+
This module provides functionality to transfer data between CSV files
5+
and CrateDB database tables, supporting both import and export operations.
6+
"""
7+
8+
import dataclasses
9+
import logging
10+
from typing import Dict, List, Optional
11+
12+
import polars as pl
13+
from boltons.urlutils import URL
14+
15+
from cratedb_toolkit.io.util import parse_uri, polars_to_cratedb
16+
17+
logger = logging.getLogger(__name__)
18+
19+
20+
DEFAULT_SEPARATOR = ","
21+
DEFAULT_QUOTE_CHAR = '"'
22+
DEFAULT_BATCH_SIZE = 75_000
23+
24+
25+
@dataclasses.dataclass
26+
class CsvFileAddress:
27+
"""
28+
Represent a CSV file location and provide loader methods.
29+
"""
30+
31+
url: URL
32+
location: str
33+
batch_size: Optional[int] = DEFAULT_BATCH_SIZE
34+
separator: Optional[str] = DEFAULT_SEPARATOR
35+
quote_char: Optional[str] = DEFAULT_QUOTE_CHAR
36+
pipeline: Optional[List[str]] = dataclasses.field(default_factory=list)
37+
38+
@classmethod
39+
def from_url(cls, url: str) -> "CsvFileAddress":
40+
"""
41+
Parse a CSV file location and return a CsvFileAddress object.
42+
43+
Examples:
44+
45+
csv://./var/lib/example.csv
46+
https://guided-path.s3.us-east-1.amazonaws.com/demo_climate_data_export.csv
47+
"""
48+
url_obj, location = parse_uri(url, "csv")
49+
return cls(
50+
url=url_obj,
51+
location=location,
52+
batch_size=int(url_obj.query_params.get("batch-size", DEFAULT_BATCH_SIZE)),
53+
separator=url_obj.query_params.get("separator", DEFAULT_SEPARATOR),
54+
quote_char=url_obj.query_params.get("quote-char", DEFAULT_QUOTE_CHAR),
55+
pipeline=url_obj.query_params.getlist("pipe"),
56+
)
57+
58+
@property
59+
def storage_options(self) -> Dict[str, str]:
60+
"""
61+
Provide file storage options.
62+
63+
TODO: Generalize.
64+
"""
65+
prefixes = ["aws_", "azure_", "google_", "delta_"]
66+
return self.collect_properties(self.url.query_params, prefixes)
67+
68+
@staticmethod
69+
def collect_properties(query_params: Dict, prefixes: List) -> Dict[str, str]:
70+
"""
71+
Collect parameters from URL query string.
72+
73+
TODO: Generalize.
74+
"""
75+
opts = {}
76+
for name, value in query_params.items():
77+
for prefix in prefixes:
78+
if name.lower().startswith(prefix) and value is not None:
79+
opts[name.upper()] = value
80+
break
81+
return opts
82+
83+
def load_table(self) -> pl.LazyFrame:
84+
"""
85+
Load the CSV file as a Polars LazyFrame.
86+
"""
87+
88+
# Read from data source.
89+
lf = pl.scan_csv(
90+
self.location,
91+
separator=self.separator,
92+
quote_char=self.quote_char,
93+
storage_options=self.storage_options,
94+
)
95+
96+
# Optionally apply transformations.
97+
if self.pipeline:
98+
from macropipe import MacroPipe
99+
100+
mp = MacroPipe.from_recipes(*self.pipeline)
101+
lf = mp.apply(lf)
102+
103+
return lf
104+
105+
106+
def from_csv(source_url, target_url, progress: bool = False) -> bool:
    """
    Scan a CSV file from local filesystem or object store, and load into CrateDB.
    Documentation: https://cratedb-toolkit.readthedocs.io/io/file/csv.html

    See also: https://docs.pola.rs/api/python/stable/reference/api/polars.scan_csv.html

    # Synopsis: Load from filesystem.
    ctk load \
        "csv://./var/lib/example.csv" \
        "crate://crate@localhost:4200/demo/example"

    :param source_url: CSV source address, e.g. ``csv://./var/lib/example.csv``.
    :param target_url: CrateDB database address, including schema/table path.
    :param progress: Accepted for interface parity with sibling loaders;
                     currently unused here. TODO: Wire up progress reporting.
    :return: Outcome of the transfer, as reported by `polars_to_cratedb`.
    """
    source = CsvFileAddress.from_url(source_url)
    # Lazy %-style logging arguments, consistent with cratedb_toolkit/io/util.py.
    logger.info("File address: %s", source.location)
    return polars_to_cratedb(
        frame=source.load_table(),
        target_url=target_url,
        chunk_size=source.batch_size,
    )

cratedb_toolkit/io/router.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,19 @@ def load_table(
107107
progress=True,
108108
)
109109

110+
elif (
111+
source_url_obj.scheme.startswith("csv")
112+
or source_url_obj.scheme.endswith("csv")
113+
or source_url_obj.path.endswith(".csv")
114+
):
115+
from cratedb_toolkit.io.file import from_csv
116+
117+
adjusted_url = str(source_url_obj)
118+
if source_url_obj.scheme.startswith("csv"):
119+
adjusted_url = str(source_url_obj.path)
120+
121+
return from_csv(adjusted_url, target_url)
122+
110123
elif source_url_obj.scheme.startswith("deltalake") or source_url_obj.scheme.endswith("deltalake"):
111124
from cratedb_toolkit.io.deltalake import from_deltalake
112125

cratedb_toolkit/io/util.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,10 @@ def polars_to_cratedb(frame: pl.LazyFrame, target_url, chunk_size: int) -> bool:
4040
"""
4141
Write a Polars LazyFrame to a CrateDB table, in batches/chunks.
4242
"""
43-
cratedb_address = DatabaseAddress.from_string(target_url)
43+
target_url_obj = URL(target_url)
44+
if_exists = target_url_obj.query_params.pop("if-exists", "fail")
45+
cratedb_address = DatabaseAddress.from_string(str(target_url_obj))
4446
cratedb_url, cratedb_table = cratedb_address.decode()
45-
if_exists = URL(target_url).query_params.get("if-exists") or "fail"
4647
if cratedb_table.table is None:
4748
raise ValueError("Table name is missing. Please adjust CrateDB database URL.")
4849
logger.info("Target address: %s", cratedb_address)

cratedb_toolkit/model.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,7 @@ class TableAddress:
212212

213213
schema: t.Optional[str] = None
214214
table: t.Optional[str] = None
215+
if_exists: t.Optional[str] = None
215216

216217
def __bool__(self):
217218
return bool(self.table)

pyproject.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,7 @@ optional-dependencies.io-opentable = [
202202
"cratedb-toolkit[deltalake,iceberg]",
203203
]
204204
optional-dependencies.io-recipe = [
205+
"macropipe[geo]==0.0.0",
205206
"tikray>=0.2,<0.4",
206207
]
207208
optional-dependencies.kinesis = [
@@ -292,6 +293,7 @@ line-length = 120
292293
line-length = 120
293294
extend-exclude = [
294295
"amqp-to-mqtt.py",
296+
"examples",
295297
"workbench.py",
296298
]
297299
lint.select = [

tests/io/file/__init__.py

Whitespace-only changes.

tests/io/file/data/__init__.py

Whitespace-only changes.

tests/io/file/data/climate_ddl.sql

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
-- DDL for the climate test dataset imported from CSV.
-- NOTE(review): `{schema}` is a placeholder, presumably interpolated by the
-- test harness before execution — confirm against the test fixtures.
CREATE TABLE "{schema}".climate_data
(
    "timestamp" TIMESTAMP WITHOUT TIME ZONE,
    "geo_location" GEO_POINT,
    -- Measurement payload, stored as a dynamic object column.
    "data" OBJECT(DYNAMIC) AS (
        "temperature" DOUBLE PRECISION,
        "u10" DOUBLE PRECISION,
        "v10" DOUBLE PRECISION,
        "pressure" DOUBLE PRECISION,
        "latitude" DOUBLE PRECISION,
        "longitude" DOUBLE PRECISION,
        "humidity" DOUBLE PRECISION
    )
);

0 commit comments

Comments
 (0)