Skip to content

Commit 1723faa

Browse files
Fix/1571 Incremental: Optionally raise, load, or ignore records with cursor_path missing or None value (#1576)
* allows specification of what happens on cursor_path missing or cursor_path having the value None: raise differentiated exceptions, exclude row, or include row. * Documents handling None values at the incremental cursor * fixes incremental extract crashing if one record has cursor_path = None * test that add_map can be used to transform items before the incremental function is called * Unifies treating of None values for python Objects (including pydantic), pandas, and arrow --------- Co-authored-by: Marcin Rudolf <[email protected]>
1 parent fbf0ef4 commit 1723faa

File tree

7 files changed

+624
-35
lines changed

7 files changed

+624
-35
lines changed

dlt/extract/incremental/__init__.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,12 @@
3535
IncrementalCursorPathMissing,
3636
IncrementalPrimaryKeyMissing,
3737
)
38-
from dlt.extract.incremental.typing import IncrementalColumnState, TCursorValue, LastValueFunc
38+
from dlt.extract.incremental.typing import (
39+
IncrementalColumnState,
40+
TCursorValue,
41+
LastValueFunc,
42+
OnCursorValueMissing,
43+
)
3944
from dlt.extract.pipe import Pipe
4045
from dlt.extract.items import SupportsPipe, TTableHintTemplate, ItemTransform
4146
from dlt.extract.incremental.transform import (
@@ -81,7 +86,7 @@ class Incremental(ItemTransform[TDataItem], BaseConfiguration, Generic[TCursorVa
8186
>>> info = p.run(r, destination="duckdb")
8287
8388
Args:
84-
cursor_path: The name or a JSON path to an cursor field. Uses the same names of fields as in your JSON document, before they are normalized to store in the database.
89+
cursor_path: The name or a JSON path to a cursor field. Uses the same names of fields as in your JSON document, before they are normalized to store in the database.
8590
initial_value: Optional value used for `last_value` when no state is available, e.g. on the first run of the pipeline. If not provided `last_value` will be `None` on the first run.
8691
last_value_func: Callable used to determine which cursor value to save in state. It is called with a list of the stored state value and all cursor vals from currently processing items. Default is `max`
8792
primary_key: Optional primary key used to deduplicate data. If not provided, a primary key defined by the resource will be used. Pass a tuple to define a compound key. Pass empty tuple to disable unique checks
@@ -95,6 +100,7 @@ class Incremental(ItemTransform[TDataItem], BaseConfiguration, Generic[TCursorVa
95100
specified range of data. Currently Airflow scheduler is detected: "data_interval_start" and "data_interval_end" are taken from the context and passed Incremental class.
96101
The values passed explicitly to Incremental will be ignored.
97102
Note that if logical "end date" is present then also "end_value" will be set which means that resource state is not used and exactly this range of date will be loaded
103+
on_cursor_value_missing: Specify what happens when the cursor_path does not exist in a record or a record has `None` at the cursor_path: raise, include, exclude
98104
"""
99105

100106
# this is config/dataclass so declare members
@@ -104,6 +110,7 @@ class Incremental(ItemTransform[TDataItem], BaseConfiguration, Generic[TCursorVa
104110
end_value: Optional[Any] = None
105111
row_order: Optional[TSortOrder] = None
106112
allow_external_schedulers: bool = False
113+
on_cursor_value_missing: OnCursorValueMissing = "raise"
107114

108115
# incremental acting as empty
109116
EMPTY: ClassVar["Incremental[Any]"] = None
@@ -118,6 +125,7 @@ def __init__(
118125
end_value: Optional[TCursorValue] = None,
119126
row_order: Optional[TSortOrder] = None,
120127
allow_external_schedulers: bool = False,
128+
on_cursor_value_missing: OnCursorValueMissing = "raise",
121129
) -> None:
122130
# make sure that path is valid
123131
if cursor_path:
@@ -133,6 +141,11 @@ def __init__(
133141
self._primary_key: Optional[TTableHintTemplate[TColumnNames]] = primary_key
134142
self.row_order = row_order
135143
self.allow_external_schedulers = allow_external_schedulers
144+
if on_cursor_value_missing not in ["raise", "include", "exclude"]:
145+
raise ValueError(
146+
f"Unexpected argument for on_cursor_value_missing. Got {on_cursor_value_missing}"
147+
)
148+
self.on_cursor_value_missing = on_cursor_value_missing
136149

137150
self._cached_state: IncrementalColumnState = None
138151
"""State dictionary cached on first access"""
@@ -171,6 +184,7 @@ def _make_transforms(self) -> None:
171184
self.last_value_func,
172185
self._primary_key,
173186
set(self._cached_state["unique_hashes"]),
187+
self.on_cursor_value_missing,
174188
)
175189

176190
@classmethod

dlt/extract/incremental/exceptions.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,27 @@
55

66

77
class IncrementalCursorPathMissing(PipeException):
8-
def __init__(self, pipe_name: str, json_path: str, item: TDataItem, msg: str = None) -> None:
8+
def __init__(
9+
self, pipe_name: str, json_path: str, item: TDataItem = None, msg: str = None
10+
) -> None:
11+
self.json_path = json_path
12+
self.item = item
13+
msg = (
14+
msg
15+
or f"Cursor element with JSON path `{json_path}` was not found in extracted data item. All data items must contain this path. Use the same names of fields as in your JSON document because they can be different from the names you see in database."
16+
)
17+
super().__init__(pipe_name, msg)
18+
19+
20+
class IncrementalCursorPathHasValueNone(PipeException):
21+
def __init__(
22+
self, pipe_name: str, json_path: str, item: TDataItem = None, msg: str = None
23+
) -> None:
924
self.json_path = json_path
1025
self.item = item
1126
msg = (
1227
msg
13-
or f"Cursor element with JSON path {json_path} was not found in extracted data item. All data items must contain this path. Use the same names of fields as in your JSON document - if those are different from the names you see in database."
28+
or f"Cursor element with JSON path `{json_path}` has the value `None` in extracted data item. All data items must contain a value != None. Construct the incremental with on_cursor_value_missing='include' if you want to include such rows"
1429
)
1530
super().__init__(pipe_name, msg)
1631

dlt/extract/incremental/transform.py

Lines changed: 69 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
from datetime import datetime, date # noqa: I251
2-
from typing import Any, Optional, Set, Tuple, List
1+
from datetime import datetime # noqa: I251
2+
from typing import Any, Optional, Set, Tuple, List, Type
33

44
from dlt.common.exceptions import MissingDependencyException
55
from dlt.common.utils import digest128
@@ -11,8 +11,9 @@
1111
IncrementalCursorInvalidCoercion,
1212
IncrementalCursorPathMissing,
1313
IncrementalPrimaryKeyMissing,
14+
IncrementalCursorPathHasValueNone,
1415
)
15-
from dlt.extract.incremental.typing import TCursorValue, LastValueFunc
16+
from dlt.extract.incremental.typing import TCursorValue, LastValueFunc, OnCursorValueMissing
1617
from dlt.extract.utils import resolve_column_value
1718
from dlt.extract.items import TTableHintTemplate
1819
from dlt.common.schema.typing import TColumnNames
@@ -55,6 +56,7 @@ def __init__(
5556
last_value_func: LastValueFunc[TCursorValue],
5657
primary_key: Optional[TTableHintTemplate[TColumnNames]],
5758
unique_hashes: Set[str],
59+
on_cursor_value_missing: OnCursorValueMissing = "raise",
5860
) -> None:
5961
self.resource_name = resource_name
6062
self.cursor_path = cursor_path
@@ -67,6 +69,7 @@ def __init__(
6769
self.primary_key = primary_key
6870
self.unique_hashes = unique_hashes
6971
self.start_unique_hashes = set(unique_hashes)
72+
self.on_cursor_value_missing = on_cursor_value_missing
7073

7174
# compile jsonpath
7275
self._compiled_cursor_path = compile_path(cursor_path)
@@ -116,21 +119,39 @@ class JsonIncremental(IncrementalTransform):
116119
def find_cursor_value(self, row: TDataItem) -> Any:
117120
"""Finds value in row at cursor defined by self.cursor_path.
118121
119-
Will use compiled JSONPath if present, otherwise it reverts to column search if row is dict
122+
Will use compiled JSONPath if present.
123+
Otherwise, reverts to field access if row is dict, Pydantic model, or of other class.
120124
"""
121-
row_value: Any = None
125+
key_exc: Type[Exception] = IncrementalCursorPathHasValueNone
122126
if self._compiled_cursor_path:
123-
row_values = find_values(self._compiled_cursor_path, row)
124-
if row_values:
125-
row_value = row_values[0]
127+
# ignores the other found values, e.g. when the path is $data.items[*].created_at
128+
try:
129+
row_value = find_values(self._compiled_cursor_path, row)[0]
130+
except IndexError:
131+
# empty list so raise a proper exception
132+
row_value = None
133+
key_exc = IncrementalCursorPathMissing
126134
else:
127135
try:
128-
row_value = row[self.cursor_path]
129-
except Exception:
130-
pass
131-
if row_value is None:
132-
raise IncrementalCursorPathMissing(self.resource_name, self.cursor_path, row)
133-
return row_value
136+
try:
137+
row_value = row[self.cursor_path]
138+
except TypeError:
139+
# supports Pydantic models and other classes
140+
row_value = getattr(row, self.cursor_path)
141+
except (KeyError, AttributeError):
142+
# attr not found so raise a proper exception
143+
row_value = None
144+
key_exc = IncrementalCursorPathMissing
145+
146+
# if we have a value - return it
147+
if row_value is not None:
148+
return row_value
149+
150+
if self.on_cursor_value_missing == "raise":
151+
# raise missing path or None value exception
152+
raise key_exc(self.resource_name, self.cursor_path, row)
153+
elif self.on_cursor_value_missing == "exclude":
154+
return None
134155

135156
def __call__(
136157
self,
@@ -144,6 +165,12 @@ def __call__(
144165
return row, False, False
145166

146167
row_value = self.find_cursor_value(row)
168+
if row_value is None:
169+
if self.on_cursor_value_missing == "exclude":
170+
return None, False, False
171+
else:
172+
return row, False, False
173+
147174
last_value = self.last_value
148175
last_value_func = self.last_value_func
149176

@@ -299,6 +326,7 @@ def __call__(
299326

300327
# TODO: Json path support. For now assume the cursor_path is a column name
301328
cursor_path = self.cursor_path
329+
302330
# The new max/min value
303331
try:
304332
# NOTE: datetimes are always pendulum in UTC
@@ -310,11 +338,16 @@ def __call__(
310338
self.resource_name,
311339
cursor_path,
312340
tbl,
313-
f"Column name {cursor_path} was not found in the arrow table. Not nested JSON paths"
341+
f"Column name `{cursor_path}` was not found in the arrow table. Nested JSON paths"
314342
" are not supported for arrow tables and dataframes, the incremental cursor_path"
315343
" must be a column name.",
316344
) from e
317345

346+
if tbl.schema.field(cursor_path).nullable:
347+
tbl_without_null, tbl_with_null = self._process_null_at_cursor_path(tbl)
348+
349+
tbl = tbl_without_null
350+
318351
# If end_value is provided, filter to include table rows that are "less" than end_value
319352
if self.end_value is not None:
320353
try:
@@ -396,12 +429,28 @@ def __call__(
396429
)
397430
)
398431

432+
# drop the temp unique index before concat and returning
433+
if "_dlt_index" in tbl.schema.names:
434+
tbl = pyarrow.remove_columns(tbl, ["_dlt_index"])
435+
436+
if self.on_cursor_value_missing == "include":
437+
if isinstance(tbl, pa.RecordBatch):
438+
assert isinstance(tbl_with_null, pa.RecordBatch)
439+
tbl = pa.Table.from_batches([tbl, tbl_with_null])
440+
else:
441+
tbl = pa.concat_tables([tbl, tbl_with_null])
442+
399443
if len(tbl) == 0:
400444
return None, start_out_of_range, end_out_of_range
401-
try:
402-
tbl = pyarrow.remove_columns(tbl, ["_dlt_index"])
403-
except KeyError:
404-
pass
405445
if is_pandas:
406-
return tbl.to_pandas(), start_out_of_range, end_out_of_range
446+
tbl = tbl.to_pandas()
407447
return tbl, start_out_of_range, end_out_of_range
448+
449+
def _process_null_at_cursor_path(self, tbl: "pa.Table") -> Tuple["pa.Table", "pa.Table"]:
450+
mask = pa.compute.is_valid(tbl[self.cursor_path])
451+
rows_without_null = tbl.filter(mask)
452+
rows_with_null = tbl.filter(pa.compute.invert(mask))
453+
if self.on_cursor_value_missing == "raise":
454+
if rows_with_null.num_rows > 0:
455+
raise IncrementalCursorPathHasValueNone(self.resource_name, self.cursor_path)
456+
return rows_without_null, rows_with_null

dlt/extract/incremental/typing.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
1-
from typing import TypedDict, Optional, Any, List, TypeVar, Callable, Sequence
1+
from typing import TypedDict, Optional, Any, List, Literal, TypeVar, Callable, Sequence
22

33

44
TCursorValue = TypeVar("TCursorValue", bound=Any)
55
LastValueFunc = Callable[[Sequence[TCursorValue]], Any]
6+
OnCursorValueMissing = Literal["raise", "include", "exclude"]
67

78

89
class IncrementalColumnState(TypedDict):

docs/website/docs/general-usage/incremental-loading.md

Lines changed: 61 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -689,7 +689,7 @@ than `end_value`.
689689

690690
:::caution
691691
In rare cases when you use Incremental with a transformer, `dlt` will not be able to automatically close
692-
generator associated with a row that is out of range. You can still use still call `can_close()` method on
692+
generator associated with a row that is out of range. You can still call the `can_close()` method on
693693
incremental and exit yield loop when true.
694694
:::
695695

@@ -907,22 +907,75 @@ Consider the example below for reading incremental loading parameters from "conf
907907
```
908908
`id_after` incrementally stores the latest `cursor_path` value for future pipeline runs.
909909

910-
### Loading NULL values in the incremental cursor field
910+
### Loading when incremental cursor path is missing or value is None/NULL
911911

912-
When loading incrementally with a cursor field, each row is expected to contain a value at the cursor field that is not `None`.
913-
For example, the following source data will raise an error:
912+
You can customize the incremental processing of dlt by setting the parameter `on_cursor_value_missing`.
913+
914+
When loading incrementally with the default settings, there are two assumptions:
915+
1. each row contains the cursor path
916+
2. each row is expected to contain a value at the cursor path that is not `None`.
917+
918+
For example, the following two data sources will raise an error:
914919
```py
915920
@dlt.resource
916-
def some_data(updated_at=dlt.sources.incremental("updated_at")):
921+
def some_data_without_cursor_path(updated_at=dlt.sources.incremental("updated_at")):
917922
yield [
918923
{"id": 1, "created_at": 1, "updated_at": 1},
919-
{"id": 2, "created_at": 2, "updated_at": 2},
924+
{"id": 2, "created_at": 2}, # cursor field is missing
925+
]
926+
927+
list(some_data_without_cursor_path())
928+
929+
@dlt.resource
930+
def some_data_without_cursor_value(updated_at=dlt.sources.incremental("updated_at")):
931+
yield [
932+
{"id": 1, "created_at": 1, "updated_at": 1},
933+
{"id": 3, "created_at": 4, "updated_at": None}, # value at cursor field is None
934+
]
935+
936+
list(some_data_without_cursor_value())
937+
```
938+
939+
940+
To process a data set where some records do not include the incremental cursor path or where the values at the cursor path are `None`, there are the following four options:
941+
942+
1. Configure the incremental load to raise an exception in case there is a row where the cursor path is missing or has the value `None` using `incremental(..., on_cursor_value_missing="raise")`. This is the default behavior.
943+
2. Configure the incremental load to tolerate the missing cursor path and `None` values using `incremental(..., on_cursor_value_missing="include")`.
944+
3. Configure the incremental load to exclude the missing cursor path and `None` values using `incremental(..., on_cursor_value_missing="exclude")`.
945+
4. Before the incremental processing begins: Ensure that the incremental field is present and transform the values at the incremental cursor to a value different from `None`. [See docs below](#transform-records-before-incremental-processing)
946+
947+
Here is an example of including rows where the incremental cursor value is missing or `None`:
948+
```py
949+
@dlt.resource
950+
def some_data(updated_at=dlt.sources.incremental("updated_at", on_cursor_value_missing="include")):
951+
yield [
952+
{"id": 1, "created_at": 1, "updated_at": 1},
953+
{"id": 2, "created_at": 2},
954+
{"id": 3, "created_at": 4, "updated_at": None},
955+
]
956+
957+
result = list(some_data())
958+
assert len(result) == 3
959+
assert result[1] == {"id": 2, "created_at": 2}
960+
assert result[2] == {"id": 3, "created_at": 4, "updated_at": None}
961+
```
962+
963+
If you do not want to import records without the cursor path or where the value at the cursor path is `None` use the following incremental configuration:
964+
965+
```py
966+
@dlt.resource
967+
def some_data(updated_at=dlt.sources.incremental("updated_at", on_cursor_value_missing="exclude")):
968+
yield [
969+
{"id": 1, "created_at": 1, "updated_at": 1},
970+
{"id": 2, "created_at": 2},
920971
{"id": 3, "created_at": 4, "updated_at": None},
921972
]
922973

923-
list(some_data())
974+
result = list(some_data())
975+
assert len(result) == 1
924976
```
925977

978+
### Transform records before incremental processing
926979
If you want to load data that includes `None` values you can transform the records before the incremental processing.
927980
You can add steps to the pipeline that [filter, transform, or pivot your data](../general-usage/resource.md#filter-transform-and-pivot-data).
928981

@@ -1162,4 +1215,4 @@ sources:
11621215
}
11631216
```
11641217

1165-
Verify that the `last_value` is updated between pipeline runs.
1218+
Verify that the `last_value` is updated between pipeline runs.

0 commit comments

Comments
 (0)