Skip to content

Commit 41abc03

Browse files
Drop MySQL tables that are not part of replication filters
This commit add a functionality to drop tables that have being removed from replication filters. This happens if a table is either removed from --replication-tables or added to --replication-tables-ignore. Release-Note-Core: Added functionality to remove leftover tables that have being excluded from replication filters. Fixes: REA-4379 Closes: #1397 Change-Id: I8bc0035c5a1d3bd7247b3448c6a29806fb7aa8bb Reviewed-on: https://gerrit.readyset.name/c/readyset/+/8367 Tested-by: Buildkite CI Reviewed-by: Michael Zink <michael.z@readyset.io>
1 parent 4f09cfe commit 41abc03

File tree

1 file changed

+44
-1
lines changed

1 file changed

+44
-1
lines changed

replicators/src/mysql_connector/snapshot.rs

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use crate::db_util::DatabaseSchemas;
3232
use crate::mysql_connector::snapshot_type::SnapshotType;
3333
use crate::table_filter::TableFilter;
3434
use crate::TablesSnapshottingGaugeGuard;
35+
use std::collections::HashSet;
3536

3637
const RS_BATCH_SIZE: usize = 1000; // How many queries to buffer before pushing to ReadySet
3738

@@ -178,7 +179,7 @@ impl<'a> MySqlReplicator<'a> {
178179
self.table_filter
179180
.should_be_processed(schema.as_str(), table.as_str())
180181
});
181-
182+
self.drop_leftover_tables(noria, &replicated_tables).await?;
182183
noria
183184
.extend_recipe_no_leader_ready(ChangeList::from_changes(
184185
non_replicated_tables
@@ -749,6 +750,48 @@ impl<'a> MySqlReplicator<'a> {
749750
.await?;
750751
Ok(())
751752
}
753+
754+
/// This functions drops all tables that we have internally and have not been selected as
755+
/// part of snapshot, normally because table filters have been changed.
756+
///
757+
/// # Arguments
758+
///
759+
/// * `noria`: The target ReadySet deployment
760+
/// * `snapshot_tables`: The list of tables that are part of the snapshot
761+
async fn drop_leftover_tables(
762+
&mut self,
763+
noria: &mut readyset_client::ReadySetHandle,
764+
snapshot_tables: &[(String, String)],
765+
) -> ReadySetResult<()> {
766+
let mut changes = Vec::new();
767+
let current_tables = noria.tables().await?;
768+
let snapshot_tables_set: HashSet<Relation> = snapshot_tables
769+
.iter()
770+
.map(|(schema, name)| Relation {
771+
schema: Some(schema.clone().into()),
772+
name: name.clone().into(),
773+
})
774+
.collect();
775+
776+
for table in current_tables.keys() {
777+
if !snapshot_tables_set.contains(table) {
778+
warn!(table = %table.display(nom_sql::Dialect::MySQL), "Dropping table that is not part of snapshot");
779+
changes.push(Change::Drop {
780+
name: table.clone(),
781+
if_exists: false,
782+
});
783+
}
784+
}
785+
if !changes.is_empty() {
786+
noria
787+
.extend_recipe_no_leader_ready(ChangeList::from_changes(
788+
changes,
789+
Dialect::DEFAULT_MYSQL,
790+
))
791+
.await?;
792+
}
793+
Ok(())
794+
}
752795
}
753796

754797
/// Convert each entry in a row to a ReadySet type that can be inserted into the base tables

0 commit comments

Comments
 (0)