You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
The `dlt` destination decorator allows you to receive all data passing through your pipeline in a simple function. This can be extremely useful for
10
-
reverse ETL, where you are pushing data back to an API.
9
+
The `dlt` destination decorator allows you to receive all data passing through your pipeline in a simple function. This can be extremely useful for reverse ETL, where you are pushing data back to an API.
11
10
12
-
You can also use this for sending data to a queue or a simple database destination that is not
13
-
yet supported by `dlt`, be aware that you will have to manually handle your own migrations in this case.
11
+
You can also use this for sending data to a queue or a simple database destination that is not yet supported by `dlt`, although be aware that you will have to manually handle your own migrations in this case.
14
12
15
-
It will also allow you to simply get a path
16
-
to the files of your normalized data, so if you need direct access to parquet or jsonl files to copy them somewhere or push them to a database,
17
-
you can do this here too.
13
+
It will also allow you to simply get a path to the files of your normalized data. So, if you need direct access to parquet or jsonl files to copy them somewhere or push them to a database, you can do this here too.
18
14
19
15
## Install `dlt` for reverse ETL
20
16
@@ -25,8 +21,7 @@ pip install dlt
25
21
26
22
## Set up a destination function for your pipeline
27
23
28
-
The custom destination decorator differs from other destinations in that you do not need to provide connection credentials, but rather you provide a function which gets called for all items loaded during a pipeline run or load operation.
29
-
With the `@dlt.destination` you can convert any function that takes two arguments into a `dlt` destination.
24
+
The custom destination decorator differs from other destinations in that you do not need to provide connection credentials, but rather you provide a function which gets called for all items loaded during a pipeline run or load operation. With the `@dlt.destination`, you can convert any function that takes two arguments into a `dlt` destination.
30
25
31
26
A very simple dlt pipeline that pushes a list of items into a destination function might look like this:
1. You can also remove the typing information (`TDataItems` and `TTableSchema`) from this example, typing generally are useful to know the shape of the incoming objects though.
43
+
1. You can also remove the typing information (`TDataItems` and `TTableSchema`) from this example. Typing is generally useful to know the shape of the incoming objects, though.
49
44
2. There are a few other ways for declaring custom destination functions for your pipeline described below.
50
45
:::
51
46
52
-
## `@dlt.destination`, custom destination function and signature
47
+
###`@dlt.destination`, custom destination function, and signature
53
48
54
49
The full signature of the destination decorator plus its function is the following:
* The `batch_size` parameter on the destination decorator defines how many items per function call are batched together and sent as an array. If you set a batch-size of `0`,
71
-
instead of passing in actual dataitems, you will receive one call per load job with the path of the file as the items argument. You can then open and process that file
72
-
in any way you like.
73
-
* The `loader_file_format` parameter on the destination decorator defines in which format files are stored in the load package before being sent to the destination function,
74
-
this can be `jsonl` or `parquet`.
75
-
* The `name` parameter on the destination decorator defines the name of the destination that get's created by the destination decorator.
76
-
* The `naming_convention` parameter on the destination decorator defines the name of the destination that gets created by the destination decorator. This controls how table and column names are normalized. The default is `direct` which will keep all names the same.
64
+
### Decorator arguments
65
+
* The `batch_size` parameter on the destination decorator defines how many items per function call are batched together and sent as an array. If you set a batch-size of `0`, instead of passing in actual data items, you will receive one call per load job with the path of the file as the items argument. You can then open and process that file in any way you like.
66
+
* The `loader_file_format` parameter on the destination decorator defines in which format files are stored in the load package before being sent to the destination function. This can be `jsonl` or `parquet`.
67
+
* The `name` parameter on the destination decorator defines the name of the destination that gets created by the destination decorator.
68
+
* The `naming_convention` parameter on the destination decorator defines the name of the destination that gets created by the destination decorator. This controls how table and column names are normalized. The default is `direct`, which will keep all names the same.
77
69
* The `max_nesting_level` parameter on the destination decorator defines how deep the normalizer will go to normalize complex fields on your data to create subtables. This overwrites any settings on your `source` and is set to zero to not create any nested tables by default.
78
-
* The `skip_dlt_columns_and_tables` parameter on the destination decorator defines wether internal tables and columns will be fed into the custom destination function. This is set to `True` by default.
70
+
* The `skip_dlt_columns_and_tables` parameter on the destination decorator defines whether internal tables and columns will be fed into the custom destination function. This is set to `True` by default.
79
71
80
72
:::note
81
-
* The custom destination sets the `max_nesting_level` to 0 by default, which means no subtables will be generated during the normalization phase.
82
-
* The custom destination also skips all internal tables and columns by default, if you need these, set `skip_dlt_columns_and_tables` to False.
73
+
Settings above make sure that shape of the data you receive in the destination function is as close as possible to what you see in the data source.
74
+
75
+
* The custom destination sets the `max_nesting_level` to 0 by default, which means no sub-tables will be generated during the normalization phase.
76
+
* The custom destination also skips all internal tables and columns by default. If you need these, set `skip_dlt_columns_and_tables` to False.
83
77
:::
84
78
85
79
### Custom destination function
86
80
* The `items` parameter on the custom destination function contains the items being sent into the destination function.
87
-
* The `table` parameter contains the schema table the current call belongs to including all table hints and columns. For example, the table name can be accessed with `table["name"]`.
81
+
* The `table` parameter contains the schema table the current call belongs to, including all table hints and columns. For example, the table name can be accessed with `table["name"]`.
88
82
* You can also add config values and secrets to the function arguments, see below!
89
83
90
-
91
-
## Adding config variables and secrets
84
+
## Add configuration, credentials and other secret to the destination function
92
85
The destination decorator supports settings and secrets variables. If you, for example, plan to connect to a service that requires an API secret or a login, you can do the following:
93
86
94
87
```py
@@ -104,25 +97,11 @@ You can then set a config variable in your `.dlt/secrets.toml`: like so:
104
97
api_key="<my-api-key>"
105
98
```
106
99
107
-
## Destination state
108
-
109
-
The destination keeps a local record of how many `DataItems` were processed, so if you, for example, use the custom destination to push `DataItems` to a remote API, and this
110
-
API becomes unavailable during the load resulting in a failed `dlt` pipeline run, you can repeat the run of your pipeline at a later stage and the destination will continue
111
-
where it left of. For this reason, it makes sense to choose a batch size that you can process in one transaction (say one API request or one database transaction) so that if this
112
-
request or transaction fails repeatedly, you can repeat it at the next run without pushing duplicate data to your remote location.
113
-
114
-
## Concurrency
100
+
Custom destinations follow the same configuration rules as [regular named destinations](../../general-usage/destination.md#configure-a-destination)
115
101
116
-
Calls to the destination function by default will be executed on multiple threads, so you need to make sure you are not using any non-thread-safe nonlocal or global variables from outside
117
-
your destination function. If you need to have all calls be executed from the same thread, you can set the `workers` config variable of the load step to 1.
102
+
## Use the custom destination in `dlt` pipeline
118
103
119
-
:::tip
120
-
For performance reasons, we recommend keeping the multithreaded approach and making sure that you, for example, are using threadsafe connection pools to a remote database or queue.
121
-
:::
122
-
123
-
## Referencing the destination function
124
-
125
-
There are multiple ways to reference the custom destination function you want to use:
104
+
There are multiple ways to pass the custom destination function to `dlt` pipeline:
126
105
- Directly reference the destination function
127
106
128
107
```py
@@ -133,13 +112,24 @@ There are multiple ways to reference the custom destination function you want to
133
112
# reference function directly
134
113
p = dlt.pipeline("my_pipe", destination=local_destination_func)
135
114
```
136
-
- Directly via destination reference. In this case, don't use decorator for the destination function.
115
+
116
+
Like for [regular destinations](../../general-usage/destination.md#pass-explicit-credentials), you are allowed to pass configuration and credentials
@@ -151,7 +141,7 @@ There are multiple ways to reference the custom destination function you want to
151
141
)
152
142
)
153
143
```
154
-
- Via fully qualified string to function location (can be used from `config.toml` or ENV vars). Destination function should be located in another file.
144
+
- Via a fully qualified string to function location (can be used from `config.toml` or ENV vars). The destination function should be located in another file.
155
145
```py
156
146
# file my_pipeline.py
157
147
@@ -166,6 +156,27 @@ There are multiple ways to reference the custom destination function you want to
166
156
)
167
157
```
168
158
159
+
## Adjust batch size and retry policy for atomic loads
160
+
The destination keeps a local record of how many `DataItems` were processed, so if you, for example, use the custom destination to push `DataItems` to a remote API, and this
161
+
API becomes unavailable during the load resulting in a failed `dlt` pipeline run, you can repeat the run of your pipeline at a later moment and the custom destination will **restart from the whole batch that failed**. We are preventing any data from being lost, but you can still get duplicated data if you committed half of the batch ie. to a database and then failed.
162
+
**Keeping the batch atomicity is on you**. For this reason it makes sense to choose a batch size that you can process in one transaction (say one api request or one database transaction) so that if this request or transaction fail repeatedly you can repeat it at the next run without pushing duplicate data to your remote location. For systems that
163
+
are not transactional and do not tolerate duplicated data, you can use batch of size 1.
164
+
165
+
Destination functions that raise exceptions are retried 5 times before giving up (`load.raise_on_max_retries` config option). If you run the pipeline again, it will resume loading before extracting new data.
166
+
167
+
If your exception derives from `DestinationTerminalException`, the whole load job will be marked as failed and not retried again.
168
+
169
+
:::caution
170
+
If you wipe out the pipeline folder (where job files and destination state are saved) you will not be able to restart from the last failed batch.
171
+
However, it is fairly easy to backup and restore the pipeline directory, [see details below](#manage-pipeline-state-for-incremental-loading).
172
+
:::
173
+
174
+
## Increase or decrease loading parallelism
175
+
Calls to the destination function by default will be executed on multiple threads, so you need to make sure you are not using any non-thread-safe nonlocal or global variables from outside your destination function. If you need to have all calls be executed from the same thread, you can set the `workers`[config variable of the load step](../../reference/performance.md#load) to 1.
176
+
177
+
:::tip
178
+
For performance reasons, we recommend keeping the multithreaded approach and making sure that you, for example, are using threadsafe connection pools to a remote database or queue.
179
+
:::
169
180
170
181
## Write disposition
171
182
@@ -175,3 +186,15 @@ There are multiple ways to reference the custom destination function you want to
175
186
176
187
`@dlt.destination` does not support staging files in remote locations before being called at this time. If you need this feature, please let us know.
177
188
189
+
## Manage pipeline state for incremental loading
190
+
Custom destinations do not have a general mechanism to restore pipeline state. This will impact data sources that rely on the state being kept ie. all incremental resources.
191
+
If you wipe the pipeline directory (ie. by deleting a folder or running on AWS lambda / Github Actions where you get a clean runner) the progress of the incremental loading is lost. On the next run you will re-acquire the data from the beginning.
192
+
193
+
While we are working on a pluggable state storage you can fix the problem above by:
194
+
1. Not wiping the pipeline directory. For example if you run your pipeline on an EC instance periodically, the state will be preserved.
195
+
2. By doing a restore/backup of the pipeline directory before/after it runs. This is way easier than it sounds and [here's a script you can reuse](https://gist.github.com/rudolfix/ee6e16d8671f26ac4b9ffc915ad24b6e).
196
+
197
+
## What's next
198
+
199
+
* Check out our [Custom BigQuery Destination](../../examples/custom_destination_bigquery/) example.
200
+
* Need help with building a custom destination? Ask your questions in our [Slack Community](https://dlthub.com/community) technical help channel.
You can implement [your own destination](../walkthroughs/create-new-destination.md) and pass the destination class type or instance to `dlt` pipeline.
174
+
## Create new destination
175
+
You have two ways to implement a new destination:
176
+
1. You can use `@dlt.destination` decorator and [implement a sink function](../dlt-ecosystem/destinations/destination.md). This is perfect way to implement reverse ETL destinations that push data back to REST APIs.
177
+
2. You can implement [a full destination](../walkthroughs/create-new-destination.md) where you have a full control over load jobs and schema migration.
Copy file name to clipboardExpand all lines: docs/website/docs/intro.md
+6-6Lines changed: 6 additions & 6 deletions
Display the source diff
Display the rich diff
Original file line number
Diff line number
Diff line change
@@ -17,9 +17,9 @@ from various and often messy data sources into well-structured, live datasets. T
17
17
```sh
18
18
pip install dlt
19
19
```
20
-
Unlike other solutions, with dlt, there's no need to use any backends or containers. Simply import `dlt` in a Python file or a Jupyter Notebook cell, and create a pipeline to load data into any of the [supported destinations](dlt-ecosystem/destinations/). You can load data from any source that produces Python data structures, including APIs, files, databases, and more.
20
+
Unlike other solutions, with dlt, there's no need to use any backends or containers. Simply import `dlt` in a Python file or a Jupyter Notebook cell, and create a pipeline to load data into any of the [supported destinations](dlt-ecosystem/destinations/). You can load data from any source that produces Python data structures, including APIs, files, databases, and more.`dlt` also supports building a [custom destination](dlt-ecosystem/destinations/destination.md), which you can use as reverse ETL.
21
21
22
-
The library will create or update tables, infer data types and handle nested data automatically. Here are a few example pipelines:
22
+
The library will create or update tables, infer data types, and handle nested data automatically. Here are a few example pipelines:
23
23
24
24
<Tabs
25
25
groupId="source-type"
@@ -60,7 +60,7 @@ pip install "dlt[duckdb]"
60
60
Now **run** your Python file or Notebook cell.
61
61
62
62
How it works? The library extracts data from a [source](general-usage/glossary.md#source) (here: **chess.com REST API**), inspects its structure to create a
63
-
[schema](general-usage/glossary.md#schema), structures, normalizes and verifies the data, and then
63
+
[schema](general-usage/glossary.md#schema), structures, normalizes, and verifies the data, and then
64
64
loads it into a [destination](general-usage/glossary.md#destination) (here: **duckdb**, into a database schema **player_data** and table name **player**).
0 commit comments