Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions bigframes/core/groupby/dataframe_group_by.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,23 +461,19 @@ def expanding(self, min_periods: int = 1) -> windows.Window:

def agg(self, func=None, **kwargs) -> typing.Union[df.DataFrame, series.Series]:
if func:
if isinstance(func, str):
return self.size() if func == "size" else self._agg_string(func)
elif utils.is_dict_like(func):
if utils.is_dict_like(func):
return self._agg_dict(func)
elif utils.is_list_like(func):
return self._agg_list(func)
else:
raise NotImplementedError(
f"Aggregate with {func} not supported. {constants.FEEDBACK_LINK}"
)
return self.size() if func == "size" else self._agg_func(func)
else:
return self._agg_named(**kwargs)

def _agg_string(self, func: str) -> df.DataFrame:
def _agg_func(self, func) -> df.DataFrame:
ids, labels = self._aggregated_columns()
aggregations = [
aggs.agg(col_id, agg_ops.lookup_agg_func(func)) for col_id in ids
aggs.agg(col_id, agg_ops.lookup_agg_func(func)[0]) for col_id in ids
]
agg_block, _ = self._block.aggregate(
by_column_ids=self._by_col_ids,
Expand All @@ -500,7 +496,7 @@ def _agg_dict(self, func: typing.Mapping) -> df.DataFrame:
funcs_for_id if utils.is_list_like(funcs_for_id) else [funcs_for_id]
)
for f in func_list:
aggregations.append(aggs.agg(col_id, agg_ops.lookup_agg_func(f)))
aggregations.append(aggs.agg(col_id, agg_ops.lookup_agg_func(f)[0]))
column_labels.append(label)
agg_block, _ = self._block.aggregate(
by_column_ids=self._by_col_ids,
Expand All @@ -525,19 +521,23 @@ def _agg_dict(self, func: typing.Mapping) -> df.DataFrame:
def _agg_list(self, func: typing.Sequence) -> df.DataFrame:
ids, labels = self._aggregated_columns()
aggregations = [
aggs.agg(col_id, agg_ops.lookup_agg_func(f)) for col_id in ids for f in func
aggs.agg(col_id, agg_ops.lookup_agg_func(f)[0])
for col_id in ids
for f in func
]

if self._block.column_labels.nlevels > 1:
# Restructure MultiIndex for proper format: (idx1, idx2, func)
# rather than ((idx1, idx2), func).
column_labels = [
tuple(label) + (f,)
tuple(label) + (agg_ops.lookup_agg_func(f)[1],)
for label in labels.to_frame(index=False).to_numpy()
for f in func
]
else: # Single-level index
column_labels = [(label, f) for label in labels for f in func]
column_labels = [
(label, agg_ops.lookup_agg_func(f)[1]) for label in labels for f in func
]

agg_block, _ = self._block.aggregate(
by_column_ids=self._by_col_ids,
Expand All @@ -563,7 +563,7 @@ def _agg_named(self, **kwargs) -> df.DataFrame:
if not isinstance(v, tuple) or (len(v) != 2):
raise TypeError("kwargs values must be 2-tuples of column, aggfunc")
col_id = self._resolve_label(v[0])
aggregations.append(aggs.agg(col_id, agg_ops.lookup_agg_func(v[1])))
aggregations.append(aggs.agg(col_id, agg_ops.lookup_agg_func(v[1])[0]))
column_labels.append(k)
agg_block, _ = self._block.aggregate(
by_column_ids=self._by_col_ids,
Expand Down
17 changes: 8 additions & 9 deletions bigframes/core/groupby/series_group_by.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,18 +216,17 @@ def prod(self, *args) -> series.Series:

def agg(self, func=None) -> typing.Union[df.DataFrame, series.Series]:
column_names: list[str] = []
if isinstance(func, str):
aggregations = [aggs.agg(self._value_column, agg_ops.lookup_agg_func(func))]
column_names = [func]
elif utils.is_list_like(func):
aggregations = [
aggs.agg(self._value_column, agg_ops.lookup_agg_func(f)) for f in func
]
column_names = list(func)
else:
if utils.is_dict_like(func):
raise NotImplementedError(
f"Aggregate with {func} not supported. {constants.FEEDBACK_LINK}"
)
if not utils.is_list_like(func):
func = [func]

aggregations = [
aggs.agg(self._value_column, agg_ops.lookup_agg_func(f)[0]) for f in func
]
column_names = [agg_ops.lookup_agg_func(f)[1] for f in func]

agg_block, _ = self._block.aggregate(
by_column_ids=self._by_col_ids,
Expand Down
25 changes: 10 additions & 15 deletions bigframes/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -3170,12 +3170,7 @@ def nunique(self) -> bigframes.series.Series:
block = self._block.aggregate_all_and_stack(agg_ops.nunique_op)
return bigframes.series.Series(block)

def agg(
self,
func: str
| typing.Sequence[str]
| typing.Mapping[blocks.Label, typing.Sequence[str] | str],
) -> DataFrame | bigframes.series.Series:
def agg(self, func) -> DataFrame | bigframes.series.Series:
if utils.is_dict_like(func):
# Must check dict-like first because dictionaries are list-like
# according to Pandas.
Expand All @@ -3189,15 +3184,17 @@ def agg(
if col_id is None:
raise KeyError(f"Column {col_label} does not exist")
for agg_func in agg_func_list:
agg_op = agg_ops.lookup_agg_func(typing.cast(str, agg_func))
op_and_label = agg_ops.lookup_agg_func(agg_func)
agg_expr = (
agg_expressions.UnaryAggregation(agg_op, ex.deref(col_id))
if isinstance(agg_op, agg_ops.UnaryAggregateOp)
else agg_expressions.NullaryAggregation(agg_op)
agg_expressions.UnaryAggregation(
op_and_label[0], ex.deref(col_id)
)
if isinstance(op_and_label[0], agg_ops.UnaryAggregateOp)
else agg_expressions.NullaryAggregation(op_and_label[0])
)
aggs.append(agg_expr)
labels.append(col_label)
funcnames.append(agg_func)
funcnames.append(op_and_label[1])

# if any list in dict values, format output differently
if any(utils.is_list_like(v) for v in func.values()):
Expand All @@ -3218,7 +3215,7 @@ def agg(
)
)
elif utils.is_list_like(func):
aggregations = [agg_ops.lookup_agg_func(f) for f in func]
aggregations = [agg_ops.lookup_agg_func(f)[0] for f in func]

for dtype, agg in itertools.product(self.dtypes, aggregations):
agg.output_type(
Expand All @@ -3234,9 +3231,7 @@ def agg(

else: # function name string
return bigframes.series.Series(
self._block.aggregate_all_and_stack(
agg_ops.lookup_agg_func(typing.cast(str, func))
)
self._block.aggregate_all_and_stack(agg_ops.lookup_agg_func(func)[0])
)

aggregate = agg
Expand Down
42 changes: 29 additions & 13 deletions bigframes/operations/aggregations.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
import abc
import dataclasses
import typing
from typing import ClassVar, Iterable, Optional, TYPE_CHECKING
from typing import Callable, ClassVar, Iterable, Optional, TYPE_CHECKING

import numpy as np
import pandas as pd
import pyarrow as pa

Expand Down Expand Up @@ -678,7 +679,7 @@ def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionT


# TODO: Alternative names and lookup from numpy function objects
_AGGREGATIONS_LOOKUP: typing.Dict[
_STRING_TO_AGG_OP: typing.Dict[
str, typing.Union[UnaryAggregateOp, NullaryAggregateOp]
] = {
op.name: op
Expand All @@ -705,17 +706,32 @@ def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionT
]
}

_CALLABLE_TO_AGG_OP: typing.Dict[
Callable, typing.Union[UnaryAggregateOp, NullaryAggregateOp]
] = {
np.sum: sum_op,
np.mean: mean_op,
np.median: median_op,
np.prod: product_op,
np.max: max_op,
np.min: min_op,
np.std: std_op,
np.var: var_op,
np.all: all_op,
np.any: any_op,
np.unique: nunique_op,
# TODO(b/443252872): Solve
# list: ArrayAggOp(),
np.size: size_op,
}

def lookup_agg_func(key: str) -> typing.Union[UnaryAggregateOp, NullaryAggregateOp]:
if callable(key):
raise NotImplementedError(
"Aggregating with callable object not supported, pass method name as string instead (eg. 'sum' instead of np.sum)."
)
if not isinstance(key, str):
raise ValueError(
f"Cannot aggregate using object of type: {type(key)}. Use string method name (eg. 'sum')"
)
if key in _AGGREGATIONS_LOOKUP:
return _AGGREGATIONS_LOOKUP[key]

def lookup_agg_func(
key,
) -> tuple[typing.Union[UnaryAggregateOp, NullaryAggregateOp], str]:
if key in _STRING_TO_AGG_OP:
return (_STRING_TO_AGG_OP[key], key)
if key in _CALLABLE_TO_AGG_OP:
return (_CALLABLE_TO_AGG_OP[key], key.__name__)
else:
raise ValueError(f"Unrecognize aggregate function: {key}")
6 changes: 2 additions & 4 deletions bigframes/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -1330,17 +1330,15 @@ def agg(self, func: str | typing.Sequence[str]) -> scalars.Scalar | Series:
raise NotImplementedError(
f"Multiple aggregations only supported on numeric series. {constants.FEEDBACK_LINK}"
)
aggregations = [agg_ops.lookup_agg_func(f) for f in func]
aggregations = [agg_ops.lookup_agg_func(f)[0] for f in func]
return Series(
self._block.summarize(
[self._value_column],
aggregations,
)
)
else:
return self._apply_aggregation(
agg_ops.lookup_agg_func(typing.cast(str, func))
)
return self._apply_aggregation(agg_ops.lookup_agg_func(func)[0])

aggregate = agg
aggregate.__doc__ = inspect.getdoc(vendored_pandas_series.Series.agg)
Expand Down
17 changes: 16 additions & 1 deletion tests/system/small/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -6011,7 +6011,7 @@ def test_astype_invalid_type_fail(scalars_dfs):
bf_df.astype(123)


def test_agg_with_dict_lists(scalars_dfs):
def test_agg_with_dict_lists_strings(scalars_dfs):
bf_df, pd_df = scalars_dfs
agg_funcs = {
"int64_too": ["min", "max"],
Expand All @@ -6026,6 +6026,21 @@ def test_agg_with_dict_lists(scalars_dfs):
)


def test_agg_with_dict_lists_callables(scalars_dfs):
bf_df, pd_df = scalars_dfs
agg_funcs = {
"int64_too": [np.min, np.max],
"int64_col": [np.min, np.var],
}

bf_result = bf_df.agg(agg_funcs).to_pandas()
pd_result = pd_df.agg(agg_funcs)

pd.testing.assert_frame_equal(
bf_result, pd_result, check_dtype=False, check_index_type=False
)


def test_agg_with_dict_list_and_str(scalars_dfs):
bf_df, pd_df = scalars_dfs
agg_funcs = {
Expand Down
Loading