Skip to content

Commit a0eab18

Browse files
readyset-dataflow: Fix unquery eviction for straddled joins.
During on_replay_misses, we add remapped keys for left and right ancestors if the node is partial. However during unquery eviction, we were triggering evictions for all ancestors that were not a base node. This condition is not guaranteed to be the same, as a node without a state for example, is not a base node but is also not partial, in this situation we would trigger eviction for a node that does not have a remapping, causing the downstream nodes to not find a tag to fullfill the eviction key. Fixes: REA-6145 Release-Note-Core: Fixed an issue during eviction of straddled joins causing it to attempt to evict a key that does not have a remapping. Change-Id: I86ea30e7cc5881f2fa895f4d872ec0264211bca4 Reviewed-on: https://gerrit.readyset.name/c/readyset/+/10985 Tested-by: Buildkite CI Reviewed-by: Sidney Cammeresi <sac@readyset.io>
1 parent 58cf981 commit a0eab18

File tree

4 files changed

+139
-1
lines changed

4 files changed

+139
-1
lines changed

readyset-client-test-helpers/src/lib.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ pub struct TestBuilder {
130130
replication_server_id: Option<ReplicationServerId>,
131131
mixed_comparisons: bool,
132132
topk: bool,
133+
straddled_joins: bool,
133134
parsing_preset: ParsingPreset,
134135
}
135136

@@ -158,6 +159,7 @@ impl TestBuilder {
158159
replication_server_id: None,
159160
mixed_comparisons: true,
160161
topk: false,
162+
straddled_joins: false,
161163
parsing_preset: ParsingPreset::for_tests(),
162164
}
163165
}
@@ -250,6 +252,11 @@ impl TestBuilder {
250252
self
251253
}
252254

255+
pub fn set_straddled_joins(mut self, straddled_joins: bool) -> Self {
256+
self.straddled_joins = straddled_joins;
257+
self
258+
}
259+
253260
pub fn parsing_preset(mut self, parsing_preset: ParsingPreset) -> Self {
254261
self.parsing_preset = parsing_preset;
255262
self
@@ -301,6 +308,7 @@ impl TestBuilder {
301308
};
302309
builder.set_persistence(persistence);
303310
builder.set_topk(self.topk);
311+
builder.set_straddled_joins(self.straddled_joins);
304312
builder.set_pagination(false);
305313
builder.set_mixed_comparisons(self.mixed_comparisons);
306314
builder.set_parsing_preset(self.parsing_preset);

readyset-client-test-helpers/src/mysql_helpers.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,33 @@ pub async fn last_query_info(conn: &mut impl Queryable) -> QueryInfo {
3939
.unwrap()
4040
}
4141

42+
/// Helper function that gets EXPLAIN MATERIALIZATIONS and ensure that nodes have a specific number of keys
43+
pub async fn assert_materializations_have_key(
44+
conn: &mut impl Queryable,
45+
skip_base_node: bool,
46+
key: &str,
47+
) {
48+
type Materialization = (usize, usize, String, String, String, usize, bool, String);
49+
let materializations: Vec<Materialization> =
50+
conn.query("EXPLAIN MATERIALIZATIONS").await.unwrap();
51+
for (
52+
_domain_index,
53+
_node_index,
54+
_node_name,
55+
node_description,
56+
keys,
57+
_size_bytes,
58+
_partial,
59+
_indexes,
60+
) in materializations
61+
{
62+
if skip_base_node && node_description == "B" {
63+
continue;
64+
}
65+
assert_eq!(keys, key);
66+
}
67+
}
68+
4269
pub async fn recreate_database<N>(dbname: N)
4370
where
4471
N: Display,

readyset-dataflow/src/domain/mod.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2708,7 +2708,12 @@ impl Domain {
27082708
let keys = if is_generated {
27092709
// No need to store remapping; if it's not there, we don't have the key.
27102710
let mut missed_keys = self.nodes[src].borrow_mut().handle_upquery(m)?;
2711-
missed_keys.retain(|m| !self.nodes[m.node].borrow().is_base());
2711+
missed_keys.retain(|upstream_miss| {
2712+
self.state
2713+
.get(upstream_miss.node)
2714+
.map(|state| state.is_partial())
2715+
.unwrap_or(false)
2716+
});
27122717
missed_keys
27132718
} else {
27142719
vec![m]
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
use mysql_async::prelude::Queryable;
2+
use readyset_adapter::backend::MigrationMode;
3+
use readyset_client_metrics::QueryDestination;
4+
use readyset_client_test_helpers::mysql_helpers::{
5+
self, MySQLAdapter, assert_materializations_have_key, last_query_info,
6+
};
7+
use readyset_client_test_helpers::{TestBuilder, sleep};
8+
use test_utils::tags;
9+
10+
/// REA-6145: no tag found for value
11+
#[tokio::test]
12+
#[tags(serial, mysql_upstream)]
13+
async fn test_sj_eviction_no_remapping() {
14+
readyset_tracing::init_test_logging();
15+
let db_name = "sj_eviction_no_remapping";
16+
mysql_helpers::recreate_database(db_name).await;
17+
18+
let (rs_opts, mut handle, shutdown_tx) = TestBuilder::default()
19+
.recreate_database(false)
20+
.migration_mode(MigrationMode::OutOfBand)
21+
.set_straddled_joins(true)
22+
.replicate_db(db_name.to_string())
23+
.build::<MySQLAdapter>()
24+
.await;
25+
26+
let upstream_opts = mysql_helpers::upstream_config().db_name(Some(db_name));
27+
let mut upstream_conn = mysql_async::Conn::new(upstream_opts).await.unwrap();
28+
let mut rs_conn = mysql_async::Conn::new(rs_opts).await.unwrap();
29+
30+
upstream_conn
31+
.query_drop(
32+
"CREATE TABLE t1 (
33+
id int NOT NULL AUTO_INCREMENT,
34+
t2_id int DEFAULT NULL,
35+
key_v varchar(255) DEFAULT NULL,
36+
deleted_at datetime DEFAULT NULL,
37+
PRIMARY KEY (id)
38+
);
39+
CREATE TABLE t2 (
40+
id int NOT NULL AUTO_INCREMENT,
41+
key_i int DEFAULT NULL,
42+
deleted_at datetime DEFAULT NULL,
43+
PRIMARY KEY (id)
44+
);",
45+
)
46+
.await
47+
.unwrap();
48+
49+
sleep().await;
50+
51+
rs_conn
52+
.query_drop("CREATE CACHE sj_cache FROM SELECT * FROM t1 INNER JOIN t2 ON (t1.t2_id = t2.id) WHERE t1.deleted_at IS NULL AND t2.deleted_at IS NULL AND t2.key_i = ? AND t1.key_v = ?;")
53+
.await
54+
.unwrap();
55+
56+
sleep().await;
57+
58+
let result: Vec<(i32, i32)> = rs_conn.query("SELECT * FROM t1 INNER JOIN t2 ON (t1.t2_id = t2.id) WHERE t1.deleted_at IS NULL AND t2.deleted_at IS NULL AND t2.key_i = 1 AND t1.key_v = '3';").await.unwrap();
59+
assert_eq!(result.len(), 0);
60+
61+
let last_statement = last_query_info(&mut rs_conn).await;
62+
assert_eq!(
63+
last_statement.destination,
64+
QueryDestination::Readyset(Some("sj_cache".into()))
65+
);
66+
67+
let result: Vec<(i32, i32)> = rs_conn.query("SELECT * FROM t1 INNER JOIN t2 ON (t1.t2_id = t2.id) WHERE t1.deleted_at IS NULL AND t2.deleted_at IS NULL AND t2.key_i = 1 AND t1.key_v = '2';").await.unwrap();
68+
assert_eq!(result.len(), 0);
69+
70+
let last_statement = last_query_info(&mut rs_conn).await;
71+
assert_eq!(
72+
last_statement.destination,
73+
QueryDestination::Readyset(Some("sj_cache".into()))
74+
);
75+
76+
let result: Vec<(i32, i32)> = rs_conn.query("SELECT * FROM t1 INNER JOIN t2 ON (t1.t2_id = t2.id) WHERE t1.deleted_at IS NULL AND t2.deleted_at IS NULL AND t2.key_i = 1 AND t1.key_v = '1';").await.unwrap();
77+
assert_eq!(result.len(), 0);
78+
79+
let last_statement = last_query_info(&mut rs_conn).await;
80+
assert_eq!(
81+
last_statement.destination,
82+
QueryDestination::Readyset(Some("sj_cache".into()))
83+
);
84+
85+
assert_materializations_have_key(&mut rs_conn, true, "3").await;
86+
87+
handle.flush_partial().await.unwrap();
88+
handle.flush_partial().await.unwrap();
89+
handle.flush_partial().await.unwrap();
90+
sleep().await;
91+
92+
shutdown_tx.shutdown().await;
93+
94+
upstream_conn
95+
.query_drop(format!("DROP DATABASE {db_name}"))
96+
.await
97+
.unwrap();
98+
}

0 commit comments

Comments
 (0)