Skip to content

Commit 3c12fe9

Browse files
committed
Improved test, adjusted custom metrics
1 parent c55ce51 commit 3c12fe9

File tree

5 files changed

+140
-102
lines changed

5 files changed

+140
-102
lines changed

dlt/extract/incremental/__init__.py

Lines changed: 38 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,16 @@
11
import os
22
from datetime import datetime # noqa: I251
3-
from typing import Generic, ClassVar, Any, Optional, Type, Dict, Union, Literal, Tuple
3+
from typing import (
4+
Generic,
5+
ClassVar,
6+
Any,
7+
Optional,
8+
Type,
9+
Dict,
10+
Union,
11+
Literal,
12+
Tuple,
13+
)
414

515
import inspect
616
from functools import wraps
@@ -20,6 +30,7 @@
2030
is_optional_type,
2131
is_subclass,
2232
TColumnNames,
33+
TypedDict,
2334
)
2435
from dlt.common.configuration import configspec, ConfigurationValueError
2536
from dlt.common.configuration.specs import BaseConfiguration
@@ -63,8 +74,17 @@
6374
pandas = None
6475

6576

77+
class IncrementalCustomMetrics(TypedDict, total=False):
78+
unfiltered_items_count: int
79+
unfiltered_batches_count: int
80+
initial_unique_hashes_count: int
81+
final_unique_hashes_count: int
82+
83+
6684
@configspec
67-
class Incremental(ItemTransform[TDataItem], BaseConfiguration, Generic[TCursorValue]):
85+
class Incremental(
86+
ItemTransform[TDataItem, IncrementalCustomMetrics], BaseConfiguration, Generic[TCursorValue]
87+
):
6888
"""Adds incremental extraction for a resource by storing a cursor value in persistent state.
6989
7090
The cursor could for example be a timestamp for when the record was created and you can use this to load only
@@ -191,8 +211,12 @@ def __init__(
191211
"""Bound pipe"""
192212
self.range_start = range_start
193213
self.range_end = range_end
194-
# Initialize custom metrics
195-
BaseItemTransform.__init__(self)
214+
self._custom_metrics: IncrementalCustomMetrics = {
215+
"unfiltered_items_count": 0,
216+
"unfiltered_batches_count": 0,
217+
"initial_unique_hashes_count": 0,
218+
"final_unique_hashes_count": 0,
219+
}
196220

197221
@property
198222
def primary_key(self) -> Optional[TTableHintTemplate[TColumnNames]]:
@@ -570,13 +594,8 @@ def __call__(self, rows: TDataItems, meta: Any = None) -> Optional[TDataItems]:
570594
return rows
571595

572596
# collect metrics
573-
self.custom_metrics["unfiltered_items_count"] = self.custom_metrics.get(
574-
"unfiltered_items_count", 0
575-
) + count_rows_in_items(rows)
576-
self.custom_metrics["unfiltered_batches_count"] = (
577-
self.custom_metrics.get("unfiltered_batches_count", 0) + 1
578-
)
579-
self.custom_metrics["unique_hashes_count"] = len(self.get_state().get("unique_hashes", []))
597+
self.custom_metrics["unfiltered_items_count"] += count_rows_in_items(rows)
598+
self.custom_metrics["unfiltered_batches_count"] += 1
580599

581600
transformer = self._get_transform(rows)
582601
if isinstance(rows, list):
@@ -599,6 +618,10 @@ def __call__(self, rows: TDataItems, meta: Any = None) -> Optional[TDataItems]:
599618
# writing back state
600619
self._cached_state["last_value"] = transformer.last_value
601620

621+
initial_hash_list = self._cached_state.get("unique_hashes")
622+
initial_hash_count = len(initial_hash_list) if initial_hash_list else 0
623+
self.custom_metrics["initial_unique_hashes_count"] = initial_hash_count
624+
602625
if transformer.boundary_deduplication:
603626
# compute hashes for new last rows
604627
# NOTE: object transform uses last_rows to pass rows to dedup, arrow computes
@@ -607,11 +630,11 @@ def __call__(self, rows: TDataItems, meta: Any = None) -> Optional[TDataItems]:
607630
transformer.compute_unique_value(row, self.primary_key)
608631
for row in transformer.last_rows
609632
)
610-
initial_hash_count = len(self._cached_state.get("unique_hashes", []))
611633
# add directly computed hashes
612634
unique_hashes.update(transformer.unique_hashes)
613635
self._cached_state["unique_hashes"] = list(unique_hashes)
614636
final_hash_count = len(self._cached_state["unique_hashes"])
637+
self.custom_metrics["final_unique_hashes_count"] = final_hash_count
615638

616639
self._check_duplicate_cursor_threshold(initial_hash_count, final_hash_count)
617640
return rows
@@ -636,7 +659,7 @@ def _check_duplicate_cursor_threshold(
636659
TIncrementalConfig = Union[Incremental[Any], IncrementalArgs]
637660

638661

639-
class IncrementalResourceWrapper(ItemTransform[TDataItem]):
662+
class IncrementalResourceWrapper(ItemTransform[TDataItem, IncrementalCustomMetrics]):
640663
placement_affinity: ClassVar[float] = 1 # stick to end
641664

642665
_incremental: Optional[Incremental[Any]] = None
@@ -798,8 +821,8 @@ def allow_external_schedulers(self, value: bool) -> None:
798821
self._incremental.allow_external_schedulers = value
799822

800823
@property
801-
def custom_metrics(self) -> Dict[str, Any]:
802-
"""Returns custom metrics of the Incremental object itself"""
824+
def custom_metrics(self) -> IncrementalCustomMetrics:
825+
"""Returns custom metrics of the Incremental object itself if it exists"""
803826
if self._incremental:
804827
return self._incremental.custom_metrics
805828
return {}

dlt/extract/items_transform.py

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
Optional,
1212
Union,
1313
Dict,
14+
TypeVar,
15+
cast,
1416
)
1517

1618
from dlt.common.data_writers.writers import count_rows_in_items
@@ -32,16 +34,19 @@
3234
ItemTransformFunc = Union[ItemTransformFunctionWithMeta[TAny], ItemTransformFunctionNoMeta[TAny]]
3335

3436

35-
class BaseItemTransform:
37+
TCustomMetrics = TypeVar("TCustomMetrics", covariant=True)
38+
39+
40+
class BaseItemTransform(Generic[TCustomMetrics]):
3641
def __init__(self) -> None:
37-
self._custom_metrics: Dict[str, Any] = {}
42+
self._custom_metrics: TCustomMetrics = cast(TCustomMetrics, {})
3843

3944
@property
40-
def custom_metrics(self) -> Dict[str, Any]:
45+
def custom_metrics(self) -> TCustomMetrics:
4146
return self._custom_metrics
4247

4348

44-
class ItemTransform(BaseItemTransform, ABC, Generic[TAny]):
49+
class ItemTransform(BaseItemTransform[TCustomMetrics], ABC, Generic[TAny, TCustomMetrics]):
4550
_f_meta: ItemTransformFunctionWithMeta[TAny] = None
4651
_f: ItemTransformFunctionNoMeta[TAny] = None
4752

@@ -58,7 +63,9 @@ def __init__(self, transform_f: ItemTransformFunc[TAny]) -> None:
5863
else: # TODO: do better check
5964
self._f_meta = transform_f # type: ignore
6065

61-
def bind(self: "ItemTransform[TAny]", pipe: SupportsPipe) -> "ItemTransform[TAny]":
66+
def bind(
67+
self: "ItemTransform[TAny, TCustomMetrics]", pipe: SupportsPipe
68+
) -> "ItemTransform[TAny, TCustomMetrics]":
6269
return self
6370

6471
@abstractmethod
@@ -67,7 +74,7 @@ def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]:
6774
pass
6875

6976

70-
class FilterItem(ItemTransform[bool]):
77+
class FilterItem(ItemTransform[bool, Dict[str, Any]]):
7178
# mypy needs those to type correctly
7279
_f_meta: ItemTransformFunctionWithMeta[bool]
7380
_f: ItemTransformFunctionNoMeta[bool]
@@ -93,7 +100,7 @@ def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]:
93100
return item if self._f(item) else None
94101

95102

96-
class MapItem(ItemTransform[TDataItem]):
103+
class MapItem(ItemTransform[TDataItem, Dict[str, Any]]):
97104
# mypy needs those to type correctly
98105
_f_meta: ItemTransformFunctionWithMeta[TDataItem]
99106
_f: ItemTransformFunctionNoMeta[TDataItem]
@@ -115,7 +122,7 @@ def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]:
115122
return self._f(item)
116123

117124

118-
class YieldMapItem(ItemTransform[Iterator[TDataItem]]):
125+
class YieldMapItem(ItemTransform[Iterator[TDataItem], Dict[str, Any]]):
119126
# mypy needs those to type correctly
120127
_f_meta: ItemTransformFunctionWithMeta[TDataItem]
121128
_f: ItemTransformFunctionNoMeta[TDataItem]
@@ -138,7 +145,7 @@ def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]:
138145
yield from self._f(item)
139146

140147

141-
class ValidateItem(ItemTransform[TDataItem]):
148+
class ValidateItem(ItemTransform[TDataItem, Dict[str, Any]]):
142149
"""Base class for validators of data items.
143150
144151
Subclass should implement the `__call__` method to either return the data item(s) or raise `extract.exceptions.ValidationError`.
@@ -149,12 +156,12 @@ class ValidateItem(ItemTransform[TDataItem]):
149156

150157
table_name: str
151158

152-
def bind(self, pipe: SupportsPipe) -> ItemTransform[TDataItem]:
159+
def bind(self, pipe: SupportsPipe) -> ItemTransform[TDataItem, Dict[str, Any]]:
153160
self.table_name = pipe.name
154161
return self
155162

156163

157-
class LimitItem(ItemTransform[TDataItem]):
164+
class LimitItem(ItemTransform[TDataItem, Dict[str, Any]]):
158165
placement_affinity: ClassVar[float] = 1.1 # stick to end right behind incremental
159166

160167
def __init__(

dlt/extract/pipe.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
Iterator,
1313
List,
1414
Tuple,
15+
Dict,
1516
)
1617

1718
from dlt.common.reflection.inspect import isasyncgenfunction, isgeneratorfunction
@@ -42,7 +43,7 @@
4243
)
4344

4445

45-
class ForkPipe(ItemTransform[ResolvablePipeItem]):
46+
class ForkPipe(ItemTransform[ResolvablePipeItem, Dict[str, Any]]):
4647
placement_affinity: ClassVar[float] = 2
4748

4849
def __init__(self, pipe: "Pipe", step: int = -1, copy_on_fork: bool = False) -> None:

0 commit comments

Comments
 (0)