Skip to content

Commit 01ed104

Browse files
committed
I/O: Add CSV file import, with transformations
1 parent b6be286 commit 01ed104

File tree

17 files changed

+231
-7
lines changed

17 files changed

+231
-7
lines changed

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
- Kinesis: Added `ctk kinesis` CLI group with `list-checkpoints` and
66
`prune-checkpoints` commands for checkpoint table maintenance
77
- Dependencies: Permitted installation of click 8.3
8+
- I/O: Added CSV file import, with transformations
89

910
## 2026/03/16 v0.0.46
1011
- I/O: API improvements: `ctk {load,save} table` became `ctk {load,save}`

cratedb_toolkit/cluster/core.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -545,10 +545,10 @@ def load_table(
545545
ctk load kinesis+dms:///arn:aws:kinesis:eu-central-1:831394476016:stream/testdrive
546546
ctk load kinesis+dms:///path/to/dms-over-kinesis.jsonl
547547
"""
548-
549-
self._load_table_result = self._router.load_table(
550-
source=source, target=self.address, transformation=transformation
551-
)
548+
address = self.address
549+
if target:
550+
address = address.with_table_address(target)
551+
self._load_table_result = self._router.load_table(source=source, target=address, transformation=transformation)
552552
return self
553553

554554
def save_table(

cratedb_toolkit/io/file/__init__.py

Whitespace-only changes.

cratedb_toolkit/io/file/csv.py

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
"""
2+
CSV file integration for CrateDB Toolkit.
3+
4+
This module provides functionality to transfer data between CSV files
5+
and CrateDB database tables. Currently, only the import direction (CSV file to CrateDB) is implemented.
6+
"""
7+
8+
import dataclasses
9+
import logging
10+
from typing import Dict, List, Optional
11+
12+
import polars as pl
13+
from boltons.urlutils import URL
14+
15+
from cratedb_toolkit.io.util import parse_uri, polars_to_cratedb
16+
17+
logger = logging.getLogger(__name__)
18+
19+
20+
# Defaults applied when the corresponding URL query parameter is absent.
DEFAULT_SEPARATOR = ","
DEFAULT_QUOTE_CHAR = '"'
DEFAULT_BATCH_SIZE = 75_000


@dataclasses.dataclass
class CsvFileAddress:
    """
    Represent a CSV file location and provide loader methods.

    Reader options are taken from the URL query string:

    - `pipe`: Transformation recipe, can be given multiple times.
    - `batch-size`: Positive integer, defaults to `DEFAULT_BATCH_SIZE`.
    - `separator`: Field separator, defaults to `DEFAULT_SEPARATOR`.
    - `quote-char`: Quote character, defaults to `DEFAULT_QUOTE_CHAR`.
      An empty value disables quote handling.
    """

    url: "URL"
    location: str
    pipeline: Optional[List[str]] = dataclasses.field(default_factory=list)
    batch_size: Optional[int] = DEFAULT_BATCH_SIZE
    # TODO: What about other parameters? See `polars.io.csv.functions`.
    separator: Optional[str] = DEFAULT_SEPARATOR
    quote_char: Optional[str] = DEFAULT_QUOTE_CHAR

    @classmethod
    def from_url(cls, url: str) -> "CsvFileAddress":
        """
        Parse a CSV file location and return a CsvFileAddress object.

        Examples:

        csv://./var/lib/example.csv
        https://guided-path.s3.us-east-1.amazonaws.com/demo_climate_data_export.csv

        :raises ValueError: When `batch-size` is not a positive integer.
        """
        url_obj, location = parse_uri(url, "csv")
        params = url_obj.query_params

        # An empty `quote-char=` cannot be used as a quote character.
        # Map it to `None`, which Polars documents as "no quote handling".
        quote_char = params.get("quote-char", DEFAULT_QUOTE_CHAR) or None

        return cls(
            url=url_obj,
            location=location,
            pipeline=params.getlist("pipe"),
            batch_size=cls._parse_batch_size(params.get("batch-size")),
            separator=params.get("separator", DEFAULT_SEPARATOR),
            quote_char=quote_char,
        )

    @staticmethod
    def _parse_batch_size(value) -> int:
        """
        Convert the raw `batch-size` parameter into a positive integer.

        `None` (parameter absent) selects `DEFAULT_BATCH_SIZE`. Anything that
        is not a positive integer raises `ValueError` with a message naming
        the offending parameter, instead of failing later while writing.
        """
        if value is None:
            return DEFAULT_BATCH_SIZE
        try:
            size = int(value)
        except (TypeError, ValueError) as ex:
            raise ValueError(f"Invalid batch-size, expected a positive integer: {value!r}") from ex
        if size <= 0:
            raise ValueError(f"Invalid batch-size, expected a positive integer: {value!r}")
        return size

    @property
    def storage_options(self) -> Dict[str, str]:
        """
        Provide file storage options.

        Selects all URL query parameters whose names start with one of the
        object store prefixes listed below.

        TODO: Generalize.
        """
        prefixes = ["aws_", "azure_", "google_", "delta_"]
        return self.collect_properties(self.url.query_params, prefixes)

    @staticmethod
    def collect_properties(query_params: Dict, prefixes: List) -> Dict[str, str]:
        """
        Collect parameters from URL query string.

        Parameter names are matched case-insensitively against `prefixes`,
        and returned with upper-cased names. Parameters whose value is `None`
        are omitted.

        TODO: Generalize.
        """
        # `str.startswith` accepts a tuple, so one call tests all prefixes.
        wanted = tuple(prefixes)
        return {
            name.upper(): value
            for name, value in query_params.items()
            if value is not None and name.lower().startswith(wanted)
        }

    def load_table(self) -> "pl.LazyFrame":
        """
        Load the CSV file as a Polars LazyFrame.

        When transformation recipes are configured in `pipeline`, they are
        applied to the frame before it is returned.
        """

        # Read from data source.
        lf = pl.scan_csv(
            self.location,
            separator=self.separator,
            quote_char=self.quote_char,
            storage_options=self.storage_options,
        )

        # Optionally apply transformations.
        if self.pipeline:
            # Imported lazily, so the package is only needed when recipes are used.
            from macropipe import MacroPipe

            mp = MacroPipe.from_recipes(*self.pipeline)
            lf = mp.apply(lf)

        return lf
105+
106+
107+
def from_csv(source_url, target_url, progress: bool = False) -> bool:
    """
    Scan a CSV file from local filesystem or object store, and load into CrateDB.
    Documentation: https://cratedb-toolkit.readthedocs.io/io/file/csv.html

    See also: https://docs.pola.rs/api/python/stable/reference/api/polars.scan_csv.html

    # Synopsis: Load from filesystem.
    ctk load \
        "csv://./var/lib/example.csv" \
        "crate://crate@localhost:4200/demo/example"

    :param source_url: Location of the CSV file, parsed by `CsvFileAddress.from_url`.
    :param target_url: CrateDB database URL, handed to `polars_to_cratedb` unchanged.
    :param progress: Currently not evaluated by this function.
    :return: The outcome reported by `polars_to_cratedb`.
    """
    source = CsvFileAddress.from_url(source_url)
    # Use lazy %-style arguments, so the message is only rendered when emitted.
    logger.info("File address: %s", source.location)
    return polars_to_cratedb(
        frame=source.load_table(),
        target_url=target_url,
        chunk_size=source.batch_size,
    )

cratedb_toolkit/io/router.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,19 @@ def load_table(
107107
progress=True,
108108
)
109109

110+
elif (
111+
source_url_obj.scheme.startswith("csv")
112+
or source_url_obj.scheme.endswith("csv")
113+
or source_url_obj.path.endswith(".csv")
114+
):
115+
from cratedb_toolkit.io.file.csv import from_csv
116+
117+
adjusted_url = str(source_url_obj)
118+
if source_url_obj.scheme.startswith("csv"):
119+
adjusted_url = str(source_url_obj.path)
120+
121+
return from_csv(adjusted_url, target_url)
122+
110123
elif source_url_obj.scheme.startswith("deltalake") or source_url_obj.scheme.endswith("deltalake"):
111124
from cratedb_toolkit.io.deltalake import from_deltalake
112125

cratedb_toolkit/io/util.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,10 @@ def polars_to_cratedb(frame: pl.LazyFrame, target_url, chunk_size: int) -> bool:
4040
"""
4141
Write a Polars LazyFrame to a CrateDB table, in batches/chunks.
4242
"""
43-
cratedb_address = DatabaseAddress.from_string(target_url)
43+
target_url_obj = URL(target_url)
44+
if_exists = target_url_obj.query_params.pop("if-exists", "fail")
45+
cratedb_address = DatabaseAddress.from_string(str(target_url_obj))
4446
cratedb_url, cratedb_table = cratedb_address.decode()
45-
if_exists = URL(target_url).query_params.get("if-exists") or "fail"
4647
if cratedb_table.table is None:
4748
raise ValueError("Table name is missing. Please adjust CrateDB database URL.")
4849
logger.info("Target address: %s", cratedb_address)

cratedb_toolkit/model.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,13 @@ def schema(self) -> t.Union[str, None]:
203203
"""
204204
return self.uri.query_params.get("schema") or self.uri.path.lstrip("/")
205205

206+
def with_table_address(self, table_address: "TableAddress") -> "DatabaseAddress":
    """
    Return a copy of this address that points to the given table.

    The copy gets its URL path set to `/<schema>/<table>`. When the table
    address defines `if_exists`, it is stored as the `if-exists` query
    parameter. The original address object is not modified.
    """
    # NOTE(review): A `None` schema or table is rendered as the literal text
    # "None" in the path -- confirm callers always provide both.
    table_path = "/{}/{}".format(table_address.schema, table_address.table)

    clone = deepcopy(self)
    clone.uri.path = table_path

    if_exists = table_address.if_exists
    if if_exists:
        clone.uri.query_params["if-exists"] = if_exists
    return clone
212+
206213

207214
@dataclasses.dataclass
208215
class TableAddress:
@@ -212,6 +219,7 @@ class TableAddress:
212219

213220
schema: t.Optional[str] = None
214221
table: t.Optional[str] = None
222+
if_exists: t.Optional[str] = None
215223

216224
def __bool__(self):
217225
return bool(self.table)

cratedb_toolkit/testing/testcontainers/cratedb.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ def get_connection_url(self, dialect: str = "crate", host: Optional[str] = None)
142142
"""
143143
# TODO: When using `db_name=self.CRATEDB_DB`:
144144
# Connection.__init__() got an unexpected keyword argument 'database'
145+
# Connection.__init__() got an unexpected keyword argument 'table'
145146
return super()._create_connection_url(
146147
dialect=dialect,
147148
username=self.CRATEDB_USER,

cratedb_toolkit/util/database.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -460,8 +460,9 @@ def decode_database_table(url: str) -> t.Tuple[t.Union[str, None], t.Union[str,
460460
if "too many values to unpack" not in str(ex) and "not enough values to unpack" not in str(ex):
461461
raise
462462

463+
table = url_.query_params.get("table")
464+
if not database:
463465
database = url_.query_params.get("database")
464-
table = url_.query_params.get("table")
465466
if url_.scheme == "crate" and not database:
466467
database = url_.query_params.get("schema")
467468
if database is None and table is None:

pyproject.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,7 @@ optional-dependencies.io-opentable = [
202202
"cratedb-toolkit[deltalake,iceberg]",
203203
]
204204
optional-dependencies.io-recipe = [
205+
"macropipe[geo]==0.0.0",
205206
"tikray>=0.2,<0.4",
206207
]
207208
optional-dependencies.kinesis = [
@@ -292,6 +293,7 @@ line-length = 120
292293
line-length = 120
293294
extend-exclude = [
294295
"amqp-to-mqtt.py",
296+
"examples",
295297
"workbench.py",
296298
]
297299
lint.select = [

0 commit comments

Comments
 (0)