30 changes: 30 additions & 0 deletions dlt/common/destination/client.py
@@ -620,6 +620,36 @@ def with_staging_dataset(self) -> ContextManager["JobClientBase"]:
         """Executes job client methods on staging dataset"""
         return self  # type: ignore
 
+    @staticmethod
+    def create_dataset_names(
+        schema: Schema, config: DestinationClientDwhConfiguration
+    ) -> Tuple[str, str]:
+        """
+        Creates the regular and staging dataset names for the given schema and config.
+        Raises a ValueError if the staging dataset name is the same as the final dataset name.
+        Returns (dataset_name, staging_dataset_name).
+
+        """
+        dataset_name = config.normalize_dataset_name(schema)
+        staging_dataset_name = config.normalize_staging_dataset_name(schema)
+
+        if dataset_name == staging_dataset_name:
+            logger.error(
+                f"Staging dataset name '{staging_dataset_name}' is the same as final dataset name"
+                f" '{dataset_name}'."
+            )
+
+            raise ValueError(
+                f"The staging dataset name '{staging_dataset_name}' is identical to the final"
+                f" dataset name '{dataset_name}'. This configuration will cause data loss because"
+                " setup commands will truncate the final dataset when they should only truncate"
+                " the staging dataset.\nTo fix this, modify the `staging_dataset_name_layout`"
+                " setting in your destination configuration. For more information, see:"
+                " https://dlthub.com/docs/dlt-ecosystem/staging#staging-dataset"
+            )
+
+        return (dataset_name, staging_dataset_name)
+
 
 class SupportsStagingDestination(ABC):
     """Adds capability to support a staging destination for the load"""
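An editorial aside (not part of the diff): a minimal sketch of the call pattern this helper enables in each destination client's `__init__`. `SomeSqlClient` is a hypothetical stand-in for a concrete destination SQL client; the real call sites appear in the per-destination hunks below.

```python
# Sketch of the pattern rolled out below; names other than
# create_dataset_names are illustrative.
dataset_name, staging_dataset_name = SqlJobClientWithStagingDataset.create_dataset_names(
    schema, config
)
# Both names are resolved and validated up front, so a colliding staging
# layout fails fast with a ValueError instead of truncating the final
# dataset during load setup.
sql_client = SomeSqlClient(  # hypothetical stand-in
    dataset_name,
    staging_dataset_name,
    config.credentials,
    capabilities,
)
```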
8 changes: 6 additions & 2 deletions dlt/destinations/impl/athena/athena.py
@@ -208,9 +208,13 @@ def __init__(
             table_needs_own_folder=True,
         )
 
+        dataset_name, staging_dataset_name = SqlJobClientWithStagingDataset.create_dataset_names(
+            schema, config
+        )
+
         sql_client = AthenaSQLClient(
-            config.normalize_dataset_name(schema),
-            config.normalize_staging_dataset_name(schema),
+            dataset_name,
+            staging_dataset_name,
             config,
             capabilities,
         )
7 changes: 5 additions & 2 deletions dlt/destinations/impl/bigquery/bigquery.py
@@ -182,9 +182,12 @@ def __init__(
         config: BigQueryClientConfiguration,
         capabilities: DestinationCapabilitiesContext,
     ) -> None:
+        dataset_name, staging_dataset_name = SqlJobClientWithStagingDataset.create_dataset_names(
+            schema, config
+        )
         sql_client = BigQuerySqlClient(
-            config.normalize_dataset_name(schema),
-            config.normalize_staging_dataset_name(schema),
+            dataset_name,
+            staging_dataset_name,
             config.credentials,
             capabilities,
             config.get_location(),
7 changes: 5 additions & 2 deletions dlt/destinations/impl/clickhouse/clickhouse.py
@@ -217,9 +217,12 @@ def __init__(
         config: ClickHouseClientConfiguration,
         capabilities: DestinationCapabilitiesContext,
     ) -> None:
+        dataset_name, staging_dataset_name = SqlJobClientWithStagingDataset.create_dataset_names(
+            schema, config
+        )
         self.sql_client: ClickHouseSqlClient = ClickHouseSqlClient(
-            config.normalize_dataset_name(schema),
-            config.normalize_staging_dataset_name(schema),
+            dataset_name,
+            staging_dataset_name,
             list(schema.tables.keys()),
             config.credentials,
             capabilities,
7 changes: 5 additions & 2 deletions dlt/destinations/impl/databricks/databricks.py
@@ -302,9 +302,12 @@ def __init__(
         config: DatabricksClientConfiguration,
         capabilities: DestinationCapabilitiesContext,
     ) -> None:
+        dataset_name, staging_dataset_name = SqlJobClientWithStagingDataset.create_dataset_names(
+            schema, config
+        )
         sql_client = DatabricksSqlClient(
-            config.normalize_dataset_name(schema),
-            config.normalize_staging_dataset_name(schema),
+            dataset_name,
+            staging_dataset_name,
             config.credentials,
             capabilities,
         )
7 changes: 5 additions & 2 deletions dlt/destinations/impl/dremio/dremio.py
@@ -104,9 +104,12 @@ def __init__(
         config: DremioClientConfiguration,
         capabilities: DestinationCapabilitiesContext,
     ) -> None:
+        dataset_name, staging_dataset_name = SqlJobClientWithStagingDataset.create_dataset_names(
+            schema, config
+        )
         sql_client = DremioSqlClient(
-            config.normalize_dataset_name(schema),
-            config.normalize_staging_dataset_name(schema),
+            dataset_name,
+            staging_dataset_name,
             config.credentials,
             capabilities,
         )
7 changes: 5 additions & 2 deletions dlt/destinations/impl/duckdb/duck.py
@@ -59,9 +59,12 @@ def __init__(
         config: DuckDbClientConfiguration,
         capabilities: DestinationCapabilitiesContext,
     ) -> None:
+        dataset_name, staging_dataset_name = InsertValuesJobClient.create_dataset_names(
+            schema, config
+        )
         sql_client = DuckDbSqlClient(
-            config.normalize_dataset_name(schema),
-            config.normalize_staging_dataset_name(schema),
+            dataset_name,
+            staging_dataset_name,
             config.credentials,
             capabilities,
         )
2 changes: 1 addition & 1 deletion dlt/destinations/impl/filesystem/filesystem.py
@@ -363,7 +363,7 @@ def dataset_path(self) -> str:
     def with_staging_dataset(self) -> Iterator["FilesystemClient"]:
         current_dataset_name = self.dataset_name
         try:
-            self.dataset_name = self.config.normalize_staging_dataset_name(self.schema)
+            _, self.dataset_name = WithStagingDataset.create_dataset_names(self.schema, self.config)
             yield self
         finally:
             # restore previous dataset name
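An editorial aside (not part of the diff): a minimal sketch of how this context manager is used, assuming an already constructed `FilesystemClient` instance named `client`:

```python
# Temporarily point the client at the staging dataset; the finally block
# restores the original dataset name when the with-block exits.
with client.with_staging_dataset() as staging_client:
    ...  # work here runs against the staging dataset
```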
5 changes: 3 additions & 2 deletions dlt/destinations/impl/motherduck/motherduck.py
@@ -14,10 +14,11 @@ def __init__(
         config: MotherDuckClientConfiguration,
         capabilities: DestinationCapabilitiesContext,
     ) -> None:
+        dataset_name, staging_dataset_name = DuckDbClient.create_dataset_names(schema, config)
         super().__init__(schema, config, capabilities)  # type: ignore
         sql_client = MotherDuckSqlClient(
-            config.normalize_dataset_name(schema),
-            config.normalize_staging_dataset_name(schema),
+            dataset_name,
+            staging_dataset_name,
             config.credentials,
             capabilities,
         )
7 changes: 5 additions & 2 deletions dlt/destinations/impl/mssql/mssql.py
@@ -83,9 +83,12 @@ def __init__(
         config: MsSqlClientConfiguration,
         capabilities: DestinationCapabilitiesContext,
     ) -> None:
+        dataset_name, staging_dataset_name = InsertValuesJobClient.create_dataset_names(
+            schema, config
+        )
         sql_client = PyOdbcMsSqlClient(
-            config.normalize_dataset_name(schema),
-            config.normalize_staging_dataset_name(schema),
+            dataset_name,
+            staging_dataset_name,
             config.credentials,
             capabilities,
         )
7 changes: 5 additions & 2 deletions dlt/destinations/impl/postgres/postgres.py
@@ -174,9 +174,12 @@ def __init__(
         config: PostgresClientConfiguration,
         capabilities: DestinationCapabilitiesContext,
     ) -> None:
+        dataset_name, staging_dataset_name = InsertValuesJobClient.create_dataset_names(
+            schema, config
+        )
         sql_client = Psycopg2SqlClient(
-            config.normalize_dataset_name(schema),
-            config.normalize_staging_dataset_name(schema),
+            dataset_name,
+            staging_dataset_name,
             config.credentials,
             capabilities,
         )
7 changes: 5 additions & 2 deletions dlt/destinations/impl/redshift/redshift.py
@@ -167,9 +167,12 @@ def __init__(
         config: RedshiftClientConfiguration,
         capabilities: DestinationCapabilitiesContext,
     ) -> None:
+        dataset_name, staging_dataset_name = InsertValuesJobClient.create_dataset_names(
+            schema, config
+        )
         sql_client = RedshiftSqlClient(
-            config.normalize_dataset_name(schema),
-            config.normalize_staging_dataset_name(schema),
+            dataset_name,
+            staging_dataset_name,
             config.credentials,
             capabilities,
         )
7 changes: 5 additions & 2 deletions dlt/destinations/impl/snowflake/snowflake.py
@@ -123,9 +123,12 @@ def __init__(
         config: SnowflakeClientConfiguration,
         capabilities: DestinationCapabilitiesContext,
     ) -> None:
+        dataset_name, staging_dataset_name = SqlJobClientWithStagingDataset.create_dataset_names(
+            schema, config
+        )
         sql_client = SnowflakeSqlClient(
-            config.normalize_dataset_name(schema),
-            config.normalize_staging_dataset_name(schema),
+            dataset_name,
+            staging_dataset_name,
             config.credentials,
             capabilities,
             config.query_tag,
7 changes: 5 additions & 2 deletions dlt/destinations/impl/sqlalchemy/sqlalchemy_job_client.py
@@ -50,9 +50,12 @@ def __init__(
         config: SqlalchemyClientConfiguration,
         capabilities: DestinationCapabilitiesContext,
     ) -> None:
+        dataset_name, staging_dataset_name = SqlJobClientWithStagingDataset.create_dataset_names(
+            schema, config
+        )
         self.sql_client = SqlalchemyClient(
-            config.normalize_dataset_name(schema),
-            config.normalize_staging_dataset_name(schema),
+            dataset_name,
+            staging_dataset_name,
             config.credentials,
             capabilities,
         )
5 changes: 3 additions & 2 deletions dlt/destinations/impl/synapse/synapse.py
@@ -62,9 +62,10 @@ def __init__(
     ) -> None:
         super().__init__(schema, config, capabilities)
         self.config: SynapseClientConfiguration = config
+        dataset_name, staging_dataset_name = MsSqlJobClient.create_dataset_names(schema, config)
         self.sql_client = SynapseSqlClient(
-            config.normalize_dataset_name(schema),
-            config.normalize_staging_dataset_name(schema),
+            dataset_name,
+            staging_dataset_name,
             config.credentials,
             capabilities,
         )
6 changes: 6 additions & 0 deletions docs/website/docs/dlt-ecosystem/staging.md
@@ -34,6 +34,12 @@ If you prefer to truncate it, put the following line in `config.toml`:
 [load]
 truncate_staging_dataset=true
 ```
+> **⚠️ Important:** When configuring a custom staging dataset naming pattern, ensure that the resulting staging dataset name differs from the final dataset name. If the pattern results in identical names, dlt will raise a `ValueError` to alert you that the pattern must be adjusted. This prevents potential data loss from setup commands accidentally truncating the final dataset instead of the staging dataset.
+>
+> **Examples:**
+> - ✅ **Good:** `staging_dataset_name_layout="%s_staging"` → `my_data` becomes `my_data_staging`
+> - ✅ **Good:** `staging_dataset_name_layout="staging_%s"` → `my_data` becomes `staging_my_data`
+> - ❌ **Bad:** `staging_dataset_name_layout="%s"` → `my_data` becomes `my_data` (same name!)
 
 ## Staging storage
 `dlt` allows chaining destinations where the first one (`staging`) is responsible for uploading the files from the local filesystem to the remote storage. It then generates follow-up jobs for the second destination that (typically) copy the files from remote storage into the destination.
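An editorial aside (not part of the diff): a sketch of where `staging_dataset_name_layout` is typically set in `config.toml`. The `[destination.postgres]` section is illustrative; substitute your own destination's section.

```toml
# Illustrative placement; the exact section depends on your destination.
[destination.postgres]
staging_dataset_name_layout="%s_staging"   # my_data -> my_data_staging
```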
24 changes: 23 additions & 1 deletion tests/common/destination/test_reference.py
@@ -1,11 +1,12 @@
 from collections.abc import MutableMapping
 from operator import eq
 from typing import Dict
 
 import pytest
 
 from dlt.common.configuration.specs.connection_string_credentials import ConnectionStringCredentials
 from dlt.common.destination import Destination, DestinationReference
-from dlt.common.destination.client import DestinationClientDwhConfiguration
+from dlt.common.destination.client import DestinationClientDwhConfiguration, WithStagingDataset
 from dlt.common.destination import DestinationCapabilitiesContext
 from dlt.common.destination.exceptions import UnknownDestinationModule
 from dlt.common.schema import Schema
@@ -478,6 +479,27 @@ def test_normalize_dataset_name_none_default_schema() -> None:
     )
 
 
+def test_create_dataset_names() -> None:
+    result = WithStagingDataset.create_dataset_names(
+        Schema("banana"),
+        DestinationClientDwhConfiguration()._bind_dataset_name(
+            dataset_name="test", default_schema_name=""
+        ),
+    )
+    assert result == ("test_banana", "test_banana_staging")
+
+    with pytest.raises(ValueError) as exc_info:
+        WithStagingDataset.create_dataset_names(
+            Schema("staging"),
+            DestinationClientDwhConfiguration(staging_dataset_name_layout="%s")._bind_dataset_name(
+                dataset_name="test", default_schema_name=None
+            ),
+        )
+    assert exc_info.type is ValueError
+    assert "staging dataset name" in str(exc_info.value)
+    assert "final dataset name" in str(exc_info.value)
+
+
 def test_destination_repr() -> None:
     from dlt import destinations
