
Commit 283e6fc

root key WIP
1 parent fdbcd49 commit 283e6fc

File tree: 15 files changed, +292 -43 lines


dlt/common/schema/schema.py

Lines changed: 9 additions & 4 deletions

@@ -357,10 +357,14 @@ def update_schema(self, schema: "Schema") -> None:
         self._settings = deepcopy(schema.settings)
         # make shallow copy of normalizer settings
         self._configure_normalizers(copy(schema._normalizers_config))
+        self.data_item_normalizer.extend_schema()
         self._compile_settings()
-        # update all tables
-        for table in schema.tables.values():
-            self.update_table(table)
+        # update all tables starting from parents and then nested tables in order
+        tables = list(schema.tables.values())
+        for table in tables:
+            if not utils.is_nested_table(table):
+                for chain_table in utils.get_nested_tables(schema._schema_tables, table["name"]):
+                    self.update_table(chain_table)

     def drop_tables(
         self, table_names: Sequence[str], seen_data_only: bool = False

@@ -766,6 +770,7 @@ def update_normalizers(self) -> None:
         as textual parts can be extracted from an expression.
         """
         self._configure_normalizers(configured_normalizers(schema_name=self._schema_name))
+        self.data_item_normalizer.extend_schema()
         self._compile_settings()

     def will_update_normalizers(self) -> bool:

@@ -1042,7 +1047,6 @@ def _configure_normalizers(self, explicit_normalizers: TNormalizersConfig) -> No
         self._replace_and_apply_naming(normalizers_config, to_naming, self.naming)
         # data item normalization function
         self.data_item_normalizer = item_normalizer_class(self)
-        self.data_item_normalizer.extend_schema()

     def _reset_schema(self, name: str, normalizers: TNormalizersConfig = None) -> None:
         self._schema_tables: TSchemaTables = {}

@@ -1072,6 +1076,7 @@ def _reset_schema(self, name: str, normalizers: TNormalizersConfig = None) -> No
         if not normalizers:
             normalizers = configured_normalizers(schema_name=self._schema_name)
         self._configure_normalizers(normalizers)
+        self.data_item_normalizer.extend_schema()  # type: ignore[attr-defined]
        # add version tables
         self._add_standard_tables()
         # compile all known regexes
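
The reworked `update_schema` hunk walks root tables first and only then their nested tables, so a nested table is never merged before its parent. Below is a minimal sketch of that ordering on plain dicts; it is not dlt code and only assumes, as the diff suggests, that nested tables carry a `parent` hint which `utils.is_nested_table` checks:

```py
from typing import Dict, List

def order_parent_first(tables: Dict[str, dict]) -> List[dict]:
    """Return tables so that every root table is followed by its nested chain."""

    def root_of(name: str) -> str:
        # walk the "parent" hints up to the root table
        while tables[name].get("parent") is not None:
            name = tables[name]["parent"]
        return name

    ordered: List[dict] = []
    for name, table in tables.items():
        if table.get("parent") is None:  # a root table: emit it and its whole chain
            ordered.append(table)
            ordered.extend(t for n, t in tables.items() if n != name and root_of(n) == name)
    return ordered

# the nested table is listed before its parent on purpose; the ordering fixes that
tables = {
    "issues__labels": {"name": "issues__labels", "parent": "issues"},
    "issues": {"name": "issues"},
}
print([t["name"] for t in order_parent_first(tables)])  # ['issues', 'issues__labels']
```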

dlt/destinations/impl/mssql/sql_client.py

Lines changed: 0 additions & 3 deletions

@@ -180,9 +180,6 @@ def execute_query(self, query: AnyStr, *args: Any, **kwargs: Any) -> Iterator[DB
                 pass
             raise outer
         finally:
-            # clear all pending result sets
-            while curr.nextset():
-                pass
             # always close cursor
             curr.close()

dlt/extract/items_transform.py

Lines changed: 7 additions & 3 deletions

@@ -167,20 +167,24 @@ def bind(self, pipe: SupportsPipe) -> "LimitItem":

     def limit(self, chunk_size: int) -> Optional[int]:
         """Calculate the maximum number of rows to which result is limited. Limit works in chunks
-        that controlled by the data source and this must be provided in `chunk_size`
+        that are controlled by the data source and this must be provided in `chunk_size`.
+        `chunk_size` will be ignored if counting rows (`count_rows` is `True`). Mind that
+        this count method will not split batches so you may get more items (up to the full last batch)
+        than the `limit` method indicates.
         """
         if self.max_items in (None, -1):
             return None
-        return self.max_items * chunk_size
+        return self.max_items * (1 if self.count_rows else chunk_size)

     def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]:
         row_count = count_rows_in_items(item)
         if row_count > 0:
             self.count += row_count if self.count_rows else 1

         # detect when the limit is reached, max time or yield count
+        # self.max_items < 0 disables the limit on max items (legacy)
         if (
-            (self.count >= self.max_items)
+            (self.count >= self.max_items and self.max_items >= 0)
             or (self.max_time and time.time() - self.start_time > self.max_time)
             or self.max_items == 0
         ):
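
The new docstring warns that when `count_rows` is on, the limit counts rows but never splits a batch, so the final batch can push the total past the requested limit. A tiny standalone simulation (no dlt imports) of that behavior:

```py
def take_limited(batches, max_rows):
    """Yield whole batches until at least max_rows rows have been emitted."""
    count = 0
    for batch in batches:
        yield batch
        count += len(batch)
        if count >= max_rows:  # same shape as `self.count >= self.max_items`
            return

batches = [list(range(100)) for _ in range(5)]  # five 100-row batches
emitted = sum(len(b) for b in take_limited(batches, max_rows=250))
print(emitted)  # 300: the third batch is yielded whole, overshooting the 250-row limit
```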

dlt/extract/source.py

Lines changed: 21 additions & 13 deletions

@@ -21,6 +21,7 @@
 from dlt.common.configuration.specs.base_configuration import ContainerInjectableContext, configspec
 from dlt.common.configuration.specs.config_section_context import ConfigSectionContext
 from dlt.common.normalizers.json.relational import DataItemNormalizer as RelationalNormalizer
+from dlt.common.normalizers.json.typing import RelationalNormalizerConfig
 from dlt.common.schema import Schema
 from dlt.common.schema.typing import TColumnName, TSchemaContract
 from dlt.common.schema.utils import normalize_table_identifiers

@@ -390,7 +391,6 @@ def from_data(cls, schema: Schema, section: str, data: Any) -> Self:
     def name(self) -> str:
         return self._schema.name

-    # TODO: max_table_nesting/root_key below must go somewhere else ie. into RelationalSchema which is Schema + Relational normalizer.
     @property
     def max_table_nesting(self) -> int:
         """A schema hint that sets the maximum depth of nested table above which the remaining nodes are loaded as structs or JSON."""

@@ -412,30 +412,38 @@ def root_key(self) -> Optional[bool]:

         """
         # this also check the normalizer type
-        config = RelationalNormalizer.get_normalizer_config(self._schema).get("propagation")
-        data_normalizer = self._schema.data_item_normalizer
-        assert isinstance(data_normalizer, RelationalNormalizer)
-        return config.get("root_key_propagation")  # type: ignore[return-value]
+        config = RelationalNormalizer.get_normalizer_config(self._schema)
+        is_root_key = config.get("root_key_propagation")
+        if is_root_key is None:
+            # if not found get legacy value
+            is_root_key = self._get_root_key_legacy(config)
+            if is_root_key:
+                # set the root key if legacy value set
+                self.root_key = True
+        return is_root_key

     @root_key.setter
     def root_key(self, value: bool) -> None:
         # this also check the normalizer type
         config = RelationalNormalizer.get_normalizer_config(self._schema)
+        if value is None:
+            value = self._get_root_key_legacy(config)
+        if value is not None:
+            RelationalNormalizer.update_normalizer_config(
+                self._schema,
+                {"root_key_propagation": value},
+            )
+
+    def _get_root_key_legacy(self, config: RelationalNormalizerConfig) -> Optional[bool]:
         data_normalizer = self._schema.data_item_normalizer
         assert isinstance(data_normalizer, RelationalNormalizer)
-
         # we must remove root key propagation
         with contextlib.suppress(KeyError):
             propagation_config = config["propagation"]
             propagation_config["root"].pop(data_normalizer.c_dlt_id)
             # and set the value below
-            value = True
-
-        if value is not None:
-            RelationalNormalizer.update_normalizer_config(
-                self._schema,
-                {"root_key_propagation": value},
-            )
+            return True
+        return None

     @property
     def schema_contract(self) -> TSchemaContract:
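
The new `root_key` getter prefers the explicit `root_key_propagation` flag and only falls back to detecting the legacy propagation entry for the `_dlt_id` column, which `_get_root_key_legacy` removes while the setter writes the new flag. The sketch below shows just the lookup order on plain dicts; the `_dlt_root_id` target value is an assumption about how a legacy config may look, not something taken from this diff:

```py
from typing import Optional

def resolve_root_key(normalizer_config: dict, dlt_id_column: str = "_dlt_id") -> Optional[bool]:
    # 1. the explicit flag wins
    explicit = normalizer_config.get("root_key_propagation")
    if explicit is not None:
        return explicit
    # 2. legacy configs expressed root key propagation via the root table's propagation map
    legacy_root = normalizer_config.get("propagation", {}).get("root", {})
    if dlt_id_column in legacy_root:
        return True
    return None

print(resolve_root_key({"root_key_propagation": False}))                         # False
print(resolve_root_key({"propagation": {"root": {"_dlt_id": "_dlt_root_id"}}}))  # True
print(resolve_root_key({}))                                                      # None
```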

dlt/sources/filesystem/__init__.py

Lines changed: 4 additions & 1 deletion

@@ -111,11 +111,14 @@ def filesystem( # noqa DOC
     # NOTE: fsspec glob for buckets reads all files before running iterator
     # so below we do not have real batching anyway
     if incremental and incremental.row_order:
+        reverse = (incremental.row_order == "asc" and incremental.last_value_func is min) or (
+            incremental.row_order == "desc" and incremental.last_value_func is max
+        )
         iter_ = iter(
             sorted(
                 list(glob_files(fs_client, bucket_url, file_glob)),
                 key=lambda f_: f_[incremental.cursor_path],  # type: ignore[literal-required]
-                reverse=incremental.row_order == "desc",
+                reverse=reverse,
             )
         )
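
The added `reverse` expression now combines `row_order` with the incremental's `last_value_func` instead of checking `row_order` alone. The snippet below simply evaluates the same condition outside of dlt to show which combinations end up reversing the sorted file listing:

```py
def needs_reverse(row_order: str, last_value_func) -> bool:
    # same condition as the added lines in the diff above
    return (row_order == "asc" and last_value_func is min) or (
        row_order == "desc" and last_value_func is max
    )

for order in ("asc", "desc"):
    for func in (min, max):
        print(f"{order:>4} + {func.__name__} -> reverse={needs_reverse(order, func)}")
# asc + min -> reverse=True
# asc + max -> reverse=False
# desc + min -> reverse=False
# desc + max -> reverse=True
```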

docs/website/docs/dlt-ecosystem/verified-sources/filesystem/basic.md

Lines changed: 2 additions & 0 deletions

@@ -439,6 +439,8 @@ print(load_info)
 ### 6. Split large incremental loads
 If you have many files to process or they are large you may choose to split pipeline runs into smaller chunks (where single file is the smallest). There are
 two methods to do that:
+* Partitioning, where you divide your source data into several ranges, load them (possibly in parallel) and then continue to load data incrementally.
+* Split, where you load data sequentially in small chunks.

 Partitioning works as follows:
 1. Obtain a list of files ie. by just listing your resource `files = list(filesystem(...))`
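
For the partitioning option named above, here is a hedged sketch of how such a run could look with the `filesystem` source: list the files once, pick a cutoff, and load each range separately before continuing incrementally. The bucket URL, glob, and table name are made up, and splitting via `add_filter` is one possible way to express the ranges, not the documented recipe:

```py
import dlt
from dlt.sources.filesystem import filesystem, read_csv

pipeline = dlt.pipeline("files_backfill", destination="duckdb")

# 1. obtain the full listing once and pick a cutoff in the middle
files = list(filesystem(bucket_url="s3://my-bucket", file_glob="data/*.csv"))
files.sort(key=lambda f: f["modification_date"])
cutoff = files[len(files) // 2]["modification_date"]

# 2. load both ranges; they are independent, so they could also run in parallel
older = filesystem(bucket_url="s3://my-bucket", file_glob="data/*.csv").add_filter(
    lambda f: f["modification_date"] <= cutoff
)
newer = filesystem(bucket_url="s3://my-bucket", file_glob="data/*.csv").add_filter(
    lambda f: f["modification_date"] > cutoff
)
for part in (older, newer):
    pipeline.run((part | read_csv()).with_name("records"), table_name="records")
```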

docs/website/docs/general-usage/incremental/cursor.md

Lines changed: 43 additions & 3 deletions

@@ -146,7 +146,7 @@ Note that dlt's incremental filtering considers the ranges half-closed. `initial
 With the `row_order` argument set, dlt will stop retrieving data from the data source (e.g., GitHub API) if it detects that the values of the cursor field are out of the range of **start** and **end** values.

 In particular:
-* dlt stops processing when the resource yields any item with a cursor value _equal to or greater than_ the `end_value` and `row_order` is set to **asc**. (`end_value` is not included)
+* dlt stops processing when the resource yields any item with a cursor value _equal to or greater than_ the `end_value` and `row_order` is set to **asc**. (`end_value` is not included, also see )
 * dlt stops processing when the resource yields any item with a cursor value _lower_ than the `last_value` and `row_order` is set to **desc**. (`last_value` is included)

 :::note

@@ -215,7 +215,6 @@ def tickets(
             "updated_at",
             initial_value="2023-01-01T00:00:00Z",
             end_value="2023-02-01T00:00:00Z",
-            row_order="asc"
         ),
     ):
         for page in zendesk_client.get_pages(

@@ -229,7 +228,48 @@
 ```
 :::

-## Deduplicate overlapping ranges with primary key
+## Partition large loads
+You can execute a backfill on a large amount of data by partitioning it into smaller fragments. Best case is if you can partition.
+
+
+:::Note
+
+
+## Split large loads into chunks
+You can split large incremental resources into smaller chunks and load them sequentially. This way you'll see the data quicker and
+in case of a loading error you are able to retry a single chunk. **This method works only if your source returns data in deterministic order**, for example:
+* you can request your REST API endpoint to return data ordered by `updated_at`.
+* you use `row_order` on one of the supported sources like `sql_database` or `filesystem`.
+
+Below we go for the second option and load data from a messages table that we order on the `created_at` column.
+```py
+import dlt
+from dlt.sources.sql_database import sql_table
+
+pipeline = dlt.pipeline("test_load_sql_table_split_loading", destination="duckdb")
+
+messages = sql_table(
+    table="chat_message",
+    incremental=dlt.sources.incremental(
+        "created_at",
+        row_order="asc",  # critical to set row_order when doing split loading
+        range_start="open",  # use open range to disable deduplication
+    ),
+)
+
+# produce a chunk each minute
+while pipeline.run(messages.add_limit(max_time=60)).has_data:
+    pass
+```
+Note how we combine `incremental` and `add_limit` to generate a chunk each minute. If you create an index on `created_at`, the database
+engine will be able to stream data using the index without the need to scan the whole table.
+
+:::caution
+If your source returns unordered data, you will most probably miss some data items or load them twice.
+:::
+
+
+## Deduplicate overlapping ranges

 `Incremental` **does not** deduplicate datasets like the **merge** write disposition does. However, it ensures that when another portion of data is extracted, records that were previously loaded **at the end of range** won't be included again. `dlt` assumes that you load a range of data, where the lower bound is inclusive by default (i.e., greater than or equal). This ensures that you never lose any data but will also re-acquire some rows. For example, if you have a database table with a cursor field on `updated_at` which has a day resolution, then there's a high chance that after you extract data on a given day, more records will still be added. When you extract on the next day, you should reacquire data from the last day to ensure all records are present; however, this will create overlap with data from the previous extract.
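
The new "Partition large loads" section is still a stub, so purely as an illustration of the idea it names: the same `chat_message` backfill can be cut into non-overlapping, half-open ranges with `initial_value`/`end_value` (the month boundaries below are made up), and each range can be loaded independently or handed to a separate worker:

```py
import dlt
import pendulum
from dlt.sources.sql_database import sql_table

pipeline = dlt.pipeline("chat_backfill", destination="duckdb")

# month-long, half-open partitions: [Jan 1, Feb 1), [Feb 1, Mar 1), [Mar 1, Apr 1)
months = [pendulum.datetime(2023, m, 1) for m in (1, 2, 3, 4)]
for start, end in zip(months, months[1:]):
    messages = sql_table(
        table="chat_message",
        incremental=dlt.sources.incremental("created_at", initial_value=start, end_value=end),
    )
    pipeline.run(messages)
```

Once the partitions are done, a regular incremental run without `end_value` can pick up new data, matching the "then continue to load data incrementally" wording in the filesystem docs hunk above.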

docs/website/docs/general-usage/resource.md

Lines changed: 1 addition & 1 deletion

@@ -504,7 +504,7 @@ You can also set the limit to `0` for the resource to not yield any items.
 You can use `add_limit` to split incremental resources that process large data into manageable chunks:
 ```py
 ```
-splits loading of `issues` table into one hour chunks that are loaded in a loop. You'll see your data quicker without impacting the performance.
+splits loading of `issues` table into 10 minute chunks that are loaded in a loop. You'll see your data quicker without impacting the performance.
 Note **row_order** above! this makes sure that your table rows are returned deterministically so `dlt` can process consecutive chunks without
 losing data. Mind that ordering results may increase load on the database server. [Please read about other backfill strategies]

tests/common/runtime/test_telemetry.py

Lines changed: 2 additions & 3 deletions

@@ -133,19 +133,18 @@ def test_telemetry_endpoint_exceptions(
 def test_track_anon_event(
     mocker: MockerFixture, disable_temporary_telemetry: RuntimeConfiguration
 ) -> None:
-    from dlt.sources.helpers import requests
     from dlt.common.runtime import anon_tracker

     mock_github_env(os.environ)
     mock_pod_env(os.environ)
     SENT_ITEMS.clear()
     config = SentryLoggerConfiguration()

-    requests_post = mocker.spy(requests, "post")
-
     props = {"destination_name": "duckdb", "elapsed_time": 712.23123, "success": True}
     with patch("dlt.common.runtime.anon_tracker.before_send", _mock_before_send):
         start_test_telemetry(config)
+        # requests client created on start telemetry
+        requests_post = mocker.spy(anon_tracker.requests, "post")
         track("pipeline", "run", props)
         # this will send stuff
         disable_anon_tracker()

tests/common/schema/test_normalize_identifiers.py

Lines changed: 1 addition & 1 deletion

@@ -453,7 +453,7 @@ def test_update_schema_normalizer_props() -> None:
     schema = make_issues_schema_for_normalizers_update()
     schema_2 = make_issues_schema_for_normalizers_update()
     # remove issues table
-    del schema_2._schema_tables["issues"]
+    schema_2.drop_tables(["issues"])
     schema_2.update_schema(schema)

     os.environ["SCHEMA__NAMING"] = "tests.common.cases.normalizers.sql_upper"
