
I/O: Add CSV file import, with transformations#747

Draft
amotl wants to merge 3 commits into main from io-files

Conversation

Member

@amotl amotl commented Apr 13, 2026

About

Support CSV file imports with special needs, such as custom separators and quote characters, plus optional transformations.

Poem

I sniff the CSV, a ribboned trail,
Pipes and separators wag their tail,
Polars hum, macro-steps unfold,
Chunks hop into Crate, stories told.

References

Backlog

  • Other supported file formats, like Parquet.
  • Documentation.

@coderabbitai

coderabbitai bot commented Apr 13, 2026

Walkthrough

Added a CSV import adapter and wiring: new CSV loader module with URL parsing, batching, optional MacroPipe transformations, router dispatch, tests/fixtures, model/util adjustments, pyproject/CI tweaks, and changelog entry. No API-breaking changes to existing public functions beyond adding TableAddress.if_exists.
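The `if-exists` handling mentioned in the walkthrough (popping the parameter from the target URL before building the database address) can be sketched with the standard library. `pop_if_exists` is a hypothetical helper for illustration, not the toolkit's actual API.

```python
# Hypothetical sketch of the `if-exists` handling described above: pop the
# parameter from the target URL's query string, so a cleaned URL remains for
# building the database address. Not the toolkit's real code.
from urllib.parse import parse_qs, urlencode, urlsplit, urlunsplit

def pop_if_exists(target_url: str):
    parts = urlsplit(target_url)
    params = parse_qs(parts.query)
    # `if-exists` applies only to the first batch; remove it from the URL.
    if_exists = params.pop("if-exists", [None])[0]
    cleaned = urlunsplit(parts._replace(query=urlencode(params, doseq=True)))
    return cleaned, if_exists

url, mode = pop_if_exists("crate://localhost/doc/climate?if-exists=replace")
print(url)   # crate://localhost/doc/climate
print(mode)  # replace
```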

Changes

  • CSV I/O Implementation (`cratedb_toolkit/io/file/csv.py`): New module providing `CsvFileAddress` (URL parsing, storage options, `load_table`) and `from_csv()`, which calls `polars_to_cratedb()` with batch handling and S3 auth retry logic.
  • Router Integration (`cratedb_toolkit/io/router.py`): `IoRouter.load_table()` detects CSV-like sources (scheme/path) and dispatches to `from_csv()` after adjusting the URL.
  • Model Update (`cratedb_toolkit/model.py`): Added `TableAddress.if_exists: Optional[str]` and `DatabaseAddress.with_table_address(table_address)` to produce a DB address for a specific table (injects `if-exists` when set).
  • I/O Utility (`cratedb_toolkit/io/util.py`): `polars_to_cratedb()` now parses `target_url`, pops `if-exists` from the query parameters, and builds a `DatabaseAddress` from the cleaned URL, so `if-exists` is applied to the first batch.
  • Cluster API (`cratedb_toolkit/cluster/core.py`): `StandaloneCluster.load_table` now respects the `target` argument by constructing an address via `with_table_address()` when provided.
  • Tests & Fixtures (`tests/io/file/test_csv.py`, `tests/io/file/data/climate_ddl.sql`): Added a SQL fixture and integration tests for CSV import scenarios (local, pipeline, S3 skipped).
  • Project Config & Dependencies (`pyproject.toml`): Added `macropipe[geo]==0.0.0` to `optional-dependencies.io-recipe`; extended the Ruff exclude list to include `examples`.
  • Workflows (`.github/workflows/release-oci-full.yml`, `.github/workflows/release-oci-ingest.yml`): Re-enabled the `pull_request` trigger in both release workflows.
  • Changelog (`CHANGES.md`): Added an Unreleased entry: “Added CSV file import, with transformations.”
  • Misc (`cratedb_toolkit/util/database.py`, `cratedb_toolkit/testing/testcontainers/cratedb.py`, `tests/cluster/test_import.py`): Minor control-flow/assignment tweaks, a TODO/comment update, and an adjusted test log assertion string.
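As a rough illustration of the URL parsing attributed to `CsvFileAddress` above, a local CSV path plus query parameters could be decomposed like this. `parse_csv_source` and its defaults are assumptions; only the parameter names (`separator`, `quote-char`, `pipe`) come from this PR's tests.

```python
# Illustrative decomposition of a CSV source URL; `parse_csv_source` is a
# hypothetical helper, not the actual CsvFileAddress implementation.
from urllib.parse import parse_qs, urlsplit

def parse_csv_source(url: str) -> dict:
    parts = urlsplit(url)
    params = parse_qs(parts.query)
    return {
        "location": parts.path,
        "separator": params.get("separator", [","])[0],    # assumed default
        "quote_char": params.get("quote-char", ['"'])[0],  # assumed default
        # `pipe` may be given multiple times; order is significant.
        "pipeline": params.get("pipe", []),
    }

opts = parse_csv_source("data/climate.csv?quote-char='&pipe=json_array_to_wkt_point:geo_location")
print(opts["quote_char"])  # '
print(opts["pipeline"])    # ['json_array_to_wkt_point:geo_location']
```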

Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant User as "User"
    participant Router as "IoRouter"
    participant CsvAddr as "CsvFileAddress"
    participant Polars as "Polars"
    participant Macro as "MacroPipe"
    participant Crate as "CrateDB"

    User->>Router: load_table(csv_source, target)
    Router->>CsvAddr: from_url(csv_source)
    CsvAddr->>CsvAddr: parse URI, extract batch/pipeline/storage opts
    CsvAddr->>Polars: scan_csv(path, sep, quote, storage_options)
    Polars-->>CsvAddr: LazyFrame
    alt pipeline present
        CsvAddr->>Macro: MacroPipe.from_recipes(pipeline)
        Macro->>Macro: apply(LazyFrame)
        Macro-->>CsvAddr: transformed LazyFrame
    end
    CsvAddr->>Polars: collect() / chunk
    Polars-->>CsvAddr: DataFrame chunk
    CsvAddr->>Crate: polars_to_cratedb(chunk, target_url, if_exists)
    Crate-->>CsvAddr: insert result
    CsvAddr-->>User: success/failure
```
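The collect/chunk/insert loop at the bottom of the diagram can be sketched without Polars. `insert_in_batches` and the batch size below are illustrative stand-ins, not the actual `from_csv()` code.

```python
# Toy sketch of the chunked-insert step from the diagram: split rows into
# fixed-size batches and hand each batch to a writer callback.
def insert_in_batches(rows, insert, batch_size=3):
    for offset in range(0, len(rows), batch_size):
        insert(rows[offset:offset + batch_size])

batches = []
insert_in_batches(list(range(7)), batches.append)
print(batches)  # [[0, 1, 2], [3, 4, 5], [6]]
```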

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

Suggested reviewers

  • seut
  • matriv

Poem

🐰 I sniff the CSV, a ribboned trail,

Pipes and separators wag their tail,
Polars hum, macro-steps unfold,
Chunks hop into Crate, stories told,
I nibble carrots, then leap — import complete! 🥕

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 65.00%, which is below the required threshold of 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (2 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Title check | ✅ Passed | The title clearly and specifically describes the main change: adding CSV file import functionality with transformations, which is well supported by the raw summary showing the new CSV module, dataclass, and utility functions. |
| Description check | ✅ Passed | The PR description directly relates to the changeset, describing CSV file import support with transformations and referencing the relevant issue. |



Comment thread cratedb_toolkit/io/file/csv.py Outdated
Comment on lines +83 to +103
```python
def load_table(self) -> pl.LazyFrame:
    """
    Load the CSV file as a Polars LazyFrame.
    """

    # Read from data source.
    lf = pl.scan_csv(
        self.location,
        separator=self.separator,
        quote_char=self.quote_char,
        storage_options=self.storage_options,
    )

    # Optionally apply transformations.
    if self.pipeline:
        from macropipe import MacroPipe

        mp = MacroPipe.from_recipes(*self.pipeline)
        lf = mp.apply(lf)

    return lf
```
Member Author

This is where Macropipe comes into play, providing a concisely configurable transformation unit to your ingress channel.
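The recipe-pipeline idea can be sketched without the real library. `ToyPipe` below is a hypothetical stand-in; only `MacroPipe.from_recipes(...)` and `.apply(...)` are names taken from the diff.

```python
# Toy re-implementation of the pipeline pattern: each step is a function,
# applied to the value in sequence. Illustrative only; not MacroPipe itself.
class ToyPipe:
    def __init__(self, *steps):
        self.steps = steps

    def apply(self, value):
        for step in self.steps:
            value = step(value)
        return value

pipe = ToyPipe(str.strip, str.upper)
print(pipe.apply("  hello "))  # HELLO
```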

Comment thread tests/io/file/test_csv.py
Comment on lines +11 to +18
```python
climate_json_json = (
    str(data_folder / "climate_json_json.csv") + "?quote-char='&pipe=json_array_to_wkt_point:geo_location"
)
climate_json_python = (
    str(data_folder / "climate_json_python.csv")
    + '?quote-char="&pipe=json_array_to_wkt_point:geo_location&pipe=python_to_json:data'
)
climate_wkt_json = str(data_folder / "climate_wkt_json.csv") + "?quote-char='"
```
Member Author

You can configure Macropipe by describing transformation steps through compact URL parameters.

`&pipe=json_array_to_wkt_point:geo_location&pipe=python_to_json:data`

That's two pipeline elements, processed sequentially:

  • json_array_to_wkt_point:geo_location (docs)
  • python_to_json:data (docs)
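Splitting those `pipe=<recipe>:<column>` specs into pairs can be sketched like this; the split logic is an assumption, only the parameter values come from the test URLs above.

```python
# Parse repeated `pipe` parameters into (recipe, column) pairs.
# The helper logic is hypothetical; the query string mirrors the test URLs.
from urllib.parse import parse_qs

query = 'quote-char="&pipe=json_array_to_wkt_point:geo_location&pipe=python_to_json:data'
steps = [tuple(spec.split(":", 1)) for spec in parse_qs(query)["pipe"]]
print(steps)  # [('json_array_to_wkt_point', 'geo_location'), ('python_to_json', 'data')]
```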


@amotl amotl force-pushed the io-files branch 2 times, most recently from 5251ddd to b8ae509 on April 14, 2026, 14:10

Labels

None yet

Projects

None yet

Development


1 participant