Skip to content

Commit 81db6af

Browse files
Add support for MySQL 5.7
This commits fixes some issues that prevent us from connecting and replicating from a 5.7 instance. The main issue with 5.7 is the lack of support for the `LOCK INSTANCE FOR BACKUP` which allows tables to be created during the backup. During snapshot we do the following: - get the list of tables - for each table run a select 1 from table_name to add a metadata lock and prevent the table from being altered or dropped. - get the current binlog position If a new table happens to be created between the time we get the list and before we get the current binlog position, we will not snapshot the table. This is a rare case and the worst that can happen is the table be missing from readyset in the first snapshot. If the user restart readyset it will snapshot the table. Fixes: REA-4910 Fixes: #1393 Release-Note-Core: Fixes issues that prevent us from connecting and replicating from a MySQL 5.7 instance. Change-Id: I89b74edb9dd9a4ccfe00d55d0e32cd2efc491c54 Reviewed-on: https://gerrit.readyset.name/c/readyset/+/8131 Tested-by: Buildkite CI Reviewed-by: Jason Brown <jason.b@readyset.io>
1 parent b16b816 commit 81db6af

File tree

6 files changed

+60
-25
lines changed

6 files changed

+60
-25
lines changed

readyset-errors/src/lib.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -678,8 +678,13 @@ pub enum ReadySetError {
678678
MirNodeMustHaveDfNodeAssigned { mir_node_index: usize },
679679

680680
/// Error that the upstream database is using an unsupported server version.
681-
#[error("Upstream server version {major}-{minor} is too low. Minimum supported major version is {min}")]
682-
UnsupportedServerVersion { major: u16, minor: String, min: u16 },
681+
#[error("Upstream server version {major}-{minor} is too low. Minimum supported version is {min_major}.{min_minor}")]
682+
UnsupportedServerVersion {
683+
major: u16,
684+
minor: String,
685+
min_major: u16,
686+
min_minor: u16,
687+
},
683688

684689
/// Error that the upstream database reports a server version the ReadySet could not parse.
685690
#[error("Upstream server version could not be parsed")]

readyset-mysql/src/upstream.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ type StatementID = u32;
2626

2727
/// Indicates the minimum upstream server version that we currently support. Used to error out
2828
/// during connection phase if the version for the upstream server is too low.
29-
const MIN_UPSTREAM_VERSION: u16 = 8;
29+
const MIN_UPSTREAM_MAJOR_VERSION: u16 = 5;
30+
const MIN_UPSTREAM_MINOR_VERSION: u16 = 7;
3031

3132
fn dt_to_value_params(dt: &[DfValue]) -> ReadySetResult<Vec<mysql_async::Value>> {
3233
dt.iter().map(|v| v.try_into()).collect()
@@ -202,11 +203,14 @@ impl MySqlUpstream {
202203

203204
// Check that the server version is supported.
204205
let (major, minor, _) = conn.server_version();
205-
if major < MIN_UPSTREAM_VERSION {
206+
if major < MIN_UPSTREAM_MAJOR_VERSION
207+
|| (major == MIN_UPSTREAM_MAJOR_VERSION && minor < MIN_UPSTREAM_MINOR_VERSION)
208+
{
206209
return Err(Error::ReadySet(ReadySetError::UnsupportedServerVersion {
207210
major,
208211
minor: minor.to_string(),
209-
min: MIN_UPSTREAM_VERSION,
212+
min_major: MIN_UPSTREAM_MAJOR_VERSION,
213+
min_minor: MIN_UPSTREAM_MINOR_VERSION,
210214
}));
211215
}
212216

readyset-psql/src/upstream.rs

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ use crate::Error;
2525

2626
/// Indicates the minimum upstream server version that we currently support. Used to error out
2727
/// during connection phase if the version for the upstream server is too low.
28-
const MIN_UPSTREAM_VERSION: u16 = 13;
28+
const MIN_UPSTREAM_MAJOR_VERSION: u16 = 13;
29+
const MIN_UPSTREAM_MINOR_VERSION: u16 = 0;
2930

3031
/// A connector to an underlying PostgreSQL database
3132
pub struct PostgreSqlUpstream {
@@ -208,11 +209,22 @@ impl UpstreamDatabase for PostgreSqlUpstream {
208209
let major = major
209210
.parse()
210211
.map_err(|_| Error::ReadySet(ReadySetError::UnparseableServerVersion))?;
211-
if major < MIN_UPSTREAM_VERSION {
212+
let minor: u16 = minor
213+
.chars()
214+
.take_while(|c| c.is_ascii_digit())
215+
.collect::<String>()
216+
.parse()
217+
.map_err(|_| Error::ReadySet(ReadySetError::UnparseableServerVersion))?;
218+
219+
#[allow(clippy::absurd_extreme_comparisons)]
220+
if major < MIN_UPSTREAM_MAJOR_VERSION
221+
|| (major == MIN_UPSTREAM_MAJOR_VERSION && minor < MIN_UPSTREAM_MINOR_VERSION)
222+
{
212223
return Err(Error::ReadySet(ReadySetError::UnsupportedServerVersion {
213224
major,
214-
minor: minor.to_owned(),
215-
min: MIN_UPSTREAM_VERSION,
225+
minor: minor.to_string(),
226+
min_major: MIN_UPSTREAM_MAJOR_VERSION,
227+
min_minor: MIN_UPSTREAM_MINOR_VERSION,
216228
}));
217229
}
218230
let version = format!("{version} ReadySet");

replicators/src/mysql_connector/connector.rs

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ use replication_offset::ReplicationOffset;
3535
use crate::mysql_connector::utils::mysql_pad_collation_column;
3636
use crate::noria_adapter::{Connector, ReplicationAction};
3737

38-
const CHECKSUM_QUERY: &str = "SET @source_binlog_checksum='CRC32'";
38+
use super::utils::get_mysql_version;
39+
3940
const DEFAULT_SERVER_ID: u32 = u32::MAX - 55;
4041
const MAX_POSITION_TIME: u64 = 10;
4142

@@ -84,7 +85,21 @@ impl MySqlBinlogConnector {
8485
/// know what type of checksum we support (NONE and CRC32 are the options), NONE seems to work
8586
/// but others use CRC32 🤷‍♂️
8687
async fn register_as_replica(&mut self) -> mysql::Result<()> {
87-
self.connection.query_drop(CHECKSUM_QUERY).await?;
88+
let query = match get_mysql_version(&mut self.connection).await {
89+
Ok(version) => {
90+
if version >= 80400 {
91+
// MySQL 8.4.0 and above
92+
"SET @source_binlog_checksum='CRC32'"
93+
} else {
94+
// MySQL 8.3.0 and below
95+
"SET @master_binlog_checksum='CRC32'"
96+
}
97+
}
98+
Err(err) => {
99+
return Err(err);
100+
}
101+
};
102+
self.connection.query_drop(query).await?;
88103

89104
let cmd = mysql_common::packets::ComRegisterSlave::new(self.server_id());
90105
self.connection.write_command(&cmd).await?;

replicators/src/mysql_connector/snapshot.rs

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use tokio::task::JoinHandle;
2727
use tracing::{debug, error, info, info_span, warn};
2828
use tracing_futures::Instrument;
2929

30-
use super::utils::{mysql_pad_collation_column, parse_mysql_version};
30+
use super::utils::{get_mysql_version, mysql_pad_collation_column};
3131
use crate::db_util::DatabaseSchemas;
3232
use crate::mysql_connector::snapshot_type::SnapshotType;
3333
use crate::mysql_connector::utils::MYSQL_BATCH_SIZE;
@@ -350,19 +350,11 @@ impl MySqlReplicator {
350350
Ok(tx)
351351
}
352352

353-
/// Get MySQL Server Version
354-
async fn get_mysql_version(&self) -> mysql::Result<u32> {
355-
let mut conn = self.pool.get_conn().await?;
356-
let version: mysql::Row = conn.query_first("SELECT VERSION()").await?.unwrap();
357-
let version: String = version.get(0).expect("MySQL version");
358-
parse_mysql_version(&version)
359-
}
360-
361353
/// Use the SHOW MASTER STATUS or SHOW BINARY LOG STATUS statement to determine
362354
/// the current binary log file name and position.
363355
async fn get_binlog_position(&self) -> mysql::Result<MySqlPosition> {
364356
let mut conn = self.pool.get_conn().await?;
365-
let query = match self.get_mysql_version().await {
357+
let query = match get_mysql_version(&mut conn).await {
366358
Ok(version) => {
367359
if version >= 80400 {
368360
// MySQL 8.4.0 and above
@@ -581,7 +573,7 @@ impl MySqlReplicator {
581573
match conn.query_drop("LOCK INSTANCE FOR BACKUP").await {
582574
Ok(_) => Some(conn),
583575
Err(err) => {
584-
warn!(%err, "Failed to acquire instance lock, DDL changes may cause inconsistency");
576+
warn!(%err, "Failed to acquire instance lock, if new tables are created, we might not detect them.");
585577
None
586578
}
587579
}

replicators/src/mysql_connector/utils.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
use std::string::FromUtf8Error;
2-
use std::sync::Arc;
3-
1+
use mysql_async::{self as mysql, prelude::Queryable};
42
use mysql_common::collations::{self, Collation, CollationId};
53
use mysql_srv::ColumnType;
64
use readyset_data::DfValue;
5+
use std::string::FromUtf8Error;
6+
use std::sync::Arc;
77

88
//TODO(marce): Make this a configuration parameter or dynamically adjust based on the table size
99
pub const MYSQL_BATCH_SIZE: usize = 100_000; // How many rows to fetch at a time from MySQL
@@ -58,6 +58,13 @@ pub fn parse_mysql_version(version: &str) -> mysql_async::Result<u32> {
5858
Ok(major * 10000 + minor * 100 + patch)
5959
}
6060

61+
/// Get MySQL Server Version
62+
pub async fn get_mysql_version(conn: &mut mysql_async::Conn) -> mysql::Result<u32> {
63+
let version: mysql::Row = conn.query_first("SELECT VERSION()").await?.unwrap();
64+
let version: String = version.get(0).expect("MySQL version");
65+
parse_mysql_version(&version)
66+
}
67+
6168
#[cfg(test)]
6269
mod tests {
6370
use super::*;

0 commit comments

Comments
 (0)