30 changes: 30 additions & 0 deletions dlt/common/destination/client.py
@@ -620,6 +620,36 @@ def with_staging_dataset(self) -> ContextManager["JobClientBase"]:
         """Executes job client methods on staging dataset"""
         return self  # type: ignore
 
+    @staticmethod
+    def create_dataset_names(
+        schema: Schema, config: DestinationClientDwhConfiguration
+    ) -> Tuple[str, str]:
+        """Creates the regular and staging dataset names for the given schema and config.
+
+        Raises a ValueError if the staging dataset name is the same as the final dataset name.
+        Returns (dataset_name, staging_dataset_name).
+        """
+        dataset_name = config.normalize_dataset_name(schema)
+        staging_dataset_name = config.normalize_staging_dataset_name(schema)
+
+        if dataset_name == staging_dataset_name:
+            logger.error(
+                f"Staging dataset name '{staging_dataset_name}' is the same as the final"
+                f" dataset name '{dataset_name}'."
+            )
+
+            raise ValueError(
+                f"The staging dataset name '{staging_dataset_name}' is identical to the final"
+                f" dataset name '{dataset_name}'. This configuration will cause data loss because"
+                " setup commands will truncate the final dataset when they should only truncate"
+                " the staging dataset.\nTo fix this, modify the `staging_dataset_name_layout`"
+                " setting in your destination configuration. For more information, see:"
+                " https://dlthub.com/docs/dlt-ecosystem/staging#staging-dataset"
+            )
+
+        return (dataset_name, staging_dataset_name)
+
+
 class SupportsStagingDestination(ABC):
     """Adds capability to support a staging destination for the load"""
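For reviewers skimming the diff, a minimal sketch of both paths through the new guard, mirroring the unit test added in tests/common/destination/test_reference.py at the bottom of this PR (the Schema names and _bind_dataset_name arguments are copied from that test, not new API):

from dlt.common.destination.client import (
    DestinationClientDwhConfiguration,
    WithStagingDataset,
)
from dlt.common.schema import Schema

# Distinct names: the default staging layout appends a "_staging" suffix.
config = DestinationClientDwhConfiguration()._bind_dataset_name(
    dataset_name="test", default_schema_name=""
)
WithStagingDataset.create_dataset_names(Schema("banana"), config)
# -> ("test_banana", "test_banana_staging")

# Colliding names: a "%s" layout makes the staging name equal to the final
# name, so the method raises ValueError instead of letting setup commands
# truncate the final dataset.
bad_config = DestinationClientDwhConfiguration(
    staging_dataset_name_layout="%s"
)._bind_dataset_name(dataset_name="test", default_schema_name=None)
WithStagingDataset.create_dataset_names(Schema("staging"), bad_config)  # raises ValueError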
8 changes: 6 additions & 2 deletions dlt/destinations/impl/athena/athena.py
@@ -208,9 +208,13 @@ def __init__(
             table_needs_own_folder=True,
         )
 
+        dataset_name, staging_dataset_name = SqlJobClientWithStagingDataset.create_dataset_names(
+            schema, config
+        )
+
         sql_client = AthenaSQLClient(
-            config.normalize_dataset_name(schema),
-            config.normalize_staging_dataset_name(schema),
+            dataset_name,
+            staging_dataset_name,
             config,
             capabilities,
         )
7 changes: 5 additions & 2 deletions dlt/destinations/impl/bigquery/bigquery.py
@@ -182,9 +182,12 @@ def __init__(
         config: BigQueryClientConfiguration,
         capabilities: DestinationCapabilitiesContext,
     ) -> None:
+        dataset_name, staging_dataset_name = SqlJobClientWithStagingDataset.create_dataset_names(
+            schema, config
+        )
         sql_client = BigQuerySqlClient(
-            config.normalize_dataset_name(schema),
-            config.normalize_staging_dataset_name(schema),
+            dataset_name,
+            staging_dataset_name,
             config.credentials,
             capabilities,
             config.get_location(),
7 changes: 5 additions & 2 deletions dlt/destinations/impl/clickhouse/clickhouse.py
@@ -217,9 +217,12 @@ def __init__(
         config: ClickHouseClientConfiguration,
         capabilities: DestinationCapabilitiesContext,
     ) -> None:
+        dataset_name, staging_dataset_name = SqlJobClientWithStagingDataset.create_dataset_names(
+            schema, config
+        )
         self.sql_client: ClickHouseSqlClient = ClickHouseSqlClient(
-            config.normalize_dataset_name(schema),
-            config.normalize_staging_dataset_name(schema),
+            dataset_name,
+            staging_dataset_name,
             list(schema.tables.keys()),
             config.credentials,
             capabilities,
7 changes: 5 additions & 2 deletions dlt/destinations/impl/databricks/databricks.py
@@ -302,9 +302,12 @@ def __init__(
         config: DatabricksClientConfiguration,
         capabilities: DestinationCapabilitiesContext,
     ) -> None:
+        dataset_name, staging_dataset_name = SqlJobClientWithStagingDataset.create_dataset_names(
+            schema, config
+        )
         sql_client = DatabricksSqlClient(
-            config.normalize_dataset_name(schema),
-            config.normalize_staging_dataset_name(schema),
+            dataset_name,
+            staging_dataset_name,
             config.credentials,
             capabilities,
         )
7 changes: 5 additions & 2 deletions dlt/destinations/impl/dremio/dremio.py
@@ -104,9 +104,12 @@ def __init__(
         config: DremioClientConfiguration,
         capabilities: DestinationCapabilitiesContext,
     ) -> None:
+        dataset_name, staging_dataset_name = SqlJobClientWithStagingDataset.create_dataset_names(
+            schema, config
+        )
         sql_client = DremioSqlClient(
-            config.normalize_dataset_name(schema),
-            config.normalize_staging_dataset_name(schema),
+            dataset_name,
+            staging_dataset_name,
             config.credentials,
             capabilities,
         )
7 changes: 5 additions & 2 deletions dlt/destinations/impl/duckdb/duck.py
@@ -59,9 +59,12 @@ def __init__(
         config: DuckDbClientConfiguration,
         capabilities: DestinationCapabilitiesContext,
     ) -> None:
+        dataset_name, staging_dataset_name = InsertValuesJobClient.create_dataset_names(
+            schema, config
+        )
         sql_client = DuckDbSqlClient(
-            config.normalize_dataset_name(schema),
-            config.normalize_staging_dataset_name(schema),
+            dataset_name,
+            staging_dataset_name,
             config.credentials,
             capabilities,
         )
2 changes: 1 addition & 1 deletion dlt/destinations/impl/filesystem/filesystem.py
@@ -363,7 +363,7 @@ def dataset_path(self) -> str:
     def with_staging_dataset(self) -> Iterator["FilesystemClient"]:
         current_dataset_name = self.dataset_name
         try:
-            self.dataset_name = self.config.normalize_staging_dataset_name(self.schema)
+            _, self.dataset_name = WithStagingDataset.create_dataset_names(self.schema, self.config)
             yield self
         finally:
             # restore previous dataset name
5 changes: 3 additions & 2 deletions dlt/destinations/impl/motherduck/motherduck.py
@@ -14,10 +14,11 @@ def __init__(
         config: MotherDuckClientConfiguration,
         capabilities: DestinationCapabilitiesContext,
     ) -> None:
+        dataset_name, staging_dataset_name = DuckDbClient.create_dataset_names(schema, config)
         super().__init__(schema, config, capabilities)  # type: ignore
         sql_client = MotherDuckSqlClient(
-            config.normalize_dataset_name(schema),
-            config.normalize_staging_dataset_name(schema),
+            dataset_name,
+            staging_dataset_name,
             config.credentials,
             capabilities,
         )
7 changes: 5 additions & 2 deletions dlt/destinations/impl/mssql/mssql.py
@@ -83,9 +83,12 @@ def __init__(
         config: MsSqlClientConfiguration,
         capabilities: DestinationCapabilitiesContext,
     ) -> None:
+        dataset_name, staging_dataset_name = InsertValuesJobClient.create_dataset_names(
+            schema, config
+        )
         sql_client = PyOdbcMsSqlClient(
-            config.normalize_dataset_name(schema),
-            config.normalize_staging_dataset_name(schema),
+            dataset_name,
+            staging_dataset_name,
             config.credentials,
             capabilities,
         )
7 changes: 5 additions & 2 deletions dlt/destinations/impl/postgres/postgres.py
@@ -174,9 +174,12 @@ def __init__(
         config: PostgresClientConfiguration,
         capabilities: DestinationCapabilitiesContext,
     ) -> None:
+        dataset_name, staging_dataset_name = InsertValuesJobClient.create_dataset_names(
+            schema, config
+        )
         sql_client = Psycopg2SqlClient(
-            config.normalize_dataset_name(schema),
-            config.normalize_staging_dataset_name(schema),
+            dataset_name,
+            staging_dataset_name,
             config.credentials,
             capabilities,
         )
7 changes: 5 additions & 2 deletions dlt/destinations/impl/redshift/redshift.py
@@ -167,9 +167,12 @@ def __init__(
         config: RedshiftClientConfiguration,
         capabilities: DestinationCapabilitiesContext,
     ) -> None:
+        dataset_name, staging_dataset_name = InsertValuesJobClient.create_dataset_names(
+            schema, config
+        )
         sql_client = RedshiftSqlClient(
-            config.normalize_dataset_name(schema),
-            config.normalize_staging_dataset_name(schema),
+            dataset_name,
+            staging_dataset_name,
             config.credentials,
             capabilities,
         )
7 changes: 5 additions & 2 deletions dlt/destinations/impl/snowflake/snowflake.py
@@ -123,9 +123,12 @@ def __init__(
         config: SnowflakeClientConfiguration,
         capabilities: DestinationCapabilitiesContext,
     ) -> None:
+        dataset_name, staging_dataset_name = SqlJobClientWithStagingDataset.create_dataset_names(
+            schema, config
+        )
         sql_client = SnowflakeSqlClient(
-            config.normalize_dataset_name(schema),
-            config.normalize_staging_dataset_name(schema),
+            dataset_name,
+            staging_dataset_name,
             config.credentials,
             capabilities,
             config.query_tag,
7 changes: 5 additions & 2 deletions dlt/destinations/impl/sqlalchemy/sqlalchemy_job_client.py
@@ -50,9 +50,12 @@ def __init__(
         config: SqlalchemyClientConfiguration,
         capabilities: DestinationCapabilitiesContext,
     ) -> None:
+        dataset_name, staging_dataset_name = SqlJobClientWithStagingDataset.create_dataset_names(
+            schema, config
+        )
         self.sql_client = SqlalchemyClient(
-            config.normalize_dataset_name(schema),
-            config.normalize_staging_dataset_name(schema),
+            dataset_name,
+            staging_dataset_name,
             config.credentials,
             capabilities,
         )
5 changes: 3 additions & 2 deletions dlt/destinations/impl/synapse/synapse.py
@@ -62,9 +62,10 @@ def __init__(
     ) -> None:
         super().__init__(schema, config, capabilities)
         self.config: SynapseClientConfiguration = config
+        dataset_name, staging_dataset_name = MsSqlJobClient.create_dataset_names(schema, config)
         self.sql_client = SynapseSqlClient(
-            config.normalize_dataset_name(schema),
-            config.normalize_staging_dataset_name(schema),
+            dataset_name,
+            staging_dataset_name,
             config.credentials,
             capabilities,
         )
24 changes: 23 additions & 1 deletion tests/common/destination/test_reference.py
@@ -1,11 +1,12 @@
 from collections.abc import MutableMapping
 from operator import eq
 from typing import Dict
 
 import pytest
 
 from dlt.common.configuration.specs.connection_string_credentials import ConnectionStringCredentials
 from dlt.common.destination import Destination, DestinationReference
-from dlt.common.destination.client import DestinationClientDwhConfiguration
+from dlt.common.destination.client import DestinationClientDwhConfiguration, WithStagingDataset
 from dlt.common.destination import DestinationCapabilitiesContext
 from dlt.common.destination.exceptions import UnknownDestinationModule
 from dlt.common.schema import Schema
@@ -478,6 +479,27 @@ def test_normalize_dataset_name_none_default_schema() -> None:
     )
 
 
+def test_create_dataset_names() -> None:
+    result = WithStagingDataset.create_dataset_names(
+        Schema("banana"),
+        DestinationClientDwhConfiguration()._bind_dataset_name(
+            dataset_name="test", default_schema_name=""
+        ),
+    )
+    assert result == ("test_banana", "test_banana_staging")
+
+    with pytest.raises(ValueError) as exc_info:
+        WithStagingDataset.create_dataset_names(
+            Schema("staging"),
+            DestinationClientDwhConfiguration(staging_dataset_name_layout="%s")._bind_dataset_name(
+                dataset_name="test", default_schema_name=None
+            ),
+        )
+    assert exc_info.type is ValueError
+    assert "staging dataset name" in str(exc_info.value)
+    assert "final dataset name" in str(exc_info.value)
+
+
 def test_destination_repr() -> None:
     from dlt import destinations