Apply Name mapping #219

Merged · 71 commits · Jan 19, 2024
Commits (71)
405d36c
Arrow: Allow missing field-ids from Schema
Fokko Dec 5, 2023
27017cf
Thanks Honah!
Fokko Dec 7, 2023
2657fe1
Update pyiceberg/io/pyarrow.py
Fokko Dec 9, 2023
447d22f
lint
Fokko Dec 9, 2023
0a0e829
Add name-mapping
Fokko Dec 13, 2023
5a673d0
Move the names from a `set` to a `list`
Fokko Dec 14, 2023
2c9be7c
Move from `set` to `lint` in tests as well
Fokko Dec 14, 2023
c13e3b3
make tests happy
Fokko Dec 14, 2023
4fb7002
Build: Bump mypy-boto3-glue from 1.33.5 to 1.34.0 (#213)
dependabot[bot] Dec 14, 2023
b63043e
Merge remote-tracking branch 'fokko/fd-make-field-ids-optional' into …
sungwy Dec 16, 2023
9dfdc05
apply name mapping to file tasks
sungwy Dec 17, 2023
c918dcf
load name mapping from metadata
sungwy Dec 18, 2023
7f51df5
Build: Bump actions/upload-artifact from 3 to 4 (#215)
dependabot[bot] Dec 15, 2023
48fa520
Build: Bump coverage from 7.3.2 to 7.3.3 (#214)
dependabot[bot] Dec 15, 2023
7aa0270
Build: Bump pyarrow from 14.0.1 to 14.0.2 (#225)
dependabot[bot] Dec 18, 2023
2225fa4
Build: Bump moto from 4.2.11 to 4.2.12 (#224)
dependabot[bot] Dec 18, 2023
199fb85
Add name-mapping (#212)
Fokko Dec 19, 2023
473b17d
Move from `set` to `lint` in tests as well
Fokko Dec 14, 2023
6466205
make tests happy
Fokko Dec 14, 2023
074d23c
apply name mapping to file tasks
sungwy Dec 17, 2023
2eccbd5
rebase
sungwy Dec 19, 2023
9ed7511
Merge branch 'main' into sy-name-mapping
sungwy Dec 19, 2023
54bcf5e
refactoring
sungwy Dec 19, 2023
b6d06cf
remove stale ApplyNameMapping class
sungwy Dec 20, 2023
dcb7991
review comments
sungwy Dec 21, 2023
3827cc0
Update pyiceberg/io/pyarrow.py
sungwy Jan 12, 2024
7abd334
Apply suggestions from code review
sungwy Jan 12, 2024
be0e6fd
Moto server implementation for pytest (#223)
Dec 19, 2023
f4a0592
Make `connect_timeout` configurable (#218)
jqin61 Dec 19, 2023
4594d21
Build: Bump aiohttp from 3.8.6 to 3.9.0 (#168)
dependabot[bot] Dec 19, 2023
5aeb5be
Build: Bump cython from 3.0.6 to 3.0.7 (#228)
dependabot[bot] Dec 20, 2023
a50c163
Build: Bump coverage from 7.3.3 to 7.3.4 (#231)
dependabot[bot] Dec 21, 2023
82e6efe
Build: Bump fastavro from 1.9.1 to 1.9.2 (#236)
dependabot[bot] Dec 22, 2023
a00cf42
Build: Bump mypy-boto3-glue from 1.34.0 to 1.34.7 (#238)
dependabot[bot] Dec 23, 2023
6d6536d
Build: Bump pydantic from 2.5.2 to 2.5.3 (#237)
dependabot[bot] Dec 23, 2023
26ae9a5
Build: Bump mkdocs-material from 9.5.2 to 9.5.3 (#239)
dependabot[bot] Dec 26, 2023
1a2fbfb
Build: Bump sqlalchemy from 2.0.23 to 2.0.24 (#244)
dependabot[bot] Dec 31, 2023
f5b4be8
Build: Bump coverage from 7.3.4 to 7.4.0 (#243)
dependabot[bot] Dec 31, 2023
afa482f
Glue catalog commit table (#140)
HonahX Jan 1, 2024
276c91d
Build: Bump pytest from 7.4.3 to 7.4.4 (#248)
dependabot[bot] Jan 2, 2024
81276d0
Allow filtering on newly added columns (#246)
Fokko Jan 3, 2024
0e7e4b4
Build: Bump sqlalchemy from 2.0.24 to 2.0.25 (#250)
dependabot[bot] Jan 4, 2024
9458492
Bug fix falsy value of zero (#249)
MehulBatra Jan 4, 2024
c88d15c
Correct schema behavior (#247)
Fokko Jan 5, 2024
4c2821e
Replace Black by Ruff formatter (#127)
hussein-awala Jan 5, 2024
004ae5e
Fix lint tests failed in main (#253)
hussein-awala Jan 5, 2024
1bda7dc
Build: Bump moto from 4.2.12 to 4.2.13 (#257)
dependabot[bot] Jan 9, 2024
516cebf
Build: Bump mkdocstrings-python from 1.7.5 to 1.8.0 (#256)
dependabot[bot] Jan 9, 2024
a339746
Build: Bump fastavro from 1.9.2 to 1.9.3 (#258)
dependabot[bot] Jan 9, 2024
f88ffdf
Cast env var `s3.connect-timeout` to float (#259)
sungwy Jan 10, 2024
d3919b4
Update feature support documentation (#261)
ndrluis Jan 11, 2024
cdd57e3
Build: Bump cython from 3.0.7 to 3.0.8 (#260)
dependabot[bot] Jan 11, 2024
f1f2dd2
Build: Bump jinja2 from 3.1.2 to 3.1.3 in /mkdocs (#263)
dependabot[bot] Jan 11, 2024
63c91ab
Build: Bump jinja2 from 3.1.2 to 3.1.3 (#264)
dependabot[bot] Jan 11, 2024
86bf014
Arrow: Set field-id with prefix (#227)
Fokko Jan 12, 2024
65dc566
Add name-mapping
Fokko Dec 13, 2023
318da04
Arrow: Allow missing field-ids from Schema
Fokko Dec 5, 2023
abcd8b6
apply name mapping to file tasks
sungwy Dec 17, 2023
23e510a
Add name-mapping (#212)
Fokko Dec 19, 2023
73a5c11
rebase
sungwy Dec 19, 2023
27f0472
refactoring
sungwy Dec 19, 2023
263d0e3
rebase
sungwy Jan 13, 2024
4358c7c
adopt review comments
sungwy Jan 13, 2024
3bec604
rebase
sungwy Jan 13, 2024
8897ef6
Merge branch 'main' into sy-name-mapping
sungwy Jan 13, 2024
30e34c6
use constant
sungwy Jan 13, 2024
f7d537d
add utility function new_schema_for_table
sungwy Jan 14, 2024
49055c5
Update pyiceberg/table/__init__.py
sungwy Jan 16, 2024
7a2cef0
Merge branch 'main' into sy-name-mapping
sungwy Jan 18, 2024
2931b11
remove new_schema_for_table
sungwy Jan 18, 2024
3ac65d7
Merge branch 'apache:main' into sy-name-mapping
sungwy Jan 18, 2024
229 changes: 164 additions & 65 deletions pyiceberg/io/pyarrow.py
@@ -124,6 +124,7 @@
visit_with_partner,
)
from pyiceberg.table import WriteTask
from pyiceberg.table.name_mapping import NameMapping
from pyiceberg.transforms import TruncateTransform
from pyiceberg.typedef import EMPTY_DICT, Properties, Record
from pyiceberg.types import (
@@ -164,6 +165,9 @@
# The PARQUET: in front means that it is Parquet specific, in this case the field_id
PYARROW_PARQUET_FIELD_ID_KEY = b"PARQUET:field_id"
PYARROW_FIELD_DOC_KEY = b"doc"
LIST_ELEMENT_NAME = "element"
MAP_KEY_NAME = "key"
MAP_VALUE_NAME = "value"

T = TypeVar("T")

@@ -631,8 +635,16 @@ def _combine_positional_deletes(positional_deletes: List[pa.ChunkedArray], rows:
return np.setdiff1d(np.arange(rows), all_chunks, assume_unique=False)


def pyarrow_to_schema(schema: pa.Schema) -> Schema:
visitor = _ConvertToIceberg()
def pyarrow_to_schema(schema: pa.Schema, name_mapping: Optional[NameMapping] = None) -> Schema:
has_ids = visit_pyarrow(schema, _HasIds())
if has_ids:
visitor = _ConvertToIceberg()
elif name_mapping is not None:
visitor = _ConvertToIceberg(name_mapping=name_mapping)
else:
raise ValueError(
"Parquet file does not have field-ids and the Iceberg table does not have 'schema.name-mapping.default' defined"
)
return visit_pyarrow(schema, visitor)


@@ -653,50 +665,47 @@ def visit_pyarrow(obj: Union[pa.DataType, pa.Schema], visitor: PyArrowSchemaVisi


@visit_pyarrow.register(pa.Schema)
def _(obj: pa.Schema, visitor: PyArrowSchemaVisitor[T]) -> Optional[T]:
struct_results: List[Optional[T]] = []
for field in obj:
visitor.before_field(field)
struct_result = visit_pyarrow(field.type, visitor)
visitor.after_field(field)
struct_results.append(struct_result)

return visitor.schema(obj, struct_results)
def _(obj: pa.Schema, visitor: PyArrowSchemaVisitor[T]) -> T:
return visitor.schema(obj, visit_pyarrow(pa.struct(obj), visitor))


@visit_pyarrow.register(pa.StructType)
def _(obj: pa.StructType, visitor: PyArrowSchemaVisitor[T]) -> Optional[T]:
struct_results: List[Optional[T]] = []
def _(obj: pa.StructType, visitor: PyArrowSchemaVisitor[T]) -> T:
results = []

for field in obj:
visitor.before_field(field)
struct_result = visit_pyarrow(field.type, visitor)
result = visit_pyarrow(field.type, visitor)
results.append(visitor.field(field, result))
visitor.after_field(field)
struct_results.append(struct_result)

return visitor.struct(obj, struct_results)
return visitor.struct(obj, results)


@visit_pyarrow.register(pa.ListType)
def _(obj: pa.ListType, visitor: PyArrowSchemaVisitor[T]) -> Optional[T]:
visitor.before_field(obj.value_field)
list_result = visit_pyarrow(obj.value_field.type, visitor)
visitor.after_field(obj.value_field)
return visitor.list(obj, list_result)
def _(obj: pa.ListType, visitor: PyArrowSchemaVisitor[T]) -> T:
visitor.before_list_element(obj.value_field)
result = visit_pyarrow(obj.value_type, visitor)
visitor.after_list_element(obj.value_field)

return visitor.list(obj, result)


@visit_pyarrow.register(pa.MapType)
def _(obj: pa.MapType, visitor: PyArrowSchemaVisitor[T]) -> Optional[T]:
visitor.before_field(obj.key_field)
key_result = visit_pyarrow(obj.key_field.type, visitor)
visitor.after_field(obj.key_field)
visitor.before_field(obj.item_field)
value_result = visit_pyarrow(obj.item_field.type, visitor)
visitor.after_field(obj.item_field)
def _(obj: pa.MapType, visitor: PyArrowSchemaVisitor[T]) -> T:
visitor.before_map_key(obj.key_field)
key_result = visit_pyarrow(obj.key_type, visitor)
visitor.after_map_key(obj.key_field)

visitor.before_map_value(obj.item_field)
value_result = visit_pyarrow(obj.item_type, visitor)
visitor.after_map_value(obj.item_field)

return visitor.map(obj, key_result, value_result)


@visit_pyarrow.register(pa.DataType)
def _(obj: pa.DataType, visitor: PyArrowSchemaVisitor[T]) -> Optional[T]:
def _(obj: pa.DataType, visitor: PyArrowSchemaVisitor[T]) -> T:
if pa.types.is_nested(obj):
raise TypeError(f"Expected primitive type, got: {type(obj)}")
return visitor.primitive(obj)
@@ -709,24 +718,46 @@ def before_field(self, field: pa.Field) -> None:
def after_field(self, field: pa.Field) -> None:
"""Override this method to perform an action immediately after visiting a field."""

def before_list_element(self, element: pa.Field) -> None:
"""Override this method to perform an action immediately before visiting an element within a ListType."""

def after_list_element(self, element: pa.Field) -> None:
"""Override this method to perform an action immediately after visiting an element within a ListType."""

def before_map_key(self, key: pa.Field) -> None:
"""Override this method to perform an action immediately before visiting a key within a MapType."""

def after_map_key(self, key: pa.Field) -> None:
"""Override this method to perform an action immediately after visiting a key within a MapType."""

def before_map_value(self, value: pa.Field) -> None:
"""Override this method to perform an action immediately before visiting a value within a MapType."""

def after_map_value(self, value: pa.Field) -> None:
"""Override this method to perform an action immediately after visiting a value within a MapType."""

@abstractmethod
def schema(self, schema: pa.Schema, field_results: List[Optional[T]]) -> Optional[T]:
def schema(self, schema: pa.Schema, struct_result: T) -> T:
"""Visit a schema."""

@abstractmethod
def struct(self, struct: pa.StructType, field_results: List[Optional[T]]) -> Optional[T]:
def struct(self, struct: pa.StructType, field_results: List[T]) -> T:
"""Visit a struct."""

@abstractmethod
def list(self, list_type: pa.ListType, element_result: Optional[T]) -> Optional[T]:
def field(self, field: pa.Field, field_result: T) -> T:
"""Visit a field."""

@abstractmethod
def list(self, list_type: pa.ListType, element_result: T) -> T:
"""Visit a list."""

@abstractmethod
def map(self, map_type: pa.MapType, key_result: Optional[T], value_result: Optional[T]) -> Optional[T]:
def map(self, map_type: pa.MapType, key_result: T, value_result: T) -> T:
"""Visit a map."""

@abstractmethod
def primitive(self, primitive: pa.DataType) -> Optional[T]:
def primitive(self, primitive: pa.DataType) -> T:
"""Visit a primitive type."""


@@ -738,42 +769,84 @@ def _get_field_id(field: pa.Field) -> Optional[int]:
)


class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, Schema]]):
def _convert_fields(self, arrow_fields: Iterable[pa.Field], field_results: List[Optional[IcebergType]]) -> List[NestedField]:
fields = []
for i, field in enumerate(arrow_fields):
field_id = _get_field_id(field)
field_doc = doc_str.decode() if (field.metadata and (doc_str := field.metadata.get(PYARROW_FIELD_DOC_KEY))) else None
field_type = field_results[i]
if field_type is not None and field_id is not None:
fields.append(NestedField(field_id, field.name, field_type, required=not field.nullable, doc=field_doc))
return fields

def schema(self, schema: pa.Schema, field_results: List[Optional[IcebergType]]) -> Schema:
return Schema(*self._convert_fields(schema, field_results))

def struct(self, struct: pa.StructType, field_results: List[Optional[IcebergType]]) -> IcebergType:
return StructType(*self._convert_fields(struct, field_results))

def list(self, list_type: pa.ListType, element_result: Optional[IcebergType]) -> Optional[IcebergType]:
class _HasIds(PyArrowSchemaVisitor[bool]):
def schema(self, schema: pa.Schema, struct_result: bool) -> bool:
return struct_result

def struct(self, struct: pa.StructType, field_results: List[bool]) -> bool:
return all(field_results)

def field(self, field: pa.Field, field_result: bool) -> bool:
return all([_get_field_id(field) is not None, field_result])

def list(self, list_type: pa.ListType, element_result: bool) -> bool:
element_field = list_type.value_field
element_id = _get_field_id(element_field)
if element_result is not None and element_id is not None:
return ListType(element_id, element_result, element_required=not element_field.nullable)
return None
return element_result and element_id is not None

def map(
self, map_type: pa.MapType, key_result: Optional[IcebergType], value_result: Optional[IcebergType]
) -> Optional[IcebergType]:
def map(self, map_type: pa.MapType, key_result: bool, value_result: bool) -> bool:
key_field = map_type.key_field
key_id = _get_field_id(key_field)
value_field = map_type.item_field
value_id = _get_field_id(value_field)
if key_result is not None and value_result is not None and key_id is not None and value_id is not None:
return MapType(key_id, key_result, value_id, value_result, value_required=not value_field.nullable)
return None
return all([key_id is not None, value_id is not None, key_result, value_result])

def primitive(self, primitive: pa.DataType) -> bool:
return True

def primitive(self, primitive: pa.DataType) -> IcebergType:

class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, Schema]]):
"""Converts PyArrowSchema to Iceberg Schema. Applies the IDs from name_mapping if provided."""

_field_names: List[str]
_name_mapping: Optional[NameMapping]

def __init__(self, name_mapping: Optional[NameMapping] = None) -> None:
self._field_names = []
self._name_mapping = name_mapping

def _current_path(self) -> str:
return ".".join(self._field_names)

def _field_id(self, field: pa.Field) -> int:
if self._name_mapping:
return self._name_mapping.find(self._current_path()).field_id
elif (field_id := _get_field_id(field)) is not None:
return field_id
else:
raise ValueError(f"Cannot convert {field} to Iceberg Field as field_id is empty.")

def schema(self, schema: pa.Schema, struct_result: StructType) -> Schema:
return Schema(*struct_result.fields)

def struct(self, struct: pa.StructType, field_results: List[NestedField]) -> StructType:
return StructType(*field_results)

def field(self, field: pa.Field, field_result: IcebergType) -> NestedField:
field_id = self._field_id(field)
field_doc = doc_str.decode() if (field.metadata and (doc_str := field.metadata.get(PYARROW_FIELD_DOC_KEY))) else None
field_type = field_result
return NestedField(field_id, field.name, field_type, required=not field.nullable, doc=field_doc)

def list(self, list_type: pa.ListType, element_result: IcebergType) -> ListType:
element_field = list_type.value_field
self._field_names.append(LIST_ELEMENT_NAME)
element_id = self._field_id(element_field)
self._field_names.pop()
return ListType(element_id, element_result, element_required=not element_field.nullable)

def map(self, map_type: pa.MapType, key_result: IcebergType, value_result: IcebergType) -> MapType:
key_field = map_type.key_field
self._field_names.append(MAP_KEY_NAME)
key_id = self._field_id(key_field)
self._field_names.pop()
value_field = map_type.item_field
self._field_names.append(MAP_VALUE_NAME)
value_id = self._field_id(value_field)
self._field_names.pop()
return MapType(key_id, key_result, value_id, value_result, value_required=not value_field.nullable)

def primitive(self, primitive: pa.DataType) -> PrimitiveType:
if pa.types.is_boolean(primitive):
return BooleanType()
elif pa.types.is_int32(primitive):
@@ -808,6 +881,30 @@ def primitive(self, primitive: pa.DataType) -> IcebergType:

raise TypeError(f"Unsupported type: {primitive}")

def before_field(self, field: pa.Field) -> None:
self._field_names.append(field.name)

def after_field(self, field: pa.Field) -> None:
self._field_names.pop()

def before_list_element(self, element: pa.Field) -> None:
self._field_names.append(LIST_ELEMENT_NAME)

def after_list_element(self, element: pa.Field) -> None:
self._field_names.pop()

def before_map_key(self, key: pa.Field) -> None:
self._field_names.append(MAP_KEY_NAME)

def after_map_key(self, element: pa.Field) -> None:
self._field_names.pop()

def before_map_value(self, value: pa.Field) -> None:
self._field_names.append(MAP_VALUE_NAME)

def after_map_value(self, element: pa.Field) -> None:
self._field_names.pop()


def _task_to_table(
fs: FileSystem,
@@ -819,6 +916,7 @@ def _task_to_table(
case_sensitive: bool,
row_counts: List[int],
limit: Optional[int] = None,
name_mapping: Optional[NameMapping] = None,
) -> Optional[pa.Table]:
if limit and sum(row_counts) >= limit:
return None
@@ -831,9 +929,9 @@ def _task_to_table(
schema_raw = None
if metadata := physical_schema.metadata:
schema_raw = metadata.get(ICEBERG_SCHEMA)
# TODO: if field_ids are not present, Name Mapping should be implemented to look them up in the table schema,
# see https://github.com/apache/iceberg/issues/7451
file_schema = Schema.model_validate_json(schema_raw) if schema_raw is not None else pyarrow_to_schema(physical_schema)
file_schema = (
Schema.model_validate_json(schema_raw) if schema_raw is not None else pyarrow_to_schema(physical_schema, name_mapping)
)

pyarrow_filter = None
if bound_row_filter is not AlwaysTrue():
@@ -970,6 +1068,7 @@ def project_table(
case_sensitive,
row_counts,
limit,
table.name_mapping(),
)
for task in tasks
]
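The `_ConvertToIceberg` changes above resolve each field-id by maintaining a stack of field names while descending the schema, joining them into a dotted path, and looking that path up in the name mapping (with the synthetic names `element`, `key`, and `value` for list and map parts). A simplified, stdlib-only sketch of that path tracking — the `PathTracker` class and flat dict mapping are illustrative, not the pyiceberg API:

```python
# Sketch of the path-based field-id lookup in _ConvertToIceberg:
# names are pushed while descending, and the dotted path is resolved
# against a name mapping. The flat dict shape here is illustrative only.

LIST_ELEMENT_NAME = "element"
MAP_KEY_NAME = "key"
MAP_VALUE_NAME = "value"

class PathTracker:
    def __init__(self, name_mapping):
        # name_mapping: dotted path -> field-id,
        # e.g. {"addresses.element.zip": 4}
        self._names = []
        self._mapping = name_mapping

    def push(self, name):
        self._names.append(name)

    def pop(self):
        self._names.pop()

    def current_path(self):
        return ".".join(self._names)

    def field_id(self):
        path = self.current_path()
        if path not in self._mapping:
            raise ValueError(f"Could not find field with name: {path}")
        return self._mapping[path]

mapping = {
    "id": 1,
    "addresses": 2,
    "addresses.element": 3,
    "addresses.element.zip": 4,
}
tracker = PathTracker(mapping)
tracker.push("addresses")
tracker.push(LIST_ELEMENT_NAME)  # list elements use the name "element"
tracker.push("zip")
print(tracker.field_id())  # → 4
```

This mirrors why the visitor implements `before_list_element`/`after_list_element` and the map-key/value hooks: without them, the path for a list element would be `addresses.zip` instead of `addresses.element.zip` and the lookup would miss.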
13 changes: 13 additions & 0 deletions pyiceberg/table/__init__.py
@@ -81,6 +81,12 @@
TableMetadata,
TableMetadataUtil,
)
from pyiceberg.table.name_mapping import (
SCHEMA_NAME_MAPPING_DEFAULT,
NameMapping,
create_mapping_from_schema,
parse_mapping_from_json,
)
from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef
from pyiceberg.table.snapshots import (
Operation,
@@ -909,6 +915,13 @@ def history(self) -> List[SnapshotLogEntry]:
def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive: bool = True) -> UpdateSchema:
return UpdateSchema(self, allow_incompatible_changes=allow_incompatible_changes, case_sensitive=case_sensitive)

def name_mapping(self) -> NameMapping:
"""Return the table's field-id NameMapping."""
if name_mapping_json := self.properties.get(SCHEMA_NAME_MAPPING_DEFAULT):
return parse_mapping_from_json(name_mapping_json)
else:
return create_mapping_from_schema(self.schema())

def append(self, df: pa.Table) -> None:
"""
Append data to the table.
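The new `Table.name_mapping()` accessor has two branches: parse the `schema.name-mapping.default` property when it is set, otherwise derive a fresh mapping from the current schema. A hedged, stdlib-only sketch of that fallback logic — the flat `(id, name)` schema representation and the function name are simplifications for illustration:

```python
import json

SCHEMA_NAME_MAPPING_DEFAULT = "schema.name-mapping.default"

def name_mapping(table_properties, schema_fields):
    """Return name-mapping entries as [{"field-id": ..., "names": [...]}, ...].

    Mirrors the two branches of Table.name_mapping(): the JSON property
    wins when present; otherwise a mapping is derived from the schema.
    Simplified: schema_fields is a flat list of (field_id, name) pairs,
    so nested fields are not modeled here.
    """
    if name_mapping_json := table_properties.get(SCHEMA_NAME_MAPPING_DEFAULT):
        return json.loads(name_mapping_json)
    return [{"field-id": fid, "names": [name]} for fid, name in schema_fields]

# The property takes precedence when present:
props = {SCHEMA_NAME_MAPPING_DEFAULT: '[{"field-id": 1, "names": ["id", "record_id"]}]'}
print(name_mapping(props, [(1, "id")]))            # parsed from the property
print(name_mapping({}, [(1, "id"), (2, "data")]))  # derived from the schema
```

The design means every table exposes a usable mapping even when the property was never written, which is what lets `_task_to_table` pass `table.name_mapping()` unconditionally.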
2 changes: 2 additions & 0 deletions pyiceberg/table/name_mapping.py
@@ -34,6 +34,8 @@
from pyiceberg.typedef import IcebergBaseModel, IcebergRootModel
from pyiceberg.types import ListType, MapType, NestedField, PrimitiveType, StructType

SCHEMA_NAME_MAPPING_DEFAULT = "schema.name-mapping.default"


class MappedField(IcebergBaseModel):
field_id: int = Field(alias="field-id")
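For reference, the `schema.name-mapping.default` property introduced above holds JSON in the shape defined by the Iceberg name-mapping spec: a list of entries, each with a `field-id`, one or more `names`, and optional nested `fields`. A minimal parse-and-resolve check using only the stdlib (the sample mapping itself is made up, and `find_id` is an illustrative helper, not part of pyiceberg):

```python
import json

# A made-up name mapping in the Iceberg spec's JSON shape.
sample = """
[
  {"field-id": 1, "names": ["id"]},
  {"field-id": 2, "names": ["data"]},
  {"field-id": 3, "names": ["addresses"], "fields": [
    {"field-id": 4, "names": ["element"], "fields": [
      {"field-id": 5, "names": ["zip"]}
    ]}
  ]}
]
"""

mapping = json.loads(sample)

def find_id(entries, path):
    """Resolve a dotted path (e.g. 'addresses.element.zip') to a field-id."""
    head, _, rest = path.partition(".")
    for entry in entries:
        if head in entry["names"]:
            if not rest:
                return entry["field-id"]
            return find_id(entry.get("fields", []), rest)
    return None  # no entry matched this path segment

print(find_id(mapping, "addresses.element.zip"))  # → 5
```

Allowing multiple `names` per entry is what makes the mapping useful for schema evolution: a renamed column keeps resolving to the same field-id under either name.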