25
25
from __future__ import annotations
26
26
27
27
import concurrent .futures
28
+ import itertools
28
29
import logging
29
30
import os
30
31
import re
34
35
from dataclasses import dataclass
35
36
from enum import Enum
36
37
from functools import lru_cache , singledispatch
37
- from itertools import chain , count
38
+ from itertools import chain
38
39
from typing import (
39
40
TYPE_CHECKING ,
40
41
Any ,
111
112
Schema ,
112
113
SchemaVisitorPerPrimitiveType ,
113
114
SchemaWithPartnerVisitor ,
115
+ assign_fresh_schema_ids ,
114
116
pre_order_visit ,
115
117
promote ,
116
118
prune_columns ,
@@ -617,7 +619,12 @@ def _combine_positional_deletes(positional_deletes: List[pa.ChunkedArray], rows:
617
619
618
620
def pyarrow_to_schema (schema : pa .Schema ) -> Schema :
619
621
visitor = _ConvertToIceberg ()
620
- return visit_pyarrow (schema , visitor )
622
+ schema = visit_pyarrow (schema , visitor )
623
+
624
+ if visitor .missing_id_metadata :
625
+ return assign_fresh_schema_ids (schema )
626
+ else :
627
+ return schema
621
628
622
629
623
630
@singledispatch
@@ -715,12 +722,12 @@ def primitive(self, primitive: pa.DataType) -> Optional[T]:
715
722
716
723
717
724
class _ConvertToIceberg (PyArrowSchemaVisitor [Union [IcebergType , Schema ]]):
718
- counter : count [int ]
719
- missing_is_metadata : Optional [bool ]
725
+ counter : itertools . count [int ]
726
+ missing_id_metadata : Optional [bool ]
720
727
721
728
def __init__ (self ) -> None :
722
- self .counter = count ()
723
- self .missing_is_metadata = None
729
+ self .counter = itertools . count (1 )
730
+ self .missing_id_metadata = None
724
731
725
732
def _get_field_id (self , field : pa .Field ) -> int :
726
733
field_id : Optional [int ] = None
@@ -730,18 +737,17 @@ def _get_field_id(self, field: pa.Field) -> int:
730
737
field_id = int (field_id_str .decode ())
731
738
732
739
if field_id is None :
733
- if self .missing_is_metadata is None :
734
- warnings .warn ("Field-ids are missing, generating new IDs" )
735
-
740
+ if self .missing_id_metadata is None :
741
+ warnings .warn ("Field-ids are missing, new IDs will be set" )
736
742
field_id = next (self .counter )
737
743
missing_is_metadata = True
738
744
else :
739
745
missing_is_metadata = False
740
746
741
- if self .missing_is_metadata is not None and self .missing_is_metadata != missing_is_metadata :
747
+ if self .missing_id_metadata is not None and self .missing_id_metadata != missing_is_metadata :
742
748
raise ValueError ("Parquet file contains partial field-ids" )
743
749
else :
744
- self .missing_is_metadata = missing_is_metadata
750
+ self .missing_id_metadata = missing_is_metadata
745
751
746
752
return field_id
747
753
0 commit comments