From 5c0536a7a8b0bfb3ddac5311fba780f196b43440 Mon Sep 17 00:00:00 2001 From: maxim veksler Date: Mon, 8 Jan 2018 11:46:48 +0200 Subject: [PATCH 01/25] Open S3FileSystem for writing when called from DataFrame.to_parquet --- pandas/io/common.py | 5 +++-- pandas/io/parquet.py | 2 +- pandas/io/s3.py | 6 +++--- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/pandas/io/common.py b/pandas/io/common.py index da60698fe529f..60674b02cb52b 100644 --- a/pandas/io/common.py +++ b/pandas/io/common.py @@ -169,7 +169,7 @@ def _stringify_path(filepath_or_buffer): def get_filepath_or_buffer(filepath_or_buffer, encoding=None, - compression=None): + compression=None, mode='rb'): """ If the filepath_or_buffer is a url, translate and return the buffer. Otherwise passthrough. @@ -199,7 +199,8 @@ def get_filepath_or_buffer(filepath_or_buffer, encoding=None, from pandas.io import s3 return s3.get_filepath_or_buffer(filepath_or_buffer, encoding=encoding, - compression=compression) + compression=compression, + mode=mode) if isinstance(filepath_or_buffer, (compat.string_types, compat.binary_type, diff --git a/pandas/io/parquet.py b/pandas/io/parquet.py index 0c88706a3bec2..510972d1b0908 100644 --- a/pandas/io/parquet.py +++ b/pandas/io/parquet.py @@ -107,7 +107,7 @@ def write(self, df, path, compression='snappy', self.validate_dataframe(df) if self._pyarrow_lt_070: self._validate_write_lt_070(df) - path, _, _ = get_filepath_or_buffer(path) + path, _, _ = get_filepath_or_buffer(path, mode='wb') if self._pyarrow_lt_060: table = self.api.Table.from_pandas(df, timestamps_to_ms=True) diff --git a/pandas/io/s3.py b/pandas/io/s3.py index 5e48de757d00e..8036691edb638 100644 --- a/pandas/io/s3.py +++ b/pandas/io/s3.py @@ -19,10 +19,10 @@ def _strip_schema(url): def get_filepath_or_buffer(filepath_or_buffer, encoding=None, - compression=None): + compression=None, mode='rb'): fs = s3fs.S3FileSystem(anon=False) try: - filepath_or_buffer = fs.open(_strip_schema(filepath_or_buffer)) + 
filepath_or_buffer = fs.open(_strip_schema(filepath_or_buffer), mode) except (OSError, NoCredentialsError): # boto3 has troubles when trying to access a public file # when credentialed... @@ -31,5 +31,5 @@ def get_filepath_or_buffer(filepath_or_buffer, encoding=None, # A NoCredentialsError is raised if you don't have creds # for that bucket. fs = s3fs.S3FileSystem(anon=True) - filepath_or_buffer = fs.open(_strip_schema(filepath_or_buffer)) + filepath_or_buffer = fs.open(_strip_schema(filepath_or_buffer), mode) return filepath_or_buffer, None, compression From c4b490e186144bb795597d9af9d2209be1d603b7 Mon Sep 17 00:00:00 2001 From: maxim veksler Date: Tue, 9 Jan 2018 07:23:20 +0200 Subject: [PATCH 02/25] Adding unit test and changelog entry. --- doc/source/whatsnew/v0.23.0.txt | 1 + pandas/tests/io/test_s3.py | 21 ++++++++++++++++++++- 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/doc/source/whatsnew/v0.23.0.txt b/doc/source/whatsnew/v0.23.0.txt index dc305f36f32ec..d420063fbf540 100644 --- a/doc/source/whatsnew/v0.23.0.txt +++ b/doc/source/whatsnew/v0.23.0.txt @@ -415,6 +415,7 @@ I/O - Bug in :func:`read_sas` where a file with 0 variables gave an ``AttributeError`` incorrectly. 
Now it gives an ``EmptyDataError`` (:issue:`18184`) - Bug in :func:`DataFrame.to_latex()` where pairs of braces meant to serve as invisible placeholders were escaped (:issue:`18667`) - Bug in :func:`read_json` where large numeric values were causing an ``OverflowError`` (:issue:`18842`) +- Bug in :func:`DataFrame.to_parquet` where an attempt to write to S3 path fails with ``ValueError`` (:issue:`19134`) - Plotting diff --git a/pandas/tests/io/test_s3.py b/pandas/tests/io/test_s3.py index 8c2a32af33765..07312136edba8 100644 --- a/pandas/tests/io/test_s3.py +++ b/pandas/tests/io/test_s3.py @@ -1,8 +1,27 @@ -from pandas.io.common import _is_s3_url +import pytest +import pandas as pd +from pandas.io.common import _is_s3_url +from pandas.util import testing as tm class TestS3URL(object): def test_is_s3_url(self): assert _is_s3_url("s3://pandas/somethingelse.com") assert not _is_s3_url("s4://pandas/somethingelse.com") + +class TestIntegration(object): + def test_s3_roundtrip(self): + expected = pd.DataFrame({'A': [1, 2, 3], 'B': 'foo'}) + + boto3 = pytest.importorskip('boto3') + moto = pytest.importorskip('moto') + + with moto.mock_s3(): + conn = boto3.resource("s3", region_name="us-east-1") + conn.create_bucket(Bucket="pandas-test") + + expected.to_parquet('s3://pandas-test/test.parquet') + actual = pd.read_parquet('s3://pandas-test/test.parquet') + + tm.assert_frame_equal(actual, expected) From 57bb814a74415fee05a4b60ff51d9b3c0a858164 Mon Sep 17 00:00:00 2001 From: maxim veksler Date: Tue, 9 Jan 2018 07:26:11 +0200 Subject: [PATCH 03/25] PEP8 E302 better 2 lines breaks in code then 1 on the tree. 
--- pandas/tests/io/test_s3.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pandas/tests/io/test_s3.py b/pandas/tests/io/test_s3.py index 07312136edba8..7a3d45593a80a 100644 --- a/pandas/tests/io/test_s3.py +++ b/pandas/tests/io/test_s3.py @@ -25,3 +25,4 @@ def test_s3_roundtrip(self): actual = pd.read_parquet('s3://pandas-test/test.parquet') tm.assert_frame_equal(actual, expected) + From e3e948e6248515ee8ed2a521d4713ba80df6aa5b Mon Sep 17 00:00:00 2001 From: maxim veksler Date: Tue, 9 Jan 2018 16:37:25 +0200 Subject: [PATCH 04/25] Add mode= documentation for common.py --- pandas/io/common.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pandas/io/common.py b/pandas/io/common.py index 60674b02cb52b..5a0bff6ab51e6 100644 --- a/pandas/io/common.py +++ b/pandas/io/common.py @@ -179,6 +179,7 @@ def get_filepath_or_buffer(filepath_or_buffer, encoding=None, filepath_or_buffer : a url, filepath (str, py.path.local or pathlib.Path), or buffer encoding : the encoding to use to decode py3 bytes, default is 'utf-8' + mode : One of 'rb' or 'wb' or 'ab'. 
default: 'rb' Returns ------- From 7040ef49360c65774758114c554c06688b01eee4 Mon Sep 17 00:00:00 2001 From: maxim veksler Date: Tue, 9 Jan 2018 16:39:16 +0200 Subject: [PATCH 05/25] Move test_s3_roundtrip from test_s3.py to test_parquet.py --- pandas/tests/io/test_parquet.py | 17 +++++++++++++++++ pandas/tests/io/test_s3.py | 21 +-------------------- 2 files changed, 18 insertions(+), 20 deletions(-) diff --git a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py index 31c2ded49b7a0..13cb1c4bef20c 100644 --- a/pandas/tests/io/test_parquet.py +++ b/pandas/tests/io/test_parquet.py @@ -486,3 +486,20 @@ def test_filter_row_groups(self, fp): row_group_offsets=1) result = read_parquet(path, fp, filters=[('a', '==', 0)]) assert len(result) == 1 + +class TestIntegrationWithS3(Base): + def test_s3_roundtrip(self): + expected = pd.DataFrame({'A': [1, 2, 3], 'B': 'foo'}) + + boto3 = pytest.importorskip('boto3') + moto = pytest.importorskip('moto') + + with moto.mock_s3(): + conn = boto3.resource("s3", region_name="us-east-1") + conn.create_bucket(Bucket="pandas-test") + + expected.to_parquet('s3://pandas-test/test.parquet') + actual = pd.read_parquet('s3://pandas-test/test.parquet') + + tm.assert_frame_equal(actual, expected) + diff --git a/pandas/tests/io/test_s3.py b/pandas/tests/io/test_s3.py index 7a3d45593a80a..a8ebb26f4ac0e 100644 --- a/pandas/tests/io/test_s3.py +++ b/pandas/tests/io/test_s3.py @@ -1,8 +1,5 @@ -import pytest -import pandas as pd - from pandas.io.common import _is_s3_url -from pandas.util import testing as tm + class TestS3URL(object): @@ -10,19 +7,3 @@ def test_is_s3_url(self): assert _is_s3_url("s3://pandas/somethingelse.com") assert not _is_s3_url("s4://pandas/somethingelse.com") -class TestIntegration(object): - def test_s3_roundtrip(self): - expected = pd.DataFrame({'A': [1, 2, 3], 'B': 'foo'}) - - boto3 = pytest.importorskip('boto3') - moto = pytest.importorskip('moto') - - with moto.mock_s3(): - conn = boto3.resource("s3", 
region_name="us-east-1") - conn.create_bucket(Bucket="pandas-test") - - expected.to_parquet('s3://pandas-test/test.parquet') - actual = pd.read_parquet('s3://pandas-test/test.parquet') - - tm.assert_frame_equal(actual, expected) - From 2e43ba6127c7df500c1ae0b8d9b1d487c997dde3 Mon Sep 17 00:00:00 2001 From: maxim veksler Date: Tue, 9 Jan 2018 19:45:04 +0200 Subject: [PATCH 06/25] =?UTF-8?q?Use=20df=5Fcompat=20&=20s3=5Fresource=20p?= =?UTF-8?q?ytest=20fixtures=20=F0=9F=A4=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pandas/tests/io/test_parquet.py | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py index 13cb1c4bef20c..3ecdefa277702 100644 --- a/pandas/tests/io/test_parquet.py +++ b/pandas/tests/io/test_parquet.py @@ -488,18 +488,11 @@ def test_filter_row_groups(self, fp): assert len(result) == 1 class TestIntegrationWithS3(Base): - def test_s3_roundtrip(self): - expected = pd.DataFrame({'A': [1, 2, 3], 'B': 'foo'}) + def test_s3_roundtrip(self, df_compat, s3_resource): + df_compat.to_parquet('s3://pandas-test/test.parquet') - boto3 = pytest.importorskip('boto3') - moto = pytest.importorskip('moto') + expected = df_compat + actual = pd.read_parquet('s3://pandas-test/test.parquet') - with moto.mock_s3(): - conn = boto3.resource("s3", region_name="us-east-1") - conn.create_bucket(Bucket="pandas-test") - - expected.to_parquet('s3://pandas-test/test.parquet') - actual = pd.read_parquet('s3://pandas-test/test.parquet') - - tm.assert_frame_equal(actual, expected) + tm.assert_frame_equal(expected, actual) From c95a542bee3c90c97105f6b8eadc26a5fd60219d Mon Sep 17 00:00:00 2001 From: maxim veksler Date: Tue, 9 Jan 2018 19:53:17 +0200 Subject: [PATCH 07/25] numpy compliant docstring --- pandas/io/common.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pandas/io/common.py b/pandas/io/common.py index 
5a0bff6ab51e6..54dfd1c17851c 100644 --- a/pandas/io/common.py +++ b/pandas/io/common.py @@ -179,7 +179,7 @@ def get_filepath_or_buffer(filepath_or_buffer, encoding=None, filepath_or_buffer : a url, filepath (str, py.path.local or pathlib.Path), or buffer encoding : the encoding to use to decode py3 bytes, default is 'utf-8' - mode : One of 'rb' or 'wb' or 'ab'. default: 'rb' + mode : {'rb', 'wb', 'ab'} Returns ------- From e32f0c93928cd8caaa03dc5c2a00406526765f7d Mon Sep 17 00:00:00 2001 From: maxim veksler Date: Tue, 9 Jan 2018 19:59:16 +0200 Subject: [PATCH 08/25] issue # in unit test code --- pandas/tests/io/test_parquet.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py index 3ecdefa277702..0c68cf1f67777 100644 --- a/pandas/tests/io/test_parquet.py +++ b/pandas/tests/io/test_parquet.py @@ -489,6 +489,7 @@ def test_filter_row_groups(self, fp): class TestIntegrationWithS3(Base): def test_s3_roundtrip(self, df_compat, s3_resource): + # GH #19134 df_compat.to_parquet('s3://pandas-test/test.parquet') expected = df_compat From 424eb6aade9d0a241fba101c810ec935f819d6d7 Mon Sep 17 00:00:00 2001 From: maxim veksler Date: Tue, 9 Jan 2018 23:11:17 +0200 Subject: [PATCH 09/25] Test to_parquet only when pyarrow is installed. fastparquet is unsupported. 
--- pandas/tests/io/test_parquet.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py index 0c68cf1f67777..29009be7045e4 100644 --- a/pandas/tests/io/test_parquet.py +++ b/pandas/tests/io/test_parquet.py @@ -487,13 +487,15 @@ def test_filter_row_groups(self, fp): result = read_parquet(path, fp, filters=[('a', '==', 0)]) assert len(result) == 1 + class TestIntegrationWithS3(Base): - def test_s3_roundtrip(self, df_compat, s3_resource): + def test_s3_roundtrip(self, df_compat, s3_resource, engine): # GH #19134 - df_compat.to_parquet('s3://pandas-test/test.parquet') - expected = df_compat - actual = pd.read_parquet('s3://pandas-test/test.parquet') + if engine == 'pyarrow': + df_compat.to_parquet('s3://pandas-test/test.parquet', engine) - tm.assert_frame_equal(expected, actual) + expected = df_compat + actual = pd.read_parquet('s3://pandas-test/test.parquet', engine) + tm.assert_frame_equal(expected, actual) From 8ed608d9420c964de03fb3caa51b61a615240014 Mon Sep 17 00:00:00 2001 From: maxim veksler Date: Tue, 9 Jan 2018 23:46:48 +0200 Subject: [PATCH 10/25] Add S3 support to fastparquet (WIP) --- pandas/io/parquet.py | 8 +++++--- pandas/tests/io/test_parquet.py | 4 ++-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/pandas/io/parquet.py b/pandas/io/parquet.py index 510972d1b0908..1c10b9e6b5867 100644 --- a/pandas/io/parquet.py +++ b/pandas/io/parquet.py @@ -194,14 +194,16 @@ def write(self, df, path, compression='snappy', **kwargs): # thriftpy/protocol/compact.py:339: # DeprecationWarning: tostring() is deprecated. # Use tobytes() instead. 
- path, _, _ = get_filepath_or_buffer(path) + path, _, _ = get_filepath_or_buffer(path, mode='wb') with catch_warnings(record=True): self.api.write(path, df, - compression=compression, **kwargs) + compression=compression, + open_with=lambda path, mode: path, + **kwargs) def read(self, path, columns=None, **kwargs): path, _, _ = get_filepath_or_buffer(path) - parquet_file = self.api.ParquetFile(path) + parquet_file = self.api.ParquetFile(path, open_with=lambda path, mode: path) return parquet_file.to_pandas(columns=columns, **kwargs) diff --git a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py index 29009be7045e4..6b6280c1c2c58 100644 --- a/pandas/tests/io/test_parquet.py +++ b/pandas/tests/io/test_parquet.py @@ -491,9 +491,9 @@ def test_filter_row_groups(self, fp): class TestIntegrationWithS3(Base): def test_s3_roundtrip(self, df_compat, s3_resource, engine): # GH #19134 - if engine == 'pyarrow': - df_compat.to_parquet('s3://pandas-test/test.parquet', engine) + df_compat.to_parquet('s3://pandas-test/test.parquet', + engine, compression=None) expected = df_compat actual = pd.read_parquet('s3://pandas-test/test.parquet', engine) From 1cf2184216c2da6ee368b9986ebb5732330b3195 Mon Sep 17 00:00:00 2001 From: maxim veksler Date: Tue, 9 Jan 2018 23:51:31 +0200 Subject: [PATCH 11/25] Refactor test_s3_roundtrip to standalone function. 
--- pandas/tests/io/test_parquet.py | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py index 6b6280c1c2c58..35fe3930d8f78 100644 --- a/pandas/tests/io/test_parquet.py +++ b/pandas/tests/io/test_parquet.py @@ -203,6 +203,16 @@ def test_cross_engine_fp_pa(df_cross_compat, pa, fp): result = read_parquet(path, engine=pa, columns=['a', 'd']) tm.assert_frame_equal(result, df[['a', 'd']]) +def test_s3_roundtrip(df_compat, s3_resource, engine): + # GH #19134 + if engine == 'pyarrow': + df_compat.to_parquet('s3://pandas-test/test.parquet', + engine, compression=None) + + expected = df_compat + actual = pd.read_parquet('s3://pandas-test/test.parquet', engine) + + tm.assert_frame_equal(expected, actual) class Base(object): @@ -487,15 +497,3 @@ def test_filter_row_groups(self, fp): result = read_parquet(path, fp, filters=[('a', '==', 0)]) assert len(result) == 1 - -class TestIntegrationWithS3(Base): - def test_s3_roundtrip(self, df_compat, s3_resource, engine): - # GH #19134 - if engine == 'pyarrow': - df_compat.to_parquet('s3://pandas-test/test.parquet', - engine, compression=None) - - expected = df_compat - actual = pd.read_parquet('s3://pandas-test/test.parquet', engine) - - tm.assert_frame_equal(expected, actual) From 401137467587b26a8524d82e06353e8667cd37f9 Mon Sep 17 00:00:00 2001 From: maxim veksler Date: Wed, 10 Jan 2018 01:44:28 +0200 Subject: [PATCH 12/25] Revert s3 support PoC changes made in FastParquetImpl --- pandas/io/parquet.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/pandas/io/parquet.py b/pandas/io/parquet.py index 1c10b9e6b5867..510972d1b0908 100644 --- a/pandas/io/parquet.py +++ b/pandas/io/parquet.py @@ -194,16 +194,14 @@ def write(self, df, path, compression='snappy', **kwargs): # thriftpy/protocol/compact.py:339: # DeprecationWarning: tostring() is deprecated. # Use tobytes() instead. 
- path, _, _ = get_filepath_or_buffer(path, mode='wb') + path, _, _ = get_filepath_or_buffer(path) with catch_warnings(record=True): self.api.write(path, df, - compression=compression, - open_with=lambda path, mode: path, - **kwargs) + compression=compression, **kwargs) def read(self, path, columns=None, **kwargs): path, _, _ = get_filepath_or_buffer(path) - parquet_file = self.api.ParquetFile(path, open_with=lambda path, mode: path) + parquet_file = self.api.ParquetFile(path) return parquet_file.to_pandas(columns=columns, **kwargs) From 452104e590b626ec88f62fba397eaf985eeb14b7 Mon Sep 17 00:00:00 2001 From: maxim veksler Date: Wed, 10 Jan 2018 01:47:36 +0200 Subject: [PATCH 13/25] Move test_s3_roundtrip into TestParquetPyArrow and add pa fixture --- pandas/tests/io/test_parquet.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py index 35fe3930d8f78..14d338a2f312d 100644 --- a/pandas/tests/io/test_parquet.py +++ b/pandas/tests/io/test_parquet.py @@ -203,16 +203,6 @@ def test_cross_engine_fp_pa(df_cross_compat, pa, fp): result = read_parquet(path, engine=pa, columns=['a', 'd']) tm.assert_frame_equal(result, df[['a', 'd']]) -def test_s3_roundtrip(df_compat, s3_resource, engine): - # GH #19134 - if engine == 'pyarrow': - df_compat.to_parquet('s3://pandas-test/test.parquet', - engine, compression=None) - - expected = df_compat - actual = pd.read_parquet('s3://pandas-test/test.parquet', engine) - - tm.assert_frame_equal(expected, actual) class Base(object): @@ -436,6 +426,16 @@ def test_categorical_unsupported(self, pa_lt_070): df = pd.DataFrame({'a': pd.Categorical(list('abc'))}) self.check_error_on_write(df, pa, NotImplementedError) + def test_s3_roundtrip(self, df_compat, s3_resource, pa): + # GH #19134 + df_compat.to_parquet('s3://pandas-test/test.parquet', + engine=pa, compression=None) + + expected = df_compat + actual = 
read_parquet('s3://pandas-test/test.parquet', engine=pa) + + tm.assert_frame_equal(expected, actual) + class TestParquetFastParquet(Base): From 230c8148dad1739873489e52d663f8e376bdb814 Mon Sep 17 00:00:00 2001 From: maxim veksler Date: Wed, 10 Jan 2018 03:21:03 +0200 Subject: [PATCH 14/25] check_round_trip refactoring to trap on FastParquet wrires to s3. --- pandas/tests/io/test_parquet.py | 80 +++++++++++++++++---------------- 1 file changed, 42 insertions(+), 38 deletions(-) diff --git a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py index 14d338a2f312d..506f665f87c50 100644 --- a/pandas/tests/io/test_parquet.py +++ b/pandas/tests/io/test_parquet.py @@ -212,28 +212,37 @@ def check_error_on_write(self, df, engine, exc): with tm.ensure_clean() as path: to_parquet(df, path, engine, compression=None) - def check_round_trip(self, df, engine, expected=None, - write_kwargs=None, read_kwargs=None, - check_names=True): + def do_round_trip(self, df, path, engine_impl, expected=None, + write_kwargs=None, read_kwargs=None, + check_names=True): + if write_kwargs is None: - write_kwargs = {} + write_kwargs = {'compression': None} + if read_kwargs is None: read_kwargs = {} - with tm.ensure_clean() as path: - df.to_parquet(path, engine, **write_kwargs) - result = read_parquet(path, engine, **read_kwargs) - if expected is None: - expected = df - tm.assert_frame_equal(result, expected, check_names=check_names) + df.to_parquet(path, engine_impl, **write_kwargs) + actual = read_parquet(path, engine_impl, **read_kwargs) - # repeat - to_parquet(df, path, engine, **write_kwargs) - result = pd.read_parquet(path, engine, **read_kwargs) + if expected is None: + expected = df - if expected is None: - expected = df - tm.assert_frame_equal(result, expected, check_names=check_names) + tm.assert_frame_equal(expected, actual, check_names=check_names) + + def check_round_trip(self, df, engine, expected=None, + write_kwargs=None, read_kwargs=None, + check_names=True): + + with 
tm.ensure_clean() as path: + self.do_round_trip(df, path, engine, expected, + write_kwargs=write_kwargs, read_kwargs=read_kwargs, + check_names=check_names) + + # repeat + self.do_round_trip(df, path, engine, expected, + write_kwargs=write_kwargs, read_kwargs=read_kwargs, + check_names=check_names) class TestBasic(Base): @@ -251,7 +260,7 @@ def test_columns_dtypes(self, engine): # unicode df.columns = [u'foo', u'bar'] - self.check_round_trip(df, engine, write_kwargs={'compression': None}) + self.check_round_trip(df, engine) def test_columns_dtypes_invalid(self, engine): @@ -292,7 +301,6 @@ def test_read_columns(self, engine): expected = pd.DataFrame({'string': list('abc')}) self.check_round_trip(df, engine, expected=expected, - write_kwargs={'compression': None}, read_kwargs={'columns': ['string']}) def test_write_index(self, engine): @@ -304,7 +312,7 @@ def test_write_index(self, engine): pytest.skip("pyarrow is < 0.7.0") df = pd.DataFrame({'A': [1, 2, 3]}) - self.check_round_trip(df, engine, write_kwargs={'compression': None}) + self.check_round_trip(df, engine) indexes = [ [2, 3, 4], @@ -315,15 +323,12 @@ def test_write_index(self, engine): # non-default index for index in indexes: df.index = index - self.check_round_trip( - df, engine, - write_kwargs={'compression': None}, - check_names=check_names) + self.check_round_trip(df, engine, check_names=check_names) # index with meta-data df.index = [0, 1, 2] df.index.name = 'foo' - self.check_round_trip(df, engine, write_kwargs={'compression': None}) + self.check_round_trip(df, engine) def test_write_multiindex(self, pa_ge_070): # Not suppoprted in fastparquet as of 0.1.3 or older pyarrow version @@ -332,7 +337,7 @@ def test_write_multiindex(self, pa_ge_070): df = pd.DataFrame({'A': [1, 2, 3]}) index = pd.MultiIndex.from_tuples([('a', 1), ('a', 2), ('b', 1)]) df.index = index - self.check_round_trip(df, engine, write_kwargs={'compression': None}) + self.check_round_trip(df, engine) def 
test_write_column_multiindex(self, engine): # column multi-index @@ -428,13 +433,7 @@ def test_categorical_unsupported(self, pa_lt_070): def test_s3_roundtrip(self, df_compat, s3_resource, pa): # GH #19134 - df_compat.to_parquet('s3://pandas-test/test.parquet', - engine=pa, compression=None) - - expected = df_compat - actual = read_parquet('s3://pandas-test/test.parquet', engine=pa) - - tm.assert_frame_equal(expected, actual) + self.do_round_trip(df_compat, 's3://pandas-test/test.parquet', pa) class TestParquetFastParquet(Base): @@ -446,7 +445,7 @@ def test_basic(self, fp, df_full): # additional supported types for fastparquet df['timedelta'] = pd.timedelta_range('1 day', periods=3) - self.check_round_trip(df, fp, write_kwargs={'compression': None}) + self.check_round_trip(df, fp) @pytest.mark.skip(reason="not supported") def test_duplicate_columns(self, fp): @@ -459,8 +458,7 @@ def test_duplicate_columns(self, fp): def test_bool_with_none(self, fp): df = pd.DataFrame({'a': [True, None, False]}) expected = pd.DataFrame({'a': [1.0, np.nan, 0.0]}, dtype='float16') - self.check_round_trip(df, fp, expected=expected, - write_kwargs={'compression': None}) + self.check_round_trip(df, fp, expected=expected) def test_unsupported(self, fp): @@ -476,7 +474,7 @@ def test_categorical(self, fp): if LooseVersion(fastparquet.__version__) < LooseVersion("0.1.3"): pytest.skip("CategoricalDtype not supported for older fp") df = pd.DataFrame({'a': pd.Categorical(list('abc'))}) - self.check_round_trip(df, fp, write_kwargs={'compression': None}) + self.check_round_trip(df, fp) def test_datetime_tz(self, fp): # doesn't preserve tz @@ -485,8 +483,7 @@ def test_datetime_tz(self, fp): # warns on the coercion with catch_warnings(record=True): - self.check_round_trip(df, fp, df.astype('datetime64[ns]'), - write_kwargs={'compression': None}) + self.check_round_trip(df, fp, df.astype('datetime64[ns]')) def test_filter_row_groups(self, fp): d = {'a': list(range(0, 3))} @@ -497,3 +494,10 @@ def 
test_filter_row_groups(self, fp): result = read_parquet(path, fp, filters=[('a', '==', 0)]) assert len(result) == 1 + def test_s3_roundtrip(self, df_compat, s3_resource, fp): + print(s3_resource, fp) + + # GH #19134 + with pytest.raises(TypeError): + self.do_round_trip(df_compat, 's3://pandas-test/test.parquet', fp) + From 026ecc7ebb988b9178d5da44f03136817c1f42ae Mon Sep 17 00:00:00 2001 From: maxim veksler Date: Thu, 11 Jan 2018 23:03:09 +0200 Subject: [PATCH 15/25] FastParquet should fail when s3 write attempt detected. --- pandas/io/common.py | 16 +++++----------- pandas/io/parquet.py | 6 +++++- pandas/io/s3.py | 14 +++++++++++++- pandas/tests/io/test_parquet.py | 15 +++++++-------- pandas/tests/io/test_s3.py | 6 +++--- 5 files changed, 33 insertions(+), 24 deletions(-) diff --git a/pandas/io/common.py b/pandas/io/common.py index 54dfd1c17851c..eb5e5398211b7 100644 --- a/pandas/io/common.py +++ b/pandas/io/common.py @@ -11,6 +11,7 @@ from pandas.io.formats.printing import pprint_thing from pandas.core.common import AbstractMethodError from pandas.core.dtypes.common import is_number, is_file_like +from pandas.io.s3 import is_s3_url # compat from pandas.errors import (ParserError, DtypeWarning, # noqa @@ -91,14 +92,6 @@ def _is_url(url): return False -def _is_s3_url(url): - """Check for an s3, s3n, or s3a url""" - try: - return parse_url(url).scheme in ['s3', 's3n', 's3a'] - except: - return False - - def _expand_user(filepath_or_buffer): """Return the argument with an initial component of ~ or ~user replaced by that user's home directory. @@ -169,7 +162,7 @@ def _stringify_path(filepath_or_buffer): def get_filepath_or_buffer(filepath_or_buffer, encoding=None, - compression=None, mode='rb'): + compression=None, mode=None): """ If the filepath_or_buffer is a url, translate and return the buffer. Otherwise passthrough. 
@@ -179,7 +172,8 @@ def get_filepath_or_buffer(filepath_or_buffer, encoding=None, filepath_or_buffer : a url, filepath (str, py.path.local or pathlib.Path), or buffer encoding : the encoding to use to decode py3 bytes, default is 'utf-8' - mode : {'rb', 'wb', 'ab'} + mode : {'rb', 'wb', 'ab'} applies to S3 where a write mandates opening the + file in 'wb' mode. Returns ------- @@ -196,7 +190,7 @@ def get_filepath_or_buffer(filepath_or_buffer, encoding=None, reader = BytesIO(req.read()) return reader, encoding, compression - if _is_s3_url(filepath_or_buffer): + if is_s3_url(filepath_or_buffer): from pandas.io import s3 return s3.get_filepath_or_buffer(filepath_or_buffer, encoding=encoding, diff --git a/pandas/io/parquet.py b/pandas/io/parquet.py index 510972d1b0908..838cf2f8b6976 100644 --- a/pandas/io/parquet.py +++ b/pandas/io/parquet.py @@ -6,7 +6,7 @@ from pandas.compat import string_types from pandas.core.common import AbstractMethodError from pandas.io.common import get_filepath_or_buffer - +from pandas.io.s3 import is_s3_url def get_engine(engine): """ return our implementation """ @@ -190,6 +190,10 @@ def __init__(self): self.api = fastparquet def write(self, df, path, compression='snappy', **kwargs): + if is_s3_url(path): + raise NotImplementedError("fastparquet s3 write is not implemented." + " Consider using pyarrow instead.") + self.validate_dataframe(df) # thriftpy/protocol/compact.py:339: # DeprecationWarning: tostring() is deprecated. 
diff --git a/pandas/io/s3.py b/pandas/io/s3.py index 8036691edb638..fc2092972f1dc 100644 --- a/pandas/io/s3.py +++ b/pandas/io/s3.py @@ -18,8 +18,20 @@ def _strip_schema(url): return result.netloc + result.path +def is_s3_url(url): + """Check for an s3, s3n, or s3a url""" + try: + return parse_url(url).scheme in ['s3', 's3n', 's3a'] + except: + return False + + def get_filepath_or_buffer(filepath_or_buffer, encoding=None, - compression=None, mode='rb'): + compression=None, mode=None): + + if mode is None: + mode = 'rb' + fs = s3fs.S3FileSystem(anon=False) try: filepath_or_buffer = fs.open(_strip_schema(filepath_or_buffer), mode) diff --git a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py index 506f665f87c50..21a2f5fa62813 100644 --- a/pandas/tests/io/test_parquet.py +++ b/pandas/tests/io/test_parquet.py @@ -236,12 +236,14 @@ def check_round_trip(self, df, engine, expected=None, with tm.ensure_clean() as path: self.do_round_trip(df, path, engine, expected, - write_kwargs=write_kwargs, read_kwargs=read_kwargs, + write_kwargs=write_kwargs, + read_kwargs=read_kwargs, check_names=check_names) # repeat self.do_round_trip(df, path, engine, expected, - write_kwargs=write_kwargs, read_kwargs=read_kwargs, + write_kwargs=write_kwargs, + read_kwargs=read_kwargs, check_names=check_names) @@ -433,7 +435,7 @@ def test_categorical_unsupported(self, pa_lt_070): def test_s3_roundtrip(self, df_compat, s3_resource, pa): # GH #19134 - self.do_round_trip(df_compat, 's3://pandas-test/test.parquet', pa) + self.do_round_trip(df_compat, 's3://pandas-test/pyarrow.parquet', pa) class TestParquetFastParquet(Base): @@ -495,9 +497,6 @@ def test_filter_row_groups(self, fp): assert len(result) == 1 def test_s3_roundtrip(self, df_compat, s3_resource, fp): - print(s3_resource, fp) - # GH #19134 - with pytest.raises(TypeError): - self.do_round_trip(df_compat, 's3://pandas-test/test.parquet', fp) - + with pytest.raises(NotImplementedError): + self.do_round_trip(df_compat, 
's3://pandas-test/fastparquet.parquet', fp) diff --git a/pandas/tests/io/test_s3.py b/pandas/tests/io/test_s3.py index a8ebb26f4ac0e..1c7d17f4804ce 100644 --- a/pandas/tests/io/test_s3.py +++ b/pandas/tests/io/test_s3.py @@ -1,9 +1,9 @@ -from pandas.io.common import _is_s3_url +from pandas.io.s3 import is_s3_url class TestS3URL(object): def test_is_s3_url(self): - assert _is_s3_url("s3://pandas/somethingelse.com") - assert not _is_s3_url("s4://pandas/somethingelse.com") + assert is_s3_url("s3://pandas/somethingelse.com") + assert not is_s3_url("s4://pandas/somethingelse.com") From 56081c977fcdd4b61786f9ad3f95bde13c9db09e Mon Sep 17 00:00:00 2001 From: maxim veksler Date: Sat, 13 Jan 2018 19:09:26 +0200 Subject: [PATCH 16/25] FastParquet should fail when s3 write attempt detected. --- doc/source/whatsnew/v0.23.0.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/source/whatsnew/v0.23.0.txt b/doc/source/whatsnew/v0.23.0.txt index d420063fbf540..2f39c23126f3a 100644 --- a/doc/source/whatsnew/v0.23.0.txt +++ b/doc/source/whatsnew/v0.23.0.txt @@ -415,7 +415,7 @@ I/O - Bug in :func:`read_sas` where a file with 0 variables gave an ``AttributeError`` incorrectly. Now it gives an ``EmptyDataError`` (:issue:`18184`) - Bug in :func:`DataFrame.to_latex()` where pairs of braces meant to serve as invisible placeholders were escaped (:issue:`18667`) - Bug in :func:`read_json` where large numeric values were causing an ``OverflowError`` (:issue:`18842`) -- Bug in :func:`DataFrame.to_parquet` where an attempt to write to S3 path fails with ``ValueError`` (:issue:`19134`) +- Bug in :func:`DataFrame.to_parquet` exception is thrown if write destination is S3 (:issue:`19134`) - Plotting From 6122373de1f34f145a8041e8f978e2f474d4281f Mon Sep 17 00:00:00 2001 From: maxim veksler Date: Sat, 13 Jan 2018 19:56:50 +0200 Subject: [PATCH 17/25] Revert check_round_trip(), drop do_round_trip(). 
--- pandas/tests/io/test_parquet.py | 46 ++++++++++++++++----------------- 1 file changed, 22 insertions(+), 24 deletions(-) diff --git a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py index 21a2f5fa62813..0df5bd5db34a3 100644 --- a/pandas/tests/io/test_parquet.py +++ b/pandas/tests/io/test_parquet.py @@ -212,9 +212,9 @@ def check_error_on_write(self, df, engine, exc): with tm.ensure_clean() as path: to_parquet(df, path, engine, compression=None) - def do_round_trip(self, df, path, engine_impl, expected=None, - write_kwargs=None, read_kwargs=None, - check_names=True): + def check_round_trip(self, df, engine, expected=None, path=None, + write_kwargs=None, read_kwargs=None, + check_names=True): if write_kwargs is None: write_kwargs = {'compression': None} @@ -222,30 +222,28 @@ def do_round_trip(self, df, path, engine_impl, expected=None, if read_kwargs is None: read_kwargs = {} - df.to_parquet(path, engine_impl, **write_kwargs) - actual = read_parquet(path, engine_impl, **read_kwargs) - if expected is None: expected = df - tm.assert_frame_equal(expected, actual, check_names=check_names) - - def check_round_trip(self, df, engine, expected=None, - write_kwargs=None, read_kwargs=None, - check_names=True): - - with tm.ensure_clean() as path: - self.do_round_trip(df, path, engine, expected, - write_kwargs=write_kwargs, - read_kwargs=read_kwargs, - check_names=check_names) + if path is None: + with tm.ensure_clean() as path: + df.to_parquet(path, engine, **write_kwargs) + actual = read_parquet(path, engine, **read_kwargs) + tm.assert_frame_equal(expected, actual, check_names=check_names) + + # repeat + df.to_parquet(path, engine, **write_kwargs) + actual = read_parquet(path, engine, **read_kwargs) + tm.assert_frame_equal(expected, actual, check_names=check_names) + else: + df.to_parquet(path, engine, **write_kwargs) + actual = read_parquet(path, engine, **read_kwargs) + tm.assert_frame_equal(expected, actual, check_names=check_names) # repeat - 
self.do_round_trip(df, path, engine, expected, - write_kwargs=write_kwargs, - read_kwargs=read_kwargs, - check_names=check_names) - + df.to_parquet(path, engine, **write_kwargs) + actual = read_parquet(path, engine, **read_kwargs) + tm.assert_frame_equal(expected, actual, check_names=check_names) class TestBasic(Base): @@ -435,7 +433,7 @@ def test_categorical_unsupported(self, pa_lt_070): def test_s3_roundtrip(self, df_compat, s3_resource, pa): # GH #19134 - self.do_round_trip(df_compat, 's3://pandas-test/pyarrow.parquet', pa) + self.check_round_trip(df_compat, pa, path='s3://pandas-test/pyarrow.parquet') class TestParquetFastParquet(Base): @@ -499,4 +497,4 @@ def test_filter_row_groups(self, fp): def test_s3_roundtrip(self, df_compat, s3_resource, fp): # GH #19134 with pytest.raises(NotImplementedError): - self.do_round_trip(df_compat, 's3://pandas-test/fastparquet.parquet', fp) + self.check_round_trip(df_compat, fp, path='s3://pandas-test/fastparquet.parquet') From 70adb42ffe4cad0dbaceca1a821facc8524f9bd8 Mon Sep 17 00:00:00 2001 From: maxim veksler Date: Sat, 13 Jan 2018 20:03:15 +0200 Subject: [PATCH 18/25] flake8. --- pandas/io/parquet.py | 3 ++- pandas/tests/io/test_parquet.py | 19 +++++++++++++------ pandas/tests/io/test_s3.py | 1 - 3 files changed, 15 insertions(+), 8 deletions(-) diff --git a/pandas/io/parquet.py b/pandas/io/parquet.py index 838cf2f8b6976..b08b92f945479 100644 --- a/pandas/io/parquet.py +++ b/pandas/io/parquet.py @@ -8,6 +8,7 @@ from pandas.io.common import get_filepath_or_buffer from pandas.io.s3 import is_s3_url + def get_engine(engine): """ return our implementation """ @@ -191,7 +192,7 @@ def __init__(self): def write(self, df, path, compression='snappy', **kwargs): if is_s3_url(path): - raise NotImplementedError("fastparquet s3 write is not implemented." + raise NotImplementedError("fastparquet s3 write isn't implemented." 
" Consider using pyarrow instead.") self.validate_dataframe(df) diff --git a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py index 0df5bd5db34a3..44f1d6a0b4cdc 100644 --- a/pandas/tests/io/test_parquet.py +++ b/pandas/tests/io/test_parquet.py @@ -229,21 +229,26 @@ def check_round_trip(self, df, engine, expected=None, path=None, with tm.ensure_clean() as path: df.to_parquet(path, engine, **write_kwargs) actual = read_parquet(path, engine, **read_kwargs) - tm.assert_frame_equal(expected, actual, check_names=check_names) + tm.assert_frame_equal(expected, actual, + check_names=check_names) # repeat df.to_parquet(path, engine, **write_kwargs) actual = read_parquet(path, engine, **read_kwargs) - tm.assert_frame_equal(expected, actual, check_names=check_names) + tm.assert_frame_equal(expected, actual, + check_names=check_names) else: df.to_parquet(path, engine, **write_kwargs) actual = read_parquet(path, engine, **read_kwargs) - tm.assert_frame_equal(expected, actual, check_names=check_names) + tm.assert_frame_equal(expected, actual, + check_names=check_names) # repeat df.to_parquet(path, engine, **write_kwargs) actual = read_parquet(path, engine, **read_kwargs) - tm.assert_frame_equal(expected, actual, check_names=check_names) + tm.assert_frame_equal(expected, actual, + check_names=check_names) + class TestBasic(Base): @@ -433,7 +438,8 @@ def test_categorical_unsupported(self, pa_lt_070): def test_s3_roundtrip(self, df_compat, s3_resource, pa): # GH #19134 - self.check_round_trip(df_compat, pa, path='s3://pandas-test/pyarrow.parquet') + self.check_round_trip(df_compat, pa, + path='s3://pandas-test/pyarrow.parquet') class TestParquetFastParquet(Base): @@ -497,4 +503,5 @@ def test_filter_row_groups(self, fp): def test_s3_roundtrip(self, df_compat, s3_resource, fp): # GH #19134 with pytest.raises(NotImplementedError): - self.check_round_trip(df_compat, fp, path='s3://pandas-test/fastparquet.parquet') + self.check_round_trip(df_compat, fp, + 
path='s3://pandas-test/fastparquet.parquet') diff --git a/pandas/tests/io/test_s3.py b/pandas/tests/io/test_s3.py index 1c7d17f4804ce..e1141cd9a3aa0 100644 --- a/pandas/tests/io/test_s3.py +++ b/pandas/tests/io/test_s3.py @@ -6,4 +6,3 @@ class TestS3URL(object): def test_is_s3_url(self): assert is_s3_url("s3://pandas/somethingelse.com") assert not is_s3_url("s4://pandas/somethingelse.com") - From 87614c2cbf85f56b44444f201734b8770de3396c Mon Sep 17 00:00:00 2001 From: maxim veksler Date: Sun, 14 Jan 2018 16:33:00 +0200 Subject: [PATCH 19/25] flake8. --- pandas/io/common.py | 9 ++++++++- pandas/io/parquet.py | 3 +-- pandas/io/s3.py | 8 -------- pandas/tests/io/test_s3.py | 2 +- 4 files changed, 10 insertions(+), 12 deletions(-) diff --git a/pandas/io/common.py b/pandas/io/common.py index eb5e5398211b7..a058eecb431e0 100644 --- a/pandas/io/common.py +++ b/pandas/io/common.py @@ -11,7 +11,6 @@ from pandas.io.formats.printing import pprint_thing from pandas.core.common import AbstractMethodError from pandas.core.dtypes.common import is_number, is_file_like -from pandas.io.s3 import is_s3_url # compat from pandas.errors import (ParserError, DtypeWarning, # noqa @@ -161,6 +160,14 @@ def _stringify_path(filepath_or_buffer): return filepath_or_buffer +def is_s3_url(url): + """Check for an s3, s3n, or s3a url""" + try: + return parse_url(url).scheme in ['s3', 's3n', 's3a'] + except: + return False + + def get_filepath_or_buffer(filepath_or_buffer, encoding=None, compression=None, mode=None): """ diff --git a/pandas/io/parquet.py b/pandas/io/parquet.py index b08b92f945479..3f3f472317637 100644 --- a/pandas/io/parquet.py +++ b/pandas/io/parquet.py @@ -5,8 +5,7 @@ from pandas import DataFrame, RangeIndex, Int64Index, get_option from pandas.compat import string_types from pandas.core.common import AbstractMethodError -from pandas.io.common import get_filepath_or_buffer -from pandas.io.s3 import is_s3_url +from pandas.io.common import get_filepath_or_buffer, is_s3_url def 
get_engine(engine): diff --git a/pandas/io/s3.py b/pandas/io/s3.py index fc2092972f1dc..e2650e29c0db3 100644 --- a/pandas/io/s3.py +++ b/pandas/io/s3.py @@ -18,14 +18,6 @@ def _strip_schema(url): return result.netloc + result.path -def is_s3_url(url): - """Check for an s3, s3n, or s3a url""" - try: - return parse_url(url).scheme in ['s3', 's3n', 's3a'] - except: - return False - - def get_filepath_or_buffer(filepath_or_buffer, encoding=None, compression=None, mode=None): diff --git a/pandas/tests/io/test_s3.py b/pandas/tests/io/test_s3.py index e1141cd9a3aa0..7a3062f470ce8 100644 --- a/pandas/tests/io/test_s3.py +++ b/pandas/tests/io/test_s3.py @@ -1,4 +1,4 @@ -from pandas.io.s3 import is_s3_url +from pandas.io.common import is_s3_url class TestS3URL(object): From 55c575d70671bec2eff682c1f09f458da07d0e7f Mon Sep 17 00:00:00 2001 From: maxim veksler Date: Mon, 15 Jan 2018 13:41:39 +0200 Subject: [PATCH 20/25] Explain why fastparquet is unimplemented. --- pandas/tests/io/test_parquet.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py index 44f1d6a0b4cdc..287e266cd0084 100644 --- a/pandas/tests/io/test_parquet.py +++ b/pandas/tests/io/test_parquet.py @@ -503,5 +503,7 @@ def test_filter_row_groups(self, fp): def test_s3_roundtrip(self, df_compat, s3_resource, fp): # GH #19134 with pytest.raises(NotImplementedError): + # Known limitation, fastparquet doesn't support writing to S3. + # https://git.io/vNCcz self.check_round_trip(df_compat, fp, path='s3://pandas-test/fastparquet.parquet') From 556c43f74f590a56d3102f6badeefab22f8a0e19 Mon Sep 17 00:00:00 2001 From: maxim veksler Date: Tue, 16 Jan 2018 01:15:19 +0200 Subject: [PATCH 21/25] pandas now knows how to play /w fastparquet on s3 (hackish..) 
--- pandas/io/parquet.py | 21 ++++++++++++++------- pandas/tests/io/test_parquet.py | 7 ++----- 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/pandas/io/parquet.py b/pandas/io/parquet.py index 3f3f472317637..1d50189c5ba92 100644 --- a/pandas/io/parquet.py +++ b/pandas/io/parquet.py @@ -190,22 +190,29 @@ def __init__(self): self.api = fastparquet def write(self, df, path, compression='snappy', **kwargs): - if is_s3_url(path): - raise NotImplementedError("fastparquet s3 write isn't implemented." - " Consider using pyarrow instead.") - self.validate_dataframe(df) # thriftpy/protocol/compact.py:339: # DeprecationWarning: tostring() is deprecated. # Use tobytes() instead. - path, _, _ = get_filepath_or_buffer(path) + + if is_s3_url(path): + path, _, _ = get_filepath_or_buffer(path, mode='wb') + kwargs['open_with'] = lambda path, _: path + else: + path, _, _ = get_filepath_or_buffer(path) + with catch_warnings(record=True): self.api.write(path, df, compression=compression, **kwargs) def read(self, path, columns=None, **kwargs): - path, _, _ = get_filepath_or_buffer(path) - parquet_file = self.api.ParquetFile(path) + if is_s3_url(path): + s3, _, _ = get_filepath_or_buffer(path) + parquet_file = self.api.ParquetFile(path, open_with=s3.s3.open) + else: + path, _, _ = get_filepath_or_buffer(path) + parquet_file = self.api.ParquetFile(path) + return parquet_file.to_pandas(columns=columns, **kwargs) diff --git a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py index 287e266cd0084..04e9e5b73e640 100644 --- a/pandas/tests/io/test_parquet.py +++ b/pandas/tests/io/test_parquet.py @@ -502,8 +502,5 @@ def test_filter_row_groups(self, fp): def test_s3_roundtrip(self, df_compat, s3_resource, fp): # GH #19134 - with pytest.raises(NotImplementedError): - # Known limitation, fastparquet doesn't support writing to S3. 
- # https://git.io/vNCcz - self.check_round_trip(df_compat, fp, - path='s3://pandas-test/fastparquet.parquet') + self.check_round_trip(df_compat, fp, + path='s3://pandas-test/fastparquet.parquet') From 22f1ae593a71d99f44b7a978f6cfe780e73ec894 Mon Sep 17 00:00:00 2001 From: maxim veksler Date: Tue, 16 Jan 2018 17:30:59 +0200 Subject: [PATCH 22/25] PR review fixes. --- pandas/io/common.py | 5 ++-- pandas/io/parquet.py | 6 +++++ pandas/tests/io/test_parquet.py | 41 +++++++++++++++++---------------- 3 files changed, 29 insertions(+), 23 deletions(-) diff --git a/pandas/io/common.py b/pandas/io/common.py index a058eecb431e0..79b9a60c58c54 100644 --- a/pandas/io/common.py +++ b/pandas/io/common.py @@ -179,12 +179,11 @@ def get_filepath_or_buffer(filepath_or_buffer, encoding=None, filepath_or_buffer : a url, filepath (str, py.path.local or pathlib.Path), or buffer encoding : the encoding to use to decode py3 bytes, default is 'utf-8' - mode : {'rb', 'wb', 'ab'} applies to S3 where a write mandates opening the - file in 'wb' mode. + mode : str, optional applies when opening S3 destinations for writing Returns ------- - a filepath_or_buffer, the encoding, the compression + a filepath_or_buffer or S3File instance, the encoding, the compression """ filepath_or_buffer = _stringify_path(filepath_or_buffer) diff --git a/pandas/io/parquet.py b/pandas/io/parquet.py index 1d50189c5ba92..79fc6a391eb03 100644 --- a/pandas/io/parquet.py +++ b/pandas/io/parquet.py @@ -196,7 +196,10 @@ def write(self, df, path, compression='snappy', **kwargs): # Use tobytes() instead. if is_s3_url(path): + # path is s3:// so we need to open the s3file in 'wb' mode. + # TODO: Support 'ab' path, _, _ = get_filepath_or_buffer(path, mode='wb') + # And pass the opened s3file to the fastparquet internal impl. 
kwargs['open_with'] = lambda path, _: path else: path, _, _ = get_filepath_or_buffer(path) @@ -207,6 +210,9 @@ def write(self, df, path, compression='snappy', **kwargs): def read(self, path, columns=None, **kwargs): if is_s3_url(path): + # When path is s3:// an S3File is returned. + # We need to retain the original path(str) while also + # passing the S3File().open function to fastparquet impl. s3, _, _ = get_filepath_or_buffer(path) parquet_file = self.api.ParquetFile(path, open_with=s3.s3.open) else: diff --git a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py index 04e9e5b73e640..7b04fea0eb212 100644 --- a/pandas/tests/io/test_parquet.py +++ b/pandas/tests/io/test_parquet.py @@ -204,6 +204,21 @@ def test_cross_engine_fp_pa(df_cross_compat, pa, fp): tm.assert_frame_equal(result, df[['a', 'd']]) +def check_round_trip_equals(df, path, engine, + write_kwargs, read_kwargs, + expected, check_names): + + df.to_parquet(path, engine, **write_kwargs) + actual = read_parquet(path, engine, **read_kwargs) + tm.assert_frame_equal(expected, actual, + check_names=check_names) + + # repeat + df.to_parquet(path, engine, **write_kwargs) + actual = read_parquet(path, engine, **read_kwargs) + tm.assert_frame_equal(expected, actual, + check_names=check_names) + class Base(object): def check_error_on_write(self, df, engine, exc): @@ -227,27 +242,13 @@ def check_round_trip(self, df, engine, expected=None, path=None, if path is None: with tm.ensure_clean() as path: - df.to_parquet(path, engine, **write_kwargs) - actual = read_parquet(path, engine, **read_kwargs) - tm.assert_frame_equal(expected, actual, - check_names=check_names) - - # repeat - df.to_parquet(path, engine, **write_kwargs) - actual = read_parquet(path, engine, **read_kwargs) - tm.assert_frame_equal(expected, actual, - check_names=check_names) + check_round_trip_equals(df, path, engine, + write_kwargs=write_kwargs, read_kwargs=read_kwargs, + expected=expected, check_names=check_names) else: - 
df.to_parquet(path, engine, **write_kwargs) - actual = read_parquet(path, engine, **read_kwargs) - tm.assert_frame_equal(expected, actual, - check_names=check_names) - - # repeat - df.to_parquet(path, engine, **write_kwargs) - actual = read_parquet(path, engine, **read_kwargs) - tm.assert_frame_equal(expected, actual, - check_names=check_names) + check_round_trip_equals(df, path, engine, + write_kwargs=write_kwargs, read_kwargs=read_kwargs, + expected=expected, check_names=check_names) class TestBasic(Base): From ce11de682880108d7d012ee81e46fec8b02a0576 Mon Sep 17 00:00:00 2001 From: maxim veksler Date: Tue, 16 Jan 2018 20:13:53 +0200 Subject: [PATCH 23/25] flake8 --- pandas/io/common.py | 2 +- pandas/tests/io/test_parquet.py | 17 +++++++++++------ 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/pandas/io/common.py b/pandas/io/common.py index 79b9a60c58c54..03df68a2b4152 100644 --- a/pandas/io/common.py +++ b/pandas/io/common.py @@ -164,7 +164,7 @@ def is_s3_url(url): """Check for an s3, s3n, or s3a url""" try: return parse_url(url).scheme in ['s3', 's3n', 's3a'] - except: + except: # noqa return False diff --git a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py index 7b04fea0eb212..d472a5ed23c75 100644 --- a/pandas/tests/io/test_parquet.py +++ b/pandas/tests/io/test_parquet.py @@ -205,8 +205,8 @@ def test_cross_engine_fp_pa(df_cross_compat, pa, fp): def check_round_trip_equals(df, path, engine, - write_kwargs, read_kwargs, - expected, check_names): + write_kwargs, read_kwargs, + expected, check_names): df.to_parquet(path, engine, **write_kwargs) actual = read_parquet(path, engine, **read_kwargs) @@ -219,6 +219,7 @@ def check_round_trip_equals(df, path, engine, tm.assert_frame_equal(expected, actual, check_names=check_names) + class Base(object): def check_error_on_write(self, df, engine, exc): @@ -243,12 +244,16 @@ def check_round_trip(self, df, engine, expected=None, path=None, if path is None: with tm.ensure_clean() as path: 
check_round_trip_equals(df, path, engine, - write_kwargs=write_kwargs, read_kwargs=read_kwargs, - expected=expected, check_names=check_names) + write_kwargs=write_kwargs, + read_kwargs=read_kwargs, + expected=expected, + check_names=check_names) else: check_round_trip_equals(df, path, engine, - write_kwargs=write_kwargs, read_kwargs=read_kwargs, - expected=expected, check_names=check_names) + write_kwargs=write_kwargs, + read_kwargs=read_kwargs, + expected=expected, + check_names=check_names) class TestBasic(Base): From 900a1c42fe4449ede38863e02964a69282711a7f Mon Sep 17 00:00:00 2001 From: maxim veksler Date: Wed, 17 Jan 2018 02:25:07 +0200 Subject: [PATCH 24/25] Documentation fixes. --- doc/source/whatsnew/v0.23.0.txt | 2 +- pandas/io/common.py | 2 +- pandas/io/parquet.py | 1 + 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/doc/source/whatsnew/v0.23.0.txt b/doc/source/whatsnew/v0.23.0.txt index 2f39c23126f3a..a0ce50b881915 100644 --- a/doc/source/whatsnew/v0.23.0.txt +++ b/doc/source/whatsnew/v0.23.0.txt @@ -415,7 +415,7 @@ I/O - Bug in :func:`read_sas` where a file with 0 variables gave an ``AttributeError`` incorrectly. 
Now it gives an ``EmptyDataError`` (:issue:`18184`) - Bug in :func:`DataFrame.to_latex()` where pairs of braces meant to serve as invisible placeholders were escaped (:issue:`18667`) - Bug in :func:`read_json` where large numeric values were causing an ``OverflowError`` (:issue:`18842`) -- Bug in :func:`DataFrame.to_parquet` exception is thrown if write destination is S3 (:issue:`19134`) +- Bug in :func:`DataFrame.to_parquet` where an exception was raised if the write destination is S3 (:issue:`19134`) - Plotting diff --git a/pandas/io/common.py b/pandas/io/common.py index 03df68a2b4152..747b370de920c 100644 --- a/pandas/io/common.py +++ b/pandas/io/common.py @@ -179,7 +179,7 @@ def get_filepath_or_buffer(filepath_or_buffer, encoding=None, filepath_or_buffer : a url, filepath (str, py.path.local or pathlib.Path), or buffer encoding : the encoding to use to decode py3 bytes, default is 'utf-8' - mode : str, optional applies when opening S3 destinations for writing + mode : str, optional Returns ------- diff --git a/pandas/io/parquet.py b/pandas/io/parquet.py index 79fc6a391eb03..e28e53a840e3b 100644 --- a/pandas/io/parquet.py +++ b/pandas/io/parquet.py @@ -198,6 +198,7 @@ def write(self, df, path, compression='snappy', **kwargs): if is_s3_url(path): # path is s3:// so we need to open the s3file in 'wb' mode. # TODO: Support 'ab' + path, _, _ = get_filepath_or_buffer(path, mode='wb') # And pass the opened s3file to the fastparquet internal impl. 
kwargs['open_with'] = lambda path, _: path From 9dbc77cefd3eb53d96daaa99596765725ad8392e Mon Sep 17 00:00:00 2001 From: maxim veksler Date: Wed, 17 Jan 2018 12:46:59 +0200 Subject: [PATCH 25/25] PEP8 (also need to restart build, CI queue dropped) --- pandas/io/common.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pandas/io/common.py b/pandas/io/common.py index 747b370de920c..c2d1da5a1035d 100644 --- a/pandas/io/common.py +++ b/pandas/io/common.py @@ -164,7 +164,7 @@ def is_s3_url(url): """Check for an s3, s3n, or s3a url""" try: return parse_url(url).scheme in ['s3', 's3n', 's3a'] - except: # noqa + except: # noqa return False