BUG: read_parquet, to_parquet for s3 destinations #19135


Merged (25 commits, Jan 18, 2018)
Commits (25)
5c0536a
Open S3FileSystem for writing when called from DataFrame.to_parquet
Jan 8, 2018
c4b490e
Adding unit test and changelog entry.
Jan 9, 2018
57bb814
PEP8 E302: better 2-line breaks in code than 1 on the tree.
Jan 9, 2018
e3e948e
Add mode= documentation for common.py
Jan 9, 2018
7040ef4
Move test_s3_roundtrip from test_s3.py to test_parquet.py
Jan 9, 2018
2e43ba6
Use df_compat & s3_resource pytest fixtures 🤗
Jan 9, 2018
c95a542
numpy compliant docstring
Jan 9, 2018
e32f0c9
issue # in unit test code
Jan 9, 2018
424eb6a
Test to_parquet only when pyarrow is installed. fastparquet is unsupp…
Jan 9, 2018
8ed608d
Add S3 support to fastparquet (WIP)
Jan 9, 2018
1cf2184
Refactor test_s3_roundtrip to standalone function.
Jan 9, 2018
4011374
Revert s3 support PoC changes made in FastParquetImpl
Jan 9, 2018
452104e
Move test_s3_roundtrip into TestParquetPyArrow and add pa fixture
Jan 9, 2018
230c814
check_round_trip refactoring to trap on FastParquet writes to s3.
Jan 10, 2018
026ecc7
FastParquet should fail when s3 write attempt detected.
Jan 11, 2018
56081c9
FastParquet should fail when s3 write attempt detected.
Jan 13, 2018
6122373
Revert check_round_trip(), drop do_round_trip().
Jan 13, 2018
70adb42
flake8.
Jan 13, 2018
87614c2
flake8.
Jan 14, 2018
55c575d
Explain why fastparquet is unimplemented.
Jan 15, 2018
556c43f
pandas now knows how to play w/ fastparquet on s3 (hackish..)
Jan 15, 2018
22f1ae5
PR review fixes.
Jan 16, 2018
ce11de6
flake8
Jan 16, 2018
900a1c4
Documentation fixes.
Jan 17, 2018
9dbc77c
PEP8 (also need to restart build, CI queue dropped)
Jan 17, 2018
1 change: 1 addition & 0 deletions doc/source/whatsnew/v0.23.0.txt
@@ -415,6 +415,7 @@ I/O
 - Bug in :func:`read_sas` where a file with 0 variables gave an ``AttributeError`` incorrectly. Now it gives an ``EmptyDataError`` (:issue:`18184`)
 - Bug in :func:`DataFrame.to_latex()` where pairs of braces meant to serve as invisible placeholders were escaped (:issue:`18667`)
 - Bug in :func:`read_json` where large numeric values were causing an ``OverflowError`` (:issue:`18842`)
+- Bug in :func:`DataFrame.to_parquet` where an exception was raised if the write destination is S3 (:issue:`19134`)
 -

 Plotting
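For context, the call pattern this entry fixes is a direct S3 round trip. A minimal sketch, assuming s3fs is installed, AWS credentials are configured, and 'my-bucket' is a hypothetical writable bucket:

    import pandas as pd

    df = pd.DataFrame({'a': [1, 2, 3]})
    # Before the fix, to_parquet opened the S3 destination read-only and
    # raised; with the fix, the target is opened with mode='wb' and the
    # write succeeds.
    df.to_parquet('s3://my-bucket/df.parquet', engine='pyarrow')
    round_tripped = pd.read_parquet('s3://my-bucket/df.parquet',
                                    engine='pyarrow')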
26 changes: 14 additions & 12 deletions pandas/io/common.py
@@ -91,14 +91,6 @@ def _is_url(url):
        return False


-def _is_s3_url(url):
-    """Check for an s3, s3n, or s3a url"""
-    try:
-        return parse_url(url).scheme in ['s3', 's3n', 's3a']
-    except:
-        return False
-
-
 def _expand_user(filepath_or_buffer):
     """Return the argument with an initial component of ~ or ~user
     replaced by that user's home directory.
@@ -168,8 +160,16 @@ def _stringify_path(filepath_or_buffer):
     return filepath_or_buffer


+def is_s3_url(url):
+    """Check for an s3, s3n, or s3a url"""
+    try:
+        return parse_url(url).scheme in ['s3', 's3n', 's3a']
+    except:  # noqa
+        return False
+
+
 def get_filepath_or_buffer(filepath_or_buffer, encoding=None,
-                           compression=None):
+                           compression=None, mode=None):
     """
     If the filepath_or_buffer is a url, translate and return the buffer.
     Otherwise passthrough.
@@ -179,10 +179,11 @@ def get_filepath_or_buffer(filepath_or_buffer, encoding=None,
     filepath_or_buffer : a url, filepath (str, py.path.local or pathlib.Path),
                          or buffer
     encoding : the encoding to use to decode py3 bytes, default is 'utf-8'
+    mode : str, optional

     Returns
     -------
-    a filepath_or_buffer, the encoding, the compression
+    a filepath_or_buffer or S3File instance, the encoding, the compression
     """
     filepath_or_buffer = _stringify_path(filepath_or_buffer)
@@ -195,11 +196,12 @@ def get_filepath_or_buffer(filepath_or_buffer, encoding=None,
         reader = BytesIO(req.read())
         return reader, encoding, compression

-    if _is_s3_url(filepath_or_buffer):
+    if is_s3_url(filepath_or_buffer):
         from pandas.io import s3
         return s3.get_filepath_or_buffer(filepath_or_buffer,
                                          encoding=encoding,
-                                         compression=compression)
+                                         compression=compression,
+                                         mode=mode)

     if isinstance(filepath_or_buffer, (compat.string_types,
                                        compat.binary_type,
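As a usage sketch of the new mode argument (the bucket name is hypothetical; note the diff above shows this version returning a 3-tuple):

    from pandas.io.common import get_filepath_or_buffer

    # Readers can omit mode (the s3 layer falls back to 'rb'); writers pass
    # mode='wb' to receive a writable S3File instead of a read-only one.
    f, encoding, compression = get_filepath_or_buffer(
        's3://my-bucket/out.parquet', mode='wb')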
28 changes: 23 additions & 5 deletions pandas/io/parquet.py
@@ -5,7 +5,7 @@
 from pandas import DataFrame, RangeIndex, Int64Index, get_option
 from pandas.compat import string_types
 from pandas.core.common import AbstractMethodError
-from pandas.io.common import get_filepath_or_buffer
+from pandas.io.common import get_filepath_or_buffer, is_s3_url


 def get_engine(engine):
@@ -107,7 +107,7 @@ def write(self, df, path, compression='snappy',
         self.validate_dataframe(df)
         if self._pyarrow_lt_070:
             self._validate_write_lt_070(df)
-        path, _, _ = get_filepath_or_buffer(path)
+        path, _, _ = get_filepath_or_buffer(path, mode='wb')

         if self._pyarrow_lt_060:
             table = self.api.Table.from_pandas(df, timestamps_to_ms=True)
@@ -194,14 +194,32 @@ def write(self, df, path, compression='snappy', **kwargs):
         # thriftpy/protocol/compact.py:339:
         # DeprecationWarning: tostring() is deprecated.
         # Use tobytes() instead.
-        path, _, _ = get_filepath_or_buffer(path)
+
+        if is_s3_url(path):
+            # path is s3:// so we need to open the s3file in 'wb' mode.
+            # TODO: Support 'ab'
+
+            path, _, _ = get_filepath_or_buffer(path, mode='wb')
+            # And pass the opened s3file to the fastparquet internal impl.
+            kwargs['open_with'] = lambda path, _: path
+        else:
+            path, _, _ = get_filepath_or_buffer(path)

         with catch_warnings(record=True):
             self.api.write(path, df,
                            compression=compression, **kwargs)

[Review comment (Contributor)]: add a comment on what is happening here
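A standalone sketch of why the open_with override works: fastparquet's write() calls open_with(filename, 'wb') (defaulting to the builtin open) to obtain a file object, so handing back the already-open S3File bypasses local I/O. This is an illustration of the trick, not pandas code; the bucket name is hypothetical:

    import pandas as pd
    import s3fs
    import fastparquet

    df = pd.DataFrame({'a': [1, 2, 3]})
    fs = s3fs.S3FileSystem()
    s3file = fs.open('my-bucket/out.parquet', 'wb')

    # fastparquet treats s3file as the "path"; the lambda returns it
    # unchanged when fastparquet asks to open it for writing.
    fastparquet.write(s3file, df, open_with=lambda path, _: path)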

     def read(self, path, columns=None, **kwargs):
-        path, _, _ = get_filepath_or_buffer(path)
-        parquet_file = self.api.ParquetFile(path)
+        if is_s3_url(path):
+            # When path is s3:// an S3File is returned.
+            # We need to retain the original path (str) while also
+            # passing the S3File().open function to the fastparquet impl.
+            s3, _, _ = get_filepath_or_buffer(path)
+            parquet_file = self.api.ParquetFile(path, open_with=s3.s3.open)
+        else:
+            path, _, _ = get_filepath_or_buffer(path)
+            parquet_file = self.api.ParquetFile(path)

         return parquet_file.to_pandas(columns=columns, **kwargs)
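The read path is the inverse: fastparquet still needs the path string, but every actual open goes through s3fs. A minimal equivalent sketch under the same assumptions (hypothetical bucket, credentials configured):

    import s3fs
    import fastparquet

    fs = s3fs.S3FileSystem()
    # open_with=fs.open is what s3.s3.open resolves to above: the returned
    # S3File's underlying S3FileSystem.open method.
    pf = fastparquet.ParquetFile('my-bucket/data.parquet', open_with=fs.open)
    df = pf.to_pandas()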
10 changes: 7 additions & 3 deletions pandas/io/s3.py
@@ -19,10 +19,14 @@ def _strip_schema(url):


 def get_filepath_or_buffer(filepath_or_buffer, encoding=None,
-                           compression=None):
+                           compression=None, mode=None):
+
+    if mode is None:
+        mode = 'rb'

     fs = s3fs.S3FileSystem(anon=False)
     try:
-        filepath_or_buffer = fs.open(_strip_schema(filepath_or_buffer))
+        filepath_or_buffer = fs.open(_strip_schema(filepath_or_buffer), mode)
     except (OSError, NoCredentialsError):
         # boto3 has trouble when trying to access a public file
         # when credentialed...
@@ -31,5 +35,5 @@ def get_filepath_or_buffer(filepath_or_buffer, encoding=None,
         # A NoCredentialsError is raised if you don't have creds
         # for that bucket.
         fs = s3fs.S3FileSystem(anon=True)
-        filepath_or_buffer = fs.open(_strip_schema(filepath_or_buffer))
+        filepath_or_buffer = fs.open(_strip_schema(filepath_or_buffer), mode)
     return filepath_or_buffer, None, compression
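The try/except keeps public buckets readable even without credentials. A rough standalone equivalent of the fallback logic (the helper name open_s3 is ours, not pandas'):

    import s3fs
    from botocore.exceptions import NoCredentialsError

    def open_s3(path, mode='rb'):
        # Try a credentialed client first; boto3 can deny a credentialed
        # request for a public object, and missing creds raise outright.
        try:
            return s3fs.S3FileSystem(anon=False).open(path, mode)
        except (OSError, NoCredentialsError):
            return s3fs.S3FileSystem(anon=True).open(path, mode)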
84 changes: 54 additions & 30 deletions pandas/tests/io/test_parquet.py
@@ -204,6 +204,22 @@ def test_cross_engine_fp_pa(df_cross_compat, pa, fp):
     tm.assert_frame_equal(result, df[['a', 'd']])


+def check_round_trip_equals(df, path, engine,
+                            write_kwargs, read_kwargs,
+                            expected, check_names):
+
+    df.to_parquet(path, engine, **write_kwargs)
+    actual = read_parquet(path, engine, **read_kwargs)
+    tm.assert_frame_equal(expected, actual,
+                          check_names=check_names)
+
+    # repeat
+    df.to_parquet(path, engine, **write_kwargs)
+    actual = read_parquet(path, engine, **read_kwargs)
+    tm.assert_frame_equal(expected, actual,
+                          check_names=check_names)


 class Base(object):

     def check_error_on_write(self, df, engine, exc):
@@ -212,28 +228,32 @@ def check_error_on_write(self, df, engine, exc):
         with tm.ensure_clean() as path:
             to_parquet(df, path, engine, compression=None)

-    def check_round_trip(self, df, engine, expected=None,
+    def check_round_trip(self, df, engine, expected=None, path=None,
                          write_kwargs=None, read_kwargs=None,
                          check_names=True):

         if write_kwargs is None:
-            write_kwargs = {}
+            write_kwargs = {'compression': None}

         if read_kwargs is None:
             read_kwargs = {}
-        with tm.ensure_clean() as path:
-            df.to_parquet(path, engine, **write_kwargs)
-            result = read_parquet(path, engine, **read_kwargs)
-
-            if expected is None:
-                expected = df
-            tm.assert_frame_equal(result, expected, check_names=check_names)
-
-            # repeat
-            to_parquet(df, path, engine, **write_kwargs)
-            result = pd.read_parquet(path, engine, **read_kwargs)
-
-            if expected is None:
-                expected = df
-            tm.assert_frame_equal(result, expected, check_names=check_names)
+
+        if expected is None:
+            expected = df
+
+        if path is None:
+            with tm.ensure_clean() as path:
+                check_round_trip_equals(df, path, engine,
+                                        write_kwargs=write_kwargs,
+                                        read_kwargs=read_kwargs,
+                                        expected=expected,
+                                        check_names=check_names)
+        else:
+            check_round_trip_equals(df, path, engine,
+                                    write_kwargs=write_kwargs,
+                                    read_kwargs=read_kwargs,
+                                    expected=expected,
+                                    check_names=check_names)


 class TestBasic(Base):
@@ -251,7 +271,7 @@ def test_columns_dtypes(self, engine):

         # unicode
         df.columns = [u'foo', u'bar']
-        self.check_round_trip(df, engine, write_kwargs={'compression': None})
+        self.check_round_trip(df, engine)

[Review comment (Contributor)]: is there a reason you are changing all of this?

[Reply (Contributor, Author)]: Well yes, I took the opportunity to do a small refactoring in test_parquet.py.

Base.check_round_trip in test_parquet.py had some code that was repeated twice and that I needed for the new functionality of doing a round trip when s3 is the destination, so I refactored it into do_round_trip.

Now I'm using do_round_trip both in the new units added to test writing/reading parquet files from s3 and in the rest of the unit-test code, which continues to call the existing check_round_trip; that one uses the local file system as its default destination when calling do_round_trip.

So now we have check_round_trip, which calls do_round_trip passing it an engine_impl and a local-file-system path; my new unit tests under TestParquetPyArrow, which call do_round_trip with a pyarrow engine impl and 's3://pandas-test/pyarrow.parquet'; and TestParquetFastParquet, which calls do_round_trip passing it a fastparquet engine impl and 's3://pandas-test/fastparquet.parquet' (but expecting a NotImplementedError exception).

As for write_kwargs = {'compression': None}: it was repeated in some of the unit tests in the class, so as part of the refactoring I took the opportunity to set it as the default when write_kwargs is None.

     def test_columns_dtypes_invalid(self, engine):

@@ -292,7 +312,6 @@ def test_read_columns(self, engine):

         expected = pd.DataFrame({'string': list('abc')})
         self.check_round_trip(df, engine, expected=expected,
-                              write_kwargs={'compression': None},
                               read_kwargs={'columns': ['string']})

     def test_write_index(self, engine):
@@ -304,7 +323,7 @@ def test_write_index(self, engine):
             pytest.skip("pyarrow is < 0.7.0")

         df = pd.DataFrame({'A': [1, 2, 3]})
-        self.check_round_trip(df, engine, write_kwargs={'compression': None})
+        self.check_round_trip(df, engine)

         indexes = [
             [2, 3, 4],
@@ -315,15 +334,12 @@ def test_write_index(self, engine):
         # non-default index
         for index in indexes:
             df.index = index
-            self.check_round_trip(
-                df, engine,
-                write_kwargs={'compression': None},
-                check_names=check_names)
+            self.check_round_trip(df, engine, check_names=check_names)

         # index with meta-data
         df.index = [0, 1, 2]
         df.index.name = 'foo'
-        self.check_round_trip(df, engine, write_kwargs={'compression': None})
+        self.check_round_trip(df, engine)

     def test_write_multiindex(self, pa_ge_070):
         # Not supported in fastparquet as of 0.1.3 or older pyarrow version
@@ -332,7 +348,7 @@ def test_write_multiindex(self, pa_ge_070):
         df = pd.DataFrame({'A': [1, 2, 3]})
         index = pd.MultiIndex.from_tuples([('a', 1), ('a', 2), ('b', 1)])
         df.index = index
-        self.check_round_trip(df, engine, write_kwargs={'compression': None})
+        self.check_round_trip(df, engine)

     def test_write_column_multiindex(self, engine):
         # column multi-index
@@ -426,6 +442,11 @@ def test_categorical_unsupported(self, pa_lt_070):
         df = pd.DataFrame({'a': pd.Categorical(list('abc'))})
         self.check_error_on_write(df, pa, NotImplementedError)

+    def test_s3_roundtrip(self, df_compat, s3_resource, pa):
+        # GH #19134
+        self.check_round_trip(df_compat, pa,
+                              path='s3://pandas-test/pyarrow.parquet')

[Review comment (Contributor)]: can you add the same test to FP, but assert that it raises.


class TestParquetFastParquet(Base):

@@ -436,7 +457,7 @@ def test_basic(self, fp, df_full):
         # additional supported types for fastparquet
         df['timedelta'] = pd.timedelta_range('1 day', periods=3)

-        self.check_round_trip(df, fp, write_kwargs={'compression': None})
+        self.check_round_trip(df, fp)

     @pytest.mark.skip(reason="not supported")
     def test_duplicate_columns(self, fp):
@@ -449,8 +470,7 @@ def test_duplicate_columns(self, fp):
     def test_bool_with_none(self, fp):
         df = pd.DataFrame({'a': [True, None, False]})
         expected = pd.DataFrame({'a': [1.0, np.nan, 0.0]}, dtype='float16')
-        self.check_round_trip(df, fp, expected=expected,
-                              write_kwargs={'compression': None})
+        self.check_round_trip(df, fp, expected=expected)

     def test_unsupported(self, fp):

@@ -466,7 +486,7 @@ def test_categorical(self, fp):
         if LooseVersion(fastparquet.__version__) < LooseVersion("0.1.3"):
             pytest.skip("CategoricalDtype not supported for older fp")
         df = pd.DataFrame({'a': pd.Categorical(list('abc'))})
-        self.check_round_trip(df, fp, write_kwargs={'compression': None})
+        self.check_round_trip(df, fp)

     def test_datetime_tz(self, fp):
         # doesn't preserve tz
@@ -475,8 +495,7 @@ def test_datetime_tz(self, fp):

         # warns on the coercion
         with catch_warnings(record=True):
-            self.check_round_trip(df, fp, df.astype('datetime64[ns]'),
-                                  write_kwargs={'compression': None})
+            self.check_round_trip(df, fp, df.astype('datetime64[ns]'))

     def test_filter_row_groups(self, fp):
         d = {'a': list(range(0, 3))}
@@ -486,3 +505,8 @@ def test_filter_row_groups(self, fp):
                      row_group_offsets=1)
         result = read_parquet(path, fp, filters=[('a', '==', 0)])
         assert len(result) == 1

+    def test_s3_roundtrip(self, df_compat, s3_resource, fp):
+        # GH #19134
+        self.check_round_trip(df_compat, fp,
+                              path='s3://pandas-test/fastparquet.parquet')

[Review comment (Contributor)]: add a comment on why this is NI
6 changes: 3 additions & 3 deletions pandas/tests/io/test_s3.py
@@ -1,8 +1,8 @@
-from pandas.io.common import _is_s3_url
+from pandas.io.common import is_s3_url


 class TestS3URL(object):

     def test_is_s3_url(self):
-        assert _is_s3_url("s3://pandas/somethingelse.com")
-        assert not _is_s3_url("s4://pandas/somethingelse.com")
+        assert is_s3_url("s3://pandas/somethingelse.com")
+        assert not is_s3_url("s4://pandas/somethingelse.com")