BUG: read_parquet, to_parquet for s3 destinations #19135
Conversation
Thanks. Could you add
- tests (with a reference to the issue)
- a release note in whatsnew/v0.23.0.txt
We have some other tests that use moto. Search for those to see how to structure them, and let me know if you need any guidance.
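For anyone following along, a minimal sketch of what a moto-backed S3 test can look like, assuming moto and boto3 are installed; the fixture name and bucket here are illustrative, not the actual pandas fixtures:

import boto3
import pytest
from moto import mock_s3

@pytest.fixture
def s3_bucket():
    # everything inside the context manager hits moto's in-memory S3,
    # so no real AWS calls or credentials are needed
    with mock_s3():
        conn = boto3.resource('s3', region_name='us-east-1')
        conn.create_bucket(Bucket='pandas-test')
        yield conn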
pandas/io/common.py (outdated)
@@ -169,7 +169,7 @@ def _stringify_path(filepath_or_buffer):
 def get_filepath_or_buffer(filepath_or_buffer, encoding=None,
-                           compression=None):
+                           compression=None, mode='rb'):
Add this to the Parameters section of the docstring, and note that it's only really used for S3 files.
Codecov Report
@@ Coverage Diff @@
## master #19135 +/- ##
==========================================
+ Coverage 91.52% 91.56% +0.04%
==========================================
Files 147 148 +1
Lines 48775 48882 +107
==========================================
+ Hits 44639 44759 +120
+ Misses 4136 4123 -13
Continue to review full report at Codecov.
Hello @maximveksler! Thanks for updating the PR. Cheers! There are no PEP8 issues in this Pull Request. 🍻 Comment last updated on January 17, 2018 at 10:47 UTC
@TomAugspurger any help on why the unit tests can't find pyarrow / fastparquet?
Moto had some issues yesterday. Looking into it in a bit.
Looks like there are some listing errors too.
Doesn't look like a moto issue, more like a unit test environment configuration problem. But I might be wrong here... I had just a quick glance and couldn't spot the issue.
More specifically:
pandas/tests/io/test_s3.py (outdated)

 class TestS3URL(object):

     def test_is_s3_url(self):
         assert _is_s3_url("s3://pandas/somethingelse.com")
         assert not _is_s3_url("s4://pandas/somethingelse.com")

+class TestIntegration(object):
Move this to pandas/tests/io/test_parquet.py and make a new class TestS3 that's marked with tm.network. Then your test methods should take an argument s3_resource to use that fixture (pandas/tests/io/conftest.py, line 29 in c753e1e: def s3_resource(tips_file, jsonl_file):). That will take care of all the skipping / mocking for you. You just have to write the test at that point.
pandas/tests/io/test_parquet.py (outdated)
@@ -486,3 +486,20 @@ def test_filter_row_groups(self, fp):
                         row_group_offsets=1)
         result = read_parquet(path, fp, filters=[('a', '==', 0)])
         assert len(result) == 1
+
+class TestIntegrationWithS3(Base):
+    def test_s3_roundtrip(self):
This should take an s3_resource. Then remove everything below except for:
+ expected = pd.DataFrame({'A': [1, 2, 3], 'B': 'foo'})
+ expected.to_parquet('s3://pandas-test/test.parquet')
+ actual = pd.read_parquet('s3://pandas-test/test.parquet')
+
+ tm.assert_frame_equal(actual, expected)
+
pandas/tests/io/test_s3.py (outdated)
@@ -6,3 +6,4 @@ class TestS3URL(object):
     def test_is_s3_url(self):
         assert _is_s3_url("s3://pandas/somethingelse.com")
         assert not _is_s3_url("s4://pandas/somethingelse.com")
+
This may cause the linter to fail, not sure.
pandas/io/common.py (outdated)
@@ -179,6 +179,7 @@ def get_filepath_or_buffer(filepath_or_buffer, encoding=None,
 filepath_or_buffer : a url, filepath (str, py.path.local or pathlib.Path),
                      or buffer
 encoding : the encoding to use to decode py3 bytes, default is 'utf-8'
+mode : One of 'rb' or 'wb' or 'ab'. default: 'rb'
@jreback thoughts on this parameter? I think it'd be better to just remove it, and hardcode mode='wb' in the call to s3.get_filepath_or_buffer down below. That's essentially what we do for URLs with the BytesIO.
@jreback hard-coding leads to a failure to read from S3: s3fs raises an exception (https://github.com/dask/s3fs/blob/master/s3fs/core.py#L1005).
I've decided to thread it through the whole call chain precisely for this reason. It might be possible to change the s3fs implementation, because as far as I know S3 objects don't have a read/write notion in them, or to split the pandas code into two functions, get_readable_filepath_or_buffer and get_writable_filepath_or_buffer, but I don't feel I know the code base well enough to judge.
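For illustration, a hedged sketch of how pandas/io/s3.py could thread the new mode parameter down to s3fs (the real implementation's schema stripping and anonymous-access fallback are omitted here):

import s3fs

def get_filepath_or_buffer(filepath_or_buffer, encoding=None,
                           compression=None, mode='rb'):
    fs = s3fs.S3FileSystem(anon=False)
    # s3fs raises when a file opened for reading is asked to write (and
    # vice versa), so the caller has to pick the right mode up front
    filepath_or_buffer = fs.open(filepath_or_buffer.replace('s3://', ''),
                                 mode)
    return filepath_or_buffer, None, compression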
Ah, yes, sorry, I missed that.
In that case let's update the docstring to be numpydoc compliant: http://numpydoc.readthedocs.io/en/latest/format.html#sections
mode : {'rb', 'wb', 'ab'}
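A sketch of what that numpydoc-compliant Parameters entry could look like (the wording here is illustrative):

mode : {'rb', 'wb', 'ab'}, default 'rb'
    Mode used when opening the file. Only honored for S3 destinations,
    where writing requires opening the S3File in 'wb' mode.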
NP, my pleasure.
yeah, this parameter should not be in the common call, but rather in the s3-specific one
Looks like you'll need to skip the test if the engine isn't available. I think that [...]
You should just have to accept an engine argument.
Hey guys, it's my 3rd attempt to fix the unit test. Not too excited to spam the commit history with CI A/B-testing attempts :) Any way to reproduce the failing CI locally?
@TomAugspurger cool, looking into it, and thanks! Q: if tests are failing because the code can't find pyarrow / fastparquet, and the fixture will cause the test to be skipped when they can't be found, then... won't that defeat the whole purpose of the unit test?! Can't we just have the CI server install pyarrow / fastparquet and make it available to pytest instead?
The failure at https://circleci.com/gh/pandas-dev/pandas/8985#tests/containers/2 was because pyarrow / fastparquet weren't installed, and the test wasn't skipped. If you take an engine argument (and pass it through to the writer / reader) it should be OK. To test locally, uninstall both pyarrow and fastparquet. The test should be skipped.
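A hedged sketch of what accepting such an engine argument can look like as a pytest fixture (the real fixtures live in pandas/tests/io/test_parquet.py; this one is illustrative):

import pytest

@pytest.fixture(params=['pyarrow', 'fastparquet'])
def engine(request):
    # skips the test instead of failing when the engine isn't installed
    pytest.importorskip(request.param)
    return request.param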
We do on some of our builds. But we also need to make sure pandas works without pyarrow / fastparquet, so not all of our builds have them installed.
Tom, regarding your comment on pyarrow understanding S3FileSystem: I think you're right, because fastparquet seems to not understand them. I'm getting [...]. Should I just add an exception in the [...]?
Hmm, what version of s3fs and fastparquet do you have locally? I pulled your branch, added the [...], and ran:

pytest /Users/taugspurger/Envs/pandas-dev/lib/python3.6/site-packages/pandas/pandas/tests/io/test_parquet.py -k test_s3_roundtrip -v -rsx
=================================================================== test session starts ====================================================================
platform darwin -- Python 3.6.1, pytest-3.3.1, py-1.5.2, pluggy-0.6.0 -- /Users/taugspurger/Envs/pandas-dev/bin/python3.6
cachedir: .cache
rootdir: /Users/taugspurger/Envs/pandas-dev/lib/python3.6/site-packages/pandas, inifile: setup.cfg
plugins: xdist-1.15.0, rerunfailures-2.2, repeat-0.4.1, cov-2.5.1, annotate-1.0.0, hypothesis-3.44.9
collected 44 items

pandas/tests/io/test_parquet.py::TestIntegrationWithS3::test_s3_roundtrip[fastparquet] PASSED [ 50%]
pandas/tests/io/test_parquet.py::TestIntegrationWithS3::test_s3_roundtrip[pyarrow] PASSED [100%]

=================================================================== 42 tests deselected ====================================================================
========================================================= 2 passed, 42 deselected in 1.70 seconds ==========================================================
FYI, you may want to run git diff upstream/master -u -- "*.py" | flake8 --diff to check for linting issues.
[...] obtained with [...]. I find it odd that the test is passing for you. It should have failed, because I now discovered [...]. Could you please verify that the [...]? Last, thanks for the flake8 tip, will do now.
My apologies, I forgot to pass engine. Here's my current diff:

 class TestIntegrationWithS3(Base):
-    def test_s3_roundtrip(self, df_compat, s3_resource):
+    def test_s3_roundtrip(self, df_compat, s3_resource, engine):
         # GH #19134
-        df_compat.to_parquet('s3://pandas-test/test.parquet')
+        key = 's3://pandas-test/test-{}.parquet'.format(engine)
+        df_compat.to_parquet(key, engine=engine)
         expected = df_compat
-        actual = pd.read_parquet('s3://pandas-test/test.parquet')
+        actual = pd.read_parquet(key, engine=engine)
         tm.assert_frame_equal(expected, actual)
-
Yeah, fastparquet fails even with the fix. I think I'll leave it as a known limitation.
@maximveksler this is doable. In the fastparquet writer, do

-        path, _, _ = get_filepath_or_buffer(path)
+        path, _, _ = get_filepath_or_buffer(path, mode='wb')
         with catch_warnings(record=True):
             self.api.write(path, df,
-                           compression=compression, **kwargs)
+                           compression=compression,
+                           open_with=lambda path, mode: path,
+                           **kwargs)

That's basically telling fastparquet "we already have an open file, just use it".
pandas/tests/io/test_parquet.py (outdated)

    if engine == 'pyarrow':
        df_compat.to_parquet('s3://pandas-test/test.parquet', engine)
don't inherit Base nor use a class; make it just a function. Use the fixtures instead of engine directly.
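A hedged sketch of the function-style test being asked for, leaning on the df_compat, s3_resource, and engine fixtures discussed above:

import pandas as pd
import pandas.util.testing as tm

def test_s3_roundtrip(df_compat, s3_resource, engine):
    # GH #19134: round-trip a frame through the mocked pandas-test bucket
    df_compat.to_parquet('s3://pandas-test/test.parquet', engine=engine)
    actual = pd.read_parquet('s3://pandas-test/test.parquet', engine=engine)
    tm.assert_frame_equal(df_compat, actual)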
@jreback how about now? Am I not using the fixture already? Not sure what you mean by that.
@TomAugspurger please see the latest commit. I've added the fastparquet write and it seems to work (had to disable snappy), but I can't seem to get the read to work without changes in [...]
@martindurant what's the easiest way to open a [...]?
No, there is no way to simply pass a file-like object to fastparquet; you can use open_with=s3.open, where s3 is an S3FileSystem.
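A hedged sketch of the approach martindurant describes, assuming s3fs is installed and AWS credentials are configured:

import pandas as pd
import fastparquet
import s3fs

df = pd.DataFrame({'A': [1, 2, 3]})
s3 = s3fs.S3FileSystem()
# fastparquet calls open_with(path, mode) itself to obtain a file object,
# so it opens the S3 key through s3fs rather than the local filesystem
fastparquet.write('pandas-test/test.parquet', df, open_with=s3.open)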
lgtm. ping when changed & green.
@maximveksler keep in mind, PRs can take a while to get merged. We have quite a lot of them and quite a bit of activity. All PRs need review and feedback time.
@jreback got ya, NP.
pandas/io/parquet.py (outdated)
@@ -190,6 +190,10 @@ def __init__(self):
         self.api = fastparquet

     def write(self, df, path, compression='snappy', **kwargs):
         if is_s3_url(path):
In this if block, go from S3File -> url:

path = 's3://{}'.format(path.path)
kwargs['open_with'] = path.s3.open

See if that works?
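Putting those pieces together, a hedged sketch of what that if block could look like inside FastParquetImpl.write; note that open_with has to be captured before path is reassigned to a plain string:

if is_s3_url(path):
    # open the S3File via s3fs in write mode
    path, _, _ = get_filepath_or_buffer(path, mode='wb')
    # capture the filesystem's open before losing the S3File reference
    kwargs['open_with'] = path.s3.open
    # S3File -> url, so fastparquet re-opens it through open_with
    path = 's3://{}'.format(path.path)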
pandas/io/common.py (outdated)
@@ -179,6 +179,8 @@ def get_filepath_or_buffer(filepath_or_buffer, encoding=None,
 filepath_or_buffer : a url, filepath (str, py.path.local or pathlib.Path),
                      or buffer
 encoding : the encoding to use to decode py3 bytes, default is 'utf-8'
+mode : {'rb', 'wb', 'ab'} applies to S3 where a write mandates opening the
no, this is just distracting, as it only applies to s3 and is simply a pass-through option. pls change.
pandas/tests/io/test_parquet.py (outdated)

    # repeat
    to_parquet(df, path, engine, **write_kwargs)
    result = pd.read_parquet(path, engine, **read_kwargs)
pls do it my way
pandas/io/parquet.py (outdated)

-        parquet_file = self.api.ParquetFile(path)
+        if is_s3_url(path):
+            s3, _, _ = get_filepath_or_buffer(path)
+            parquet_file = self.api.ParquetFile(path, open_with=s3.s3.open)
Turns out, guys, that we had both a write and a read problem when using fastparquet through S3FileSystem. We should now have good test coverage of both use cases and a workable implementation (fingers crossed).
doc/source/whatsnew/v0.23.0.txt (outdated)
@@ -415,6 +415,7 @@ I/O
 - Bug in :func:`read_sas` where a file with 0 variables gave an ``AttributeError`` incorrectly. Now it gives an ``EmptyDataError`` (:issue:`18184`)
 - Bug in :func:`DataFrame.to_latex()` where pairs of braces meant to serve as invisible placeholders were escaped (:issue:`18667`)
 - Bug in :func:`read_json` where large numeric values were causing an ``OverflowError`` (:issue:`18842`)
+- Bug in :func:`DataFrame.to_parquet` exception is thrown if write destination is S3 (:issue:`19134`)
where exception was raised
There are several possible exceptions in the fail chain, from 3 different components:
- S3FileSystem
- pyarrow writer
- fastparquet reader & writer

pyarrow, write attempt: FileNotFoundException or ValueError (depending on whether the file exists in S3 or not).
fastparquet, read attempt: exception in attempting to concat str and S3File.
fastparquet, write attempt: exception in attempting to open the path using default_open.
where an exception was raised if the write destination is S3.
@@ -194,14 +194,25 @@ def write(self, df, path, compression='snappy', **kwargs):
         # thriftpy/protocol/compact.py:339:
         # DeprecationWarning: tostring() is deprecated.
         # Use tobytes() instead.
-        path, _, _ = get_filepath_or_buffer(path)
+        if is_s3_url(path):
add a comment on what is happening here
pandas/io/parquet.py (outdated)

-        path, _, _ = get_filepath_or_buffer(path)
-        parquet_file = self.api.ParquetFile(path)
+        if is_s3_url(path):
+            s3, _, _ = get_filepath_or_buffer(path)
is there a reason not to directly call the s3.get_filepath_or_buffer here?
Not being familiar with the pandas code, I'm frankly not sure about the existing design, where common.py#get_filepath_or_buffer returns an s3file, a str, or a buffer, but I'm holding back from making too many changes in my first PR... so I prefer to continue using what is implemented and working in, for example, PyArrowImpl#read, where reading from s3 works.
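For reference, a hedged sketch of the resulting fastparquet read path, as a fragment of FastParquetImpl.read (s3.s3 is the S3FileSystem underlying the opened S3File; the merged code may differ in detail):

if is_s3_url(path):
    # get_filepath_or_buffer hands back an already-open S3File here;
    # fastparquet then re-opens the path itself through open_with
    s3, _, _ = get_filepath_or_buffer(path)
    parquet_file = self.api.ParquetFile(path, open_with=s3.s3.open)
else:
    path, _, _ = get_filepath_or_buffer(path)
    parquet_file = self.api.ParquetFile(path)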
pandas/tests/io/test_parquet.py (outdated)

    # repeat
    to_parquet(df, path, engine, **write_kwargs)
    result = pd.read_parquet(path, engine, **read_kwargs)
this needs updating
very small doc corrections. ping on green.
doc/source/whatsnew/v0.23.0.txt (outdated)
@@ -415,6 +415,7 @@ I/O
 - Bug in :func:`read_sas` where a file with 0 variables gave an ``AttributeError`` incorrectly. Now it gives an ``EmptyDataError`` (:issue:`18184`)
 - Bug in :func:`DataFrame.to_latex()` where pairs of braces meant to serve as invisible placeholders were escaped (:issue:`18667`)
 - Bug in :func:`read_json` where large numeric values were causing an ``OverflowError`` (:issue:`18842`)
+- Bug in :func:`DataFrame.to_parquet` exception is thrown if write destination is S3 (:issue:`19134`)
where an exception was raised if the write destination is S3.
pandas/io/common.py (outdated)
@@ -179,10 +179,11 @@ def get_filepath_or_buffer(filepath_or_buffer, encoding=None,
 filepath_or_buffer : a url, filepath (str, py.path.local or pathlib.Path),
                      or buffer
 encoding : the encoding to use to decode py3 bytes, default is 'utf-8'
+mode : str, optional applies when opening S3 destinations for writing
mode : str, optional
pandas/io/parquet.py (outdated)

            # path is s3:// so we need to open the s3file in 'wb' mode.
            # TODO: Support 'ab'
            path, _, _ = get_filepath_or_buffer(path, mode='wb')
            # And pass the opened s3file to the fastparquet internal impl.
blank line here
lgtm @maximveksler ping on green (may be a while as travis is currently in backlog on mac builds)
@jreback NP, will do.
@jreback, @TomAugspurger I'm refactoring [...]
@maximveksler this PR is ok.
@jreback looks like travis is back online, could you please rerun the build?
Looks like it's queued: https://travis-ci.org/pandas-dev/pandas/pull_requests
thanks @maximveksler, nice patch!
Appreciate the guidance and feedback loops, @TomAugspurger @jreback @martindurant
git diff upstream/master -u -- "*.py" | flake8 --diff
closes #19134