From 3c43c95e8d8faefb110bc06aba733e342dc02c31 Mon Sep 17 00:00:00 2001
From: Artemy Kolchinsky
Date: Mon, 18 Aug 2014 13:29:56 -0700
Subject: [PATCH] Added chunksize argument to to_sql

Adding chunksize parameter in to_sql method of DataFrame
Fix Travis CI failures
Changing transaction management to be compatible with SQLAlchemy 0.7.1
Code review fixes
Documentation updates
Documentation fixes
Fixes
---
 doc/source/io.rst           |  6 +++
 doc/source/v0.15.0.txt      |  3 ++
 pandas/core/generic.py      |  7 +++-
 pandas/io/sql.py            | 74 +++++++++++++++++++++++++------------
 pandas/io/tests/test_sql.py |  8 ++++
 5 files changed, 72 insertions(+), 26 deletions(-)

diff --git a/doc/source/io.rst b/doc/source/io.rst
index 6e5d254d27b7f..e249585c10784 100644
--- a/doc/source/io.rst
+++ b/doc/source/io.rst
@@ -3267,6 +3267,12 @@ the database using :func:`~pandas.DataFrame.to_sql`.
 
     data.to_sql('data', engine)
 
+With some databases, writing large DataFrames can result in errors because the database's packet size limit is exceeded. This can be avoided by setting the ``chunksize`` parameter when calling ``to_sql``. For example, the following writes ``data`` to the database in batches of 1000 rows at a time:
+
+.. ipython:: python
+
+    data.to_sql('data', engine, chunksize=1000)
+
 .. note::
 
     Due to the limited support for timedelta's in the different database
diff --git a/doc/source/v0.15.0.txt b/doc/source/v0.15.0.txt
index b987104ac2408..b13d055143794 100644
--- a/doc/source/v0.15.0.txt
+++ b/doc/source/v0.15.0.txt
@@ -425,6 +425,9 @@ Known Issues
 
 Enhancements
 ~~~~~~~~~~~~
+
+- Added support for a ``chunksize`` parameter to the ``to_sql`` function. This allows a DataFrame to be written in chunks, avoiding packet-size overflow errors (:issue:`8062`)
+
 - Added support for bool, uint8, uint16 and uint32 datatypes in ``to_stata`` (:issue:`7097`, :issue:`7365`)
 
 - Added ``layout`` keyword to ``DataFrame.plot`` (:issue:`6667`)
diff --git a/pandas/core/generic.py b/pandas/core/generic.py
index ee5016386af4c..d56095b6300a4 100644
--- a/pandas/core/generic.py
+++ b/pandas/core/generic.py
@@ -916,7 +916,7 @@ def to_msgpack(self, path_or_buf=None, **kwargs):
         return packers.to_msgpack(path_or_buf, self, **kwargs)
 
     def to_sql(self, name, con, flavor='sqlite', if_exists='fail', index=True,
-               index_label=None):
+               index_label=None, chunksize=None):
         """
         Write records stored in a DataFrame to a SQL database.
 
@@ -942,12 +942,15 @@ def to_sql(self, name, con, flavor='sqlite', if_exists='fail', index=True,
             Column label for index column(s). If None is given (default) and
             `index` is True, then the index names are used.
             A sequence should be given if the DataFrame uses MultiIndex.
+        chunksize : int, default None
+            If not None, then rows will be written in batches of this size.
+            If None, all rows will be written at once.
 
         """
         from pandas.io import sql
         sql.to_sql(
             self, name, con, flavor=flavor, if_exists=if_exists, index=index,
-            index_label=index_label)
+            index_label=index_label, chunksize=chunksize)
 
     def to_pickle(self, path):
         """
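
For reference, the new keyword reads like the following minimal sketch, assuming a SQLAlchemy engine over an in-memory SQLite database. The engine URL and the demo frame are illustrative assumptions, not part of the patch:

    # Minimal usage sketch for the new ``chunksize`` keyword. The engine URL
    # and the demo frame below are illustrative, not patch content.
    import pandas as pd
    from sqlalchemy import create_engine

    engine = create_engine('sqlite:///:memory:')
    df = pd.DataFrame({'a': range(10), 'b': list('abcdefghij')})

    # Ten rows written as four INSERT batches of 3, 3, 3 and 1 rows; per the
    # ``con.begin()`` block added in pandas/io/sql.py below, the batches on
    # the SQLAlchemy path share a single transaction.
    df.to_sql('demo', engine, chunksize=3, index=False)

    print(pd.read_sql_query('SELECT COUNT(*) AS n FROM demo', engine))
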
diff --git a/pandas/io/sql.py b/pandas/io/sql.py
index cb234f825a51e..914ade45adaa1 100644
--- a/pandas/io/sql.py
+++ b/pandas/io/sql.py
@@ -432,7 +432,7 @@ def read_sql(sql, con, index_col=None, coerce_float=True, params=None,
 
 
 def to_sql(frame, name, con, flavor='sqlite', if_exists='fail', index=True,
-           index_label=None):
+           index_label=None, chunksize=None):
     """
     Write records stored in a DataFrame to a SQL database.
 
@@ -459,6 +459,9 @@ def to_sql(frame, name, con, flavor='sqlite', if_exists='fail', index=True,
         Column label for index column(s). If None is given (default) and
         `index` is True, then the index names are used.
         A sequence should be given if the DataFrame uses MultiIndex.
+    chunksize : int, default None
+        If not None, then rows will be written in batches of this size.
+        If None, all rows will be written at once.
 
     """
     if if_exists not in ('fail', 'replace', 'append'):
@@ -472,7 +475,7 @@ def to_sql(frame, name, con, flavor='sqlite', if_exists='fail', index=True,
         raise NotImplementedError
 
     pandas_sql.to_sql(frame, name, if_exists=if_exists, index=index,
-                      index_label=index_label)
+                      index_label=index_label, chunksize=chunksize)
 
 
 def has_table(table_name, con, flavor='sqlite'):
@@ -597,18 +600,30 @@ def insert_data(self):
 
         return temp
 
-    def insert(self):
+    def insert(self, chunksize=None):
+
         ins = self.insert_statement()
-        data_list = []
         temp = self.insert_data()
         keys = list(map(str, temp.columns))
 
-        for t in temp.itertuples():
-            data = dict((k, self.maybe_asscalar(v))
-                        for k, v in zip(keys, t[1:]))
-            data_list.append(data)
-
-        self.pd_sql.execute(ins, data_list)
+        nrows = len(temp)
+        if chunksize is None:
+            chunksize = nrows
+        chunks = int(nrows / chunksize) + 1
+
+        con = self.pd_sql.engine.connect()
+        with con.begin() as trans:
+            for i in range(chunks):
+                start_i = i * chunksize
+                end_i = min((i + 1) * chunksize, nrows)
+                if start_i >= end_i:
+                    break
+                data_list = []
+                for t in temp.iloc[start_i:end_i].itertuples():
+                    data = dict((k, self.maybe_asscalar(v))
+                                for k, v in zip(keys, t[1:]))
+                    data_list.append(data)
+                con.execute(ins, data_list)
 
     def read(self, coerce_float=True, parse_dates=None, columns=None):
 
@@ -843,11 +858,11 @@ def read_sql(self, sql, index_col=None, coerce_float=True,
         return data_frame
 
     def to_sql(self, frame, name, if_exists='fail', index=True,
-               index_label=None):
+               index_label=None, chunksize=None):
         table = PandasSQLTable(
             name, self, frame=frame, index=index, if_exists=if_exists,
             index_label=index_label)
-        table.insert()
+        table.insert(chunksize)
 
     @property
     def tables(self):
@@ -948,19 +963,30 @@ def insert_statement(self):
         insert_statement = 'INSERT INTO %s (%s) VALUES (%s)' % (
             self.name, col_names, wildcards)
         return insert_statement
 
-    def insert(self):
+    def insert(self, chunksize=None):
+
         ins = self.insert_statement()
         temp = self.insert_data()
-        data_list = []
-
-        for t in temp.itertuples():
-            data = tuple((self.maybe_asscalar(v) for v in t[1:]))
-            data_list.append(data)
-        cur = self.pd_sql.con.cursor()
-        cur.executemany(ins, data_list)
-        cur.close()
-        self.pd_sql.con.commit()
+        nrows = len(temp)
+        if chunksize is None:
+            chunksize = nrows
+        chunks = int(nrows / chunksize) + 1
+
+        with self.pd_sql.con:
+            for i in range(chunks):
+                start_i = i * chunksize
+                end_i = min((i + 1) * chunksize, nrows)
+                if start_i >= end_i:
+                    break
+                data_list = []
+                for t in temp.iloc[start_i:end_i].itertuples():
+                    data = tuple((self.maybe_asscalar(v) for v in t[1:]))
+                    data_list.append(data)
+
+                cur = self.pd_sql.con.cursor()
+                cur.executemany(ins, data_list)
+                cur.close()
 
     def _create_table_statement(self):
         "Return a CREATE TABLE statement to suit the contents of a DataFrame."
@@ -1069,7 +1095,7 @@ def _fetchall_as_list(self, cur):
         return result
 
     def to_sql(self, frame, name, if_exists='fail', index=True,
-               index_label=None):
+               index_label=None, chunksize=None):
         """
         Write records stored in a DataFrame to a SQL database.
 
@@ -1087,7 +1113,7 @@ def to_sql(self, frame, name, if_exists='fail', index=True,
         table = PandasSQLTableLegacy(
             name, self, frame=frame, index=index, if_exists=if_exists,
             index_label=index_label)
-        table.insert()
+        table.insert(chunksize)
 
     def has_table(self, name):
         flavor_map = {
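
The batching arithmetic shared by both ``insert`` implementations can be exercised in isolation. The helper below is a hypothetical standalone sketch (the patch inlines this logic in the two methods; ``chunk_bounds`` is not a name the patch introduces):

    # Standalone rendering of the chunk-boundary logic above.
    def chunk_bounds(nrows, chunksize=None):
        if chunksize is None:
            chunksize = nrows
        # One pass too many is always scheduled; the start/end guard below
        # skips the empty trailing chunk when nrows divides evenly.
        chunks = int(nrows / chunksize) + 1
        for i in range(chunks):
            start_i = i * chunksize
            end_i = min((i + 1) * chunksize, nrows)
            if start_i >= end_i:
                break
            yield start_i, end_i

    print(list(chunk_bounds(10, 3)))  # [(0, 3), (3, 6), (6, 9), (9, 10)]
    print(list(chunk_bounds(6, 3)))   # [(0, 3), (3, 6)]

One edge case the sketch mirrors from the patch: for an empty frame with the default ``chunksize=None``, ``chunksize`` becomes zero and ``int(nrows / chunksize)`` raises ``ZeroDivisionError``.
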
diff --git a/pandas/io/tests/test_sql.py b/pandas/io/tests/test_sql.py
index 6a0130e515d59..68f170759b666 100644
--- a/pandas/io/tests/test_sql.py
+++ b/pandas/io/tests/test_sql.py
@@ -455,6 +455,14 @@ def test_roundtrip(self):
         result.index.name = None
         tm.assert_frame_equal(result, self.test_frame1)
 
+    def test_roundtrip_chunksize(self):
+        sql.to_sql(self.test_frame1, 'test_frame_roundtrip', con=self.conn,
+                   index=False, flavor='sqlite', chunksize=2)
+        result = sql.read_sql_query(
+            'SELECT * FROM test_frame_roundtrip',
+            con=self.conn)
+        tm.assert_frame_equal(result, self.test_frame1)
+
     def test_execute_sql(self):
         # drop_sql = "DROP TABLE IF EXISTS test"  # should already be done
         iris_results = sql.execute("SELECT * FROM iris", con=self.conn)
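
Outside the test suite, the legacy DBAPI path that ``test_roundtrip_chunksize`` exercises can be checked end to end with a sketch like the following, written against the 0.15-era API (the ``flavor`` keyword was removed in later pandas); the table and column names are illustrative:

    # Round-trip check in the spirit of test_roundtrip_chunksize, using a
    # plain DBAPI connection so PandasSQLTableLegacy.insert handles the write.
    import sqlite3
    import pandas as pd
    from pandas.io import sql

    conn = sqlite3.connect(':memory:')
    frame = pd.DataFrame({'x': list(range(5)),
                          'y': [1.5, 2.5, 3.5, 4.5, 5.5]})

    # chunksize=2 splits the five rows into executemany batches of 2, 2 and 1.
    sql.to_sql(frame, 'roundtrip', conn, flavor='sqlite',
               index=False, chunksize=2)

    result = sql.read_sql_query('SELECT * FROM roundtrip', conn)
    assert result.equals(frame)
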