Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
bed9771
adds databricks timestamp NTZ
rudolfix Apr 28, 2025
916e893
improves error messages in pyarrow tuples to arrow
rudolfix Apr 28, 2025
9ab3020
decreases timestamp precision to 6 for mssql
rudolfix Apr 28, 2025
d4dad26
adds naive datetime to all data types case, enables fallback when tes…
rudolfix Apr 28, 2025
8923bed
other test fixes
rudolfix Apr 28, 2025
266be34
Merge branch 'devel' into fix/2486-fixes-mssql-datetime-precision
rudolfix Jul 21, 2025
15687eb
always stores incremental state last value as present in the data, te…
rudolfix Jul 22, 2025
8f7451c
fixes ntz timestamp tests
rudolfix Jul 22, 2025
cab6f28
fixes sqlalchemy destination to work with mssql
rudolfix Jul 24, 2025
c99c251
adds func to current module to get current resource instance
rudolfix Jul 24, 2025
7421d04
generates LIMIT clause in sql_database when limit step is present
rudolfix Jul 24, 2025
a716e5e
adds basic tests for mssql in sql_database
rudolfix Jul 24, 2025
795665c
adds docs on tz-awareness in datetime columns in sql_database
rudolfix Jul 24, 2025
f166f83
Merge branch 'devel' into fix/2486-fixes-mssql-datetime-precision
rudolfix Aug 6, 2025
6680bbd
Merge branch 'devel' into fix/2486-fixes-mssql-datetime-precision
rudolfix Aug 16, 2025
6e32240
adds naive and tz aware datetimes to destination caps, implements for …
rudolfix Aug 17, 2025
d8800ed
caches dlt type to python type conversion
rudolfix Aug 17, 2025
1a90e6b
normalizes timezone handling in timestamp and time data types, fixes …
rudolfix Aug 17, 2025
bbaa824
fixes incremental and lag so they always follow the tz-awareness of t…
rudolfix Aug 17, 2025
4f591a0
moves schema inference and data coercion from Schema to item_normaliz…
rudolfix Aug 17, 2025
9d3f251
casts timezones in arrow table normalizations, datetime and time case…
rudolfix Aug 17, 2025
f88e390
tracks resource parent, along pipe parent, fixes resource cloning whe…
rudolfix Aug 17, 2025
176ef1f
updates dbapi sql client for dremio
rudolfix Aug 17, 2025
50c8f8c
adjust column schema inferred from arrow to destination caps in extra…
rudolfix Aug 17, 2025
19fe0e6
moves schema and data setup for all data types tests to common code
rudolfix Aug 17, 2025
2115b59
adds option to exclude columns in sql_table, uses LimitItem to genera…
rudolfix Aug 17, 2025
366f56c
tests sql_database on mssql for all data types and incremental cursor…
rudolfix Aug 17, 2025
950407e
improves tests for row tuples to arrow with cast to dlt schema, tests…
rudolfix Aug 17, 2025
2bb104b
improved test for timestamps and int with precision on duckdb
rudolfix Aug 17, 2025
840dbd8
disables Python 3.14 tests and dashboard test on mac
rudolfix Aug 17, 2025
845bd73
better maybe transaction in job client: takes into account ddl and re…
rudolfix Aug 17, 2025
6a12ba4
pyodbc py3.13 bump
rudolfix Aug 17, 2025
74f220b
timestamp docs WIP
rudolfix Aug 17, 2025
b423a73
fixes tests
rudolfix Aug 17, 2025
6a7a95d
review fixes
rudolfix Aug 31, 2025
a144eb4
Merge branch 'devel' into fix/2486-fixes-mssql-datetime-precision
rudolfix Aug 31, 2025
349ca9b
finalizes docs
rudolfix Aug 31, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 64 additions & 1 deletion dlt/common/destination/capabilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
Any,
Callable,
ClassVar,
List,
Literal,
Optional,
Sequence,
Expand All @@ -12,7 +13,6 @@
Type,
)
from dlt.common.libs.sqlglot import TSqlGlotDialect
from dlt.common.data_types import TDataType
from dlt.common.destination.configuration import ParquetFormatConfiguration
from dlt.common.exceptions import TerminalValueError
from dlt.common.normalizers.typing import TNamingConventionReferenceArg
Expand All @@ -34,6 +34,7 @@
TLoaderMergeStrategy,
TTableFormat,
TLoaderReplaceStrategy,
TTableSchemaColumns,
)
from dlt.common.wei import EVM_DECIMAL_PRECISION

Expand Down Expand Up @@ -172,6 +173,10 @@ class DestinationCapabilitiesContext(ContainerInjectableContext):
supports_truncate_command: bool = True
schema_supports_numeric_precision: bool = True
timestamp_precision: int = 6
"""Default precision of the timestamp type"""
max_timestamp_precision: int = 6
"""Maximum supported timestamp precision"""

max_rows_per_insert: Optional[int] = None
insert_values_writer_type: str = "default"
supports_multiple_statements: bool = True
Expand Down Expand Up @@ -209,6 +214,12 @@ class DestinationCapabilitiesContext(ContainerInjectableContext):
parquet_format: Optional[ParquetFormatConfiguration] = None
"""Parquet format preferred by this destination"""

supports_tz_aware_datetime: bool = True
"""The destination can store datetime with timezone"""

supports_naive_datetime: bool = True
"""The destination can store datetime without timezone"""

def generates_case_sensitive_identifiers(self) -> bool:
"""Tells if capabilities as currently adjusted, will generate case sensitive identifiers"""
# must have case sensitive support and folding function must preserve casing
Expand Down Expand Up @@ -288,3 +299,55 @@ def merge_caps_file_formats(
else:
requested_file_format = possible_file_formats[0] if len(possible_file_formats) > 0 else None
return requested_file_format, possible_file_formats


def adjust_column_schema_to_capabilities(
    column: TColumnSchema, caps: DestinationCapabilitiesContext
) -> TColumnSchema:
    """Modify column schema in place according to destination capabilities.
    * timestamp and time precision are limited by min(destination max, parquet writer max)
    * timezone is always removed (enables default behavior)
    * default timestamp precision for destination is removed
    """
    data_type = column.get("data_type")
    if data_type in ("timestamp", "time"):
        # the parquet writer caps precision as well; fall back to default config when none set
        if caps.parquet_format:
            writer_max = caps.parquet_format.max_timestamp_precision()
        else:
            writer_max = ParquetFormatConfiguration().max_timestamp_precision()
        requested = column.get("precision", caps.timestamp_precision)
        # effective precision is the strictest of: requested, destination max, writer max
        effective = min(requested, caps.max_timestamp_precision, writer_max)

        # remove timezone flag to fallback to default tz-aware and let normalizer to correct the data
        column.pop("timezone", None)

        if effective == caps.timestamp_precision:
            # do not write default precision
            column.pop("precision", None)
        elif column.get("precision") != effective:
            column["precision"] = effective

    if data_type == "json" and not caps.supports_nested_types:
        column.pop("x-nested-type", None)  # type: ignore[typeddict-item]

    return column


def adjust_schema_to_capabilities(
    columns: TTableSchemaColumns, caps: DestinationCapabilitiesContext
) -> TTableSchemaColumns:
    """Modifies `columns` schema in place according to destination `caps`. See `adjust_column_schema_to_capabilities`
    for a list of adjustments.
    NOTE: that function is intended to be used on schemas inferred from the data,
    before user explicit hints are applied as it overrides ie. timezone and precision hints.

    Returns modified columns schema
    """
    # each column is adjusted in place; the mapping itself is not rebuilt
    for column_schema in columns.values():
        adjust_column_schema_to_capabilities(column_schema, caps)
    return columns
17 changes: 17 additions & 0 deletions dlt/common/destination/configuration.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from typing import ClassVar, Literal, Optional

from dlt.common.configuration import configspec, known_sections
from dlt.common.configuration.specs import BaseConfiguration
from dlt.common.time import get_precision_from_datetime_unit

CsvQuoting = Literal["quote_all", "quote_needed", "quote_minimal", "quote_none"]

Expand Down Expand Up @@ -30,4 +32,19 @@ class ParquetFormatConfiguration(BaseConfiguration):
allow_truncated_timestamps: bool = False
use_compliant_nested_type: bool = True

def max_timestamp_precision(self) -> int:
    """Return the maximum timestamp precision the configured parquet writer can store.

    Base capability depends on flavor/version; an explicit `coerce_timestamps`
    unit lowers it further.
    """
    # spark flavor writes INT96 timestamps → treat as ns-capable
    if (self.flavor or "").lower() == "spark":
        writer_max = get_precision_from_datetime_unit("ns")
    # parquet format version 2.6+ supports nanosecond timestamps, older only microsecond
    elif float(self.version or "0.0") >= 2.6:
        writer_max = get_precision_from_datetime_unit("ns")
    else:
        writer_max = get_precision_from_datetime_unit("us")

    if not self.coerce_timestamps:
        return writer_max
    # explicit coercion unit can only restrict, never extend, the writer capability
    return min(writer_max, get_precision_from_datetime_unit(self.coerce_timestamps))

__section__: ClassVar[str] = known_sections.DATA_WRITER
2 changes: 2 additions & 0 deletions dlt/destinations/impl/athena/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,13 +157,15 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
caps.alter_add_multi_column = True
caps.schema_supports_numeric_precision = False
caps.timestamp_precision = 3
caps.max_timestamp_precision = 3
caps.supports_truncate_command = False
caps.supported_merge_strategies = ["delete-insert", "upsert", "scd2"]
caps.supported_replace_strategies = ["truncate-and-insert", "insert-from-staging"]
caps.merge_strategies_selector = athena_merge_strategies_selector
caps.replace_strategies_selector = athena_replace_strategies_selector
caps.enforces_nulls_on_alter = False
caps.sqlglot_dialect = "athena"
caps.supports_tz_aware_datetime = False

return caps

Expand Down
28 changes: 27 additions & 1 deletion dlt/destinations/impl/bigquery/factory.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import Any, Dict, Type, Union, TYPE_CHECKING, Optional, cast
from typing import Any, Type, TYPE_CHECKING, Optional

from dlt.common import logger
from dlt.common.destination.typing import PreparedTableSchema
from dlt.common.exceptions import TerminalValueError
from dlt.common.normalizers.naming import NamingConvention
Expand Down Expand Up @@ -45,6 +46,7 @@ class BigQueryTypeMapper(TypeMapperImpl):
"BOOL": "bool",
"DATE": "date",
"TIMESTAMP": "timestamp",
"DATETIME": "timestamp",
"INT64": "bigint",
"BYTES": "binary",
"NUMERIC": "decimal",
Expand All @@ -69,6 +71,27 @@ def ensure_supported_type(
"Enable `autodetect_schema` in config or via BigQuery adapter", column["data_type"]
)

def to_db_datetime_type(
    self,
    column: TColumnSchema,
    table: PreparedTableSchema = None,
) -> str:
    """Return the BigQuery column type for a dlt timestamp column.

    TIMESTAMP is always timezone-aware and DATETIME is always timezone-naive in
    BigQuery, but DATETIME does not work with parquet loading, so TIMESTAMP is
    returned unconditionally — the column's `timezone` hint is deliberately
    ignored here (the original `"TIMESTAMP" if timezone else "TIMESTAMP"` was a
    no-op conditional; removed for clarity).
    """
    column_name = column["name"]
    table_name = table["name"]
    precision = column.get("precision")

    # BigQuery has a fixed timestamp precision; warn when a custom hint is dropped
    if precision and precision != self.capabilities.timestamp_precision:
        logger.warn(
            f"BigQuery does not support custom precision for column '{column_name}' in"
            f" table '{table_name}'. Will use default precision."
        )

    return "TIMESTAMP"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this line correct? The condition does not change anything.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is but I'll remove it to make it 100% clear. DATETIME does not work on bigquery in practice


def to_db_decimal_type(self, column: TColumnSchema) -> str:
# Use BigQuery's BIGNUMERIC for large precision decimals
precision, scale = self.decimal_precision(column.get("precision"), column.get("scale"))
Expand All @@ -90,6 +113,8 @@ def from_destination_type(
# precision is present in the type name
if db_type == "BIGNUMERIC":
return dict(data_type="wei")
if db_type == "DATETIME":
return {"data_type": "timestamp", "timezone": False}
return super().from_destination_type(*parse_db_data_type_str_with_precision(db_type))


Expand Down Expand Up @@ -123,6 +148,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
caps.is_max_text_data_type_length_in_bytes = True
caps.supports_ddl_transactions = False
caps.supports_clone_table = True
caps.supports_naive_datetime = False
caps.schema_supports_numeric_precision = False # no precision information in BigQuery
caps.supported_merge_strategies = ["delete-insert", "upsert", "scd2"]
caps.supported_replace_strategies = [
Expand Down
59 changes: 52 additions & 7 deletions dlt/destinations/impl/clickhouse/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import sys
from typing import Any, Dict, Type, Union, TYPE_CHECKING, Optional, cast

from dlt.common import logger
from dlt.common.arithmetics import DEFAULT_NUMERIC_PRECISION, DEFAULT_NUMERIC_SCALE
from dlt.common.data_writers.escape import (
escape_clickhouse_identifier,
Expand Down Expand Up @@ -69,24 +70,43 @@ def from_destination_type(
# Remove "Nullable" wrapper.
db_type = re.sub(r"^Nullable\((?P<type>.+)\)$", r"\g<type>", db_type)

# Remove timezone details.
# Handle DateTime types with and without timezone
if db_type == "DateTime":
# Legacy DateTime type without timezone and without precision
return cast(TColumnType, dict(data_type="timestamp", timezone=False))

if db_type == "DateTime('UTC')":
db_type = "DateTime"
# Legacy DateTime type with UTC timezone
return cast(TColumnType, dict(data_type="timestamp", timezone=True))

# Handle DateTime64 types with various formats
if datetime_match := re.match(
r"DateTime64(?:\((?P<precision>\d+)(?:,?\s*'(?P<timezone>UTC)')?\))?",
r"DateTime64(?:\((?P<precision>\d+)(?:,?\s*'(?P<timezone>[^']+)')?\))?",
db_type,
):
has_timezone = bool(datetime_match["timezone"])

if datetime_match["precision"]:
precision = int(datetime_match["precision"])
else:
precision = None
db_type = "DateTime64"

column_type = cast(
TColumnType,
dict(
data_type="timestamp",
timezone=has_timezone,
),
)

if precision is not None:
column_type["precision"] = precision

return column_type

# Extract precision and scale, parameters and remove from string.
if decimal_match := re.match(
r"Decimal\((?P<precision>\d+)\s*(?:,\s*(?P<scale>\d+))?\)", db_type
):
precision, scale = decimal_match.groups() # type: ignore[assignment]
precision, scale = decimal_match.groups()
precision = int(precision)
scale = int(scale) if scale else 0
db_type = "Decimal"
Expand All @@ -96,6 +116,31 @@ def from_destination_type(

return super().from_destination_type(db_type, precision, scale)

def to_db_datetime_type(
    self,
    column: TColumnSchema,
    table: PreparedTableSchema = None,
) -> str:
    """Map timestamp type to appropriate ClickHouse datetime type with or without timezone.

    Returns `DateTime64(p,'UTC')` for tz-aware columns and `DateTime64(p)` for
    naive ones, where `p` is the column precision capped at the destination
    maximum (microseconds).
    """
    column_name = column["name"]
    # `table` defaults to None — guard the lookup (previously raised AttributeError
    # when no table was passed)
    table_name = table.get("name") if table is not None else None
    timezone = column.get("timezone", True)
    # default precision is 6 (microseconds)
    precision = column.get("precision", self.capabilities.timestamp_precision)

    if precision and precision > self.capabilities.max_timestamp_precision:
        logger.warn(
            f"ClickHouse only supports microsecond precision (6) for column '{column_name}' in"
            f" table '{table_name}'. Will use precision 6."
        )
        precision = self.capabilities.max_timestamp_precision

    if timezone:
        return f"DateTime64({precision},'UTC')"
    else:
        return f"DateTime64({precision})"

def to_db_integer_type(self, column: TColumnSchema, table: PreparedTableSchema = None) -> str:
"""Map integer precision to the appropriate ClickHouse integer type."""
precision = column.get("precision")
Expand Down
10 changes: 1 addition & 9 deletions dlt/destinations/impl/databricks/databricks.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,6 @@
SUPPORTED_BLOB_STORAGE_PROTOCOLS = AZURE_BLOB_STORAGE_PROTOCOLS + S3_PROTOCOLS + GCS_PROTOCOLS


SUPPORTED_HINTS: Dict[TDatabricksColumnHint, str] = {
"primary_key": "PRIMARY KEY",
"foreign_key": "FOREIGN KEY",
}


class DatabricksLoadJob(RunnableLoadJob, HasFollowupJobs):
def __init__(
self,
Expand Down Expand Up @@ -314,9 +308,7 @@ def __init__(
self.config: DatabricksClientConfiguration = config
self.sql_client: DatabricksSqlClient = sql_client # type: ignore[assignment, unused-ignore]
self.type_mapper = self.capabilities.get_type_mapper()
self.active_hints = (
cast(Dict[TColumnHint, str], SUPPORTED_HINTS) if self.config.create_indexes else {}
)
# PK and FK are created in SQL fragments, not inline

def _get_column_def_sql(self, column: TColumnSchema, table: PreparedTableSchema = None) -> str:
column_def_sql = super()._get_column_def_sql(column, table)
Expand Down
30 changes: 21 additions & 9 deletions dlt/destinations/impl/databricks/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,26 @@ def ensure_supported_type(
table: PreparedTableSchema,
loader_file_format: TLoaderFileFormat,
) -> None:
if loader_file_format == "jsonl" and column["data_type"] in {
"decimal",
"wei",
"binary",
"json",
"date",
}:
raise TerminalValueError("", column["data_type"])
if loader_file_format == "jsonl":
if column["data_type"] in {
"decimal",
"wei",
"binary",
"json",
"date",
}:
raise TerminalValueError("", column["data_type"])
if column["data_type"] == "timestamp" and column.get("timezone") is False:
raise TerminalValueError(
"Cannot load naive timestamps from json, use parquet", column["data_type"]
)
if loader_file_format == "parquet":
if column["data_type"] in {"time"}:
raise TerminalValueError(
"Spark can't read Time from parquet. Convert your time column to string or"
" change file format.",
column["data_type"],
)

def to_db_integer_type(self, column: TColumnSchema, table: PreparedTableSchema = None) -> str:
precision = column.get("precision")
Expand Down Expand Up @@ -95,7 +107,7 @@ def to_db_datetime_type(
timezone = column.get("timezone", True)
precision = column.get("precision")

if precision and precision != 6:
if precision and precision != self.capabilities.timestamp_precision:
logger.warn(
f"Databricks does not support precision {precision} for column '{column_name}' in"
f" table '{table_name}'. Will default to 6."
Expand Down
3 changes: 3 additions & 0 deletions dlt/destinations/impl/dremio/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ def ensure_supported_type(
"Dremio cannot load fixed width `binary` columns from parquet files. Switch to"
" other file format or use binary columns without precision.",
"binary",
column["data_type"],
)

def from_destination_type(
Expand Down Expand Up @@ -108,10 +109,12 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
caps.supports_clone_table = False
caps.supports_multiple_statements = False
caps.timestamp_precision = 3
caps.max_timestamp_precision = 3
caps.supported_merge_strategies = ["delete-insert", "scd2"]
caps.supported_replace_strategies = ["truncate-and-insert", "insert-from-staging"]
caps.enforces_nulls_on_alter = False
caps.sqlglot_dialect = "presto"
caps.supports_tz_aware_datetime = False

return caps

Expand Down
Loading