Skip to content

Commit d9888d2

Browse files
committed
remove on param from windowspec
1 parent 1a1089f commit d9888d2

File tree

4 files changed

+52
-23
lines changed

4 files changed

+52
-23
lines changed

bigframes/core/blocks.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1066,11 +1066,6 @@ def apply_window_op(
10661066
skip_reproject_unsafe: bool = False,
10671067
never_skip_nulls: bool = False,
10681068
) -> typing.Tuple[Block, str]:
1069-
if column == window_spec.on:
1070-
# For row-based window, do nothing
1071-
# TODO(b/388916840) Support range rolling with "on"
1072-
return self, column
1073-
10741069
block = self
10751070
if skip_null_groups:
10761071
for key in window_spec.grouping_keys:

bigframes/core/groupby/dataframe_group_by.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -317,13 +317,19 @@ def rolling(
317317
bounds=window_specs.RowsWindowBounds.from_window_size(window, closed),
318318
min_periods=min_periods if min_periods is not None else window,
319319
grouping_keys=tuple(ex.deref(col) for col in self._by_col_ids),
320-
on=None if on is None else self._block.resolve_label_exact_or_error(on),
321320
)
322321
block = self._block.order_by(
323322
[order.ascending_over(col) for col in self._by_col_ids],
324323
)
324+
skip_agg_col_id = (
325+
None if on is None else self._block.resolve_label_exact_or_error(on)
326+
)
325327
return windows.Window(
326-
block, window_spec, self._selected_cols, drop_null_groups=self._dropna
328+
block,
329+
window_spec,
330+
self._selected_cols,
331+
drop_null_groups=self._dropna,
332+
skip_agg_column_id=skip_agg_col_id,
327333
)
328334

329335
@validations.requires_ordering()

bigframes/core/window/rolling.py

Lines changed: 37 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,16 @@ def __init__(
3434
value_column_ids: typing.Sequence[str],
3535
drop_null_groups: bool = True,
3636
is_series: bool = False,
37+
skip_agg_column_id: str | None = None,
3738
):
3839
self._block = block
3940
self._window_spec = window_spec
4041
self._value_column_ids = value_column_ids
4142
self._drop_null_groups = drop_null_groups
4243
self._is_series = is_series
44+
# The column ID that won't be aggregated on.
45+
# This is equivalent to pandas `on` parameter in rolling()
46+
self._skip_agg_column_id = skip_agg_column_id
4347

4448
def count(self):
4549
return self._apply_aggregate(agg_ops.count_op)
@@ -66,10 +70,37 @@ def _apply_aggregate(
6670
self,
6771
op: agg_ops.UnaryAggregateOp,
6872
):
69-
block = self._block
70-
labels = [block.col_id_to_label[col] for col in self._value_column_ids]
71-
block, result_ids = block.multi_apply_window_op(
72-
self._value_column_ids,
73+
agg_col_ids = [
74+
col_id
75+
for col_id in self._value_column_ids
76+
if col_id != self._skip_agg_column_id
77+
]
78+
agg_block = self._aggregate_block(op, agg_col_ids)
79+
80+
if self._skip_agg_column_id is not None:
81+
# Concat the skipped column to the result.
82+
agg_block, _ = agg_block.join(
83+
self._block.select_column(self._skip_agg_column_id), how="outer"
84+
)
85+
86+
if self._is_series:
87+
from bigframes.series import Series
88+
89+
return Series(agg_block)
90+
else:
91+
from bigframes.dataframe import DataFrame
92+
93+
# Preserve column order.
94+
column_labels = [
95+
self._block.col_id_to_label[col_id] for col_id in self._value_column_ids
96+
]
97+
return DataFrame(agg_block)._reindex_columns(column_labels)
98+
99+
def _aggregate_block(
100+
self, op: agg_ops.UnaryAggregateOp, agg_col_ids: typing.List[str]
101+
) -> blocks.Block:
102+
block, result_ids = self._block.multi_apply_window_op(
103+
agg_col_ids,
73104
op,
74105
self._window_spec,
75106
skip_null_groups=self._drop_null_groups,
@@ -85,13 +116,5 @@ def _apply_aggregate(
85116
)
86117
block = block.set_index(col_ids=index_ids)
87118

88-
if self._is_series:
89-
from bigframes.series import Series
90-
91-
return Series(block.select_columns(result_ids).with_column_labels(labels))
92-
else:
93-
from bigframes.dataframe import DataFrame
94-
95-
return DataFrame(
96-
block.select_columns(result_ids).with_column_labels(labels)
97-
)
119+
labels = [self._block.col_id_to_label[col] for col in agg_col_ids]
120+
return block.select_columns(result_ids).with_column_labels(labels)

bigframes/dataframe.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3318,10 +3318,15 @@ def rolling(
33183318
window_def = windows.WindowSpec(
33193319
bounds=windows.RowsWindowBounds.from_window_size(window, closed),
33203320
min_periods=min_periods if min_periods is not None else window,
3321-
on=None if on is None else self._block.resolve_label_exact_or_error(on),
3321+
)
3322+
skip_agg_col_id = (
3323+
None if on is None else self._block.resolve_label_exact_or_error(on)
33223324
)
33233325
return bigframes.core.window.Window(
3324-
self._block, window_def, self._block.value_columns
3326+
self._block,
3327+
window_def,
3328+
self._block.value_columns,
3329+
skip_agg_column_id=skip_agg_col_id,
33253330
)
33263331

33273332
@validations.requires_ordering()

0 commit comments

Comments
 (0)