-
Notifications
You must be signed in to change notification settings - Fork 4
I/O: Adapter for Apache Iceberg #444
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
WalkthroughThe changes introduce Apache Iceberg integration for CrateDB Toolkit, adding new CLI commands for loading data from and saving data to Iceberg tables. This includes a new module for Iceberg I/O, updates to the CLI for symmetrical import/export functionality, documentation for Iceberg support, and an optional dependency on Changes
Sequence Diagram(s)sequenceDiagram
participant User
participant CLI
participant StandaloneCluster
participant IcebergIO
User->>CLI: cratedb-toolkit io load table ...
CLI->>StandaloneCluster: load_table(source, target, ...)
StandaloneCluster->>IcebergIO: from_iceberg(source_url, cratedb_url)
IcebergIO-->>StandaloneCluster: Data loaded into CrateDB
User->>CLI: cratedb-toolkit io save table ...
CLI->>StandaloneCluster: save_table(source, target, ...)
StandaloneCluster->>IcebergIO: to_iceberg(source_url, target_url)
IcebergIO-->>StandaloneCluster: Data exported to Iceberg
Poem
📜 Recent review detailsConfiguration used: CodeRabbit UI 📒 Files selected for processing (6)
🚧 Files skipped from review as they are similar to previous changes (2)
🧰 Additional context used🪛 Ruff (0.11.9)cratedb_toolkit/io/cli.py118-118: Remove (T201) cratedb_toolkit/cluster/core.py621-621: Found commented-out code Remove commented-out code (ERA001) cratedb_toolkit/io/iceberg.py66-66: Line too long (139 > 120) (E501) 74-74: Line too long (124 > 120) (E501) 106-106: Found commented-out code Remove commented-out code (ERA001) 106-106: Line too long (143 > 120) (E501) 115-115: Line too long (139 > 120) (E501) 120-120: Remove (T201) 🪛 GitHub Check: codecov/patchcratedb_toolkit/io/cli.py[warning] 81-81: cratedb_toolkit/io/cli.py#L81 [warning] 112-113: cratedb_toolkit/io/cli.py#L112-L113 [warning] 116-118: cratedb_toolkit/io/cli.py#L116-L118 [warning] 121-121: cratedb_toolkit/io/cli.py#L121 [warning] 126-126: cratedb_toolkit/io/cli.py#L126 cratedb_toolkit/cluster/core.py[warning] 574-574: cratedb_toolkit/cluster/core.py#L574 [warning] 619-620: cratedb_toolkit/cluster/core.py#L619-L620 [warning] 623-624: cratedb_toolkit/cluster/core.py#L623-L624 [warning] 626-626: cratedb_toolkit/cluster/core.py#L626 [warning] 628-628: cratedb_toolkit/cluster/core.py#L628 cratedb_toolkit/io/iceberg.py[warning] 27-30: cratedb_toolkit/io/iceberg.py#L27-L30 [warning] 37-37: cratedb_toolkit/io/iceberg.py#L37 [warning] 48-48: cratedb_toolkit/io/iceberg.py#L48 [warning] 51-53: cratedb_toolkit/io/iceberg.py#L51-L53 [warning] 55-55: cratedb_toolkit/io/iceberg.py#L55 [warning] 70-70: cratedb_toolkit/io/iceberg.py#L70 [warning] 73-73: cratedb_toolkit/io/iceberg.py#L73 [warning] 77-81: cratedb_toolkit/io/iceberg.py#L77-L81 [warning] 84-85: cratedb_toolkit/io/iceberg.py#L84-L85 [warning] 87-88: cratedb_toolkit/io/iceberg.py#L87-L88 [warning] 95-95: cratedb_toolkit/io/iceberg.py#L95 [warning] 118-120: cratedb_toolkit/io/iceberg.py#L118-L120 [warning] 123-123: cratedb_toolkit/io/iceberg.py#L123 [warning] 126-127: cratedb_toolkit/io/iceberg.py#L126-L127 [warning] 133-134: cratedb_toolkit/io/iceberg.py#L133-L134 🪛 Pylint (3.3.7)cratedb_toolkit/io/cli.py[convention] 89-89: Line too long (116/100) (C0301) [convention] 90-90: Line too long (113/100) (C0301) [convention] 91-91: Line too long (105/100) (C0301) [convention] 92-92: Line too long (106/100) (C0301) [refactor] 95-95: Too many arguments (10/5) (R0913) [refactor] 95-95: Too many positional arguments (10/5) (R0917) [warning] 96-96: Unused argument 'ctx' (W0613) cratedb_toolkit/cluster/core.py[convention] 573-573: Line too long (102/100) (C0301) [convention] 607-607: Line too long (107/100) (C0301) [refactor] 623-626: Unnecessary "else" after "return", remove the "else" and de-indent the code inside it (R1705) [warning] 607-607: Unused argument 'source' (W0613) [warning] 607-607: Unused argument 'transformation' (W0613) cratedb_toolkit/io/iceberg.py[convention] 66-66: Line too long (139/100) (C0301) [convention] 74-74: Line too long (124/100) (C0301) [convention] 106-106: Line too long (143/100) (C0301) [convention] 115-115: Line too long (139/100) (C0301) [convention] 1-1: Missing module docstring (C0114) [error] 4-4: Unable to import 'polars' (E0401) [error] 5-5: Unable to import 'pyarrow.parquet' (E0401) [error] 6-6: Unable to import 'sqlalchemy' (E0401) [error] 8-8: Unable to import 'pyiceberg.catalog' (E0401) [error] 9-9: Unable to import 'sqlalchemy_cratedb' (E0401) [convention] 20-20: Missing class docstring (C0115) [convention] 26-26: Missing function or method docstring (C0116) [convention] 36-36: Missing function or method docstring (C0116) [convention] 47-47: Missing function or method docstring (C0116) [convention] 50-50: Missing function or method docstring (C0116) [refactor] 51-55: Unnecessary "else" after "return", remove the "else" and de-indent the code inside it (R1705) [warning] 73-75: Use lazy % formatting in logging functions (W1203) [warning] 81-81: Use lazy % formatting in logging functions (W1203) [warning] 58-58: Unused argument 'progress' (W0613) [warning] 109-109: Unused argument 'source_url' (W0613) [warning] 109-109: Unused argument 'progress' (W0613) 🪛 GitHub Actions: Tests: Commoncratedb_toolkit/io/cli.py[error] 118-118: Ruff: T201 cratedb_toolkit/cluster/core.py[error] 621-621: Ruff: ERA001 Found commented-out code. Remove commented-out code. cratedb_toolkit/io/iceberg.py[error] 4-4: ModuleNotFoundError: No module named 'polars'. The Python module 'polars' is missing and required by the test run. 🪛 GitHub Actions: Tests: DynamoDBcratedb_toolkit/io/iceberg.py[error] 4-4: ModuleNotFoundError: No module named 'polars' - The Python module 'polars' is missing, causing the pytest run to fail. 🪛 LanguageTooldoc/io/iceberg/index.md[uncategorized] ~1-~1: A punctuation mark might be missing here. (AI_EN_LECTOR_MISSING_PUNCTUATION) [uncategorized] ~5-~5: This verb may not be in the correct form. Consider using a different form for this context. (AI_EN_LECTOR_REPLACEMENT_VERB_FORM) 🔇 Additional comments (4)
✨ Finishing Touches
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 7
🧹 Nitpick comments (5)
cratedb_toolkit/io/iceberg.py (4)
1-13
: Add module docstring.The module lacks a docstring explaining its purpose and functionality. Consider adding a module-level docstring to describe the Iceberg integration capabilities.
+""" +Apache Iceberg integration for CrateDB Toolkit. + +This module provides functionality to transfer data between Iceberg tables +and CrateDB databases, supporting both import and export operations. +""" import dataclasses import logging🧰 Tools
🪛 Pylint (3.3.7)
[convention] 1-1: Missing module docstring
(C0114)
[error] 3-3: Unable to import 'sqlalchemy'
(E0401)
[error] 4-4: Unable to import 'polars'
(E0401)
[error] 5-5: Unable to import 'pyarrow.parquet'
(E0401)
[error] 7-7: Unable to import 'pyiceberg.catalog'
(E0401)
[error] 8-8: Unable to import 'sqlalchemy_cratedb'
(E0401)
🪛 GitHub Actions: Tests: DynamoDB
[error] 4-4: ModuleNotFoundError: No module named 'polars'. The Python module 'polars' is missing, causing pytest to fail.
🪛 GitHub Actions: Tests: Common
[error] 1-1: Ruff formatting check failed. File would be reformatted.
31-39
: Consider making catalog configuration flexible.The catalog configuration is hardcoded to use SQLite with specific paths. This limits flexibility for different Iceberg deployments.
def load_catalog(self) -> Catalog: + """Load the Iceberg catalog with appropriate configuration.""" + # TODO: Consider accepting catalog configuration as parameters + # to support different catalog types (Hive, REST, etc.) return load_catalog( self.catalog, **{ 'type': 'sql', "uri": f"sqlite:///{self.path}/pyiceberg_catalog.db", "warehouse": f"file://{self.path}", }, )🧰 Tools
🪛 Pylint (3.3.7)
[convention] 31-31: Missing function or method docstring
(C0116)
61-62
: Fix line length issues.Multiple lines exceed the maximum line length of 120 characters.
- ctk load table "file+iceberg:var/lib/iceberg/default.db/taxi_dataset/metadata/00001-dc8e5ed2-dc29-4e39-b2e4-019e466af4c3.metadata.json" + ctk load table "file+iceberg:var/lib/iceberg/default.db/taxi_dataset/metadata/\ +00001-dc8e5ed2-dc29-4e39-b2e4-019e466af4c3.metadata.json"- logger.info(f"Iceberg address: Path: {iceberg_address.path}, catalog: {iceberg_address.catalog}, table: {iceberg_address.table}") + logger.info( + "Iceberg address: Path: %s, catalog: %s, table: %s", + iceberg_address.path, iceberg_address.catalog, iceberg_address.table + )Also applies to: 68-68
🧰 Tools
🪛 Ruff (0.11.9)
61-61: Line too long (139 > 120)
(E501)
🪛 Pylint (3.3.7)
[convention] 61-61: Line too long (139/100)
(C0301)
74-74
: Use lazy formatting in logging.Use lazy % formatting instead of f-strings in logging functions for better performance.
- logger.info(f"Target address: {cratedb_address}") + logger.info("Target address: %s", cratedb_address)🧰 Tools
🪛 Pylint (3.3.7)
[warning] 74-74: Use lazy % formatting in logging functions
(W1203)
cratedb_toolkit/io/cli.py (1)
95-106
: Remove unused parameter.The
ctx
parameter is not used in the function.def save_table( - ctx: click.Context, url: str, cluster_id: str, cluster_name: str, cluster_url: str, schema: str, table: str, format_: str, compression: str, transformation: t.Union[Path, None], ):
Note: If you remove the
ctx
parameter, also remove@click.pass_context
from line 94.🧰 Tools
🪛 Pylint (3.3.7)
[refactor] 95-95: Too many arguments (10/5)
(R0913)
[refactor] 95-95: Too many positional arguments (10/5)
(R0917)
[warning] 96-96: Unused argument 'ctx'
(W0613)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
cratedb_toolkit/cli.py
(2 hunks)cratedb_toolkit/cluster/core.py
(3 hunks)cratedb_toolkit/io/cli.py
(2 hunks)cratedb_toolkit/io/iceberg.py
(1 hunks)doc/io/iceberg/index.md
(1 hunks)pyproject.toml
(1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (2)
cratedb_toolkit/cli.py (1)
cratedb_toolkit/io/cli.py (2)
cli_load
(21-25)cli_save
(77-81)
cratedb_toolkit/io/cli.py (3)
cratedb_toolkit/util/cli.py (2)
boot_click
(16-27)make_command
(75-89)cratedb_toolkit/cluster/core.py (2)
DatabaseCluster
(631-684)create
(651-684)cratedb_toolkit/model.py (3)
schema
(174-178)TableAddress
(182-201)InputOutputResource
(205-212)
🪛 LanguageTool
doc/io/iceberg/index.md
[uncategorized] ~1-~1: A punctuation mark might be missing here.
Context: (iceberg)= # Apache Iceberg I/O ## About Import and ...
(AI_EN_LECTOR_MISSING_PUNCTUATION)
[uncategorized] ~5-~5: This verb may not be in the correct form. Consider using a different form for this context.
Context: ...Apache Iceberg I/O ## About Import and export data into/from Iceberg tables, for huma...
(AI_EN_LECTOR_REPLACEMENT_VERB_FORM)
🪛 Pylint (3.3.7)
cratedb_toolkit/cluster/core.py
[convention] 573-573: Line too long (102/100)
(C0301)
[convention] 607-607: Line too long (107/100)
(C0301)
[refactor] 623-626: Unnecessary "else" after "return", remove the "else" and de-indent the code inside it
(R1705)
[warning] 607-607: Unused argument 'source'
(W0613)
[warning] 607-607: Unused argument 'transformation'
(W0613)
cratedb_toolkit/io/cli.py
[convention] 89-89: Line too long (116/100)
(C0301)
[convention] 90-90: Line too long (113/100)
(C0301)
[convention] 91-91: Line too long (105/100)
(C0301)
[convention] 92-92: Line too long (106/100)
(C0301)
[refactor] 95-95: Too many arguments (10/5)
(R0913)
[refactor] 95-95: Too many positional arguments (10/5)
(R0917)
[warning] 96-96: Unused argument 'ctx'
(W0613)
cratedb_toolkit/io/iceberg.py
[convention] 29-29: Line too long (135/100)
(C0301)
[convention] 61-61: Line too long (139/100)
(C0301)
[convention] 68-68: Line too long (133/100)
(C0301)
[convention] 89-89: Line too long (150/100)
(C0301)
[convention] 93-93: Line too long (143/100)
(C0301)
[convention] 102-102: Line too long (139/100)
(C0301)
[convention] 1-1: Missing module docstring
(C0114)
[error] 3-3: Unable to import 'sqlalchemy'
(E0401)
[error] 4-4: Unable to import 'polars'
(E0401)
[error] 5-5: Unable to import 'pyarrow.parquet'
(E0401)
[error] 7-7: Unable to import 'pyiceberg.catalog'
(E0401)
[error] 8-8: Unable to import 'sqlalchemy_cratedb'
(E0401)
[convention] 19-19: Missing class docstring
(C0115)
[convention] 25-25: Missing function or method docstring
(C0116)
[convention] 31-31: Missing function or method docstring
(C0116)
[convention] 42-42: Missing function or method docstring
(C0116)
[convention] 45-45: Missing function or method docstring
(C0116)
[refactor] 46-50: Unnecessary "else" after "return", remove the "else" and de-indent the code inside it
(R1705)
[warning] 68-68: Use lazy % formatting in logging functions
(W1203)
[warning] 74-74: Use lazy % formatting in logging functions
(W1203)
[warning] 53-53: Unused argument 'progress'
(W0613)
[warning] 96-96: Unused argument 'source_url'
(W0613)
[warning] 96-96: Unused argument 'progress'
(W0613)
🪛 Ruff (0.11.9)
cratedb_toolkit/cluster/core.py
621-621: Found commented-out code
Remove commented-out code
(ERA001)
cratedb_toolkit/io/cli.py
118-118: print
found
Remove print
(T201)
cratedb_toolkit/io/iceberg.py
29-29: Line too long (135 > 120)
(E501)
61-61: Line too long (139 > 120)
(E501)
68-68: Line too long (133 > 120)
(E501)
89-89: Line too long (150 > 120)
(E501)
93-93: Found commented-out code
Remove commented-out code
(ERA001)
93-93: Line too long (143 > 120)
(E501)
102-102: Line too long (139 > 120)
(E501)
107-107: print
found
Remove print
(T201)
🪛 GitHub Actions: Tests: Common
cratedb_toolkit/cluster/core.py
[error] 1-1: Ruff formatting check failed. File would be reformatted.
cratedb_toolkit/io/iceberg.py
[error] 1-1: Ruff formatting check failed. File would be reformatted.
🪛 GitHub Actions: Tests: DynamoDB
cratedb_toolkit/io/iceberg.py
[error] 4-4: ModuleNotFoundError: No module named 'polars'. The Python module 'polars' is missing, causing pytest to fail.
🔇 Additional comments (7)
pyproject.toml (1)
176-176
:✅ Verification successful
Verify the version constraint for pyiceberg dependency.
The dependency addition looks correct, but please verify that the version constraint
<0.10
is appropriate for the current Iceberg integration features.
🌐 Web query:
What is the latest stable version of pyiceberg and are there any recent breaking changes that would affect the version constraint <0.10?
💡 Result:
The latest stable version of pyiceberg is 0.9.1, which was released on April 30, 2025[1]. This release is part of the Apache Iceberg ecosystem, which is an open table format for huge analytic datasets.
Recent Changes in PyIceberg
PyIceberg has undergone significant development since its first release in September 2023, as mentioned during the Iceberg Summit 2025 by Fokko Driesprong[5]. The most recent version (0.9.1) follows several earlier versions with notable changes.
Breaking Changes
There are a few breaking changes that would affect users with version constraints <0.10:
Table.name Behavior Change: In a previous release, the behavior of
Table.name
was changed to return the table name without the catalog name. This was part of a broader effort to remove references to the catalog name in pyiceberg[6].Dependency Requirements: PyIceberg has removed upper bound restrictions for dependency libraries, allowing early testing of new versions. This includes removing Python library version upper bound restrictions, allowing for Python 3.13 support[6].
Python Version Support: While not directly in pyiceberg, but relevant to the Python ecosystem, it's worth noting that Python 3.8 support is being removed in related projects as it reached EOL in October 2024. The OpenMetadata project, for example, now only supports Python 3.9, 3.10, and 3.11 for their ingestion framework[7].
Features in Recent Versions
Recent versions of pyiceberg have added several new capabilities:
- Support for UPSERT operations directly within PyIceberg
- Dynamic overwrites optimization
- Extended table updates with new actions
- Support for Iceberg V3 spec
- Cross-region read support for S3FileIO operations
- Integration with pandas, polars, and other data processing libraries[6]
If you're using a version constraint of <0.10, you'll need to be aware of these changes, particularly the behavior change in
Table.name
which might require code adjustments.Citations:
- 1: https://pypi.org/project/pyiceberg/
- 2: https://py.iceberg.apache.org
- 3: https://iceberg.apache.org/releases/
- 4: https://github.com/apache/iceberg-python
- 5: https://www.youtube.com/watch?v=TIJMWHplBRU
- 6: https://github.com/apache/iceberg-python/releases
- 7: https://docs.open-metadata.org/v1.1.x/releases/all-releases
- 8: https://pypi.org/project/pyiceberg/0.4.0rc2
- 9: https://estuary.dev/blog/getting-started-pyiceberg/
- 10: https://pyodide.org/en/stable/project/changelog.html
pyiceberg <0.10 constraint is valid—note upcoming breaking changes
The constraint
<0.10
will install the latest stable 0.9.1 release, but please review and test for these recent changes before rolling out:
- Table.name now returns only the table name (catalog prefix removed)
- Python 3.8 support is dropped in the ecosystem (ensure Python ≥3.9)
- Upstream dependencies no longer have upper‐bound restrictions (may pull newer library versions)
No changes to the current version constraint are required.
doc/io/iceberg/index.md (1)
1-12
: Documentation looks good and clearly introduces the Iceberg I/O functionality.The documentation structure and content are appropriate for introducing the new Apache Iceberg integration features.
🧰 Tools
🪛 LanguageTool
[uncategorized] ~1-~1: A punctuation mark might be missing here.
Context: (iceberg)= # Apache Iceberg I/O ## About Import and ...(AI_EN_LECTOR_MISSING_PUNCTUATION)
[uncategorized] ~5-~5: This verb may not be in the correct form. Consider using a different form for this context.
Context: ...Apache Iceberg I/O ## About Import and export data into/from Iceberg tables, for huma...(AI_EN_LECTOR_REPLACEMENT_VERB_FORM)
cratedb_toolkit/cli.py (2)
12-13
: Good architectural improvement separating load and save operations.The split of the CLI import into separate
cli_load
andcli_save
functions properly implements the symmetrical import/export functionality.
34-35
: Correct implementation of separate CLI commands.The registration of distinct "load" and "save" commands enables users to access both import and export functionality independently.
cratedb_toolkit/cluster/core.py (2)
23-23
: Import addition looks correct.The import of
from_iceberg
andto_iceberg
functions is properly placed and supports the new Iceberg integration functionality.
573-575
: Iceberg scheme handling implemented correctly.The conditional logic properly detects Iceberg URLs and delegates to the appropriate
from_iceberg
function.🧰 Tools
🪛 Pylint (3.3.7)
[convention] 573-573: Line too long (102/100)
(C0301)
cratedb_toolkit/io/cli.py (1)
72-82
: Well-structured command group for data export.The new
cli_save
command group properly mirrors the structure ofcli_load
and provides a clear separation of concerns between loading and saving operations.
cratedb_toolkit/cluster/core.py
Outdated
def save_table( | ||
self, source: TableAddress, target: InputOutputResource, transformation: t.Union[Path, None] = None | ||
) -> "StandaloneCluster": | ||
""" | ||
Export data from a database table on a standalone CrateDB Server. | ||
|
||
Synopsis | ||
-------- | ||
export CRATEDB_CLUSTER_URL=crate://crate@localhost:4200/testdrive/demo | ||
|
||
ctk load table influxdb2://example:token@localhost:8086/testdrive/demo | ||
ctk load table mongodb://localhost:27017/testdrive/demo | ||
""" | ||
source_url = self.address.dburi | ||
target_url_obj = URL(target.url) | ||
#source_url = source.url | ||
|
||
if target_url_obj.scheme.startswith("iceberg") or target_url_obj.scheme.endswith("iceberg"): | ||
return to_iceberg(source_url, target.url) | ||
else: | ||
raise NotImplementedError(f"Exporting resource not implemented yet: {target_url_obj}") | ||
|
||
return self | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix multiple issues in the save_table method.
The method has several issues that need to be addressed:
- Unused arguments:
source
andtransformation
parameters are not used - Commented-out code: Line 621 contains commented code that should be removed
- Unnecessary else clause: The else block after return is redundant
- Line length violations: Lines exceed the 100-character limit
- Formatting issues: The pipeline indicates this file needs reformatting
Apply this diff to fix the issues:
def save_table(
- self, source: TableAddress, target: InputOutputResource, transformation: t.Union[Path, None] = None
+ self, source: TableAddress, target: InputOutputResource
) -> "StandaloneCluster":
"""
Export data from a database table on a standalone CrateDB Server.
Synopsis
--------
export CRATEDB_CLUSTER_URL=crate://crate@localhost:4200/testdrive/demo
- ctk load table influxdb2://example:token@localhost:8086/testdrive/demo
- ctk load table mongodb://localhost:27017/testdrive/demo
+ ctk save table iceberg://...
"""
source_url = self.address.dburi
target_url_obj = URL(target.url)
- #source_url = source.url
- if target_url_obj.scheme.startswith("iceberg") or target_url_obj.scheme.endswith("iceberg"):
+ if (target_url_obj.scheme.startswith("iceberg") or
+ target_url_obj.scheme.endswith("iceberg")):
return to_iceberg(source_url, target.url)
- else:
- raise NotImplementedError(f"Exporting resource not implemented yet: {target_url_obj}")
+
+ raise NotImplementedError(f"Exporting resource not implemented yet: {target_url_obj}")
- return self
Additionally, run the formatter to fix the pipeline failure:
#!/bin/bash
# Fix formatting issues
ruff format cratedb_toolkit/cluster/core.py
🧰 Tools
🪛 Ruff (0.11.9)
621-621: Found commented-out code
Remove commented-out code
(ERA001)
🪛 Pylint (3.3.7)
[convention] 607-607: Line too long (107/100)
(C0301)
[refactor] 623-626: Unnecessary "else" after "return", remove the "else" and de-indent the code inside it
(R1705)
[warning] 607-607: Unused argument 'source'
(W0613)
[warning] 607-607: Unused argument 'transformation'
(W0613)
🤖 Prompt for AI Agents
In cratedb_toolkit/cluster/core.py around lines 606 to 629, fix the save_table
method by removing the unused parameters source and transformation, deleting the
commented-out code on line 621, eliminating the unnecessary else clause after
the return statement, and breaking long lines to respect the 100-character
limit. After these code changes, run the ruff formatter on the file to resolve
formatting issues and pass the pipeline.
@property | ||
def identifier(self): | ||
return (self.catalog, self.table) | ||
|
||
def load_table(self) -> pl.LazyFrame: | ||
if self.catalog is not None: | ||
catalog = self.load_catalog() | ||
return catalog.load_table(self.identifier).to_polars() | ||
else: | ||
return pl.scan_iceberg(self.path) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Simplify control flow and add documentation.
The code has an unnecessary else
after return
and lacks documentation.
@property
def identifier(self):
+ """Return the catalog-table identifier tuple."""
return (self.catalog, self.table)
def load_table(self) -> pl.LazyFrame:
+ """Load the Iceberg table as a Polars LazyFrame."""
if self.catalog is not None:
catalog = self.load_catalog()
return catalog.load_table(self.identifier).to_polars()
- else:
- return pl.scan_iceberg(self.path)
+ return pl.scan_iceberg(self.path)
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
@property | |
def identifier(self): | |
return (self.catalog, self.table) | |
def load_table(self) -> pl.LazyFrame: | |
if self.catalog is not None: | |
catalog = self.load_catalog() | |
return catalog.load_table(self.identifier).to_polars() | |
else: | |
return pl.scan_iceberg(self.path) | |
@property | |
def identifier(self): | |
"""Return the catalog-table identifier tuple.""" | |
return (self.catalog, self.table) | |
def load_table(self) -> pl.LazyFrame: | |
"""Load the Iceberg table as a Polars LazyFrame.""" | |
if self.catalog is not None: | |
catalog = self.load_catalog() | |
return catalog.load_table(self.identifier).to_polars() | |
return pl.scan_iceberg(self.path) |
🧰 Tools
🪛 Pylint (3.3.7)
[convention] 42-42: Missing function or method docstring
(C0116)
[convention] 45-45: Missing function or method docstring
(C0116)
[refactor] 46-50: Unnecessary "else" after "return", remove the "else" and de-indent the code inside it
(R1705)
🤖 Prompt for AI Agents
In cratedb_toolkit/io/iceberg.py around lines 41 to 51, remove the unnecessary
else block after the return statement in load_table to simplify control flow.
Add a docstring to the load_table method explaining its purpose and behavior,
including what it returns and under what conditions it loads from catalog or
path.
cratedb_toolkit/io/iceberg.py
Outdated
table.collect(streaming=True).to_pandas().to_sql( | ||
name=cratedb_table.table, schema=cratedb_table.schema, con=engine, if_exists="replace", index=False, chunksize=CHUNK_SIZE, method=insert_bulk, | ||
) | ||
|
||
# Note: This was much slower. | ||
# table.to_polars().collect(streaming=True).write_database(table_name=table_address.fullname, connection=engine, if_table_exists="replace") | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Remove commented-out code and fix line length.
The code contains commented-out implementation and line length issues.
table.collect(streaming=True).to_pandas().to_sql(
- name=cratedb_table.table, schema=cratedb_table.schema, con=engine, if_exists="replace", index=False, chunksize=CHUNK_SIZE, method=insert_bulk,
+ name=cratedb_table.table,
+ schema=cratedb_table.schema,
+ con=engine,
+ if_exists="replace",
+ index=False,
+ chunksize=CHUNK_SIZE,
+ method=insert_bulk,
)
-
- # Note: This was much slower.
- # table.to_polars().collect(streaming=True).write_database(table_name=table_address.fullname, connection=engine, if_table_exists="replace")
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
table.collect(streaming=True).to_pandas().to_sql( | |
name=cratedb_table.table, schema=cratedb_table.schema, con=engine, if_exists="replace", index=False, chunksize=CHUNK_SIZE, method=insert_bulk, | |
) | |
# Note: This was much slower. | |
# table.to_polars().collect(streaming=True).write_database(table_name=table_address.fullname, connection=engine, if_table_exists="replace") | |
table.collect(streaming=True).to_pandas().to_sql( | |
name=cratedb_table.table, | |
schema=cratedb_table.schema, | |
con=engine, | |
if_exists="replace", | |
index=False, | |
chunksize=CHUNK_SIZE, | |
method=insert_bulk, | |
) |
🧰 Tools
🪛 Ruff (0.11.9)
89-89: Line too long (150 > 120)
(E501)
93-93: Found commented-out code
Remove commented-out code
(ERA001)
93-93: Line too long (143 > 120)
(E501)
🪛 Pylint (3.3.7)
[convention] 89-89: Line too long (150/100)
(C0301)
[convention] 93-93: Line too long (143/100)
(C0301)
🤖 Prompt for AI Agents
In cratedb_toolkit/io/iceberg.py around lines 88 to 94, remove the commented-out
code block that uses the polars library for writing to the database, as it is no
longer needed. Also, reformat the long line calling to_sql to ensure it does not
exceed the preferred line length, breaking it into multiple lines for better
readability.
def to_iceberg(source_url, target_url, progress: bool = False): | ||
""" | ||
Synopsis | ||
-------- | ||
export CRATEDB_CLUSTER_URL=crate://crate@localhost:4200/testdrive/demo | ||
ctk load table "iceberg://./var/lib/iceberg/?catalog=default&table=taxi_dataset" | ||
ctk save table "file+iceberg:var/lib/iceberg/default.db/taxi_dataset/metadata/00001-dc8e5ed2-dc29-4e39-b2e4-019e466af4c3.metadata.json" | ||
""" | ||
|
||
iceberg_address = IcebergAddress.from_url(target_url) | ||
catalog = iceberg_address.load_catalog() | ||
print("catalog:", catalog) | ||
|
||
# https://py.iceberg.apache.org/#write-a-pyarrow-dataframe | ||
df = pq.read_table("tmp/yellow_tripdata_2023-01.parquet") | ||
|
||
# Create a new Iceberg table. | ||
catalog.create_namespace_if_not_exists("default") | ||
table = catalog.create_table_if_not_exists( | ||
"default.taxi_dataset", | ||
schema=df.schema, | ||
) | ||
|
||
# Append the dataframe to the table. | ||
table.append(df) | ||
len(table.scan().to_arrow()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Complete the implementation and fix multiple issues.
The to_iceberg
function has several critical issues:
- Hardcoded file path instead of using
source_url
parameter - Print statement instead of logging
- Unused parameters:
source_url
andprogress
- Dangling expression at line 121
- Line length issues in docstring
-def to_iceberg(source_url, target_url, progress: bool = False):
+def to_iceberg(source_url: str, target_url: str, progress: bool = False):
"""
+ Export data from CrateDB to an Iceberg table.
+
Synopsis
--------
export CRATEDB_CLUSTER_URL=crate://crate@localhost:4200/testdrive/demo
ctk load table "iceberg://./var/lib/iceberg/?catalog=default&table=taxi_dataset"
- ctk save table "file+iceberg:var/lib/iceberg/default.db/taxi_dataset/metadata/00001-dc8e5ed2-dc29-4e39-b2e4-019e466af4c3.metadata.json"
+ ctk save table "file+iceberg:var/lib/iceberg/default.db/taxi_dataset/metadata/\
+00001-dc8e5ed2-dc29-4e39-b2e4-019e466af4c3.metadata.json"
"""
iceberg_address = IcebergAddress.from_url(target_url)
catalog = iceberg_address.load_catalog()
- print("catalog:", catalog)
+ logger.info("Loading catalog: %s", catalog)
# https://py.iceberg.apache.org/#write-a-pyarrow-dataframe
- df = pq.read_table("tmp/yellow_tripdata_2023-01.parquet")
+ # TODO: Implement reading from source_url (CrateDB) instead of hardcoded file
+ df = pq.read_table(source_url) # This should read from CrateDB
# Create a new Iceberg table.
catalog.create_namespace_if_not_exists("default")
table = catalog.create_table_if_not_exists(
"default.taxi_dataset",
schema=df.schema,
)
# Append the dataframe to the table.
table.append(df)
- len(table.scan().to_arrow())
+ row_count = len(table.scan().to_arrow())
+ logger.info("Appended %d rows to Iceberg table", row_count)
Would you like me to help implement the complete functionality to read data from CrateDB using the source_url parameter?
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
def to_iceberg(source_url, target_url, progress: bool = False): | |
""" | |
Synopsis | |
-------- | |
export CRATEDB_CLUSTER_URL=crate://crate@localhost:4200/testdrive/demo | |
ctk load table "iceberg://./var/lib/iceberg/?catalog=default&table=taxi_dataset" | |
ctk save table "file+iceberg:var/lib/iceberg/default.db/taxi_dataset/metadata/00001-dc8e5ed2-dc29-4e39-b2e4-019e466af4c3.metadata.json" | |
""" | |
iceberg_address = IcebergAddress.from_url(target_url) | |
catalog = iceberg_address.load_catalog() | |
print("catalog:", catalog) | |
# https://py.iceberg.apache.org/#write-a-pyarrow-dataframe | |
df = pq.read_table("tmp/yellow_tripdata_2023-01.parquet") | |
# Create a new Iceberg table. | |
catalog.create_namespace_if_not_exists("default") | |
table = catalog.create_table_if_not_exists( | |
"default.taxi_dataset", | |
schema=df.schema, | |
) | |
# Append the dataframe to the table. | |
table.append(df) | |
len(table.scan().to_arrow()) | |
def to_iceberg(source_url: str, target_url: str, progress: bool = False): | |
""" | |
Export data from CrateDB to an Iceberg table. | |
Synopsis | |
-------- | |
export CRATEDB_CLUSTER_URL=crate://crate@localhost:4200/testdrive/demo | |
ctk load table "iceberg://./var/lib/iceberg/?catalog=default&table=taxi_dataset" | |
ctk save table "file+iceberg:var/lib/iceberg/default.db/taxi_dataset/metadata/\ | |
00001-dc8e5ed2-dc29-4e39-b2e4-019e466af4c3.metadata.json" | |
""" | |
iceberg_address = IcebergAddress.from_url(target_url) | |
catalog = iceberg_address.load_catalog() | |
logger.info("Loading catalog: %s", catalog) | |
# https://py.iceberg.apache.org/#write-a-pyarrow-dataframe | |
# TODO: Implement reading from source_url (CrateDB) instead of hardcoded file | |
df = pq.read_table(source_url) # This should read from CrateDB | |
# Create a new Iceberg table. | |
catalog.create_namespace_if_not_exists("default") | |
table = catalog.create_table_if_not_exists( | |
"default.taxi_dataset", | |
schema=df.schema, | |
) | |
# Append the dataframe to the table. | |
table.append(df) | |
row_count = len(table.scan().to_arrow()) | |
logger.info("Appended %d rows to Iceberg table", row_count) |
🧰 Tools
🪛 Ruff (0.11.9)
102-102: Line too long (139 > 120)
(E501)
107-107: print
found
Remove print
(T201)
🪛 Pylint (3.3.7)
[convention] 102-102: Line too long (139/100)
(C0301)
[warning] 96-96: Unused argument 'source_url'
(W0613)
[warning] 96-96: Unused argument 'progress'
(W0613)
🤖 Prompt for AI Agents
In cratedb_toolkit/io/iceberg.py from lines 96 to 121, the to_iceberg function
has multiple issues: it uses a hardcoded file path instead of the source_url
parameter to read data, it uses print instead of proper logging, the parameters
source_url and progress are unused, there is a dangling expression at the end,
and the docstring lines are too long. Fix these by replacing the hardcoded path
with source_url to read the parquet file, replace print with a logger call,
utilize the progress parameter if applicable or remove it if not needed, remove
the dangling expression or assign its result properly, and reformat the
docstring to respect line length limits. This will complete and clean up the
function implementation.
print("target:", target) | ||
|
||
# Dispatch "load table" operation. | ||
cluster = DatabaseCluster.create( | ||
cluster_id=cluster_id, | ||
cluster_name=cluster_name, | ||
cluster_url=cluster_url, | ||
) | ||
cluster.save_table(source=source, target=target, transformation=transformation) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove debug print statement and fix comment.
The code contains a debug print statement and an incorrect comment.
# Encapsulate source and target parameters.
source = TableAddress(schema=schema, table=table)
target = InputOutputResource(url=url, format=format_, compression=compression)
- print("target:", target)
- # Dispatch "load table" operation.
+ # Dispatch "save table" operation.
cluster = DatabaseCluster.create(
cluster_id=cluster_id,
cluster_name=cluster_name,
cluster_url=cluster_url,
)
cluster.save_table(source=source, target=target, transformation=transformation)
🧰 Tools
🪛 Ruff (0.11.9)
118-118: print
found
Remove print
(T201)
🤖 Prompt for AI Agents
In cratedb_toolkit/io/cli.py around lines 118 to 126, remove the debug print
statement printing "target:" and update the comment above the cluster creation
to accurately describe the operation being performed instead of "Dispatch 'load
table' operation." Ensure the comment clearly reflects the purpose of creating
the DatabaseCluster and saving the table.
@click.option("--schema", envvar="CRATEDB_SCHEMA", type=str, required=False, help="Schema where to import the data") | ||
@click.option("--table", envvar="CRATEDB_TABLE", type=str, required=False, help="Table where to import the data") | ||
@click.option("--format", "format_", type=str, required=False, help="File format of the import resource") | ||
@click.option("--compression", type=str, required=False, help="Compression format of the import resource") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix help text to reflect export functionality.
The help text incorrectly refers to "import" when this command is for exporting data.
-@click.option("--schema", envvar="CRATEDB_SCHEMA", type=str, required=False, help="Schema where to import the data")
-@click.option("--table", envvar="CRATEDB_TABLE", type=str, required=False, help="Table where to import the data")
-@click.option("--format", "format_", type=str, required=False, help="File format of the import resource")
-@click.option("--compression", type=str, required=False, help="Compression format of the import resource")
+@click.option("--schema", envvar="CRATEDB_SCHEMA", type=str, required=False, help="Schema from which to export the data")
+@click.option("--table", envvar="CRATEDB_TABLE", type=str, required=False, help="Table from which to export the data")
+@click.option("--format", "format_", type=str, required=False, help="File format of the export resource")
+@click.option("--compression", type=str, required=False, help="Compression format of the export resource")
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
@click.option("--schema", envvar="CRATEDB_SCHEMA", type=str, required=False, help="Schema where to import the data") | |
@click.option("--table", envvar="CRATEDB_TABLE", type=str, required=False, help="Table where to import the data") | |
@click.option("--format", "format_", type=str, required=False, help="File format of the import resource") | |
@click.option("--compression", type=str, required=False, help="Compression format of the import resource") | |
@click.option("--schema", envvar="CRATEDB_SCHEMA", type=str, required=False, help="Schema from which to export the data") | |
@click.option("--table", envvar="CRATEDB_TABLE", type=str, required=False, help="Table from which to export the data") | |
@click.option("--format", "format_", type=str, required=False, help="File format of the export resource") | |
@click.option("--compression", type=str, required=False, help="Compression format of the export resource") |
🧰 Tools
🪛 Pylint (3.3.7)
[convention] 89-89: Line too long (116/100)
(C0301)
[convention] 90-90: Line too long (113/100)
(C0301)
[convention] 91-91: Line too long (105/100)
(C0301)
[convention] 92-92: Line too long (106/100)
(C0301)
🤖 Prompt for AI Agents
In cratedb_toolkit/io/cli.py around lines 89 to 92, the help text for the
command options incorrectly mentions "import" instead of "export." Update the
help strings to correctly describe that these options relate to exporting data,
changing phrases like "where to import the data" to "where to export the data"
and similarly adjusting other help messages to reflect export functionality.
About
Adding support to import and export Apache Iceberg tables.
References