Skip to content

Commit f42825e

Browse files
committed
replicators: Save replication offset for freshly-created MySQL tables
We were not consistently saving the offset due to the potential for an empty schema field in Change::CreateTable, which would then cause the lookup into the table's mutator to fail. Set the schema field if it was empty. Also consider tables added so long as they are not immediately dropped in the same batch of events. Fixes: REA-5787 Release-Note-Core: Fix a bug where Readyset would fail to save the replication offset for MySQL tables created during active replication, leading to unnecessary table resnapshots. Change-Id: I45665deb0d85629e331fc9066eab861eee5127f2 Reviewed-on: https://gerrit.readyset.name/c/readyset/+/9723 Reviewed-by: Michael Zink <michael.z@readyset.io> Tested-by: Buildkite CI
1 parent e405c8e commit f42825e

File tree

1 file changed

+16
-8
lines changed

1 file changed

+16
-8
lines changed

replicators/src/noria_adapter.rs

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -779,13 +779,20 @@ impl<'a> NoriaAdapter<'a> {
779779
changelist = changelist.with_schema_search_path(vec![schema.clone().into()]);
780780

781781
// Collect a list of all tables we're creating for later
782-
let tables = changelist
783-
.changes()
784-
.filter_map(|change| match change {
785-
Change::CreateTable { statement, .. } => Some(statement.table.clone()),
786-
_ => None,
787-
})
788-
.collect::<Vec<_>>();
782+
let mut added_tables = HashSet::new();
783+
for change in changelist.changes() {
784+
let mut table = match change {
785+
Change::CreateTable { statement, .. } => statement.table.clone(),
786+
Change::AddNonReplicatedRelation(table) => table.name.clone(),
787+
_ => continue,
788+
};
789+
table.schema.get_or_insert_with(|| (&schema).into());
790+
if matches!(change, Change::CreateTable { .. }) {
791+
added_tables.insert(table);
792+
} else {
793+
added_tables.remove(&table);
794+
}
795+
}
789796

790797
match self
791798
.noria
@@ -801,6 +808,7 @@ impl<'a> NoriaAdapter<'a> {
801808
let changes = mem::take(changelist.changes_mut());
802809
// If something went wrong, mark all the tables and views that we just tried to
803810
// create as non-replicated
811+
added_tables.clear();
804812
changelist
805813
.changes_mut()
806814
.extend(changes.into_iter().filter_map(|change| {
@@ -833,7 +841,7 @@ impl<'a> NoriaAdapter<'a> {
833841

834842
// Set the replication offset for each table we just created to this replication offset
835843
// (since otherwise they'll get initialized without an offset)
836-
for table in &tables {
844+
for table in &added_tables {
837845
self.replication_offsets
838846
.tables
839847
.insert(table.clone(), Some(pos.clone()));

0 commit comments

Comments
 (0)