Skip to content

Commit 0c19c0a

Browse files
committed
generates IF NOT EXISTS for dlt tables
1 parent 04bebd4 commit 0c19c0a

File tree

5 files changed

+35
-9
lines changed

5 files changed

+35
-9
lines changed

dlt/destinations/impl/athena/athena.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -452,7 +452,7 @@ def _get_table_update_sql(
452452
partition_clause = self._iceberg_partition_clause(
453453
cast(Optional[Dict[str, str]], table.get(PARTITION_HINT))
454454
)
455-
sql.append(f"""CREATE TABLE {qualified_table_name}
455+
sql.append(f"""{self._make_create_table(qualified_table_name, table)}
456456
({columns})
457457
{partition_clause}
458458
LOCATION '{location.rstrip('/')}'

dlt/destinations/job_client_impl.py

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -520,22 +520,31 @@ def _make_add_column_sql(
520520
"""Make one or more ADD COLUMN sql clauses to be joined in ALTER TABLE statement(s)"""
521521
return [f"ADD COLUMN {self._get_column_def_sql(c, table_format)}" for c in new_columns]
522522

523+
def _make_create_table(self, qualified_name: str, table: TTableSchema) -> str:
524+
not_exists_clause = " "
525+
if (
526+
table["name"] in self.schema.dlt_table_names()
527+
and self.capabilities.create_table_not_exists
528+
):
529+
not_exists_clause = " IF NOT EXISTS "
530+
return f"CREATE TABLE{not_exists_clause}{qualified_name}"
531+
523532
def _get_table_update_sql(
524533
self, table_name: str, new_columns: Sequence[TColumnSchema], generate_alter: bool
525534
) -> List[str]:
526535
# build sql
527-
canonical_name = self.sql_client.make_qualified_table_name(table_name)
536+
qualified_name = self.sql_client.make_qualified_table_name(table_name)
528537
table = self.prepare_load_table(table_name)
529538
table_format = table.get("table_format")
530539
sql_result: List[str] = []
531540
if not generate_alter:
532541
# build CREATE
533-
sql = f"CREATE TABLE {canonical_name} (\n"
542+
sql = self._make_create_table(qualified_name, table) + " (\n"
534543
sql += ",\n".join([self._get_column_def_sql(c, table_format) for c in new_columns])
535544
sql += ")"
536545
sql_result.append(sql)
537546
else:
538-
sql_base = f"ALTER TABLE {canonical_name}\n"
547+
sql_base = f"ALTER TABLE {qualified_name}\n"
539548
add_column_statements = self._make_add_column_sql(new_columns, table_format)
540549
if self.capabilities.alter_add_multi_column:
541550
column_sql = ",\n"
@@ -559,13 +568,13 @@ def _get_table_update_sql(
559568
if hint == "not_null":
560569
logger.warning(
561570
f"Column(s) {hint_columns} with NOT NULL are being added to existing"
562-
f" table {canonical_name}. If there's data in the table the operation"
571+
f" table {qualified_name}. If there's data in the table the operation"
563572
" will fail."
564573
)
565574
else:
566575
logger.warning(
567576
f"Column(s) {hint_columns} with hint {hint} are being added to existing"
568-
f" table {canonical_name}. Several hint types may not be added to"
577+
f" table {qualified_name}. Several hint types may not be added to"
569578
" existing tables."
570579
)
571580
return sql_result

tests/load/athena_iceberg/test_athena_table_builder.py

Whitespace-only changes.

tests/load/mssql/test_mssql_table_builder.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,8 @@ def test_alter_table(client: MsSqlJobClient) -> None:
5555
# existing table has no columns
5656
sql = client._get_table_update_sql("event_test_table", TABLE_UPDATE, True)[0]
5757
sqlfluff.parse(sql, dialect="tsql")
58-
canonical_name = client.sql_client.make_qualified_table_name("event_test_table")
59-
assert sql.count(f"ALTER TABLE {canonical_name}\nADD") == 1
58+
qualified_name = client.sql_client.make_qualified_table_name("event_test_table")
59+
assert sql.count(f"ALTER TABLE {qualified_name}\nADD") == 1
6060
assert "event_test_table" in sql
6161
assert '"col1" bigint NOT NULL' in sql
6262
assert '"col2" float NOT NULL' in sql
@@ -75,3 +75,11 @@ def test_alter_table(client: MsSqlJobClient) -> None:
7575
assert '"col6_precision" decimal(6,2) NOT NULL' in sql
7676
assert '"col7_precision" varbinary(19)' in sql
7777
assert '"col11_precision" time(3) NOT NULL' in sql
78+
79+
80+
def test_create_dlt_table(client: MsSqlJobClient) -> None:
81+
# non existing table
82+
sql = client._get_table_update_sql("_dlt_version", TABLE_UPDATE, False)[0]
83+
sqlfluff.parse(sql, dialect="tsql")
84+
qualified_name = client.sql_client.make_qualified_table_name("_dlt_version")
85+
assert f"CREATE TABLE {qualified_name}" in sql

tests/load/postgres/test_postgres_table_builder.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ def test_create_table(client: PostgresClient) -> None:
5757
# non existing table
5858
sql = client._get_table_update_sql("event_test_table", TABLE_UPDATE, False)[0]
5959
sqlfluff.parse(sql, dialect="postgres")
60-
assert "event_test_table" in sql
60+
qualified_name = client.sql_client.make_qualified_table_name("event_test_table")
61+
assert f"CREATE TABLE {qualified_name}" in sql
6162
assert '"col1" bigint NOT NULL' in sql
6263
assert '"col2" double precision NOT NULL' in sql
6364
assert '"col3" boolean NOT NULL' in sql
@@ -173,3 +174,11 @@ def test_create_table_case_sensitive(cs_client: PostgresClient) -> None:
173174
# every line starts with "Col"
174175
for line in sql.split("\n")[1:]:
175176
assert line.startswith('"Col')
177+
178+
179+
def test_create_dlt_table(client: PostgresClient) -> None:
180+
# non existing table
181+
sql = client._get_table_update_sql("_dlt_version", TABLE_UPDATE, False)[0]
182+
sqlfluff.parse(sql, dialect="postgres")
183+
qualified_name = client.sql_client.make_qualified_table_name("_dlt_version")
184+
assert f"CREATE TABLE IF NOT EXISTS {qualified_name}" in sql

0 commit comments

Comments
 (0)