Skip to content

Commit a925ea9

Browse files
topk: fix wrong results on non-unique order col
Every now and then, we would get incorrect results when the query contains an order by on a non-unique column. After dropping to m values (assuming m < k), we lookup the group by key in the parent, sort the result set, skip the first m and take k - m. That sounds reasonable when the order by is unique. However, in the case of non-unique order by cols, the sort on the result set doesn't guarantee that the top m records (which we skip) match what we currently have in the node. This can cause duplicates and ultimately incorrect state. Change-Id: I119e0648ac6722bf57146af545e4172d9a72b7f5 Reviewed-on: https://gerrit.readyset.name/c/readyset/+/9689 Tested-by: Buildkite CI Reviewed-by: Sidney Cammeresi <sac@readyset.io>
1 parent fc35ec2 commit a925ea9

File tree

1 file changed

+10
-13
lines changed

1 file changed

+10
-13
lines changed

readyset-dataflow/src/ops/topk.rs

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::borrow::Cow;
22
use std::cmp::Ordering;
3-
use std::collections::{BinaryHeap, HashMap};
3+
use std::collections::{BinaryHeap, HashMap, HashSet};
44
use std::convert::TryInto;
55

66
use dataflow_state::PointKey;
@@ -140,8 +140,6 @@ impl TopK {
140140
let group_start_index = current.len().saturating_sub(self.k);
141141

142142
if original_group_len >= self.k && current.len() < self.k {
143-
let diff = self.k - current.len();
144-
145143
// there used to be k things in the group, now there are fewer than k.
146144
match self.lookup(
147145
*self.src,
@@ -157,18 +155,17 @@ impl TopK {
157155
)
158156
}
159157
IngredientLookupResult::Records(rs) => {
158+
let old_current = current.drain().map(|r| r.row).collect::<HashSet<_>>();
159+
160160
let mut rs = rs.collect::<Result<Vec<_>, _>>()?;
161161
rs.sort_unstable_by(|a, b| self.order.cmp(a.as_ref(), b.as_ref()).reverse());
162-
current.extend(
163-
rs.into_iter()
164-
.map(|row| CurrentRecord {
165-
row,
166-
order: &self.order,
167-
is_new: true,
168-
})
169-
.skip(current.len())
170-
.take(diff),
171-
);
162+
163+
current.extend(rs.into_iter().take(self.k).map(|row| CurrentRecord {
164+
is_new: !old_current.contains(&row),
165+
row,
166+
order: &self.order,
167+
}));
168+
172169
lookup = Some(Lookup {
173170
on: *self.src,
174171
cols: self.group_by.clone(),

0 commit comments

Comments
 (0)