Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ use std::error;
use std::fmt;
use std::mem;
use std::ops::{Deref, DerefMut};
use std::panic::{RefUnwindSafe, UnwindSafe};
use std::sync::{Arc, Weak};
use std::time::{Duration, Instant};

Expand Down Expand Up @@ -175,6 +176,13 @@ where
cond: Condvar,
}

// Condvar isn't UnwindSafe, but our pool is. The only thing you could do
// with it which causes problems in the face of unwinding is check out a
// connection. Since we do not check a connection back into the pool if it
// is dropped during a panic, we can safely implement UnwindSafe.
impl<M: ManageConnection + UnwindSafe> UnwindSafe for SharedPool<M> {}
impl<M: ManageConnection + RefUnwindSafe> RefUnwindSafe for SharedPool<M> {}

fn drop_conns<M>(
shared: &Arc<SharedPool<M>>,
mut internals: MutexGuard<PoolInternals<M::Connection>>,
Expand Down Expand Up @@ -466,6 +474,11 @@ where
}
}

fn drop_conn(&self, conn: Conn<M::Connection>) {
let internals = self.0.internals.lock();
drop_conns(&self.0, internals, vec![conn.conn]);
}

/// Returns information about the current state of the pool.
pub fn state(&self) -> State {
let internals = self.0.internals.lock();
Expand Down Expand Up @@ -569,7 +582,13 @@ where
M: ManageConnection,
{
fn drop(&mut self) {
self.pool.put_back(self.conn.take().unwrap());
use std::thread::panicking;

if panicking() {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer this be controlled by a configuration flag.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How would you feel about having this controlled by a cargo feature instead? The issue is that we need a M::Connection: UnwindSafe bound on the impls of UnwindSafe and RefUnwindSafe if we don't do this, so having it controlled by a runtime config flag means we either can't provide those impls, or providing them is potentially unsafe.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the problem with adding that extra bound?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't expect it to be true the majority of the time. It is not true for postgres::Connection or any of the connection types in Diesel.

We're exposing references to these types and storing them internally. If those types always get returned to the pool on drop, then we're only UnwindSafe if those types are. All these types use interior mutability, which is already opts them out of this trait.

So we can only be considered UnwindSafe if we are either not returning connections to the pool on panic, or the connections being returned are also UnwindSafe (and probably RefUnwindSafe on both impls). If this is configurable at runtime, we have to assume it's not safe to implement this. If we controlled it with a cargo feature instead, we could cfg out the impls.

self.pool.drop_conn(self.conn.take().unwrap());
} else {
self.pool.put_back(self.conn.take().unwrap());
}
}
}

Expand Down
49 changes: 49 additions & 0 deletions src/test.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use antidote::Mutex;
use std::panic::catch_unwind;
use std::sync::atomic::{
AtomicBool, AtomicIsize, AtomicUsize, Ordering, ATOMIC_BOOL_INIT, ATOMIC_USIZE_INIT,
};
Expand Down Expand Up @@ -545,3 +546,51 @@ fn conns_drop_on_pool_drop() {
}
panic!("timed out waiting for connections to drop");
}

#[test]
fn connections_are_not_returned_to_pool_on_panic() {
static DROPPED: AtomicUsize = ATOMIC_USIZE_INIT;

struct Connection;

impl Drop for Connection {
fn drop(&mut self) {
DROPPED.fetch_add(1, Ordering::SeqCst);
}
}

struct Handler;

impl ManageConnection for Handler {
type Connection = Connection;
type Error = Error;

fn connect(&self) -> Result<Connection, Error> {
Ok(Connection)
}

fn is_valid(&self, _: &mut Connection) -> Result<(), Error> {
Ok(())
}

fn has_broken(&self, _: &mut Connection) -> bool {
false
}
}

let pool = Pool::builder()
.max_lifetime(Some(Duration::from_secs(1)))
.max_size(10)
.min_idle(Some(0))
.build(Handler)
.unwrap();

let _ = catch_unwind(|| {
let _conn = pool.get().unwrap();
panic!();
});

assert_eq!(DROPPED.load(Ordering::SeqCst), 1);
assert_eq!(pool.state().connections, 0);
assert_eq!(pool.state().idle_connections, 0);
}