6 changes: 6 additions & 0 deletions bigframes/core/blocks.py
@@ -1371,10 +1371,16 @@ def aggregate(
) -> typing.Tuple[Block, typing.Sequence[str]]:
"""
Apply aggregations to the block.

Arguments:
by_column_id: column id of the aggregation key; it is preserved through the transform and used as the index.
aggregations: sequence of (input_column_id, operation) tuples
dropna: whether null keys should be dropped

Returns:
Tuple[Block, Sequence[str]]:
The first element is the grouped block. The second is the
sequence of column IDs, one per applied aggregation.
"""
if column_labels is None:
column_labels = pd.Index(range(len(aggregations)))
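For context, a minimal sketch of how the documented return value is consumed; `block`, `by_col_ids`, and `aggregations` stand in for values built by the caller and are not identifiers taken from this diff:

# Hypothetical call, mirroring the docstring above and the call sites later in this PR.
grouped_block, agg_col_ids = block.aggregate(by_col_ids, aggregations, dropna=True)
# agg_col_ids holds one output column id per entry in `aggregations`,
# so the caller can zip the two sequences together.
assert len(agg_col_ids) == len(aggregations)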
62 changes: 61 additions & 1 deletion bigframes/core/groupby/dataframe_group_by.py
@@ -15,8 +15,9 @@
from __future__ import annotations

import datetime
import functools
import typing
from typing import Literal, Optional, Sequence, Tuple, Union
from typing import Iterable, Literal, Optional, Sequence, Tuple, Union

import bigframes_vendored.constants as constants
import bigframes_vendored.pandas.core.groupby as vendored_pandas_groupby
@@ -38,6 +39,8 @@
import bigframes.core.window_spec as window_specs
import bigframes.dataframe as df
import bigframes.dtypes as dtypes
import bigframes.enums
import bigframes.operations as ops
import bigframes.operations.aggregations as agg_ops
import bigframes.series as series

@@ -54,6 +57,7 @@ def __init__(
selected_cols: typing.Optional[typing.Sequence[str]] = None,
dropna: bool = True,
as_index: bool = True,
by_key_is_singular: bool = False,
):
# TODO(tbergeron): Support more group-by expression types
self._block = block
@@ -64,6 +68,9 @@
)
}
self._by_col_ids = by_col_ids
self._by_key_is_singular = by_key_is_singular
if by_key_is_singular:
assert len(by_col_ids) == 1, "singular key should be exactly one group key"

self._dropna = dropna
self._as_index = as_index
@@ -149,6 +156,59 @@ def head(self, n: int = 5) -> df.DataFrame:
)
)

def __iter__(self) -> Iterable[Tuple[blocks.Label, df.DataFrame]]:
Review comment (Collaborator Author): This is very similar to series group by. I'll see if I can refactor.
Review comment (Collaborator Author): Refactor complete. :-)

original_index_columns = self._block._index_columns
original_index_labels = self._block._index_labels
by_col_ids = self._by_col_ids
block = self._block.reset_index(
level=None,
# Keep the original index columns so they can be recovered.
drop=False,
allow_duplicates=True,
replacement=bigframes.enums.DefaultIndexKind.NULL,
).set_index(
by_col_ids,
# Keep by_col_ids in-place so the ordering doesn't change.
drop=False,
append=False,
)
block.cached(
force=True,
# All DataFrames will be filtered by by_col_ids, so
# force block.cached() to cluster by the new index by explicitly
# setting `session_aware=False`. This will ensure that the filters
# are more efficient.
session_aware=False,
)
keys_block, _ = block.aggregate(by_col_ids, dropna=self._dropna)
for chunk in keys_block.to_pandas_batches():
for by_keys in pd.MultiIndex.from_frame(chunk.index.to_frame()):
filtered_df = df.DataFrame(
# To ensure the cache is used, filter first, then reset the
# index before yielding the DataFrame.
block.filter(
functools.reduce(
ops.and_op.as_expr,
(
ops.eq_op.as_expr(by_col, ex.const(by_key))
for by_col, by_key in zip(by_col_ids, by_keys)
),
),
).set_index(
original_index_columns,
# We retained by_col_ids in the set_index call above,
# so it's safe to drop the duplicates now.
drop=True,
append=False,
index_labels=original_index_labels,
)
)

if self._by_key_is_singular:
yield by_keys[0], filtered_df
else:
yield by_keys, filtered_df

def size(self) -> typing.Union[df.DataFrame, series.Series]:
agg_block, _ = self._block.aggregate_size(
by_column_ids=self._by_col_ids,
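At the user level, the new `DataFrameGroupBy.__iter__` enables the pandas-style loop below. This is an illustrative sketch only: the data and column names are made up, and a configured BigQuery session is assumed.

import bigframes.pandas as bpd

bdf = bpd.DataFrame({"team": ["a", "a", "b"], "score": [1, 2, 3]})
for key, group in bdf.groupby("team"):
    # With a single (non-list) key, by_key_is_singular is True, so `key`
    # is a scalar label; with groupby(["team"]) it would be a 1-tuple.
    print(key, group["score"].sum())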
57 changes: 56 additions & 1 deletion bigframes/core/groupby/series_group_by.py
@@ -15,8 +15,9 @@
from __future__ import annotations

import datetime
import functools
import typing
from typing import Literal, Sequence, Union
from typing import Iterable, Literal, Sequence, Tuple, Union

import bigframes_vendored.constants as constants
import bigframes_vendored.pandas.core.groupby as vendored_pandas_groupby
@@ -37,6 +38,8 @@
import bigframes.core.window_spec as window_specs
import bigframes.dataframe as df
import bigframes.dtypes
import bigframes.enums
import bigframes.operations as ops
import bigframes.operations.aggregations as agg_ops
import bigframes.series as series

@@ -75,6 +78,58 @@ def head(self, n: int = 5) -> series.Series:
)
)

def __iter__(self) -> Iterable[Tuple[blocks.Label, series.Series]]:
original_index_columns = self._block._index_columns
original_index_labels = self._block._index_labels
by_col_ids = self._by_col_ids
block = self._block.reset_index(
level=None,
# Keep the original index columns so they can be recovered.
drop=False,
allow_duplicates=True,
replacement=bigframes.enums.DefaultIndexKind.NULL,
).set_index(
by_col_ids,
# Keep by_col_ids in-place so the ordering doesn't change.
drop=False,
append=False,
)
block.cached(
force=True,
# All DataFrames will be filtered by by_col_ids, so
# force block.cached() to cluster by the new index by explicitly
# setting `session_aware=False`. This will ensure that the filters
# are more efficient.
session_aware=False,
)
keys_block, _ = block.aggregate(by_col_ids, dropna=self._dropna)
for chunk in keys_block.to_pandas_batches():
for by_keys in chunk.index:
filtered_series = series.Series(
# To ensure the cache is used, filter first, then reset the
# index before yielding the Series.
block.filter(
functools.reduce(
ops.and_op.as_expr,
(
ops.eq_op.as_expr(by_col, ex.const(by_key))
for by_col, by_key in zip(by_col_ids, by_keys)
),
),
)
.set_index(
original_index_columns,
# We retained by_col_ids in the set_index call above,
# so it's safe to drop the duplicates now.
drop=True,
append=False,
index_labels=original_index_labels,
)
.select_column(self._value_column),
)
filtered_series.name = self._value_name
yield by_keys, filtered_series

def all(self) -> series.Series:
return self._aggregate(agg_ops.all_op)

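Similarly, a sketch of the new `SeriesGroupBy.__iter__`; the data is again made up and a configured BigQuery session is assumed.

import bigframes.pandas as bpd

bdf = bpd.DataFrame({"team": ["a", "a", "b"], "score": [1, 2, 3]})
for key, sub_series in bdf.groupby("team")["score"]:
    # Each yielded value is a Series restricted to one group, with the
    # original index restored and the value column's name preserved.
    print(key, sub_series.mean())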
11 changes: 11 additions & 0 deletions bigframes/dataframe.py
@@ -3909,11 +3909,17 @@ def _groupby_level(
as_index: bool = True,
dropna: bool = True,
):
if utils.is_list_like(level):
by_key_is_singular = False
else:
by_key_is_singular = True

return groupby.DataFrameGroupBy(
self._block,
by_col_ids=self._resolve_levels(level),
as_index=as_index,
dropna=dropna,
by_key_is_singular=by_key_is_singular,
)

def _groupby_series(
@@ -3926,10 +3932,14 @@ def _groupby_series(
as_index: bool = True,
dropna: bool = True,
):
# Pandas makes a distinction between groupby with a list of keys
# versus groupby with a single item in some methods, like __iter__.
if not isinstance(by, bigframes.series.Series) and utils.is_list_like(by):
by = list(by)
by_key_is_singular = False
else:
by = [typing.cast(typing.Union[blocks.Label, bigframes.series.Series], by)]
by_key_is_singular = True

block = self._block
col_ids: typing.Sequence[str] = []
@@ -3959,6 +3969,7 @@
by_col_ids=col_ids,
as_index=as_index,
dropna=dropna,
by_key_is_singular=by_key_is_singular,
)

def abs(self) -> DataFrame:
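The comment in `_groupby_series` above refers to a pandas behavior that `by_key_is_singular` mirrors; a plain-pandas illustration (made-up data, behavior as of pandas 2.x):

import pandas as pd

pdf = pd.DataFrame({"a": [1, 1, 2], "b": [10, 20, 30]})
print([key for key, _ in pdf.groupby("a")])    # scalar keys: [1, 2]
print([key for key, _ in pdf.groupby(["a"])])  # length-1 tuples: [(1,), (2,)]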