Skip to content

Commit b5ceef6

Browse files
committed
replicators: Pass replica parameters to MySQL server
When Readyset is registering as a replica in the MySQL server, it provides an option to provide replica paramters such as hostname, port, username and password. These are shown when SHOW REPLICAS is run in the master database. This commit adds following arguments to Readyset: ``` --report-host --report-port --report-user --report-password ``` These are passed to the MySQL server with replica registration. The arguments are intentionally kept same as the MySQL arguments for replica registration. The variable names use the "replica" prefix to indicate that these are replica parameters. Fixes: #REA-4690 Closes: #1357 Release-Note-Core: Add `--report-host`, `--report-port`, `--report-user` and `--report-password` arguments to Readyset. These are used to show the replica parameters when SHOW REPLICAS is run in the master database. Change-Id: I7596fadb056fd39c2701f095cfed19a1a712efd4 Reviewed-on: https://gerrit.readyset.name/c/readyset/+/10282 Reviewed-by: Marcelo Altmann <marcelo@readyset.io> Tested-by: Buildkite CI
1 parent 8d15521 commit b5ceef6

File tree

3 files changed

+80
-17
lines changed

3 files changed

+80
-17
lines changed

database-utils/src/lib.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,30 @@ pub struct UpstreamConfig {
9393
#[serde(default)]
9494
pub replication_server_id: Option<ReplicationServerId>,
9595

96+
/// Hostname to report when registering as a replica with the upstream database (MySQL only).
97+
/// If not set, no hostname will be reported.
98+
#[arg(long = "report-host", env = "REPORT_HOST")]
99+
#[serde(default)]
100+
pub replica_report_host: Option<String>,
101+
102+
/// Port to report when registering as a replica with the upstream database (MySQL only).
103+
/// If not set, no port will be reported.
104+
#[arg(long = "report-port", env = "REPORT_PORT")]
105+
#[serde(default)]
106+
pub replica_report_port: Option<u16>,
107+
108+
/// Username to report when registering as a replica with the upstream database (MySQL only).
109+
/// If not set, no username will be reported.
110+
#[arg(long = "report-user", env = "REPORT_USER")]
111+
#[serde(default)]
112+
pub replica_report_user: Option<String>,
113+
114+
/// Password to report when registering as a replica with the upstream database (MySQL only).
115+
/// If not set, no password will be reported.
116+
#[arg(long = "report-password", env = "REPORT_PASSWORD")]
117+
#[serde(default)]
118+
pub replica_report_password: Option<RedactedString>,
119+
96120
/// The time to wait before restarting the replicator in seconds.
97121
#[arg(long, hide = true, default_value = "1", value_parser = duration_from_seconds)]
98122
#[serde(default = "default_replicator_restart_timeout")]
@@ -320,6 +344,10 @@ impl Default for UpstreamConfig {
320344
disable_setup_ddl_replication: false,
321345
disable_create_publication: false,
322346
replication_server_id: Default::default(),
347+
replica_report_host: Default::default(),
348+
replica_report_port: Default::default(),
349+
replica_report_user: Default::default(),
350+
replica_report_password: Default::default(),
323351
replicator_restart_timeout: Duration::from_secs(1),
324352
replication_tables: Default::default(),
325353
replication_tables_ignore: Default::default(),

replicators/src/mysql_connector/connector.rs

Lines changed: 51 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use serde_json::Map;
2929
use tokio::time::timeout;
3030
use tracing::{error, info, warn};
3131

32+
use database_utils::UpstreamConfig;
3233
use readyset_client::metrics::recorded;
3334
use readyset_client::recipe::changelist::Change;
3435
use readyset_client::recipe::ChangeList;
@@ -174,7 +175,13 @@ impl MySqlBinlogConnector {
174175
/// In order to request a binlog, we must first register as a replica, and let the primary
175176
/// know what type of checksum we support (NONE and CRC32 are the options), NONE seems to work
176177
/// but others use CRC32 🤷‍♂️
177-
async fn register_as_replica(&mut self) -> mysql::Result<()> {
178+
async fn register_as_replica(
179+
&mut self,
180+
hostname: Option<String>,
181+
port: Option<u16>,
182+
username: Option<String>,
183+
password: Option<String>,
184+
) -> mysql::Result<()> {
178185
let query = match get_mysql_version(&mut self.connection).await {
179186
Ok(version) => {
180187
if version >= 80400 {
@@ -191,7 +198,24 @@ impl MySqlBinlogConnector {
191198
};
192199
self.connection.query_drop(query).await?;
193200

194-
let cmd = mysql_common::packets::ComRegisterSlave::new(self.server_id());
201+
let mut cmd = mysql_common::packets::ComRegisterSlave::new(self.server_id());
202+
203+
if let Some(ref host) = hostname {
204+
cmd = cmd.with_hostname(host.as_bytes());
205+
}
206+
207+
if let Some(port) = port {
208+
cmd = cmd.with_port(port);
209+
}
210+
211+
if let Some(ref user) = username {
212+
cmd = cmd.with_user(user.as_bytes());
213+
}
214+
215+
if let Some(ref pass) = password {
216+
cmd = cmd.with_password(pass.as_bytes());
217+
}
218+
195219
self.connection.write_command(&cmd).await?;
196220
// Server will respond with OK.
197221
self.connection.read_packet().await?;
@@ -243,11 +267,23 @@ impl MySqlBinlogConnector {
243267
noria: ReadySetHandle,
244268
mysql_opts: O,
245269
next_position: MySqlPosition,
246-
server_id: Option<u32>,
247270
enable_statement_logging: bool,
248271
table_filter: TableFilter,
249272
parsing_preset: ParsingPreset,
273+
config: &UpstreamConfig,
250274
) -> ReadySetResult<Self> {
275+
let server_id = config
276+
.replication_server_id
277+
.as_ref()
278+
.map(|id| id.0.parse::<u32>())
279+
.transpose()
280+
.map_err(|_| {
281+
ReadySetError::ReplicationFailed(format!(
282+
"{} is an invalid server id--it must be a valid u32.",
283+
config.replication_server_id.clone().unwrap()
284+
))
285+
})?;
286+
251287
let mut connector = MySqlBinlogConnector {
252288
noria,
253289
connection: mysql::Conn::new(mysql_opts).await?,
@@ -263,7 +299,18 @@ impl MySqlBinlogConnector {
263299
};
264300

265301
connector.set_parameters().await?;
266-
connector.register_as_replica().await?;
302+
303+
connector
304+
.register_as_replica(
305+
config.replica_report_host.clone(),
306+
config.replica_report_port,
307+
config.replica_report_user.clone(),
308+
config
309+
.replica_report_password
310+
.as_ref()
311+
.map(|p| p.to_string()),
312+
)
313+
.await?;
267314
let binlog_request = connector.request_binlog().await;
268315
match binlog_request {
269316
Ok(()) => (),

replicators/src/noria_adapter.rs

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -428,27 +428,15 @@ impl<'a> NoriaAdapter<'a> {
428428
(Some(pos), _) => pos.clone().try_into()?,
429429
};
430430

431-
let server_id = config
432-
.replication_server_id
433-
.as_ref()
434-
.map(|id| id.0.parse::<u32>())
435-
.transpose()
436-
.map_err(|_| {
437-
ReadySetError::ReplicationFailed(format!(
438-
"{} is an invalid server id--it must be a valid u32.",
439-
config.replication_server_id.clone().unwrap()
440-
))
441-
})?;
442-
443431
let connector = Box::new(
444432
MySqlBinlogConnector::connect(
445433
noria.clone(),
446434
mysql_opts_builder,
447435
pos.clone(),
448-
server_id,
449436
enable_statement_logging,
450437
table_filter.clone(),
451438
parsing_preset,
439+
config,
452440
)
453441
.await?,
454442
);

0 commit comments

Comments
 (0)