Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions databricks/koalas/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@

from functools import wraps
from typing import Union, Callable, Any
from distutils.version import LooseVersion

import pyspark
import numpy as np
import pandas as pd
from pandas.api.types import is_list_like
Expand All @@ -33,7 +35,7 @@
from databricks.koalas import numpy_compat
from databricks.koalas.internal import _InternalFrame, SPARK_INDEX_NAME_FORMAT
from databricks.koalas.typedef import pandas_wraps, spark_type_to_pandas_dtype
from databricks.koalas.utils import align_diff_series, scol_for, validate_axis
from databricks.koalas.utils import align_diff_series, scol_for, validate_axis, default_session
from databricks.koalas.frame import DataFrame


Expand Down Expand Up @@ -941,15 +943,23 @@ def value_counts(self, normalize=False, sort=True, ascending=False, bins=None, d
Name: koalas, dtype: int64
"""
from databricks.koalas.series import Series, _col
from databricks.koalas.indexes import MultiIndex
if LooseVersion(pyspark.__version__) < LooseVersion("2.4") and \
default_session().conf.get("spark.sql.execution.arrow.enabled") == "true" and \
isinstance(self, MultiIndex):
raise RuntimeError("if you're using pyspark < 2.4, set conf "
"'spark.sql.execution.arrow.enabled' to 'false' "
"for using this function with MultiIndex")
Comment on lines +946 to +952
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Btw, I think we should do this only in MultiIndex by overriding.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, that seems better. thanks for the advice!

if bins is not None:
raise NotImplementedError("value_counts currently does not support bins")

if dropna:
sdf_dropna = self._internal._sdf.dropna()
sdf_dropna = self._internal._sdf.select(self._scol).dropna()
else:
sdf_dropna = self._internal._sdf
sdf_dropna = self._internal._sdf.select(self._scol)
index_name = SPARK_INDEX_NAME_FORMAT(0)
sdf = sdf_dropna.groupby(self._scol.alias(index_name)).count()
column_name = self._internal.data_columns[0]
sdf = sdf_dropna.groupby(scol_for(sdf_dropna, column_name).alias(index_name)).count()
if sort:
if ascending:
sdf = sdf.orderBy(F.col('count'))
Expand Down
140 changes: 139 additions & 1 deletion databricks/koalas/tests/test_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import matplotlib
matplotlib.use('agg')
from matplotlib import pyplot as plt
import pyspark
import numpy as np
import pandas as pd

Expand All @@ -33,6 +34,7 @@
from databricks.koalas.exceptions import PandasNotImplementedError
from databricks.koalas.missing.series import _MissingPandasLikeSeries
from databricks.koalas.config import set_option, reset_option
from databricks.koalas.utils import default_session


class SeriesTest(ReusedSQLTestCase, SQLTestUtils):
Expand Down Expand Up @@ -243,7 +245,8 @@ def test_nunique(self):
self.assertEqual(ks.Series(range(100)).nunique(approx=True), 103)
self.assertEqual(ks.Series(range(100)).nunique(approx=True, rsd=0.01), 100)

def test_value_counts(self):
def _test_value_counts(self):
# this is also containing test for Index & MultiIndex
pser = pd.Series([1, 2, 1, 3, 3, np.nan, 1, 4], name="x")
kser = ks.from_pandas(pser)

Expand All @@ -261,6 +264,15 @@ def test_value_counts(self):
self.assert_eq(kser.value_counts(ascending=True, dropna=False),
pser.value_counts(ascending=True, dropna=False), almost=True)

self.assert_eq(kser.index.value_counts(normalize=True),
pser.index.value_counts(normalize=True), almost=True)
self.assert_eq(kser.index.value_counts(ascending=True),
pser.index.value_counts(ascending=True), almost=True)
self.assert_eq(kser.index.value_counts(normalize=True, dropna=False),
pser.index.value_counts(normalize=True, dropna=False), almost=True)
self.assert_eq(kser.index.value_counts(ascending=True, dropna=False),
pser.index.value_counts(ascending=True, dropna=False), almost=True)

with self.assertRaisesRegex(NotImplementedError,
"value_counts currently does not support bins"):
kser.value_counts(bins=3)
Expand All @@ -269,6 +281,132 @@ def test_value_counts(self):
kser.name = 'index'
self.assert_eq(kser.value_counts(), pser.value_counts(), almost=True)

# Series from DataFrame
pdf = pd.DataFrame({'a': [1, 2, 3], 'b': [None, 1, None]})
kdf = ks.from_pandas(pdf)

self.assert_eq(kdf.a.value_counts(normalize=True),
pdf.a.value_counts(normalize=True), almost=True)
self.assert_eq(kdf.a.value_counts(ascending=True),
pdf.a.value_counts(ascending=True), almost=True)
self.assert_eq(kdf.a.value_counts(normalize=True, dropna=False),
pdf.a.value_counts(normalize=True, dropna=False), almost=True)
self.assert_eq(kdf.a.value_counts(ascending=True, dropna=False),
pdf.a.value_counts(ascending=True, dropna=False), almost=True)

self.assert_eq(kser.index.value_counts(normalize=True),
pser.index.value_counts(normalize=True), almost=True)
self.assert_eq(kser.index.value_counts(ascending=True),
pser.index.value_counts(ascending=True), almost=True)
self.assert_eq(kser.index.value_counts(normalize=True, dropna=False),
pser.index.value_counts(normalize=True, dropna=False), almost=True)
self.assert_eq(kser.index.value_counts(ascending=True, dropna=False),
pser.index.value_counts(ascending=True, dropna=False), almost=True)

# Series with NaN index
pser = pd.Series([1, 2, 3], index=[2, None, 5])
kser = ks.from_pandas(pser)

self.assert_eq(kser.value_counts(normalize=True),
pser.value_counts(normalize=True), almost=True)
self.assert_eq(kser.value_counts(ascending=True),
pser.value_counts(ascending=True), almost=True)
self.assert_eq(kser.value_counts(normalize=True, dropna=False),
pser.value_counts(normalize=True, dropna=False), almost=True)
self.assert_eq(kser.value_counts(ascending=True, dropna=False),
pser.value_counts(ascending=True, dropna=False), almost=True)

self.assert_eq(kser.index.value_counts(normalize=True),
pser.index.value_counts(normalize=True), almost=True)
self.assert_eq(kser.index.value_counts(ascending=True),
pser.index.value_counts(ascending=True), almost=True)
self.assert_eq(kser.index.value_counts(normalize=True, dropna=False),
pser.index.value_counts(normalize=True, dropna=False), almost=True)
self.assert_eq(kser.index.value_counts(ascending=True, dropna=False),
pser.index.value_counts(ascending=True, dropna=False), almost=True)

# Series with MultiIndex
pser.index = pd.MultiIndex.from_tuples([('x', 'a'), ('x', 'b'), ('y', 'c')])
kser = ks.from_pandas(pser)

self.assert_eq(kser.value_counts(normalize=True),
pser.value_counts(normalize=True), almost=True)
self.assert_eq(kser.value_counts(ascending=True),
pser.value_counts(ascending=True), almost=True)
self.assert_eq(kser.value_counts(normalize=True, dropna=False),
pser.value_counts(normalize=True, dropna=False), almost=True)
self.assert_eq(kser.value_counts(ascending=True, dropna=False),
pser.value_counts(ascending=True, dropna=False), almost=True)

self.assert_eq(kser.index.value_counts(normalize=True),
pser.index.value_counts(normalize=True), almost=True)
self.assert_eq(kser.index.value_counts(ascending=True),
pser.index.value_counts(ascending=True), almost=True)
self.assert_eq(kser.index.value_counts(normalize=True, dropna=False),
pser.index.value_counts(normalize=True, dropna=False), almost=True)
self.assert_eq(kser.index.value_counts(ascending=True, dropna=False),
pser.index.value_counts(ascending=True, dropna=False), almost=True)

# Series with MultiIndex some of index has NaN
pser.index = pd.MultiIndex.from_tuples([('x', 'a'), ('x', None), ('y', 'c')])
kser = ks.from_pandas(pser)

self.assert_eq(kser.value_counts(normalize=True),
pser.value_counts(normalize=True), almost=True)
self.assert_eq(kser.value_counts(ascending=True),
pser.value_counts(ascending=True), almost=True)
self.assert_eq(kser.value_counts(normalize=True, dropna=False),
pser.value_counts(normalize=True, dropna=False), almost=True)
self.assert_eq(kser.value_counts(ascending=True, dropna=False),
pser.value_counts(ascending=True, dropna=False), almost=True)

self.assert_eq(kser.index.value_counts(normalize=True),
pser.index.value_counts(normalize=True), almost=True)
self.assert_eq(kser.index.value_counts(ascending=True),
pser.index.value_counts(ascending=True), almost=True)
self.assert_eq(kser.index.value_counts(normalize=True, dropna=False),
pser.index.value_counts(normalize=True, dropna=False), almost=True)
self.assert_eq(kser.index.value_counts(ascending=True, dropna=False),
pser.index.value_counts(ascending=True, dropna=False), almost=True)

# Series with MultiIndex some of index is NaN.
# This test only available for pandas >= 0.24.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are these only for pandas >= 0.24?

Copy link
Contributor Author

@itholic itholic Dec 13, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because pandas < 0.24 doesn't support None for MultiIndex like below way.

>>> pd.__version__
'0.23.0'
>>> pidx = pd.MultiIndex.from_tuples([('x', 'a'), None, ('y', 'c')])
Traceback (most recent call last):
...
TypeError: object of type 'NoneType' has no len()

so i think this test should be ran on pandas >= 0.24

if LooseVersion(pd.__version__) >= LooseVersion("0.24"):
pser.index = pd.MultiIndex.from_tuples([('x', 'a'), None, ('y', 'c')])
kser = ks.from_pandas(pser)

self.assert_eq(kser.value_counts(normalize=True),
pser.value_counts(normalize=True), almost=True)
self.assert_eq(kser.value_counts(ascending=True),
pser.value_counts(ascending=True), almost=True)
self.assert_eq(kser.value_counts(normalize=True, dropna=False),
pser.value_counts(normalize=True, dropna=False), almost=True)
self.assert_eq(kser.value_counts(ascending=True, dropna=False),
pser.value_counts(ascending=True, dropna=False), almost=True)

self.assert_eq(kser.index.value_counts(normalize=True),
pser.index.value_counts(normalize=True), almost=True)
self.assert_eq(kser.index.value_counts(ascending=True),
pser.index.value_counts(ascending=True), almost=True)
self.assert_eq(kser.index.value_counts(normalize=True, dropna=False),
pser.index.value_counts(normalize=True, dropna=False), almost=True)
self.assert_eq(kser.index.value_counts(ascending=True, dropna=False),
pser.index.value_counts(ascending=True, dropna=False), almost=True)

def test_value_counts(self):
if LooseVersion(pyspark.__version__) < LooseVersion("2.4") and \
default_session().conf.get("spark.sql.execution.arrow.enabled") == "true":
default_session().conf.set("spark.sql.execution.arrow.enabled", "false")
try:
self._test_value_counts()
finally:
default_session().conf.set("spark.sql.execution.arrow.enabled", "true")
self.assertRaises(
RuntimeError,
lambda: ks.MultiIndex.from_tuples([('x', 'a'), ('x', 'b')]).value_counts())
else:
self._test_value_counts()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you disable arrow execution only when pyspark < 2.4 and for MultiIndex?

Copy link
Contributor Author

@itholic itholic Dec 18, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

okay i will 👍
thanks !


def test_nsmallest(self):
sample_lst = [1, 2, 3, 4, np.nan, 6]
pser = pd.Series(sample_lst, name='x')
Expand Down