Skip to content

Commit 65eda6c

Browse files
mir: topk: allow aliases in ORDER BY clause
Add a projection node before the topK node that evaluates the alias and passes it to the topK node to be used for ordering. This also fixes some other minor issues: - GROUP BY columns were never passed to topK (or pagination) nodes. - Clean up some fields in TopK and Pagination nodes as well. Change-Id: Ia1421353784ec61aa51e8e0d39a40dc083b0004c Fixes: REA-5716 Reviewed-on: https://gerrit.readyset.name/c/readyset/+/9387 Reviewed-by: Vassili Zarouba <vassili@readyset.io> Tested-by: Buildkite CI
1 parent 5cb4247 commit 65eda6c

File tree

9 files changed

+168
-83
lines changed

9 files changed

+168
-83
lines changed

logictests/topk.test

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,3 +76,40 @@ LIMIT 2
7676
----
7777
5
7878
4
79+
80+
# test expressions in order by clause
81+
query I nosort
82+
select column_2 from table_1 order by column_2 + 1 limit 3
83+
----
84+
4
85+
5
86+
6
87+
88+
# test aliases in order by clause
89+
query I nosort
90+
select column_2 + 1 as alias from table_1 order by alias limit 3
91+
----
92+
5
93+
6
94+
7
95+
96+
# test aliased aggregates in order by clause
97+
query I nosort
98+
select count(*) as total from table_1 group by column_1 order by total limit 3
99+
----
100+
3
101+
102+
statement ok
103+
INSERT INTO table_1 (column_1, column_2) VALUES (2, 7), (2, 8), (2, 9), (2, 10), (2, 11);
104+
105+
query I nosort
106+
select min(column_2) as minimum from table_1 group by column_1 order by minimum desc limit 3
107+
----
108+
7
109+
4
110+
111+
query I nosort
112+
select count(column_2) as total from table_1 group by column_1 order by total desc limit 3
113+
----
114+
5
115+
3

readyset-mir/src/graph.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,17 @@ impl MirGraph {
246246
}
247247
columns
248248
}
249+
MirNodeInner::TopK {
250+
order, group_by, ..
251+
} => {
252+
let mut columns = self.columns(node);
253+
columns.extend(order.iter().map(|(c, _)| c.clone()));
254+
// group by cols are pulled through by default; however, there are some cases
255+
// where a group by wouldn't be specified, in these cases a bogokey is used
256+
// which needs to be pulled through
257+
columns.extend(group_by.clone());
258+
columns
259+
}
249260
_ => self.columns(node),
250261
}
251262
}

readyset-mir/src/node.rs

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -368,10 +368,7 @@ mod tests {
368368
#[test]
369369
fn topk() {
370370
same_columns_as_parent(MirNodeInner::TopK {
371-
order: Some(vec![(
372-
Column::new(Some("base"), "a"),
373-
OrderType::OrderAscending,
374-
)]),
371+
order: vec![(Column::new(Some("base"), "a"), OrderType::OrderAscending)],
375372
group_by: vec![Column::new(Some("base"), "b")],
376373
limit: 3,
377374
})
@@ -381,10 +378,7 @@ mod tests {
381378
fn paginate() {
382379
has_columns_single_parent(
383380
MirNodeInner::Paginate {
384-
order: Some(vec![(
385-
Column::new(Some("base"), "a"),
386-
OrderType::OrderAscending,
387-
)]),
381+
order: vec![(Column::new(Some("base"), "a"), OrderType::OrderAscending)],
388382
group_by: vec![Column::new(Some("base"), "b")],
389383
limit: 3,
390384
},

readyset-mir/src/node/node_inner.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ pub enum MirNodeInner {
255255
/// [`Paginate`]: dataflow::ops::paginate::Paginate
256256
Paginate {
257257
/// Set of columns used for ordering the results
258-
order: Option<Vec<(Column, OrderType)>>,
258+
order: Vec<(Column, OrderType)>,
259259
/// Set of columns that are indexed to form a unique grouping of results
260260
group_by: Vec<Column>,
261261
/// How many rows per page
@@ -268,7 +268,7 @@ pub enum MirNodeInner {
268268
/// [`TopK`]: dataflow::ops::topk::TopK
269269
TopK {
270270
/// Set of columns used for ordering the results
271-
order: Option<Vec<(Column, OrderType)>>,
271+
order: Vec<(Column, OrderType)>,
272272
/// Set of columns that are indexed to form a unique grouping of results
273273
group_by: Vec<Column>,
274274
/// Numeric literal that determines the number of results stored per group. Taken from the

readyset-mir/src/rewrite/add_bogokey.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,10 @@ fn add_bogokey_leaf(query: &mut MirQuery<'_>) -> ReadySetResult<()> {
7979
if let MirNodeInner::TopK { group_by, .. } =
8080
&mut query.get_node_mut(node_to_insert_above).unwrap().inner
8181
{
82-
group_by.push(Column::named("bogokey"))
82+
// TODO: Move this up once the if-let chains are stabilized in Rust
83+
if group_by.is_empty() {
84+
group_by.push(Column::named("bogokey"))
85+
}
8386
};
8487

8588
Ok(())
@@ -274,7 +277,7 @@ mod tests {
274277
let topk = mir_graph.add_node(MirNode::new(
275278
"topk".into(),
276279
MirNodeInner::TopK {
277-
order: None,
280+
order: vec![],
278281
group_by: vec![],
279282
limit: 3,
280283
},

readyset-mir/src/visualize.rs

Lines changed: 8 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -334,14 +334,10 @@ impl GraphViz for MirNodeInner {
334334
..
335335
} => {
336336
let order = order
337-
.as_ref()
338-
.map(|v| {
339-
v.iter()
340-
.map(|(c, o)| format!("{}: {}", c.name.as_str(), o))
341-
.collect::<Vec<_>>()
342-
.join(", ")
343-
})
344-
.unwrap_or_else(|| "".into());
337+
.iter()
338+
.map(|(c, o)| format!("{}: {}", c.name.as_str(), o))
339+
.collect::<Vec<_>>()
340+
.join(", ");
345341
write!(f, "Paginate [limit: {}; {}]", limit, order)
346342
}
347343
MirNodeInner::TopK {
@@ -350,14 +346,10 @@ impl GraphViz for MirNodeInner {
350346
..
351347
} => {
352348
let order = order
353-
.as_ref()
354-
.map(|v| {
355-
v.iter()
356-
.map(|(c, o)| format!("{}: {}", c.name.as_str(), o))
357-
.collect::<Vec<_>>()
358-
.join(", ")
359-
})
360-
.unwrap_or_else(|| "".into());
349+
.iter()
350+
.map(|(c, o)| format!("{}: {}", c.name.as_str(), o))
351+
.collect::<Vec<_>>()
352+
.join(", ");
361353
write!(f, "TopK [k: {}; {}]", limit, order)
362354
}
363355
MirNodeInner::Union {

readyset-server/src/controller/mir_to_flow.rs

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1107,7 +1107,7 @@ fn make_paginate_or_topk_node(
11071107
name: Relation,
11081108
parent: MirNodeIndex,
11091109
columns: &[Column],
1110-
order: &Option<Vec<(Column, OrderType)>>,
1110+
order: &[(Column, OrderType)],
11111111
group_by: &[Column],
11121112
limit: usize,
11131113
is_topk: bool,
@@ -1145,24 +1145,20 @@ fn make_paginate_or_topk_node(
11451145
.map(|c| graph.column_id_for_column(parent, c))
11461146
.collect::<ReadySetResult<Vec<_>>>()?;
11471147

1148-
let cmp_rows = match *order {
1149-
Some(ref o) => {
1150-
o.iter()
1151-
.map(|(c, order_type)| {
1152-
// SQL and Readyset disagree on what ascending and descending order means, so do the
1153-
// conversion here.
1154-
let reversed_order_type = match *order_type {
1155-
OrderType::OrderAscending => OrderType::OrderDescending,
1156-
OrderType::OrderDescending => OrderType::OrderAscending,
1157-
};
1158-
graph
1159-
.column_id_for_column(parent, c)
1160-
.map(|id| (id, reversed_order_type))
1161-
})
1162-
.collect::<ReadySetResult<Vec<_>>>()?
1163-
}
1164-
None => Vec::new(),
1165-
};
1148+
let cmp_rows = order
1149+
.iter()
1150+
.map(|(c, order_type)| {
1151+
// SQL and Readyset disagree on what ascending and descending order means, so do the
1152+
// conversion here.
1153+
let reversed_order_type = match *order_type {
1154+
OrderType::OrderAscending => OrderType::OrderDescending,
1155+
OrderType::OrderDescending => OrderType::OrderAscending,
1156+
};
1157+
graph
1158+
.column_id_for_column(parent, c)
1159+
.map(|id| (id, reversed_order_type))
1160+
})
1161+
.collect::<ReadySetResult<Vec<_>>>()?;
11661162

11671163
// make the new operator and record its metadata
11681164
let na = if is_topk {

readyset-server/src/controller/sql/mir/mod.rs

Lines changed: 88 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,8 @@ impl SqlToMirConverter {
332332
.transpose()?,
333333
limit,
334334
make_topk,
335+
// TODO: we should have access to the output cols even if this is a compound query
336+
None,
335337
)?
336338
.last()
337339
.unwrap();
@@ -1305,6 +1307,7 @@ impl SqlToMirConverter {
13051307
order: &Option<Vec<(Expr, OrderType)>>,
13061308
limit: usize,
13071309
is_topk: bool,
1310+
outputs: Option<&Vec<OutputColumn>>,
13081311
) -> ReadySetResult<Vec<NodeIndex>> {
13091312
if !self.config.allow_topk && is_topk {
13101313
unsupported!("TopK is not supported");
@@ -1314,37 +1317,78 @@ impl SqlToMirConverter {
13141317

13151318
// Gather a list of expressions we need to evaluate before the paginate node
13161319
let mut exprs_to_project = vec![];
1317-
let order = order.as_ref().map(|oc| {
1318-
oc.iter()
1319-
.map(|(expr, ot)| {
1320-
(
1321-
match expr {
1322-
Expr::Column(col) => Column::from(col),
1323-
expr => {
1324-
let col = Column::named(
1325-
// FIXME(REA-2168+2502): Use correct dialect.
1326-
expr.display(readyset_sql::Dialect::MySQL).to_string(),
1327-
);
1328-
if self
1329-
.mir_graph
1330-
.column_id_for_column(parent, &col)
1331-
.err()
1320+
1321+
let order = order
1322+
.clone()
1323+
.unwrap_or_default()
1324+
.iter()
1325+
.map(|(expr, ot)| {
1326+
let col = match expr {
1327+
Expr::Column(col) => {
1328+
let col = Column::from(col);
1329+
match self.mir_graph.column_id_for_column(parent, &col) {
1330+
Ok(_) => Ok(col),
1331+
Err(ReadySetError::NonExistentColumn { .. }) => {
1332+
// we couldn't find this column in the parent, this means
1333+
// that either:
1334+
// 1. the col is referencing an alias
1335+
// 2. the col is referencing a col that doesn't even exist
1336+
// We pass the list of output cols to check their aliases,
1337+
// if we couldn't find the outputs then mark the column
1338+
// as non-existent
1339+
let outputs =
1340+
outputs.ok_or_else(|| ReadySetError::NonExistentColumn {
1341+
column: col.name.to_string(),
1342+
node: self.mir_graph[parent]
1343+
.name()
1344+
.display_unquoted()
1345+
.to_string(),
1346+
})?;
1347+
let resolved_expr = outputs
13321348
.iter()
1333-
.any(|err| {
1334-
matches!(err, ReadySetError::NonExistentColumn { .. })
1349+
.find_map(|o| {
1350+
if *o.name() == *col.name {
1351+
Some(o.clone().into_expr())
1352+
} else {
1353+
None
1354+
}
13351355
})
1336-
{
1337-
// Only project the expression if we haven't already
1338-
exprs_to_project.push(expr.clone());
1339-
}
1340-
col
1356+
.ok_or_else(|| ReadySetError::NonExistentColumn {
1357+
column: col.name.to_string(),
1358+
node: self.mir_graph[parent]
1359+
.name()
1360+
.display_unquoted()
1361+
.to_string(),
1362+
})?;
1363+
1364+
exprs_to_project.push((resolved_expr, Some(col.name.clone())));
1365+
Ok(col)
13411366
}
1342-
},
1343-
*ot,
1344-
)
1345-
})
1346-
.collect()
1347-
});
1367+
Err(e) => Err(e),
1368+
}
1369+
}
1370+
expr => {
1371+
let col = Column::named(
1372+
// FIXME(REA-2168+2502): Use correct dialect.
1373+
expr.display(readyset_sql::Dialect::MySQL).to_string(),
1374+
);
1375+
if self
1376+
.mir_graph
1377+
.column_id_for_column(parent, &col)
1378+
.err()
1379+
.iter()
1380+
.any(|err| matches!(err, ReadySetError::NonExistentColumn { .. }))
1381+
{
1382+
// Only project the expression if we haven't already
1383+
exprs_to_project.push((expr.clone(), None));
1384+
}
1385+
Ok(col)
1386+
}
1387+
}?;
1388+
1389+
Ok((col, *ot))
1390+
})
1391+
.collect::<ReadySetResult<Vec<_>>>()?;
13481392

13491393
let mut nodes = vec![];
13501394

@@ -1358,12 +1402,13 @@ impl SqlToMirConverter {
13581402
parent_columns
13591403
.into_iter()
13601404
.map(ProjectExpr::Column)
1361-
.chain(exprs_to_project.into_iter().map(|expr| {
1405+
.chain(exprs_to_project.into_iter().map(|(expr, name)| {
13621406
// FIXME(ENG-2502): Use correct dialect.
1363-
let alias = expr
1364-
.display(readyset_sql::Dialect::MySQL)
1365-
.to_string()
1366-
.into();
1407+
let alias = name.unwrap_or_else(|| {
1408+
expr.display(readyset_sql::Dialect::MySQL)
1409+
.to_string()
1410+
.into()
1411+
});
13671412
ProjectExpr::Expr { alias, expr }
13681413
}))
13691414
.collect(),
@@ -2367,7 +2412,7 @@ impl SqlToMirConverter {
23672412
let make_topk = offset.is_none();
23682413
// view key will have the offset parameter if it exists. We must filter it out
23692414
// of the group by, because the column originates at this node
2370-
let group_by = view_key
2415+
let mut group_by: Vec<Column> = view_key
23712416
.columns
23722417
.iter()
23732418
.filter_map(|(col, _)| {
@@ -2379,6 +2424,14 @@ impl SqlToMirConverter {
23792424
})
23802425
.collect();
23812426

2427+
group_by.extend(query_graph.group_by.iter().filter_map(|expr| {
2428+
if let Expr::Column(col) = expr {
2429+
Some(Column::from(col.clone()))
2430+
} else {
2431+
None
2432+
}
2433+
}));
2434+
23822435
// Order by expression projections and either a topk or paginate node
23832436
let paginate_nodes = self.make_paginate_node(
23842437
query_name,
@@ -2393,6 +2446,7 @@ impl SqlToMirConverter {
23932446
order,
23942447
*limit,
23952448
make_topk,
2449+
Some(&query_graph.columns),
23962450
)?;
23972451
func_nodes.extend(paginate_nodes.clone());
23982452
final_node = *paginate_nodes.last().unwrap();

readyset-sql-passes/src/adapter_rewrites/mod.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,6 @@ fn use_fallback_pagination(
5656
}
5757

5858
if server_supports_pagination &&
59-
// Can't handle parameterized LIMIT even if support is enabled
60-
!matches!(limit_clause.limit(), Some(Literal::Placeholder(_))) &&
6159
// Can't handle bare OFFSET
6260
!(limit_clause.limit().is_none() && limit_clause.offset().is_some())
6361
{

0 commit comments

Comments
 (0)