Skip to content

Commit 01f7af8

Browse files
authored
Fix(mssql): Use truncate+insert for FULL models instead of merge (#4531)
1 parent 4d183b2 commit 01f7af8

File tree

2 files changed

+121
-5
lines changed

2 files changed

+121
-5
lines changed

sqlmesh/core/engine_adapter/mssql.py

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,11 @@
1010
from sqlglot import exp
1111

1212
from sqlmesh.core.dialect import to_schema
13-
from sqlmesh.core.engine_adapter.base import EngineAdapterWithIndexSupport
13+
from sqlmesh.core.engine_adapter.base import (
14+
EngineAdapterWithIndexSupport,
15+
EngineAdapter,
16+
InsertOverwriteStrategy,
17+
)
1418
from sqlmesh.core.engine_adapter.mixins import (
1519
GetCurrentCatalogFromFunctionMixin,
1620
InsertOverwriteWithMergeMixin,
@@ -281,3 +285,45 @@ def _rename_table(
281285
# The function that renames tables in MSSQL takes string literals as arguments instead of identifiers,
282286
# so we shouldn't quote the identifiers.
283287
self.execute(exp.rename_table(old_table_name, new_table_name), quote_identifiers=False)
288+
289+
def _insert_overwrite_by_condition(
290+
self,
291+
table_name: TableName,
292+
source_queries: t.List[SourceQuery],
293+
columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
294+
where: t.Optional[exp.Condition] = None,
295+
insert_overwrite_strategy_override: t.Optional[InsertOverwriteStrategy] = None,
296+
**kwargs: t.Any,
297+
) -> None:
298+
if not where or where == exp.true():
299+
# this is a full table replacement, call the base strategy to do DELETE+INSERT
300+
# which will result in TRUNCATE+INSERT due to how we have overridden self.delete_from()
301+
return EngineAdapter._insert_overwrite_by_condition(
302+
self,
303+
table_name=table_name,
304+
source_queries=source_queries,
305+
columns_to_types=columns_to_types,
306+
where=where,
307+
insert_overwrite_strategy_override=InsertOverwriteStrategy.DELETE_INSERT,
308+
**kwargs,
309+
)
310+
311+
# For actual conditional overwrites, use MERGE from InsertOverwriteWithMergeMixin
312+
return super()._insert_overwrite_by_condition(
313+
table_name=table_name,
314+
source_queries=source_queries,
315+
columns_to_types=columns_to_types,
316+
where=where,
317+
insert_overwrite_strategy_override=insert_overwrite_strategy_override,
318+
**kwargs,
319+
)
320+
321+
def delete_from(self, table_name: TableName, where: t.Union[str, exp.Expression]) -> None:
322+
if where == exp.true():
323+
# "A TRUNCATE TABLE operation can be rolled back within a transaction."
324+
# ref: https://learn.microsoft.com/en-us/sql/t-sql/statements/truncate-table-transact-sql?view=sql-server-ver15#remarks
325+
return self.execute(
326+
exp.TruncateTable(expressions=[exp.to_table(table_name, dialect=self.dialect)])
327+
)
328+
329+
return super().delete_from(table_name, where)

tests/core/engine_adapter/test_mssql.py

Lines changed: 74 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,12 @@
2424
pytestmark = [pytest.mark.engine, pytest.mark.mssql]
2525

2626

27-
def test_columns(make_mocked_engine_adapter: t.Callable):
28-
adapter = make_mocked_engine_adapter(MSSQLEngineAdapter)
27+
@pytest.fixture
28+
def adapter(make_mocked_engine_adapter: t.Callable) -> MSSQLEngineAdapter:
29+
return make_mocked_engine_adapter(MSSQLEngineAdapter)
30+
2931

32+
def test_columns(adapter: MSSQLEngineAdapter):
3033
adapter.cursor.fetchall.return_value = [
3134
("decimal_ps", "decimal", None, 5, 4),
3235
("decimal", "decimal", None, 18, 0),
@@ -504,7 +507,8 @@ def test_replace_query(make_mocked_engine_adapter: t.Callable):
504507

505508
assert to_sql_calls(adapter) == [
506509
"""SELECT 1 FROM [information_schema].[tables] WHERE [table_name] = 'test_table';""",
507-
"MERGE INTO [test_table] AS [__MERGE_TARGET__] USING (SELECT [a] AS [a] FROM [tbl]) AS [__MERGE_SOURCE__] ON (1 = 0) WHEN NOT MATCHED BY SOURCE THEN DELETE WHEN NOT MATCHED THEN INSERT ([a]) VALUES ([a]);",
510+
"TRUNCATE TABLE [test_table];",
511+
"INSERT INTO [test_table] ([a]) SELECT [a] FROM [tbl];",
508512
]
509513

510514

@@ -551,7 +555,8 @@ def temp_table_exists(table: exp.Table) -> bool:
551555

552556
assert to_sql_calls(adapter) == [
553557
f"""IF NOT EXISTS (SELECT * FROM information_schema.tables WHERE table_name = '{temp_table_name}') EXEC('CREATE TABLE [{temp_table_name}] ([a] INTEGER, [b] INTEGER)');""",
554-
"MERGE INTO [test_table] AS [__MERGE_TARGET__] USING (SELECT CAST([a] AS INTEGER) AS [a], CAST([b] AS INTEGER) AS [b] FROM [__temp_test_table_abcdefgh]) AS [__MERGE_SOURCE__] ON (1 = 0) WHEN NOT MATCHED BY SOURCE THEN DELETE WHEN NOT MATCHED THEN INSERT ([a], [b]) VALUES ([a], [b]);",
558+
"TRUNCATE TABLE [test_table];",
559+
f"INSERT INTO [test_table] ([a], [b]) SELECT CAST([a] AS INTEGER) AS [a], CAST([b] AS INTEGER) AS [b] FROM [{temp_table_name}];",
555560
f"DROP TABLE IF EXISTS [{temp_table_name}];",
556561
]
557562

@@ -751,3 +756,68 @@ def test_create_table_from_query(make_mocked_engine_adapter: t.Callable, mocker:
751756
"CREATE VIEW [__temp_ctas_test_random_id] AS SELECT * FROM (SELECT TOP 1 * FROM [t]);"
752757
in to_sql_calls(adapter)
753758
)
759+
760+
761+
def test_replace_query_strategy(adapter: MSSQLEngineAdapter, mocker: MockerFixture):
762+
# ref issue 4472: https://github.com/TobikoData/sqlmesh/issues/4472
763+
# The FULL strategy calls EngineAdapter.replace_query() which calls _insert_overwrite_by_condition() should use DELETE+INSERT and not MERGE
764+
expressions = d.parse(
765+
f"""
766+
MODEL (
767+
name db.table,
768+
kind FULL,
769+
dialect tsql
770+
);
771+
772+
select a, b from db.upstream_table;
773+
"""
774+
)
775+
model = load_sql_based_model(expressions)
776+
777+
exists_mock = mocker.patch(
778+
"sqlmesh.core.engine_adapter.mssql.MSSQLEngineAdapter.table_exists",
779+
return_value=False,
780+
)
781+
782+
assert not adapter.table_exists("test_table")
783+
784+
# initial - table doesnt exist
785+
adapter.replace_query(
786+
"test_table",
787+
model.render_query_or_raise(),
788+
table_format=model.table_format,
789+
storage_format=model.storage_format,
790+
partitioned_by=model.partitioned_by,
791+
partition_interval_unit=model.partition_interval_unit,
792+
clustered_by=model.clustered_by,
793+
table_properties=model.physical_properties,
794+
table_description=model.description,
795+
column_descriptions=model.column_descriptions,
796+
columns_to_types=model.columns_to_types_or_raise,
797+
)
798+
799+
# subsequent - table exists
800+
exists_mock.return_value = True
801+
assert adapter.table_exists("test_table")
802+
803+
adapter.replace_query(
804+
"test_table",
805+
model.render_query_or_raise(),
806+
table_format=model.table_format,
807+
storage_format=model.storage_format,
808+
partitioned_by=model.partitioned_by,
809+
partition_interval_unit=model.partition_interval_unit,
810+
clustered_by=model.clustered_by,
811+
table_properties=model.physical_properties,
812+
table_description=model.description,
813+
column_descriptions=model.column_descriptions,
814+
columns_to_types=model.columns_to_types_or_raise,
815+
)
816+
817+
assert to_sql_calls(adapter) == [
818+
# initial - create table if not exists
819+
"IF NOT EXISTS (SELECT * FROM information_schema.tables WHERE table_name = 'test_table') EXEC('SELECT * INTO [test_table] FROM (SELECT [a] AS [a], [b] AS [b] FROM [db].[upstream_table] AS [upstream_table]) AS temp');",
820+
# subsequent - truncate + insert
821+
"TRUNCATE TABLE [test_table];",
822+
"INSERT INTO [test_table] ([a], [b]) SELECT [a] AS [a], [b] AS [b] FROM [db].[upstream_table] AS [upstream_table];",
823+
]

0 commit comments

Comments
 (0)