From ddff15653832eb4f1eb65959beeb699cb745dfb5 Mon Sep 17 00:00:00 2001
From: maxim veksler
Date: Sun, 21 Jan 2018 21:35:04 +0200
Subject: [PATCH 1/4] Refactor test_parquet.py to use check_round_trip at
 module level

---
 pandas/tests/io/test_parquet.py | 161 +++++++++++++++-----------------
 1 file changed, 74 insertions(+), 87 deletions(-)

diff --git a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py
index d472a5ed23c75..dda7423a2aa2c 100644
--- a/pandas/tests/io/test_parquet.py
+++ b/pandas/tests/io/test_parquet.py
@@ -110,48 +110,72 @@ def df_full():
                           pd.Timestamp('20130103')]})


-def test_invalid_engine(df_compat):
+def check_round_trip(df, engine=None, path=None,
+                     write_kwargs=None, read_kwargs=None,
+                     expected=None, check_names=True):

-    with pytest.raises(ValueError):
-        df_compat.to_parquet('foo', 'bar')
+    if write_kwargs is None:
+        write_kwargs = {'compression': None}

+    if read_kwargs is None:
+        read_kwargs = {}

-def test_options_py(df_compat, pa):
-    # use the set option
+    if engine:
+        write_kwargs['engine'] = engine
+        read_kwargs['engine'] = engine

-    df = df_compat
-    with tm.ensure_clean() as path:
+    if expected is None:
+        expected = df

-        with pd.option_context('io.parquet.engine', 'pyarrow'):
-            df.to_parquet(path)
+    if path is None:
+        with tm.ensure_clean() as path:
+            df.to_parquet(path, **write_kwargs)
+            actual = read_parquet(path, **read_kwargs)
+            tm.assert_frame_equal(expected, actual,
+                                  check_names=check_names)
+
+            # repeat
+            df.to_parquet(path, **write_kwargs)
+            actual = read_parquet(path, **read_kwargs)
+            tm.assert_frame_equal(expected, actual,
+                                  check_names=check_names)
+    else:
+        df.to_parquet(path, **write_kwargs)
+        actual = read_parquet(path, **read_kwargs)
+        tm.assert_frame_equal(expected, actual,
+                              check_names=check_names)
+
+        # repeat
+        df.to_parquet(path, **write_kwargs)
+        actual = read_parquet(path, **read_kwargs)
+        tm.assert_frame_equal(expected, actual,
+                              check_names=check_names)

-            result = read_parquet(path)
-            tm.assert_frame_equal(result, df)

+def test_invalid_engine(df_compat):
+    with pytest.raises(ValueError):
+        check_round_trip(df_compat, 'foo', 'bar')

-def test_options_fp(df_compat, fp):
+
+def test_options_py(df_compat, pa):
     # use the set option

-    df = df_compat
-    with tm.ensure_clean() as path:
+    with pd.option_context('io.parquet.engine', 'pyarrow'):
+        check_round_trip(df_compat)

-        with pd.option_context('io.parquet.engine', 'fastparquet'):
-            df.to_parquet(path, compression=None)

-            result = read_parquet(path)
-            tm.assert_frame_equal(result, df)
+def test_options_fp(df_compat, fp):
+    # use the set option

+    with pd.option_context('io.parquet.engine', 'fastparquet'):
+        check_round_trip(df_compat)

-def test_options_auto(df_compat, fp, pa):
-    df = df_compat
-    with tm.ensure_clean() as path:
-
-        with pd.option_context('io.parquet.engine', 'auto'):
-            df.to_parquet(path)

+def test_options_auto(df_compat, fp, pa):
+    # use the set option

-            result = read_parquet(path)
-            tm.assert_frame_equal(result, df)
+    with pd.option_context('io.parquet.engine', 'auto'):
+        check_round_trip(df_compat)


 def test_options_get_engine(fp, pa):
@@ -228,53 +252,23 @@ def check_error_on_write(self, df, engine, exc):
             with tm.ensure_clean() as path:
                 to_parquet(df, path, engine, compression=None)

-    def check_round_trip(self, df, engine, expected=None, path=None,
-                         write_kwargs=None, read_kwargs=None,
-                         check_names=True):
-
-        if write_kwargs is None:
-            write_kwargs = {'compression': None}
-
-        if read_kwargs is None:
-            read_kwargs = {}
-
-        if expected is None:
-            expected = df
-
-        if path is None:
-            with tm.ensure_clean() as path:
-                check_round_trip_equals(df, path, engine,
-                                        write_kwargs=write_kwargs,
-                                        read_kwargs=read_kwargs,
-                                        expected=expected,
-                                        check_names=check_names)
-        else:
-            check_round_trip_equals(df, path, engine,
-                                    write_kwargs=write_kwargs,
-                                    read_kwargs=read_kwargs,
-                                    expected=expected,
-                                    check_names=check_names)
-

 class TestBasic(Base):

     def test_error(self, engine):
-
         for obj in [pd.Series([1, 2, 3]), 1, 'foo', pd.Timestamp('20130101'),
                     np.array([1, 2, 3])]:
             self.check_error_on_write(obj, engine, ValueError)

     def test_columns_dtypes(self, engine):
-
         df = pd.DataFrame({'string': list('abc'),
                            'int': list(range(1, 4))})

         # unicode
         df.columns = [u'foo', u'bar']
-        self.check_round_trip(df, engine)
+        check_round_trip(df, engine)

     def test_columns_dtypes_invalid(self, engine):
-
         df = pd.DataFrame({'string': list('abc'),
                            'int': list(range(1, 4))})
@@ -302,8 +296,7 @@ def test_compression(self, engine, compression):
             pytest.importorskip('brotli')

         df = pd.DataFrame({'A': [1, 2, 3]})
-        self.check_round_trip(df, engine,
-                              write_kwargs={'compression': compression})
+        check_round_trip(df, engine, write_kwargs={'compression': compression})

     def test_read_columns(self, engine):
         # GH18154
@@ -311,8 +304,8 @@ def test_read_columns(self, engine):
                            'int': list(range(1, 4))})

         expected = pd.DataFrame({'string': list('abc')})
-        self.check_round_trip(df, engine, expected=expected,
-                              read_kwargs={'columns': ['string']})
+        check_round_trip(df, engine, expected=expected,
+                         read_kwargs={'columns': ['string']})

     def test_write_index(self, engine):
         check_names = engine != 'fastparquet'
@@ -323,7 +316,7 @@ def test_write_index(self, engine):
             pytest.skip("pyarrow is < 0.7.0")

         df = pd.DataFrame({'A': [1, 2, 3]})
-        self.check_round_trip(df, engine)
+        check_round_trip(df, engine)

         indexes = [
             [2, 3, 4],
@@ -334,12 +327,12 @@ def test_write_index(self, engine):
         # non-default index
         for index in indexes:
             df.index = index
-            self.check_round_trip(df, engine, check_names=check_names)
+            check_round_trip(df, engine, check_names=check_names)

         # index with meta-data
         df.index = [0, 1, 2]
         df.index.name = 'foo'
-        self.check_round_trip(df, engine)
+        check_round_trip(df, engine)

     def test_write_multiindex(self, pa_ge_070):
         # Not supported in fastparquet as of 0.1.3 or older pyarrow version
@@ -348,7 +341,7 @@ def test_write_multiindex(self, pa_ge_070):
         df = pd.DataFrame({'A': [1, 2, 3]})
         index = pd.MultiIndex.from_tuples([('a', 1), ('a', 2), ('b', 1)])
         df.index = index
-        self.check_round_trip(df, engine)
+        check_round_trip(df, engine)

     def test_write_column_multiindex(self, engine):
         # column multi-index
@@ -357,7 +350,6 @@ def test_write_column_multiindex(self, engine):
         self.check_error_on_write(df, engine, ValueError)

     def test_multiindex_with_columns(self, pa_ge_070):
-
         engine = pa_ge_070
         dates = pd.date_range('01-Jan-2018', '01-Dec-2018', freq='MS')
         df = pd.DataFrame(np.random.randn(2 * len(dates), 3),
@@ -368,14 +360,10 @@ def test_multiindex_with_columns(self, pa_ge_070):
         index2 = index1.copy(names=None)
         for index in [index1, index2]:
             df.index = index
-            with tm.ensure_clean() as path:
-                df.to_parquet(path, engine)
-                result = read_parquet(path, engine)
-                expected = df
-                tm.assert_frame_equal(result, expected)
-                result = read_parquet(path, engine, columns=['A', 'B'])
-                expected = df[['A', 'B']]
-                tm.assert_frame_equal(result, expected)
+
+            check_round_trip(df, engine)
+            check_round_trip(df, engine, read_kwargs={'columns': ['A', 'B']},
+                             expected=df[['A', 'B']])


 class TestParquetPyArrow(Base):
@@ -391,7 +379,7 @@ def test_basic(self, pa, df_full):
                                           tz='Europe/Brussels')
         df['bool_with_none'] = [True, None, True]

-        self.check_round_trip(df, pa)
+        check_round_trip(df, pa)

     @pytest.mark.xfail(reason="pyarrow fails on this (ARROW-1883)")
     def test_basic_subset_columns(self, pa, df_full):
@@ -402,8 +390,8 @@ def test_basic_subset_columns(self, pa, df_full):
         df['datetime_tz'] = pd.date_range('20130101', periods=3,
                                           tz='Europe/Brussels')

-        self.check_round_trip(df, pa, expected=df[['string', 'int']],
-                              read_kwargs={'columns': ['string', 'int']})
+        check_round_trip(df, pa, expected=df[['string', 'int']],
+                         read_kwargs={'columns': ['string', 'int']})

     def test_duplicate_columns(self, pa):
         # not currently able to handle duplicate columns
@@ -433,7 +421,7 @@ def test_categorical(self, pa_ge_070):

         # de-serialized as object
         expected = df.assign(a=df.a.astype(object))
-        self.check_round_trip(df, pa, expected)
+        check_round_trip(df, pa, expected=expected)

     def test_categorical_unsupported(self, pa_lt_070):
         pa = pa_lt_070
@@ -444,20 +432,19 @@ def test_categorical_unsupported(self, pa_lt_070):

     def test_s3_roundtrip(self, df_compat, s3_resource, pa):
         # GH #19134
-        self.check_round_trip(df_compat, pa,
-                              path='s3://pandas-test/pyarrow.parquet')
+        check_round_trip(df_compat, pa,
+                         path='s3://pandas-test/pyarrow.parquet')


 class TestParquetFastParquet(Base):

     def test_basic(self, fp, df_full):
-
         df = df_full

         # additional supported types for fastparquet
         df['timedelta'] = pd.timedelta_range('1 day', periods=3)

-        self.check_round_trip(df, fp)
+        check_round_trip(df, fp)

     @pytest.mark.skip(reason="not supported")
     def test_duplicate_columns(self, fp):
@@ -470,7 +457,7 @@ def test_duplicate_columns(self, fp):
     def test_bool_with_none(self, fp):
         df = pd.DataFrame({'a': [True, None, False]})
         expected = pd.DataFrame({'a': [1.0, np.nan, 0.0]}, dtype='float16')
-        self.check_round_trip(df, fp, expected=expected)
+        check_round_trip(df, fp, expected=expected)

     def test_unsupported(self, fp):

@@ -486,7 +473,7 @@ def test_categorical(self, fp):
         if LooseVersion(fastparquet.__version__) < LooseVersion("0.1.3"):
             pytest.skip("CategoricalDtype not supported for older fp")
         df = pd.DataFrame({'a': pd.Categorical(list('abc'))})
-        self.check_round_trip(df, fp)
+        check_round_trip(df, fp)

     def test_datetime_tz(self, fp):
         # doesn't preserve tz
@@ -495,7 +482,7 @@ def test_datetime_tz(self, fp):

         # warns on the coercion
         with catch_warnings(record=True):
-            self.check_round_trip(df, fp, df.astype('datetime64[ns]'))
+            check_round_trip(df, fp, expected=df.astype('datetime64[ns]'))

     def test_filter_row_groups(self, fp):
         d = {'a': list(range(0, 3))}
@@ -508,5 +495,5 @@ def test_filter_row_groups(self, fp):

     def test_s3_roundtrip(self, df_compat, s3_resource, fp):
         # GH #19134
-        self.check_round_trip(df_compat, fp,
-                              path='s3://pandas-test/fastparquet.parquet')
+        check_round_trip(df_compat, fp,
+                         path='s3://pandas-test/fastparquet.parquet')
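
For orientation between patches: a minimal sketch (not part of the patch series) of how the module-level helper introduced in patch 1 is called. It assumes a pandas build with pyarrow installed and `check_round_trip` in scope, e.g. inside the test module itself.

    import pandas as pd

    df = pd.DataFrame({'string': list('abc'), 'int': list(range(1, 4))})

    # Default form: writes to a temporary file, reads it back, compares.
    check_round_trip(df, engine='pyarrow')

    # Column subsetting: `expected` describes what the read should return.
    check_round_trip(df, engine='pyarrow',
                     read_kwargs={'columns': ['string']},
                     expected=df[['string']])
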
From 9342a6f5a3f8b8c816879a9f257b6d71ac2f4077 Mon Sep 17 00:00:00 2001
From: maxim veksler
Date: Sun, 21 Jan 2018 22:30:51 +0200
Subject: [PATCH 2/4] PR fixes.

---
 pandas/tests/io/test_parquet.py | 48 ++++++++++++++++++++-------------
 1 file changed, 29 insertions(+), 19 deletions(-)

diff --git a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py
index dda7423a2aa2c..6c40ae4762a12 100644
--- a/pandas/tests/io/test_parquet.py
+++ b/pandas/tests/io/test_parquet.py
@@ -112,7 +112,27 @@ def df_full():

 def check_round_trip(df, engine=None, path=None,
                      write_kwargs=None, read_kwargs=None,
-                     expected=None, check_names=True):
+                     expected=None, check_names=True,
+                     repeat=2):
+    """
+    Verify parquet serialize and deserialize produce the same results.
+
+    Performs a pandas to disk and disk to pandas round trip,
+    then compares the 2 resulting DataFrames to verify full
+    cycle is successful.
+
+    :param df: Dataframe to be serialized to disk
+    :param engine: str one of ['pyarrow', 'fastparquet']
+    :param path: str
+    :param write_kwargs: dict(str:str) params to be passed to the serialization
+        engine.
+    :param read_kwargs: dict(str:str) params to be passed to the
+        deserialization engine.
+    :param expected: DataFrame If provided, deserialization will be
+        compared against it.
+    :param check_names: list(str) specific columns to be compared
+    :param repeat: No. of times to repeat the test.
+    """

     if write_kwargs is None:
         write_kwargs = {'compression': None}
@@ -127,29 +147,19 @@ def check_round_trip(df, engine=None, path=None,
     if expected is None:
         expected = df

-    if path is None:
-        with tm.ensure_clean() as path:
-            df.to_parquet(path, **write_kwargs)
-            actual = read_parquet(path, **read_kwargs)
-            tm.assert_frame_equal(expected, actual,
-                                  check_names=check_names)
-
-            # repeat
-            df.to_parquet(path, **write_kwargs)
-            actual = read_parquet(path, **read_kwargs)
-            tm.assert_frame_equal(expected, actual,
-                                  check_names=check_names)
-    else:
+    def compare():
         df.to_parquet(path, **write_kwargs)
         actual = read_parquet(path, **read_kwargs)
         tm.assert_frame_equal(expected, actual,
                               check_names=check_names)

-        # repeat
-        df.to_parquet(path, **write_kwargs)
-        actual = read_parquet(path, **read_kwargs)
-        tm.assert_frame_equal(expected, actual,
-                              check_names=check_names)
+    if path is None:
+        with tm.ensure_clean() as path:
+            for _ in range(repeat):
+                compare()
+    else:
+        for _ in range(repeat):
+            compare()


 def test_invalid_engine(df_compat):
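
The `repeat` parameter added above formalizes patch 1's duplicated write/read blocks: writing to the same path a second time guards against an engine mishandling an already-existing file. The pattern is roughly this self-contained sketch (assuming pyarrow and the `pandas.util.testing` module of the pandas version this PR targets):

    import pandas as pd
    import pandas.util.testing as tm

    df = pd.DataFrame({'A': [1, 2, 3]})
    with tm.ensure_clean() as path:  # temporary file, removed on exit
        for _ in range(2):  # round trip twice, as check_round_trip does
            df.to_parquet(path, engine='pyarrow', compression=None)
            tm.assert_frame_equal(df, pd.read_parquet(path, engine='pyarrow'))
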
From 0d4f8ce2346008a0d04159a3afc4c8392feb77a7 Mon Sep 17 00:00:00 2001
From: maxim veksler
Date: Mon, 22 Jan 2018 22:07:58 +0200
Subject: [PATCH 3/4] numpy doc string

---
 pandas/tests/io/test_parquet.py | 61 +++++++++++++++------------------
 1 file changed, 28 insertions(+), 33 deletions(-)

diff --git a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py
index 6c40ae4762a12..121ee84c03d5b 100644
--- a/pandas/tests/io/test_parquet.py
+++ b/pandas/tests/io/test_parquet.py
@@ -114,52 +114,47 @@ def check_round_trip(df, engine=None, path=None,
                      write_kwargs=None, read_kwargs=None,
                      expected=None, check_names=True,
                      repeat=2):
-    """
-    Verify parquet serialize and deserialize produce the same results.
+    """Verify parquet serializer and deserializer produce the same results.

     Performs a pandas to disk and disk to pandas round trip,
-    then compares the 2 resulting DataFrames to verify full
-    cycle is successful.
-
-    :param df: Dataframe to be serialized to disk
-    :param engine: str one of ['pyarrow', 'fastparquet']
-    :param path: str
-    :param write_kwargs: dict(str:str) params to be passed to the serialization
-        engine.
-    :param read_kwargs: dict(str:str) params to be passed to the
-        deserialization engine.
-    :param expected: DataFrame If provided, deserialization will be
-        compared against it.
-    :param check_names: list(str) specific columns to be compared
-    :param repeat: No. of times to repeat the test.
+    then compares the 2 resulting DataFrames to verify equality.
+
+    Parameters
+    ----------
+    df: Dataframe
+    engine: str, optional
+        'pyarrow' or 'fastparquet'
+    path: str, optional
+    write_kwargs: dict of str:str, optional
+    read_kwargs: dict of str:str, optional
+    expected: DataFrame, optional
+        Expected deserialization result
+    check_names: list of str, optional
+        Closed set of column names to be compared
+    repeat: int, optional
+        How many times to repeat the test
     """
-
-    if write_kwargs is None:
-        write_kwargs = {'compression': None}
-
-    if read_kwargs is None:
-        read_kwargs = {}
+    write_kwargs = write_kwargs or {'compression': None}
+    read_kwargs = read_kwargs or {}
+    expected = expected or df

     if engine:
         write_kwargs['engine'] = engine
         read_kwargs['engine'] = engine

-    if expected is None:
-        expected = df
-
-    def compare():
-        df.to_parquet(path, **write_kwargs)
-        actual = read_parquet(path, **read_kwargs)
-        tm.assert_frame_equal(expected, actual,
-                              check_names=check_names)
+    def compare(repeat):
+        for _ in range(repeat):
+            df.to_parquet(path, **write_kwargs)
+            actual = read_parquet(path, **read_kwargs)
+            tm.assert_frame_equal(expected, actual,
+                                  check_names=check_names)

     if path is None:
         with tm.ensure_clean() as path:
-            for _ in range(repeat):
-                compare()
+            compare(repeat)
     else:
-        for _ in range(repeat):
-            compare()
+        compare(repeat)


 def test_invalid_engine(df_compat):

From f687979dacd3c12bab4743686635a6dbe000c71b Mon Sep 17 00:00:00 2001
From: maxim veksler
Date: Tue, 23 Jan 2018 10:56:42 +0200
Subject: [PATCH 4/4] PR review fixes

---
 pandas/tests/io/test_parquet.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py
index 121ee84c03d5b..8a6a22abe23fa 100644
--- a/pandas/tests/io/test_parquet.py
+++ b/pandas/tests/io/test_parquet.py
@@ -128,7 +128,7 @@ def check_round_trip(df, engine=None, path=None,
     write_kwargs: dict of str:str, optional
     read_kwargs: dict of str:str, optional
     expected: DataFrame, optional
-        Expected deserialization result
+        Expected deserialization result, otherwise will be equal to `df`
     check_names: list of str, optional
         Closed set of column names to be compared
     repeat: int, optional
@@ -137,7 +137,9 @@ def check_round_trip(df, engine=None, path=None,

     write_kwargs = write_kwargs or {'compression': None}
     read_kwargs = read_kwargs or {}
-    expected = expected or df
+
+    if expected is None:
+        expected = df

     if engine:
         write_kwargs['engine'] = engine
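
A closing note on patch 4's revert of `expected = expected or df`: `or` triggers implicit truth testing, which pandas forbids on DataFrames, so patch 3's shortened form raises as soon as a caller passes a real `expected`. A quick illustration of the failure mode:

    import pandas as pd

    expected = pd.DataFrame({'a': [1, 2]})
    try:
        expected = expected or pd.DataFrame({'a': [3]})  # DataFrame.__bool__ raises
    except ValueError as err:
        print(err)  # "The truth value of a DataFrame is ambiguous. ..."

The `write_kwargs or {...}` and `read_kwargs or {...}` defaults survive review because plain dicts do support truth testing; only the DataFrame default needs the explicit `is None` check.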