Skip to content

Commit c8159f2

Browse files
mir: window functions initial support
Handle creation and rewrite of Window MIR node. Addresses: REA-5790 Change-Id: I52c24d9547688b0278b53ae17ed15648c68039e4 Reviewed-on: https://gerrit.readyset.name/c/readyset/+/9647 Reviewed-by: Michael Zink <michael.z@readyset.io> Tested-by: Buildkite CI
1 parent 0a85768 commit c8159f2

File tree

11 files changed

+266
-3
lines changed

11 files changed

+266
-3
lines changed

readyset-dataflow/src/ops/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ pub mod project;
2121
pub mod topk;
2222
pub mod union;
2323
pub(crate) mod utils;
24+
pub mod window;
2425

2526
use crate::ops::grouped::concat::GroupConcat;
2627
use crate::processing::{
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
use std::fmt::Display;
2+
3+
use readyset_errors::{unsupported, ReadySetResult};
4+
use readyset_sql::ast::FunctionExpr;
5+
use serde::{Deserialize, Serialize};
6+
7+
use crate::ops::grouped::aggregate::Aggregation;
8+
9+
/// The function evaluated by a window node.
///
/// Values are built from a parsed SQL function via [`WindowOperation::from_fn`] and are
/// rendered through the `Display` impl using the lowercase SQL-style name of the operation.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
10+
pub enum WindowOperation {
11+
/// Produced from `FunctionExpr::RowNumber`; displayed as `row_number`.
RowNumber,
12+
/// Produced from `FunctionExpr::Rank`; displayed as `rank`.
Rank,
13+
/// Produced from `FunctionExpr::DenseRank`; displayed as `dense_rank`.
DenseRank,
14+
/// Produced from `FunctionExpr::Max { .. }`; displayed as `max`.
Max,
15+
/// Produced from `FunctionExpr::Min { .. }`; displayed as `min`.
Min,
16+
/// An aggregation reused from the grouped operators. `from_fn` produces `Aggregation::Count`
/// (for both `COUNT(*)` and `COUNT(..)`), `Aggregation::Sum` and `Aggregation::Avg`.
Aggregation(Aggregation),
17+
}
18+
19+
impl WindowOperation {
20+
pub fn from_fn(fn_expr: FunctionExpr) -> ReadySetResult<WindowOperation> {
21+
Ok(match fn_expr {
22+
FunctionExpr::RowNumber => WindowOperation::RowNumber,
23+
FunctionExpr::Max { .. } => WindowOperation::Max,
24+
FunctionExpr::Min { .. } => WindowOperation::Min,
25+
FunctionExpr::CountStar | FunctionExpr::Count { .. } => {
26+
WindowOperation::Aggregation(Aggregation::Count)
27+
}
28+
FunctionExpr::Sum { .. } => WindowOperation::Aggregation(Aggregation::Sum),
29+
FunctionExpr::Avg { .. } => WindowOperation::Aggregation(Aggregation::Avg),
30+
FunctionExpr::Rank => WindowOperation::Rank,
31+
FunctionExpr::DenseRank => WindowOperation::DenseRank,
32+
_ => unsupported!("Function {fn_expr:?} is not supported as a window function"),
33+
})
34+
}
35+
}
36+
37+
impl Display for WindowOperation {
38+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39+
match self {
40+
WindowOperation::RowNumber => write!(f, "row_number"),
41+
WindowOperation::Rank => write!(f, "rank"),
42+
WindowOperation::DenseRank => write!(f, "dense_rank"),
43+
WindowOperation::Max => write!(f, "max"),
44+
WindowOperation::Min => write!(f, "min"),
45+
// TODO: Make Aggregation impl Display
46+
WindowOperation::Aggregation(a) => write!(f, "{a:?}"),
47+
}
48+
}
49+
}

readyset-mir/src/graph.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,10 @@ impl MirGraph {
335335
.into_iter()
336336
.chain(iter::once(MirColumn::named(&*PAGE_NUMBER_COL)))
337337
.collect(),
338+
MirNodeInner::Window { output_column, .. } => parent_columns()
339+
.into_iter()
340+
.chain(iter::once(output_column.clone()))
341+
.collect(),
338342
MirNodeInner::Distinct { group_by } => group_by
339343
.iter()
340344
.cloned()

readyset-mir/src/node/node_inner.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use common::{DfValue, IndexType};
44
use dataflow::ops::grouped::aggregate::Aggregation;
55
use dataflow::ops::grouped::extremum::Extremum;
66
use dataflow::ops::union;
7+
use dataflow::ops::window::WindowOperation;
78
use dataflow::PostLookupAggregates;
89
use derive_more::From;
910
use itertools::Itertools;
@@ -98,6 +99,17 @@ pub enum MirNodeInner {
9899
primary_key: Option<Box<[Column]>>,
99100
unique_keys: Box<[Box<[Column]>]>,
100101
},
102+
/// Node that evaluates Window Functions over a window.
103+
/// PARTITIONS and ORDER BY can be expressions, but should
104+
/// be evaluated by a parent node before getting passed here.
105+
Window {
106+
group_by: Vec<Column>,
107+
partition_by: Vec<Column>,
108+
order_by: Vec<(Column, OrderType, NullOrder)>,
109+
output_column: Column,
110+
function: WindowOperation,
111+
args: Vec<Column>,
112+
},
101113
/// Node that computes the extreme value (minimum or maximum) of a column grouped by another
102114
/// set of columns, outputting its result as an additional column.
103115
///
@@ -436,6 +448,27 @@ impl MirNodeInner {
436448
.join(", ");
437449
format!("{op_string} γ[{group_cols}]")
438450
}
451+
MirNodeInner::Window {
452+
partition_by,
453+
order_by,
454+
function,
455+
args,
456+
..
457+
} => {
458+
let op_string = format!("{}({})", function, args.iter().join(", "));
459+
let partition_cols = partition_by
460+
.iter()
461+
.map(|c| c.name.as_str())
462+
.collect::<Vec<_>>()
463+
.join(", ");
464+
let order_cols = order_by
465+
.iter()
466+
.map(|(c, o, no)| format!("{c} {o}({no})"))
467+
.collect::<Vec<_>>()
468+
.into_iter()
469+
.join(", ");
470+
format!("{op_string} Over[{partition_cols}; {order_cols}]")
471+
}
439472
MirNodeInner::Base {
440473
column_specs,
441474
unique_keys,

readyset-mir/src/rewrite/filters_to_join_keys.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,10 @@ pub(crate) fn convert_filters_to_join_keys(query: &mut MirQuery<'_>) -> ReadySet
254254
// TODO: figure out what to do about unions
255255
continue 'filter;
256256
}
257+
MirNodeInner::Window { .. } => {
258+
// TODO: figure out what to do about windows
259+
continue 'filter;
260+
}
257261
MirNodeInner::Join { .. } => {
258262
let join_parents = query.ancestors(ancestor_idx)?;
259263
let left_parent = *join_parents

readyset-mir/src/rewrite/predicate_pushup.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ fn commutes_with(conditions: &Expr, inner: &MirNodeInner) -> bool {
1616
| MirNodeInner::Paginate { group_by, .. }
1717
| MirNodeInner::TopK { group_by, .. }
1818
| MirNodeInner::Distinct { group_by, .. }
19+
| MirNodeInner::Window { group_by, .. }
1920
| MirNodeInner::Extremum { group_by, .. } => conditions
2021
.referred_columns()
2122
.all(|col| group_by.iter().any(|c| c == col)),

readyset-mir/src/rewrite/pull_keys.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ fn push_view_key(query: &mut MirQuery<'_>, node_idx: NodeIndex) -> ReadySetResul
3939
| MirNodeInner::Extremum { group_by, .. }
4040
| MirNodeInner::Distinct { group_by }
4141
| MirNodeInner::Paginate { group_by, .. }
42+
| MirNodeInner::Window { group_by, .. }
4243
| MirNodeInner::TopK { group_by, .. } => {
4344
for ViewKeyColumn { column, op, .. } in &key {
4445
invariant_eq!(

readyset-mir/src/visualize.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,27 @@ impl GraphViz for MirNodeInner {
370370
MirNodeInner::AliasTable { ref table } => {
371371
write!(f, "AliasTable [{}]", table.display_unquoted())
372372
}
373+
MirNodeInner::Window {
374+
group_by,
375+
partition_by,
376+
order_by,
377+
function,
378+
args,
379+
..
380+
} => {
381+
write!(
382+
f,
383+
"Window [key: {}, p: {}, o: {}, f: {}, args: {}]",
384+
group_by.iter().join(", "),
385+
partition_by.iter().join(", "),
386+
order_by
387+
.iter()
388+
.map(|(c, o, no)| format!("{}: {}({})", c.name.as_str(), o, no))
389+
.join(", "),
390+
function,
391+
args.iter().join(", ")
392+
)
393+
}
373394
}
374395
}
375396
}

readyset-server/src/controller/mir_to_flow.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ pub(super) fn mir_node_to_flow_parts(
136136
mig,
137137
)?)
138138
}
139+
MirNodeInner::Window { .. } => unsupported!("lowering window node to flow"),
139140
MirNodeInner::Filter { ref conditions } => {
140141
invariant_eq!(ancestors.len(), 1);
141142
let parent = ancestors[0];

readyset-server/src/controller/sql/mir/mod.rs

Lines changed: 150 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use catalog_tables::is_catalog_table;
99
use common::IndexType;
1010
use dataflow::ops::grouped::aggregate::Aggregation;
1111
use dataflow::ops::union;
12+
use dataflow::ops::window::WindowOperation;
1213
use lazy_static::lazy_static;
1314
use mir::graph::MirGraph;
1415
use mir::node::node_inner::MirNodeInner;
@@ -24,7 +25,7 @@ use readyset_errors::{
2425
unsupported_err, ReadySetError, ReadySetResult,
2526
};
2627
use readyset_sql::analysis::visit::{walk_expr, Visitor};
27-
use readyset_sql::analysis::{self, ReferredColumns};
28+
use readyset_sql::analysis::{self, is_aggregate, ReferredColumns};
2829
use readyset_sql::ast::{
2930
self, BinaryOperator, CaseWhenBranch, ColumnSpecification, CompoundSelectOperator,
3031
CreateTableBody, Expr, FieldDefinitionExpr, FieldReference, FunctionExpr, GroupByClause,
@@ -47,6 +48,7 @@ use crate::controller::sql::query_graph::{
4748
to_query_graph, ExprColumn, OutputColumn, Pagination, QueryGraph,
4849
};
4950
use crate::controller::sql::query_signature::Signature;
51+
use crate::sql::query_graph::WindowFunction;
5052

5153
mod grouped;
5254
mod join;
@@ -2065,6 +2067,134 @@ impl SqlToMirConverter {
20652067
Ok(leaf)
20662068
}
20672069

2070+
fn make_window_node(
2071+
&mut self,
2072+
query_name: &Relation,
2073+
name: Relation,
2074+
mut parent: NodeIndex,
2075+
funcs: &[WindowFunction],
2076+
group_by: Vec<Column>,
2077+
) -> ReadySetResult<NodeIndex> {
2078+
if funcs.is_empty() {
2079+
return Ok(parent);
2080+
} else if funcs.len() != 1 {
2081+
// TOOD(mohamed): On paper, it should be possible by simply connecting the
2082+
// window functions in series (fn1 -> fn2 -> fn3 -> ... -> reader)
2083+
// However, the performance will be so bad that we rather just not support
2084+
// it for now. Making the execution "smarter" with how it handles
2085+
// its inputs and outputs will make this possible, but should be left
2086+
// as a future improvement
2087+
unsupported!("Multiple window functions not yet supported");
2088+
}
2089+
2090+
let dialect = readyset_sql::Dialect::MySQL;
2091+
let WindowFunction {
2092+
function,
2093+
partition_by,
2094+
order_by,
2095+
alias,
2096+
} = funcs.first().unwrap().clone();
2097+
2098+
let arguments = function.arguments().cloned().collect::<Vec<_>>();
2099+
2100+
let function = WindowOperation::from_fn(function)?;
2101+
2102+
let order_cols = order_by
2103+
.iter()
2104+
.map(|(e, _, _)| e)
2105+
.cloned()
2106+
.collect::<Vec<_>>();
2107+
2108+
let has_agg = |e: &Expr| matches!(e, Expr::Call(f) if is_aggregate(f));
2109+
if partition_by.iter().any(has_agg)
2110+
|| arguments.iter().any(has_agg)
2111+
|| order_cols.iter().any(has_agg)
2112+
{
2113+
unsupported!("Aggregates in window functions not yet supported");
2114+
}
2115+
2116+
// if the partition, ordering cols, or args require projection,
2117+
// create a projection node and use that as a parent
2118+
// TODO: do we need to make this strict? i.e. only project certain
2119+
// Expr variants?
2120+
let p = |e: &Expr| !matches!(e, Expr::Column(_));
2121+
let needs_proj_node =
2122+
partition_by.iter().any(p) || arguments.iter().any(p) || order_cols.iter().any(p);
2123+
2124+
if needs_proj_node {
2125+
let node_name = format!(
2126+
"{}_window_project_n{}",
2127+
name.display_unquoted(),
2128+
self.mir_graph.node_count()
2129+
);
2130+
2131+
let node = self.make_project_node(
2132+
query_name,
2133+
node_name.into(),
2134+
parent,
2135+
partition_by
2136+
.iter()
2137+
.chain(arguments.iter())
2138+
.chain(order_cols.iter())
2139+
.cloned()
2140+
.map(|e| -> ReadySetResult<_> {
2141+
Ok(ProjectExpr::Expr {
2142+
alias: e
2143+
.alias(dialect)
2144+
// returns None if e is a placeholder or a variable
2145+
.ok_or_else(|| {
2146+
unsupported_err!("Placeholders not allowed in this context")
2147+
})?,
2148+
expr: e,
2149+
})
2150+
})
2151+
.collect::<ReadySetResult<Vec<_>>>()?,
2152+
);
2153+
parent = node;
2154+
}
2155+
2156+
let output_column = Column::named(alias);
2157+
2158+
let partition_by = partition_by
2159+
.iter()
2160+
.map(|e| e.alias(dialect).unwrap())
2161+
.map(|e| Column::named(e))
2162+
.collect();
2163+
2164+
let order_by = order_by
2165+
.into_iter()
2166+
.map(|(e, order, no)| (Column::named(e.alias(dialect).unwrap()), order, no))
2167+
.collect();
2168+
2169+
let args = arguments
2170+
.iter()
2171+
.map(|e| e.alias(dialect).unwrap())
2172+
.map(|e| Column::named(e))
2173+
.collect();
2174+
2175+
let node_name = format!(
2176+
"{}_window_n{}",
2177+
name.display_unquoted(),
2178+
self.mir_graph.node_count()
2179+
);
2180+
2181+
Ok(self.add_query_node(
2182+
query_name.clone(),
2183+
MirNode::new(
2184+
node_name.into(),
2185+
MirNodeInner::Window {
2186+
group_by,
2187+
partition_by,
2188+
order_by,
2189+
output_column,
2190+
function,
2191+
args,
2192+
},
2193+
),
2194+
&[parent],
2195+
))
2196+
}
2197+
20682198
fn predicates_above_group_by<'a>(
20692199
&mut self,
20702200
query_name: &Relation,
@@ -2370,7 +2500,21 @@ impl SqlToMirConverter {
23702500
prev_node = subquery_leaf;
23712501
}
23722502

2373-
// 8. Add function and grouped nodes
2503+
// 8. Add window functions or grouped nodes (mutually exclusive for now)
2504+
if !query_graph.aggregates.is_empty() && !query_graph.window_functions.is_empty() {
2505+
unsupported!("Mixing window functions and aggregates is not supported yet")
2506+
};
2507+
2508+
let group_by: Vec<_> = view_key.columns.iter().map(|(c, _)| c.clone()).collect();
2509+
2510+
prev_node = self.make_window_node(
2511+
query_name,
2512+
format!("q_{:x}", query_graph.signature().hash).into(),
2513+
prev_node,
2514+
&query_graph.window_functions,
2515+
group_by,
2516+
)?;
2517+
23742518
let mut func_nodes: Vec<NodeIndex> = make_grouped(
23752519
self,
23762520
query_name,
@@ -2620,6 +2764,10 @@ impl SqlToMirConverter {
26202764
let are_repeat_reads_required =
26212765
query_graph.collapsed_where_in || view_key.index_type == IndexType::BTreeMap;
26222766

2767+
if are_repeat_reads_required && !query_graph.window_functions.is_empty() {
2768+
unsupported!("Post-lookups are not supported for window functions");
2769+
}
2770+
26232771
let post_lookup_aggregates = if are_repeat_reads_required {
26242772
// When a query contains WHERE col IN (?, ?, ...), it gets rewritten
26252773
// (or collapsed) to WHERE col = ? during SQL parsing, with the

0 commit comments

Comments
 (0)