Added chunksize argument to to_sql #8062


Merged
merged 1 commit into from Aug 22, 2014
6 changes: 6 additions & 0 deletions doc/source/io.rst
@@ -3267,6 +3267,12 @@ the database using :func:`~pandas.DataFrame.to_sql`.

data.to_sql('data', engine)

With some databases, writing large DataFrames can result in errors due to packet size limitations being exceeded. This can be avoided by setting the ``chunksize`` parameter when calling ``to_sql``. For example, the following writes ``data`` to the database in batches of 1000 rows at a time:

.. ipython:: python

data.to_sql('data', engine, chunksize=1000)

.. note::

Due to the limited support for timedeltas in the different database
3 changes: 3 additions & 0 deletions doc/source/v0.15.0.txt
@@ -425,6 +425,9 @@ Known Issues

Enhancements
~~~~~~~~~~~~

- Added support for a ``chunksize`` parameter to the ``to_sql`` function. This allows a DataFrame to be written in chunks, avoiding packet-size overflow errors (:issue:`8062`)

- Added support for bool, uint8, uint16 and uint32 datatypes in ``to_stata`` (:issue:`7097`, :issue:`7365`)

- Added ``layout`` keyword to ``DataFrame.plot`` (:issue:`6667`)
7 changes: 5 additions & 2 deletions pandas/core/generic.py
@@ -916,7 +916,7 @@ def to_msgpack(self, path_or_buf=None, **kwargs):
return packers.to_msgpack(path_or_buf, self, **kwargs)

def to_sql(self, name, con, flavor='sqlite', if_exists='fail', index=True,
index_label=None):
index_label=None, chunksize=None):
"""
Write records stored in a DataFrame to a SQL database.

@@ -942,12 +942,15 @@ def to_sql(self, name, con, flavor='sqlite', if_exists='fail', index=True,
Column label for index column(s). If None is given (default) and
`index` is True, then the index names are used.
A sequence should be given if the DataFrame uses MultiIndex.
chunksize : int, default None
If not None, then rows will be written in batches of this size at a
time. If None, all rows will be written at once.

"""
from pandas.io import sql
sql.to_sql(
self, name, con, flavor=flavor, if_exists=if_exists, index=index,
index_label=index_label)
index_label=index_label, chunksize=chunksize)

def to_pickle(self, path):
"""
74 changes: 50 additions & 24 deletions pandas/io/sql.py
@@ -432,7 +432,7 @@ def read_sql(sql, con, index_col=None, coerce_float=True, params=None,


def to_sql(frame, name, con, flavor='sqlite', if_exists='fail', index=True,
index_label=None):
index_label=None, chunksize=None):
"""
Write records stored in a DataFrame to a SQL database.

@@ -459,6 +459,9 @@ def to_sql(frame, name, con, flavor='sqlite', if_exists='fail', index=True,
Column label for index column(s). If None is given (default) and
`index` is True, then the index names are used.
A sequence should be given if the DataFrame uses MultiIndex.
chunksize : int, default None
If not None, then rows will be written in batches of this size at a
time. If None, all rows will be written at once.

"""
if if_exists not in ('fail', 'replace', 'append'):
@@ -472,7 +475,7 @@
raise NotImplementedError

pandas_sql.to_sql(frame, name, if_exists=if_exists, index=index,
index_label=index_label)
index_label=index_label, chunksize=chunksize)


def has_table(table_name, con, flavor='sqlite'):
@@ -597,18 +600,30 @@ def insert_data(self):

return temp

def insert(self):
def insert(self, chunksize=None):

ins = self.insert_statement()
data_list = []
temp = self.insert_data()
keys = list(map(str, temp.columns))

for t in temp.itertuples():
data = dict((k, self.maybe_asscalar(v))
for k, v in zip(keys, t[1:]))
data_list.append(data)

self.pd_sql.execute(ins, data_list)
nrows = len(temp)
if chunksize is None:
chunksize = nrows
chunks = int(nrows / chunksize) + 1

con = self.pd_sql.engine.connect()
with con.begin() as trans:
for i in range(chunks):
start_i = i * chunksize
end_i = min((i + 1) * chunksize, nrows)
if start_i >= end_i:
break
data_list = []
for t in temp.iloc[start_i:end_i].itertuples():
data = dict((k, self.maybe_asscalar(v))
for k, v in zip(keys, t[1:]))
data_list.append(data)
con.execute(ins, data_list)

Member:

Do you know if there is a difference between (what you did now):

con = self.pd_sql.engine.connect()
with con.begin() as trans:
    for i in range(chunks):
        ...
        con.execute(ins, data_list)

and

with self.pd_sql.engine.begin() as con:
    for i in range(chunks):
        ...
        con.execute(ins, data_list)

Where the latter is a bit simpler (as we don't directly need to use the trans object). Doing the executes in one transaction should make this atomic?
But from http://docs.sqlalchemy.org/en/rel_0_9/core/connections.html#using-transactions I would understand that both should be equivalent?

Contributor Author:

I agree the latter is better, but it is not supported by sqlalchemy 0.7 (nor 0.7.4) --- I get an AttributeError: 'Engine' object has no attribute 'begin' error. On 0.8 it works. Let me know if it's better to change the version requirements or leave the code as is.

Member:

OK, then we can just leave it like this (it's only one line longer, and otherwise equivalent). If we raise the required version, we can always clean it up.
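
A minimal sketch of the two patterns being compared, assuming an in-memory SQLite engine and a hypothetical demo table (neither is part of the PR); the first form works on sqlalchemy 0.7, the second needs 0.8+:

from sqlalchemy import create_engine

engine = create_engine('sqlite://')
engine.execute('CREATE TABLE demo (val INTEGER)')

# Pattern used in this PR: explicit connection, explicit transaction.
con = engine.connect()
with con.begin() as trans:
    con.execute('INSERT INTO demo VALUES (1)')
# committed on clean exit from the block; rolled back on exception

# Shorthand available from sqlalchemy 0.8: engine.begin() yields a
# connection whose transaction commits/rolls back the same way.
with engine.begin() as con:
    con.execute('INSERT INTO demo VALUES (2)')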


def read(self, coerce_float=True, parse_dates=None, columns=None):

@@ -843,11 +858,11 @@ def read_sql(self, sql, index_col=None, coerce_float=True,
return data_frame

def to_sql(self, frame, name, if_exists='fail', index=True,
index_label=None):
index_label=None, chunksize=None):
table = PandasSQLTable(
name, self, frame=frame, index=index, if_exists=if_exists,
index_label=index_label)
table.insert()
table.insert(chunksize)

@property
def tables(self):
@@ -948,19 +963,30 @@ def insert_statement(self):
self.name, col_names, wildcards)
return insert_statement

def insert(self):
def insert(self, chunksize=None):

ins = self.insert_statement()
temp = self.insert_data()
data_list = []

for t in temp.itertuples():
data = tuple((self.maybe_asscalar(v) for v in t[1:]))
data_list.append(data)

cur = self.pd_sql.con.cursor()
cur.executemany(ins, data_list)
cur.close()
self.pd_sql.con.commit()
nrows = len(temp)
if chunksize is None:
chunksize = nrows
chunks = int(nrows / chunksize) + 1

with self.pd_sql.con:
for i in range(chunks):
start_i = i * chunksize
end_i = min((i + 1) * chunksize, nrows)
if start_i >= end_i:
break
data_list = []
for t in temp.iloc[start_i:end_i].itertuples():
data = tuple((self.maybe_asscalar(v) for v in t[1:]))
data_list.append(data)

cur = self.pd_sql.con.cursor()
cur.executemany(ins, data_list)
cur.close()
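
Both insert() implementations above size their batches the same way: int(nrows / chunksize) + 1 candidate chunks, with the start_i >= end_i check skipping the trailing empty one. A standalone sketch of that boundary arithmetic (the values are illustrative, not from the PR):

nrows, chunksize = 5, 2
chunks = int(nrows / chunksize) + 1        # 3 candidate batches
for i in range(chunks):
    start_i = i * chunksize
    end_i = min((i + 1) * chunksize, nrows)
    if start_i >= end_i:                   # only reached when chunksize divides nrows
        break
    print((start_i, end_i))                # (0, 2), (2, 4), (4, 5)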

def _create_table_statement(self):
"Return a CREATE TABLE statement to suit the contents of a DataFrame."
@@ -1069,7 +1095,7 @@ def _fetchall_as_list(self, cur):
return result

def to_sql(self, frame, name, if_exists='fail', index=True,
index_label=None):
index_label=None, chunksize=None):
"""
Write records stored in a DataFrame to a SQL database.

@@ -1087,7 +1113,7 @@ def to_sql(self, frame, name, if_exists='fail', index=True,
table = PandasSQLTableLegacy(
name, self, frame=frame, index=index, if_exists=if_exists,
index_label=index_label)
table.insert()
table.insert(chunksize)

def has_table(self, name):
flavor_map = {
8 changes: 8 additions & 0 deletions pandas/io/tests/test_sql.py
@@ -455,6 +455,14 @@ def test_roundtrip(self):
result.index.name = None
tm.assert_frame_equal(result, self.test_frame1)

def test_roundtrip_chunksize(self):
sql.to_sql(self.test_frame1, 'test_frame_roundtrip', con=self.conn,
index=False, flavor='sqlite', chunksize=2)
result = sql.read_sql_query(
'SELECT * FROM test_frame_roundtrip',
con=self.conn)
tm.assert_frame_equal(result, self.test_frame1)
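
Outside the test suite, the same round trip can be sketched with a plain in-memory sqlite3 connection (the demo table name and the tiny frame are illustrative, and exact dtype round-tripping can vary by driver):

import sqlite3
import pandas as pd
from pandas.io import sql

conn = sqlite3.connect(':memory:')
df = pd.DataFrame({'a': [0, 1, 2, 3], 'b': ['w', 'x', 'y', 'z']})
# writes two rows per executemany() call via the legacy code path
sql.to_sql(df, 'demo', con=conn, index=False, flavor='sqlite', chunksize=2)
result = sql.read_sql_query('SELECT * FROM demo', con=conn)
assert result.equals(df)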

def test_execute_sql(self):
# drop_sql = "DROP TABLE IF EXISTS test" # should already be done
iris_results = sql.execute("SELECT * FROM iris", con=self.conn)