Skip to content

Commit 89b61f1

Browse files
Merge pull request #1223 from mintlayer/fix/postgres-async-with-tx-dropper
Ensure that transactions are wrapped asynchronously in the parent database object instead of blocking on transactions' drop function.
2 parents 8945f2c + a3c90df commit 89b61f1

File tree

4 files changed

+138
-63
lines changed

4 files changed

+138
-63
lines changed

api-server/api-server-common/src/storage/impls/postgres/mod.rs

Lines changed: 40 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ mod queries;
2020
use std::str::FromStr;
2121

2222
use bb8_postgres::bb8::Pool;
23+
use bb8_postgres::bb8::PooledConnection;
2324
use bb8_postgres::PostgresConnectionManager;
2425
use tokio_postgres::NoTls;
2526

@@ -32,6 +33,19 @@ use self::transactional::ApiServerPostgresTransactionalRw;
3233

3334
pub struct TransactionalApiServerPostgresStorage {
3435
pool: Pool<PostgresConnectionManager<NoTls>>,
36+
/// This task is responsible for rolling back failed RW/RO transactions, since closing connections are pooled
37+
tx_dropper_joiner: tokio::task::JoinHandle<()>,
38+
/// This channel is used to send transactions that are not manually rolled back to the tx_dropper task to roll them back
39+
db_tx_conn_sender: tokio::sync::mpsc::UnboundedSender<
40+
PooledConnection<'static, PostgresConnectionManager<NoTls>>,
41+
>,
42+
}
43+
44+
impl Drop for TransactionalApiServerPostgresStorage {
45+
fn drop(&mut self) {
46+
// Since the whole connection pool will be destroyed, we can safely abort all connections
47+
self.tx_dropper_joiner.abort();
48+
}
3549
}
3650

3751
impl TransactionalApiServerPostgresStorage {
@@ -57,7 +71,26 @@ impl TransactionalApiServerPostgresStorage {
5771
))
5872
})?;
5973

60-
let result = Self { pool };
74+
let (conn_tx, conn_rx) = tokio::sync::mpsc::unbounded_channel::<
75+
PooledConnection<'static, PostgresConnectionManager<NoTls>>,
76+
>();
77+
78+
let tx_dropper_joiner = tokio::task::spawn(async move {
79+
let mut conn_rx = conn_rx;
80+
while let Some(conn) = conn_rx.recv().await {
81+
conn.batch_execute("ROLLBACK").await.unwrap_or_else(|e| {
82+
logging::log::error!(
83+
"CRITICAL ERROR: failed to rollback failed postgres RW transaction: {e}"
84+
)
85+
});
86+
}
87+
});
88+
89+
let result = Self {
90+
pool,
91+
tx_dropper_joiner,
92+
db_tx_conn_sender: conn_tx,
93+
};
6194

6295
result.initialize_if_not(chain_config).await?;
6396

@@ -80,20 +113,22 @@ impl TransactionalApiServerPostgresStorage {
80113
) -> Result<ApiServerPostgresTransactionalRo, ApiServerStorageError> {
81114
let conn = self
82115
.pool
83-
.get()
116+
.get_owned()
84117
.await
85118
.map_err(|e| ApiServerStorageError::AcquiringConnectionFailed(e.to_string()))?;
86-
ApiServerPostgresTransactionalRo::from_connection(conn).await
119+
ApiServerPostgresTransactionalRo::from_connection(conn, self.db_tx_conn_sender.clone())
120+
.await
87121
}
88122

89123
pub async fn begin_rw_transaction(
90124
&self,
91125
) -> Result<ApiServerPostgresTransactionalRw, ApiServerStorageError> {
92126
let conn = self
93127
.pool
94-
.get()
128+
.get_owned()
95129
.await
96130
.map_err(|e| ApiServerStorageError::AcquiringConnectionFailed(e.to_string()))?;
97-
ApiServerPostgresTransactionalRw::from_connection(conn).await
131+
ApiServerPostgresTransactionalRw::from_connection(conn, self.db_tx_conn_sender.clone())
132+
.await
98133
}
99134
}

api-server/api-server-common/src/storage/impls/postgres/transactional/mod.rs

Lines changed: 77 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -30,34 +30,53 @@ use crate::storage::storage_api::{
3030

3131
use super::{queries::QueryFromConnection, TransactionalApiServerPostgresStorage};
3232

33+
const CONN_ERR: &str = "CRITICAL ERROR: failed to get postgres tx connection. Invariant broken.";
34+
3335
pub struct ApiServerPostgresTransactionalRo<'a> {
34-
connection: PooledConnection<'a, PostgresConnectionManager<NoTls>>,
36+
// Note: This is an Option due to needing to pry the connection out of Self in Drop
37+
connection: Option<PooledConnection<'static, PostgresConnectionManager<NoTls>>>,
3538
finished: bool,
39+
db_tx_sender: tokio::sync::mpsc::UnboundedSender<
40+
PooledConnection<'static, PostgresConnectionManager<NoTls>>,
41+
>,
42+
// Note: This exists to enforce that a transaction never outlives the database object,
43+
// given that all connections have 'static lifetimes
44+
_marker: std::marker::PhantomData<&'a ()>,
3645
}
3746

3847
impl<'a> ApiServerPostgresTransactionalRo<'a> {
3948
pub(super) async fn from_connection(
40-
connection: PooledConnection<'a, PostgresConnectionManager<NoTls>>,
41-
) -> Result<ApiServerPostgresTransactionalRo, ApiServerStorageError> {
49+
connection: PooledConnection<'static, PostgresConnectionManager<NoTls>>,
50+
db_tx_sender: tokio::sync::mpsc::UnboundedSender<
51+
PooledConnection<'static, PostgresConnectionManager<NoTls>>,
52+
>,
53+
) -> Result<ApiServerPostgresTransactionalRo<'a>, ApiServerStorageError> {
4254
let tx = Self {
43-
connection,
55+
connection: Some(connection),
4456
finished: false,
57+
db_tx_sender,
58+
_marker: std::marker::PhantomData,
4559
};
46-
tx.connection.batch_execute("BEGIN READ ONLY").await.map_err(|e| {
47-
ApiServerStorageError::RoTxBeginFailed(format!("Transaction begin failed: {}", e))
48-
})?;
60+
tx.connection
61+
.as_ref()
62+
.expect(CONN_ERR)
63+
.batch_execute("BEGIN READ ONLY")
64+
.await
65+
.map_err(|e| {
66+
ApiServerStorageError::RoTxBeginFailed(format!("Transaction begin failed: {}", e))
67+
})?;
4968
Ok(tx)
5069
}
5170

5271
pub async fn is_initialized(&mut self) -> Result<bool, ApiServerStorageError> {
53-
let mut conn = QueryFromConnection::new(&self.connection);
72+
let mut conn = QueryFromConnection::new(self.connection.as_ref().expect(CONN_ERR));
5473
let res = conn.is_initialized().await?;
5574

5675
Ok(res)
5776
}
5877

5978
pub async fn get_storage_version(&mut self) -> Result<Option<u32>, ApiServerStorageError> {
60-
let mut conn = QueryFromConnection::new(&self.connection);
79+
let mut conn = QueryFromConnection::new(self.connection.as_ref().expect(CONN_ERR));
6180
let res = conn.get_storage_version().await?;
6281

6382
Ok(res)
@@ -66,7 +85,7 @@ impl<'a> ApiServerPostgresTransactionalRo<'a> {
6685
pub async fn get_best_block(
6786
&mut self,
6887
) -> Result<(BlockHeight, Id<GenBlock>), ApiServerStorageError> {
69-
let mut conn = QueryFromConnection::new(&self.connection);
88+
let mut conn = QueryFromConnection::new(self.connection.as_ref().expect(CONN_ERR));
7089
let res = conn.get_best_block().await?;
7190

7291
Ok(res)
@@ -76,7 +95,7 @@ impl<'a> ApiServerPostgresTransactionalRo<'a> {
7695
&mut self,
7796
block_height: BlockHeight,
7897
) -> Result<Option<Id<Block>>, ApiServerStorageError> {
79-
let mut conn = QueryFromConnection::new(&self.connection);
98+
let mut conn = QueryFromConnection::new(self.connection.as_ref().expect(CONN_ERR));
8099
let res = conn.get_main_chain_block_id(block_height).await?;
81100

82101
Ok(res)
@@ -86,7 +105,7 @@ impl<'a> ApiServerPostgresTransactionalRo<'a> {
86105
&mut self,
87106
block_id: Id<Block>,
88107
) -> Result<Option<Block>, ApiServerStorageError> {
89-
let mut conn = QueryFromConnection::new(&self.connection);
108+
let mut conn = QueryFromConnection::new(self.connection.as_ref().expect(CONN_ERR));
90109
let res = conn.get_block(block_id).await?;
91110

92111
Ok(res)
@@ -97,7 +116,7 @@ impl<'a> ApiServerPostgresTransactionalRo<'a> {
97116
&mut self,
98117
transaction_id: Id<Transaction>,
99118
) -> Result<Option<(Option<Id<Block>>, SignedTransaction)>, ApiServerStorageError> {
100-
let mut conn = QueryFromConnection::new(&self.connection);
119+
let mut conn = QueryFromConnection::new(self.connection.as_ref().expect(CONN_ERR));
101120
let res = conn.get_transaction(transaction_id).await?;
102121

103122
Ok(res)
@@ -107,43 +126,60 @@ impl<'a> ApiServerPostgresTransactionalRo<'a> {
107126
&mut self,
108127
block_id: Id<Block>,
109128
) -> Result<Option<BlockAuxData>, ApiServerStorageError> {
110-
let mut conn = QueryFromConnection::new(&self.connection);
129+
let mut conn = QueryFromConnection::new(self.connection.as_ref().expect(CONN_ERR));
111130
let res = conn.get_block_aux_data(block_id).await?;
112131

113132
Ok(res)
114133
}
115134
}
116135

117136
pub struct ApiServerPostgresTransactionalRw<'a> {
118-
connection: PooledConnection<'a, PostgresConnectionManager<NoTls>>,
137+
// Note: This is an Option due to needing to pry the connection out of Self in Drop
138+
connection: Option<PooledConnection<'static, PostgresConnectionManager<NoTls>>>,
119139
finished: bool,
140+
db_tx_sender: tokio::sync::mpsc::UnboundedSender<
141+
PooledConnection<'static, PostgresConnectionManager<NoTls>>,
142+
>,
143+
// Note: This exists to enforce that a transaction never outlives the database object,
144+
// given that all connections have 'static lifetimes
145+
_marker: std::marker::PhantomData<&'a ()>,
120146
}
121147

122148
impl<'a> Drop for ApiServerPostgresTransactionalRw<'a> {
123149
fn drop(&mut self) {
124150
if !self.finished {
125-
futures::executor::block_on(self.connection.batch_execute("ROLLBACK")).unwrap_or_else(
126-
|e| {
151+
self.db_tx_sender
152+
.send(self.connection.take().expect(CONN_ERR))
153+
.unwrap_or_else(|e| {
127154
logging::log::error!(
128-
"CRITICAL ERROR: failed to rollback failed postgres RW transaction: {e}"
155+
"CRITICAL ERROR: failed to send postgres RW transaction connection for closure: {e}"
129156
)
130-
},
131-
);
157+
});
132158
}
133159
}
134160
}
135161

136162
impl<'a> ApiServerPostgresTransactionalRw<'a> {
137163
pub(super) async fn from_connection(
138-
connection: PooledConnection<'a, PostgresConnectionManager<NoTls>>,
164+
connection: PooledConnection<'static, PostgresConnectionManager<NoTls>>,
165+
db_tx_sender: tokio::sync::mpsc::UnboundedSender<
166+
PooledConnection<'static, PostgresConnectionManager<NoTls>>,
167+
>,
139168
) -> Result<ApiServerPostgresTransactionalRw<'a>, ApiServerStorageError> {
140169
let tx = Self {
141-
connection,
170+
connection: Some(connection),
142171
finished: false,
172+
db_tx_sender,
173+
_marker: std::marker::PhantomData,
143174
};
144-
tx.connection.batch_execute("BEGIN READ WRITE").await.map_err(|e| {
145-
ApiServerStorageError::RwTxBeginFailed(format!("Transaction begin failed: {}", e))
146-
})?;
175+
tx.connection
176+
.as_ref()
177+
.expect(CONN_ERR)
178+
.batch_execute("BEGIN READ WRITE")
179+
.await
180+
.map_err(|e| {
181+
ApiServerStorageError::RwTxBeginFailed(format!("Transaction begin failed: {}", e))
182+
})?;
147183
Ok(tx)
148184
}
149185
}
@@ -152,6 +188,8 @@ impl<'a> ApiServerPostgresTransactionalRw<'a> {
152188
impl<'a> ApiServerTransactionRw for ApiServerPostgresTransactionalRw<'a> {
153189
async fn commit(mut self) -> Result<(), crate::storage::storage_api::ApiServerStorageError> {
154190
self.connection
191+
.as_ref()
192+
.expect(CONN_ERR)
155193
.batch_execute("COMMIT")
156194
.await
157195
.map_err(|e| ApiServerStorageError::TxCommitFailed(e.to_string()))?;
@@ -161,6 +199,8 @@ impl<'a> ApiServerTransactionRw for ApiServerPostgresTransactionalRw<'a> {
161199

162200
async fn rollback(mut self) -> Result<(), crate::storage::storage_api::ApiServerStorageError> {
163201
self.connection
202+
.as_ref()
203+
.expect(CONN_ERR)
164204
.batch_execute("ROLLBACK")
165205
.await
166206
.map_err(|e| ApiServerStorageError::TxCommitFailed(e.to_string()))?;
@@ -179,31 +219,31 @@ impl<'a> ApiServerTransactionRo for ApiServerPostgresTransactionalRo<'a> {
179219
impl<'a> Drop for ApiServerPostgresTransactionalRo<'a> {
180220
fn drop(&mut self) {
181221
if !self.finished {
182-
futures::executor::block_on(self.connection.batch_execute("ROLLBACK")).unwrap_or_else(
183-
|e| {
222+
self.db_tx_sender
223+
.send(self.connection.take().expect(CONN_ERR))
224+
.unwrap_or_else(|e| {
184225
logging::log::error!(
185-
"CRITICAL ERROR: failed to rollback failed postgres RO transaction: {e}"
226+
"CRITICAL ERROR: failed to send postgres RO transaction connection for closure: {e}"
186227
)
187-
},
188-
);
228+
});
189229
}
190230
}
191231
}
192232

193233
#[async_trait::async_trait]
194-
impl<'t> Transactional<'t> for TransactionalApiServerPostgresStorage {
195-
type TransactionRo = ApiServerPostgresTransactionalRo<'t>;
234+
impl<'tx> Transactional<'tx> for TransactionalApiServerPostgresStorage {
235+
type TransactionRo = ApiServerPostgresTransactionalRo<'tx>;
196236

197-
type TransactionRw = ApiServerPostgresTransactionalRw<'t>;
237+
type TransactionRw = ApiServerPostgresTransactionalRw<'tx>;
198238

199-
async fn transaction_ro<'s: 't>(
200-
&'s self,
239+
async fn transaction_ro<'db: 'tx>(
240+
&'db self,
201241
) -> Result<Self::TransactionRo, ApiServerStorageError> {
202242
self.begin_ro_transaction().await
203243
}
204244

205-
async fn transaction_rw<'s: 't>(
206-
&'s mut self,
245+
async fn transaction_rw<'db: 'tx>(
246+
&'db mut self,
207247
) -> Result<Self::TransactionRw, ApiServerStorageError> {
208248
self.begin_rw_transaction().await
209249
}

api-server/api-server-common/src/storage/impls/postgres/transactional/read.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,14 @@ use crate::storage::{
1717
impls::postgres::queries::QueryFromConnection, storage_api::ApiServerStorageRead,
1818
};
1919

20-
use super::ApiServerPostgresTransactionalRo;
20+
use super::{ApiServerPostgresTransactionalRo, CONN_ERR};
2121

2222
#[async_trait::async_trait]
2323
impl<'a> ApiServerStorageRead for ApiServerPostgresTransactionalRo<'a> {
2424
async fn is_initialized(
2525
&mut self,
2626
) -> Result<bool, crate::storage::storage_api::ApiServerStorageError> {
27-
let mut conn = QueryFromConnection::new(&self.connection);
27+
let mut conn = QueryFromConnection::new(self.connection.as_ref().expect(CONN_ERR));
2828
let res = conn.is_initialized().await?;
2929

3030
Ok(res)
@@ -33,7 +33,7 @@ impl<'a> ApiServerStorageRead for ApiServerPostgresTransactionalRo<'a> {
3333
async fn get_storage_version(
3434
&mut self,
3535
) -> Result<Option<u32>, crate::storage::storage_api::ApiServerStorageError> {
36-
let mut conn = QueryFromConnection::new(&self.connection);
36+
let mut conn = QueryFromConnection::new(self.connection.as_ref().expect(CONN_ERR));
3737
let res = conn.get_storage_version().await?;
3838

3939
Ok(res)
@@ -48,7 +48,7 @@ impl<'a> ApiServerStorageRead for ApiServerPostgresTransactionalRo<'a> {
4848
),
4949
crate::storage::storage_api::ApiServerStorageError,
5050
> {
51-
let mut conn = QueryFromConnection::new(&self.connection);
51+
let mut conn = QueryFromConnection::new(self.connection.as_ref().expect(CONN_ERR));
5252
let res = conn.get_best_block().await?;
5353

5454
Ok(res)
@@ -59,7 +59,7 @@ impl<'a> ApiServerStorageRead for ApiServerPostgresTransactionalRo<'a> {
5959
block_id: common::primitives::Id<common::chain::Block>,
6060
) -> Result<Option<common::chain::Block>, crate::storage::storage_api::ApiServerStorageError>
6161
{
62-
let mut conn = QueryFromConnection::new(&self.connection);
62+
let mut conn = QueryFromConnection::new(self.connection.as_ref().expect(CONN_ERR));
6363
let res = conn.get_block(block_id).await?;
6464

6565
Ok(res)
@@ -72,7 +72,7 @@ impl<'a> ApiServerStorageRead for ApiServerPostgresTransactionalRo<'a> {
7272
Option<crate::storage::storage_api::block_aux_data::BlockAuxData>,
7373
crate::storage::storage_api::ApiServerStorageError,
7474
> {
75-
let mut conn = QueryFromConnection::new(&self.connection);
75+
let mut conn = QueryFromConnection::new(self.connection.as_ref().expect(CONN_ERR));
7676
let res = conn.get_block_aux_data(block_id).await?;
7777

7878
Ok(res)

0 commit comments

Comments
 (0)