Skip to content

Commit ecc5218

Browse files
authored
add iceberg datafusion integration (#2075)
<!-- Thanks for opening a pull request! --> <!-- In the case this PR will resolve an issue, please replace ${GITHUB_ISSUE_ID} below with the actual Github issue id. --> <!-- Closes #${GITHUB_ISSUE_ID} --> # Rationale for this change - Added pyiceberg table integration so that pyiceberg `Table` can be pass in directly to datafusion's `register_table_provider` - Added `datafusion` as a optional dependency - Added docs for the integration: <img width="1279" alt="Screenshot 2025-07-06 at 10 59 44 AM" src="https://github.com/user-attachments/assets/f41f08e6-dd41-4012-ad96-2eaae805d28e" /> # Are these changes tested? Yes # Are there any user-facing changes? <!-- In the case of user-facing changes, please add the changelog label. -->
1 parent 9c99f32 commit ecc5218

File tree

5 files changed

+197
-21
lines changed

5 files changed

+197
-21
lines changed

mkdocs/docs/api.md

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1758,3 +1758,66 @@ shape: (11, 4)
17581758
21566 ┆ Incorrect billing amount ┆ 2022-04-17 10:53:20
17591759
└───────────┴─────────────┴────────────────────────────┴─────────────────────┘
17601760
```
1761+
1762+
### Apache DataFusion
1763+
1764+
PyIceberg integrates with [Apache DataFusion](https://datafusion.apache.org/) through the Custom Table Provider interface ([FFI_TableProvider](https://datafusion.apache.org/python/user-guide/io/table_provider.html)) exposed through `iceberg-rust`.
1765+
1766+
<!-- prettier-ignore-start -->
1767+
1768+
!!! note "Requirements"
1769+
This requires [`datafusion` to be installed](index.md).
1770+
1771+
<!-- prettier-ignore-end -->
1772+
1773+
<!-- markdownlint-disable MD046 -- Allowing indented multi-line formatting in admonition-->
1774+
1775+
!!! warning "Experimental Feature"
1776+
The DataFusion integration is considered **experimental**.
1777+
1778+
The integration has a few caveats:
1779+
1780+
- Only works with `datafusion >= 45`
1781+
- Depends directly on `iceberg-rust` instead of PyIceberg's implementation
1782+
- Has limited features compared to the full PyIceberg API
1783+
1784+
The integration will improve as both DataFusion and `iceberg-rust` matures.
1785+
1786+
<!-- markdownlint-enable MD046 -->
1787+
1788+
PyIceberg tables can be registered directly with DataFusion's SessionContext using the table provider interface.
1789+
1790+
```python
1791+
from datafusion import SessionContext
1792+
from pyiceberg.catalog import load_catalog
1793+
import pyarrow as pa
1794+
1795+
# Load catalog and create/load a table
1796+
catalog = load_catalog("catalog", type="in-memory")
1797+
catalog.create_namespace_if_not_exists("default")
1798+
1799+
# Create some sample data
1800+
data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
1801+
iceberg_table = catalog.create_table("default.test", schema=data.schema)
1802+
iceberg_table.append(data)
1803+
1804+
# Register the table with DataFusion
1805+
ctx = SessionContext()
1806+
ctx.register_table_provider("test", iceberg_table)
1807+
1808+
# Query the table using DataFusion SQL
1809+
ctx.table("test").show()
1810+
```
1811+
1812+
This will output:
1813+
1814+
```python
1815+
DataFrame()
1816+
+---+---+
1817+
| x | y |
1818+
+---+---+
1819+
| 1 | 4 |
1820+
| 2 | 5 |
1821+
| 3 | 6 |
1822+
+---+---+
1823+
```

poetry.lock

Lines changed: 22 additions & 20 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyiceberg/table/__init__.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@
143143
import pyarrow as pa
144144
import ray
145145
from duckdb import DuckDBPyConnection
146+
from pyiceberg_core.datafusion import IcebergDataFusionTable
146147

147148
from pyiceberg.catalog import Catalog
148149

@@ -1494,6 +1495,51 @@ def to_polars(self) -> pl.LazyFrame:
14941495

14951496
return pl.scan_iceberg(self)
14961497

1498+
def __datafusion_table_provider__(self) -> "IcebergDataFusionTable":
1499+
"""Return the DataFusion table provider PyCapsule interface.
1500+
1501+
To support DataFusion features such as push down filtering, this function will return a PyCapsule
1502+
interface that conforms to the FFI Table Provider required by DataFusion. From an end user perspective
1503+
you should not need to call this function directly. Instead you can use ``register_table_provider`` in
1504+
the DataFusion SessionContext.
1505+
1506+
Returns:
1507+
A PyCapsule DataFusion TableProvider interface.
1508+
1509+
Example:
1510+
```python
1511+
from datafusion import SessionContext
1512+
from pyiceberg.catalog import load_catalog
1513+
import pyarrow as pa
1514+
catalog = load_catalog("catalog", type="in-memory")
1515+
catalog.create_namespace_if_not_exists("default")
1516+
data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
1517+
iceberg_table = catalog.create_table("default.test", schema=data.schema)
1518+
iceberg_table.append(data)
1519+
ctx = SessionContext()
1520+
ctx.register_table_provider("test", iceberg_table)
1521+
ctx.table("test").show()
1522+
```
1523+
Results in
1524+
```
1525+
DataFrame()
1526+
+---+---+
1527+
| x | y |
1528+
+---+---+
1529+
| 1 | 4 |
1530+
| 2 | 5 |
1531+
| 3 | 6 |
1532+
+---+---+
1533+
```
1534+
"""
1535+
from pyiceberg_core.datafusion import IcebergDataFusionTable
1536+
1537+
return IcebergDataFusionTable(
1538+
identifier=self.name(),
1539+
metadata_location=self.metadata_location,
1540+
file_io_properties=self.io.properties,
1541+
).__datafusion_table_provider__()
1542+
14971543

14981544
class StaticTable(Table):
14991545
"""Load a table directly from a metadata file (i.e., without using a catalog)."""

pyproject.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ pyiceberg-core = { version = "^0.5.1", optional = true }
8484
polars = { version = "^1.21.0", optional = true }
8585
thrift-sasl = { version = ">=0.4.3", optional = true }
8686
kerberos = {version = "^1.3.1", optional = true}
87+
datafusion = { version = ">=45", optional = true }
8788

8889
[tool.poetry.group.dev.dependencies]
8990
pytest = "7.4.4"
@@ -99,7 +100,6 @@ pytest-mock = "3.14.1"
99100
pyspark = "3.5.6"
100101
cython = "3.1.2"
101102
deptry = ">=0.14,<0.24"
102-
datafusion = ">=44,<48"
103103
docutils = "!=0.21.post1" # https://github.com/python-poetry/poetry/issues/9248#issuecomment-2026240520
104104
mypy-boto3-glue = ">=1.28.18"
105105
mypy-boto3-dynamodb = ">=1.28.18"
@@ -314,6 +314,7 @@ gcsfs = ["gcsfs"]
314314
rest-sigv4 = ["boto3"]
315315
hf = ["huggingface-hub"]
316316
pyiceberg-core = ["pyiceberg-core"]
317+
datafusion = ["datafusion"]
317318

318319
[tool.pytest.ini_options]
319320
markers = [

tests/table/test_datafusion.py

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
19+
from pathlib import Path
20+
21+
import pyarrow as pa
22+
import pytest
23+
from datafusion import SessionContext
24+
25+
from pyiceberg.catalog import Catalog, load_catalog
26+
27+
28+
@pytest.fixture(scope="session")
29+
def warehouse(tmp_path_factory: pytest.TempPathFactory) -> Path:
30+
return tmp_path_factory.mktemp("warehouse")
31+
32+
33+
@pytest.fixture(scope="session")
34+
def catalog(warehouse: Path) -> Catalog:
35+
catalog = load_catalog(
36+
"default",
37+
uri=f"sqlite:///{warehouse}/pyiceberg_catalog.db",
38+
warehouse=f"file://{warehouse}",
39+
)
40+
return catalog
41+
42+
43+
def test_datafusion_register_pyiceberg_table(catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
44+
catalog.create_namespace_if_not_exists("default")
45+
iceberg_table = catalog.create_table_if_not_exists(
46+
"default.dataset",
47+
schema=arrow_table_with_null.schema,
48+
)
49+
iceberg_table.append(arrow_table_with_null)
50+
51+
ctx = SessionContext()
52+
ctx.register_table_provider("test", iceberg_table)
53+
54+
datafusion_table = ctx.table("test")
55+
assert datafusion_table is not None
56+
57+
assert datafusion_table.to_arrow_table().to_pylist() == iceberg_table.scan().to_arrow().to_pylist()
58+
59+
from pandas.testing import assert_frame_equal
60+
61+
assert_frame_equal(
62+
datafusion_table.to_arrow_table().to_pandas(),
63+
iceberg_table.scan().to_arrow().to_pandas(),
64+
)

0 commit comments

Comments
 (0)