Skip to content

Commit a7f2f6e

Browse files
replicators: Add snapshot query comment for MySQL
This commit adds the ability to add a comment to snapshot queries for MySQL. The comment is added to the snapshot queries by the caller and is automatically enclosed within the delimiters /* and */. This is useful to by-pass tools like Percona Toolkit pt-kill which automatically kills queries that take longer than a threshold. Fixes REA-5946. Closes: 1549 Release-Note-Core: Added the ability to add a comment to snapshot queries for MySQL. Change-Id: I9220e91a5c5a42ffe4b98ede5956d8d389478977 Reviewed-on: https://gerrit.readyset.name/c/readyset/+/10242 Tested-by: Buildkite CI Reviewed-by: Michael Zink <michael.z@readyset.io>
1 parent c9dfb64 commit a7f2f6e

File tree

5 files changed

+44
-8
lines changed

5 files changed

+44
-8
lines changed

database-utils/src/lib.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,14 @@ pub struct UpstreamConfig {
165165
)]
166166
#[serde(default = "default_status_update_interval_secs")]
167167
pub status_update_interval_secs: u16,
168+
169+
/// A string to be included as a comment in snapshot queries for MySQL.
170+
/// The provided value is automatically enclosed within the delimiters
171+
/// /* and */. The caller must supply only the raw comment text, without
172+
/// any comment markers.
173+
#[arg(long, env = "SNAPSHOT_QUERY_COMMENT", default_value = "")]
174+
#[serde(default)]
175+
pub snapshot_query_comment: Option<String>,
168176
}
169177

170178
impl UpstreamConfig {
@@ -321,6 +329,7 @@ impl Default for UpstreamConfig {
321329
ignore_ulimit_check: false,
322330
status_update_interval_secs: 10,
323331
max_parallel_snapshot_tables: default_max_parallel_snapshot_tables(),
332+
snapshot_query_comment: Default::default(),
324333
}
325334
}
326335
}

readyset/pkg/common/readyset.conf

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,3 +107,9 @@ NO_COLOR=true
107107
## ALLOWED_USERS should be different than the one you added to the
108108
## UPSTREAM_DB_URL - format "user1:pass1,user2:pass2"
109109
# ALLOWED_USERS=""
110+
111+
## A string to be included as a comment in snapshot queries for MySQL.
112+
## The provided value is automatically enclosed within the delimiters
113+
## /* and */. The caller must supply only the raw comment text, without
114+
## any comment markers.
115+
# SNAPSHOT_QUERY_COMMENT=

replicators/src/mysql_connector/snapshot.rs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ pub(crate) struct MySqlReplicator<'a> {
6363
pub(crate) table_filter: &'a mut TableFilter,
6464
/// Base parsing config to use
6565
pub(crate) parsing_preset: ParsingPreset,
66+
/// Whether to add a comment for snapshot queries
67+
pub(crate) snapshot_query_comment: Option<String>,
6668
}
6769

6870
/// Get the list of tables defined in the database
@@ -423,6 +425,7 @@ impl MySqlReplicator<'_> {
423425
mut trx: Transaction<'static>,
424426
mut table_mutator: readyset_client::Table,
425427
snapshot_report_interval_secs: u16,
428+
snapshot_query_comment: Option<String>,
426429
) -> ReadySetResult<()> {
427430
let mut cnt = 0;
428431
let mut snapshot_type = SnapshotType::new(&table_mutator)?;
@@ -432,7 +435,7 @@ impl MySqlReplicator<'_> {
432435
// don't have support to lookup a collation from its name. Temporally get the
433436
// collation ID from querying IS. Later we can avoid the extra query.
434437
let (count_query, initial_query, bound_base_query, collation_query) =
435-
snapshot_type.get_queries(&table_mutator);
438+
snapshot_type.get_queries(&table_mutator, snapshot_query_comment);
436439

437440
let collations = trx
438441
.query(collation_query)
@@ -661,14 +664,20 @@ impl MySqlReplicator<'_> {
661664
span.in_scope(|| info!("Read lock released"));
662665

663666
let table_mutator = noria.table(table.clone()).instrument(span.clone()).await?;
667+
let snapshot_query_comment = self.snapshot_query_comment.clone();
664668

665669
Ok(tokio::spawn(async move {
666670
(
667671
table,
668672
repl_offset,
669-
Self::snapshot_table(trx, table_mutator, snapshot_report_interval_secs)
670-
.instrument(span)
671-
.await,
673+
Self::snapshot_table(
674+
trx,
675+
table_mutator,
676+
snapshot_report_interval_secs,
677+
snapshot_query_comment,
678+
)
679+
.instrument(span)
680+
.await,
672681
)
673682
}))
674683
}

replicators/src/mysql_connector/snapshot_type.rs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,11 @@ impl SnapshotType {
6565
/// Returns:
6666
/// * A tuple containing the count query, the initial query, the bound based query
6767
/// and the collation query
68-
pub fn get_queries(&self, table: &readyset_client::Table) -> (String, String, String, String) {
68+
pub fn get_queries(
69+
&self,
70+
table: &readyset_client::Table,
71+
snapshot_query_comment: Option<String>,
72+
) -> (String, String, String, String) {
6973
let force_index = match self {
7074
SnapshotType::KeyBased { name, .. } => {
7175
if let Some(name) = name {
@@ -95,6 +99,13 @@ impl SnapshotType {
9599
table.table_name().name,
96100
schema
97101
);
102+
103+
let snapshot_query_comment = snapshot_query_comment
104+
.map(|s| s.replace("/*", "").replace("*/", ""))
105+
.filter(|s| !s.is_empty())
106+
.map(|s| format!(" /* {s} */"))
107+
.unwrap_or_default();
108+
98109
let (initial_query, bound_based_query) = match self {
99110
SnapshotType::KeyBased { ref keys, .. } => {
100111
let keys = keys
@@ -119,14 +130,14 @@ impl SnapshotType {
119130
let next_bound = format!("({next_bound})");
120131

121132
let initial_query = format!(
122-
"SELECT * FROM {} {} ORDER BY {} LIMIT {}",
133+
"SELECT{snapshot_query_comment} * FROM {} {} ORDER BY {} LIMIT {}",
123134
table.table_name().display(readyset_sql::Dialect::MySQL),
124135
force_index,
125136
order_by,
126137
MYSQL_BATCH_SIZE
127138
);
128139
let bound_based_query = format!(
129-
"SELECT * FROM {} {} WHERE {} ORDER BY {} LIMIT {}",
140+
"SELECT{snapshot_query_comment} * FROM {} {} WHERE {} ORDER BY {} LIMIT {}",
130141
table.table_name().display(readyset_sql::Dialect::MySQL),
131142
force_index,
132143
next_bound,
@@ -137,7 +148,7 @@ impl SnapshotType {
137148
}
138149
SnapshotType::FullTableScan => {
139150
let initial_query = format!(
140-
"SELECT * FROM {}",
151+
"SELECT{snapshot_query_comment} * FROM {}",
141152
table.table_name().display(readyset_sql::Dialect::MySQL)
142153
);
143154
(initial_query.clone(), initial_query)

replicators/src/noria_adapter.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -348,6 +348,7 @@ impl<'a> NoriaAdapter<'a> {
348348
pool,
349349
table_filter,
350350
parsing_preset,
351+
snapshot_query_comment: config.snapshot_query_comment.clone(),
351352
};
352353

353354
let snapshot_start = Instant::now();

0 commit comments

Comments
 (0)