Skip to content

Commit cd155f3

Browse files
enable scd2 record reinsertion by dropping unique constraint
1 parent 0459c17 commit cd155f3

File tree

3 files changed

+72
-6
lines changed

3 files changed

+72
-6
lines changed

dlt/destinations/sql_jobs.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -749,20 +749,20 @@ def gen_scd2_sql(
749749
INSERT INTO {root_table_name} ({col_str}, {from_}, {to})
750750
SELECT {col_str}, {boundary_ts} AS {from_}, {active_record_literal} AS {to}
751751
FROM {staging_root_table_name} AS s
752-
WHERE {hash_} NOT IN (SELECT {hash_} FROM {root_table_name});
752+
WHERE {hash_} NOT IN (SELECT {hash_} FROM {root_table_name} WHERE {is_active_clause});
753753
""")
754754

755755
# insert list elements for new active records in child tables
756756
child_tables = table_chain[1:]
757757
if child_tables:
758-
unique_column = escape_column_id(
759-
cls._get_unique_col(table_chain, sql_client, root_table)
760-
)
761758
# TODO: - based on deterministic child hashes (OK)
762759
# - if row hash changes all is right
763760
# - if it does not we only capture new records, while we should replace existing with those in stage
764761
# - this write disposition is way more similar to regular merge (how root tables are handled is different, other tables handled same)
765762
for table in child_tables:
763+
unique_column = escape_column_id(
764+
cls._get_unique_col(table_chain, sql_client, table)
765+
)
766766
table_name, staging_table_name = sql_client.get_qualified_table_names(table["name"])
767767
sql.append(f"""
768768
INSERT INTO {table_name}

dlt/extract/hints.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -465,11 +465,16 @@ def _merge_merge_disposition_dict(dict_: Dict[str, Any]) -> None:
465465
"x-valid-to": True,
466466
"x-active-record-timestamp": mddict.get("active_record_timestamp"),
467467
}
468+
# the unique constraint on C_DLT_ID is dropped when it is used to store the
469+
# SCD2 row hash (only applies to root table)
468470
hash_ = mddict.get("row_version_column_name", DataItemNormalizer.C_DLT_ID)
469471
dict_["columns"][hash_] = {
470472
"name": hash_,
471473
"nullable": False,
472474
"x-row-version": True,
475+
# duplicate values in the row hash column are possible in case
476+
# of insert-delete-reinsert pattern
477+
"unique": False,
473478
}
474479

475480
@staticmethod

tests/load/pipeline/test_scd2.py

Lines changed: 63 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -154,8 +154,8 @@ def r(data):
154154
assert table["columns"][from_]["x-valid-from"] # type: ignore[typeddict-item]
155155
assert table["columns"][to]["x-valid-to"] # type: ignore[typeddict-item]
156156
assert table["columns"]["_dlt_id"]["x-row-version"] # type: ignore[typeddict-item]
157-
# _dlt_id is still unique
158-
assert table["columns"]["_dlt_id"]["unique"]
157+
# root table _dlt_id is not unique with `scd2` merge strategy
158+
assert not table["columns"]["_dlt_id"]["unique"]
159159

160160
# assert load results
161161
ts_1 = get_load_package_created_at(p, info)
@@ -489,6 +489,67 @@ def r(data):
489489
)
490490

491491

492+
@pytest.mark.parametrize(
493+
"destination_config",
494+
destinations_configs(default_sql_configs=True, supports_merge=True),
495+
ids=lambda x: x.name,
496+
)
497+
def test_record_reinsert(destination_config: DestinationTestConfiguration) -> None:
498+
p = destination_config.setup_pipeline("abstract", dev_mode=True)
499+
500+
@dlt.resource(
501+
table_name="dim_test", write_disposition={"disposition": "merge", "strategy": "scd2"}
502+
)
503+
def r(data):
504+
yield data
505+
506+
# load 1 — initial load
507+
dim_snap = [
508+
r1 := {"nk": 1, "c1": "foo", "c2": "foo", "child": [1]},
509+
r2 := {"nk": 2, "c1": "bar", "c2": "bar", "child": [2, 3]},
510+
]
511+
info = p.run(r(dim_snap))
512+
assert_load_info(info)
513+
assert load_table_counts(p, "dim_test")["dim_test"] == 2
514+
assert load_table_counts(p, "dim_test__child")["dim_test__child"] == 3
515+
ts_1 = get_load_package_created_at(p, info)
516+
517+
# load 2 — delete natural key 1
518+
dim_snap = [r2]
519+
info = p.run(r(dim_snap))
520+
assert_load_info(info)
521+
assert load_table_counts(p, "dim_test")["dim_test"] == 2
522+
assert load_table_counts(p, "dim_test__child")["dim_test__child"] == 3
523+
ts_2 = get_load_package_created_at(p, info)
524+
525+
# load 3 — reinsert natural key 1
526+
dim_snap = [r1, r2]
527+
info = p.run(r(dim_snap))
528+
assert_load_info(info)
529+
assert load_table_counts(p, "dim_test")["dim_test"] == 3
530+
assert load_table_counts(p, "dim_test__child")["dim_test__child"] == 3 # no new record
531+
ts_3 = get_load_package_created_at(p, info)
532+
533+
# assert parent records
534+
from_, to = DEFAULT_VALIDITY_COLUMN_NAMES
535+
r1_no_child = {k: v for k, v in r1.items() if k != "child"}
536+
r2_no_child = {k: v for k, v in r2.items() if k != "child"}
537+
expected = [
538+
{**{from_: ts_1, to: ts_2}, **r1_no_child},
539+
{**{from_: ts_3, to: None}, **r1_no_child},
540+
{**{from_: ts_1, to: None}, **r2_no_child},
541+
]
542+
assert_records_as_set(get_table(p, "dim_test"), expected)
543+
544+
# assert child records
545+
expected = [
546+
{"_dlt_root_id": get_row_hash(r1), "value": 1}, # links to two records in parent
547+
{"_dlt_root_id": get_row_hash(r2), "value": 2},
548+
{"_dlt_root_id": get_row_hash(r2), "value": 3},
549+
]
550+
assert_records_as_set(get_table(p, "dim_test__child"), expected)
551+
552+
492553
@pytest.mark.parametrize(
493554
"destination_config",
494555
destinations_configs(default_sql_configs=True, subset=["duckdb"]),

0 commit comments

Comments
 (0)