Skip to content

Commit fe71362

Browse files
committed
replicators: Automatically stop replicating CREATE TABLE LIKE tables
When a table is created using the `CREATE TABLE LIKE` syntax, the replicator will automatically add it to the set of non-replicated relations. Fixes: REA-6164 AI-Use: level:1 Change-Id: Ib50ffbd468947f0a65e9620af7edc8976a6a6964 Reviewed-on: https://gerrit.readyset.name/c/readyset/+/10956 Tested-by: Buildkite CI Reviewed-by: Jason Brown <jason.b@readyset.io>
1 parent 54bbecf commit fe71362

File tree

4 files changed

+135
-11
lines changed

4 files changed

+135
-11
lines changed

readyset-sql/src/ast/create.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -608,6 +608,15 @@ pub enum CreateTableLike {
608608
Parenthesized(Relation),
609609
}
610610

611+
impl CreateTableLike {
612+
pub fn as_relation_mut(&mut self) -> &mut Relation {
613+
match self {
614+
CreateTableLike::Plain(relation) => relation,
615+
CreateTableLike::Parenthesized(relation) => relation,
616+
}
617+
}
618+
}
619+
611620
impl TryFromDialect<sqlparser::ast::CreateTableLikeKind> for CreateTableLike {
612621
fn try_from_dialect(
613622
value: sqlparser::ast::CreateTableLikeKind,

replicators/src/mysql_connector/connector.rs

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use tracing::{error, info, warn};
3131

3232
use database_utils::UpstreamConfig;
3333
use readyset_client::metrics::recorded;
34-
use readyset_client::recipe::changelist::Change;
34+
use readyset_client::recipe::changelist::{Change, IntoChanges as _};
3535
use readyset_client::recipe::ChangeList;
3636
use readyset_client::{Modification, ReadySetHandle, TableOperation};
3737
use readyset_data::{Collation as RsCollation, DfValue, Dialect, TimestampTz};
@@ -953,7 +953,9 @@ impl MySqlBinlogConnector {
953953
/// `COMMIT` queries are issued for writes on non-transactional storage engines such as MyISAM.
954954
/// We report the position after the `COMMIT` query if necessary.
955955
///
956-
/// TRUNCATE statements are also parsed and handled here.
956+
/// `TRUNCATE` statements and `CREATE TABLE new_table LIKE existing_table` statements are also
957+
/// parsed and handled here, because we fall into this path when the binlog event doesn't have
958+
/// the [`binlog::consts::StatusVarKey::UpdatedDbNames`] status variable.
957959
///
958960
/// TODO: Transactions begin with `BEGIN` queries, but we do not currently support those.
959961
fn try_non_ddl_action_from_query(
@@ -971,13 +973,33 @@ impl MySqlBinlogConnector {
971973
// MySQL only allows one table in the statement, or we would be in trouble.
972974
let mut relation = truncate.tables[0].relation.clone();
973975
if relation.schema.is_none() {
974-
relation.schema = Some(SqlIdentifier::from(q_event.schema()))
976+
relation.schema = Some(q_event.schema().into())
975977
}
976978
Ok(ReplicationAction::TableAction {
977979
table: relation,
978980
actions: vec![TableOperation::Truncate],
979981
})
980982
}
983+
Ok(SqlQuery::CreateTable(mut create)) => {
984+
if let Some(like) = &mut create.like {
985+
let relation = like.as_relation_mut();
986+
if relation.schema.is_none() {
987+
relation.schema = Some(q_event.schema().into())
988+
}
989+
} else {
990+
return Err(ReadySetError::ReplicationFailed(format!(
991+
"Unexpected CREATE TABLE without LIKE should have been handled earlier: {}",
992+
create.table.display_unquoted()
993+
)));
994+
}
995+
if create.table.schema.is_none() {
996+
create.table.schema = Some(q_event.schema().into())
997+
}
998+
Ok(ReplicationAction::DdlChange {
999+
schema: q_event.schema().into(),
1000+
changes: create.into_changes(),
1001+
})
1002+
}
9811003
_ => Err(ReadySetError::SkipEvent),
9821004
}
9831005
}

replicators/src/noria_adapter.rs

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -786,12 +786,13 @@ impl<'a> NoriaAdapter<'a> {
786786
let keep = self
787787
.table_filter
788788
.should_be_processed(schema.as_str(), statement.table.name.as_str())
789-
&& statement.body.is_ok();
789+
&& statement.body.is_ok()
790+
&& statement.like.is_none();
790791
if !keep {
791792
non_replicated_tables.push(Relation {
792793
schema: Some(schema.clone().into()),
793794
name: statement.table.name.clone(),
794-
})
795+
});
795796
}
796797
keep
797798
}
@@ -820,14 +821,18 @@ impl<'a> NoriaAdapter<'a> {
820821
});
821822

822823
// Mark all tables that were filtered as non-replicated, too
823-
changelist
824-
.changes_mut()
825-
.extend(non_replicated_tables.into_iter().map(|relation| {
826-
Change::AddNonReplicatedRelation(NonReplicatedRelation {
824+
for relation in non_replicated_tables {
825+
if let Some(schema) = &relation.schema {
826+
self.table_filter
827+
.deny_replication(schema, relation.name.as_str());
828+
}
829+
changelist
830+
.changes_mut()
831+
.push(Change::AddNonReplicatedRelation(NonReplicatedRelation {
827832
name: relation,
828833
reason: NotReplicatedReason::Configuration,
829-
})
830-
}));
834+
}));
835+
}
831836

832837
// Resolve schemas and drop renamed tables; the new names will get snapshotted when we
833838
// return `ResnapshotNeeded` in the next `if` block

replicators/tests/tests.rs

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1128,6 +1128,94 @@ async fn mysql_rename_swap_with_ignore() {
11281128
shutdown_tx.shutdown().await;
11291129
}
11301130

1131+
#[tokio::test(flavor = "multi_thread")]
1132+
#[tags(serial, slow, mysql_upstream)]
1133+
async fn mysql_create_table_like_with_ignored() {
1134+
readyset_tracing::init_test_logging();
1135+
let url = &mysql_url();
1136+
let mut client = DbConnection::connect(url).await.unwrap();
1137+
1138+
// Create the initial table and populate it
1139+
client
1140+
.query(
1141+
"
1142+
DROP TABLE IF EXISTS original_table CASCADE;
1143+
DROP TABLE IF EXISTS like_table CASCADE;
1144+
CREATE TABLE original_table (id INT PRIMARY KEY, name VARCHAR(50));
1145+
INSERT INTO original_table VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Charlie');
1146+
",
1147+
)
1148+
.await
1149+
.unwrap();
1150+
1151+
// Start replication to snapshot the original table
1152+
let (mut ctx, shutdown_tx) = TestHandle::start_noria(
1153+
url.to_string(),
1154+
None, // No special config - replicate all tables
1155+
)
1156+
.await
1157+
.unwrap();
1158+
1159+
ctx.controller_rx
1160+
.as_mut()
1161+
.unwrap()
1162+
.snapshot_completed()
1163+
.await
1164+
.unwrap();
1165+
1166+
// Verify that original_table is replicated
1167+
check_results!(
1168+
ctx,
1169+
"original_table",
1170+
"create_table_like_before",
1171+
&[
1172+
&[DfValue::Int(1), tiny(b"Alice")],
1173+
&[DfValue::Int(2), tiny(b"Bob")],
1174+
&[DfValue::Int(3), tiny(b"Charlie")]
1175+
]
1176+
);
1177+
1178+
// Create a new table using CREATE TABLE ... LIKE
1179+
client
1180+
.query("CREATE TABLE like_table LIKE original_table")
1181+
.await
1182+
.unwrap();
1183+
1184+
// Give the replicator a moment to process the DDL change
1185+
sleep(Duration::from_secs(2)).await;
1186+
1187+
// Verify that like_table is NOT replicated (added to ignored tables)
1188+
ctx.assert_table_missing("public", "like_table").await;
1189+
1190+
// Verify that original_table is still replicated and working
1191+
check_results!(
1192+
ctx,
1193+
"original_table",
1194+
"create_table_like_after",
1195+
&[
1196+
&[DfValue::Int(1), tiny(b"Alice")],
1197+
&[DfValue::Int(2), tiny(b"Bob")],
1198+
&[DfValue::Int(3), tiny(b"Charlie")]
1199+
]
1200+
);
1201+
1202+
assert!(!ctx.noria.tables().await.unwrap().contains_key(&Relation {
1203+
schema: Some("public".into()),
1204+
name: "like_table".into(),
1205+
}));
1206+
1207+
let non_replicated_rels = ctx.noria.non_replicated_relations().await.unwrap();
1208+
let relation = Relation {
1209+
schema: Some("public".into()),
1210+
name: "like_table".into(),
1211+
};
1212+
assert!(non_replicated_rels.contains(&NonReplicatedRelation::new(relation.clone()),));
1213+
1214+
ctx.stop().await;
1215+
client.stop().await;
1216+
shutdown_tx.shutdown().await;
1217+
}
1218+
11311219
#[tokio::test(flavor = "multi_thread")]
11321220
#[tags(serial, slow, mysql_upstream)]
11331221
async fn mysql_replication_resnapshot() {

0 commit comments

Comments
 (0)