237 changes: 145 additions & 92 deletions databricks/koalas/groupby.py
@@ -33,7 +33,7 @@
from pyspark.sql.functions import PandasUDFType, pandas_udf

from databricks import koalas as ks # For running doctests and reference resolution in PyCharm.
from databricks.koalas.typedef import _infer_return_type
from databricks.koalas.typedef import _infer_return_type, as_spark_type
from databricks.koalas.frame import DataFrame
from databricks.koalas.internal import _InternalFrame
from databricks.koalas.missing.groupby import _MissingPandasLikeDataFrameGroupBy, \
@@ -700,10 +700,18 @@ def apply(self, func):
be much faster than using `apply` for their specific purposes, so try to
use them before reaching for `apply`.

.. note:: unlike pandas, it is required for ``func`` to specify return type hint.
.. note:: this API executes the function once to infer the type, which is
potentially expensive, for instance, when the dataset is created after
aggregations or sorting.

.. note:: the output column names are `c0, c1, c2 ... cn`. These names
are positionally mapped to the returned DataFrame in ``func``. See examples below.
To avoid this, specify return type in ``func``, for instance, as below:

>>> def pandas_div_sum(x) -> ks.DataFrame[float, float]:
... return x[['B', 'C']] / x[['B', 'C']].sum()

If the return type is specified, the output column names become
`c0, c1, c2 ... cn`. These names are positionally mapped to the returned
DataFrame in ``func``. See examples below.
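
For instance, the annotated ``pandas_div_sum`` above yields positionally named
columns. This is only an illustrative sketch (``g`` stands for a grouped frame
like the one built in the examples below, so the line is skipped in doctests):

>>> g.apply(pandas_div_sum).columns  # doctest: +SKIP
Index(['c0', 'c1'], dtype='object')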

.. note:: the dataframe within ``func`` is actually a pandas dataframe. Therefore,
any pandas APIs within this function are allowed.
@@ -753,6 +761,16 @@ def apply(self, func):
1 aa 3 10
2 aa 4 12

You can omit the type hint and let Koalas infer its type.

>>> def plus_min(x):
... return x + x.min()
>>> g.apply(plus_min) # doctest: +NORMALIZE_WHITESPACE
A B C
0 aa 2 8
1 aa 3 10
2 bb 6 10

In case of Series, it works as below.

>>> def plus_max(x) -> ks.Series[np.int]:
@@ -762,18 +780,25 @@ def apply(self, func):
1 3
2 4
Name: B, dtype: int32

>>> def plus_min(x):
... return x + x.min()
>>> df.B.groupby(df.A).apply(plus_min)
0 2
1 3
2 6
Name: B, dtype: int64
"""
if not isinstance(func, Callable):
raise TypeError("%s object is not callable" % type(func))

assert callable(func), "the first argument should be a callable function."
spec = inspect.getfullargspec(func)
return_sig = spec.annotations.get("return", None)
if return_sig is None:
raise ValueError("Given function must have return type hint; however, not found.")

return_schema = _infer_return_type(func).tpe
return self._apply(func, return_schema)
return_schema = None  # schema will be inferred.
else:
return_schema = _infer_return_type(func).tpe
return self._apply(func, return_schema, retain_index=return_schema is None)

# TODO: implement 'dropna' parameter
def filter(self, func):
@@ -817,46 +842,26 @@ def filter(self, func):
groupby_names = [s.name for s in self._groupkeys]

def pandas_filter(pdf):
pdf = pdf.groupby(*groupby_names).filter(func)
return pdf.groupby(groupby_names).filter(func)

# Here, we restore the index column back in Spark DataFrame
# so that Koalas can understand it as an index.
kdf = self._apply(pandas_filter, data_schema, retain_index=True)
return DataFrame(self._kdf._internal.copy(sdf=kdf._sdf))

# TODO: deduplicate this logic with _InternalFrame.from_pandas
columns = pdf.columns
data_columns = [str(col) for col in columns]
def _apply(self, func, return_schema, retain_index):
should_infer_schema = return_schema is None
input_groupnames = [s.name for s in self._groupkeys]

index = pdf.index
if should_infer_schema:
# Here we execute with the first 1000 records to get the return type.
# If there are fewer than 1000 records, the pandas API is used directly as a shortcut.
limit = 1000
Member Author: I think I should have a configuration for such a limit in a separate PR. I'll do it after this and your PR are merged @ueshin

Collaborator: I merged my PR for the basic configuration.
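
A minimal sketch of what reading that limit from a configuration could look like once the
option mechanism lands; the option name ``compute.shortcut_limit`` and ``ks.get_option``
are assumptions here, not something introduced by this change:

>>> from databricks import koalas as ks
>>> ks.get_option("compute.shortcut_limit")  # doctest: +SKIP
1000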

pdf = self._kdf.head(limit + 1).to_pandas()
pdf = pdf.groupby(input_groupnames).apply(func)
kdf = DataFrame(pdf)
return_schema = kdf._sdf.schema
if len(pdf) <= limit:
return kdf

index_map = []
if isinstance(index, pd.MultiIndex):
if index.names is None:
index_map = [('__index_level_{}__'.format(i), None)
for i in range(len(index.levels))]
else:
index_map = [('__index_level_{}__'.format(i) if name is None else name, name)
for i, name in enumerate(index.names)]
else:
index_map = [(index.name
if index.name is not None else '__index_level_0__', index.name)]

index_columns = [index_column for index_column, _ in index_map]

reset_index = pdf.reset_index()
reset_index.columns = index_columns + data_columns
for name, col in reset_index.iteritems():
dt = col.dtype
if is_datetime64_dtype(dt) or is_datetime64tz_dtype(dt):
continue
reset_index[name] = col.replace({np.nan: None})
return reset_index

# DataFrame.apply loses the index. We should restore the original index column information
# below.
no_index_df = self._apply(pandas_filter, data_schema)
return DataFrame(self._kdf._internal.copy(sdf=no_index_df._sdf))

def _apply(self, func, return_schema):
index_columns = self._kdf._internal.index_columns
index_names = self._kdf._internal.index_names
data_columns = self._kdf._internal.data_columns
@@ -881,17 +886,62 @@ def rename_output(pdf):
pdf.index.name = index_names[0]

pdf = func(pdf)
# For now, just positionally map the column names to given schema's.

if retain_index:
Member Author: Mainly only deduplication here.

# If schema should be inferred, we don't restore the index. pandas seems to restore
# the index in some cases.
# When the Spark output type is specified, we don't know, without executing ``func``,
# whether we should restore the index or not. For instance, see the example in
# https://github.com/databricks/koalas/issues/628.
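# Illustrative only (an assumed example, not taken from this change): with the same
# return annotation, ``func`` could be ``lambda x: x + x.min()``, which keeps the
# per-group index, or ``lambda x: x.reset_index(drop=True)``, which does not; both
# produce the same Spark schema, so we cannot tell whether to restore the index.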

# TODO: deduplicate this logic with _InternalFrame.from_pandas
columns = pdf.columns

index = pdf.index

index_map = []
if isinstance(index, pd.MultiIndex):
if index.names is None:
index_map = [('__index_level_{}__'.format(i), None)
for i in range(len(index.levels))]
else:
index_map = [
('__index_level_{}__'.format(i) if name is None else name, name)
for i, name in enumerate(index.names)]
else:
index_map = [(index.name
if index.name is not None else '__index_level_0__', index.name)]

new_index_columns = [index_column for index_column, _ in index_map]
new_data_columns = [str(col) for col in columns]

reset_index = pdf.reset_index()
reset_index.columns = new_index_columns + new_data_columns
for name, col in reset_index.iteritems():
dt = col.dtype
if is_datetime64_dtype(dt) or is_datetime64tz_dtype(dt):
continue
reset_index[name] = col.replace({np.nan: None})
pdf = reset_index

# Just positionally map the column names to the given schema's.
pdf = pdf.rename(columns=dict(zip(pdf.columns, return_schema.fieldNames())))

return pdf

grouped_map_func = pandas_udf(return_schema, PandasUDFType.GROUPED_MAP)(rename_output)

sdf = self._kdf._sdf
input_groupkeys = [s._scol for s in self._groupkeys]
sdf = sdf.groupby(*input_groupkeys).apply(grouped_map_func)
internal = _InternalFrame(
sdf=sdf, data_columns=return_schema.fieldNames(), index_map=[]) # index is lost.

if should_infer_schema:
# If schema is inferred, we can restore indexes too.
internal = kdf._internal.copy(sdf=sdf)
else:
# Otherwise, the index is lost.
internal = _InternalFrame(
sdf=sdf, data_columns=return_schema.fieldNames(), index_map=[])
return DataFrame(internal)

def rank(self, method='average', ascending=True):
Expand Down Expand Up @@ -973,11 +1023,19 @@ def transform(self, func):
be much faster than using `transform` for their specific purposes, so try to
use them before reaching for `transform`.

.. note:: unlike pandas, it is required for ``func`` to specify return type hint.
.. note:: this API executes the function once to infer the type, which is
potentially expensive, for instance, when the dataset is created after
aggregations or sorting.

To avoid this, specify return type in ``func``, for instance, as below:

>>> def convert_to_string(x) -> ks.Series[str]:
... return x.apply("a string {}".format)

.. note:: the series within ``func`` is actually a pandas series. Therefore,
any pandas APIs within this function are allowed.


Parameters
----------
func : callable
@@ -1024,68 +1082,63 @@ def transform(self, func):
1 4 12
2 6 10

You can omit the type hint and let Koalas infer its type.

>>> def plus_min(x):
... return x + x.min()
>>> g.transform(plus_min) # doctest: +NORMALIZE_WHITESPACE
B C
0 2 8
1 3 10
2 6 10

In case of Series, it works as below.

>>> df.B.groupby(df.A).transform(plus_max)
0 3
1 4
2 6
Name: B, dtype: int32

>>> df.B.groupby(df.A).transform(plus_min)
0 2
1 3
2 6
Name: B, dtype: int64
"""
# TODO: the code here is similar to GroupBy.apply. Needs to be deduplicated.
if not isinstance(func, Callable):
raise TypeError("%s object is not callable" % type(func))

assert callable(func), "the first argument should be a callable function."
spec = inspect.getfullargspec(func)
return_sig = spec.annotations.get("return", None)
if return_sig is None:
raise ValueError("Given function must have return type hint; however, not found.")

return_type = _infer_return_type(func).tpe
input_groupnames = [s.name for s in self._groupkeys]
data_columns = self._kdf._internal.data_columns
return_schema = StructType([
StructField(c, return_type) for c in data_columns if c not in input_groupnames])

index_columns = self._kdf._internal.index_columns
index_names = self._kdf._internal.index_names
data_columns = self._kdf._internal.data_columns

def rename_output(pdf):
# TODO: This logic below was borrowed from `DataFrame.pandas_df` to set the index
# within each pdf properly. we might have to deduplicate it.
import pandas as pd

if len(index_columns) > 0:
append = False
for index_field in index_columns:
drop = index_field not in data_columns
pdf = pdf.set_index(index_field, drop=drop, append=append)
append = True
pdf = pdf[data_columns]

if len(index_names) > 0:
if isinstance(pdf.index, pd.MultiIndex):
pdf.index.names = index_names
else:
pdf.index.name = index_names[0]

def pandas_transform(pdf):
# pandas GroupBy.transform drops grouping columns.
pdf = pdf.drop(columns=input_groupnames)
pdf = pdf.transform(func)
# Remaps to the original name, positionally.
pdf = pdf.rename(columns=dict(zip(pdf.columns, return_schema.fieldNames())))
return pdf
return pdf.transform(func)

grouped_map_func = pandas_udf(return_schema, PandasUDFType.GROUPED_MAP)(rename_output)
if return_sig is None:
# Here we execute with the first 1000 records to get the return type.
# If there are fewer than 1000 records, the pandas API is used directly as a shortcut.
limit = 1000
pdf = self._kdf.head(limit + 1).to_pandas()
pdf = pdf.groupby(input_groupnames).transform(func)
kdf = DataFrame(pdf)
return_schema = kdf._sdf.schema
if len(pdf) <= limit:
return pdf

applied_kdf = self._apply(pandas_transform, return_schema, retain_index=True)
# kdf inferred from pdf holds a correct index.
return DataFrame(kdf._internal.copy(sdf=applied_kdf._sdf))
else:
return_type = _infer_return_type(func).tpe
data_columns = self._kdf._internal.data_columns
return_schema = StructType([
StructField(c, return_type) for c in data_columns if c not in input_groupnames])

sdf = self._kdf._sdf
input_groupkeys = [s._scol for s in self._groupkeys]
sdf = sdf.groupby(*input_groupkeys).apply(grouped_map_func)
internal = _InternalFrame(
sdf=sdf, data_columns=return_schema.fieldNames(), index_map=[]) # index is lost.
return DataFrame(internal)
return self._apply(pandas_transform, return_schema, retain_index=False)

# TODO: add bins, normalize parameter
def value_counts(self, sort=None, ascending=None, dropna=True):