Skip to content

Commit 92a2268

Browse files
moatraabonander
authored andcommitted
feat: Add set_connect_options method to Pool (launchbadge#2088)
* feat: Add set_connect_options method to Pool This allows external updates of the ConnectionOptions used when a new connection needs to be opened for the pool. The primary use case is to support dynamically updated (read: rotated) credentials used by systems like AWS RDS. * Use Arc wrapper for ConnectOptions to reduce lock contention * sqlite fix * Use direct assignment instead of mem::swap Co-authored-by: Austin Bonander <[email protected]> Co-authored-by: Austin Bonander <[email protected]>
1 parent ef17af3 commit 92a2268

File tree

4 files changed

+50
-10
lines changed

4 files changed

+50
-10
lines changed

sqlx-core/src/mysql/testing/mod.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use std::fmt::Write;
2+
use std::ops::Deref;
23
use std::str::FromStr;
34
use std::sync::atomic::{AtomicBool, Ordering};
45
use std::time::Duration;
@@ -152,7 +153,11 @@ async fn test_context(args: &TestArgs) -> Result<TestContext<MySql>, Error> {
152153
// Close connections ASAP if left in the idle queue.
153154
.idle_timeout(Some(Duration::from_secs(1)))
154155
.parent(master_pool.clone()),
155-
connect_opts: master_pool.connect_options().clone().database(&new_db_name),
156+
connect_opts: master_pool
157+
.connect_options()
158+
.deref()
159+
.clone()
160+
.database(&new_db_name),
156161
db_name: new_db_name,
157162
})
158163
}

sqlx-core/src/pool/inner.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use futures_intrusive::sync::{Semaphore, SemaphoreReleaser};
1111
use std::cmp;
1212
use std::future::Future;
1313
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering};
14-
use std::sync::Arc;
14+
use std::sync::{Arc, RwLock};
1515
use std::task::Poll;
1616

1717
use crate::pool::options::PoolConnectionMetadata;
@@ -20,7 +20,7 @@ use futures_util::FutureExt;
2020
use std::time::{Duration, Instant};
2121

2222
pub(crate) struct PoolInner<DB: Database> {
23-
pub(super) connect_options: <DB::Connection as Connection>::Options,
23+
pub(super) connect_options: RwLock<Arc<<DB::Connection as Connection>::Options>>,
2424
pub(super) idle_conns: ArrayQueue<Idle<DB>>,
2525
pub(super) semaphore: Semaphore,
2626
pub(super) size: AtomicU32,
@@ -47,7 +47,7 @@ impl<DB: Database> PoolInner<DB> {
4747
};
4848

4949
let pool = Self {
50-
connect_options,
50+
connect_options: RwLock::new(Arc::new(connect_options)),
5151
idle_conns: ArrayQueue::new(capacity),
5252
semaphore: Semaphore::new(options.fair, semaphore_capacity),
5353
size: AtomicU32::new(0),
@@ -292,9 +292,17 @@ impl<DB: Database> PoolInner<DB> {
292292
loop {
293293
let timeout = deadline_as_timeout::<DB>(deadline)?;
294294

295+
// clone the connect options arc so it can be used without holding the RwLockReadGuard
296+
// across an async await point
297+
let connect_options = self
298+
.connect_options
299+
.read()
300+
.expect("write-lock holder panicked")
301+
.clone();
302+
295303
// result here is `Result<Result<C, Error>, TimeoutError>`
296304
// if this block does not return, sleep for the backoff timeout and try again
297-
match sqlx_rt::timeout(timeout, self.connect_options.connect()).await {
305+
match sqlx_rt::timeout(timeout, connect_options.connect()).await {
298306
// successfully established connection
299307
Ok(Ok(mut raw)) => {
300308
// See comment on `PoolOptions::after_connect`

sqlx-core/src/pool/mod.rs

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ use futures_core::FusedFuture;
7474
use futures_util::FutureExt;
7575
use std::fmt;
7676
use std::future::Future;
77+
use std::ops::DerefMut;
7778
use std::pin::Pin;
7879
use std::sync::Arc;
7980
use std::task::{Context, Poll};
@@ -489,9 +490,26 @@ impl<DB: Database> Pool<DB> {
489490
self.0.num_idle()
490491
}
491492

492-
/// Get the connection options for this pool
493-
pub fn connect_options(&self) -> &<DB::Connection as Connection>::Options {
494-
&self.0.connect_options
493+
/// Gets a clone of the connection options for this pool
494+
pub fn connect_options(&self) -> Arc<<DB::Connection as Connection>::Options> {
495+
self.0
496+
.connect_options
497+
.read()
498+
.expect("write-lock holder panicked")
499+
.clone()
500+
}
501+
502+
/// Updates the connection options this pool will use when opening any future connections. Any
503+
/// existing open connection in the pool will be left as-is.
504+
pub fn set_connect_options(&self, connect_options: <DB::Connection as Connection>::Options) {
505+
// technically write() could also panic if the current thread already holds the lock,
506+
// but because this method can't be re-entered by the same thread that shouldn't be a problem
507+
let mut guard = self
508+
.0
509+
.connect_options
510+
.write()
511+
.expect("write-lock holder panicked");
512+
*guard = Arc::new(connect_options);
495513
}
496514

497515
/// Get the options for this pool
@@ -514,7 +532,11 @@ impl Pool<Any> {
514532
///
515533
/// Determined by the connection URL.
516534
pub fn any_kind(&self) -> AnyKind {
517-
self.0.connect_options.kind()
535+
self.0
536+
.connect_options
537+
.read()
538+
.expect("write-lock holder panicked")
539+
.kind()
518540
}
519541
}
520542

sqlx-core/src/postgres/testing/mod.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use std::fmt::Write;
2+
use std::ops::Deref;
23
use std::str::FromStr;
34
use std::sync::atomic::{AtomicBool, Ordering};
45
use std::time::Duration;
@@ -159,7 +160,11 @@ async fn test_context(args: &TestArgs) -> Result<TestContext<Postgres>, Error> {
159160
// Close connections ASAP if left in the idle queue.
160161
.idle_timeout(Some(Duration::from_secs(1)))
161162
.parent(master_pool.clone()),
162-
connect_opts: master_pool.connect_options().clone().database(&new_db_name),
163+
connect_opts: master_pool
164+
.connect_options()
165+
.deref()
166+
.clone()
167+
.database(&new_db_name),
163168
db_name: new_db_name,
164169
})
165170
}

0 commit comments

Comments
 (0)