Merged

Commits (37)
bed9771
adds databricks timestamp NTZ
rudolfix Apr 28, 2025
916e893
improves error messages in pyarrow tuples to arrow
rudolfix Apr 28, 2025
9ab3020
decreases timestamp precision to 6 for mssql
rudolfix Apr 28, 2025
d4dad26
adds naive datetime to all data types case, enables fallback when tes…
rudolfix Apr 28, 2025
8923bed
other test fixes
rudolfix Apr 28, 2025
266be34
Merge branch 'devel' into fix/2486-fixes-mssql-datetime-precision
rudolfix Jul 21, 2025
15687eb
always stores incremental state last value as present in the data, te…
rudolfix Jul 22, 2025
8f7451c
fixes ntz timestamp tests
rudolfix Jul 22, 2025
cab6f28
fixes sqlalchemy destination to work with mssql
rudolfix Jul 24, 2025
c99c251
adds func to current module to get current resource instance
rudolfix Jul 24, 2025
7421d04
generates LIMIT clause in sql_database when limit step is present
rudolfix Jul 24, 2025
a716e5e
adds basic tests for mssql in sql_database
rudolfix Jul 24, 2025
795665c
adds docs on tz-awareness in datetime columns in sql_database
rudolfix Jul 24, 2025
f166f83
Merge branch 'devel' into fix/2486-fixes-mssql-datetime-precision
rudolfix Aug 6, 2025
6680bbd
Merge branch 'devel' into fix/2486-fixes-mssql-datetime-precision
rudolfix Aug 16, 2025
6e32240
adds naive an tz aware datetimes to destination caps, implements for …
rudolfix Aug 17, 2025
d8800ed
caches dlt type to python type conversion
rudolfix Aug 17, 2025
1a90e6b
normalizes timezone handling in timestamp and time data types, fixes …
rudolfix Aug 17, 2025
bbaa824
fixes incremental and lag so they always follow the tz-awareness of t…
rudolfix Aug 17, 2025
4f591a0
moves schema inference and data coercion from Schema to item_normaliz…
rudolfix Aug 17, 2025
9d3f251
casts timezones in arrow table normalizations, datetime and time case…
rudolfix Aug 17, 2025
f88e390
tracks resource parent, along pipe parent, fixes resource cloning whe…
rudolfix Aug 17, 2025
176ef1f
updates dbapi sql client for dremio
rudolfix Aug 17, 2025
50c8f8c
adjust column schema inferred from arrow to destination caps in extra…
rudolfix Aug 17, 2025
19fe0e6
moves schema and data setup for all data types tests to common code
rudolfix Aug 17, 2025
2115b59
adds option to exclude columns in sql_table, uses LimitItem to genera…
rudolfix Aug 17, 2025
366f56c
tests sql_database on mssql for all data types and incremental cursor…
rudolfix Aug 17, 2025
950407e
improves tests for row tuples to arrow with cast to dlt schema, tests…
rudolfix Aug 17, 2025
2bb104b
improved test for timestamps and int with precision on duckdb
rudolfix Aug 17, 2025
840dbd8
disables Python 3.14 tests and dashboard test on mac
rudolfix Aug 17, 2025
845bd73
better maybe transaction in job client: takes into account ddl and re…
rudolfix Aug 17, 2025
6a12ba4
pyodbc py3.13 bump
rudolfix Aug 17, 2025
74f220b
timestamp docs WIP
rudolfix Aug 17, 2025
b423a73
fixes tests
rudolfix Aug 17, 2025
6a7a95d
review fixes
rudolfix Aug 31, 2025
a144eb4
Merge branch 'devel' into fix/2486-fixes-mssql-datetime-precision
rudolfix Aug 31, 2025
349ca9b
finalizes docs
rudolfix Aug 31, 2025
31 changes: 15 additions & 16 deletions dlt/common/libs/pyarrow.py
@@ -932,23 +932,29 @@ def convert_numpy_to_arrow(
                     data_type=dlt_data_type,
                     inferred_arrow_type=inferred_arrow_type,
                     details=(
-                        "Insufficient decimal precision. Consider setting `precision` and `scale`"
-                        " hints: https://dlthub.com/docs/general-usage/schema/#tables-and-columns"
+                        f"Insufficient decimal precision {error_msg}. Consider setting `precision`"
+                        " and `scale` hints:"
+                        " https://dlthub.com/docs/general-usage/schema/#tables-and-columns"
                     ),
                 ) from e

             elif (
-                "to utf8 using function cast_string" in error_msg
-                and dlt_data_type in ("json", "text")
-                and pa.types.is_string(inferred_arrow_type)
-            ):
+                ("to utf8 using function cast_string" in error_msg and dlt_data_type == "text")
+                or dlt_data_type == "json"
+            ) and pa.types.is_string(inferred_arrow_type):
                 # this is handled by fallback case 3
                 logger.warning(
                     f"Received `data_type='{dlt_data_type}'`, data requires serialization to"
                     " string, slowing extraction. Cast the JSON field to STRING in your database"
                     " system to improve performance. For example, create and extract data from an"
                     " SQL VIEW that SELECT with CAST."
                 )
+            else:
+                raise PyToArrowConversionException(
+                    data_type=dlt_data_type,
+                    inferred_arrow_type=inferred_arrow_type,
+                    details=f"This conversion is currently unsupported by dlt ({error_msg})",
+                )

     # case 2: encode Sequence and Mapping types (list, tuples, set, dict, etc.) to JSON strings
     # This logic needs to be before case 3, otherwise pyarrow might infer the deserialized JSON object as a `pyarrow.struct` instead of `pyarrow.string`
@@ -977,10 +983,10 @@ def convert_numpy_to_arrow(
     if arrow_array is None and dlt_data_type is None:
         try:
             arrow_array = pa.array(column_data)
-        except (pa.ArrowInvalid, pyarrow.ArrowTypeError):
+        except (pa.ArrowInvalid, pyarrow.ArrowTypeError) as e:
             logger.warning(
-                "Type can't be inferred by `pyarrow`. Values will be encoded as in a loop, slowing"
-                " extraction."
+                f"Type can't be inferred by `pyarrow` {e.args[0]}. Values will be encoded as in a"
+                " loop, slowing extraction."
             )
             encoded_values: list[Union[None, Mapping[Any, Any], Sequence[Any], str]] = []
             for value in column_data:
@@ -1007,13 +1013,6 @@ def convert_numpy_to_arrow(

             arrow_array = pa.array(encoded_values)

-    if arrow_array is None:
-        raise PyToArrowConversionException(
-            data_type=dlt_data_type,
-            inferred_arrow_type=inferred_arrow_type,
-            details="This data type seems currently unsupported by dlt. Please open a GitHub issue",
-        )
-
     return arrow_array

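The fallback referenced in the warning above serializes Python containers to JSON strings so that pyarrow stores a plain string column rather than failing or inferring a struct. A minimal sketch of the idea, not the dlt implementation itself:

import json
import pyarrow as pa

values = [{"a": 1}, [1, 2, 3], None]
# encoding each container as a JSON string keeps the arrow type a plain
# string; pa.array on the raw values would fail on the mixed types
encoded = [None if v is None else json.dumps(v) for v in values]
print(pa.array(encoded).type)  # string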
20 changes: 20 additions & 0 deletions dlt/destinations/impl/databricks/factory.py
@@ -1,5 +1,6 @@
 from typing import Any, Optional, Type, Union, Dict, TYPE_CHECKING, Sequence, Tuple

+from dlt.common import logger
 from dlt.common.data_types.typing import TDataType
 from dlt.common.destination import Destination, DestinationCapabilitiesContext
 from dlt.common.data_writers.escape import escape_databricks_identifier, escape_databricks_literal
@@ -39,6 +40,7 @@ class DatabricksTypeMapper(TypeMapperImpl):
         "BOOLEAN": "bool",
         "DATE": "date",
         "TIMESTAMP": "timestamp",
+        "TIMESTAMP_NTZ": "timestamp",
         "BIGINT": "bigint",
         "INT": "bigint",
         "SMALLINT": "bigint",
Expand Down Expand Up @@ -83,6 +85,24 @@ def to_db_integer_type(self, column: TColumnSchema, table: PreparedTableSchema =
f"bigint with `{precision=:}` can't be mapped to Databricks integer type"
)

def to_db_datetime_type(
self,
column: TColumnSchema,
table: PreparedTableSchema = None,
) -> str:
column_name = column["name"]
table_name = table["name"]
timezone = column.get("timezone", True)
precision = column.get("precision")

if precision and precision != 6:
logger.warn(
f"Databricks does not support precision {precision} for column '{column_name}' in"
f" table '{table_name}'. Will default to 6."
)

return "TIMESTAMP" if timezone else "TIMESTAMP_NTZ"

def from_destination_type(
self, db_type: str, precision: Optional[int] = None, scale: Optional[int] = None
) -> TColumnType:
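With TIMESTAMP_NTZ in the mapping, a tz-naive timestamp column can be requested through dlt column hints. A hedged sketch using the `timezone` hint the mapper above reads (it defaults to True, i.e. TIMESTAMP):

import dlt

# timezone=False should make the Databricks mapper emit TIMESTAMP_NTZ
@dlt.resource(columns={"created_at": {"data_type": "timestamp", "timezone": False}})
def events():
    yield {"id": 1, "created_at": "2025-04-28T12:00:00"}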
3 changes: 2 additions & 1 deletion dlt/destinations/impl/mssql/factory.py
@@ -108,7 +108,8 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
         caps.supports_multiple_statements = True
         caps.supports_create_table_if_not_exists = False  # IF NOT EXISTS not supported
         caps.max_rows_per_insert = 1000
-        caps.timestamp_precision = 7
+        # NOTE: timestamp_precision is 7 in the database but there's no way to write it via Python
+        caps.timestamp_precision = 6
         caps.supported_merge_strategies = ["delete-insert", "upsert", "scd2"]
         caps.supported_replace_strategies = [
             "truncate-and-insert",
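The NOTE above is the core of the fix: Python's datetime resolves to microseconds, so precision 7 (100 ns steps, as in DATETIME2(7)) can never be produced from Python values:

from datetime import datetime

# six fractional digits is the most a Python datetime can carry
ts = datetime(2025, 4, 28, 12, 0, 0, 123456)
print(ts.microsecond)  # 123456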
11 changes: 10 additions & 1 deletion dlt/destinations/impl/sqlalchemy/alter_table.py
@@ -26,7 +26,16 @@ def flush(self) -> None:
 class MigrationMaker:
     def __init__(self, dialect: sa.engine.Dialect) -> None:
         self._buf = ListBuffer()
-        self.ctx = MigrationContext(dialect, None, {"as_sql": True, "output_buffer": self._buf})
+        self.ctx = MigrationContext(
+            dialect,
+            None,
+            {
+                "as_sql": True,
+                "output_buffer": self._buf,
+                "mssql_batch_separator": None,
+                "oracle_batch_separator": None,
+            },
+        )
         self.ops = Operations(self.ctx)

     def add_column(self, table_name: str, column: sa.Column, schema: str) -> None:
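In alembic's offline (as_sql) mode the mssql and oracle dialects append batch separators (GO and /) after each statement; those are client-tool syntax and fail when the generated SQL is executed through a DBAPI cursor. A sketch of the effect, assuming the same MigrationContext options as above:

import io
import sqlalchemy as sa
from sqlalchemy.dialects import mssql
from alembic.migration import MigrationContext
from alembic.operations import Operations

buf = io.StringIO()
ctx = MigrationContext(
    mssql.dialect(),
    None,
    # without mssql_batch_separator=None the buffer would end with "GO",
    # which pyodbc cannot execute
    {"as_sql": True, "output_buffer": buf, "mssql_batch_separator": None},
)
Operations(ctx).add_column("items", sa.Column("note", sa.Text()), schema="dbo")
print(buf.getvalue())  # plain ALTER TABLE statement, no trailing GO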
4 changes: 2 additions & 2 deletions dlt/destinations/impl/sqlalchemy/merge_job.py
@@ -301,9 +301,9 @@ def _get_hard_delete_col_and_cond(  # type: ignore[override]
             cond = col.isnot(None)
             if table["columns"][col_name]["data_type"] == "bool":
                 if invert:
-                    cond = sa.or_(cond, col.is_(False))
+                    cond = sa.or_(cond, col.eq_(False))
                 else:
-                    cond = col.is_(True)
+                    cond = col.eq_(True)
             return col_name, cond

     @classmethod
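T-SQL's BIT type has no IS TRUE / IS FALSE operators, so the identity test col.is_(True) that other dialects accept breaks on mssql, while an equality comparison compiles everywhere. Illustrated with plain SQLAlchemy equality (the eq_ call above is quoted verbatim from the diff):

import sqlalchemy as sa
from sqlalchemy.dialects import mssql, postgresql

t = sa.table("items", sa.column("deleted", sa.Boolean()))

# equality compiles portably; IS TRUE would be rejected by SQL Server
cond = t.c.deleted == sa.true()
print(cond.compile(dialect=postgresql.dialect()))  # items.deleted = true
print(cond.compile(dialect=mssql.dialect()))       # items.deleted = 1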
5 changes: 3 additions & 2 deletions dlt/destinations/impl/sqlalchemy/type_mapper.py
@@ -92,8 +92,9 @@ def to_destination_type(  # type: ignore[override]
             if length is None and column.get("unique"):
                 length = 128
             if length is None:
-                return sa.Text()
-            return sa.String(length=length)
+                return sa.Text().with_variant(sa.UnicodeText(), "mssql")  # type: ignore[no-any-return]
+            else:
+                return sa.String(length=length).with_variant(sa.Unicode(length=length), "mssql")  # type: ignore[no-any-return]
         elif sc_t == "double":
             return self._create_double_type()
         elif sc_t == "bool":
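On mssql, sa.Text renders VARCHAR(MAX) but sa.UnicodeText renders NVARCHAR(MAX), which stores arbitrary Unicode; other dialects keep the plain type. A quick check of how the variant compiles:

import sqlalchemy as sa
from sqlalchemy.dialects import mssql, sqlite

text_type = sa.Text().with_variant(sa.UnicodeText(), "mssql")
print(text_type.compile(dialect=mssql.dialect()))   # NVARCHAR(max)
print(text_type.compile(dialect=sqlite.dialect()))  # TEXT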
5 changes: 3 additions & 2 deletions dlt/destinations/impl/synapse/factory.py
@@ -96,9 +96,10 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
         # 10.000 records is a "safe" amount that always seems to work.
         caps.max_rows_per_insert = 10000

-        # datetimeoffset can store 7 digits for fractional seconds
+        # NOTE: datetimeoffset can store 7 digits for fractional seconds, maybe you could use it with parquet in ns
+        # precision. you can pass synapse(timestamp_precision=7) to override
         # https://learn.microsoft.com/en-us/sql/t-sql/data-types/datetimeoffset-transact-sql?view=sql-server-ver16
-        caps.timestamp_precision = 7
+        caps.timestamp_precision = 6

         caps.supported_merge_strategies = ["delete-insert", "scd2"]
         caps.supported_replace_strategies = ["truncate-and-insert", "insert-from-staging"]
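As the comment says, the column type itself still holds 7 digits, so the old behavior can be restored per pipeline by overriding the capability on the factory. A sketch following that comment:

import dlt
from dlt.destinations import synapse

# restore 7-digit fractional seconds, e.g. for parquet with ns timestamps
pipeline = dlt.pipeline(
    pipeline_name="events",
    destination=synapse(timestamp_precision=7),
)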
10 changes: 10 additions & 0 deletions dlt/extract/decorators.py
@@ -63,6 +63,7 @@
 )

 from dlt.extract.hints import TResourceNestedHints, make_hints
+from dlt.extract.state import get_current_pipe_name
 from dlt.extract.utils import dynstr
 from dlt.extract.exceptions import (
     CurrentSourceNotAvailable,
@@ -1032,6 +1033,15 @@ def get_source() -> DltSource:
     raise CurrentSourceNotAvailable()


+def get_resource() -> DltResource:
+    """Should be executed from inside the function decorated with @dlt.resource
+
+    Returns:
+        DltResource: The resource object to which the currently executing pipe belongs
+    """
+    return Container()[SourceInjectableContext].source.resources[get_current_pipe_name()]
+
+
 TBoundItems = TypeVar("TBoundItems", bound=TDataItems)
 TDeferred = Callable[[], TBoundItems]
 TDeferredFunParams = ParamSpec("TDeferredFunParams")
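Per the commit message, this helper is also exposed on the current module. A hedged usage sketch, assuming it is importable as dlt.current.get_resource (the export itself is not shown in this diff):

import dlt

@dlt.resource
def numbers():
    # the injected source context is available while the pipe executes,
    # e.g. during pipeline.run()
    me = dlt.current.get_resource()
    print(me.name)  # "numbers"
    yield from range(3)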
65 changes: 33 additions & 32 deletions dlt/extract/incremental/__init__.py
@@ -267,28 +267,36 @@ def on_resolved(self) -> None:
                 "Incremental `end_value` was specified without `initial_value`."
                 "`initial_value` is required when using `end_value`."
             )
-        self._cursor_datetime_check(self.initial_value, "initial_value")
-        self._cursor_datetime_check(self.initial_value, "end_value")
         # Ensure end value is "higher" than initial value
-        if (
-            self.end_value is not None
-            and self.last_value_func([self.end_value, self.initial_value]) != self.end_value
-        ):
-            if self.last_value_func in (min, max):
-                adject = "higher" if self.last_value_func is max else "lower"
-                msg = (
-                    f"Incremental `initial_value={self.initial_value}` is {adject} than"
-                    f" `end_value={self.end_value}`. 'end_value' must be {adject} than"
-                    " `initial_value`."
-                )
-            else:
-                msg = (
-                    f"Incremental `initial_value={self.initial_value}` is greater than"
-                    f" `end_value={self.end_value}` as determined by the custom `last_value_func`."
-                    f" The result of `{self.last_value_func.__name__}([end_value,"
-                    " initial_value])` must equal `end_value`"
-                )
-            raise ConfigurationValueError(msg)
+        try:
+            if (
+                self.end_value is not None
+                and self.last_value_func([self.end_value, self.initial_value]) != self.end_value
+            ):
+                if self.last_value_func in (min, max):
+                    adject = "higher" if self.last_value_func is max else "lower"
+                    msg = (
+                        f"Incremental `initial_value={self.initial_value}` is {adject} than"
+                        f" `end_value={self.end_value}`. 'end_value' must be {adject} than"
+                        " `initial_value`."
+                    )
+                else:
+                    msg = (
+                        f"Incremental `initial_value={self.initial_value}` is greater than"
+                        f" `end_value={self.end_value}` as determined by the custom"
+                        " `last_value_func`. The result of"
+                        f" `{self.last_value_func.__name__}([end_value, initial_value])` must equal"
+                        " `end_value`"
+                    )
+                raise ConfigurationValueError(msg)
+        except ConfigurationValueError:
+            raise
+        except Exception as exc:
+            raise ConfigurationValueError(
+                f"Incremental `initial_value={self.initial_value}` and `end_value={self.end_value}`"
+                " are not comparable. Make sure they are of the same type and tz-awareness: "
+                + str(exc)
+            ) from exc

     def parse_native_representation(self, native_value: Any) -> None:
         if isinstance(native_value, Incremental):
@@ -344,6 +352,9 @@ def get_state(self) -> IncrementalColumnState:
                     "unique_hashes": [],
                 }
             )
+        else:
+            # update initial value in existing state
+            self._cached_state["initial_value"] = self.initial_value
         return self._cached_state

     @staticmethod
@@ -354,16 +365,6 @@ def _get_state(resource_name: str, cursor_path: str) -> IncrementalColumnState:
         # if state params is empty
         return state

-    @staticmethod
-    def _cursor_datetime_check(value: Any, arg_name: str) -> None:
-        if value and isinstance(value, datetime) and value.tzinfo is None:
-            logger.warning(
-                f"The {arg_name} argument {value} is a datetime without timezone. This may result"
-                " in an error when such values are compared by Incremental class. Note that `dlt`"
-                " stores datetimes in timezone-aware types so the UTC timezone will be added by"
-                " the destination"
-            )
-
     @property
     def last_value(self) -> Optional[TCursorValue]:
         s = self.get_state()
@@ -382,7 +383,7 @@ def last_value(self) -> Optional[TCursorValue]:
             )
         elif last_value is not None:
             last_value = apply_lag(
-                self.lag, s["initial_value"], last_value, self.last_value_func
+                self.lag, self.initial_value, last_value, self.last_value_func
             )

         return last_value
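The try/except added to on_resolved guards exactly this failure mode: Python refuses to order datetimes of mixed tz-awareness, so a builtin or custom last_value_func raises instead of returning a value:

from datetime import datetime, timezone

naive = datetime(2025, 1, 1)
aware = datetime(2025, 1, 1, tzinfo=timezone.utc)
try:
    # what last_value_func([end_value, initial_value]) does for max
    max([naive, aware])
except TypeError as exc:
    print(exc)  # can't compare offset-naive and offset-aware datetimes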