Skip to content

I/O: Adapter for Apache Iceberg #444

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions cratedb_toolkit/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
from .cmd.tail.cli import cli as tail_cli
from .docs.cli import cli as docs_cli
from .info.cli import cli as info_cli
from .io.cli import cli as io_cli
from .io.cli import cli_load as io_cli_load
from .io.cli import cli_save as io_cli_save
from .query.cli import cli as query_cli
from .settings.cli import cli as settings_cli
from .shell.cli import cli as shell_cli
Expand All @@ -30,7 +31,8 @@ def cli(ctx: click.Context, verbose: bool, debug: bool):
cli.add_command(cfr_cli, name="cfr")
cli.add_command(cloud_cli, name="cluster")
cli.add_command(docs_cli, name="docs")
cli.add_command(io_cli, name="load")
cli.add_command(io_cli_load, name="load")
cli.add_command(io_cli_save, name="save")
cli.add_command(query_cli, name="query")
cli.add_command(rockset_cli, name="rockset")
cli.add_command(shell_cli, name="shell")
Expand Down
28 changes: 28 additions & 0 deletions cratedb_toolkit/cluster/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
DatabaseAddressMissingError,
OperationFailed,
)
from cratedb_toolkit.io.iceberg import from_iceberg, to_iceberg
from cratedb_toolkit.model import ClusterAddressOptions, DatabaseAddress, InputOutputResource, TableAddress
from cratedb_toolkit.util.client import jwt_token_patch
from cratedb_toolkit.util.data import asbool
Expand Down Expand Up @@ -569,6 +570,9 @@
else:
raise NotImplementedError("Loading full data via Kinesis not implemented yet")

elif source_url_obj.scheme.startswith("iceberg") or source_url_obj.scheme.endswith("iceberg"):
return from_iceberg(str(source_url_obj), target_url)

Check warning on line 574 in cratedb_toolkit/cluster/core.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/cluster/core.py#L574

Added line #L574 was not covered by tests

elif source_url_obj.scheme in ["file+bson", "http+bson", "https+bson", "mongodb", "mongodb+srv"]:
if "+cdc" in source_url_obj.scheme:
source_url_obj.scheme = source_url_obj.scheme.replace("+cdc", "")
Expand Down Expand Up @@ -599,6 +603,30 @@

return self

def save_table(
self, source: TableAddress, target: InputOutputResource, transformation: t.Union[Path, None] = None
) -> "StandaloneCluster":
"""
Export data from a database table on a standalone CrateDB Server.

Synopsis
--------
export CRATEDB_CLUSTER_URL=crate://crate@localhost:4200/testdrive/demo

ctk load table influxdb2://example:token@localhost:8086/testdrive/demo
ctk load table mongodb://localhost:27017/testdrive/demo
"""
source_url = self.address.dburi
target_url_obj = URL(target.url)

Check warning on line 620 in cratedb_toolkit/cluster/core.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/cluster/core.py#L619-L620

Added lines #L619 - L620 were not covered by tests
# source_url = source.url

if target_url_obj.scheme.startswith("iceberg") or target_url_obj.scheme.endswith("iceberg"):
return to_iceberg(source_url, target.url)

Check warning on line 624 in cratedb_toolkit/cluster/core.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/cluster/core.py#L623-L624

Added lines #L623 - L624 were not covered by tests
else:
raise NotImplementedError(f"Exporting resource not implemented yet: {target_url_obj}")

Check warning on line 626 in cratedb_toolkit/cluster/core.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/cluster/core.py#L626

Added line #L626 was not covered by tests

return self

Check warning on line 628 in cratedb_toolkit/cluster/core.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/cluster/core.py#L628

Added line #L628 was not covered by tests


class DatabaseCluster:
"""
Expand Down
61 changes: 59 additions & 2 deletions cratedb_toolkit/io/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@
@click.option("--debug", is_flag=True, required=False, help="Turn on logging with debug level")
@click.version_option()
@click.pass_context
def cli(ctx: click.Context, verbose: bool, debug: bool):
def cli_load(ctx: click.Context, verbose: bool, debug: bool):
"""
Load data into CrateDB.
"""
return boot_click(ctx, verbose, debug)


@make_command(cli, name="table")
@make_command(cli_load, name="table")
@click.argument("url")
@option_cluster_id
@option_cluster_name
Expand Down Expand Up @@ -67,3 +67,60 @@
cluster_url=cluster_url,
)
cluster.load_table(source=source, target=target, transformation=transformation)


@click.group(cls=ClickAliasedGroup) # type: ignore[arg-type]
@click.option("--verbose", is_flag=True, required=False, help="Turn on logging")
@click.option("--debug", is_flag=True, required=False, help="Turn on logging with debug level")
@click.version_option()
@click.pass_context
def cli_save(ctx: click.Context, verbose: bool, debug: bool):
"""
Export data from CrateDB.
"""
return boot_click(ctx, verbose, debug)

Check warning on line 81 in cratedb_toolkit/io/cli.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/cli.py#L81

Added line #L81 was not covered by tests


@make_command(cli_save, name="table")
@click.argument("url")
@option_cluster_id
@option_cluster_name
@option_cluster_url
@click.option("--schema", envvar="CRATEDB_SCHEMA", type=str, required=False, help="Schema where to import the data")
@click.option("--table", envvar="CRATEDB_TABLE", type=str, required=False, help="Table where to import the data")
@click.option("--format", "format_", type=str, required=False, help="File format of the import resource")
@click.option("--compression", type=str, required=False, help="Compression format of the import resource")
Comment on lines +89 to +92
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix help text to reflect export functionality.

The help text incorrectly refers to "import" when this command is for exporting data.

-@click.option("--schema", envvar="CRATEDB_SCHEMA", type=str, required=False, help="Schema where to import the data")
-@click.option("--table", envvar="CRATEDB_TABLE", type=str, required=False, help="Table where to import the data")
-@click.option("--format", "format_", type=str, required=False, help="File format of the import resource")
-@click.option("--compression", type=str, required=False, help="Compression format of the import resource")
+@click.option("--schema", envvar="CRATEDB_SCHEMA", type=str, required=False, help="Schema from which to export the data")
+@click.option("--table", envvar="CRATEDB_TABLE", type=str, required=False, help="Table from which to export the data")
+@click.option("--format", "format_", type=str, required=False, help="File format of the export resource")
+@click.option("--compression", type=str, required=False, help="Compression format of the export resource")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
@click.option("--schema", envvar="CRATEDB_SCHEMA", type=str, required=False, help="Schema where to import the data")
@click.option("--table", envvar="CRATEDB_TABLE", type=str, required=False, help="Table where to import the data")
@click.option("--format", "format_", type=str, required=False, help="File format of the import resource")
@click.option("--compression", type=str, required=False, help="Compression format of the import resource")
@click.option("--schema", envvar="CRATEDB_SCHEMA", type=str, required=False, help="Schema from which to export the data")
@click.option("--table", envvar="CRATEDB_TABLE", type=str, required=False, help="Table from which to export the data")
@click.option("--format", "format_", type=str, required=False, help="File format of the export resource")
@click.option("--compression", type=str, required=False, help="Compression format of the export resource")
🧰 Tools
🪛 Pylint (3.3.7)

[convention] 89-89: Line too long (116/100)

(C0301)


[convention] 90-90: Line too long (113/100)

(C0301)


[convention] 91-91: Line too long (105/100)

(C0301)


[convention] 92-92: Line too long (106/100)

(C0301)

🤖 Prompt for AI Agents
In cratedb_toolkit/io/cli.py around lines 89 to 92, the help text for the
command options incorrectly mentions "import" instead of "export." Update the
help strings to correctly describe that these options relate to exporting data,
changing phrases like "where to import the data" to "where to export the data"
and similarly adjusting other help messages to reflect export functionality.

@click.option("--transformation", type=Path, required=False, help="Path to Zyp transformation file")
@click.pass_context
def save_table(
ctx: click.Context,
url: str,
cluster_id: str,
cluster_name: str,
cluster_url: str,
schema: str,
table: str,
format_: str,
compression: str,
transformation: t.Union[Path, None],
):
"""
Export data from CrateDB and CrateDB Cloud clusters.
"""

# When `--transformation` is given, but empty, fix it.
if transformation is not None and transformation.name == "":
transformation = None

Check warning on line 113 in cratedb_toolkit/io/cli.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/cli.py#L112-L113

Added lines #L112 - L113 were not covered by tests

# Encapsulate source and target parameters.
source = TableAddress(schema=schema, table=table)
target = InputOutputResource(url=url, format=format_, compression=compression)
print("target:", target)

Check warning on line 118 in cratedb_toolkit/io/cli.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/cli.py#L116-L118

Added lines #L116 - L118 were not covered by tests

# Dispatch "load table" operation.
cluster = DatabaseCluster.create(

Check warning on line 121 in cratedb_toolkit/io/cli.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/cli.py#L121

Added line #L121 was not covered by tests
cluster_id=cluster_id,
cluster_name=cluster_name,
cluster_url=cluster_url,
)
cluster.save_table(source=source, target=target, transformation=transformation)

Check warning on line 126 in cratedb_toolkit/io/cli.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/cli.py#L126

Added line #L126 was not covered by tests
Comment on lines +118 to +126
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Remove debug print statement and fix comment.

The code contains a debug print statement and an incorrect comment.

     # Encapsulate source and target parameters.
     source = TableAddress(schema=schema, table=table)
     target = InputOutputResource(url=url, format=format_, compression=compression)
-    print("target:", target)
 
-    # Dispatch "load table" operation.
+    # Dispatch "save table" operation.
     cluster = DatabaseCluster.create(
         cluster_id=cluster_id,
         cluster_name=cluster_name,
         cluster_url=cluster_url,
     )
     cluster.save_table(source=source, target=target, transformation=transformation)
🧰 Tools
🪛 Ruff (0.11.9)

118-118: print found

Remove print

(T201)

🤖 Prompt for AI Agents
In cratedb_toolkit/io/cli.py around lines 118 to 126, remove the debug print
statement printing "target:" and update the comment above the cluster creation
to accurately describe the operation being performed instead of "Dispatch 'load
table' operation." Ensure the comment clearly reflects the purpose of creating
the DatabaseCluster and saving the table.

134 changes: 134 additions & 0 deletions cratedb_toolkit/io/iceberg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
import dataclasses
import logging

import polars as pl
import pyarrow.parquet as pq
import sqlalchemy as sa
from boltons.urlutils import URL
from pyiceberg.catalog import Catalog, load_catalog
from sqlalchemy_cratedb import insert_bulk

from cratedb_toolkit.model import DatabaseAddress

logger = logging.getLogger(__name__)


CHUNK_SIZE = 75_000


@dataclasses.dataclass
class IcebergAddress:
path: str
catalog: str
table: str

@classmethod
def from_url(cls, url: str):
iceberg_url = URL(url)
if iceberg_url.host == ".":
iceberg_url.path = iceberg_url.path.lstrip("/")
return cls(

Check warning on line 30 in cratedb_toolkit/io/iceberg.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/iceberg.py#L27-L30

Added lines #L27 - L30 were not covered by tests
path=iceberg_url.path,
catalog=iceberg_url.query_params.get("catalog"),
table=iceberg_url.query_params.get("table"),
)

def load_catalog(self) -> Catalog:
return load_catalog(

Check warning on line 37 in cratedb_toolkit/io/iceberg.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/iceberg.py#L37

Added line #L37 was not covered by tests
self.catalog,
**{
"type": "sql",
"uri": f"sqlite:///{self.path}/pyiceberg_catalog.db",
"warehouse": f"file://{self.path}",
},
)

@property
def identifier(self):
return (self.catalog, self.table)

Check warning on line 48 in cratedb_toolkit/io/iceberg.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/iceberg.py#L48

Added line #L48 was not covered by tests

def load_table(self) -> pl.LazyFrame:
if self.catalog is not None:
catalog = self.load_catalog()
return catalog.load_table(self.identifier).to_polars()

Check warning on line 53 in cratedb_toolkit/io/iceberg.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/iceberg.py#L51-L53

Added lines #L51 - L53 were not covered by tests
else:
return pl.scan_iceberg(self.path)

Check warning on line 55 in cratedb_toolkit/io/iceberg.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/iceberg.py#L55

Added line #L55 was not covered by tests

Comment on lines +46 to +56
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Simplify control flow and add documentation.

The code has an unnecessary else after return and lacks documentation.

     @property
     def identifier(self):
+        """Return the catalog-table identifier tuple."""
         return (self.catalog, self.table)
 
     def load_table(self) -> pl.LazyFrame:
+        """Load the Iceberg table as a Polars LazyFrame."""
         if self.catalog is not None:
             catalog = self.load_catalog()
             return catalog.load_table(self.identifier).to_polars()
-        else:
-            return pl.scan_iceberg(self.path)
+        return pl.scan_iceberg(self.path)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
@property
def identifier(self):
return (self.catalog, self.table)
def load_table(self) -> pl.LazyFrame:
if self.catalog is not None:
catalog = self.load_catalog()
return catalog.load_table(self.identifier).to_polars()
else:
return pl.scan_iceberg(self.path)
@property
def identifier(self):
"""Return the catalog-table identifier tuple."""
return (self.catalog, self.table)
def load_table(self) -> pl.LazyFrame:
"""Load the Iceberg table as a Polars LazyFrame."""
if self.catalog is not None:
catalog = self.load_catalog()
return catalog.load_table(self.identifier).to_polars()
return pl.scan_iceberg(self.path)
🧰 Tools
🪛 Pylint (3.3.7)

[convention] 42-42: Missing function or method docstring

(C0116)


[convention] 45-45: Missing function or method docstring

(C0116)


[refactor] 46-50: Unnecessary "else" after "return", remove the "else" and de-indent the code inside it

(R1705)

🤖 Prompt for AI Agents
In cratedb_toolkit/io/iceberg.py around lines 41 to 51, remove the unnecessary
else block after the return statement in load_table to simplify control flow.
Add a docstring to the load_table method explaining its purpose and behavior,
including what it returns and under what conditions it loads from catalog or
path.


def from_iceberg(source_url, cratedb_url, progress: bool = False):
"""
Scan an Iceberg table from local filesystem or object store, and load into CrateDB.
https://docs.pola.rs/api/python/stable/reference/api/polars.scan_iceberg.html

Synopsis
--------
export CRATEDB_CLUSTER_URL=crate://crate@localhost:4200/testdrive/demo
ctk load table "file+iceberg:var/lib/iceberg/default.db/taxi_dataset/metadata/00001-dc8e5ed2-dc29-4e39-b2e4-019e466af4c3.metadata.json"
ctk load table "iceberg://./var/lib/iceberg/?catalog=default&table=taxi_dataset"
"""

iceberg_address = IcebergAddress.from_url(source_url)

Check warning on line 70 in cratedb_toolkit/io/iceberg.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/iceberg.py#L70

Added line #L70 was not covered by tests

# Parse parameters.
logger.info(

Check warning on line 73 in cratedb_toolkit/io/iceberg.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/iceberg.py#L73

Added line #L73 was not covered by tests
f"Iceberg address: Path: {iceberg_address.path}, catalog: {iceberg_address.catalog}, table: {iceberg_address.table}"
)

cratedb_address = DatabaseAddress.from_string(cratedb_url)
cratedb_url, cratedb_table = cratedb_address.decode()
if cratedb_table.table is None:
raise ValueError("Table name is missing. Please adjust CrateDB database URL.")
logger.info(f"Target address: {cratedb_address}")

Check warning on line 81 in cratedb_toolkit/io/iceberg.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/iceberg.py#L77-L81

Added lines #L77 - L81 were not covered by tests

# Invoke copy operation.
logger.info("Running Iceberg copy")
engine = sa.create_engine(str(cratedb_url))

Check warning on line 85 in cratedb_toolkit/io/iceberg.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/iceberg.py#L84-L85

Added lines #L84 - L85 were not covered by tests

pl.Config.set_streaming_chunk_size(CHUNK_SIZE)
table = iceberg_address.load_table()

Check warning on line 88 in cratedb_toolkit/io/iceberg.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/iceberg.py#L87-L88

Added lines #L87 - L88 were not covered by tests

# This conversion to pandas is zero-copy,
# so we can utilize their SQL utils for free.
# https://github.com/pola-rs/polars/issues/7852
# Note: This code also uses the most efficient `insert_bulk` method with CrateDB.
# https://cratedb.com/docs/sqlalchemy-cratedb/dataframe.html#efficient-insert-operations-with-pandas
table.collect(streaming=True).to_pandas().to_sql(

Check warning on line 95 in cratedb_toolkit/io/iceberg.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/iceberg.py#L95

Added line #L95 was not covered by tests
name=cratedb_table.table,
schema=cratedb_table.schema,
con=engine,
if_exists="replace",
index=False,
chunksize=CHUNK_SIZE,
method=insert_bulk,
)

# Note: This was much slower.
# table.to_polars().collect(streaming=True).write_database(table_name=table_address.fullname, connection=engine, if_table_exists="replace")


def to_iceberg(source_url, target_url, progress: bool = False):
"""
Synopsis
--------
export CRATEDB_CLUSTER_URL=crate://crate@localhost:4200/testdrive/demo
ctk load table "iceberg://./var/lib/iceberg/?catalog=default&table=taxi_dataset"
ctk save table "file+iceberg:var/lib/iceberg/default.db/taxi_dataset/metadata/00001-dc8e5ed2-dc29-4e39-b2e4-019e466af4c3.metadata.json"
"""

iceberg_address = IcebergAddress.from_url(target_url)
catalog = iceberg_address.load_catalog()
print("catalog:", catalog)

Check warning on line 120 in cratedb_toolkit/io/iceberg.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/iceberg.py#L118-L120

Added lines #L118 - L120 were not covered by tests

# https://py.iceberg.apache.org/#write-a-pyarrow-dataframe
df = pq.read_table("tmp/yellow_tripdata_2023-01.parquet")

Check warning on line 123 in cratedb_toolkit/io/iceberg.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/iceberg.py#L123

Added line #L123 was not covered by tests

# Create a new Iceberg table.
catalog.create_namespace_if_not_exists("default")
table = catalog.create_table_if_not_exists(

Check warning on line 127 in cratedb_toolkit/io/iceberg.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/iceberg.py#L126-L127

Added lines #L126 - L127 were not covered by tests
"default.taxi_dataset",
schema=df.schema,
)

# Append the dataframe to the table.
table.append(df)
len(table.scan().to_arrow())

Check warning on line 134 in cratedb_toolkit/io/iceberg.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/iceberg.py#L133-L134

Added lines #L133 - L134 were not covered by tests
Comment on lines +109 to +134
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Complete the implementation and fix multiple issues.

The to_iceberg function has several critical issues:

  1. Hardcoded file path instead of using source_url parameter
  2. Print statement instead of logging
  3. Unused parameters: source_url and progress
  4. Dangling expression at line 121
  5. Line length issues in docstring
-def to_iceberg(source_url, target_url, progress: bool = False):
+def to_iceberg(source_url: str, target_url: str, progress: bool = False):
     """
+    Export data from CrateDB to an Iceberg table.
+    
     Synopsis
     --------
     export CRATEDB_CLUSTER_URL=crate://crate@localhost:4200/testdrive/demo
     ctk load table "iceberg://./var/lib/iceberg/?catalog=default&table=taxi_dataset"
-    ctk save table "file+iceberg:var/lib/iceberg/default.db/taxi_dataset/metadata/00001-dc8e5ed2-dc29-4e39-b2e4-019e466af4c3.metadata.json"
+    ctk save table "file+iceberg:var/lib/iceberg/default.db/taxi_dataset/metadata/\
+00001-dc8e5ed2-dc29-4e39-b2e4-019e466af4c3.metadata.json"
     """
 
     iceberg_address = IcebergAddress.from_url(target_url)
     catalog = iceberg_address.load_catalog()
-    print("catalog:", catalog)
+    logger.info("Loading catalog: %s", catalog)
 
     # https://py.iceberg.apache.org/#write-a-pyarrow-dataframe
-    df = pq.read_table("tmp/yellow_tripdata_2023-01.parquet")
+    # TODO: Implement reading from source_url (CrateDB) instead of hardcoded file
+    df = pq.read_table(source_url)  # This should read from CrateDB
 
     # Create a new Iceberg table.
     catalog.create_namespace_if_not_exists("default")
     table = catalog.create_table_if_not_exists(
         "default.taxi_dataset",
         schema=df.schema,
     )
 
     # Append the dataframe to the table.
     table.append(df)
-    len(table.scan().to_arrow())
+    row_count = len(table.scan().to_arrow())
+    logger.info("Appended %d rows to Iceberg table", row_count)

Would you like me to help implement the complete functionality to read data from CrateDB using the source_url parameter?

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def to_iceberg(source_url, target_url, progress: bool = False):
"""
Synopsis
--------
export CRATEDB_CLUSTER_URL=crate://crate@localhost:4200/testdrive/demo
ctk load table "iceberg://./var/lib/iceberg/?catalog=default&table=taxi_dataset"
ctk save table "file+iceberg:var/lib/iceberg/default.db/taxi_dataset/metadata/00001-dc8e5ed2-dc29-4e39-b2e4-019e466af4c3.metadata.json"
"""
iceberg_address = IcebergAddress.from_url(target_url)
catalog = iceberg_address.load_catalog()
print("catalog:", catalog)
# https://py.iceberg.apache.org/#write-a-pyarrow-dataframe
df = pq.read_table("tmp/yellow_tripdata_2023-01.parquet")
# Create a new Iceberg table.
catalog.create_namespace_if_not_exists("default")
table = catalog.create_table_if_not_exists(
"default.taxi_dataset",
schema=df.schema,
)
# Append the dataframe to the table.
table.append(df)
len(table.scan().to_arrow())
def to_iceberg(source_url: str, target_url: str, progress: bool = False):
"""
Export data from CrateDB to an Iceberg table.
Synopsis
--------
export CRATEDB_CLUSTER_URL=crate://crate@localhost:4200/testdrive/demo
ctk load table "iceberg://./var/lib/iceberg/?catalog=default&table=taxi_dataset"
ctk save table "file+iceberg:var/lib/iceberg/default.db/taxi_dataset/metadata/\
00001-dc8e5ed2-dc29-4e39-b2e4-019e466af4c3.metadata.json"
"""
iceberg_address = IcebergAddress.from_url(target_url)
catalog = iceberg_address.load_catalog()
logger.info("Loading catalog: %s", catalog)
# https://py.iceberg.apache.org/#write-a-pyarrow-dataframe
# TODO: Implement reading from source_url (CrateDB) instead of hardcoded file
df = pq.read_table(source_url) # This should read from CrateDB
# Create a new Iceberg table.
catalog.create_namespace_if_not_exists("default")
table = catalog.create_table_if_not_exists(
"default.taxi_dataset",
schema=df.schema,
)
# Append the dataframe to the table.
table.append(df)
row_count = len(table.scan().to_arrow())
logger.info("Appended %d rows to Iceberg table", row_count)
🧰 Tools
🪛 Ruff (0.11.9)

102-102: Line too long (139 > 120)

(E501)


107-107: print found

Remove print

(T201)

🪛 Pylint (3.3.7)

[convention] 102-102: Line too long (139/100)

(C0301)


[warning] 96-96: Unused argument 'source_url'

(W0613)


[warning] 96-96: Unused argument 'progress'

(W0613)

🤖 Prompt for AI Agents
In cratedb_toolkit/io/iceberg.py from lines 96 to 121, the to_iceberg function
has multiple issues: it uses a hardcoded file path instead of the source_url
parameter to read data, it uses print instead of proper logging, the parameters
source_url and progress are unused, there is a dangling expression at the end,
and the docstring lines are too long. Fix these by replacing the hardcoded path
with source_url to read the parquet file, replace print with a logger call,
utilize the progress parameter if applicable or remove it if not needed, remove
the dangling expression or assign its result properly, and reformat the
docstring to respect line length limits. This will complete and clean up the
function implementation.

12 changes: 12 additions & 0 deletions doc/io/iceberg/index.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
(iceberg)=
# Apache Iceberg I/O

## About
Import and export data into/from Iceberg tables, for humans and machines.


```{toctree}
:maxdepth: 1

loader
```
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ optional-dependencies.io = [
"fsspec[s3,http]",
"pandas>=1,<2.3",
"polars<1.30",
"pyiceberg[pyarrow,sql-postgres]<0.10",
"sqlalchemy>=2",
"universal-pathlib<0.3",
]
Expand Down
Loading