Skip to content

Commit 1e73d67

Browse files
authored
Refactor boundary timestamp handling in SqlMergeFollowupJob and SqlalchemyMergeFollowupJob to ensure current load package creation time is used when no boundary timestamp is provided. Update DltResourceHints class to streamline timestamp validation for active_record_timestamp and boundary_timestamp. Adjust tests accordingly. (#3378)
1 parent 382eb6b commit 1e73d67

File tree

5 files changed

+159
-68
lines changed

5 files changed

+159
-68
lines changed

dlt/destinations/impl/sqlalchemy/merge_job.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1-
from typing import Sequence, Tuple, Optional, List, Union
1+
from typing import Sequence, Tuple, Optional, List, Union, cast
22
import operator
33
import sqlalchemy as sa
44

5+
from dlt.common.typing import TAnyDateTime
56
from dlt.common.utils import uniq_id
67
from dlt.common.destination import PreparedTableSchema, DestinationCapabilitiesContext
78
from dlt.common.schema.utils import (
@@ -374,10 +375,13 @@ def gen_scd2_sql(
374375
format_datetime_literal = (
375376
DestinationCapabilitiesContext.generic_capabilities().format_datetime_literal
376377
)
377-
378-
boundary_ts = ensure_pendulum_datetime_utc(
379-
root_table.get("x-boundary-timestamp", current_load_package()["state"]["created_at"]) # type: ignore[arg-type]
378+
_boundary_ts = cast(Optional[TAnyDateTime], root_table.get("x-boundary-timestamp"))
379+
boundary_ts: TAnyDateTime = (
380+
_boundary_ts
381+
if _boundary_ts is not None
382+
else current_load_package()["state"]["created_at"]
380383
)
384+
boundary_ts = ensure_pendulum_datetime_utc(boundary_ts)
381385

382386
boundary_literal = format_datetime_literal(boundary_ts, caps.timestamp_precision)
383387

dlt/destinations/sql_jobs.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from dlt.common.time import ensure_pendulum_datetime_utc
55
from dlt.common.destination import PreparedTableSchema
66
from dlt.common.destination.utils import resolve_merge_strategy
7-
from dlt.common.typing import TypedDict
7+
from dlt.common.typing import TAnyDateTime, TypedDict
88

99
from dlt.common.schema.typing import (
1010
TSortOrder,
@@ -845,12 +845,14 @@ def gen_scd2_sql(
845845
DestinationCapabilitiesContext.generic_capabilities().format_datetime_literal
846846
)
847847

848-
boundary_ts = ensure_pendulum_datetime_utc(
849-
root_table.get( # type: ignore[arg-type]
850-
"x-boundary-timestamp",
851-
current_load_package()["state"]["created_at"],
852-
)
848+
_boundary_ts = cast(Optional[TAnyDateTime], root_table.get("x-boundary-timestamp"))
849+
boundary_ts: TAnyDateTime = (
850+
_boundary_ts
851+
if _boundary_ts is not None
852+
else current_load_package()["state"]["created_at"]
853853
)
854+
boundary_ts = ensure_pendulum_datetime_utc(boundary_ts)
855+
854856
boundary_literal = format_datetime_literal(
855857
boundary_ts,
856858
caps.timestamp_precision,

dlt/extract/hints.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -830,6 +830,8 @@ def validate_write_disposition_hint(template: TResourceHints) -> None:
830830
):
831831
continue # None is allowed for active_record_timestamp
832832
if ts in wd:
833+
if wd[ts] is None: # type: ignore[literal-required]
834+
continue
833835
try:
834836
ensure_pendulum_datetime_utc(wd[ts]) # type: ignore[literal-required]
835837
except Exception:

docs/website/docs/general-usage/merge-loading.md

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -567,6 +567,35 @@ def dim_customer():
567567
...
568568
```
569569

570+
#### Reset boundary timestamp to the current load time
571+
To stop using a previously set `boundary_timestamp` and revert to the default (the current load package creation time), set `boundary_timestamp` to `None`. You can do this either at definition time or dynamically with `apply_hints` before a run.
572+
573+
Definition-time (always use current load time):
574+
```py
575+
@dlt.resource(
576+
write_disposition={
577+
"disposition": "merge",
578+
"strategy": "scd2",
579+
"boundary_timestamp": None, # reset to current load time
580+
}
581+
)
582+
def dim_customer():
583+
...
584+
```
585+
586+
Per-run reset (override just for this run):
587+
```py
588+
r.apply_hints(
589+
write_disposition={
590+
"disposition": "merge",
591+
"strategy": "scd2",
592+
"boundary_timestamp": None, # reset to current load time for this run
593+
}
594+
)
595+
pipeline.run(r(...))
596+
```
597+
When `boundary_timestamp` is `None` (or omitted), `dlt` uses the load package's creation timestamp as the boundary for both retiring existing versions and creating new versions.
598+
570599
### Example: Use your own row hash
571600
By default, `dlt` generates a row hash based on all columns provided by the resource and stores it in `_dlt_id`. You can use your own hash instead by specifying `row_version_column_name` in the `write_disposition` dictionary. You might already have a column present in your resource that can naturally serve as a row hash, in which case it's more efficient to use those pre-existing hash values than to generate new artificial ones. This option also allows you to use hashes based on a subset of columns, in case you want to ignore changes in some of the columns. When using your own hash, values for `_dlt_id` are randomly generated.
572601
```py

tests/load/pipeline/test_scd2.py

Lines changed: 112 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
# timezone is removed from all datetime objects in these tests to simplify comparison
22

3+
from unittest import mock
34
import pytest
45
from typing import List, Dict, Any, Optional
56
from datetime import date, datetime, timezone # noqa: I251
@@ -633,7 +634,7 @@ def r():
633634

634635
@pytest.mark.parametrize(
635636
"destination_config",
636-
destinations_configs(default_sql_configs=True, subset=["duckdb"]),
637+
destinations_configs(default_sql_configs=True, subset=["sqlalchemy", "duckdb"]),
637638
ids=lambda x: x.name,
638639
)
639640
def test_boundary_timestamp(
@@ -645,6 +646,7 @@ def test_boundary_timestamp(
645646
ts2 = "2024-08-22"
646647
ts3 = date(2024, 8, 20) # earlier than ts1 and ts2
647648
ts4 = "i_am_not_a_timestamp"
649+
ts5 = pendulum.datetime(2025, 8, 21, 12, 15, tz="UTC").timestamp()
648650

649651
@dlt.resource(
650652
table_name="dim_test",
@@ -657,75 +659,127 @@ def test_boundary_timestamp(
657659
def r(data):
658660
yield data
659661

662+
# normalize timestamps once for assertions
663+
ts1_dt = strip_timezone(ts1)
664+
ts2_dt = strip_timezone(ts2)
665+
ts3_dt = strip_timezone(ts3)
666+
ts5_dt = strip_timezone(ts5)
667+
660668
# load 1 — initial load
661669
dim_snap = [
662670
l1_1 := {"nk": 1, "foo": "foo"},
663671
l1_2 := {"nk": 2, "foo": "foo"},
664672
]
665-
info = p.run(r(dim_snap), **destination_config.run_kwargs)
666-
assert_load_info(info)
667-
assert load_table_counts(p, "dim_test")["dim_test"] == 2
668-
expected = [
669-
{**{FROM: strip_timezone(ts1), TO: None}, **l1_1},
670-
{**{FROM: strip_timezone(ts1), TO: None}, **l1_2},
671-
]
672-
assert get_table(p, "dim_test", "nk") == expected
673-
674-
# load 2 — different source records, different boundary timestamp
675-
r.apply_hints(
676-
write_disposition={
677-
"disposition": "merge",
678-
"strategy": "scd2",
679-
"boundary_timestamp": ts2,
680-
}
681-
)
682-
dim_snap = [
683-
l2_1 := {"nk": 1, "foo": "bar"}, # natural key 1 updated
684-
# l1_2, # natural key 2 no longer present
685-
l2_3 := {"nk": 3, "foo": "foo"}, # new natural key
686-
]
687-
info = p.run(r(dim_snap), **destination_config.run_kwargs)
688-
assert_load_info(info)
689-
assert load_table_counts(p, "dim_test")["dim_test"] == 4
690-
expected = [
691-
{**{FROM: strip_timezone(ts1), TO: strip_timezone(ts2)}, **l1_1}, # retired
692-
{**{FROM: strip_timezone(ts1), TO: strip_timezone(ts2)}, **l1_2}, # retired
693-
{**{FROM: strip_timezone(ts2), TO: None}, **l2_1}, # new
694-
{**{FROM: strip_timezone(ts2), TO: None}, **l2_3}, # new
695-
]
696-
assert_records_as_set(get_table(p, "dim_test"), expected)
697-
698-
# load 3 — earlier boundary timestamp
699-
# we naively apply any valid timestamp
700-
# may lead to "valid from" > "valid to", as in this test case
701-
r.apply_hints(
702-
write_disposition={
703-
"disposition": "merge",
704-
"strategy": "scd2",
705-
"boundary_timestamp": ts3,
706-
}
707-
)
708-
dim_snap = [l2_1] # natural key 3 no longer present
709-
info = p.run(r(dim_snap), **destination_config.run_kwargs)
710-
assert_load_info(info)
711-
assert load_table_counts(p, "dim_test")["dim_test"] == 4
712-
expected = [
713-
{**{FROM: strip_timezone(ts1), TO: strip_timezone(ts2)}, **l1_1}, # unchanged
714-
{**{FROM: strip_timezone(ts1), TO: strip_timezone(ts2)}, **l1_2}, # unchanged
715-
{**{FROM: strip_timezone(ts2), TO: None}, **l2_1}, # unchanged
716-
{**{FROM: strip_timezone(ts2), TO: strip_timezone(ts3)}, **l2_3}, # retired
717-
]
718-
assert_records_as_set(get_table(p, "dim_test"), expected)
673+
current_time: Dict[str, Optional[float]] = {"ts": None}
674+
with mock.patch(
675+
"dlt.common.storages.load_package.precise_time",
676+
side_effect=lambda: current_time["ts"],
677+
):
678+
# load 1 — initial load
679+
current_time["ts"] = pendulum.datetime(2024, 8, 21, 12, 15, tz="UTC").timestamp()
680+
r.apply_hints(
681+
write_disposition={
682+
"disposition": "merge",
683+
"strategy": "scd2",
684+
"boundary_timestamp": ts1,
685+
}
686+
)
687+
info = p.run(r(dim_snap), **destination_config.run_kwargs)
688+
assert_load_info(info)
689+
assert load_table_counts(p, "dim_test")["dim_test"] == 2
690+
expected = [
691+
{**{FROM: ts1_dt, TO: None}, **l1_1},
692+
{**{FROM: ts1_dt, TO: None}, **l1_2},
693+
]
694+
assert get_table(p, "dim_test", "nk", ts_columns=[FROM, TO]) == expected
695+
696+
# load 2 — different source records, different boundary timestamp
697+
current_time["ts"] = pendulum.datetime(2024, 8, 22, tz="UTC").timestamp()
698+
dim_snap = [
699+
l2_1 := {"nk": 1, "foo": "bar"}, # natural key 1 updated
700+
# l1_2, # natural key 2 no longer present
701+
l2_3 := {"nk": 3, "foo": "foo"}, # new natural key
702+
]
703+
r.apply_hints(
704+
write_disposition={
705+
"disposition": "merge",
706+
"strategy": "scd2",
707+
"boundary_timestamp": ts2,
708+
}
709+
)
710+
info = p.run(r(dim_snap), **destination_config.run_kwargs)
711+
assert_load_info(info)
712+
assert load_table_counts(p, "dim_test")["dim_test"] == 4
713+
expected = [
714+
{**{FROM: ts1_dt, TO: ts2_dt}, **l1_1}, # retired
715+
{**{FROM: ts1_dt, TO: ts2_dt}, **l1_2}, # retired
716+
{**{FROM: ts2_dt, TO: None}, **l2_1}, # new
717+
{**{FROM: ts2_dt, TO: None}, **l2_3}, # new
718+
]
719+
assert_records_as_set(get_table(p, "dim_test", ts_columns=[FROM, TO]), expected)
720+
721+
# load 3 — earlier boundary timestamp
722+
# we naively apply any valid timestamp
723+
# may lead to "valid from" > "valid to", as in this test case
724+
current_time["ts"] = pendulum.datetime(2024, 8, 22, 0, 0, 1, tz="UTC").timestamp()
725+
dim_snap = [l2_1] # natural key 3 no longer present
726+
r.apply_hints(
727+
write_disposition={
728+
"disposition": "merge",
729+
"strategy": "scd2",
730+
"boundary_timestamp": ts3,
731+
}
732+
)
733+
info = p.run(r(dim_snap), **destination_config.run_kwargs)
734+
assert_load_info(info)
735+
assert load_table_counts(p, "dim_test")["dim_test"] == 4
736+
expected = [
737+
{**{FROM: ts1_dt, TO: ts2_dt}, **l1_1}, # unchanged
738+
{**{FROM: ts1_dt, TO: ts2_dt}, **l1_2}, # unchanged
739+
{**{FROM: ts2_dt, TO: None}, **l2_1}, # unchanged
740+
{**{FROM: ts2_dt, TO: ts3_dt}, **l2_3}, # retired
741+
]
742+
assert_records_as_set(get_table(p, "dim_test", ts_columns=[FROM, TO]), expected)
743+
744+
# invalid boundary timestamp should raise error
745+
with pytest.raises(ValueError):
746+
r.apply_hints(
747+
write_disposition={
748+
"disposition": "merge",
749+
"strategy": "scd2",
750+
"boundary_timestamp": ts4,
751+
}
752+
)
719753

720-
# invalid boundary timestamp should raise error
721-
with pytest.raises(ValueError):
754+
# run 4 — no boundary timestamp (use current precise_time)
755+
current_time["ts"] = ts5
756+
dim_snap = [
757+
l3_1 := {"nk": 1, "foo": "foobar"}, # updated
758+
]
722759
r.apply_hints(
723760
write_disposition={
724761
"disposition": "merge",
725762
"strategy": "scd2",
726-
"boundary_timestamp": ts4,
763+
"boundary_timestamp": None,
727764
}
728765
)
766+
info = p.run(r(dim_snap), **destination_config.run_kwargs)
767+
assert_load_info(info)
768+
assert load_table_counts(p, "dim_test")["dim_test"] == 5
769+
expected = [
770+
{**{FROM: ts1_dt, TO: ts2_dt}, **l1_1}, # unchanged
771+
{**{FROM: ts1_dt, TO: ts2_dt}, **l1_2}, # unchanged
772+
{
773+
**{FROM: ts2_dt, TO: ts5_dt},
774+
**l2_1,
775+
}, # retired in this run
776+
{
777+
**{FROM: ts2_dt, TO: ts3_dt},
778+
**l2_3,
779+
}, # unchanged (already retired in load 3)
780+
{**{FROM: ts5_dt, TO: None}, **l3_1},
781+
]
782+
assert_records_as_set(get_table(p, "dim_test", ts_columns=[FROM, TO]), expected)
729783

730784

731785
@pytest.mark.essential

0 commit comments

Comments (0)