Skip to content

Commit 08761f6

Browse files
replicators: correctly handle PostgreSQL table snapshot errors
If more tables are queued for snapshot than the configured max_parallel_snapshot_tables, and at least the first table to finish fails, the failure could be silently ignored. This happened because the wait loop only monitored the number of in-progress snapshots, without verifying if any completed snapshot had failed. As a result, we continued snapshotting subsequent tables without properly reporting the failure. This caused the failed table to still be marked as in-progress (having an entry in the offsets but with a None value), which we caught by the panic in the end of the snapshot loop. Fixes: REA-5722 Closes: #1510 Release-Note-Core: Fixed a bug where a PotgreSQL table snapshot failure could be silently ignored, causing Readyset to fail to remove the failed table from the list of tables to snapshot, and abort the entire snapshot process. Change-Id: Ib1dcabbb11531fb515d71777124f4bec08774965 Reviewed-on: https://gerrit.readyset.name/c/readyset/+/9374 Tested-by: Buildkite CI Reviewed-by: Jason Brown <jason.b@readyset.io>
1 parent 116aebf commit 08761f6

File tree

1 file changed

+54
-32
lines changed

1 file changed

+54
-32
lines changed

replicators/src/postgres_connector/snapshot.rs

Lines changed: 54 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -669,6 +669,51 @@ impl<'a> PostgresReplicator<'a> {
669669
})
670670
}
671671

/// Handle a table snapshot error by removing the table from the set of tables and adding it as
/// a non-replicated relation. If the error is not a TableError, propagate it.
///
/// # Arguments
///
/// * `error` - The error that occurred while snapshotting the table.
/// * `tables` - A mutable reference to the set of tables to snapshot.
///
/// # Returns
///
/// Returns the error if it is not a TableError, otherwise returns Ok(()).
async fn handle_table_snapshot_error(
    &mut self,
    error: ReadySetError,
    tables: &mut Vec<TableDescription>,
) -> ReadySetResult<()> {
    // Remove from the set of tables any that failed to snapshot,
    // and add them as non-replicated relations.
    // Propagate any non-TableErrors.
    match error {
        ReadySetError::TableError { ref table, .. } => {
            // A per-table failure: log it, drop the table from the snapshot set, and
            // record it in noria as a non-replicated relation so the overall snapshot
            // can continue with the remaining tables.
            warn!(%error, table=%table.display(Dialect::PostgreSQL), "Error snapshotting, table will not be used");
            tables.retain(|t| t.name != *table);
            self.noria
                .extend_recipe_no_leader_ready(ChangeList::from_changes(
                    vec![
                        // Drop the partially-snapshotted table first, then re-register it
                        // as non-replicated with the failure reason for observability.
                        Change::Drop {
                            name: table.clone(),
                            if_exists: false,
                        },
                        Change::AddNonReplicatedRelation(NonReplicatedRelation {
                            name: table.clone(),
                            reason: NotReplicatedReason::from_string(&format!("{:?}", error)),
                        }),
                    ],
                    DataDialect::DEFAULT_POSTGRESQL,
                ))
                .await?;
        }
        // Any non-table-specific error is fatal to the snapshot and is propagated.
        _ => Err(error)?,
    }
    Ok(())
}
716+
672717
/// Snapshot the contents of the upstream database to ReadySet, starting with the DDL, followed
673718
/// by each table's contents.
674719
///
@@ -919,11 +964,10 @@ impl<'a> PostgresReplicator<'a> {
919964
// Commit the transaction we were using to snapshot the schema. This is important since that
920965
// transaction holds onto locks for tables which we now need to load data from.
921966
self.transaction.take().unwrap().commit().await?;
922-
923967
// Finally copy each table into noria
924968
info!(tables=%tables.len(), %max_parallel_snapshot_tables, "Snapshotting tables");
925969
let mut snapshotting_tables = FuturesUnordered::new();
926-
for table in &tables {
970+
for table in &tables.clone() {
927971
set_failpoint!(failpoints::POSTGRES_SNAPSHOT_TABLE);
928972
let span =
929973
info_span!("Snapshotting table", table = %table.name.display(Dialect::PostgreSQL));
@@ -942,7 +986,11 @@ impl<'a> PostgresReplicator<'a> {
942986
let snapshot_name = replication_slot.snapshot_name.clone();
943987
let table = table.clone();
944988
if snapshotting_tables.len() >= max_parallel_snapshot_tables {
945-
snapshotting_tables.next().await;
989+
// If we're already at the max number of parallel snapshots, wait for one to finish
990+
// before adding another.
991+
if let Some(Err(error)) = snapshotting_tables.next().await {
992+
self.handle_table_snapshot_error(error, &mut tables).await?;
993+
}
946994
}
947995
snapshotting_tables.push(Self::snapshot_table(
948996
pool,
@@ -955,36 +1003,10 @@ impl<'a> PostgresReplicator<'a> {
9551003
))
9561004
}
9571005

958-
// Remove from the set of tables any that failed to snapshot,
959-
// and add them as non-replicated relations.
960-
// Propagate any non-TableErrors.
1006+
// Wait for all tables to finish snapshotting
9611007
while let Some(res) = snapshotting_tables.next().await {
962-
if let Err(e) = res {
963-
match e {
964-
ReadySetError::TableError { ref table, .. } => {
965-
warn!(%e, table=%table.display(Dialect::PostgreSQL), "Error snapshotting, table will not be used");
966-
tables.retain(|t| t.name != *table);
967-
self.noria
968-
.extend_recipe_no_leader_ready(ChangeList::from_changes(
969-
vec![
970-
Change::Drop {
971-
name: table.clone(),
972-
if_exists: false,
973-
},
974-
Change::AddNonReplicatedRelation(NonReplicatedRelation {
975-
name: table.clone(),
976-
reason: NotReplicatedReason::from_string(&format!(
977-
"{:?}",
978-
e
979-
)),
980-
}),
981-
],
982-
DataDialect::DEFAULT_POSTGRESQL,
983-
))
984-
.await?;
985-
}
986-
_ => Err(e)?,
987-
}
1008+
if let Err(error) = res {
1009+
self.handle_table_snapshot_error(error, &mut tables).await?;
9881010
}
9891011
}
9901012

0 commit comments

Comments
 (0)