Skip to content

Commit 9d19ef7

Browse files
authored
V3: Introduce timestamp_ns and timestamptz_ns (#1632)
Fixes: #1552 - [x] Add TimestampNanoType and TimestampTzNanoType - [x] Add Readers and Writers - [x] Enhance Transforms - [x] Add String Expressions parsing for nanoseconds timestamps - [x] Add format-version compatibility check for each type - [x] Run compatibility check on TableMetadata creation - [x] Unit tests python native `datetime` module does not have support for nanoseconds. We'll need to update our internal date time representations to use a different library. numpy? arrow?
1 parent baee2f9 commit 9d19ef7

19 files changed

+627
-69
lines changed

pyiceberg/avro/reader.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,14 @@ class TimestampReader(IntegerReader):
175175
"""
176176

177177

178+
class TimestampNanoReader(IntegerReader):
179+
"""Reads a nanosecond granularity timestamp from the stream.
180+
181+
Long is decoded as python integer which represents
182+
the number of nanoseconds from the unix epoch, 1 January 1970.
183+
"""
184+
185+
178186
class TimestamptzReader(IntegerReader):
179187
"""Reads a microsecond granularity timestamptz from the stream.
180188
@@ -185,6 +193,16 @@ class TimestamptzReader(IntegerReader):
185193
"""
186194

187195

196+
class TimestamptzNanoReader(IntegerReader):
197+
"""Reads a microsecond granularity timestamptz from the stream.
198+
199+
Long is decoded as python integer which represents
200+
the number of nanoseconds from the unix epoch, 1 January 1970.
201+
202+
Adjusted to UTC.
203+
"""
204+
205+
188206
class StringReader(Reader):
189207
def read(self, decoder: BinaryDecoder) -> str:
190208
return decoder.read_utf8()

pyiceberg/avro/resolver.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,9 @@
4444
StringReader,
4545
StructReader,
4646
TimeReader,
47+
TimestampNanoReader,
4748
TimestampReader,
49+
TimestamptzNanoReader,
4850
TimestamptzReader,
4951
UnknownReader,
5052
UUIDReader,
@@ -64,6 +66,8 @@
6466
OptionWriter,
6567
StringWriter,
6668
StructWriter,
69+
TimestampNanoWriter,
70+
TimestamptzNanoWriter,
6771
TimestamptzWriter,
6872
TimestampWriter,
6973
TimeWriter,
@@ -99,7 +103,9 @@
99103
PrimitiveType,
100104
StringType,
101105
StructType,
106+
TimestampNanoType,
102107
TimestampType,
108+
TimestamptzNanoType,
103109
TimestamptzType,
104110
TimeType,
105111
UnknownType,
@@ -184,9 +190,15 @@ def visit_time(self, time_type: TimeType) -> Writer:
184190
def visit_timestamp(self, timestamp_type: TimestampType) -> Writer:
185191
return TimestampWriter()
186192

193+
def visit_timestamp_ns(self, timestamp_ns_type: TimestampNanoType) -> Writer:
194+
return TimestampNanoWriter()
195+
187196
def visit_timestamptz(self, timestamptz_type: TimestamptzType) -> Writer:
188197
return TimestamptzWriter()
189198

199+
def visit_timestamptz_ns(self, timestamptz_ns_type: TimestamptzNanoType) -> Writer:
200+
return TimestamptzNanoWriter()
201+
190202
def visit_string(self, string_type: StringType) -> Writer:
191203
return StringWriter()
192204

@@ -332,9 +344,15 @@ def visit_time(self, time_type: TimeType, partner: Optional[IcebergType]) -> Wri
332344
def visit_timestamp(self, timestamp_type: TimestampType, partner: Optional[IcebergType]) -> Writer:
333345
return TimestampWriter()
334346

347+
def visit_timestamp_ns(self, timestamp_ns_type: TimestampNanoType, partner: Optional[IcebergType]) -> Writer:
348+
return TimestampNanoWriter()
349+
335350
def visit_timestamptz(self, timestamptz_type: TimestamptzType, partner: Optional[IcebergType]) -> Writer:
336351
return TimestamptzWriter()
337352

353+
def visit_timestamptz_ns(self, timestamptz_ns_type: TimestamptzNanoType, partner: Optional[IcebergType]) -> Writer:
354+
return TimestamptzNanoWriter()
355+
338356
def visit_string(self, string_type: StringType, partner: Optional[IcebergType]) -> Writer:
339357
return StringWriter()
340358

@@ -465,9 +483,15 @@ def visit_time(self, time_type: TimeType, partner: Optional[IcebergType]) -> Rea
465483
def visit_timestamp(self, timestamp_type: TimestampType, partner: Optional[IcebergType]) -> Reader:
466484
return TimestampReader()
467485

486+
def visit_timestamp_ns(self, timestamp_ns_type: TimestampNanoType, partner: Optional[IcebergType]) -> Reader:
487+
return TimestampNanoReader()
488+
468489
def visit_timestamptz(self, timestamptz_type: TimestamptzType, partner: Optional[IcebergType]) -> Reader:
469490
return TimestamptzReader()
470491

492+
def visit_timestamptz_ns(self, timestamptz_ns_type: TimestamptzNanoType, partner: Optional[IcebergType]) -> Reader:
493+
return TimestamptzNanoReader()
494+
471495
def visit_string(self, string_type: StringType, partner: Optional[IcebergType]) -> Reader:
472496
return StringReader()
473497

pyiceberg/avro/writer.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,12 +95,24 @@ def write(self, encoder: BinaryEncoder, val: int) -> None:
9595
encoder.write_int(val)
9696

9797

98+
@dataclass(frozen=True)
99+
class TimestampNanoWriter(Writer):
100+
def write(self, encoder: BinaryEncoder, val: int) -> None:
101+
encoder.write_int(val)
102+
103+
98104
@dataclass(frozen=True)
99105
class TimestamptzWriter(Writer):
100106
def write(self, encoder: BinaryEncoder, val: int) -> None:
101107
encoder.write_int(val)
102108

103109

110+
@dataclass(frozen=True)
111+
class TimestamptzNanoWriter(Writer):
112+
def write(self, encoder: BinaryEncoder, val: int) -> None:
113+
encoder.write_int(val)
114+
115+
104116
@dataclass(frozen=True)
105117
class StringWriter(Writer):
106118
def write(self, encoder: BinaryEncoder, val: Any) -> None:

pyiceberg/conversions.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,9 @@
5555
LongType,
5656
PrimitiveType,
5757
StringType,
58+
TimestampNanoType,
5859
TimestampType,
60+
TimestamptzNanoType,
5961
TimestamptzType,
6062
TimeType,
6163
UnknownType,
@@ -66,6 +68,7 @@
6668
date_str_to_days,
6769
date_to_days,
6870
datetime_to_micros,
71+
datetime_to_nanos,
6972
days_to_date,
7073
micros_to_time,
7174
micros_to_timestamp,
@@ -127,7 +130,9 @@ def _(primitive_type: BooleanType, value_str: str) -> Union[int, float, str, uui
127130
@partition_to_py.register(DateType)
128131
@partition_to_py.register(TimeType)
129132
@partition_to_py.register(TimestampType)
133+
@partition_to_py.register(TimestampNanoType)
130134
@partition_to_py.register(TimestamptzType)
135+
@partition_to_py.register(TimestamptzNanoType)
131136
@handle_none
132137
def _(primitive_type: PrimitiveType, value_str: str) -> int:
133138
"""Convert a string to an integer value.
@@ -213,12 +218,20 @@ def _(_: PrimitiveType, value: int) -> bytes:
213218

214219
@to_bytes.register(TimestampType)
215220
@to_bytes.register(TimestamptzType)
216-
def _(_: TimestampType, value: Union[datetime, int]) -> bytes:
221+
def _(_: PrimitiveType, value: Union[datetime, int]) -> bytes:
217222
if isinstance(value, datetime):
218223
value = datetime_to_micros(value)
219224
return _LONG_STRUCT.pack(value)
220225

221226

227+
@to_bytes.register(TimestampNanoType)
228+
@to_bytes.register(TimestamptzNanoType)
229+
def _(_: PrimitiveType, value: Union[datetime, int]) -> bytes:
230+
if isinstance(value, datetime):
231+
value = datetime_to_nanos(value)
232+
return _LONG_STRUCT.pack(value)
233+
234+
222235
@to_bytes.register(DateType)
223236
def _(_: DateType, value: Union[date, int]) -> bytes:
224237
if isinstance(value, date):
@@ -319,6 +332,8 @@ def _(_: PrimitiveType, b: bytes) -> int:
319332
@from_bytes.register(TimeType)
320333
@from_bytes.register(TimestampType)
321334
@from_bytes.register(TimestamptzType)
335+
@from_bytes.register(TimestampNanoType)
336+
@from_bytes.register(TimestamptzNanoType)
322337
def _(_: PrimitiveType, b: bytes) -> int:
323338
return _LONG_STRUCT.unpack(b)[0]
324339

pyiceberg/io/pyarrow.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,9 @@
163163
PrimitiveType,
164164
StringType,
165165
StructType,
166+
TimestampNanoType,
166167
TimestampType,
168+
TimestamptzNanoType,
167169
TimestamptzType,
168170
TimeType,
169171
UnknownType,
@@ -662,9 +664,15 @@ def visit_time(self, _: TimeType) -> pa.DataType:
662664
def visit_timestamp(self, _: TimestampType) -> pa.DataType:
663665
return pa.timestamp(unit="us")
664666

667+
def visit_timestamp_ns(self, _: TimestampNanoType) -> pa.DataType:
668+
return pa.timestamp(unit="ns")
669+
665670
def visit_timestamptz(self, _: TimestamptzType) -> pa.DataType:
666671
return pa.timestamp(unit="us", tz="UTC")
667672

673+
def visit_timestamptz_ns(self, _: TimestamptzNanoType) -> pa.DataType:
674+
return pa.timestamp(unit="ns", tz="UTC")
675+
668676
def visit_string(self, _: StringType) -> pa.DataType:
669677
return pa.large_string()
670678

@@ -1894,9 +1902,15 @@ def visit_time(self, time_type: TimeType) -> str:
18941902
def visit_timestamp(self, timestamp_type: TimestampType) -> str:
18951903
return "INT64"
18961904

1905+
def visit_timestamp_ns(self, timestamp_type: TimestampNanoType) -> str:
1906+
return "INT64"
1907+
18971908
def visit_timestamptz(self, timestamptz_type: TimestamptzType) -> str:
18981909
return "INT64"
18991910

1911+
def visit_timestamptz_ns(self, timestamptz_ns_type: TimestamptzNanoType) -> str:
1912+
return "INT64"
1913+
19001914
def visit_string(self, string_type: StringType) -> str:
19011915
return "BYTE_ARRAY"
19021916

pyiceberg/schema.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,9 @@
5757
PrimitiveType,
5858
StringType,
5959
StructType,
60+
TimestampNanoType,
6061
TimestampType,
62+
TimestamptzNanoType,
6163
TimestamptzType,
6264
TimeType,
6365
UnknownType,
@@ -362,6 +364,21 @@ def _validate_identifier_field(self, field_id: int) -> None:
362364
f"Cannot add field {field.name} as an identifier field: must not be nested in an optional field {parent}"
363365
)
364366

367+
def check_format_version_compatibility(self, format_version: int) -> None:
368+
"""Check that the schema is compatible for the given table format version.
369+
370+
Args:
371+
format_version: The Iceberg table format version.
372+
373+
Raises:
374+
ValueError: If the schema is not compatible for the format version.
375+
"""
376+
for field in self._lazy_id_to_field.values():
377+
if format_version < field.field_type.minimum_format_version():
378+
raise ValueError(
379+
f"{field.field_type} is only supported in {field.field_type.minimum_format_version()} or higher. Current format version is: {format_version}"
380+
)
381+
365382

366383
class SchemaVisitor(Generic[T], ABC):
367384
def before_field(self, field: NestedField) -> None:
@@ -522,8 +539,12 @@ def primitive(self, primitive: PrimitiveType, primitive_partner: Optional[P]) ->
522539
return self.visit_time(primitive, primitive_partner)
523540
elif isinstance(primitive, TimestampType):
524541
return self.visit_timestamp(primitive, primitive_partner)
542+
elif isinstance(primitive, TimestampNanoType):
543+
return self.visit_timestamp_ns(primitive, primitive_partner)
525544
elif isinstance(primitive, TimestamptzType):
526545
return self.visit_timestamptz(primitive, primitive_partner)
546+
elif isinstance(primitive, TimestamptzNanoType):
547+
return self.visit_timestamptz_ns(primitive, primitive_partner)
527548
elif isinstance(primitive, StringType):
528549
return self.visit_string(primitive, primitive_partner)
529550
elif isinstance(primitive, UUIDType):
@@ -573,10 +594,18 @@ def visit_time(self, time_type: TimeType, partner: Optional[P]) -> T:
573594
def visit_timestamp(self, timestamp_type: TimestampType, partner: Optional[P]) -> T:
574595
"""Visit a TimestampType."""
575596

597+
@abstractmethod
598+
def visit_timestamp_ns(self, timestamp_ns_type: TimestampNanoType, partner: Optional[P]) -> T:
599+
"""Visit a TimestampNanoType."""
600+
576601
@abstractmethod
577602
def visit_timestamptz(self, timestamptz_type: TimestamptzType, partner: Optional[P]) -> T:
578603
"""Visit a TimestamptzType."""
579604

605+
@abstractmethod
606+
def visit_timestamptz_ns(self, timestamptz_ns_type: TimestamptzNanoType, partner: Optional[P]) -> T:
607+
"""Visit a TimestamptzNanoType."""
608+
580609
@abstractmethod
581610
def visit_string(self, string_type: StringType, partner: Optional[P]) -> T:
582611
"""Visit a StringType."""
@@ -706,8 +735,12 @@ def primitive(self, primitive: PrimitiveType) -> T:
706735
return self.visit_time(primitive)
707736
elif isinstance(primitive, TimestampType):
708737
return self.visit_timestamp(primitive)
738+
elif isinstance(primitive, TimestampNanoType):
739+
return self.visit_timestamp_ns(primitive)
709740
elif isinstance(primitive, TimestamptzType):
710741
return self.visit_timestamptz(primitive)
742+
elif isinstance(primitive, TimestamptzNanoType):
743+
return self.visit_timestamptz_ns(primitive)
711744
elif isinstance(primitive, StringType):
712745
return self.visit_string(primitive)
713746
elif isinstance(primitive, UUIDType):
@@ -759,10 +792,18 @@ def visit_time(self, time_type: TimeType) -> T:
759792
def visit_timestamp(self, timestamp_type: TimestampType) -> T:
760793
"""Visit a TimestampType."""
761794

795+
@abstractmethod
796+
def visit_timestamp_ns(self, timestamp_type: TimestampNanoType) -> T:
797+
"""Visit a TimestampNanoType."""
798+
762799
@abstractmethod
763800
def visit_timestamptz(self, timestamptz_type: TimestamptzType) -> T:
764801
"""Visit a TimestamptzType."""
765802

803+
@abstractmethod
804+
def visit_timestamptz_ns(self, timestamptz_ns_type: TimestamptzNanoType) -> T:
805+
"""Visit a TimestamptzNanoType."""
806+
766807
@abstractmethod
767808
def visit_string(self, string_type: StringType) -> T:
768809
"""Visit a StringType."""

pyiceberg/table/metadata.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -578,15 +578,18 @@ def new_table_metadata(
578578
) -> TableMetadata:
579579
from pyiceberg.table import TableProperties
580580

581+
# Remove format-version so it does not get persisted
582+
format_version = int(properties.pop(TableProperties.FORMAT_VERSION, TableProperties.DEFAULT_FORMAT_VERSION))
583+
584+
schema.check_format_version_compatibility(format_version)
585+
581586
fresh_schema = assign_fresh_schema_ids(schema)
582587
fresh_partition_spec = assign_fresh_partition_spec_ids(partition_spec, schema, fresh_schema)
583588
fresh_sort_order = assign_fresh_sort_order_ids(sort_order, schema, fresh_schema)
584589

585590
if table_uuid is None:
586591
table_uuid = uuid.uuid4()
587592

588-
# Remove format-version so it does not get persisted
589-
format_version = int(properties.pop(TableProperties.FORMAT_VERSION, TableProperties.DEFAULT_FORMAT_VERSION))
590593
if format_version == 1:
591594
return TableMetadataV1(
592595
location=location,

0 commit comments

Comments
 (0)