Skip to content

Eliminating contextmanager based transaction-handling #8278

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ci/requirements-2.6.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ pytz==2013b
http://www.crummy.com/software/BeautifulSoup/bs4/download/4.2/beautifulsoup4-4.2.0.tar.gz
html5lib==1.0b2
numexpr==1.4.2
sqlalchemy==0.7.4
sqlalchemy==0.7.10
pymysql==0.6.0
psycopg2==2.5
scipy==0.11.0
Expand Down
38 changes: 23 additions & 15 deletions pandas/io/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from pandas.core.base import PandasObject
from pandas.tseries.tools import to_datetime

from contextlib import contextmanager

class SQLAlchemyRequired(ImportError):
pass
Expand Down Expand Up @@ -637,13 +638,9 @@ def insert_data(self):

return column_names, data_list

def get_session(self):
con = self.pd_sql.engine.connect()
return con.begin()

def _execute_insert(self, trans, keys, data_iter):
def _execute_insert(self, conn, keys, data_iter):
data = [dict( (k, v) for k, v in zip(keys, row) ) for row in data_iter]
trans.connection.execute(self.insert_statement(), data)
conn.execute(self.insert_statement(), data)

def insert(self, chunksize=None):
keys, data_list = self.insert_data()
Expand All @@ -653,15 +650,15 @@ def insert(self, chunksize=None):
chunksize = nrows
chunks = int(nrows / chunksize) + 1

with self.get_session() as trans:
with self.pd_sql.run_transaction() as conn:
for i in range(chunks):
start_i = i * chunksize
end_i = min((i + 1) * chunksize, nrows)
if start_i >= end_i:
break

chunk_iter = zip(*[arr[start_i:end_i] for arr in data_list])
self._execute_insert(trans, keys, chunk_iter)
self._execute_insert(conn, keys, chunk_iter)

def read(self, coerce_float=True, parse_dates=None, columns=None):

Expand Down Expand Up @@ -884,6 +881,9 @@ def __init__(self, engine, schema=None, meta=None):

self.meta = meta

def run_transaction(self):
return self.engine.begin()

def execute(self, *args, **kwargs):
"""Simple passthrough to SQLAlchemy engine"""
return self.engine.execute(*args, **kwargs)
Expand Down Expand Up @@ -1017,9 +1017,9 @@ def sql_schema(self):
return str(";\n".join(self.table))

def _execute_create(self):
with self.get_session():
with self.pd_sql.run_transaction() as conn:
for stmt in self.table:
self.pd_sql.execute(stmt)
conn.execute(stmt)

def insert_statement(self):
names = list(map(str, self.frame.columns))
Expand All @@ -1038,12 +1038,9 @@ def insert_statement(self):
self.name, col_names, wildcards)
return insert_statement

def get_session(self):
return self.pd_sql.con

def _execute_insert(self, trans, keys, data_iter):
def _execute_insert(self, conn, keys, data_iter):
data_list = list(data_iter)
trans.executemany(self.insert_statement(), data_list)
conn.executemany(self.insert_statement(), data_list)

def _create_table_setup(self):
"""Return a list of SQL statement that create a table reflecting the
Expand Down Expand Up @@ -1125,6 +1122,17 @@ def __init__(self, con, flavor, is_cursor=False):
else:
self.flavor = flavor

@contextmanager
def run_transaction(self):
cur = self.con.cursor()
try:
yield cur
self.con.commit()
except:
self.con.rollback()
finally:
cur.close()

def execute(self, *args, **kwargs):
if self.is_cursor:
cur = self.con
Expand Down
26 changes: 26 additions & 0 deletions pandas/io/tests/test_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,28 @@ def _to_sql_save_index(self):
ix_cols = self._get_index_columns('test_to_sql_saves_index')
self.assertEqual(ix_cols, [['A',],])

def _transaction_test(self):
self.pandasSQL.execute("CREATE TABLE test_trans (A INT, B TEXT)")

ins_sql = "INSERT INTO test_trans (A,B) VALUES (1, 'blah')"

# Make sure when transaction is rolled back, no rows get inserted
try:
with self.pandasSQL.run_transaction() as trans:
trans.execute(ins_sql)
raise Exception('error')
except:
# ignore raised exception
pass
res = self.pandasSQL.read_sql('SELECT * FROM test_trans')
self.assertEqual(len(res), 0)

# Make sure when transaction is committed, rows do get inserted
with self.pandasSQL.run_transaction() as trans:
trans.execute(ins_sql)
res2 = self.pandasSQL.read_sql('SELECT * FROM test_trans')
self.assertEqual(len(res2), 1)


#------------------------------------------------------------------------------
#--- Testing the public API
Expand Down Expand Up @@ -1072,6 +1094,8 @@ def _get_index_columns(self, tbl_name):
def test_to_sql_save_index(self):
self._to_sql_save_index()

def test_transactions(self):
self._transaction_test()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not needed anymore I think? (TestMySQLLegacy inherits from TestSQLiteLegacy)


class TestSQLiteAlchemy(_TestSQLAlchemy):
"""
Expand Down Expand Up @@ -1380,6 +1404,8 @@ def _get_index_columns(self, tbl_name):
def test_to_sql_save_index(self):
self._to_sql_save_index()

def test_transactions(self):
self._transaction_test()

class TestMySQLLegacy(TestSQLiteLegacy):
"""
Expand Down