Skip to content

Commit 6e98a2c

Browse files
authored
feat: support time range rolling on Series. (#1590)
* [WIP] implement range rolling with mods at expression level * implement range rolling for series * fix lint * update rewrite logic * fix mypy * differentiate range bound and unbounded window * fix mypy * relax sorting key search algo * check whether window key is prefix * address comments * fix type hint * add comment on skipping range window during order pull up * fix lint * check order for all AdditiveNode * explicitly dispatch additive nodes
1 parent 09ce979 commit 6e98a2c

File tree

12 files changed

+406
-52
lines changed

12 files changed

+406
-52
lines changed

bigframes/core/array_value.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -405,7 +405,7 @@ def project_window_op(
405405
skip_reproject_unsafe: skips the reprojection step, can be used when performing many non-dependent window operations, user responsible for not nesting window expressions, or using outputs as join, filter or aggregation keys before a reprojection
406406
"""
407407
# TODO: Support non-deterministic windowing
408-
if window_spec.row_bounded or not op.order_independent:
408+
if window_spec.is_row_bounded or not op.order_independent:
409409
if self.node.order_ambiguous and not self.session._strictly_ordered:
410410
if not self.session._allows_ambiguity:
411411
raise ValueError(

bigframes/core/compile/compiled.py

Lines changed: 56 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
from google.cloud import bigquery
3030
import pyarrow as pa
3131

32+
from bigframes.core import utils
3233
import bigframes.core.compile.aggregate_compiler as agg_compiler
3334
import bigframes.core.compile.googlesql
3435
import bigframes.core.compile.ibis_types
@@ -231,7 +232,7 @@ def aggregate(
231232
col_out: agg_compiler.compile_aggregate(
232233
aggregate,
233234
bindings,
234-
order_by=_convert_ordering_to_table_values(table, order_by),
235+
order_by=_convert_row_ordering_to_table_values(table, order_by),
235236
)
236237
for aggregate, col_out in aggregations
237238
}
@@ -439,7 +440,7 @@ def project_window_op(
439440
never_skip_nulls=never_skip_nulls,
440441
)
441442

442-
if expression.op.order_independent and not window_spec.row_bounded:
443+
if expression.op.order_independent and window_spec.is_unbounded:
443444
# notably percentile_cont does not support ordering clause
444445
window_spec = window_spec.without_order()
445446
window = self._ibis_window_from_spec(window_spec)
@@ -517,16 +518,30 @@ def _ibis_window_from_spec(self, window_spec: WindowSpec):
517518
# 1. Order-independent op (aggregation, cut, rank) with unbound window - no ordering clause needed
518519
# 2. Order-independent op (aggregation, cut, rank) with range window - use ordering clause, ties allowed
519520
# 3. Order-depedenpent op (navigation functions, array_agg) or rows bounds - use total row order to break ties.
520-
if window_spec.ordering:
521-
order_by = _convert_ordering_to_table_values(
521+
if window_spec.is_row_bounded:
522+
if not window_spec.ordering:
523+
# If window spec has following or preceding bounds, we need to apply an unambiguous ordering.
524+
raise ValueError("No ordering provided for ordered analytic function")
525+
order_by = _convert_row_ordering_to_table_values(
522526
self._column_names,
523527
window_spec.ordering,
524528
)
525-
elif window_spec.row_bounded:
526-
# If window spec has following or preceding bounds, we need to apply an unambiguous ordering.
527-
raise ValueError("No ordering provided for ordered analytic function")
528-
else:
529+
530+
elif window_spec.is_range_bounded:
531+
order_by = [
532+
_convert_range_ordering_to_table_value(
533+
self._column_names,
534+
window_spec.ordering[0],
535+
)
536+
]
537+
# The rest if branches are for unbounded windows
538+
elif window_spec.ordering:
529539
# Unbound grouping window. Suitable for aggregations but not for analytic function application.
540+
order_by = _convert_row_ordering_to_table_values(
541+
self._column_names,
542+
window_spec.ordering,
543+
)
544+
else:
530545
order_by = None
531546

532547
window = bigframes_vendored.ibis.window(order_by=order_by, group_by=group_by)
@@ -551,7 +566,7 @@ def is_window(column: ibis_types.Value) -> bool:
551566
return any(isinstance(op, ibis_ops.WindowFunction) for op in matches)
552567

553568

554-
def _convert_ordering_to_table_values(
569+
def _convert_row_ordering_to_table_values(
555570
value_lookup: typing.Mapping[str, ibis_types.Value],
556571
ordering_columns: typing.Sequence[OrderingExpression],
557572
) -> typing.Sequence[ibis_types.Value]:
@@ -579,6 +594,30 @@ def _convert_ordering_to_table_values(
579594
return ordering_values
580595

581596

597+
def _convert_range_ordering_to_table_value(
598+
value_lookup: typing.Mapping[str, ibis_types.Value],
599+
ordering_column: OrderingExpression,
600+
) -> ibis_types.Value:
601+
"""Converts the ordering for range windows to Ibis references.
602+
603+
Note that this method is different from `_convert_row_ordering_to_table_values` in
604+
that it does not arrange null values. There are two reasons:
605+
1. Manipulating null positions requires more than one ordering key, which is forbidden
606+
by SQL window syntax for range rolling.
607+
2. Pandas does not allow range rolling on timeseries with nulls.
608+
609+
Therefore, we opt for the simplest approach here: generate the simplest SQL and follow
610+
the BigQuery engine behavior.
611+
"""
612+
expr = op_compiler.compile_expression(
613+
ordering_column.scalar_expression, value_lookup
614+
)
615+
616+
if ordering_column.direction.is_ascending:
617+
return bigframes_vendored.ibis.asc(expr) # type: ignore
618+
return bigframes_vendored.ibis.desc(expr) # type: ignore
619+
620+
582621
def _string_cast_join_cond(
583622
lvalue: ibis_types.Column, rvalue: ibis_types.Column
584623
) -> ibis_types.BooleanColumn:
@@ -668,8 +707,14 @@ def _add_boundary(
668707
) -> ibis_expr_builders.LegacyWindowBuilder:
669708
if isinstance(bounds, RangeWindowBounds):
670709
return ibis_window.range(
671-
start=_to_ibis_boundary(bounds.start),
672-
end=_to_ibis_boundary(bounds.end),
710+
start=_to_ibis_boundary(
711+
None
712+
if bounds.start is None
713+
else utils.timedelta_to_micros(bounds.start)
714+
),
715+
end=_to_ibis_boundary(
716+
None if bounds.end is None else utils.timedelta_to_micros(bounds.end)
717+
),
673718
)
674719
if isinstance(bounds, RowsWindowBounds):
675720
if bounds.start is not None or bounds.end is not None:

bigframes/core/compile/compiler.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ def _replace_unsupported_ops(node: nodes.BigFrameNode):
8282
# TODO: Run all replacement rules as single bottom-up pass
8383
node = nodes.bottom_up(node, rewrites.rewrite_slice)
8484
node = nodes.bottom_up(node, rewrites.rewrite_timedelta_expressions)
85+
node = nodes.bottom_up(node, rewrites.rewrite_range_rolling)
8586
return node
8687

8788

bigframes/core/compile/polars/compiler.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
import dataclasses
1717
import functools
1818
import itertools
19-
from typing import cast, Optional, Sequence, Tuple, TYPE_CHECKING, Union
19+
from typing import cast, Optional, Sequence, Tuple, TYPE_CHECKING
2020

2121
import bigframes.core
2222
from bigframes.core import window_spec
@@ -359,6 +359,7 @@ def compile_window(self, node: nodes.WindowOpNode):
359359
return df.with_columns([agg_expr])
360360

361361
else: # row-bounded window
362+
assert isinstance(window.bounds, window_spec.RowsWindowBounds)
362363
# Polars API semi-bounded, and any grouped rolling window challenging
363364
# https://github.com/pola-rs/polars/issues/4799
364365
# https://github.com/pola-rs/polars/issues/8976
@@ -382,9 +383,7 @@ def compile_window(self, node: nodes.WindowOpNode):
382383
return pl.concat([df, results], how="horizontal")
383384

384385

385-
def _get_period(
386-
bounds: Union[window_spec.RowsWindowBounds, window_spec.RangeWindowBounds]
387-
) -> Optional[int]:
386+
def _get_period(bounds: window_spec.RowsWindowBounds) -> Optional[int]:
388387
"""Returns None if the boundary is infinite."""
389388
if bounds.start is None or bounds.end is None:
390389
return None

bigframes/core/nodes.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1353,7 +1353,7 @@ def _validate(self):
13531353
"""Validate the local data in the node."""
13541354
# Since inner order and row bounds are coupled, rank ops can't be row bounded
13551355
assert (
1356-
not self.window_spec.row_bounded
1356+
not self.window_spec.is_row_bounded
13571357
) or self.expression.op.implicitly_inherits_order
13581358
assert all(ref in self.child.ids for ref in self.expression.column_references)
13591359

@@ -1415,7 +1415,9 @@ def inherits_order(self) -> bool:
14151415
op_inherits_order = (
14161416
not self.expression.op.order_independent
14171417
) and self.expression.op.implicitly_inherits_order
1418-
return op_inherits_order or self.window_spec.row_bounded
1418+
# range-bounded windows do not inherit orders because their ordering are
1419+
# already defined before rewrite time.
1420+
return op_inherits_order or self.window_spec.is_row_bounded
14191421

14201422
@property
14211423
def additive_base(self) -> BigFrameNode:

bigframes/core/rewrite/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from bigframes.core.rewrite.pruning import column_pruning
2020
from bigframes.core.rewrite.slices import pullup_limit_from_slice, rewrite_slice
2121
from bigframes.core.rewrite.timedeltas import rewrite_timedelta_expressions
22+
from bigframes.core.rewrite.windows import rewrite_range_rolling
2223

2324
__all__ = [
2425
"legacy_join_as_projection",
@@ -29,4 +30,5 @@
2930
"remap_variables",
3031
"pull_up_order",
3132
"column_pruning",
33+
"rewrite_range_rolling",
3234
]

bigframes/core/rewrite/windows.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from __future__ import annotations
16+
17+
import dataclasses
18+
19+
from bigframes import operations as ops
20+
from bigframes.core import nodes
21+
22+
23+
def rewrite_range_rolling(node: nodes.BigFrameNode) -> nodes.BigFrameNode:
24+
if not isinstance(node, nodes.WindowOpNode):
25+
return node
26+
27+
if not node.window_spec.is_range_bounded:
28+
return node
29+
30+
if len(node.window_spec.ordering) != 1:
31+
raise ValueError(
32+
"Range rolling should only be performed on exactly one column."
33+
)
34+
35+
ordering_expr = node.window_spec.ordering[0]
36+
37+
new_ordering = dataclasses.replace(
38+
ordering_expr,
39+
scalar_expression=ops.UnixMicros().as_expr(ordering_expr.scalar_expression),
40+
)
41+
42+
return dataclasses.replace(
43+
node,
44+
window_spec=dataclasses.replace(node.window_spec, ordering=(new_ordering,)),
45+
)

bigframes/core/window/ordering.py

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from __future__ import annotations
16+
17+
from functools import singledispatch
18+
19+
from bigframes.core import expression as ex
20+
from bigframes.core import nodes, ordering
21+
22+
23+
@singledispatch
24+
def find_order_direction(
25+
root: nodes.BigFrameNode, column_id: str
26+
) -> ordering.OrderingDirection | None:
27+
"""Returns the order of the given column with tree traversal. If the column cannot be found,
28+
or the ordering information is not available, return None.
29+
"""
30+
return None
31+
32+
33+
@find_order_direction.register
34+
def _(root: nodes.OrderByNode, column_id: str):
35+
if len(root.by) == 0:
36+
# This is a no-op
37+
return find_order_direction(root.child, column_id)
38+
39+
# Make sure the window key is the prefix of sorting keys.
40+
order_expr = root.by[0]
41+
scalar_expr = order_expr.scalar_expression
42+
if isinstance(scalar_expr, ex.DerefOp) and scalar_expr.id.name == column_id:
43+
return order_expr.direction
44+
45+
return None
46+
47+
48+
@find_order_direction.register
49+
def _(root: nodes.ReversedNode, column_id: str):
50+
direction = find_order_direction(root.child, column_id)
51+
52+
if direction is None:
53+
return None
54+
return direction.reverse()
55+
56+
57+
@find_order_direction.register
58+
def _(root: nodes.SelectionNode, column_id: str):
59+
for alias_ref in root.input_output_pairs:
60+
if alias_ref.id.name == column_id:
61+
return find_order_direction(root.child, alias_ref.ref.id.name)
62+
63+
64+
@find_order_direction.register
65+
def _(root: nodes.FilterNode, column_id: str):
66+
return find_order_direction(root.child, column_id)
67+
68+
69+
@find_order_direction.register
70+
def _(root: nodes.InNode, column_id: str):
71+
return find_order_direction(root.left_child, column_id)
72+
73+
74+
@find_order_direction.register
75+
def _(root: nodes.WindowOpNode, column_id: str):
76+
return find_order_direction(root.child, column_id)
77+
78+
79+
@find_order_direction.register
80+
def _(root: nodes.ProjectionNode, column_id: str):
81+
return find_order_direction(root.child, column_id)

bigframes/core/window/rolling.py

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,18 @@
1414

1515
from __future__ import annotations
1616

17+
import datetime
1718
import typing
1819

1920
import bigframes_vendored.pandas.core.window.rolling as vendored_pandas_rolling
21+
import numpy
22+
import pandas
2023

21-
from bigframes.core import log_adapter, window_spec
24+
from bigframes import dtypes
25+
from bigframes.core import expression as ex
26+
from bigframes.core import log_adapter, ordering, window_spec
2227
import bigframes.core.blocks as blocks
28+
from bigframes.core.window import ordering as window_ordering
2329
import bigframes.operations.aggregations as agg_ops
2430

2531

@@ -118,3 +124,38 @@ def _aggregate_block(
118124

119125
labels = [self._block.col_id_to_label[col] for col in agg_col_ids]
120126
return block.select_columns(result_ids).with_column_labels(labels)
127+
128+
129+
def create_range_window(
130+
block: blocks.Block,
131+
window: pandas.Timedelta | numpy.timedelta64 | datetime.timedelta | str,
132+
min_periods: int | None,
133+
closed: typing.Literal["right", "left", "both", "neither"],
134+
is_series: bool,
135+
) -> Window:
136+
137+
index_dtypes = block.index.dtypes
138+
if len(index_dtypes) > 1:
139+
raise ValueError("Range rolling on MultiIndex is not supported")
140+
if index_dtypes[0] != dtypes.TIMESTAMP_DTYPE:
141+
raise ValueError("Index type should be timestamps with timezones")
142+
143+
order_direction = window_ordering.find_order_direction(
144+
block.expr.node, block.index_columns[0]
145+
)
146+
if order_direction is None:
147+
raise ValueError(
148+
"The index might not be in a monotonic order. Please sort the index before rolling."
149+
)
150+
if isinstance(window, str):
151+
window = pandas.Timedelta(window)
152+
spec = window_spec.WindowSpec(
153+
bounds=window_spec.RangeWindowBounds.from_timedelta_window(window, closed),
154+
min_periods=1 if min_periods is None else min_periods,
155+
ordering=(
156+
ordering.OrderingExpression(
157+
ex.deref(block.index_columns[0]), order_direction
158+
),
159+
),
160+
)
161+
return Window(block, spec, block.value_columns, is_series=is_series)

0 commit comments

Comments
 (0)