Commit 616d306

BUG: Fix uploading of dataframes containing int64 and float64 columns
Fixes googleapis#116 and googleapis#96 by loading data in CSV chunks.
1 parent: f040c18
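Why the old path broke (a minimal sketch, not part of the commit): load_data previously serialized the frame row by row via Series.to_json, and DataFrame.iterrows does not preserve dtypes across a mixed row, so int64 values sharing a row with float64 values are upcast and emitted as floats. That is plausibly what tripped BigQuery on mixed frames; encoding whole chunks with DataFrame.to_csv, as this commit does, keeps each column's dtype. The column names below are illustrative only.

import pandas

df = pandas.DataFrame({'intColumn': [1, 2], 'floatColumn': [1.1, 2.2]},
                      columns=['intColumn', 'floatColumn'])

# Old path, row-wise: iterrows() hands back each row as a single Series,
# so the int64 column is upcast to float64 before serialization.
for _, row in df.iterrows():
    print(row.to_json())  # {"intColumn":1.0,"floatColumn":1.1}, ...

# New path, column-wise CSV: per-column dtypes survive.
print(df.to_csv(index=False, header=False))  # 1,1.1 / 2,2.2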

File tree: 4 files changed, +108 −33 lines

  pandas_gbq/_load.py
  pandas_gbq/gbq.py
  pandas_gbq/tests/test__load.py
  pandas_gbq/tests/test_gbq.py

pandas_gbq/_load.py

Lines changed: 51 additions & 0 deletions
@@ -0,0 +1,51 @@
"""Helper methods for loading data into BigQuery"""

from google.cloud import bigquery
import six


def encode_chunk(dataframe):
    """Return a file-like object of CSV-encoded rows.

    Args:
        dataframe (pandas.DataFrame): A chunk of a dataframe to encode
    """
    csv_buffer = six.StringIO()
    dataframe.to_csv(
        csv_buffer, index=False, header=False, encoding='utf-8',
        date_format='%Y-%m-%d %H:%M')

    # Convert to a BytesIO buffer so that unicode text is properly handled.
    # See: https://github.com/pydata/pandas-gbq/issues/106
    body = csv_buffer.getvalue()
    if isinstance(body, bytes):
        body = body.decode('utf-8')
    body = body.encode('utf-8')
    return six.BytesIO(body)


def encode_chunks(dataframe, chunksize):
    dataframe = dataframe.reset_index(drop=True)
    remaining_rows = len(dataframe)
    total_rows = remaining_rows
    start_index = 0
    while start_index < total_rows:
        chunk_buffer = encode_chunk(
            dataframe[start_index:start_index+chunksize])
        start_index += chunksize
        remaining_rows = max(0, remaining_rows - chunksize)
        yield remaining_rows, chunk_buffer


def load_chunks(client, dataframe, dataset_id, table_id, chunksize):
    destination_table = client.dataset(dataset_id).table(table_id)
    job_config = bigquery.LoadJobConfig()
    job_config.write_disposition = 'WRITE_APPEND'
    job_config.source_format = 'CSV'

    for remaining_rows, chunk_buffer in encode_chunks(dataframe, chunksize):
        yield remaining_rows
        client.load_table_from_file(
            chunk_buffer,
            destination_table,
            job_config=job_config).result()
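A quick illustration of what the encode_chunks generator yields for a small frame (runnable locally, no BigQuery client involved): the count of rows still to be loaded counts down to zero, and each buffer holds one CSV-encoded chunk.

import pandas
from pandas_gbq._load import encode_chunks

df = pandas.DataFrame({'a': range(5)})
for remaining_rows, chunk_buffer in encode_chunks(df, 2):
    print(remaining_rows, chunk_buffer.read())
# 3 b'0\n1\n'
# 1 b'2\n3\n'
# 0 b'4\n'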

pandas_gbq/gbq.py

Lines changed: 7 additions & 33 deletions
@@ -557,44 +557,18 @@ def run_query(self, query, **kwargs):
         return schema, result_rows

     def load_data(self, dataframe, dataset_id, table_id, chunksize):
-        from google.cloud.bigquery import LoadJobConfig
-        from six import BytesIO
+        from pandas_gbq import _load

-        destination_table = self.client.dataset(dataset_id).table(table_id)
-        job_config = LoadJobConfig()
-        job_config.write_disposition = 'WRITE_APPEND'
-        job_config.source_format = 'NEWLINE_DELIMITED_JSON'
-        rows = []
-        remaining_rows = len(dataframe)
-
-        total_rows = remaining_rows
+        total_rows = len(dataframe)
         self._print("\n\n")

-        for index, row in dataframe.reset_index(drop=True).iterrows():
-            row_json = row.to_json(
-                force_ascii=False, date_unit='s', date_format='iso')
-            rows.append(row_json)
-            remaining_rows -= 1
-
-            if (len(rows) % chunksize == 0) or (remaining_rows == 0):
+        try:
+            for remaining_rows in _load.load_chunks(
+                    self.client, dataframe, dataset_id, table_id, chunksize):
                 self._print("\rLoad is {0}% Complete".format(
                     ((total_rows - remaining_rows) * 100) / total_rows))
-
-                body = '{}\n'.format('\n'.join(rows))
-                if isinstance(body, bytes):
-                    body = body.decode('utf-8')
-                body = body.encode('utf-8')
-                body = BytesIO(body)
-
-                try:
-                    self.client.load_table_from_file(
-                        body,
-                        destination_table,
-                        job_config=job_config).result()
-                except self.http_error as ex:
-                    self.process_http_error(ex)
-
-                rows = []
+        except self.http_error as ex:
+            self.process_http_error(ex)

         self._print("\n")
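A note on the surviving progress line: load_chunks yields remaining_rows before submitting each load job, so the percentage measures rows handed off, not rows confirmed loaded. Worked through for six rows and chunksize 2 (true division under Python 3; Python 2's integer / would print 33 and 66):

total_rows = 6
for remaining_rows in (4, 2, 0):  # what encode_chunks yields for 6 rows, chunksize 2
    print(((total_rows - remaining_rows) * 100) / total_rows)
# 33.333..., 66.666..., 100.0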
pandas_gbq/tests/test__load.py

Lines changed: 26 additions & 0 deletions
@@ -0,0 +1,26 @@
import numpy
import pandas


def test_encode_chunk_with_unicode():
    """Test that a dataframe containing unicode can be encoded as a file.

    See: https://github.com/pydata/pandas-gbq/issues/106
    """
    from pandas_gbq._load import encode_chunk

    df = pandas.DataFrame(numpy.random.randn(6, 4), index=range(6),
                          columns=list('ABCD'))
    df['s'] = u'信用卡'
    csv_buffer = encode_chunk(df)
    csv_bytes = csv_buffer.read()
    csv_string = csv_bytes.decode('utf-8')
    assert u'信用卡' in csv_string


def test_encode_chunks_splits_dataframe():
    from pandas_gbq._load import encode_chunks
    df = pandas.DataFrame(numpy.random.randn(6, 4), index=range(6))
    num_chunks = len(list(encode_chunks(df, 2)))
    assert num_chunks == 3

pandas_gbq/tests/test_gbq.py

Lines changed: 24 additions & 0 deletions
@@ -1218,6 +1218,30 @@ def test_upload_other_unicode_data(self):
         tm.assert_numpy_array_equal(expected.values, result.values)

+    def test_upload_mixed_float_and_int(self):
+        """Test that we can upload a dataframe containing an int64 and float64 column.
+        See: https://github.com/pydata/pandas-gbq/issues/116
+        """
+        test_id = "mixed_float_and_int"
+        test_size = 2
+        df = DataFrame(
+            [[1,1.1],[2,2.2]],
+            index=['row 1', 'row 2'],
+            columns=['intColumn','floatColumn'])
+
+        gbq.to_gbq(
+            df, self.destination_table + test_id,
+            _get_project_id(),
+            private_key=_get_private_key_path(),
+            chunksize=10000)
+
+        result_df = gbq.read_gbq("SELECT * FROM {0}".format(
+            self.destination_table + test_id),
+            project_id=_get_project_id(),
+            private_key=_get_private_key_path())
+
+        assert len(result_df) == test_size
+
     def test_generate_schema(self):
         df = tm.makeMixedDataFrame()
         schema = gbq._generate_bq_schema(df)
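The integration test above requires BigQuery credentials; the dtype behavior itself can be sanity-checked offline against encode_chunk (a sketch, not part of the commit):

import pandas
from pandas_gbq._load import encode_chunk

df = pandas.DataFrame(
    [[1, 1.1], [2, 2.2]],
    index=['row 1', 'row 2'],
    columns=['intColumn', 'floatColumn'])
print(encode_chunk(df).read())  # b'1,1.1\n2,2.2\n' -- the ints stay ints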
