27 changes: 27 additions & 0 deletions databricks/koalas/groupby.py
@@ -22,6 +22,7 @@
 import inspect
 from collections import Callable, OrderedDict, namedtuple
 from functools import partial
+from itertools import product
 from typing import Any, List, Tuple, Union

 import numpy as np
@@ -210,6 +211,12 @@ def _spark_groupby(kdf, func, groupkeys):
             if aggfunc == "nunique":
                 reordered.append(
                     F.expr('count(DISTINCT `{0}`) as `{1}`'.format(name, data_col)))
+
+            # Implement "quartiles" aggregate function for ``describe``.
+            elif aggfunc == "quartiles":
+                reordered.append(
+                    F.expr('percentile_approx(`{0}`, array(0.25, 0.5, 0.75)) as `{1}`'.format(name, data_col)))
+
             else:
                 reordered.append(F.expr('{1}(`{0}`) as `{2}`'.format(name, aggfunc, data_col)))
     sdf = sdf.groupby(*groupkey_cols).agg(*reordered)
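For reference, a minimal sketch of what the percentile_approx expression above produces, assuming a local SparkSession and toy data: passing an array of percentages yields one array column holding the approximate 25th, 50th, and 75th percentiles per group.

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    spark = SparkSession.builder.getOrCreate()
    sdf = spark.createDataFrame([(1, 4.0), (1, 5.0), (3, 6.0)], ["a", "b"])

    # Each group gets a single array of approximate quartiles,
    # e.g. [4.0, 4.0, 5.0] for the group a=1.
    sdf.groupby("a").agg(
        F.expr("percentile_approx(`b`, array(0.25, 0.5, 0.75)) as quartiles")).show()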
@@ -224,6 +231,26 @@ def _spark_groupby(kdf, func, groupkeys):
         column_scols=[scol_for(sdf, col) for col in data_columns],
         index_map=index_map)

+    def describe(self):
+        kdf = self.agg(["count", "mean", "std", "min", "quartiles", "max"]).reset_index()
+
+        # Split "quartiles" columns into first, second, and third quartiles.
+        for label, content in kdf.iteritems():
+            if label[1] == "quartiles":
+                exploded = ks.DataFrame(content.tolist())
itholic (Contributor) commented on Jan 5, 2020:
This line seems a little dangerous, since it can potentially raise a memory issue such as an OOM (tolist() loads all the data into the single driver's memory).

So I think we could use content.to_frame(), or we should find another way.

Or we could simply not support quartiles for now, given the memory issue above, and add a proper notice to the docs.
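(For illustration, a minimal sketch of the concern on toy data; the column name is hypothetical. tolist() pulls every row to the driver, while to_frame() keeps the data distributed.)

    import databricks.koalas as ks

    kdf = ks.DataFrame({"q": [4.0, 5.0, 6.0]})  # toy column

    rows = kdf["q"].tolist()     # collects the whole column into the driver's memory (OOM risk)
    frame = kdf["q"].to_frame()  # stays a distributed, one-column Koalas DataFrame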

ueshin (Collaborator) commented:

Yeah, we can just get items from the "quartiles" column.
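(For illustration, a sketch of that suggestion, assuming sdf is the internal Spark DataFrame and holds a ('b', 'quartiles') array column: each percentile is pulled out of the array with Column.getItem.)

    from pyspark.sql import functions as F

    # Split the quartiles array into one scalar column per percentile.
    for i, pct in enumerate(["25%", "50%", "75%"]):
        sdf = sdf.withColumn("('b', '{0}')".format(pct),
                             F.col("('b', 'quartiles')").getItem(i))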

deepyaman (Contributor, Author) commented:
@itholic @ueshin Hey! Sorry it's taking me a while to make the requested changes. I figured out a way to refactor this using _column_op; will it work? I'm still using to_numpy, though, only because Koalas doesn't allow constructing a DataFrame from a column-Series mapping yet.

itholic (Contributor) commented on Jan 13, 2020:

@deepyaman

Hi Deepyaman! Thanks for your continued efforts here.

Basically, to handle a Koalas DataFrame efficiently, we usually use the internal Spark DataFrame (sdf for short, available via kdf._sdf or kdf._internal.sdf) rather than the Koalas API directly.

I made another implementation using sdf for you below. (I can't say this is perfect, well-tested code, since it's very roughly implemented, but maybe it will help you understand Koalas' internal processing, even just a bit.)

    def describe(self):
        kdf = self.agg(["count", "mean", "std", "min", "max", "quartiles"]).reset_index()
        formatted_percentiles = ["25%", "50%", "75%"]
        sdf = kdf._sdf
        group_key_names = [groupkey.name for groupkey in self._groupkeys]

        quartiles_columns = []
        for data_column in self._kdf._internal.data_columns:
            if data_column not in group_key_names:
                quartiles_columns.append((data_column, 'quartiles'))
        # `quartiles_columns` here looks like the following:
        # [('b', 'quartiles'), ('c', 'quartiles')]

        # add columns (b, 25%), (b, 50%) ... (c, 50%), (c, 75%) to `sdf`
        for col_name, quartiles_column in quartiles_columns:
            for i, percentile in enumerate(formatted_percentiles):
                sdf = sdf.withColumn(
                    name_like_string((col_name, percentile)),
                    F.col(name_like_string((col_name, quartiles_column))).getItem(i))
        # so, `sdf` now looks like the following:
        # +-----------------+-----+ ... +-----------------+--------+--------+--------+--------+--------+--------+
        # |__index_level_0__|(a, )| ... |__natural_order__|(b, 25%)|(b, 50%)|(b, 75%)|(c, 25%)|(c, 50%)|(c, 75%)|
        # +-----------------+-----+ ... +-----------------+--------+--------+--------+--------+--------+--------+
        # |                0|    1| ... |     592705486848|       4|       4|       5|       7|       7|       8|
        # |                1|    3| ... |     919123001344|       6|       6|       6|       9|       9|       9|
        # +-----------------+-----+ ... +-----------------+--------+--------+--------+--------+--------+--------+

        # build the list of columns we want to select from `sdf`
        columns = []
        for col_name, _ in quartiles_columns:
            for func in ["count", "mean", "std", "min", "25%", "50%", "75%", "max"]:
                columns.append((col_name, func))

        name_like_string_columns = [name_like_string(col) for col in columns]
        internal = _InternalFrame(
            sdf=sdf.select(*self._kdf._internal.index_columns, *name_like_string_columns),
            index_map=self._kdf._internal.index_map)

        idx = pd.MultiIndex.from_tuples(columns)
        # `idx` here looks like the following:
        # MultiIndex([('b', 'count'),
        #             ('b',  'mean'),
        #             ('b',   'std'),
        #             ('b',   'min'),
        #             ('b',   '25%'),
        #             ('b',   '50%'),
        #             ('b',   '75%'),
        #             ('b',   'max'),
        #             ('c', 'count'),
        #             ('c',  'mean'),
        #             ('c',   'std'),
        #             ('c',   'min'),
        #             ('c',   '25%'),
        #             ('c',   '50%'),
        #             ('c',   '75%'),
        #             ('c',   'max')],
        #            )

        result = DataFrame(internal)
        result.columns = idx

        return result.astype("float64")

And the current implementation seems to invoke Spark jobs many times, as shown below:

>>> kdf.groupby('a').describe()
2020-01-13 17:08:31 WARN  WindowExec:66 - No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
2020-01-13 17:08:31 WARN  WindowExec:66 - No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
2020-01-13 17:08:32 WARN  WindowExec:66 - No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
2020-01-13 17:08:32 WARN  WindowExec:66 - No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
2020-01-13 17:08:32 WARN  WindowExec:66 - No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
2020-01-13 17:08:33 WARN  WindowExec:66 - No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
2020-01-13 17:08:33 WARN  WindowExec:66 - No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
      b                                             c
  count mean       std  min  25%  50%  75%  max count mean       std  min  25%  50%  75%  max
a
1   2.0  4.5  0.707107  4.0  4.0  4.0  5.0  5.0   2.0  7.5  0.707107  7.0  7.0  7.0  8.0  8.0
3   1.0  6.0       NaN  6.0  6.0  6.0  6.0  6.0   1.0  9.0       NaN  9.0  9.0  9.0  9.0  9.0

We can reduce them by handling the internal frame properly, as shown below:

>>> kdf.groupby('a').describe()
2020-01-13 17:33:00 WARN  WindowExec:66 - No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
      b                                             c
  count mean       std  min  25%  50%  75%  max count mean       std  min  25%  50%  75%  max
0   2.0  4.5  0.707107  4.0  4.0  4.0  5.0  5.0   2.0  7.5  0.707107  7.0  7.0  7.0  8.0  8.0
1   1.0  6.0       NaN  6.0  6.0  6.0  6.0  6.0   1.0  9.0       NaN  9.0  9.0  9.0  9.0  9.0

deepyaman (Contributor, Author) commented:

@itholic Thank you for the feedback. I'll try to rewrite it following your suggestions above and get back to you.

itholic (Contributor) commented:

@deepyaman My pleasure :) Hope it helps you!

+                exploded.columns = [(label[0], "25%"), (label[0], "50%"), (label[0], "75%")]
+                kdf = kdf.drop(label).join(exploded)
+
+        # Reindex the DataFrame to reflect initial grouping and agg columns.
+        input_groupnames = [s.name for s in self._groupkeys]
+        kdf.set_index([(key, "") for key in input_groupnames], inplace=True)
+        kdf.index.names = input_groupnames
+
+        # Reorder columns lexicographically by agg column followed by stats.
+        agg_cols = (col.name for col in self._agg_columns)
+        stats = ["count", "mean", "std", "min", "25%", "50%", "75%", "max"]
+        return kdf[list(product(agg_cols, stats))]
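For reference, itertools.product pairs each aggregation column with every stat, in order, which is what drives the final column layout above; a quick illustration:

    from itertools import product

    list(product(["b", "c"], ["count", "mean"]))
    # [('b', 'count'), ('b', 'mean'), ('c', 'count'), ('c', 'mean')]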

     def count(self):
         """
         Compute count of group, excluding missing values.
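For reference, a minimal usage sketch of the new method on toy data mirroring the sample output above (column names assumed):

    import databricks.koalas as ks

    kdf = ks.DataFrame({"a": [1, 1, 3], "b": [4, 5, 6], "c": [7, 8, 9]})

    # One row per group key; count/mean/std/min/25%/50%/75%/max per aggregated column.
    kdf.groupby("a").describe()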
1 change: 0 additions & 1 deletion databricks/koalas/missing/groupby.py
@@ -50,7 +50,6 @@ class _MissingPandasLikeDataFrameGroupBy(object):
     # Functions
     boxplot = unsupported_function('boxplot')
     cumcount = unsupported_function('cumcount')
-    describe = unsupported_function('describe')
     get_group = unsupported_function('get_group')
     median = unsupported_function('median')
     ngroup = unsupported_function('ngroup')