Skip to content

Commit d18d95d

Browse files
committed
replicators: Add a timeout to MySQL replication connections
We enable heartbeats on the source (every second), and if we receive no packet for five seconds, we close the connection and reconnect. Release-Note-Core: MySQL replication now uses heartbeats to detect stale, half-closed connections and reconnect to the server. Fixes: REA-5870 Change-Id: I791074209980a5eb765dcdf0f432549c254f55b3 Reviewed-on: https://gerrit.readyset.name/c/readyset/+/9988 Tested-by: Buildkite CI Reviewed-by: Michael Zink <michael.z@readyset.io>
1 parent 2b36d9d commit d18d95d

File tree

1 file changed

+28
-7
lines changed

1 file changed

+28
-7
lines changed

replicators/src/mysql_connector/connector.rs

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::collections::HashMap;
33
use std::convert::{TryFrom, TryInto};
44
use std::str::FromStr as _;
55
use std::sync::Arc;
6+
use std::time::{Duration, Instant};
67

78
use async_trait::async_trait;
89
use atoi::atoi;
@@ -25,6 +26,7 @@ use readyset_data::encoding::{mysql_character_set_name_to_collation_id, Encoding
2526
use readyset_decimal::Decimal;
2627
use readyset_sql_parsing::{parse_query_with_config, ParsingConfig, ParsingPreset};
2728
use serde_json::Map;
29+
use tokio::time::timeout;
2830
use tracing::{error, info, warn};
2931

3032
use readyset_client::metrics::recorded;
@@ -119,7 +121,8 @@ type TableMetadata = (Vec<Option<u16>>, Vec<Option<bool>>);
119121
///
120122
/// The connector must also be assigned a unique `server_id` value
121123
pub(crate) struct MySqlBinlogConnector {
122-
/// A handle to the Readyset instance; used for retrieving schemas in MySQL 5.7. See [`MySqlBinlogConnector::table_schemas`].
124+
/// A handle to the Readyset instance; used for retrieving schemas in MySQL 5.7.
125+
/// See [`MySqlBinlogConnector::table_schemas`].
123126
noria: ReadySetHandle,
124127
/// This is the underlying (regular) MySQL connection
125128
connection: mysql::Conn,
@@ -137,7 +140,7 @@ pub(crate) struct MySqlBinlogConnector {
137140
enable_statement_logging: bool,
138141
/// Timestamp of the last reported position. This is used to ensure we keep the distance
139142
/// between min/max position as short as possible.
140-
last_reported_pos_ts: std::time::Instant,
143+
last_reported_pos_ts: Instant,
141144
/// Table filter
142145
table_filter: TableFilter,
143146
/// A cache of `CREATE TABLE` statements retrieved from the controller. This is only populated
@@ -155,6 +158,19 @@ impl MySqlBinlogConnector {
155158
self.server_id.unwrap_or(DEFAULT_SERVER_ID)
156159
}
157160

161+
async fn set_parameters(&mut self) -> mysql::Result<()> {
162+
// Enabling source to replica heartbeat packets requires manipulation of a knob that is
163+
// totally undocumented. Yes, this really is a user variable, not a session variable.
164+
// The two different ones are for different MySQL versions, but we can set both because
165+
// setting a non-existent user variable isn't an error. The heartbeat period is in nanoseconds (1000000000 ns = 1 second).
166+
self.connection
167+
.query_drop(
168+
"set @master_heartbeat_period = 1000000000, @source_heartbeat_period = 1000000000",
169+
)
170+
.await?;
171+
Ok(())
172+
}
173+
158174
/// In order to request a binlog, we must first register as a replica, and let the primary
159175
/// know what type of checksum we support (NONE and CRC32 are the options), NONE seems to work
160176
/// but others use CRC32 🤷‍♂️
@@ -240,13 +256,13 @@ impl MySqlBinlogConnector {
240256
next_position,
241257
current_gtid: None,
242258
enable_statement_logging,
243-
last_reported_pos_ts: std::time::Instant::now()
244-
- std::time::Duration::from_secs(MAX_POSITION_TIME),
259+
last_reported_pos_ts: Instant::now() - Duration::from_secs(MAX_POSITION_TIME),
245260
table_filter,
246261
table_schemas: Default::default(),
247262
parsing_config: parsing_preset.into_config().rate_limit_logging(false),
248263
};
249264

265+
connector.set_parameters().await?;
250266
connector.register_as_replica().await?;
251267
let binlog_request = connector.request_binlog().await;
252268
match binlog_request {
@@ -1067,7 +1083,7 @@ impl MySqlBinlogConnector {
10671083
/// This function returns a boolean indicating if we need to report the current position
10681084
fn report_position_elapsed(&mut self) -> bool {
10691085
if self.last_reported_pos_ts.elapsed().as_secs() > MAX_POSITION_TIME {
1070-
self.last_reported_pos_ts = std::time::Instant::now();
1086+
self.last_reported_pos_ts = Instant::now();
10711087
return true;
10721088
}
10731089
false
@@ -1086,7 +1102,11 @@ impl MySqlBinlogConnector {
10861102
use mysql_common::binlog::events;
10871103

10881104
loop {
1089-
let binlog_event = handle_err!(self, self.next_event().await);
1105+
let fut = self.next_event();
1106+
let res = timeout(Duration::from_secs(5), fut).await;
1107+
let res = handle_err!(self, res);
1108+
1109+
let binlog_event = handle_err!(self, res);
10901110

10911111
if u64::from(binlog_event.header().log_pos()) < self.next_position.position
10921112
&& self.next_position.position + u64::from(binlog_event.header().event_size())
@@ -1201,6 +1221,8 @@ impl MySqlBinlogConnector {
12011221
self.current_gtid = Some(ev.gno());
12021222
}
12031223

1224+
EventType::HEARTBEAT_EVENT => {}
1225+
12041226
/*
12051227
12061228
EventType::ANONYMOUS_GTID_EVENT => {}
@@ -1216,7 +1238,6 @@ impl MySqlBinlogConnector {
12161238
| EventType::FORMAT_DESCRIPTION_EVENT // A descriptor event that is written to the beginning of each binary log file. This event is used as of MySQL 5.0; it supersedes START_EVENT_V3.
12171239
| EventType::STOP_EVENT // Written when mysqld stops
12181240
| EventType::INCIDENT_EVENT // The event is used to inform the slave that something out of the ordinary happened on the master that might cause the database to be in an inconsistent state.
1219-
| EventType::HEARTBEAT_EVENT => {} // The event is originated by master's dump thread and sent straight to slave without being logged. Slave itself does not store it in relay log but rather uses a data for immediate checks and throws away the event.
12201241
12211242
EventType::UNKNOWN_EVENT | EventType::SLAVE_EVENT => {} // Ignored events
12221243

0 commit comments

Comments
 (0)