Skip to content

Commit f2dce14

Browse files
committed
readyset-adapter: Improve duplicate cache creation handling
Previously, before creating a named cache, we would first drop any existing cache with that same name before creation. In Iad22f58, we added a similar check for unnamed caches, comparing query_id to first drop a duplicate before creation. It's important to first drop a duplicate cache to prevent the graph metadata in SqlToMirConverter from erroneously being updated with node index values that are then never persisted to the graph itself, which can happen if we decide to abandon the in-progress graph creation to instead reuse the previous cache. Unfortunately, we could still run into this inconsistency between SqlToMirConverter and the graph when creating duplicate caches with different names. Now, instead of these targeted checks that made decisions based on what was in the adapter's view name cache, we instead query the controller for any existing caches that match either the same query_id or requested name. If any such cache exists, drop it before creation. Fixes: REA-5765 Release-Note-Core: Fix a bug that led to errors when attempting to create duplicate caches. Change-Id: I4e5aa25d19e1ea4488c986ee65f4a85ec8a801a5 Reviewed-on: https://gerrit.readyset.name/c/readyset/+/9655 Tested-by: Buildkite CI Reviewed-by: Jason Brown <jason.b@readyset.io>
1 parent cd13971 commit f2dce14

File tree

2 files changed

+78
-7
lines changed

2 files changed

+78
-7
lines changed

readyset-adapter/src/backend.rs

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ use mysql_common::row::convert::{FromRow, FromRowError};
8484
use readyset_adapter_types::{DeallocateId, ParsedCommand, PreparedStatementType};
8585
use readyset_client::consensus::{Authority, AuthorityControl, CacheDDLRequest};
8686
use readyset_client::query::*;
87+
use readyset_client::recipe::CacheExpr;
8788
use readyset_client::results::Results;
8889
use readyset_client::{ColumnSchema, PlaceholderIdx, ViewCreateRequest};
8990
pub use readyset_client_metrics::QueryDestination;
@@ -1922,22 +1923,36 @@ where
19221923
concurrently: bool,
19231924
) -> ReadySetResult<noria_connector::QueryResult<'static>> {
19241925
adapter_rewrites::process_query(&mut stmt, self.noria.rewrite_params())?;
1926+
let query_id = QueryId::from_select(&stmt, self.noria.schema_search_path());
1927+
let requested_name = name.clone();
19251928
let name = match name {
19261929
Some(name) => &*name,
19271930
None => {
1928-
*name = Some(QueryId::from_select(&stmt, self.noria.schema_search_path()).into());
1931+
*name = Some(query_id.into());
19291932
name.as_ref().unwrap()
19301933
}
19311934
};
1932-
// If we have another query with the same name, drop that query first
1933-
if let Some(view_request) = self.noria.view_create_request_from_name(name).await {
1935+
1936+
// If we have existing caches with the same query_id or name, drop them first.
1937+
for CacheExpr {
1938+
name,
1939+
statement,
1940+
query_id,
1941+
..
1942+
} in self
1943+
.noria
1944+
.verbose_views(Some(query_id), requested_name.as_ref())
1945+
.await?
1946+
{
19341947
warn!(
1935-
statement = %Sensitive(&view_request.statement.display(self.settings.dialect)),
1936-
name = %name.display(readyset_sql::Dialect::MySQL),
1948+
query_id = %query_id,
1949+
name = %name.display(DB::SQL_DIALECT),
1950+
statement = %Sensitive(&statement.display(self.settings.dialect)),
19371951
"Dropping previously cached query",
19381952
);
1939-
self.drop_cached_query(name).await?;
1953+
self.drop_cached_query(&name).await?;
19401954
}
1955+
19411956
// Now migrate the new query
19421957
let migration_state = match self
19431958
.noria

readyset-mysql/tests/integration.rs

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3005,7 +3005,8 @@ async fn test_default_value_not_null_for_replication() {
30053005
}
30063006

30073007
#[tokio::test(flavor = "multi_thread")]
3008-
async fn create_duplicate_caches() {
3008+
#[tags(serial)]
3009+
async fn create_duplicate_unnamed_caches() {
30093010
let (opts, _handle, shutdown_tx) = setup().await;
30103011
let mut conn = Conn::new(opts).await.unwrap();
30113012

@@ -3030,6 +3031,61 @@ async fn create_duplicate_caches() {
30303031
shutdown_tx.shutdown().await;
30313032
}
30323033

3034+
#[tokio::test(flavor = "multi_thread")]
3035+
#[tags(serial)]
3036+
async fn create_duplicate_named_caches() {
3037+
let (opts, _handle, shutdown_tx) = setup().await;
3038+
let mut conn = Conn::new(opts).await.unwrap();
3039+
3040+
conn.query_drop("CREATE TABLE foo (a INT, b INT)")
3041+
.await
3042+
.unwrap();
3043+
3044+
conn.query_drop("CREATE CACHE name1 FROM SELECT b FROM foo WHERE a = ?")
3045+
.await
3046+
.unwrap();
3047+
conn.query_drop("CREATE CACHE name2 FROM SELECT b FROM foo WHERE a IN (?, ?, ?)")
3048+
.await
3049+
.unwrap();
3050+
conn.query_drop("CREATE CACHE name2 FROM SELECT b FROM foo WHERE a IN (?, ?, ?)")
3051+
.await
3052+
.unwrap();
3053+
3054+
let caches: Vec<(String, String, String, String)> = conn.query("SHOW CACHES").await.unwrap();
3055+
assert_eq!(caches.len(), 1, "unexpected caches: {caches:?}");
3056+
3057+
shutdown_tx.shutdown().await;
3058+
}
3059+
3060+
#[tokio::test(flavor = "multi_thread")]
3061+
#[tags(serial)]
3062+
async fn create_duplicate_query_id_and_name_caches() {
3063+
let (opts, _handle, shutdown_tx) = setup().await;
3064+
let mut conn = Conn::new(opts).await.unwrap();
3065+
3066+
conn.query_drop("CREATE TABLE foo (a INT, b INT)")
3067+
.await
3068+
.unwrap();
3069+
3070+
conn.query_drop("CREATE CACHE name1 FROM SELECT b FROM foo")
3071+
.await
3072+
.unwrap();
3073+
conn.query_drop("CREATE CACHE name2 FROM SELECT b FROM foo WHERE a = ?")
3074+
.await
3075+
.unwrap();
3076+
conn.query_drop("CREATE CACHE name1 FROM SELECT b FROM foo WHERE a IN (?, ?, ?)")
3077+
.await
3078+
.unwrap();
3079+
conn.query_drop("CREATE CACHE name1 FROM SELECT b FROM foo WHERE a IN (?, ?, ?)")
3080+
.await
3081+
.unwrap();
3082+
3083+
let caches: Vec<(String, String, String, String)> = conn.query("SHOW CACHES").await.unwrap();
3084+
assert_eq!(caches.len(), 1, "unexpected caches: {caches:?}");
3085+
3086+
shutdown_tx.shutdown().await;
3087+
}
3088+
30333089
#[tokio::test(flavor = "multi_thread")]
30343090
#[tags(serial, mysql8_upstream)]
30353091
#[ignore]

0 commit comments

Comments
 (0)