Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/release-oci-full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ on:
- '*.*.*'

# Run on pull requests.
# pull_request:
pull_request:

# Run each night.
schedule:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/release-oci-ingest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ on:
- '*.*.*'

# Run on pull requests.
# pull_request:
pull_request:

# Run each night.
schedule:
Expand Down
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
- Kinesis: Added `ctk kinesis` CLI group with `list-checkpoints` and
`prune-checkpoints` commands for checkpoint table maintenance
- Dependencies: Permitted installation of click 8.3
- I/O: Added CSV file import, with transformations

## 2026/03/16 v0.0.46
- I/O: API improvements: `ctk {load,save} table` became `ctk {load,save}`
Expand Down
8 changes: 4 additions & 4 deletions cratedb_toolkit/cluster/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -545,10 +545,10 @@ def load_table(
ctk load kinesis+dms:///arn:aws:kinesis:eu-central-1:831394476016:stream/testdrive
ctk load kinesis+dms:///path/to/dms-over-kinesis.jsonl
"""

self._load_table_result = self._router.load_table(
source=source, target=self.address, transformation=transformation
)
address = self.address
if target:
address = address.with_table_address(target)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
self._load_table_result = self._router.load_table(source=source, target=address, transformation=transformation)
return self

def save_table(
Expand Down
Empty file.
152 changes: 152 additions & 0 deletions cratedb_toolkit/io/file/csv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
"""
CSV file integration for CrateDB Toolkit.

This module provides functionality to transfer data between CSV files
and CrateDB database tables, supporting both import and export operations.
"""

import dataclasses
import logging
from typing import Dict, List, Optional

import polars as pl
from boltons.urlutils import URL

from cratedb_toolkit.io.util import parse_uri, polars_to_cratedb

logger = logging.getLogger(__name__)


DEFAULT_SEPARATOR = ","
DEFAULT_QUOTE_CHAR = '"'
DEFAULT_BATCH_SIZE = 75_000


@dataclasses.dataclass
class CsvFileAddress:
    """
    Represent a CSV file location and provide loader methods.

    Instances are usually built via `from_url`, which decodes parsing
    (`separator`, `quote-char`), batching (`batch-size`), and
    transformation (`pipe`) options from URL query parameters.
    """

    # Parsed source URL, including query parameters.
    url: URL
    # Filesystem path or object-store location of the CSV file.
    location: str
    # Macropipe transformation recipes, from repeated `pipe` query parameters.
    pipeline: Optional[List[str]] = dataclasses.field(default_factory=list)
    # Number of records per chunk when writing to the database.
    batch_size: int = DEFAULT_BATCH_SIZE
    # TODO: What about other parameters? See `polars.io.csv.functions`.
    separator: Optional[str] = DEFAULT_SEPARATOR
    quote_char: Optional[str] = DEFAULT_QUOTE_CHAR

    @classmethod
    def from_url(cls, url: str) -> "CsvFileAddress":
        """
        Parse a CSV file location and return a `CsvFileAddress` object.

        Raises:
            ValueError: When the `batch-size` query parameter is not a valid integer.

        Examples:

            csv://./var/lib/example.csv
            https://guided-path.s3.us-east-1.amazonaws.com/demo_climate_data_export.csv
        """
        url_obj, location = parse_uri(url, "csv")
        batch_size_raw = url_obj.query_params.get("batch-size", DEFAULT_BATCH_SIZE)
        try:
            batch_size = int(batch_size_raw)
        except ValueError as ex:
            # Surface the offending value to ease troubleshooting.
            raise ValueError(f"Invalid value for batch size: {batch_size_raw!r}") from ex
        return cls(
            url=url_obj,
            location=location,
            pipeline=url_obj.query_params.getlist("pipe"),
            batch_size=batch_size,
            separator=url_obj.query_params.get("separator", DEFAULT_SEPARATOR),
            quote_char=url_obj.query_params.get("quote-char", DEFAULT_QUOTE_CHAR),
        )

    @property
    def storage_options(self) -> Dict[str, str]:
        """
        Provide file storage options for the object-store backend,
        collected from prefixed URL query parameters.

        TODO: Generalize.
        """
        prefixes = ["aws_", "azure_", "google_", "delta_"]
        return self.collect_properties(self.url.query_params, prefixes)

    @staticmethod
    def collect_properties(query_params: Dict, prefixes: List) -> Dict[str, str]:
        """
        Collect parameters from URL query string whose names start with one
        of the given prefixes, upper-casing their names for the backend.

        TODO: Generalize.
        """
        # `str.startswith` accepts a tuple, avoiding a nested loop.
        prefix_tuple = tuple(prefixes)
        return {
            name.upper(): value
            for name, value in query_params.items()
            if value is not None and name.lower().startswith(prefix_tuple)
        }

    def load_table(self, lazy: bool = True) -> pl.LazyFrame:
        """
        Load the CSV file as a Polars LazyFrame.

        Args:
            lazy: When true (default), use `pl.scan_csv` for a streaming read.
                  Otherwise, eagerly read the whole file and convert to lazy.

        Returns:
            A `pl.LazyFrame`, optionally with Macropipe transformations applied.
        """

        # Read from data source.
        kwargs = {
            "separator": self.separator,
            "quote_char": self.quote_char,
            "storage_options": self.storage_options,
        }
        # Note: Type checker ignores are only for Python 3.9.
        if lazy:
            lf = pl.scan_csv(self.location, **kwargs)  # ty: ignore[invalid-argument-type]
        else:
            lf = pl.read_csv(self.location, **kwargs).lazy()  # ty: ignore[invalid-argument-type]

        # Optionally apply transformations.
        if self.pipeline:
            # Deferred import: `macropipe` is an optional dependency.
            from macropipe import MacroPipe

            mp = MacroPipe.from_recipes(*self.pipeline)
            lf = mp.apply(lf)

        return lf


def from_csv(source_url, target_url, progress: bool = False) -> bool:
    """
    Scan a CSV file from local filesystem or object store, and load into CrateDB.
    Documentation: https://cratedb-toolkit.readthedocs.io/io/file/csv.html

    See also: https://docs.pola.rs/api/python/stable/reference/api/polars.scan_csv.html

    # Synopsis: Load from filesystem.
    ctk load \
        "csv://./var/lib/example.csv" \
        "crate://crate@localhost:4200/demo/example"
    """
    # NOTE(review): `progress` is currently unused — confirm whether it should
    # be forwarded to `polars_to_cratedb` or removed from the signature.
    source = CsvFileAddress.from_url(source_url)
    logger.info("File address: %s", source.location)

    # Normalize once, so the streaming and the fallback path use the same chunking.
    chunk_size = source.batch_size or DEFAULT_BATCH_SIZE

    try:
        # Preferred path: lazy (streaming) scan of the CSV resource.
        return polars_to_cratedb(
            frame=source.load_table(),
            target_url=target_url,
            chunk_size=chunk_size,
        )

    # OSError: object-store error: Generic S3 error: Error performing PUT http://169.254.169.254/latest/api/token
    # in 218.979617ms, after 2 retries, max_retries: 2, retry_timeout: 10s - HTTP error:
    # error sending request (path: s3://guided-path/demo_climate_data_export.csv)
    except OSError as ex:
        msg = str(ex)
        if "Generic S3 error" in msg and "/api/token" in msg:
            logger.warning(
                "Storage backend authentication is required for streaming reads but failed. "
                "Falling back to non-streaming mode: This may result in inefficient reads."
            )
            # Eagerly read the whole file, which avoids the failing streaming-auth path.
            return polars_to_cratedb(
                frame=source.load_table(lazy=False),
                target_url=target_url,
                chunk_size=chunk_size,
            )
        raise OSError(f"Loading data from CSV failed: {source_url}: {msg}") from ex
13 changes: 13 additions & 0 deletions cratedb_toolkit/io/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,19 @@ def load_table(
progress=True,
)

elif (
source_url_obj.scheme.startswith("csv")
or source_url_obj.scheme.endswith("csv")
or source_url_obj.path.endswith(".csv")
):
from cratedb_toolkit.io.file.csv import from_csv

adjusted_url = str(source_url_obj)
if source_url_obj.scheme.startswith("csv"):
source_url_obj.scheme = None

return from_csv(adjusted_url, target_url)
Comment thread
coderabbitai[bot] marked this conversation as resolved.

elif source_url_obj.scheme.startswith("deltalake") or source_url_obj.scheme.endswith("deltalake"):
from cratedb_toolkit.io.deltalake import from_deltalake

Expand Down
5 changes: 3 additions & 2 deletions cratedb_toolkit/io/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@ def polars_to_cratedb(frame: pl.LazyFrame, target_url, chunk_size: int) -> bool:
"""
Write a Polars LazyFrame to a CrateDB table, in batches/chunks.
"""
cratedb_address = DatabaseAddress.from_string(target_url)
target_url_obj = URL(target_url)
if_exists = target_url_obj.query_params.pop("if-exists", "fail")
cratedb_address = DatabaseAddress.from_string(str(target_url_obj))
cratedb_url, cratedb_table = cratedb_address.decode()
if_exists = URL(target_url).query_params.get("if-exists") or "fail"
if cratedb_table.table is None:
raise ValueError("Table name is missing. Please adjust CrateDB database URL.")
logger.info("Target address: %s", cratedb_address)
Expand Down
12 changes: 12 additions & 0 deletions cratedb_toolkit/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,17 @@ def schema(self) -> t.Union[str, None]:
"""
return self.uri.query_params.get("schema") or self.uri.path.lstrip("/")

def with_table_address(self, table_address: "TableAddress") -> "DatabaseAddress":
    """
    Return a deep copy of this address targeting the given table.

    The copy's URI path is replaced with ``/<schema>/<table>`` taken from
    `table_address`; the original address remains unmodified.

    NOTE(review): `table_address.schema` may be None, yielding a path like
    "/None/<table>" — confirm callers always supply a schema.
    """
    cp = deepcopy(self)
    cp.uri.path = f"/{table_address.schema}/{table_address.table}"
    # Use `if-exists` from table address.
    if table_address.if_exists:
        cp.uri.query_params["if-exists"] = table_address.if_exists
    # When not supplied, don't let existing spots leak.
    else:
        cp.uri.query_params.pop("if-exists", None)
    return cp


@dataclasses.dataclass
class TableAddress:
Expand All @@ -212,6 +223,7 @@ class TableAddress:

schema: t.Optional[str] = None
table: t.Optional[str] = None
if_exists: t.Optional[str] = None
Comment thread
amotl marked this conversation as resolved.

def __bool__(self):
return bool(self.table)
Expand Down
1 change: 1 addition & 0 deletions cratedb_toolkit/testing/testcontainers/cratedb.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ def get_connection_url(self, dialect: str = "crate", host: Optional[str] = None)
"""
# TODO: When using `db_name=self.CRATEDB_DB`:
# Connection.__init__() got an unexpected keyword argument 'database'
# Connection.__init__() got an unexpected keyword argument 'table'
return super()._create_connection_url(
dialect=dialect,
username=self.CRATEDB_USER,
Expand Down
3 changes: 2 additions & 1 deletion cratedb_toolkit/util/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -460,8 +460,9 @@ def decode_database_table(url: str) -> t.Tuple[t.Union[str, None], t.Union[str,
if "too many values to unpack" not in str(ex) and "not enough values to unpack" not in str(ex):
raise

table = url_.query_params.get("table")
if not database:
database = url_.query_params.get("database")
table = url_.query_params.get("table")
if url_.scheme == "crate" and not database:
database = url_.query_params.get("schema")
if database is None and table is None:
Expand Down
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ optional-dependencies.io-opentable = [
"cratedb-toolkit[deltalake,iceberg]",
]
optional-dependencies.io-recipe = [
"macropipe[geo]==0.0.0",
"tikray>=0.2,<0.4",
]
optional-dependencies.kinesis = [
Expand Down Expand Up @@ -292,6 +293,7 @@ line-length = 120
line-length = 120
extend-exclude = [
"amqp-to-mqtt.py",
"examples",
"workbench.py",
]
lint.select = [
Expand Down
2 changes: 1 addition & 1 deletion tests/cluster/test_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def test_parquet_import_remote(cloud_environment, caplog):
assert result.exit_code == 0, f"ERROR: {result.output}"

assert "Loading data." in caplog.text
assert "target=TableAddress(schema=None, table='basic')" in caplog.text
assert "target=TableAddress(schema=None, table='basic'" in caplog.text
Comment thread
amotl marked this conversation as resolved.
assert "Import succeeded (status: SUCCEEDED)" in caplog.text

with ManagedCluster.from_env() as cluster:
Expand Down
Empty file added tests/io/file/__init__.py
Empty file.
Empty file added tests/io/file/data/__init__.py
Empty file.
14 changes: 14 additions & 0 deletions tests/io/file/data/climate_ddl.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
-- Test fixture: target table for CSV import tests.
-- `{schema}` is substituted at test time via `str.format`.
CREATE TABLE "{schema}".climate_data
(
    "timestamp" TIMESTAMP WITHOUT TIME ZONE,
    "geo_location" GEO_POINT,
    "data" OBJECT(DYNAMIC) AS (
        "temperature" DOUBLE PRECISION,
        "u10" DOUBLE PRECISION,
        "v10" DOUBLE PRECISION,
        "pressure" DOUBLE PRECISION,
        "latitude" DOUBLE PRECISION,
        "longitude" DOUBLE PRECISION,
        "humidity" DOUBLE PRECISION
    )
);
4 changes: 4 additions & 0 deletions tests/io/file/data/climate_json_json.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
timestamp,geo_location,data
1754784000000,'[14.988999953493476, 51.10299998894334]','{"temperature": 19.704827880859398, "pressure": 99310.625, "v10": -1.545882225036621, "u10": 1.7978938817977905, "latitude": 51.102999999999945, "longitude": 14.989}'
1754784000000,'[7.088122218847275, 51.0029999865219]','{"temperature": 19.347802734375023, "pressure": 101470.9609375, "v10": -1.256191611289978, "u10": 0.02778780460357666, "latitude": 51.00299999999994, "longitude": 7.0881222222222195}'
1754784000000,'[7.58817776106298, 51.0029999865219]','{"temperature": 17.713037109375023, "pressure": 98837.3984375, "v10": -1.5747417211532593, "u10": -0.19953763484954834, "latitude": 51.00299999999994, "longitude": 7.588177777777774}'
4 changes: 4 additions & 0 deletions tests/io/file/data/climate_json_python.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
timestamp,geo_location,data
1754784000000,"[14.988999953493476, 51.10299998894334]","{'temperature': 19.704827880859398, 'pressure': 99310.625, 'v10': -1.545882225036621, 'u10': 1.7978938817977905, 'latitude': 51.102999999999945, 'longitude': 14.989}"
1754784000000,"[7.088122218847275, 51.0029999865219]","{'temperature': 19.347802734375023, 'pressure': 101470.9609375, 'v10': -1.256191611289978, 'u10': 0.02778780460357666, 'latitude': 51.00299999999994, 'longitude': 7.0881222222222195}"
1754784000000,"[7.58817776106298, 51.0029999865219]","{'temperature': 17.713037109375023, 'pressure': 98837.3984375, 'v10': -1.5747417211532593, 'u10': -0.19953763484954834, 'latitude': 51.00299999999994, 'longitude': 7.588177777777774}"
4 changes: 4 additions & 0 deletions tests/io/file/data/climate_wkt_json.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
timestamp,geo_location,data
1754784000000,'POINT ( 14.988999953493476 51.10299998894334 )','{"temperature": 19.704827880859398, "pressure": 99310.625, "v10": -1.545882225036621, "u10": 1.7978938817977905, "latitude": 51.102999999999945, "longitude": 14.989}'
1754784000000,'POINT ( 7.088122218847275 51.0029999865219 )','{"temperature": 19.347802734375023, "pressure": 101470.9609375, "v10": -1.256191611289978, "u10": 0.02778780460357666, "latitude": 51.00299999999994, "longitude": 7.0881222222222195}'
1754784000000,'POINT ( 7.58817776106298 51.0029999865219 )','{"temperature": 17.713037109375023, "pressure": 98837.3984375, "v10": -1.5747417211532593, "u10": -0.19953763484954834, "latitude": 51.00299999999994, "longitude": 7.588177777777774}'
64 changes: 64 additions & 0 deletions tests/io/file/test_csv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
from importlib.resources import files

import pytest

import tests.io.file.data
from cratedb_toolkit import DatabaseCluster, InputOutputResource, TableAddress
from tests.conftest import TESTDRIVE_DATA_SCHEMA

data_folder = files(tests.io.file.data)
ddl = (data_folder / "climate_ddl.sql").read_text().format(schema=TESTDRIVE_DATA_SCHEMA)
climate_json_json = (
str(data_folder / "climate_json_json.csv") + "?quote-char='&pipe=json_array_to_wkt_point:geo_location"
)
climate_json_python_local = (
str(data_folder / "climate_json_python.csv")
+ '?quote-char="&pipe=json_array_to_wkt_point:geo_location&pipe=python_to_json:data'
)
climate_wkt_json = str(data_folder / "climate_wkt_json.csv") + "?quote-char='"
Comment on lines +11 to +18
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can configure Macropipe by describing transformation steps through compact URL parameters.

&pipe=json_array_to_wkt_point:geo_location&pipe=python_to_json:data

That's two pipeline elements, processed sequentially:

  • json_array_to_wkt_point:geo_location (docs)
  • python_to_json:data (docs)

climate_json_python_s3 = "https://guided-path.s3.us-east-1.amazonaws.com/demo_climate_data_export.csv?pipe=json_array_to_wkt_point:geo_location&pipe=python_to_json:data"

table_address = TableAddress(schema=TESTDRIVE_DATA_SCHEMA, table="climate_data", if_exists="append")


@pytest.fixture(scope="function")
def provision_ddl(cratedb_synchronized) -> None:
    """Create the `climate_data` target table before each test."""
    database = cratedb_synchronized.database
    database.run_sql(ddl)


def test_load_csv_wkt_json(cratedb_synchronized, provision_ddl):
    """Load a CSV file that does not need any geo transformations."""
    db_cluster = DatabaseCluster.create(cluster_url=cratedb_synchronized.database.dburi)
    db_cluster.load_table(InputOutputResource(climate_wkt_json), target=table_address)
    # Make freshly inserted records visible before counting them.
    db_cluster.adapter.refresh_table(table_address.fullname)
    record_count = db_cluster.adapter.count_records(table_address.fullname)
    assert record_count == 3, "Wrong number of records returned"


def test_load_geo_csv_json_json(cratedb_synchronized, provision_ddl):
    """Load a CSV file that needs geo transformations."""
    pytest.importorskip("polars_st", reason="CSV import needs geo transformations")
    db_cluster = DatabaseCluster.create(cluster_url=cratedb_synchronized.database.dburi)
    db_cluster.load_table(InputOutputResource(climate_json_json), target=table_address)
    # Make freshly inserted records visible before counting them.
    db_cluster.adapter.refresh_table(table_address.fullname)
    record_count = db_cluster.adapter.count_records(table_address.fullname)
    assert record_count == 3, "Wrong number of records returned"


def test_load_geo_csv_json_python_local(cratedb_synchronized, provision_ddl):
    """Load a CSV file that needs geo transformations."""
    pytest.importorskip("polars_st", reason="CSV import needs geo transformations")
    db_cluster = DatabaseCluster.create(cluster_url=cratedb_synchronized.database.dburi)
    db_cluster.load_table(InputOutputResource(climate_json_python_local), target=table_address)
    # Make freshly inserted records visible before counting them.
    db_cluster.adapter.refresh_table(table_address.fullname)
    record_count = db_cluster.adapter.count_records(table_address.fullname)
    assert record_count == 3, "Wrong number of records returned"


@pytest.mark.skip(
    "Test takes too long to complete. When aiming to test a remote data source, please use a smaller dataset."
)
def test_load_geo_csv_json_python_s3(cratedb_synchronized, provision_ddl):
    """Load a CSV file that needs geo transformations."""
    pytest.importorskip("polars_st", reason="CSV import needs geo transformations")
    db_cluster = DatabaseCluster.create(cluster_url=cratedb_synchronized.database.dburi)
    db_cluster.load_table(InputOutputResource(climate_json_python_s3), target=table_address)
    # Make freshly inserted records visible before counting them.
    db_cluster.adapter.refresh_table(table_address.fullname)
    record_count = db_cluster.adapter.count_records(table_address.fullname)
    assert record_count == 22650, "Wrong number of records returned"