28 changes: 25 additions & 3 deletions databricks/koalas/generic.py
@@ -1461,7 +1461,7 @@ def abs(self):

# TODO: by argument only support the grouping name and as_index only for now. Documentation
# should be updated when it's supported.
def groupby(self, by, axis=0, as_index: bool = True):
def groupby(self, by, axis=0, as_index: bool = True, dropna: bool = True):
"""
Group DataFrame or Series using a Series of columns.

@@ -1483,6 +1483,10 @@ def groupby(self, by, axis=0, as_index: bool = True):
For aggregated output, return object with group labels as the
index. Only relevant for DataFrame input. as_index=False is
effectively "SQL-style" grouped output.
dropna : bool, default True
If True, and if group keys contain NA values,
NA values together with row/column will be dropped.
If False, NA values will also be treated as the key in groups.

Returns
-------
@@ -1518,6 +1522,24 @@ def groupby(self, by, axis=0, as_index: bool = True):
Animal Max Speed
...Falcon 375.0
...Parrot 25.0

We can also choose to include NA in group keys or not by setting the dropna parameter;
the default setting is True:

>>> l = [[1, 2, 3], [1, None, 4], [2, 1, 3], [1, 2, 2]]
>>> df = ks.DataFrame(l, columns=["a", "b", "c"])
>>> df.groupby(by=["b"]).sum() # doctest: +NORMALIZE_WHITESPACE
a c
b
1.0 2 3
2.0 2 5

>>> df.groupby(by=["b"], dropna=False).sum() # doctest: +NORMALIZE_WHITESPACE
a c
b
NaN 1 4
1.0 2 3
2.0 2 5
"""
from databricks.koalas.groupby import DataFrameGroupBy, SeriesGroupBy

@@ -1560,9 +1582,9 @@ def groupby(self, by, axis=0, as_index: bool = True):
raise NotImplementedError('axis should be either 0 or "index" currently.')

if isinstance(self, ks.DataFrame):
return DataFrameGroupBy._build(self, by, as_index=as_index)
return DataFrameGroupBy._build(self, by, as_index=as_index, dropna=dropna)
elif isinstance(self, ks.Series):
return SeriesGroupBy._build(self, by, as_index=as_index)
return SeriesGroupBy._build(self, by, as_index=as_index, dropna=dropna)
else:
raise TypeError(
"Constructor expects DataFrame or Series; however, " "got [%s]" % (self,)
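To see the new keyword end to end, here is a minimal usage sketch of the API this file now exposes. It assumes a working PySpark environment (koalas starts a default SparkSession on first use) and reuses the data from the doctest above; the column selection at the end exercises the SeriesGroupBy path that also receives dropna in the next file.

# Minimal usage sketch of the new dropna keyword; data mirrors the doctest above.
# Assumes a working PySpark environment (koalas creates a default SparkSession).
import databricks.koalas as ks

kdf = ks.DataFrame(
    [[1, 2, 3], [1, None, 4], [2, 1, 3], [1, 2, 2]], columns=["a", "b", "c"]
)

# Default dropna=True: the row whose key "b" is NA is excluded from the result.
kdf.groupby("b").sum()

# dropna=False: NaN becomes its own group key, matching pandas >= 1.1.0.
kdf.groupby("b", dropna=False).sum()

# Column selection goes through SeriesGroupBy, which also receives dropna.
kdf.groupby("b", dropna=False)["a"].sum()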
35 changes: 29 additions & 6 deletions databricks/koalas/groupby.py
@@ -227,6 +227,14 @@ def aggregate(self, func_or_funcs=None, *args, **kwargs):
func_or_funcs = OrderedDict([(col, func_or_funcs) for col in agg_cols])

kdf = DataFrame(GroupBy._spark_groupby(self._kdf, func_or_funcs, self._groupkeys))

if self._dropna:
kdf = DataFrame(
kdf._internal.with_new_sdf(
kdf._internal.spark_frame.dropna(subset=kdf._internal.index_spark_column_names)
)
)

if not self._as_index:
should_drop_index = set(
i for i, gkey in enumerate(self._groupkeys) if gkey._kdf is not self._kdf
@@ -2276,6 +2284,14 @@ def _reduce_for_stat_function(self, sfun, only_numeric):
column_label_names=self._kdf._internal.column_label_names,
)
kdf = DataFrame(internal)

if self._dropna:
kdf = DataFrame(
kdf._internal.with_new_sdf(
kdf._internal.spark_frame.dropna(subset=kdf._internal.index_spark_column_names)
)
)

if not self._as_index:
should_drop_index = set(
i for i, gkey in enumerate(self._groupkeys) if gkey._kdf is not self._kdf
@@ -2381,7 +2397,7 @@ def _resolve_grouping(kdf: DataFrame, by: List[Union[Series, Tuple[str, ...]]])
class DataFrameGroupBy(GroupBy):
@staticmethod
def _build(
kdf: DataFrame, by: List[Union[Series, Tuple[str, ...]]], as_index: bool
kdf: DataFrame, by: List[Union[Series, Tuple[str, ...]]], as_index: bool, dropna: bool
) -> "DataFrameGroupBy":
if any(isinstance(col_or_s, Series) and not same_anchor(kdf, col_or_s) for col_or_s in by):
(
@@ -2396,6 +2412,7 @@ def _build(
kdf,
new_by_series,
as_index=as_index,
dropna=dropna,
column_labels_to_exlcude=column_labels_to_exlcude,
)

@@ -2404,12 +2421,14 @@ def __init__(
kdf: DataFrame,
by: List[Series],
as_index: bool,
dropna: bool,
column_labels_to_exlcude: Set[Tuple[str, ...]],
agg_columns: List[Tuple[str, ...]] = None,
):
self._kdf = kdf
self._groupkeys = by
self._as_index = as_index
self._dropna = dropna
self._column_labels_to_exlcude = column_labels_to_exlcude

self._agg_columns_selected = agg_columns is not None
@@ -2438,7 +2457,9 @@ def __getattr__(self, item: str) -> Any:
def __getitem__(self, item):
if self._as_index and is_name_like_value(item):
return SeriesGroupBy(
self._kdf._kser_for(item if is_name_like_tuple(item) else (item,)), self._groupkeys
self._kdf._kser_for(item if is_name_like_tuple(item) else (item,)),
self._groupkeys,
dropna=self._dropna,
)
else:
if is_name_like_tuple(item):
@@ -2458,6 +2479,7 @@ def __getitem__(self, item):
self._kdf,
self._groupkeys,
as_index=self._as_index,
dropna=self._dropna,
column_labels_to_exlcude=self._column_labels_to_exlcude,
agg_columns=item,
)
@@ -2568,26 +2590,27 @@ def describe(self):
class SeriesGroupBy(GroupBy):
@staticmethod
def _build(
kser: Series, by: List[Union[Series, Tuple[str, ...]]], as_index: bool
kser: Series, by: List[Union[Series, Tuple[str, ...]]], as_index: bool, dropna: bool
) -> "SeriesGroupBy":
if any(isinstance(col_or_s, Series) and not same_anchor(kser, col_or_s) for col_or_s in by):
kdf, new_by_series, _ = GroupBy._resolve_grouping_from_diff_dataframes(
kser.to_frame(), by
)
return SeriesGroupBy(
first_series(kdf).rename(kser.name), new_by_series, as_index=as_index
first_series(kdf).rename(kser.name), new_by_series, as_index=as_index, dropna=dropna
)
else:
new_by_series = GroupBy._resolve_grouping(kser._kdf, by)
return SeriesGroupBy(kser, new_by_series, as_index=as_index)
return SeriesGroupBy(kser, new_by_series, as_index=as_index, dropna=dropna)

def __init__(self, kser: Series, by: List[Series], as_index: bool = True):
def __init__(self, kser: Series, by: List[Series], as_index: bool = True, dropna: bool = True):
self._kser = kser
self._groupkeys = by

if not as_index:
raise TypeError("as_index=False only valid with DataFrame")
self._as_index = True
self._dropna = dropna
self._agg_columns_selected = True

def __getattr__(self, item: str) -> Any:
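Reading the two hunks above that touch aggregate and _reduce_for_stat_function: Spark's groupBy keeps null keys as their own group, so dropna=True is implemented by post-filtering the aggregated frame on its index (group-key) columns. The following standalone PySpark sketch illustrates that filtering step; the column names and data are illustrative, not the koalas internals.

# Standalone PySpark sketch of the dropna=True step: Spark's groupBy keeps a
# null-key group, so the aggregated frame is post-filtered on its key columns.
# Names and data here are illustrative, not koalas internals.
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()

sdf = spark.createDataFrame(
    [(1.0, 2.0), (1.0, None), (2.0, 1.0), (1.0, 2.0)], ["a", "b"]
)

agg = sdf.groupBy("b").agg(F.sum("a").alias("a"))  # contains a row with b = null

# Equivalent of spark_frame.dropna(subset=index_spark_column_names) above:
# drop aggregated rows whose group-key column is null.
agg_without_null_keys = agg.dropna(subset=["b"])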
161 changes: 161 additions & 0 deletions databricks/koalas/tests/test_groupby.py
@@ -549,6 +549,167 @@ def test_aggregate_relabel(self):
)
self.assert_eq(agg_kdf, agg_pdf)

def test_dropna(self):
pdf = pd.DataFrame(
{"A": [None, 1, None, 1, 2], "B": [1, 2, 3, None, None], "C": [4, 5, 6, 7, None]}
)
kdf = ks.from_pandas(pdf)

# pd.DataFrame.groupby with dropna parameter is implemented since pandas 1.1.0
if LooseVersion(pd.__version__) >= LooseVersion("1.1.0"):
for dropna in [True, False]:
for as_index in [True, False]:
if as_index:
sort = lambda df: df.sort_index()
else:
sort = lambda df: df.sort_values("A").reset_index(drop=True)

self.assert_eq(
sort(kdf.groupby("A", as_index=as_index, dropna=dropna).std()),
sort(pdf.groupby("A", as_index=as_index, dropna=dropna).std()),
)

self.assert_eq(
sort(kdf.groupby("A", as_index=as_index, dropna=dropna).B.std()),
sort(pdf.groupby("A", as_index=as_index, dropna=dropna).B.std()),
)
self.assert_eq(
sort(kdf.groupby("A", as_index=as_index, dropna=dropna)["B"].std()),
sort(pdf.groupby("A", as_index=as_index, dropna=dropna)["B"].std()),
)

self.assert_eq(
sort(
kdf.groupby("A", as_index=as_index, dropna=dropna).agg(
{"B": "min", "C": "std"}
)
),
sort(
pdf.groupby("A", as_index=as_index, dropna=dropna).agg(
{"B": "min", "C": "std"}
)
),
)

for dropna in [True, False]:
for as_index in [True, False]:
if as_index:
sort = lambda df: df.sort_index()
else:
sort = lambda df: df.sort_values(["A", "B"]).reset_index(drop=True)

self.assert_eq(
sort(
kdf.groupby(["A", "B"], as_index=as_index, dropna=dropna).agg(
{"C": ["min", "std"]}
)
),
sort(
pdf.groupby(["A", "B"], as_index=as_index, dropna=dropna).agg(
{"C": ["min", "std"]}
)
),
almost=True,
)

# multi-index columns
columns = pd.MultiIndex.from_tuples([("X", "A"), ("X", "B"), ("Y", "C")])
pdf.columns = columns
kdf.columns = columns

for dropna in [True, False]:
for as_index in [True, False]:
if as_index:
sort = lambda df: df.sort_index()
else:
sort = lambda df: df.sort_values(("X", "A")).reset_index(drop=True)
sorted_stats_kdf = sort(
kdf.groupby(("X", "A"), as_index=as_index, dropna=dropna).agg(
{("X", "B"): "min", ("Y", "C"): "std"}
)
)
sorted_stats_pdf = sort(
pdf.groupby(("X", "A"), as_index=as_index, dropna=dropna).agg(
{("X", "B"): "min", ("Y", "C"): "std"}
)
)
self.assert_eq(sorted_stats_kdf, sorted_stats_pdf)
else:
# Testing dropna=True (pandas default behavior)
for as_index in [True, False]:
if as_index:
sort = lambda df: df.sort_index()
else:
sort = lambda df: df.sort_values("A").reset_index(drop=True)

self.assert_eq(
sort(kdf.groupby("A", as_index=as_index, dropna=True)["B"].min()),
sort(pdf.groupby("A", as_index=as_index)["B"].min()),
)

if as_index:
sort = lambda df: df.sort_index()
else:
sort = lambda df: df.sort_values(["A", "B"]).reset_index(drop=True)

self.assert_eq(
sort(
kdf.groupby(["A", "B"], as_index=as_index, dropna=True).agg(
{"C": ["min", "std"]}
)
),
sort(pdf.groupby(["A", "B"], as_index=as_index).agg({"C": ["min", "std"]})),
almost=True,
)

# Testing dropna=False
index = pd.Index([1.0, 2.0, np.nan], name="A")
expected = ks.Series([2.0, np.nan, 1.0], index=index, name="B")
result = kdf.groupby("A", as_index=True, dropna=False)["B"].min().sort_index()
self.assert_eq(expected, result)

expected = ks.DataFrame({"A": [1.0, 2.0, np.nan], "B": [2.0, np.nan, 1.0]})
result = (
kdf.groupby("A", as_index=False, dropna=False)["B"]
.min()
.sort_values("A")
.reset_index(drop=True)
)
self.assert_eq(expected, result)

index = pd.MultiIndex.from_tuples(
[(1.0, 2.0), (1.0, None), (2.0, None), (None, 1.0), (None, 3.0)], names=["A", "B"]
)
expected = ks.DataFrame(
{
("C", "min"): [5.0, 7.0, np.nan, 4.0, 6.0],
("C", "std"): [np.nan, np.nan, np.nan, np.nan, np.nan],
},
index=index,
)
result = (
kdf.groupby(["A", "B"], as_index=True, dropna=False)
.agg({"C": ["min", "std"]})
.sort_index()
)
self.assert_eq(expected, result)

expected = ks.DataFrame(
{
("A", ""): [1.0, 1.0, 2.0, np.nan, np.nan],
("B", ""): [2.0, np.nan, np.nan, 1.0, 3.0],
("C", "min"): [5.0, 7.0, np.nan, 4.0, 6.0],
("C", "std"): [np.nan, np.nan, np.nan, np.nan, np.nan],
}
)
result = (
kdf.groupby(["A", "B"], as_index=False, dropna=False)
.agg({"C": ["min", "std"]})
.sort_values(["A", "B"])
.reset_index(drop=True)
)
self.assert_eq(expected, result)

def test_describe(self):
# support for numeric type, not support for string type yet
datas = []
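For reference, here is a small sketch of the pandas behavior that test_dropna compares against; as the version guard in the test notes, the dropna keyword on groupby exists since pandas 1.1.0, and the frame below matches the pdf fixture defined at the top of the test.

# pandas >= 1.1.0 reference behavior that test_dropna compares against;
# the frame matches the pdf fixture defined at the top of the test.
import pandas as pd

pdf = pd.DataFrame(
    {"A": [None, 1, None, 1, 2], "B": [1, 2, 3, None, None], "C": [4, 5, 6, 7, None]}
)

pdf.groupby("A", dropna=True)["B"].min()   # rows with NA key excluded (default)
pdf.groupby("A", dropna=False)["B"].min()  # NaN appears as an extra group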