
Commit e666efb

Merge branch 'main' into ForeverAngry/issue2151
1 parent 9937894 commit e666efb

1 file changed: 77 additions, 0 deletions

@@ -0,0 +1,77 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import pytest
import pyarrow as pa
import pyarrow.parquet as pq
from pyarrow import Table as pa_table
from pyiceberg.table import Table
from pyiceberg.io.pyarrow import parquet_file_to_data_file
from tests.catalog.test_base import InMemoryCatalog

@pytest.fixture
def iceberg_catalog(tmp_path):
    catalog = InMemoryCatalog("test.in_memory.catalog", warehouse=tmp_path.absolute().as_posix())
    catalog.create_namespace("default")
    return catalog

def test_overwrite_removes_only_selected_datafile(iceberg_catalog, tmp_path):
    # Create a table and add two data file entries that reference the same file path
    identifier = "default.test_overwrite_removes_only_selected_datafile"
    try:
        iceberg_catalog.drop_table(identifier)
    except Exception:
        pass

    # Create the Arrow schema and two identical single-row tables
    arrow_schema = pa.schema([
        pa.field("id", pa.int32(), nullable=False),
        pa.field("value", pa.string(), nullable=True),
    ])
    df_a = pa_table.from_pylist([
        {"id": 1, "value": "A"},
    ], schema=arrow_schema)
    df_b = pa_table.from_pylist([
        {"id": 1, "value": "A"},
    ], schema=arrow_schema)

    # Write both Arrow tables to the same Parquet path on purpose: the second
    # write overwrites the first, leaving one physical file to be added twice
    parquet_path_a = str(tmp_path / "file_a.parquet")
    parquet_path_b = str(tmp_path / "file_a.parquet")
    pq.write_table(df_a, parquet_path_a)
    pq.write_table(df_b, parquet_path_b)

    table: Table = iceberg_catalog.create_table(identifier, arrow_schema)

    # Add both files as DataFiles using add_files, skipping the duplicate-path
    # check, and commit the transaction so the appends take effect
    tx = table.transaction()
    tx.add_files([parquet_path_a], check_duplicate_files=False)
    tx.add_files([parquet_path_b], check_duplicate_files=False)
    tx.commit_transaction()
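
    # At this point the data_files metadata table should list the same physical
    # path twice, since the duplicate-path check was skipped above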

    # Build the DataFile entry for file_b
    data_file_b = parquet_file_to_data_file(table.io, table.metadata, parquet_path_b)

    # Overwrite: remove only the DataFile for file_b, committing the enclosing
    # transaction so the new snapshot is applied to the table
    tx = table.transaction()
    tx.update_snapshot().overwrite().delete_data_file(data_file_b).commit()
    tx.commit_transaction()

    # Collect every data file path referenced by the current snapshot
    file_paths = table.inspect.data_files()["file_path"].to_pylist()

    # Assert there are no duplicate file paths
    assert len(file_paths) == len(set(file_paths)), "Duplicate file paths found in the table"
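
Side note for reviewers: the two explicit transactions above can also be written
in pyiceberg's context-manager form, which commits when the block exits. A minimal
sketch of the same flow in that style (same names as in the test; delete_data_file
is the snapshot-producer method this branch exercises):

    # Sketch only: assumes the table, paths, and data_file_b from the test above
    with table.transaction() as tx:
        # add the same physical file twice, skipping the duplicate-path check
        tx.add_files([parquet_path_a], check_duplicate_files=False)
        tx.add_files([parquet_path_b], check_duplicate_files=False)

    with table.transaction() as tx:
        # drop only the entry for file_b; the transaction commits on exit
        tx.update_snapshot().overwrite().delete_data_file(data_file_b).commit()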
