144 changes: 133 additions & 11 deletions pyiceberg/io/pyarrow.py
@@ -45,6 +45,7 @@
Any,
Callable,
Dict,
FrozenSet,
Generic,
Iterable,
Iterator,
@@ -978,7 +979,7 @@ def _get_file_format(file_format: FileFormat, **kwargs: Dict[str, Any]) -> ds.Fi
raise ValueError(f"Unsupported file format: {file_format}")


def _read_deletes(io: FileIO, data_file: DataFile) -> Dict[str, pa.ChunkedArray]:
def _read_pos_deletes(io: FileIO, data_file: DataFile) -> Dict[str, pa.ChunkedArray]:
if data_file.file_format == FileFormat.PARQUET:
with io.new_input(data_file.file_path).open() as fi:
delete_fragment = _get_file_format(
@@ -993,12 +994,20 @@ def _read_deletes(io: FileIO, data_file: DataFile) -> Dict[str, pa.ChunkedArray]
elif data_file.file_format == FileFormat.PUFFIN:
with io.new_input(data_file.file_path).open() as fi:
payload = fi.read()

return PuffinFile(payload).to_vector()
else:
raise ValueError(f"Delete file format not supported: {data_file.file_format}")


def _read_eq_deletes(io: FileIO, data_file: DataFile) -> pa.Table:
with io.new_input(data_file.file_path).open() as fi:
delete_fragment = _get_file_format(
data_file.file_format, pre_buffer=True, buffer_size=ONE_MEGABYTE
).make_fragment(fi)
table = ds.Scanner.from_fragment(fragment=delete_fragment).to_table()
return table
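# Illustrative sketch (toy data, assumed local path; not part of this change): an equality
# delete file is just a table whose rows hold the values to delete on the equality fields.
#
#     >>> import pyarrow as pa
#     >>> import pyarrow.parquet as pq
#     >>> pq.write_table(pa.table({"id": [10, 42]}), "/tmp/eq-deletes.parquet")
#     >>> pq.read_table("/tmp/eq-deletes.parquet").column("id").to_pylist()
#     [10, 42]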


def _combine_positional_deletes(positional_deletes: List[pa.ChunkedArray], start_index: int, end_index: int) -> pa.Array:
if len(positional_deletes) == 1:
all_chunks = positional_deletes[0]
@@ -1445,7 +1454,7 @@ def _task_to_record_batches(
bound_row_filter: BooleanExpression,
projected_schema: Schema,
projected_field_ids: Set[int],
positional_deletes: Optional[List[ChunkedArray]],
deletes: Optional[List[Union[pa.ChunkedArray, pa.Table]]],
case_sensitive: bool,
name_mapping: Optional[NameMapping] = None,
partition_spec: Optional[PartitionSpec] = None,
@@ -1479,9 +1488,20 @@ def _task_to_record_batches(
schema=physical_schema,
# This will push down the query to Arrow.
# But in case there are positional deletes, we have to apply them first
filter=pyarrow_filter if not positional_deletes else None,
filter=pyarrow_filter if not deletes else None,
columns=[col.name for col in file_project_schema.columns],
)
positional_deletes = None
equality_delete_groups = None
if deletes:
positional_deletes = [d for d in deletes if isinstance(d, pa.ChunkedArray)]
equality_deletes = [d for d in deletes if isinstance(d, pa.Table)]

# Preprocess the equality deletes so they can be applied per record batch
if equality_deletes:
task_eq_files = [df for df in task.delete_files if df.content == DataFileContent.EQUALITY_DELETES]
# Concatenate equality delete tables that share the same set of field IDs to reduce the number of anti joins
equality_delete_groups = _group_deletes_by_equality_ids(task_eq_files, equality_deletes)

next_index = 0
batches = fragment_scanner.to_batches()
@@ -1499,6 +1519,17 @@ def _task_to_record_batches(
if current_batch.num_rows == 0:
continue

if equality_delete_groups:
table = pa.Table.from_batches([current_batch])
for equality_ids, combined_table in equality_delete_groups.items():
table = _apply_equality_deletes(table, combined_table, list(equality_ids), file_schema)
if table.num_rows == 0:
break
if table.num_rows > 0:
current_batch = table.combine_chunks().to_batches()[0]
else:
continue

# Apply the user filter
if pyarrow_filter is not None:
# Temporary fix until PyArrow 21 is released ( https://github.com/apache/arrow/pull/46057 )
@@ -1528,22 +1559,64 @@ def _task_to_record_batches(
yield result_batch


def _read_all_delete_files(io: FileIO, tasks: Iterable[FileScanTask]) -> Dict[str, List[ChunkedArray]]:
deletes_per_file: Dict[str, List[ChunkedArray]] = {}
unique_deletes = set(itertools.chain.from_iterable([task.delete_files for task in tasks]))
if len(unique_deletes) > 0:
def _read_all_delete_files(io: FileIO, tasks: Iterable[FileScanTask]) -> Dict[str, List[Union[pa.ChunkedArray, pa.Table]]]:
deletes_per_file: Dict[str, List[Union[pa.ChunkedArray, pa.Table]]] = {}

positional_deletes = {
df
for task in tasks
for df in task.delete_files
if df.content == DataFileContent.POSITION_DELETES and df.file_format != FileFormat.PUFFIN
}
if positional_deletes:
executor = ExecutorFactory.get_or_create()
deletes_per_files: Iterator[Dict[str, ChunkedArray]] = executor.map(
lambda args: _read_deletes(*args),
[(io, delete_file) for delete_file in unique_deletes],
lambda args: _read_pos_deletes(*args),
[(io, delete_file) for delete_file in positional_deletes],
)
for delete in deletes_per_files:
for file, arr in delete.items():
if file in deletes_per_file:
deletes_per_file[file].append(arr)
else:
deletes_per_file[file] = [arr]
deletion_vectors = {
df
for task in tasks
for df in task.delete_files
if df.content == DataFileContent.POSITION_DELETES and df.file_format == FileFormat.PUFFIN
}
if deletion_vectors:
executor = ExecutorFactory.get_or_create()
dv_results: Iterator[Dict[str, ChunkedArray]] = executor.map(
lambda args: _read_pos_deletes(*args),
[(io, delete_file) for delete_file in deletion_vectors],
)
for delete in dv_results:
for file, arr in delete.items():
# Deletion vectors replace all position deletes for a file
deletes_per_file[file] = [arr]

equality_delete_tasks = []
for task in tasks:
equality_deletes = [df for df in task.delete_files if df.content == DataFileContent.EQUALITY_DELETES]
if equality_deletes:
for delete_file in equality_deletes:
# Pair the data file with its associated equality delete file
equality_delete_tasks.append((task.file.file_path, delete_file))

if equality_delete_tasks:
executor = ExecutorFactory.get_or_create()
# Process equality delete files in parallel, as is done for positional deletes
equality_delete_results = executor.map(
lambda args: (args[0], _read_eq_deletes(io, args[1])),
equality_delete_tasks,
)

for file_path, equality_delete_table in equality_delete_results:
if file_path not in deletes_per_file:
deletes_per_file[file_path] = []
deletes_per_file[file_path].append(equality_delete_table)
return deletes_per_file
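# Shape of the result (illustrative, made-up path): positional deletes contribute
# ChunkedArrays of row positions, equality deletes contribute Arrow tables of delete rows,
# keyed by the data file path they apply to, e.g.
#     {"s3://bucket/data/00000-0-a.parquet": [<ChunkedArray of positions>, <pa.Table of equality deletes>]}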


@@ -1679,7 +1752,7 @@ def batches_for_task(task: FileScanTask) -> List[pa.RecordBatch]:
break

def _record_batches_from_scan_tasks_and_deletes(
self, tasks: Iterable[FileScanTask], deletes_per_file: Dict[str, List[ChunkedArray]]
self, tasks: Iterable[FileScanTask], deletes_per_file: Dict[str, List[Union[pa.ChunkedArray, pa.Table]]]
) -> Iterator[pa.RecordBatch]:
total_row_count = 0
for task in tasks:
@@ -2799,3 +2872,52 @@ def _get_field_from_arrow_table(arrow_table: pa.Table, field_path: str) -> pa.Ar
field_array = arrow_table[path_parts[0]]
# Navigate into the struct using the remaining path parts
return pc.struct_field(field_array, path_parts[1:])


def _group_deletes_by_equality_ids(
task_eq_files: List[DataFile], equality_delete_tables: List[pa.Table]
) -> Dict[FrozenSet[int], pa.Table]:
"""Concatenate equality delete tables that share the same equality IDs to reduce the number of anti joins."""
from collections import defaultdict

equality_delete_groups: Dict[FrozenSet[int], pa.Table] = {}
group_map: Dict[FrozenSet[int], List[pa.Table]] = defaultdict(list)

# Group the tables by their equality IDs
for delete_file, delete_table in zip(task_eq_files, equality_delete_tables):
if delete_file.equality_ids is not None:
key = frozenset(delete_file.equality_ids)
group_map[key].append(delete_table)

# Concatenate the Arrow tables within each group
for equality_ids, delete_tables in group_map.items():
if delete_tables:
equality_delete_groups[equality_ids] = pa.concat_tables(delete_tables) if len(delete_tables) > 1 else delete_tables[0]
return equality_delete_groups
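# Illustrative sketch of the grouping (SimpleNamespace is a hypothetical stand-in for
# DataFile): delete files that share the same equality_ids are concatenated into a single
# table, so each batch is anti-joined once per distinct key set instead of once per file.
#
#     >>> import pyarrow as pa
#     >>> from types import SimpleNamespace
#     >>> files = [SimpleNamespace(equality_ids=[1]), SimpleNamespace(equality_ids=[1])]
#     >>> tables = [pa.table({"id": [10]}), pa.table({"id": [42]})]
#     >>> groups = _group_deletes_by_equality_ids(files, tables)
#     >>> {ids: t.num_rows for ids, t in groups.items()}
#     {frozenset({1}): 2}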


def _apply_equality_deletes(
data_table: pa.Table, delete_table: pa.Table, equality_ids: List[int], data_schema: Optional[Schema]
) -> pa.Table:
"""Apply equality deletes to a data table.

Filter out rows from the data table that match rows in the equality delete table on the equality fields.

Args:
data_table: A PyArrow table with the data to filter
delete_table: A PyArrow table containing the equality deletes
equality_ids: A list of field IDs to use for the equality comparison
data_schema: The Iceberg schema of the data file, used to resolve field IDs to column names
Returns:
A filtered PyArrow table with matching rows removed
"""
if len(delete_table) == 0:
return data_table
if data_schema is None:
raise ValueError("Schema is required for applying equality deletes")

# Resolve the correct columns to be used in the anti join
equality_columns = [data_schema.find_field(fid).name for fid in equality_ids]

# Use PyArrow's join function with left anti join type
result = data_table.join(delete_table.select(equality_columns), keys=equality_columns, join_type="left anti")
return result
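# Illustrative sketch of the anti-join semantics (toy tables, assumed column names):
# rows whose key columns appear in the delete table are dropped, everything else is kept.
#
#     >>> import pyarrow as pa
#     >>> data = pa.table({"id": [1, 2, 3, 4], "v": ["a", "b", "c", "d"]})
#     >>> deletes = pa.table({"id": [2, 4]})
#     >>> data.join(deletes, keys=["id"], join_type="left anti").sort_by("id").column("id").to_pylist()
#     [1, 3]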
41 changes: 14 additions & 27 deletions pyiceberg/table/__init__.py
@@ -41,7 +41,6 @@
)

from pydantic import Field
from sortedcontainers import SortedList

import pyiceberg.expressions.parser as parser
from pyiceberg.expressions import (
Expand All @@ -64,7 +63,6 @@
)
from pyiceberg.io import FileIO, load_file_io
from pyiceberg.manifest import (
POSITIONAL_DELETE_SCHEMA,
DataFile,
DataFileContent,
ManifestContent,
@@ -78,6 +76,7 @@
PartitionSpec,
)
from pyiceberg.schema import Schema
from pyiceberg.table.delete_file_index import DeleteFileIndex
from pyiceberg.table.inspect import InspectTable
from pyiceberg.table.locations import LocationProvider, load_location_provider
from pyiceberg.table.metadata import (
@@ -1793,29 +1792,20 @@ def _min_sequence_number(manifests: List[ManifestFile]) -> int:
return INITIAL_SEQUENCE_NUMBER


def _match_deletes_to_data_file(data_entry: ManifestEntry, positional_delete_entries: SortedList[ManifestEntry]) -> Set[DataFile]:
"""Check if the delete file is relevant for the data file.

Using the column metrics to see if the filename is in the lower and upper bound.
def _match_deletes_to_data_file(data_entry: ManifestEntry, delete_file_index: DeleteFileIndex) -> Set[DataFile]:
"""Check if delete files are relevant for the data file.

Args:
data_entry (ManifestEntry): The manifest entry path of the datafile.
positional_delete_entries (List[ManifestEntry]): All the candidate positional deletes manifest entries.
data_entry (ManifestEntry): The manifest entry of the data file.
delete_file_index (DeleteFileIndex): Index containing all delete files.

Returns:
A set of files that are relevant for the data file.
A set of delete files that are relevant for the data file.
"""
relevant_entries = positional_delete_entries[positional_delete_entries.bisect_right(data_entry) :]

if len(relevant_entries) > 0:
evaluator = _InclusiveMetricsEvaluator(POSITIONAL_DELETE_SCHEMA, EqualTo("file_path", data_entry.data_file.file_path))
return {
positional_delete_entry.data_file
for positional_delete_entry in relevant_entries
if evaluator.eval(positional_delete_entry.data_file)
}
else:
return set()
candidate_deletes = delete_file_index.for_data_file(
data_entry.sequence_number or 0, data_entry.data_file, partition_key=data_entry.data_file.partition
)
return set(candidate_deletes)
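# Sketch of how the index is used during planning (illustrative; only calls visible in this
# change are used, and the iterable of delete entries is hypothetical):
#
#     index = DeleteFileIndex(table_metadata.schema(), table_metadata.specs())
#     for entry in delete_entries:  # every POSITION_DELETES / EQUALITY_DELETES manifest entry
#         index.add_delete_file(entry, partition_key=entry.data_file.partition)
#     matched = _match_deletes_to_data_file(data_entry, index)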


class DataScan(TableScan):
Expand Down Expand Up @@ -1921,7 +1911,7 @@ def plan_files(self) -> Iterable[FileScanTask]:
min_sequence_number = _min_sequence_number(manifests)

data_entries: List[ManifestEntry] = []
positional_delete_entries = SortedList(key=lambda entry: entry.sequence_number or INITIAL_SEQUENCE_NUMBER)
delete_file_index = DeleteFileIndex(self.table_metadata.schema(), self.table_metadata.specs())

executor = ExecutorFactory.get_or_create()
for manifest_entry in chain(
@@ -1942,19 +1932,16 @@ def plan_files(self) -> Iterable[FileScanTask]:
data_file = manifest_entry.data_file
if data_file.content == DataFileContent.DATA:
data_entries.append(manifest_entry)
elif data_file.content == DataFileContent.POSITION_DELETES:
positional_delete_entries.add(manifest_entry)
elif data_file.content == DataFileContent.EQUALITY_DELETES:
raise ValueError("PyIceberg does not yet support equality deletes: https://github.com/apache/iceberg/issues/6568")
elif data_file.content in (DataFileContent.POSITION_DELETES, DataFileContent.EQUALITY_DELETES):
delete_file_index.add_delete_file(manifest_entry, partition_key=data_file.partition)
else:
raise ValueError(f"Unknown DataFileContent ({data_file.content}): {manifest_entry}")

return [
FileScanTask(
data_entry.data_file,
delete_files=_match_deletes_to_data_file(
data_entry,
positional_delete_entries,
delete_file_index,
),
residual=residual_evaluators[data_entry.data_file.spec_id](data_entry.data_file).residual_for(
data_entry.data_file.partition