# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import functools
from typing import Sequence

import pandas as pd

from bigframes.core import blocks
from bigframes.core import expression as ex
import bigframes.enums
import bigframes.operations as ops


def block_groupby_iter(
    block: blocks.Block,
    *,
    by_col_ids: Sequence[str],
    by_key_is_singular: bool,
    dropna: bool,
):
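    """Yield one ``(group key, block)`` pair per distinct combination of ``by_col_ids``.

    Each yielded block is filtered to the rows matching that key combination
    and has the original index columns and labels restored. When
    ``by_key_is_singular`` is True, the key is yielded as a scalar instead of
    a tuple.
    """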
    original_index_columns = block._index_columns
    original_index_labels = block._index_labels
    block = block.reset_index(
        level=None,
        # Keep the original index columns so they can be recovered.
        drop=False,
        allow_duplicates=True,
        replacement=bigframes.enums.DefaultIndexKind.NULL,
    ).set_index(
        by_col_ids,
        # Keep by_col_ids in-place so the ordering doesn't change.
        drop=False,
        append=False,
    )
    block.cached(
        force=True,
        # Every yielded group is produced by filtering on by_col_ids, so
        # force block.cached() to cluster by the new index by explicitly
        # setting `session_aware=False`. This makes the per-group filters
        # more efficient.
        session_aware=False,
    )
    keys_block, _ = block.aggregate(by_col_ids, dropna=dropna)
    for chunk in keys_block.to_pandas_batches():
        # Convert to MultiIndex to make sure we get tuples,
        # even for singular keys.
        by_keys_index = chunk.index
        if not isinstance(by_keys_index, pd.MultiIndex):
            by_keys_index = pd.MultiIndex.from_frame(by_keys_index.to_frame())

        for by_keys in by_keys_index:
            filtered_block = (
                # To ensure the cache is used, filter first, then reset the
                # index before yielding the DataFrame.
                block.filter(
                    functools.reduce(
                        ops.and_op.as_expr,
                        (
                            ops.eq_op.as_expr(by_col, ex.const(by_key))
                            for by_col, by_key in zip(by_col_ids, by_keys)
                        ),
                    ),
                ).set_index(
                    original_index_columns,
                    # We retained by_col_ids in the set_index call above,
                    # so it's safe to drop the duplicates now.
                    drop=True,
                    append=False,
                    index_labels=original_index_labels,
                )
            )

            if by_key_is_singular:
                yield by_keys[0], filtered_block
            else:
                yield by_keys, filtered_block
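

# A minimal usage sketch (illustrative only, not part of this module): one way
# a DataFrame-level groupby iterator might consume this helper. The `df._block`
# attribute and the `bigframes.dataframe.DataFrame(...)` wrapper used below are
# assumptions about the surrounding API, not guarantees.
#
#   def iterate_groups(df, by_col_ids):
#       for key, group_block in block_groupby_iter(
#           df._block,
#           by_col_ids=by_col_ids,
#           by_key_is_singular=len(by_col_ids) == 1,
#           dropna=True,
#       ):
#           yield key, bigframes.dataframe.DataFrame(group_block)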