Skip to content

Postgres async with tx dropper cleanup #1226

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -100,19 +100,19 @@ impl TransactionalApiServerInMemoryStorage {
}

#[async_trait::async_trait]
impl<'t> Transactional<'t> for TransactionalApiServerInMemoryStorage {
type TransactionRo = ApiServerInMemoryStorageTransactionalRo<'t>;
impl<'tx> Transactional<'tx> for TransactionalApiServerInMemoryStorage {
type TransactionRo = ApiServerInMemoryStorageTransactionalRo<'tx>;

type TransactionRw = ApiServerInMemoryStorageTransactionalRw<'t>;
type TransactionRw = ApiServerInMemoryStorageTransactionalRw<'tx>;

async fn transaction_ro<'s: 't>(
&'s self,
async fn transaction_ro<'db: 'tx>(
&'db self,
) -> Result<Self::TransactionRo, ApiServerStorageError> {
Ok(ApiServerInMemoryStorageTransactionalRo::new(self).await)
}

async fn transaction_rw<'s: 't>(
&'s mut self,
async fn transaction_rw<'db: 'tx>(
&'db mut self,
) -> Result<Self::TransactionRw, ApiServerStorageError> {
Ok(ApiServerInMemoryStorageTransactionalRw::new(self).await)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,43 +26,38 @@ use super::ApiServerInMemoryStorageTransactionalRo;

#[async_trait::async_trait]
impl<'t> ApiServerStorageRead for ApiServerInMemoryStorageTransactionalRo<'t> {
async fn is_initialized(&mut self) -> Result<bool, ApiServerStorageError> {
async fn is_initialized(&self) -> Result<bool, ApiServerStorageError> {
self.transaction.is_initialized()
}

async fn get_block(
&mut self,
block_id: Id<Block>,
) -> Result<Option<Block>, ApiServerStorageError> {
async fn get_block(&self, block_id: Id<Block>) -> Result<Option<Block>, ApiServerStorageError> {
self.transaction.get_block(block_id)
}

async fn get_transaction(
&mut self,
&self,
transaction_id: Id<Transaction>,
) -> Result<Option<(Option<Id<Block>>, SignedTransaction)>, ApiServerStorageError> {
self.transaction.get_transaction(transaction_id)
}

async fn get_storage_version(&mut self) -> Result<Option<u32>, ApiServerStorageError> {
async fn get_storage_version(&self) -> Result<Option<u32>, ApiServerStorageError> {
Ok(Some(self.transaction.get_storage_version()?))
}

async fn get_best_block(
&mut self,
) -> Result<(BlockHeight, Id<GenBlock>), ApiServerStorageError> {
async fn get_best_block(&self) -> Result<(BlockHeight, Id<GenBlock>), ApiServerStorageError> {
self.transaction.get_best_block()
}

async fn get_block_aux_data(
&mut self,
&self,
block_id: Id<Block>,
) -> Result<Option<BlockAuxData>, ApiServerStorageError> {
self.transaction.get_block_aux_data(block_id)
}

async fn get_main_chain_block_id(
&mut self,
&self,
block_height: BlockHeight,
) -> Result<Option<Id<Block>>, ApiServerStorageError> {
self.transaction.get_main_chain_block_id(block_height)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,43 +85,38 @@ impl<'t> ApiServerStorageWrite for ApiServerInMemoryStorageTransactionalRw<'t> {

#[async_trait::async_trait]
impl<'t> ApiServerStorageRead for ApiServerInMemoryStorageTransactionalRw<'t> {
async fn is_initialized(&mut self) -> Result<bool, ApiServerStorageError> {
async fn is_initialized(&self) -> Result<bool, ApiServerStorageError> {
self.transaction.is_initialized()
}

async fn get_storage_version(&mut self) -> Result<Option<u32>, ApiServerStorageError> {
async fn get_storage_version(&self) -> Result<Option<u32>, ApiServerStorageError> {
Ok(Some(self.transaction.get_storage_version()?))
}

async fn get_best_block(
&mut self,
) -> Result<(BlockHeight, Id<GenBlock>), ApiServerStorageError> {
async fn get_best_block(&self) -> Result<(BlockHeight, Id<GenBlock>), ApiServerStorageError> {
self.transaction.get_best_block()
}

async fn get_block(
&mut self,
block_id: Id<Block>,
) -> Result<Option<Block>, ApiServerStorageError> {
async fn get_block(&self, block_id: Id<Block>) -> Result<Option<Block>, ApiServerStorageError> {
self.transaction.get_block(block_id)
}

async fn get_block_aux_data(
&mut self,
&self,
block_id: Id<Block>,
) -> Result<Option<BlockAuxData>, ApiServerStorageError> {
self.transaction.get_block_aux_data(block_id)
}

async fn get_main_chain_block_id(
&mut self,
&self,
block_height: BlockHeight,
) -> Result<Option<Id<Block>>, ApiServerStorageError> {
self.transaction.get_main_chain_block_id(block_height)
}

async fn get_transaction(
&mut self,
&self,
transaction_id: Id<Transaction>,
) -> Result<Option<(Option<Id<Block>>, SignedTransaction)>, ApiServerStorageError> {
self.transaction.get_transaction(transaction_id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use super::{ApiServerPostgresTransactionalRo, CONN_ERR};
#[async_trait::async_trait]
impl<'a> ApiServerStorageRead for ApiServerPostgresTransactionalRo<'a> {
async fn is_initialized(
&mut self,
&self,
) -> Result<bool, crate::storage::storage_api::ApiServerStorageError> {
let mut conn = QueryFromConnection::new(self.connection.as_ref().expect(CONN_ERR));
let res = conn.is_initialized().await?;
Expand All @@ -31,7 +31,7 @@ impl<'a> ApiServerStorageRead for ApiServerPostgresTransactionalRo<'a> {
}

async fn get_storage_version(
&mut self,
&self,
) -> Result<Option<u32>, crate::storage::storage_api::ApiServerStorageError> {
let mut conn = QueryFromConnection::new(self.connection.as_ref().expect(CONN_ERR));
let res = conn.get_storage_version().await?;
Expand All @@ -40,7 +40,7 @@ impl<'a> ApiServerStorageRead for ApiServerPostgresTransactionalRo<'a> {
}

async fn get_best_block(
&mut self,
&self,
) -> Result<
(
common::primitives::BlockHeight,
Expand All @@ -55,7 +55,7 @@ impl<'a> ApiServerStorageRead for ApiServerPostgresTransactionalRo<'a> {
}

async fn get_block(
&mut self,
&self,
block_id: common::primitives::Id<common::chain::Block>,
) -> Result<Option<common::chain::Block>, crate::storage::storage_api::ApiServerStorageError>
{
Expand All @@ -66,7 +66,7 @@ impl<'a> ApiServerStorageRead for ApiServerPostgresTransactionalRo<'a> {
}

async fn get_block_aux_data(
&mut self,
&self,
block_id: common::primitives::Id<common::chain::Block>,
) -> Result<
Option<crate::storage::storage_api::block_aux_data::BlockAuxData>,
Expand All @@ -79,7 +79,7 @@ impl<'a> ApiServerStorageRead for ApiServerPostgresTransactionalRo<'a> {
}

async fn get_main_chain_block_id(
&mut self,
&self,
block_height: common::primitives::BlockHeight,
) -> Result<
Option<common::primitives::Id<common::chain::Block>>,
Expand All @@ -89,7 +89,7 @@ impl<'a> ApiServerStorageRead for ApiServerPostgresTransactionalRo<'a> {
}

async fn get_transaction(
&mut self,
&self,
transaction_id: common::primitives::Id<common::chain::Transaction>,
) -> Result<
Option<(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,41 +109,36 @@ impl<'a> ApiServerStorageWrite for ApiServerPostgresTransactionalRw<'a> {

#[async_trait::async_trait]
impl<'a> ApiServerStorageRead for ApiServerPostgresTransactionalRw<'a> {
async fn is_initialized(&mut self) -> Result<bool, ApiServerStorageError> {
async fn is_initialized(&self) -> Result<bool, ApiServerStorageError> {
let mut conn = QueryFromConnection::new(self.connection.as_ref().expect(CONN_ERR));
let res = conn.is_initialized().await?;

Ok(res)
}

async fn get_storage_version(&mut self) -> Result<Option<u32>, ApiServerStorageError> {
async fn get_storage_version(&self) -> Result<Option<u32>, ApiServerStorageError> {
let mut conn = QueryFromConnection::new(self.connection.as_ref().expect(CONN_ERR));
let res = conn.get_storage_version().await?;

Ok(res)
}

async fn get_best_block(
&mut self,
) -> Result<(BlockHeight, Id<GenBlock>), ApiServerStorageError> {
async fn get_best_block(&self) -> Result<(BlockHeight, Id<GenBlock>), ApiServerStorageError> {
let mut conn = QueryFromConnection::new(self.connection.as_ref().expect(CONN_ERR));
let res = conn.get_best_block().await?;

Ok(res)
}

async fn get_block(
&mut self,
block_id: Id<Block>,
) -> Result<Option<Block>, ApiServerStorageError> {
async fn get_block(&self, block_id: Id<Block>) -> Result<Option<Block>, ApiServerStorageError> {
let mut conn = QueryFromConnection::new(self.connection.as_ref().expect(CONN_ERR));
let res = conn.get_block(block_id).await?;

Ok(res)
}

async fn get_block_aux_data(
&mut self,
&self,
block_id: Id<Block>,
) -> Result<Option<BlockAuxData>, ApiServerStorageError> {
let mut conn = QueryFromConnection::new(self.connection.as_ref().expect(CONN_ERR));
Expand All @@ -153,7 +148,7 @@ impl<'a> ApiServerStorageRead for ApiServerPostgresTransactionalRw<'a> {
}

async fn get_main_chain_block_id(
&mut self,
&self,
block_height: BlockHeight,
) -> Result<Option<Id<Block>>, ApiServerStorageError> {
let mut conn = QueryFromConnection::new(self.connection.as_ref().expect(CONN_ERR));
Expand All @@ -163,7 +158,7 @@ impl<'a> ApiServerStorageRead for ApiServerPostgresTransactionalRw<'a> {
}

async fn get_transaction(
&mut self,
&self,
transaction_id: Id<Transaction>,
) -> Result<Option<(Option<Id<Block>>, SignedTransaction)>, ApiServerStorageError> {
let mut conn = QueryFromConnection::new(self.connection.as_ref().expect(CONN_ERR));
Expand Down
36 changes: 16 additions & 20 deletions api-server/api-server-common/src/storage/storage_api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,33 +45,28 @@ pub enum ApiServerStorageError {
}

#[async_trait::async_trait]
pub trait ApiServerStorageRead {
async fn is_initialized(&mut self) -> Result<bool, ApiServerStorageError>;
pub trait ApiServerStorageRead: Sync {
async fn is_initialized(&self) -> Result<bool, ApiServerStorageError>;

async fn get_storage_version(&mut self) -> Result<Option<u32>, ApiServerStorageError>;
async fn get_storage_version(&self) -> Result<Option<u32>, ApiServerStorageError>;

async fn get_best_block(
&mut self,
) -> Result<(BlockHeight, Id<GenBlock>), ApiServerStorageError>;
async fn get_best_block(&self) -> Result<(BlockHeight, Id<GenBlock>), ApiServerStorageError>;

async fn get_block(
&mut self,
block_id: Id<Block>,
) -> Result<Option<Block>, ApiServerStorageError>;
async fn get_block(&self, block_id: Id<Block>) -> Result<Option<Block>, ApiServerStorageError>;

async fn get_block_aux_data(
&mut self,
&self,
block_id: Id<Block>,
) -> Result<Option<BlockAuxData>, ApiServerStorageError>;

async fn get_main_chain_block_id(
&mut self,
&self,
block_height: BlockHeight,
) -> Result<Option<Id<Block>>, ApiServerStorageError>;

#[allow(clippy::type_complexity)]
async fn get_transaction(
&mut self,
&self,
transaction_id: Id<Transaction>,
) -> Result<Option<(Option<Id<Block>>, SignedTransaction)>, ApiServerStorageError>;
}
Expand Down Expand Up @@ -132,20 +127,21 @@ pub trait ApiServerTransactionRo: ApiServerStorageRead {
}

#[async_trait::async_trait]
pub trait Transactional<'t> {
pub trait Transactional<'tx> {
/// Associated read-only transaction type.
type TransactionRo: ApiServerTransactionRo + Send + 't;
type TransactionRo: ApiServerTransactionRo + Send + 'tx;

/// Associated read-write transaction type.
type TransactionRw: ApiServerTransactionRw + Send + 't;
type TransactionRw: ApiServerTransactionRw + Send + 'tx;

/// Start a read-only transaction.
async fn transaction_ro<'s: 't>(&'s self)
-> Result<Self::TransactionRo, ApiServerStorageError>;
async fn transaction_ro<'db: 'tx>(
&'db self,
) -> Result<Self::TransactionRo, ApiServerStorageError>;

/// Start a read-write transaction.
async fn transaction_rw<'s: 't>(
&'s mut self,
async fn transaction_rw<'db: 'tx>(
&'db mut self,
) -> Result<Self::TransactionRw, ApiServerStorageError>;
}

Expand Down
4 changes: 2 additions & 2 deletions api-server/scanner-lib/src/blockchain_state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ impl<S: ApiServerStorage> BlockchainState<S> {
}

#[async_trait::async_trait]
impl<S: ApiServerStorage + Sync> LocalBlockchainState for BlockchainState<S> {
impl<S: ApiServerStorage + Send + Sync> LocalBlockchainState for BlockchainState<S> {
type Error = BlockchainStateError;

async fn best_block(&self) -> Result<(BlockHeight, Id<GenBlock>), Self::Error> {
let mut db_tx = self.storage.transaction_ro().await?;
let db_tx = self.storage.transaction_ro().await?;
let best_block = db_tx.get_best_block().await?;
Ok(best_block)
}
Expand Down
8 changes: 4 additions & 4 deletions api-server/storage-test-suite/src/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub async fn initialization<S: ApiServerStorage, F: Fn() -> S>(
_seed_maker: Box<dyn Fn() -> Seed + Send>,
) -> Result<(), Failed> {
let storage = storage_maker();
let mut tx = storage.transaction_ro().await.unwrap();
let tx = storage.transaction_ro().await.unwrap();
assert!(tx.is_initialized().await.unwrap());
Ok(())
}
Expand All @@ -55,7 +55,7 @@ pub async fn set_get<S: ApiServerStorage, F: Fn() -> S>(

let mut storage = storage_maker();

let mut db_tx = storage.transaction_ro().await.unwrap();
let db_tx = storage.transaction_ro().await.unwrap();

let is_initialized = db_tx.is_initialized().await.unwrap();
assert!(is_initialized);
Expand Down Expand Up @@ -100,7 +100,7 @@ pub async fn set_get<S: ApiServerStorage, F: Fn() -> S>(

db_tx.commit().await.unwrap();

let mut db_tx = storage.transaction_ro().await.unwrap();
let db_tx = storage.transaction_ro().await.unwrap();

{
let block_id = db_tx.get_main_chain_block_id(height).await.unwrap();
Expand Down Expand Up @@ -143,7 +143,7 @@ pub async fn set_get<S: ApiServerStorage, F: Fn() -> S>(

// Test setting/getting transactions
{
let mut db_tx = storage.transaction_ro().await.unwrap();
let db_tx = storage.transaction_ro().await.unwrap();

let random_tx_id: Id<Transaction> = Id::<Transaction>::new(H256::random_using(&mut rng));
let tx = db_tx.get_transaction(random_tx_id).await.unwrap();
Expand Down