Commit 7e6d35a

improved test: the no-boundary-dedup edge case now resets unique hashes
1 parent 4441162

2 files changed: +26, -19 lines


dlt/extract/incremental/__init__.py

Lines changed: 6 additions & 4 deletions
@@ -618,10 +618,6 @@ def __call__(self, rows: TDataItems, meta: Any = None) -> Optional[TDataItems]:
         # writing back state
         self._cached_state["last_value"] = transformer.last_value
 
-        initial_hash_list = self._cached_state.get("unique_hashes")
-        initial_hash_count = len(initial_hash_list) if initial_hash_list else 0
-        self.custom_metrics["initial_unique_hashes_count"] = initial_hash_count
-
         if transformer.boundary_deduplication:
             # compute hashes for new last rows
             # NOTE: object transform uses last_rows to pass rows to dedup, arrow computes
@@ -630,13 +626,19 @@ def __call__(self, rows: TDataItems, meta: Any = None) -> Optional[TDataItems]:
                 transformer.compute_unique_value(row, self.primary_key)
                 for row in transformer.last_rows
             )
+            initial_hash_list = self._cached_state.get("unique_hashes")
+            initial_hash_count = len(initial_hash_list) if initial_hash_list else 0
+            self.custom_metrics["initial_unique_hashes_count"] = initial_hash_count
+
             # add directly computed hashes
             unique_hashes.update(transformer.unique_hashes)
             self._cached_state["unique_hashes"] = list(unique_hashes)
             final_hash_count = len(self._cached_state["unique_hashes"])
             self.custom_metrics["final_unique_hashes_count"] = final_hash_count
 
             self._check_duplicate_cursor_threshold(initial_hash_count, final_hash_count)
+        else:
+            self._cached_state["unique_hashes"] = []
         return rows
 
     def _check_duplicate_cursor_threshold(
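
The edge case is easier to see outside the diff. Below is a toy sketch of boundary deduplication, not dlt's implementation: the plain state dict and the `process` and `hash_of` functions are illustrative stand-ins for dlt's incremental state and primary-key hashing. It shows why the new `else` branch must reset `unique_hashes`: with deduplication disabled, hashes recorded under an old boundary would otherwise linger in state, skewing the initial/final hash metrics and, at worst, deduplicating rows against an unrelated boundary once deduplication is re-enabled.

    # Toy model of boundary deduplication; illustrative only, not dlt's classes.
    def hash_of(row: dict) -> str:
        # stand-in for the primary-key hash dlt computes per row
        return f'{row["id"]}-{row["value"]}'

    def process(state: dict, rows: list, boundary_dedup: bool) -> list:
        last = state.get("last_value")
        seen = set(state.get("unique_hashes", []))
        kept = []
        for row in rows:
            if last is not None and row["id"] < last:
                continue  # strictly below the boundary: always filtered
            if boundary_dedup and row["id"] == last and hash_of(row) in seen:
                continue  # duplicate at the boundary: filtered
            kept.append(row)
        if kept:
            state["last_value"] = max(row["id"] for row in kept)
        if boundary_dedup:
            boundary = state.get("last_value")
            hashes = {hash_of(r) for r in kept if r["id"] == boundary}
            if boundary == last:
                hashes |= seen  # boundary did not move, so old hashes stay valid
            state["unique_hashes"] = list(hashes)
        else:
            # the fix in this commit: reset, so stale hashes cannot survive a
            # dedup-disabled run and leak into a later deduplicating run
            state["unique_hashes"] = []
        return kept

In this sketch, a run with `boundary_dedup=False` now leaves `state["unique_hashes"]` empty, which is consistent with the updated test assertions below, where a hash count falls from 3 to 0 in steps 5 through 7.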

tests/extract/test_incremental.py

Lines changed: 20 additions & 15 deletions
@@ -4444,34 +4444,39 @@ def _run_with_items(items: TDataItems, as_batch: bool) -> str:
     load_id = _run_with_items([{"id": 3, "value": "3"}, {"id": 4, "value": "4"}], False)
     _assert_custom_metrics(load_id, 5, 4, 1, 1, 1)
 
-    # 4. run with duplicate cursor field values, but different hashes, as batch
+    # 4. run with duplicate cursor field values, but different hashes, as a single batch
     load_id = _run_with_items(
         [{"id": 5, "value": "5.1"}, {"id": 5, "value": "5.2"}, {"id": 5, "value": "5.3"}], True
     )
     _assert_custom_metrics(load_id, 8, 5, 1, 3, 3)
 
-    # 5. run with the same values as batch from previous run, but with no boundary deduplication
+    # 5. reset incremental with no boundary deduplication (primary_key=()) and run with the same
+    # values as the previous run; should be loaded as a single batch with 3 items
     resource_with_metrics.apply_hints(
         incremental=dlt.sources.incremental(cursor_path="id", initial_value=-1, primary_key=())
     )
     load_id = _run_with_items(
         [{"id": 5, "value": "5.1"}, {"id": 5, "value": "5.2"}, {"id": 5, "value": "5.3"}], True
     )
-    _assert_custom_metrics(load_id, 3, 1, 3, 0, 3)
+    _assert_custom_metrics(load_id, 3, 1, 0, 0, 3)
 
-    # 6. run with two new items as a single batch
-    load_id = _run_with_items([{"id": 6, "value": "6.1"}, {"id": 6, "value": "6.2"}], True)
-    _assert_custom_metrics(load_id, 5, 2, 3, 0, 2)
+    # 6. run with one old and one new item as a single batch (still no boundary deduplication)
+    # should be loaded as a single batch with 2 items
+    load_id = _run_with_items([{"id": 5, "value": "5.1"}, {"id": 6, "value": "6.1"}], True)
+    _assert_custom_metrics(load_id, 5, 2, 0, 0, 2)
 
-    # 7. run with two new items as a single batch, with boundary deduplication
+    # 7. enable boundary deduplication and run with one old and one new item as a single batch
+    # should be loaded as a single batch with 2 items
     resource_with_metrics.incremental.primary_key = "id"
-    load_id = _run_with_items({"id": 7, "value": "7"}, True)
-    _assert_custom_metrics(load_id, 6, 3, 3, 1, 1)
+    load_id = _run_with_items([{"id": 6, "value": "6.1"}, {"id": 7, "value": "7"}], True)
+    _assert_custom_metrics(load_id, 7, 3, 0, 1, 2)
 
-    # 8. run with None within a batch -> should increment unfiltered_items_count
-    load_id = _run_with_items([None, {"id": 8, "value": "8"}], True)
-    _assert_custom_metrics(load_id, 8, 4, 1, 1, 1)
-
-    # 9. run with None as a single batch -> should not increment unfiltered_items_count
-    load_id = _run_with_items([None, {"id": 9, "value": "9"}], False)
+    # 8. run with one old and one new item, each as its own batch
+    # only the new item should be loaded
+    load_id = _run_with_items([{"id": 7, "value": "7"}, {"id": 8, "value": "8"}], False)
     _assert_custom_metrics(load_id, 9, 5, 1, 1, 1)
+
+    # 9. run with None items and one new item as a single batch
+    # None items should increment unfiltered_items_count
+    load_id = _run_with_items([None, None, {"id": 9, "value": "9"}], True)
+    _assert_custom_metrics(load_id, 12, 6, 1, 1, 1)
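
For context on the `primary_key=()` pattern the test relies on: passing an empty tuple as `primary_key` to `dlt.sources.incremental` turns boundary deduplication off for that resource. A minimal usage sketch follows; the resource name, cursor column, and rows are made up for illustration:

    import dlt

    @dlt.resource(primary_key="id")
    def events(
        created_at=dlt.sources.incremental(
            "created_at",
            initial_value="2024-01-01",
            primary_key=(),  # empty tuple disables boundary deduplication
        ),
    ):
        # hypothetical rows; the two sharing the boundary cursor value
        # will not be deduplicated on the next run
        yield [
            {"id": 1, "created_at": "2024-01-02"},
            {"id": 2, "created_at": "2024-01-02"},
        ]

With this commit, a resource configured this way also clears any unique_hashes left in incremental state by an earlier deduplicating run, rather than carrying them forward.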
