
Conversation

@rudolfix (Collaborator) commented Jun 9, 2025

Description

This PR documents and tests two methods of splitting pipeline runs into smaller chunks, which can be useful for backfilling:

  • partition loading: source data is partitioned into several ranges that can be loaded independently
  • split loading: source data is split into several sequential ranges using the limit item

On top of that, a few improvements are included:

  • the filesystem source follows sort_order and can be used for split loading
  • LimitItem can count rows (not only yields/batches)
  • several improvements to root_key propagation: (1) simplified implementation, (2) parent_key can be used when the nesting level is < 2, (3) root_key is no longer enabled on scd2, (4) additional tests and docs

See the commit log for more; a minimal sketch of split loading follows.
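To make split loading concrete, here is a minimal, hedged sketch (the `events` resource and `fetch_rows` helper are hypothetical; partition loading would instead run independent pipelines over disjoint `initial_value`/`end_value` ranges):

```python
import dlt

@dlt.resource(table_name="events")
def events(
    updated_at=dlt.sources.incremental("updated_at", initial_value="2024-01-01T00:00:00Z")
):
    # hypothetical fetch; rows must come back ordered by the cursor column
    # so that each limited chunk covers a contiguous range
    yield from fetch_rows(since=updated_at.last_value)

pipeline = dlt.pipeline("backfill_demo", destination="duckdb", dataset_name="events_data")

# split loading: cap each run with add_limit and repeat; incremental state is
# persisted between runs, so consecutive runs load sequential chunks until
# a run produces no load packages
while not pipeline.run(events().add_limit(2)).is_empty:
    pass
```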

@netlify

netlify bot commented Jun 9, 2025

Deploy Preview for dlt-hub-docs canceled.

🔨 Latest commit: b9bb4dc
🔍 Latest deploy log: https://app.netlify.com/projects/dlt-hub-docs/deploys/68ce797ce868280008c9ca19

@github-actions

⚠️ Possible file(s) that should be tracked in LFS detected ⚠️

    The following file(s) exceed the file size limit of 50000 bytes, as set in the .yml configuration files:

    docs/website/docs/dlt-ecosystem/verified-sources/rest_api/basic.md

    Consider using git-lfs to manage large files.

@github-actions github-actions bot added the lfs-detected! label (large files were committed to the PR) Jul 11, 2025
@github-actions

⚠️ Possible file(s) that should be tracked in LFS detected ⚠️

    The following file(s) exceed the file size limit of 50000 bytes, as set in the .yml configuration files:

    docs/website/docs/dlt-ecosystem/verified-sources/rest_api/basic.md

    Consider using git-lfs to manage large files.

@github-actions github-actions bot removed the lfs-detected! label (large files were committed to the PR) Jul 11, 2025
@rudolfix rudolfix force-pushed the feat/explains-partition-and-split-loading branch from 283e6fc to 72af356 Compare September 12, 2025 22:38
@rudolfix rudolfix marked this pull request as ready for review September 14, 2025 19:28
@rudolfix rudolfix force-pushed the feat/explains-partition-and-split-loading branch from d84391c to cfc2bbd Compare September 15, 2025 10:05
@rudolfix rudolfix self-assigned this Sep 15, 2025
@rudolfix rudolfix added the ci full label (used to trigger CI on a PR for full load tests) Sep 15, 2025
@rudolfix rudolfix force-pushed the feat/explains-partition-and-split-loading branch from cfc2bbd to 731a3b8 Compare September 15, 2025 17:25
@rudolfix rudolfix requested a review from anuunchin September 15, 2025 17:27
@anuunchin (Contributor) left a comment

🧠

file_glob (str, optional): The filter to apply to the files in glob format. By default lists all files in bucket_url non-recursively
kwargs (Optional[Dict[str, Any]], optional): Additional arguments passed to the fsspec constructor, i.e. dict(use_ssl=True) for s3fs
client_kwargs (Optional[Dict[str, Any]], optional): Additional arguments passed to the underlying fsspec native client, i.e. dict(verify="public.crt") for botocore
incremental (Optional[dlt.sources.incremental[Any]]): defines incremental cursor on listed files, with `modification_date`

Suggested change:

- incremental (Optional[dlt.sources.incremental[Any]]): defines incremental cursor on listed files, with `modification_date`
+ incremental (Optional[dlt.sources.incremental[Any]]): Defines incremental cursor on listed files, with `modification_date`

kwargs (Optional[Dict[str, Any]]): Additional arguments passed to the fsspec constructor, i.e. dict(use_ssl=True) for s3fs
client_kwargs (Optional[Dict[str, Any]]): Additional arguments passed to the underlying fsspec native client, i.e. dict(verify="public.crt") for botocore
incremental (Optional[dlt.sources.incremental[Any]]): defines incremental cursor on listed files, with `modification_date`

Suggested change:

- incremental (Optional[dlt.sources.incremental[Any]]): defines incremental cursor on listed files, with `modification_date`
+ incremental (Optional[dlt.sources.incremental[Any]]): Defines incremental cursor on listed files, with `modification_date`
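For reference, a hedged usage sketch of the parameters documented above (the bucket URL and cert path are made up; `kwargs`/`client_kwargs` are forwarded to fsspec as the docstring describes):

```python
import dlt
from dlt.sources.filesystem import filesystem

files = filesystem(
    bucket_url="s3://example-bucket/data",       # hypothetical bucket
    file_glob="**/*.jsonl",                      # recursive glob instead of the non-recursive default
    kwargs={"use_ssl": True},                    # passed to the fsspec constructor (s3fs)
    client_kwargs={"verify": "public.crt"},      # passed to the underlying botocore client
    incremental=dlt.sources.incremental("modification_date"),  # cursor on listed files
)

pipeline = dlt.pipeline("fs_demo", destination="duckdb")
pipeline.run(files)
```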

state_only: bool = False,
sources: Optional[Union[Iterable[Union[str, TSimpleRegex]], Union[str, TSimpleRegex]]] = None,
) -> _DropResult:
# sources: Optional[Union[Iterable[Union[str, TSimpleRegex]], Union[str, TSimpleRegex]]] = None,
Contributor:

Are we removing the source regex because we're abandoning the idea of multiple sources per schema? 👀

Contributor:

As far as I recall, the multiple sources per schema idea was one of the future TODOs in one of the tests.

Collaborator (Author):

This code had no effect. Right now there is a 1:1 relation between a schema and a source, so selecting many resources when a schema is defined does not make sense.

Comment on lines +101 to +102
while not pipeline.run(incremental_table.add_limit(2)).is_empty:
pass
Contributor:

This is so cool!
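For context: each `run()` call loads one limited chunk and returns load info whose `is_empty` flag signals that the run produced no data, so the loop keeps loading sequential chunks until the resource is exhausted.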

3. **Set the comparison direction in the query**. By default, the greater-than-or-equal operator (**>=**) is used to compare the initial/previous value with the row column value. You can change it with the `last_value_func` argument (**max**/**min**).
4. **Set whether the comparison is inclusive or exclusive**. By default the range is closed (equal values are included). [Look here for explanation and examples](advanced.md#inclusive-and-exclusive-filtering). Note that for closed ranges `dlt` will use [internal deduplication](../../../general-usage/incremental/cursor.md#deduplicate-overlapping-ranges), which adds some processing cost.
5. **Configure backfill options (optional)**. You can use `end_value` with `range_end` to read data from a specified range. You can also control the **order of returned rows**
to split long incremental loading into many chunks by time and row count. [Look here for details and examples]
Contributor:

It seems like the part about the order of returned rows should be a separate point, or no? 👀
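A hedged sketch of the backfill options described in the excerpt above (the `events` resource and `fetch_rows` helper are hypothetical; parameter names follow the incremental API referenced in the docs):

```python
import dlt

@dlt.resource(table_name="events")
def events(
    updated_at=dlt.sources.incremental(
        "updated_at",
        initial_value="2024-01-01T00:00:00Z",  # start of the backfill range
        end_value="2024-02-01T00:00:00Z",      # stop once the cursor passes this value
        last_value_func=max,                   # comparison direction: >= (use min for <=)
        range_end="open",                      # exclusive upper bound, skips deduplication
        row_order="asc",                       # declare rows arrive in cursor order so dlt can stop early
    )
):
    # hypothetical fetch over the configured range
    yield from fetch_rows(updated_at.last_value, updated_at.end_value)

pipeline = dlt.pipeline("backfill_range", destination="duckdb")
pipeline.run(events())
```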

Comment on lines +239 to +241
3. You can fix your nested tables in both staging and final datasets. Add `_dlt_root_id` to all nested tables and copy the
`_dlt_id` (`row_key`) from the related [root (top level) tables](../general-usage/schema.md#nested-references-root-and-nested-tables).
In that case `dlt` will update the pipeline schema but will skip the database migration.
Contributor:

But as a general rule, should we be endorsing approaches that involve manual tampering inside the destination, outside of dlt's scope, in cases like these? (I just feel like the first two approaches are more dlt-idiomatic 👀)

Collaborator (Author):

Right! But sometimes you want to keep your data and (2) does not apply. It would be cool to have a tool that propagates the root key post factum, but we do not have it :)
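For completeness, a hedged sketch of the manual fix quoted above (a hypothetical `orders` root table with a nesting-level-1 child `orders__items`; deeper levels would need to walk up through `_dlt_parent_id`, and the ALTER/UPDATE syntax varies by destination):

```python
import dlt

pipeline = dlt.pipeline("orders_pipeline", destination="duckdb", dataset_name="shop")

with pipeline.sql_client() as client:
    # add the root key column to the nested table
    client.execute_sql("ALTER TABLE orders__items ADD COLUMN _dlt_root_id VARCHAR")
    # at nesting level 1 the parent row key is the root row key,
    # so _dlt_root_id can be copied straight from the parent's _dlt_id
    client.execute_sql(
        """
        UPDATE orders__items AS child
        SET _dlt_root_id = parent._dlt_id
        FROM orders AS parent
        WHERE child._dlt_parent_id = parent._dlt_id
        """
    )
```

Per the quoted docs, once the column is populated, `dlt` updates the pipeline schema and skips the database migration.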

@cloudflare-workers-and-pages

cloudflare-workers-and-pages bot commented Sep 19, 2025

Deploying with Cloudflare Workers

The latest updates on your project:

Status: ❌ Deployment failed (view logs)
Name: docs
Latest commit: b9bb4dc
Updated (UTC): Sep 20 2025, 09:57 AM

@rudolfix rudolfix merged commit 6f01555 into devel Sep 20, 2025
11 of 13 checks passed
@rudolfix rudolfix deleted the feat/explains-partition-and-split-loading branch September 20, 2025 09:53