Skip to content

Commit 05f43a0

Browse files
feat: enhance MSSQL bulk insert handling with pyodbc support
1 parent 630cc6e commit 05f43a0

File tree

1 file changed

+21
-4
lines changed

1 file changed

+21
-4
lines changed

sqlmesh/core/engine_adapter/mssql.py

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -220,11 +220,28 @@ def query_factory() -> Query:
220220
columns_to_types_create = columns_to_types.copy()
221221
self._convert_df_datetime(df, columns_to_types_create)
222222
self.create_table(temp_table, columns_to_types_create)
223-
rows: t.List[t.Tuple[t.Any, ...]] = list(
224-
df.replace({np.nan: None}).itertuples(index=False, name=None) # type: ignore
225-
)
226223
conn = self._connection_pool.get()
227-
conn.bulk_copy(temp_table.sql(dialect=self.dialect), rows)
224+
225+
if self._connection_pool.driver == "pyodbc":
226+
cursor = conn.cursor()
227+
228+
# Prepare the insert query
229+
column_names = ', '.join([col for col in columns_to_types.keys()])
230+
placeholders = ', '.join(['?' for _ in columns_to_types.keys()])
231+
insert_query = f"INSERT INTO {temp_table.sql(dialect=self.dialect)} ({column_names}) VALUES ({placeholders})"
232+
233+
# Replace NaN with None and prepare rows
234+
rows = [tuple(row) for row in df.replace({np.nan: None}).itertuples(index=False, name=None)]
235+
236+
# Execute the query with multiple rows
237+
cursor.executemany(insert_query, rows)
238+
conn.commit()
239+
cursor.close()
240+
else: # Use bulk_copy for pymssql
241+
rows: t.List[t.Tuple[t.Any, ...]] = list(
242+
df.replace({np.nan: None}).itertuples(index=False, name=None) # type: ignore
243+
)
244+
conn.bulk_copy(temp_table.sql(dialect=self.dialect), rows)
228245
return exp.select(*self._casted_columns(columns_to_types)).from_(temp_table) # type: ignore
229246

230247
return [

0 commit comments

Comments
 (0)