Skip to content

Commit 614a6ee

Browse files
committed
adapter: Fix views synchronizer bugs
Previously, the views synchronizer only checked the server for views for queries that were in the "pending" state. This meant that if the migration handler set a query's state to "dry run succeeded" before the views synchronizer had a chance to check the server for a view, the query would be stuck in the "dry run succeeded" state forever, even if a view for the query did indeed exist already. This commit fixes the issue by having the views synchronizer check the server for views for queries in *either* the "pending" or "dry run succeeded" states. In order to prevent the views synchronizer from rechecking every query with status "dry run succeeded" over and over again, a "cache" has been added to the views synchronizer to keep track of which queries have already been checked. While working on this, I also noticed that it was possible for the following sequence of events to occur: - Migration handler sees that a query is pending and kicks off a dry run migration - Views synchronizer finds a view on the server for the same query and sets the status to "successful" - Migration handler finishes the dry run migration for the query and overwrites the status as "dry run succeeded" This could lead to a situation where a query that was previously (correctly) labeled as "successful" is moved back to the "dry run succeeded" state. To fix the issue, this commit updates the migration handler to only write the "dry run succeeded" status if the query's status is still "pending" after the dry run is completed. Release-Note-Core: Fixed a bug where queries that already had caches were sometimes stuck in the `SHOW PROXIED QUERIES` list Change-Id: Ie5faa100158fc80c906d8ad5cb897d8a02a07be9 Reviewed-on: https://gerrit.readyset.name/c/readyset/+/7442 Tested-by: Buildkite CI Reviewed-by: Jason Brown <jason.b@readyset.io>
1 parent 840348d commit 614a6ee

File tree

8 files changed

+2142
-12
lines changed

8 files changed

+2142
-12
lines changed

Cargo.lock

Lines changed: 3 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

logictests/generated/mysql/queries

Lines changed: 1000 additions & 0 deletions
Large diffs are not rendered by default.

readyset-adapter/Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ quanta = { version = "0.11", default-features = false }
5151
lru = "0.12.0"
5252
crossbeam-skiplist = "0.1.1"
5353
slab = "0.4"
54+
xxhash-rust = { version = "0.8.10", features = ["xxh3"] }
5455

5556
readyset-adapter-types = { path = "../readyset-adapter-types/" }
5657
readyset-alloc = { path = "../readyset-alloc/" }
@@ -81,6 +82,10 @@ path = "src/lib.rs"
8182
name = "parse"
8283
harness = false
8384

85+
[[bench]]
86+
name = "hash"
87+
harness = false
88+
8489
[features]
8590
ryw = []
8691
failure_injection = ["fail/failpoints"]

readyset-adapter/benches/hash.rs

Lines changed: 1059 additions & 0 deletions
Large diffs are not rendered by default.

readyset-adapter/src/migration_handler.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -344,8 +344,18 @@ impl MigrationHandler {
344344
match controller.dry_run(changelist).await {
345345
Ok(_) => {
346346
self.start_time.remove(view_request);
347+
348+
// It's possible that the ViewsSynchronizer found an existing view for this query
349+
// on the server while we were performing a dry run, in which case it would have
350+
// updated the query's status to "successful". In this situation, we don't want to
351+
// overwrite the "successful" status, so we only write the new "dry run succeeded"
352+
// status if the query's status is still "pending"
347353
self.query_status_cache
348-
.update_query_migration_state(view_request, MigrationState::DryRunSucceeded);
354+
.with_mut_migration_state(view_request, |status| {
355+
if status.is_pending() {
356+
*status = MigrationState::DryRunSucceeded;
357+
}
358+
});
349359
}
350360
Err(e) if e.caused_by_unsupported() => {
351361
self.start_time.remove(view_request);

readyset-adapter/src/query_status_cache.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -560,6 +560,24 @@ impl QueryStatusCache {
560560
}
561561
}
562562

563+
/// Yields to the given function `f` a mutable reference to the migration state of the query
564+
/// `q`. The primary purpose of this method is allow for atomic reads and writes of the
565+
/// migration state of a query.
566+
pub fn with_mut_migration_state<Q, F>(&self, q: &Q, f: F) -> bool
567+
where
568+
Q: QueryStatusKey,
569+
F: Fn(&mut MigrationState),
570+
{
571+
q.with_mut_status(self, |maybe_query_status| {
572+
if let Some(query_status) = maybe_query_status {
573+
f(&mut query_status.migration_state);
574+
true
575+
} else {
576+
false
577+
}
578+
})
579+
}
580+
563581
/// Marks a query as dropped by the user.
564582
///
565583
/// NOTE: this should only be called after we successfully remove a View for this query. This is
@@ -785,6 +803,22 @@ impl QueryStatusCache {
785803
.into()
786804
}
787805

806+
/// Returns a list of queries whose migration states match `states`.
807+
pub fn queries_with_statuses(&self, states: &[MigrationState]) -> QueryList {
808+
let statuses = self.persistent_handle.statuses.read();
809+
statuses
810+
.iter()
811+
.filter_map(|(_query_id, (query, status))| {
812+
if states.contains(&status.migration_state) {
813+
Some((query.clone(), status.clone()))
814+
} else {
815+
None
816+
}
817+
})
818+
.collect::<Vec<(Query, QueryStatus)>>()
819+
.into()
820+
}
821+
788822
/// Returns a list of queries that have a state of [`QueryState::Successful`].
789823
pub fn allow_list(&self) -> Vec<(QueryId, Arc<ViewCreateRequest>, QueryStatus)> {
790824
self.persistent_handle.allow_list()

readyset-adapter/src/views_synchronizer.rs

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use std::collections::HashSet;
12
use std::sync::Arc;
23

34
use dataflow_expression::Dialect;
@@ -8,6 +9,7 @@ use readyset_util::shared_cache::LocalCache;
89
use readyset_util::shutdown::ShutdownReceiver;
910
use tokio::select;
1011
use tracing::{debug, info, instrument, trace, warn};
12+
use xxhash_rust::xxh3;
1113

1214
use crate::query_status_cache::QueryStatusCache;
1315

@@ -22,6 +24,13 @@ pub struct ViewsSynchronizer {
2224
dialect: Dialect,
2325
/// Global and thread-local cache of view endpoints and prepared statements.
2426
view_name_cache: LocalCache<ViewCreateRequest, Relation>,
27+
/// A cache to keep track of the queries for which we've already checked the server for
28+
/// existing views. Note that this cache is *not* updated (i.e. a query is not removed) when a
29+
/// "dry run succeeded" query is migrated.
30+
///
31+
/// This HashSet stores 128-bit hashes computed via xxHash in an attempt to minimize the amount
32+
/// of data we need to store to keep track of the queries we've already seen.
33+
views_checked: HashSet<u128>,
2534
}
2635

2736
impl ViewsSynchronizer {
@@ -38,6 +47,7 @@ impl ViewsSynchronizer {
3847
poll_interval,
3948
dialect,
4049
view_name_cache,
50+
views_checked: HashSet::new(),
4151
}
4252
}
4353

@@ -69,24 +79,34 @@ impl ViewsSynchronizer {
6979

7080
async fn poll(&mut self) {
7181
debug!("Views synchronizer polling");
72-
let queries = self
82+
let (queries, hashes): (Vec<_>, Vec<_>) = self
7383
.query_status_cache
74-
.pending_migration()
84+
.queries_with_statuses(&[MigrationState::DryRunSucceeded, MigrationState::Pending])
7585
.into_iter()
7686
.filter_map(|(q, _)| {
77-
q.into_parsed()
78-
// once arc_unwrap_or_clone is stabilized, we can use that cleaner syntax
79-
.map(|p| Arc::try_unwrap(p).unwrap_or_else(|arc| (*arc).clone()))
87+
q.into_parsed().and_then(|p| {
88+
let hash = xxh3::xxh3_128(&bincode::serialize(&*p).unwrap());
89+
90+
if self.views_checked.contains(&hash) {
91+
// once arc_unwrap_or_clone is stabilized, we can use that cleaner syntax
92+
Some((
93+
Arc::try_unwrap(p).unwrap_or_else(|arc| (*arc).clone()),
94+
hash,
95+
))
96+
} else {
97+
None
98+
}
99+
})
80100
})
81-
.collect::<Vec<_>>();
101+
.unzip();
82102

83103
match self
84104
.controller
85105
.view_names(queries.clone(), self.dialect)
86106
.await
87107
{
88108
Ok(statuses) => {
89-
for (query, name) in queries.into_iter().zip(statuses) {
109+
for ((query, name), hash) in queries.into_iter().zip(statuses).zip(hashes) {
90110
trace!(
91111
// FIXME(REA-2168): Use correct dialect.
92112
query = %query.statement.display(nom_sql::Dialect::MySQL),
@@ -96,7 +116,8 @@ impl ViewsSynchronizer {
96116
if let Some(name) = name {
97117
self.view_name_cache.insert(query.clone(), name).await;
98118
self.query_status_cache
99-
.update_query_migration_state(&query, MigrationState::Successful)
119+
.update_query_migration_state(&query, MigrationState::Successful);
120+
self.views_checked.insert(hash);
100121
}
101122
}
102123
}

readyset-clustertest/src/readyset_mysql.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1599,7 +1599,7 @@ async fn views_synchronize_between_deployments() {
15991599

16001600
// Eventually it should show up in adapter 1 too
16011601
eventually! {
1602-
adapter_1.as_mysql_conn().unwrap().query_drop("SELECT * FROM t1;");
1602+
adapter_1.as_mysql_conn().unwrap().query_drop("SELECT * FROM t1;").await.unwrap();
16031603
last_statement_destination(adapter_1.as_mysql_conn().unwrap()).await == QueryDestination::Readyset
16041604
}
16051605

0 commit comments

Comments
 (0)