28
28
import logging
29
29
import os
30
30
import re
31
+ import warnings
31
32
from abc import ABC , abstractmethod
32
33
from concurrent .futures import Future
33
34
from dataclasses import dataclass
34
35
from enum import Enum
35
36
from functools import lru_cache , singledispatch
36
- from itertools import chain
37
+ from itertools import chain , count
37
38
from typing import (
38
39
TYPE_CHECKING ,
39
40
Any ,
@@ -713,28 +714,50 @@ def primitive(self, primitive: pa.DataType) -> Optional[T]:
713
714
"""Visit a primitive type."""
714
715
715
716
716
- def _get_field_id (field : pa .Field ) -> Optional [int ]:
717
- for pyarrow_field_id_key in PYARROW_FIELD_ID_KEYS :
718
- if field_id_str := field .metadata .get (pyarrow_field_id_key ):
719
- return int (field_id_str .decode ())
720
- return None
717
+ class _ConvertToIceberg (PyArrowSchemaVisitor [Union [IcebergType , Schema ]]):
718
+ counter : count [int ]
719
+ missing_is_metadata : Optional [bool ]
721
720
721
def __init__(self) -> None:
    """Set up the per-conversion state used when resolving field IDs."""
    # Not decided until the first field has been inspected; afterwards it
    # records whether IDs are being generated (True) or read from the
    # field metadata (False).
    self.missing_is_metadata = None
    # Source of consecutive integers for fields that carry no ID.
    self.counter = count()
722
724
723
- def _get_field_doc (field : pa .Field ) -> Optional [str ]:
724
- for pyarrow_doc_key in PYARROW_FIELD_DOC_KEYS :
725
- if doc_str := field .metadata .get (pyarrow_doc_key ):
726
- return doc_str .decode ()
727
- return None
725
def _get_field_id(self, field: pa.Field) -> int:
    """Return the field ID for ``field``, generating one when none is stored.

    The ID is looked up in ``field.metadata`` under each key listed in
    ``PYARROW_FIELD_ID_KEYS``; the stored bytes are decoded and parsed as an
    integer. When no key yields a value, the next number from
    ``self.counter`` is used instead.

    A single visitor must see either only fields with a stored ID or only
    fields without one; the mode is fixed by the first field visited.

    Args:
        field: The Arrow field whose ID is requested.

    Returns:
        The stored ID, or a freshly generated one.

    Raises:
        ValueError: If this field disagrees with the previously visited
            fields on whether IDs are stored in the metadata.
    """
    field_id: Optional[int] = None

    if field.metadata:
        for pyarrow_field_id_key in PYARROW_FIELD_ID_KEYS:
            if field_id_str := field.metadata.get(pyarrow_field_id_key):
                field_id = int(field_id_str.decode())
                # The first matching key takes precedence, so stop here
                # instead of letting a later key overwrite the result.
                break

    id_is_missing = field_id is None

    if self.missing_is_metadata is None:
        # First field seen by this visitor: it decides the mode.
        self.missing_is_metadata = id_is_missing
        if id_is_missing:
            warnings.warn("Field-ids are missing, generating new IDs")
    elif self.missing_is_metadata != id_is_missing:
        raise ValueError("Parquet file contains partial field-ids")

    if field_id is None:
        field_id = next(self.counter)

    return field_id
747
+
748
def _get_field_doc(self, field: pa.Field) -> Optional[str]:
    """Return the documentation string stored on ``field``, if there is one.

    The metadata is probed with each key in ``PYARROW_FIELD_DOC_KEYS`` in
    order; the first non-empty value found is decoded and returned.

    Args:
        field: The Arrow field to inspect.

    Returns:
        The decoded doc string, or ``None`` when the field has no metadata
        or none of the keys holds a non-empty value.
    """
    metadata = field.metadata
    if not metadata:
        return None

    candidates = (metadata.get(doc_key) for doc_key in PYARROW_FIELD_DOC_KEYS)
    raw_doc = next((value for value in candidates if value), None)
    if raw_doc is None:
        return None
    return raw_doc.decode()
729
753
730
- class _ConvertToIceberg (PyArrowSchemaVisitor [Union [IcebergType , Schema ]]):
731
754
def _convert_fields(self, arrow_fields: Iterable[pa.Field], field_results: List[Optional[IcebergType]]) -> List[NestedField]:
    """Pair each Arrow field with its converted type and build nested fields.

    Args:
        arrow_fields: The Arrow fields, in schema order.
        field_results: The converted type for each field, indexed by the
            field's position; ``None`` entries are left out of the result.

    Returns:
        One ``NestedField`` per field whose converted type is not ``None``.
    """
    nested_fields = []
    for position, arrow_field in enumerate(arrow_fields):
        # The ID and doc are resolved for every field, including the ones
        # skipped below, so the ID bookkeeping sees each field exactly once.
        resolved_id = self._get_field_id(arrow_field)
        documentation = self._get_field_doc(arrow_field)
        converted_type = field_results[position]
        if converted_type is None:
            continue
        nested_fields.append(
            NestedField(
                resolved_id,
                arrow_field.name,
                converted_type,
                required=not arrow_field.nullable,
                doc=documentation,
            )
        )
    return nested_fields
740
763
@@ -746,7 +769,7 @@ def struct(self, struct: pa.StructType, field_results: List[Optional[IcebergType
746
769
747
770
def list(self, list_type: pa.ListType, element_result: Optional[IcebergType]) -> Optional[IcebergType]:
    """Build a ``ListType`` from an Arrow list type.

    Args:
        list_type: The Arrow list type being visited.
        element_result: The already converted element type, or ``None``.

    Returns:
        A ``ListType`` wrapping ``element_result``, or ``None`` when
        ``element_result`` is ``None``.
    """
    element_field = list_type.value_field
    # _get_field_id always yields an int (stored or generated), so only the
    # element type decides whether the list can be built. It is still called
    # unconditionally so the element takes part in the ID bookkeeping.
    element_id = self._get_field_id(element_field)
    if element_result is None:
        return None
    return ListType(element_id, element_result, element_required=not element_field.nullable)
def map(
    self, map_type: pa.MapType, key_result: Optional[IcebergType], value_result: Optional[IcebergType]
) -> Optional[IcebergType]:
    """Build a ``MapType`` from an Arrow map type.

    Args:
        map_type: The Arrow map type being visited.
        key_result: The already converted key type, or ``None``.
        value_result: The already converted value type, or ``None``.

    Returns:
        A ``MapType`` built from both results, or ``None`` when either
        result is ``None``.
    """
    key_field = map_type.key_field
    # IDs are resolved for key and value in this order, and before the
    # early return, so both fields take part in the ID bookkeeping.
    key_id = self._get_field_id(key_field)
    value_field = map_type.item_field
    value_id = self._get_field_id(value_field)
    # _get_field_id always yields an int, so only the converted types can
    # prevent the map from being built.
    if key_result is None or value_result is None:
        return None
    return MapType(key_id, key_result, value_id, value_result, value_required=not value_field.nullable)
0 commit comments