Skip to content

Commit c863ca4

Browse files
replicators: When restarting replicator, log the position we were at.
When we restart the replicator, we start from the last position we applied to a table. In case of an instance where we do not snapshot all tables, the position we error out vs the position we restart at might be different. For a better debugging experience, log the last parsed position before erroring out. Release-Note-Core: Improve logging when restarting replicator to print the last replication position. Change-Id: I1fac045c7253e8ca141b295899618d61226369c1 Reviewed-on: https://gerrit.readyset.name/c/readyset/+/9739 Tested-by: Buildkite CI Reviewed-by: Michael Zink <michael.z@readyset.io>
1 parent eda78c9 commit c863ca4

File tree

2 files changed

+27
-9
lines changed

2 files changed

+27
-9
lines changed

replicators/src/mysql_connector/snapshot.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -716,7 +716,9 @@ impl MySqlReplicator<'_> {
716716
"Compacting table",
717717
table = %table.display(readyset_sql::Dialect::MySQL)
718718
);
719-
span.in_scope(|| info!("Setting replication offset"));
719+
span.in_scope(
720+
|| info!(repl_offset = %repl_offset, "Setting replication offset"),
721+
);
720722
noria_table
721723
.set_replication_offset(repl_offset)
722724
.map_err(log_err)

replicators/src/noria_adapter.rs

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -469,9 +469,13 @@ impl<'a> NoriaAdapter<'a> {
469469
Some(max) if max > &current_pos => {
470470
info!(start = %current_pos, end = %max, "Catching up");
471471
let max = max.clone();
472-
adapter
472+
if let Err(err) = adapter
473473
.main_loop(&mut current_pos, Some(max), controller_tx, replicator_rx)
474-
.await?;
474+
.await
475+
{
476+
warn!(position = %current_pos, "Replicator stopped during catch-up phase");
477+
return Err(err);
478+
}
475479
}
476480
_ => {}
477481
}
@@ -481,9 +485,13 @@ impl<'a> NoriaAdapter<'a> {
481485
let _ = controller_tx.send(ControllerMessage::SnapshotDone);
482486

483487
info!(position = %current_pos, "Streaming replication started");
484-
adapter
488+
if let Err(err) = adapter
485489
.main_loop(&mut current_pos, None, controller_tx, replicator_rx)
486-
.await?;
490+
.await
491+
{
492+
warn!(position = %current_pos, "Replicator stopped during streaming replication");
493+
return Err(err);
494+
}
487495

488496
unreachable!("`main_loop` will never stop with an Ok status if `until = None`");
489497
}
@@ -719,19 +727,27 @@ impl<'a> NoriaAdapter<'a> {
719727

720728
if min_pos != max_pos {
721729
info!(start = %min_pos, end = %max_pos, "Catching up");
722-
adapter
730+
if let Err(err) = adapter
723731
.main_loop(&mut min_pos, Some(max_pos), controller_tx, replicator_rx)
724-
.await?;
732+
.await
733+
{
734+
warn!(position = %min_pos, "Replicator stopped during catch-up phase");
735+
return Err(err);
736+
}
725737
}
726738

727739
// Let Controller know that the initial snapshotting is complete. Ignores the error, which
728740
// will not occur unless the Controller dropped the rx half of this channel.
729741
let _ = controller_tx.send(ControllerMessage::SnapshotDone);
730742

731743
info!(position = %min_pos, "Streaming replication started");
732-
adapter
744+
if let Err(err) = adapter
733745
.main_loop(&mut min_pos, None, controller_tx, replicator_rx)
734-
.await?;
746+
.await
747+
{
748+
warn!(position = %min_pos, "Replicator stopped during streaming replication");
749+
return Err(err);
750+
}
735751

736752
unreachable!("`main_loop` will never stop with an Ok status if `until = None`");
737753
}

0 commit comments

Comments
 (0)