Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dlt/common/libs/deltalake.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ def merge_delta_table(
data: Union[pa.Table, pa.RecordBatchReader],
schema: TTableSchema,
load_table_name: str,
streamed_exec: bool,
) -> None:
"""Merges in-memory Arrow data into on-disk Delta table."""

Expand All @@ -142,6 +143,7 @@ def merge_delta_table(
predicate=predicate,
source_alias="source",
target_alias="target",
streamed_exec=streamed_exec,
)
.when_matched_update_all()
.when_not_matched_insert_all()
Expand Down
1 change: 1 addition & 0 deletions dlt/common/storages/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ class FilesystemConfiguration(BaseConfiguration):
"""Additional arguments as Config in botocore"""
deltalake_storage_options: Optional[DictStrAny] = None
deltalake_configuration: Optional[DictStrOptionalStr] = None
deltalake_streamed_exec: bool = True

@property
def protocol(self) -> str:
Expand Down
1 change: 1 addition & 0 deletions dlt/destinations/impl/filesystem/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ def run(self) -> None:
data=arrow_rbr,
schema=self._load_table,
load_table_name=self.load_table_name,
streamed_exec=self._job_client.config.deltalake_streamed_exec,
)
else:
location = self._job_client.get_open_table_location("delta", self.load_table_name)
Expand Down
7 changes: 7 additions & 0 deletions docs/website/docs/dlt-ecosystem/destinations/delta-iceberg.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,13 @@ def my_upsert_resource():
- Deleting records from nested tables not supported
- This means updates to JSON columns that involve element removals are not propagated. For example, if you first load `{"key": 1, "nested": [1, 2]}` and then load `{"key": 1, "nested": [1]}`, then the record for element `2` will not be deleted from the nested table.

By default, dlt runs Delta table upserts in streamed mode to reduce memory pressure. To disable streamed execution so that source table statistics can be used to derive an early pruning predicate, set:

```toml
[destination.filesystem]
deltalake_streamed_exec = false
```

## Delta table format storage options and configuration
You can pass storage options and configuration by configuring both `destination.filesystem.deltalake_storage_options` and
`destination.filesystem.deltalake_configuration`:
Expand Down
3 changes: 3 additions & 0 deletions tests/load/filesystem/test_filesystem_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ def test_filesystem_configuration() -> None:
"kwargs": None,
"deltalake_storage_options": None,
"deltalake_configuration": None,
"deltalake_streamed_exec": True,
}


Expand Down Expand Up @@ -234,6 +235,7 @@ def test_filesystem_configuration_with_additional_arguments() -> None:
"delta.minWriterVersion": "7",
"delta.enableChangeDataFeed": "true",
},
deltalake_streamed_exec=False,
)
assert dict(config) == {
"read_only": False,
Expand All @@ -247,6 +249,7 @@ def test_filesystem_configuration_with_additional_arguments() -> None:
"delta.minWriterVersion": "7",
"delta.enableChangeDataFeed": "true",
},
"deltalake_streamed_exec": False,
}


Expand Down
Loading