Skip to content

Commit 875fda7

Browse files
Allow multiple users to connect to Readyset
This commit allows multiple users to connect to Readyset by adding a new parameter --allowed-users containing a comma-separated list of username:password that will be used to authenticate connections to the Readyset server. The same user and password will be used for both the Readyset server and the upstream database. This commit also remove the obsolete --username and --password parameters. Closes: REA-4641 Closes: #1349 Release-Note-Core: It's now possible to connect to Readyset with multiple users by specifying a comma-separated list of username:password pairs in the --allowed-users parameter. Change-Id: Ie1d77daa6ae6430bce92e56ed2726b7016bbd0f9 Reviewed-on: https://gerrit.readyset.name/c/readyset/+/8804 Tested-by: Buildkite CI Reviewed-by: Jason Brown <jason.b@readyset.io>
1 parent 99c04d7 commit 875fda7

File tree

21 files changed

+214
-39
lines changed

21 files changed

+214
-39
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

benchmarks/src/utils/backend.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ impl Backend {
3232

3333
match DatabaseURL::from_str(url)? {
3434
DatabaseURL::MySQL(_) => {
35-
let upstream = MySqlUpstream::connect(upstream_config.clone()).await?;
35+
let upstream = MySqlUpstream::connect(upstream_config.clone(), None, None).await?;
3636
let status_reporter = ReadySetStatusReporter::new(
3737
upstream_config.clone(),
3838
noria.handle(),
@@ -56,7 +56,8 @@ impl Backend {
5656
))
5757
}
5858
DatabaseURL::PostgreSQL(_) => {
59-
let upstream = PostgreSqlUpstream::connect(upstream_config.clone()).await?;
59+
let upstream =
60+
PostgreSqlUpstream::connect(upstream_config.clone(), None, None).await?;
6061
let status_reporter = ReadySetStatusReporter::new(
6162
upstream_config.clone(),
6263
noria.handle(),

mysql-srv/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ tracing = { workspace = true }
3131
database-utils = { path = "../database-utils" }
3232
readyset-adapter-types = { path = "../readyset-adapter-types" }
3333
readyset-data = { path = "../readyset-data" }
34+
readyset-util = { path = "../readyset-util" }
3435

3536
[dev-dependencies]
3637
tokio-postgres = { workspace = true }

mysql-srv/src/lib.rs

Lines changed: 37 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
//! use std::collections::HashMap;
2323
//! use std::iter;
2424
//!
25+
//! use readyset_util::redacted::RedactedString;
2526
//! use database_utils::TlsMode;
2627
//! use mysql::prelude::*;
2728
//! use mysql_srv::*;
@@ -64,6 +65,10 @@
6465
//! Ok(())
6566
//! }
6667
//!
68+
//! async fn set_auth_info(&mut self, _: &str, _: Option<RedactedString>) -> io::Result<()> {
69+
//! Ok(())
70+
//! }
71+
//!
6772
//! async fn on_query(
6873
//! &mut self,
6974
//! query: &str,
@@ -182,6 +187,7 @@ use error::{other_error, OtherErrorKind};
182187
use mysql_common::constants::CapabilityFlags;
183188
use readyset_adapter_types::{DeallocateId, ParsedCommand};
184189
use readyset_data::DfType;
190+
use readyset_util::redacted::RedactedString;
185191
use tokio::io::{AsyncRead, AsyncWrite};
186192
use tokio::net;
187193
use tokio_native_tls::TlsAcceptor;
@@ -312,6 +318,9 @@ pub trait MySqlShim<S: AsyncRead + AsyncWrite + Unpin + Send> {
312318
/// Called when client switches user.
313319
async fn on_change_user(&mut self, _: &str, _: &str, _: &str) -> io::Result<()>;
314320

321+
/// Called when client authenticates to inform which users we should use.
322+
async fn set_auth_info(&mut self, _: &str, _: Option<RedactedString>) -> io::Result<()>;
323+
315324
/// Retrieve the password for the user with the given username, if any.
316325
///
317326
/// If the user doesn't exist, return [`None`].
@@ -438,7 +447,8 @@ impl<B: MySqlShim<S> + Send, S: AsyncWrite + AsyncRead + Unpin + Send> MySqlInte
438447
tls_acceptor,
439448
tls_mode,
440449
};
441-
if let (true, database) = mi.init().await? {
450+
if let (true, username, password, database) = mi.init().await? {
451+
mi.shim.set_auth_info(&username, password).await?;
442452
if let Some(database) = database {
443453
mi.shim.on_init(&database, None).await?;
444454
}
@@ -455,9 +465,11 @@ impl<B: MySqlShim<S> + Send, S: AsyncWrite + AsyncRead + Unpin + Send> MySqlInte
455465
/// sent and received as needed to complete authentication.
456466
///
457467
/// If no errors are encountered, the return value contains a tuple of a boolean to indicate
458-
/// whether authentication was successful, and a database name if one was specified by the
459-
/// client in the handshake response.
460-
async fn init(&mut self) -> Result<(bool, Option<String>), io::Error> {
468+
/// whether authentication was successful, the username, the plaintext password if one was
469+
/// provided, and a database name if one was specified by the client in the handshake response.
470+
async fn init(
471+
&mut self,
472+
) -> Result<(bool, String, Option<RedactedString>, Option<String>), io::Error> {
461473
let auth_data =
462474
generate_auth_data().map_err(|_| other_error(OtherErrorKind::AuthDataErr))?;
463475
self.auth_data = auth_data;
@@ -546,7 +558,7 @@ impl<B: MySqlShim<S> + Send, S: AsyncWrite + AsyncRead + Unpin + Send> MySqlInte
546558
)
547559
.await?;
548560
self.conn.flush().await?;
549-
return Ok((false, None));
561+
return Ok((false, "".to_string(), None, None));
550562
}
551563
}
552564

@@ -601,7 +613,7 @@ impl<B: MySqlShim<S> + Send, S: AsyncWrite + AsyncRead + Unpin + Send> MySqlInte
601613
&mut self.conn,
602614
)
603615
.await?;
604-
return Ok((false, database));
616+
return Ok((false, "".to_string(), None, database));
605617
}
606618

607619
debug!(
@@ -632,17 +644,24 @@ impl<B: MySqlShim<S> + Send, S: AsyncWrite + AsyncRead + Unpin + Send> MySqlInte
632644
password
633645
};
634646

635-
let auth_success = !self.shim.require_authentication()
636-
|| self
637-
.shim
638-
.password_for_username(&username)
639-
.is_some_and(|password| {
640-
let expected = hash_password(&password, &auth_data);
641-
let actual = handshake_password.as_slice();
642-
trace!(?expected, ?actual);
643-
expected == actual
644-
});
645-
647+
let plain_password = self.shim.password_for_username(&username);
648+
let require_auth = self.shim.require_authentication();
649+
let auth_success = !require_auth
650+
|| plain_password.as_ref().is_some_and(|password| {
651+
let expected = hash_password(password, &auth_data);
652+
let actual = handshake_password.as_slice();
653+
trace!(?expected, ?actual);
654+
expected == actual
655+
});
656+
let plain_password = if require_auth {
657+
Some(RedactedString::from(
658+
plain_password
659+
.map(|p| String::from_utf8_lossy(&p).into_owned())
660+
.unwrap_or_default(),
661+
))
662+
} else {
663+
None
664+
};
646665
if auth_success {
647666
debug!(%username, "Successfully authenticated client");
648667
writers::write_ok_packet(&mut self.conn, 0, 0, StatusFlags::empty()).await?;
@@ -656,8 +675,7 @@ impl<B: MySqlShim<S> + Send, S: AsyncWrite + AsyncRead + Unpin + Send> MySqlInte
656675
.await?;
657676
}
658677
self.conn.flush().await?;
659-
660-
Ok((auth_success, database))
678+
Ok((auth_success, username, plain_password, database))
661679
}
662680

663681
async fn run(mut self) -> Result<(), io::Error> {

mysql-srv/tests/main.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use mysql_srv::{
2020
QueryResultWriter, QueryResultsResponse, StatementMetaWriter,
2121
};
2222
use readyset_adapter_types::DeallocateId;
23+
use readyset_util::redacted::RedactedString;
2324
use tokio::io::{AsyncRead, AsyncWrite};
2425
use tokio::net::TcpStream;
2526

@@ -73,6 +74,14 @@ where
7374
info.reply(id, &self.params, &self.columns).await
7475
}
7576

77+
async fn set_auth_info(
78+
&mut self,
79+
_user: &str,
80+
_password: Option<RedactedString>,
81+
) -> io::Result<()> {
82+
Ok(())
83+
}
84+
7685
async fn on_execute(
7786
&mut self,
7887
id: u32,

psql-srv/src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ use postgres_types::Type;
3636
use protocol::Protocol;
3737
use readyset_adapter_types::{DeallocateId, PreparedStatementType};
3838
use readyset_sql::ast::SqlIdentifier;
39+
use readyset_util::redacted::RedactedString;
3940
use tokio::io::{AsyncRead, AsyncWrite};
4041
use tokio_native_tls::TlsAcceptor;
4142
use tokio_postgres::OwnedField;
@@ -77,6 +78,9 @@ pub trait PsqlBackend {
7778
/// [libpq-server-version]: https://github.com/postgres/postgres/blob/d22646922d66012705e0e2948cfb5b4a07092a29/src/interfaces/libpq/fe-exec.c#L1146-L1178
7879
fn version(&self) -> String;
7980

81+
/// Called when client authenticates to inform which users we should use.
82+
async fn set_auth_info(&mut self, user: &str, password: Option<RedactedString>);
83+
8084
/// Initializes the backend.
8185
///
8286
/// * `database` - The name of the database that will be used for queries to this `Backend`

psql-srv/src/protocol.rs

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use postgres::SimpleQueryMessage;
88
use postgres_protocol::Oid;
99
use postgres_types::{Kind, Type};
1010
use readyset_adapter_types::{DeallocateId, PreparedStatementType};
11+
use readyset_util::redacted::RedactedString;
1112
use smallvec::smallvec;
1213
use tokio::io::{AsyncRead, AsyncWrite};
1314
use tokio_postgres::CommandCompleteContents;
@@ -144,6 +145,9 @@ pub struct Protocol {
144145

145146
/// Whether the client is connected using TLS
146147
is_tls: bool,
148+
149+
/// The backend password for the current user
150+
backend_password: Option<RedactedString>,
147151
}
148152

149153
/// A prepared statement allows a frontend to specify the general form of a SQL statement while
@@ -179,6 +183,7 @@ impl Protocol {
179183
tls_mode,
180184
tls_server_end_point: None,
181185
is_tls: false,
186+
backend_password: None,
182187
}
183188
}
184189

@@ -428,6 +433,11 @@ impl Protocol {
428433

429434
self.state = State::Ready;
430435

436+
// Set it here in case the backend gets closed and reconnect
437+
self.backend_password = Some(RedactedString::from(password.to_string()));
438+
let _ = backend
439+
.set_auth_info(&user, Some(RedactedString::from(password.to_string())))
440+
.await;
431441
Ok(Response::Messages(Self::get_ready_message(
432442
backend.version(),
433443
)))
@@ -448,11 +458,15 @@ impl Protocol {
448458
}
449459
Some(Credentials::Any) => {
450460
self.state = State::Ready;
461+
self.backend_password = None;
451462
return Ok(Response::Messages(Self::get_ready_message(
452463
backend.version(),
453464
)));
454465
}
455-
Some(Credentials::CleartextPassword(pw)) => pw,
466+
Some(Credentials::CleartextPassword(pw)) => {
467+
self.backend_password = Some(RedactedString::from(pw.to_string()));
468+
pw
469+
}
456470
};
457471

458472
let SaslInitialResponse {
@@ -547,6 +561,14 @@ impl Protocol {
547561
sasl_data: server_final_message.to_string().into(),
548562
}];
549563
messages.extend(Self::get_ready_message(backend.version()));
564+
let _ = backend
565+
.set_auth_info(
566+
&user,
567+
self.backend_password
568+
.as_ref()
569+
.map(|password| RedactedString::from(password.to_string())),
570+
)
571+
.await;
550572
Ok(Response::Messages(messages.into()))
551573
} else {
552574
Err(Error::AuthenticationFailure {
@@ -1261,6 +1283,8 @@ mod tests {
12611283
"14.5 ReadySet".to_string()
12621284
}
12631285

1286+
async fn set_auth_info(&mut self, _user: &str, _password: Option<RedactedString>) {}
1287+
12641288
async fn on_init(&mut self, database: &str) -> Result<CredentialsNeeded, Error> {
12651289
self.database = Some(database.to_string());
12661290
match &self.needed_credentials {

psql-srv/tests/authentication.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use psql_srv::{
1414
PsqlValue, QueryResponse, TransferFormat,
1515
};
1616
use readyset_adapter_types::{DeallocateId, PreparedStatementType};
17+
use readyset_util::redacted::RedactedString;
1718
use tokio::net::TcpListener;
1819
use tokio::sync::oneshot;
1920
use tokio_native_tls::{native_tls, TlsAcceptor};
@@ -41,6 +42,7 @@ impl PsqlBackend for ScramSha256Backend {
4142
"13.4 ReadySet".to_string()
4243
}
4344

45+
async fn set_auth_info(&mut self, _user: &str, _password: Option<RedactedString>) {}
4446
fn credentials_for_user(&self, user: &str) -> Option<Credentials> {
4547
if self.username == user {
4648
Some(Credentials::CleartextPassword(self.password))

psql-srv/tests/errors.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use psql_srv::{
1111
PsqlSrvRow, PsqlValue, QueryResponse, TransferFormat,
1212
};
1313
use readyset_adapter_types::{DeallocateId, PreparedStatementType};
14+
use readyset_util::redacted::RedactedString;
1415
use tokio::join;
1516
use tokio::net::TcpListener;
1617
use tokio::sync::oneshot;
@@ -34,6 +35,7 @@ impl PsqlBackend for ErrorBackend {
3435
Some(Credentials::Any)
3536
}
3637

38+
async fn set_auth_info(&mut self, _user: &str, _password: Option<RedactedString>) {}
3739
async fn on_init(&mut self, _database: &str) -> Result<CredentialsNeeded, Error> {
3840
Ok(CredentialsNeeded::None)
3941
}

psql-srv/tests/tls.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use psql_srv::{
1010
run_backend, Credentials, CredentialsNeeded, Error, PsqlBackend, PsqlSrvRow, TransferFormat,
1111
};
1212
use readyset_adapter_types::{DeallocateId, PreparedStatementType};
13+
use readyset_util::redacted::RedactedString;
1314
use tokio::net::TcpListener;
1415
use tokio::sync::oneshot;
1516
use tokio_native_tls::{native_tls, TlsAcceptor};
@@ -34,6 +35,8 @@ impl PsqlBackend for TestBackend {
3435
Some(Credentials::Any)
3536
}
3637

38+
async fn set_auth_info(&mut self, _user: &str, _password: Option<RedactedString>) {}
39+
3740
async fn on_init(
3841
&mut self,
3942
_database: &str,

0 commit comments

Comments
 (0)