Skip to content

Commit 1501a18

Browse files
replicators: Allow configuring max_parallel_snapshot_tables for MySQL.
This commit allows configuring the max_parallel_snapshot_tables parameter for MySQL. It was fixed to 8, but now it can be configured via cmd line or env variable. Defaults to number of cores -1. Release-Note-Core: Allow configuring the number of parallel tables for MySQL snapshots via MAX_PARALLEL_SNAPSHOT_TABLES or --max-parallel-snapshot-tables. Fixes: REA-5785 Closes: #1529 Change-Id: I7f18faf945b2a8c13cbbf879df7dfbb12bd274b2 Reviewed-on: https://gerrit.readyset.name/c/readyset/+/9715 Reviewed-by: Michael Zink <michael.z@readyset.io> Tested-by: Buildkite CI
1 parent 24d2b0f commit 1501a18

File tree

2 files changed

+8
-6
lines changed

2 files changed

+8
-6
lines changed

database-utils/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ pub struct UpstreamConfig {
135135
pub snapshot_report_interval_secs: u16,
136136

137137
/// The maximum number of relations that will be snapshotted in parallel from the upstream
138-
#[arg(long, hide = true)]
138+
#[arg(long, env = "MAX_PARALLEL_SNAPSHOT_TABLES")]
139139
#[serde(default = "default_max_parallel_snapshot_tables")]
140140
pub max_parallel_snapshot_tables: Option<usize>,
141141

replicators/src/mysql_connector/snapshot.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,6 @@ use std::collections::HashSet;
3939

4040
const RS_BATCH_SIZE: usize = 1000; // How many queries to buffer before pushing to ReadySet
4141

42-
const MAX_SNAPSHOT_BATCH: usize = 8; // How many tables to snapshot at the same time
43-
4442
/// A list of databases MySQL uses internally, they should not be replicated
4543
pub const MYSQL_INTERNAL_DBS: &[&str] =
4644
&["mysql", "information_schema", "performance_schema", "sys"];
@@ -558,14 +556,15 @@ impl MySqlReplicator<'_> {
558556
db_schemas: &mut DatabaseSchemas,
559557
snapshot_report_interval_secs: u16,
560558
full_snapshot: bool,
561-
_max_parallel_snapshot_tables: usize, // TODO: limit parallelism for MySQL
559+
max_parallel_snapshot_tables: usize,
562560
) -> ReadySetResult<()> {
563561
let result = self
564562
.replicate_to_noria_with_table_locks(
565563
noria,
566564
db_schemas,
567565
snapshot_report_interval_secs,
568566
full_snapshot,
567+
max_parallel_snapshot_tables,
569568
)
570569
.await;
571570

@@ -585,6 +584,7 @@ impl MySqlReplicator<'_> {
585584
db_schemas: &mut DatabaseSchemas,
586585
snapshot_report_interval_secs: u16,
587586
full_snapshot: bool,
587+
max_parallel_snapshot_tables: usize,
588588
) -> ReadySetResult<()> {
589589
// NOTE: There are two ways to prevent DDL changes in MySQL:
590590
// `FLUSH TABLES WITH READ LOCK` or `LOCK INSTANCE FOR BACKUP`. Both are not
@@ -628,6 +628,7 @@ impl MySqlReplicator<'_> {
628628
table_list,
629629
&replication_offsets,
630630
snapshot_report_interval_secs,
631+
max_parallel_snapshot_tables,
631632
)
632633
.await
633634
}
@@ -678,6 +679,7 @@ impl MySqlReplicator<'_> {
678679
mut table_list: Vec<Relation>,
679680
replication_offsets: &ReplicationOffsets,
680681
snapshot_report_interval_secs: u16,
682+
max_parallel_snapshot_tables: usize,
681683
) -> ReadySetResult<()> {
682684
let mut replication_tasks = FuturesUnordered::new();
683685
let mut compacting_tasks = FuturesUnordered::new();
@@ -698,7 +700,7 @@ impl MySqlReplicator<'_> {
698700
);
699701
}
700702

701-
if replication_tasks.len() == MAX_SNAPSHOT_BATCH {
703+
if replication_tasks.len() == max_parallel_snapshot_tables {
702704
break;
703705
}
704706
}
@@ -745,7 +747,7 @@ impl MySqlReplicator<'_> {
745747
}
746748

747749
// If still have tables to snapshot add them to the task list
748-
while replication_tasks.len() < MAX_SNAPSHOT_BATCH && !table_list.is_empty() {
750+
while replication_tasks.len() < max_parallel_snapshot_tables && !table_list.is_empty() {
749751
let table = table_list.pop().expect("Not empty");
750752
if replication_offsets.has_table(&table) {
751753
info!(

0 commit comments

Comments
 (0)