diff --git a/dns_server/Cargo.toml b/dns_server/Cargo.toml index 3c82966e3e..f8f2603e2e 100644 --- a/dns_server/Cargo.toml +++ b/dns_server/Cargo.toml @@ -17,16 +17,15 @@ storage = { path = "../storage" } storage-lmdb = { path = "../storage/lmdb" } utils = { path = '../utils' } +async-trait.workspace = true +clap = { workspace = true, features = ["derive"] } +directories.workspace = true futures = { workspace = true } +parity-scale-codec.workspace = true +thiserror.workspace = true tokio = { workspace = true, default-features = false } - trust-dns-client.workspace = true trust-dns-server.workspace = true -thiserror.workspace = true -async-trait.workspace = true -parity-scale-codec.workspace = true -clap = { workspace = true, features = ["derive"] } -directories.workspace = true [dev-dependencies] p2p-test-utils = { path = "../p2p/p2p-test-utils" } diff --git a/dns_server/src/crawler_p2p/crawler_manager/mod.rs b/dns_server/src/crawler_p2p/crawler_manager/mod.rs index a9529453a4..d2cddee6bc 100644 --- a/dns_server/src/crawler_p2p/crawler_manager/mod.rs +++ b/dns_server/src/crawler_p2p/crawler_manager/mod.rs @@ -33,7 +33,10 @@ use p2p::{ types::{ConnectivityEvent, SyncingEvent}, ConnectivityService, NetworkingService, SyncingEventReceiver, }, - peer_manager::ip_or_socket_address_to_peer_address, + peer_manager::{ + ip_or_socket_address_to_peer_address, + peerdb_common::{storage::update_db, TransactionRo, TransactionRw}, + }, types::{ ip_or_socket_address::IpOrSocketAddress, peer_address::PeerAddress, peer_id::PeerId, socket_address::SocketAddress, IsGlobalIp, @@ -43,10 +46,7 @@ use tokio::sync::mpsc; use crate::{dns_server::DnsServerCommand, error::DnsServerError}; -use self::storage::{ - DnsServerStorage, DnsServerStorageRead, DnsServerStorageWrite, DnsServerTransactionRo, - DnsServerTransactionRw, -}; +use self::storage::{DnsServerStorage, DnsServerStorageRead, DnsServerStorageWrite}; use super::crawler::{Crawler, CrawlerCommand, CrawlerEvent}; @@ -299,11 +299,11 @@ where match (old_state.is_persistent(), new_state.is_persistent()) { (false, true) => { - storage::update_db(storage, |tx| tx.add_address(&address.to_string())) + update_db(storage, |tx| tx.add_address(&address.to_string())) .expect("update_db must succeed (add_address)"); } (true, false) => { - storage::update_db(storage, |tx| tx.del_address(&address.to_string())) + update_db(storage, |tx| tx.del_address(&address.to_string())) .expect("update_db must succeed (del_address)"); } _ => {} diff --git a/dns_server/src/crawler_p2p/crawler_manager/storage.rs b/dns_server/src/crawler_p2p/crawler_manager/storage.rs index 2e5105c057..84550ed6b1 100644 --- a/dns_server/src/crawler_p2p/crawler_manager/storage.rs +++ b/dns_server/src/crawler_p2p/crawler_manager/storage.rs @@ -13,6 +13,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use p2p::peer_manager::peerdb_common::{TransactionRo, TransactionRw, Transactional}; + pub trait DnsServerStorageRead { fn get_version(&self) -> Result, storage::Error>; @@ -27,58 +29,27 @@ pub trait DnsServerStorageWrite { fn del_address(&mut self, address: &str) -> Result<(), storage::Error>; } -pub trait DnsServerTransactionRo: DnsServerStorageRead { - fn close(self); -} - -pub trait DnsServerTransactionRw: DnsServerStorageWrite { - fn abort(self); - - fn commit(self) -> Result<(), storage::Error>; -} - -/// Support for transactions over blockchain storage -pub trait DnsServerTransactional<'t> { - /// Associated read-only transaction type. - type TransactionRo: DnsServerTransactionRo + 't; +// Note: here we want to say something like: +// pub trait DnsServerStorage: for<'t> Transactional<'t> + Send +// where for<'t> >::TransactionRo: DnsServerStorageRead, +// for<'t> >::TransactionRw: DnsServerStorageWrite {} +// But currently Rust would require us to duplicate the "where" constrains in all places +// where DnsServerStorage is used, so we use this "Helper" approach instead. +pub trait DnsServerStorage: for<'t> DnsServerStorageHelper<'t> + Send {} - /// Associated read-write transaction type. - type TransactionRw: DnsServerTransactionRw + 't; - - /// Start a read-only transaction. - fn transaction_ro<'s: 't>(&'s self) -> Result; - - /// Start a read-write transaction. - fn transaction_rw<'s: 't>(&'s self) -> Result; +pub trait DnsServerStorageHelper<'t>: + Transactional<'t, TransactionRo = Self::TxRo, TransactionRw = Self::TxRw> +{ + type TxRo: TransactionRo + DnsServerStorageRead + 't; + type TxRw: TransactionRw + DnsServerStorageWrite + 't; } -pub trait DnsServerStorage: for<'tx> DnsServerTransactional<'tx> + Send {} - -const MAX_RECOVERABLE_ERROR_RETRY_COUNT: u32 = 3; - -/// Try update storage, gracefully handle recoverable errors -pub fn update_db(storage: &S, f: F) -> Result<(), storage::Error> +impl<'t, T> DnsServerStorageHelper<'t> for T where - S: DnsServerStorage, - F: Fn(&mut >::TransactionRw) -> Result<(), storage::Error>, + T: Transactional<'t>, + Self::TransactionRo: DnsServerStorageRead + 't, + Self::TransactionRw: DnsServerStorageWrite + 't, { - let mut recoverable_errors = 0; - loop { - let res = || -> Result<(), storage::Error> { - let mut tx = storage.transaction_rw()?; - f(&mut tx)?; - tx.commit() - }(); - - match res { - Ok(()) => return Ok(()), - Err(storage::Error::Recoverable(e)) => { - recoverable_errors += 1; - if recoverable_errors >= MAX_RECOVERABLE_ERROR_RETRY_COUNT { - return Err(storage::Error::Recoverable(e)); - } - } - Err(storage::Error::Fatal(e)) => return Err(storage::Error::Fatal(e)), - } - } + type TxRo = Self::TransactionRo; + type TxRw = Self::TransactionRw; } diff --git a/dns_server/src/crawler_p2p/crawler_manager/storage_impl.rs b/dns_server/src/crawler_p2p/crawler_manager/storage_impl.rs index 52608d5783..fc04736e44 100644 --- a/dns_server/src/crawler_p2p/crawler_manager/storage_impl.rs +++ b/dns_server/src/crawler_p2p/crawler_manager/storage_impl.rs @@ -13,10 +13,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -use super::storage::{ - DnsServerStorage, DnsServerStorageRead, DnsServerStorageWrite, DnsServerTransactionRo, - DnsServerTransactionRw, DnsServerTransactional, -}; +use super::storage::{DnsServerStorage, DnsServerStorageRead, DnsServerStorageWrite}; +use p2p::peer_manager::peerdb_common::storage_impl::{StorageImpl, StorageTxRo, StorageTxRw}; use serialization::{encoded::Encoded, DecodeAll, Encode}; type ValueId = u32; @@ -34,61 +32,30 @@ storage::decl_schema! { const VALUE_ID_VERSION: ValueId = 1; -pub struct DnsServerStoreTxRo<'st, B: storage::Backend>(storage::TransactionRo<'st, B, Schema>); +type DnsServerStoreTxRo<'st, B> = StorageTxRo<'st, B, Schema>; +type DnsServerStoreTxRw<'st, B> = StorageTxRw<'st, B, Schema>; -pub struct DnsServerStoreTxRw<'st, B: storage::Backend>(storage::TransactionRw<'st, B, Schema>); - -impl<'tx, B: storage::Backend + 'tx> DnsServerTransactional<'tx> for DnsServerStorageImpl { - type TransactionRo = DnsServerStoreTxRo<'tx, B>; - type TransactionRw = DnsServerStoreTxRw<'tx, B>; - - fn transaction_ro<'st: 'tx>(&'st self) -> Result { - self.0.transaction_ro().map(DnsServerStoreTxRo) - } - - fn transaction_rw<'st: 'tx>(&'st self) -> Result { - self.0.transaction_rw(None).map(DnsServerStoreTxRw) - } -} +pub type DnsServerStorageImpl = StorageImpl; impl DnsServerStorage for DnsServerStorageImpl {} -pub struct DnsServerStorageImpl(storage::Storage); - -impl DnsServerStorageImpl { - pub fn new(storage: B) -> Result { - let store = storage::Storage::<_, Schema>::new(storage)?; - Ok(Self(store)) - } -} - impl<'st, B: storage::Backend> DnsServerStorageWrite for DnsServerStoreTxRw<'st, B> { fn set_version(&mut self, version: u32) -> Result<(), storage::Error> { - self.0.get_mut::().put(VALUE_ID_VERSION, version.encode()) + self.storage().get_mut::().put(VALUE_ID_VERSION, version.encode()) } fn add_address(&mut self, address: &str) -> Result<(), storage::Error> { - self.0.get_mut::().put(address, ()) + self.storage().get_mut::().put(address, ()) } fn del_address(&mut self, address: &str) -> Result<(), storage::Error> { - self.0.get_mut::().del(address) - } -} - -impl<'st, B: storage::Backend> DnsServerTransactionRw for DnsServerStoreTxRw<'st, B> { - fn abort(self) { - self.0.abort() - } - - fn commit(self) -> Result<(), storage::Error> { - self.0.commit() + self.storage().get_mut::().del(address) } } impl<'st, B: storage::Backend> DnsServerStorageRead for DnsServerStoreTxRo<'st, B> { fn get_version(&self) -> Result, storage::Error> { - let map = self.0.get::(); + let map = self.storage().get::(); let vec_opt = map.get(VALUE_ID_VERSION)?.as_ref().map(Encoded::decode); Ok(vec_opt.map(|vec| { u32::decode_all(&mut vec.as_ref()).expect("db values to be encoded correctly") @@ -96,14 +63,8 @@ impl<'st, B: storage::Backend> DnsServerStorageRead for DnsServerStoreTxRo<'st, } fn get_addresses(&self) -> Result, storage::Error> { - let map = self.0.get::(); + let map = self.storage().get::(); let iter = map.prefix_iter_decoded(&())?.map(|(addr, ())| addr); Ok(iter.collect::>()) } } - -impl<'st, B: storage::Backend> DnsServerTransactionRo for DnsServerStoreTxRo<'st, B> { - fn close(self) { - self.0.close() - } -} diff --git a/dns_server/src/crawler_p2p/crawler_manager/tests/mod.rs b/dns_server/src/crawler_p2p/crawler_manager/tests/mod.rs index 6f3bcbc5fa..7548db4c8a 100644 --- a/dns_server/src/crawler_p2p/crawler_manager/tests/mod.rs +++ b/dns_server/src/crawler_p2p/crawler_manager/tests/mod.rs @@ -17,11 +17,11 @@ mod mock_manager; use std::time::Duration; -use p2p::types::socket_address::SocketAddress; +use p2p::{peer_manager::peerdb_common::Transactional, types::socket_address::SocketAddress}; use crate::{ crawler_p2p::crawler_manager::{ - storage::{DnsServerStorageRead, DnsServerTransactional}, + storage::DnsServerStorageRead, tests::mock_manager::{advance_time, test_crawler}, }, dns_server::DnsServerCommand, diff --git a/p2p/src/peer_manager/mod.rs b/p2p/src/peer_manager/mod.rs index dc95ec3cc8..9217f77719 100644 --- a/p2p/src/peer_manager/mod.rs +++ b/p2p/src/peer_manager/mod.rs @@ -18,6 +18,7 @@ pub mod address_groups; pub mod peer_context; pub mod peerdb; +pub mod peerdb_common; mod peers_eviction; use std::{ diff --git a/p2p/src/peer_manager/peerdb/mod.rs b/p2p/src/peer_manager/peerdb/mod.rs index c1d23e14dc..00455b73d5 100644 --- a/p2p/src/peer_manager/peerdb/mod.rs +++ b/p2p/src/peer_manager/peerdb/mod.rs @@ -49,7 +49,10 @@ use self::{ storage_load::LoadedStorage, }; -use super::{address_groups::AddressGroup, ip_or_socket_address_to_peer_address}; +use super::{ + address_groups::AddressGroup, ip_or_socket_address_to_peer_address, + peerdb_common::storage::update_db, +}; pub struct PeerDb { /// P2P configuration @@ -213,7 +216,7 @@ impl PeerDb { let banned = now <= *banned_till; if !banned { - storage::update_db(&self.storage, |tx| { + update_db(&self.storage, |tx| { tx.del_banned_address(&address.to_string()) }) .expect("removing banned address is expected to succeed"); @@ -284,13 +287,13 @@ impl PeerDb { match (is_persistent_old, is_persistent_new) { (false, true) => { - storage::update_db(&self.storage, |tx| { + update_db(&self.storage, |tx| { tx.add_known_address(&address.to_string()) }) .expect("adding address expected to succeed (peer_connected)"); } (true, false) => { - storage::update_db(&self.storage, |tx| { + update_db(&self.storage, |tx| { tx.del_known_address(&address.to_string()) }) .expect("adding address expected to succeed (peer_connected)"); @@ -326,7 +329,7 @@ impl PeerDb { pub fn ban(&mut self, address: BannableAddress) { let ban_till = self.time_getter.get_time() + *self.p2p_config.ban_duration; - storage::update_db(&self.storage, |tx| { + update_db(&self.storage, |tx| { tx.add_banned_address(&address.to_string(), ban_till) }) .expect("adding banned address is expected to succeed (ban_peer)"); @@ -335,7 +338,7 @@ impl PeerDb { } pub fn unban(&mut self, address: &BannableAddress) { - storage::update_db(&self.storage, |tx| { + update_db(&self.storage, |tx| { tx.del_banned_address(&address.to_string()) }) .expect("adding banned address is expected to succeed (ban_peer)"); @@ -351,7 +354,7 @@ impl PeerDb { if self.anchor_addresses == anchor_addresses { return; } - storage::update_db(&self.storage, |tx| { + update_db(&self.storage, |tx| { for address in self.anchor_addresses.difference(&anchor_addresses) { log::debug!("remove anchor peer {address}"); tx.del_anchor_address(&address.to_string())?; diff --git a/p2p/src/peer_manager/peerdb/storage.rs b/p2p/src/peer_manager/peerdb/storage.rs index 96c6961224..ff084d3d06 100644 --- a/p2p/src/peer_manager/peerdb/storage.rs +++ b/p2p/src/peer_manager/peerdb/storage.rs @@ -15,6 +15,8 @@ use std::time::Duration; +use crate::peer_manager::peerdb_common::{TransactionRo, TransactionRw, Transactional}; + pub trait PeerDbStorageRead { fn get_version(&self) -> Result, storage::Error>; @@ -45,58 +47,27 @@ pub trait PeerDbStorageWrite { fn del_anchor_address(&mut self, address: &str) -> Result<(), storage::Error>; } -pub trait PeerDbTransactionRo: PeerDbStorageRead { - fn close(self); -} - -pub trait PeerDbTransactionRw: PeerDbStorageWrite { - fn abort(self); - - fn commit(self) -> Result<(), storage::Error>; -} - -/// Support for transactions over blockchain storage -pub trait PeerDbTransactional<'t> { - /// Associated read-only transaction type. - type TransactionRo: PeerDbTransactionRo + 't; +// Note: here we want to say something like: +// pub trait PeerDbStorage: for<'t> Transactional<'t> + Send +// where for<'t> >::TransactionRo: PeerDbStorageRead, +// for<'t> >::TransactionRw: PeerDbStorageWrite {} +// But currently Rust would require us to duplicate the "where" constrains in all places +// where PeerDbStorage is used, so we use this "Helper" approach instead. +pub trait PeerDbStorage: for<'t> PeerDbStorageHelper<'t> + Send {} - /// Associated read-write transaction type. - type TransactionRw: PeerDbTransactionRw + 't; - - /// Start a read-only transaction. - fn transaction_ro<'s: 't>(&'s self) -> Result; - - /// Start a read-write transaction. - fn transaction_rw<'s: 't>(&'s self) -> Result; +pub trait PeerDbStorageHelper<'t>: + Transactional<'t, TransactionRo = Self::TxRo, TransactionRw = Self::TxRw> +{ + type TxRo: TransactionRo + PeerDbStorageRead + 't; + type TxRw: TransactionRw + PeerDbStorageWrite + 't; } -pub trait PeerDbStorage: for<'tx> PeerDbTransactional<'tx> + Send {} - -const MAX_RECOVERABLE_ERROR_RETRY_COUNT: u32 = 3; - -/// Try update storage, gracefully handle recoverable errors -pub fn update_db(storage: &S, f: F) -> Result<(), storage::Error> +impl<'t, T> PeerDbStorageHelper<'t> for T where - S: PeerDbStorage, - F: Fn(&mut >::TransactionRw) -> Result<(), storage::Error>, + T: Transactional<'t>, + Self::TransactionRo: PeerDbStorageRead + 't, + Self::TransactionRw: PeerDbStorageWrite + 't, { - let mut recoverable_errors = 0; - loop { - let res = || -> Result<(), storage::Error> { - let mut tx = storage.transaction_rw()?; - f(&mut tx)?; - tx.commit() - }(); - - match res { - Ok(()) => return Ok(()), - err @ Err(storage::Error::Recoverable(_)) => { - recoverable_errors += 1; - if recoverable_errors >= MAX_RECOVERABLE_ERROR_RETRY_COUNT { - return err; - } - } - err @ Err(storage::Error::Fatal(_)) => return err, - } - } + type TxRo = Self::TransactionRo; + type TxRw = Self::TransactionRw; } diff --git a/p2p/src/peer_manager/peerdb/storage_impl.rs b/p2p/src/peer_manager/peerdb/storage_impl.rs index 016e4fe479..6d7d6d67c7 100644 --- a/p2p/src/peer_manager/peerdb/storage_impl.rs +++ b/p2p/src/peer_manager/peerdb/storage_impl.rs @@ -15,10 +15,9 @@ use std::time::Duration; -use super::storage::{ - PeerDbStorage, PeerDbStorageRead, PeerDbStorageWrite, PeerDbTransactionRo, PeerDbTransactionRw, - PeerDbTransactional, -}; +use crate::peer_manager::peerdb_common::storage_impl::{StorageImpl, StorageTxRo, StorageTxRw}; + +use super::storage::{PeerDbStorage, PeerDbStorageRead, PeerDbStorageWrite}; use serialization::{encoded::Encoded, DecodeAll, Encode}; type ValueId = u32; @@ -42,45 +41,24 @@ storage::decl_schema! { const VALUE_ID_VERSION: ValueId = 1; -pub struct PeerDbStoreTxRo<'st, B: storage::Backend>(storage::TransactionRo<'st, B, Schema>); - -pub struct PeerDbStoreTxRw<'st, B: storage::Backend>(storage::TransactionRw<'st, B, Schema>); - -impl<'tx, B: storage::Backend + 'tx> PeerDbTransactional<'tx> for PeerDbStorageImpl { - type TransactionRo = PeerDbStoreTxRo<'tx, B>; - type TransactionRw = PeerDbStoreTxRw<'tx, B>; - - fn transaction_ro<'st: 'tx>(&'st self) -> Result { - self.0.transaction_ro().map(PeerDbStoreTxRo) - } +type PeerDbStoreTxRo<'st, B> = StorageTxRo<'st, B, Schema>; +type PeerDbStoreTxRw<'st, B> = StorageTxRw<'st, B, Schema>; - fn transaction_rw<'st: 'tx>(&'st self) -> Result { - self.0.transaction_rw(None).map(PeerDbStoreTxRw) - } -} +pub type PeerDbStorageImpl = StorageImpl; impl PeerDbStorage for PeerDbStorageImpl {} -pub struct PeerDbStorageImpl(storage::Storage); - -impl PeerDbStorageImpl { - pub fn new(storage: B) -> crate::Result { - let store = storage::Storage::<_, Schema>::new(storage)?; - Ok(Self(store)) - } -} - impl<'st, B: storage::Backend> PeerDbStorageWrite for PeerDbStoreTxRw<'st, B> { fn set_version(&mut self, version: u32) -> Result<(), storage::Error> { - self.0.get_mut::().put(VALUE_ID_VERSION, version.encode()) + self.storage().get_mut::().put(VALUE_ID_VERSION, version.encode()) } fn add_known_address(&mut self, address: &str) -> Result<(), storage::Error> { - self.0.get_mut::().put(address, ()) + self.storage().get_mut::().put(address, ()) } fn del_known_address(&mut self, address: &str) -> Result<(), storage::Error> { - self.0.get_mut::().del(address) + self.storage().get_mut::().del(address) } fn add_banned_address( @@ -88,35 +66,25 @@ impl<'st, B: storage::Backend> PeerDbStorageWrite for PeerDbStoreTxRw<'st, B> { address: &str, duration: Duration, ) -> Result<(), storage::Error> { - self.0.get_mut::().put(address, duration) + self.storage().get_mut::().put(address, duration) } fn del_banned_address(&mut self, address: &str) -> Result<(), storage::Error> { - self.0.get_mut::().del(address) + self.storage().get_mut::().del(address) } fn add_anchor_address(&mut self, address: &str) -> Result<(), storage::Error> { - self.0.get_mut::().put(address, ()) + self.storage().get_mut::().put(address, ()) } fn del_anchor_address(&mut self, address: &str) -> Result<(), storage::Error> { - self.0.get_mut::().del(address) - } -} - -impl<'st, B: storage::Backend> PeerDbTransactionRw for PeerDbStoreTxRw<'st, B> { - fn abort(self) { - self.0.abort() - } - - fn commit(self) -> Result<(), storage::Error> { - self.0.commit() + self.storage().get_mut::().del(address) } } impl<'st, B: storage::Backend> PeerDbStorageRead for PeerDbStoreTxRo<'st, B> { fn get_version(&self) -> Result, storage::Error> { - let map = self.0.get::(); + let map = self.storage().get::(); let vec_opt = map.get(VALUE_ID_VERSION)?.as_ref().map(Encoded::decode); Ok(vec_opt.map(|vec| { u32::decode_all(&mut vec.as_ref()).expect("db values to be encoded correctly") @@ -124,26 +92,20 @@ impl<'st, B: storage::Backend> PeerDbStorageRead for PeerDbStoreTxRo<'st, B> { } fn get_known_addresses(&self) -> Result, storage::Error> { - let map = self.0.get::(); + let map = self.storage().get::(); let iter = map.prefix_iter_decoded(&())?; Ok(iter.map(|(key, _value)| key).collect::>()) } fn get_banned_addresses(&self) -> Result, storage::Error> { - let map = self.0.get::(); + let map = self.storage().get::(); let iter = map.prefix_iter_decoded(&())?; Ok(iter.collect::>()) } fn get_anchor_addresses(&self) -> Result, storage::Error> { - let map = self.0.get::(); + let map = self.storage().get::(); let iter = map.prefix_iter_decoded(&())?; Ok(iter.map(|(key, _value)| key).collect::>()) } } - -impl<'st, B: storage::Backend> PeerDbTransactionRo for PeerDbStoreTxRo<'st, B> { - fn close(self) { - self.0.close() - } -} diff --git a/p2p/src/peer_manager/peerdb/storage_load.rs b/p2p/src/peer_manager/peerdb/storage_load.rs index af64b52904..0e5bab4a15 100644 --- a/p2p/src/peer_manager/peerdb/storage_load.rs +++ b/p2p/src/peer_manager/peerdb/storage_load.rs @@ -20,12 +20,13 @@ use std::{ use p2p_types::{bannable_address::BannableAddress, socket_address::SocketAddress}; -use crate::error::P2pError; - -use super::storage::{ - PeerDbStorage, PeerDbStorageRead, PeerDbStorageWrite, PeerDbTransactionRo, PeerDbTransactionRw, +use crate::{ + error::P2pError, + peer_manager::peerdb_common::{TransactionRo, TransactionRw}, }; +use super::storage::{PeerDbStorage, PeerDbStorageRead, PeerDbStorageWrite}; + const STORAGE_VERSION: u32 = 1; pub struct LoadedStorage { diff --git a/p2p/src/peer_manager/peerdb/tests.rs b/p2p/src/peer_manager/peerdb/tests.rs index 9cd4212a90..ff1dbbadd7 100644 --- a/p2p/src/peer_manager/peerdb/tests.rs +++ b/p2p/src/peer_manager/peerdb/tests.rs @@ -23,7 +23,7 @@ use p2p_test_utils::P2pBasicTestTimeGetter; use crate::{ config::P2pConfig, error::{DialError, P2pError}, - peer_manager::peerdb::storage::{PeerDbStorageRead, PeerDbTransactional}, + peer_manager::{peerdb::storage::PeerDbStorageRead, peerdb_common::Transactional}, testing_utils::{peerdb_inmemory_store, test_p2p_config, TestAddressMaker}, }; diff --git a/p2p/src/peer_manager/peerdb_common/mod.rs b/p2p/src/peer_manager/peerdb_common/mod.rs new file mode 100644 index 0000000000..a52bf480be --- /dev/null +++ b/p2p/src/peer_manager/peerdb_common/mod.rs @@ -0,0 +1,19 @@ +// Copyright (c) 2021-2023 RBB S.r.l +// opensource@mintlayer.org +// SPDX-License-Identifier: MIT +// Licensed under the MIT License; +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://github.com/mintlayer/mintlayer-core/blob/master/LICENSE +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod storage; +pub mod storage_impl; + +pub use storage::{TransactionRo, TransactionRw, Transactional}; diff --git a/p2p/src/peer_manager/peerdb_common/storage.rs b/p2p/src/peer_manager/peerdb_common/storage.rs new file mode 100644 index 0000000000..1d01b03aea --- /dev/null +++ b/p2p/src/peer_manager/peerdb_common/storage.rs @@ -0,0 +1,67 @@ +// Copyright (c) 2021-2023 RBB S.r.l +// opensource@mintlayer.org +// SPDX-License-Identifier: MIT +// Licensed under the MIT License; +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://github.com/mintlayer/mintlayer-core/blob/master/LICENSE +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub trait TransactionRo { + fn close(self); +} + +pub trait TransactionRw { + fn abort(self); + + fn commit(self) -> Result<(), storage::Error>; +} + +pub trait Transactional<'t> { + /// Associated read-only transaction type. + type TransactionRo: TransactionRo + 't; + + /// Associated read-write transaction type. + type TransactionRw: TransactionRw + 't; + + /// Start a read-only transaction. + fn transaction_ro<'s: 't>(&'s self) -> Result; + + /// Start a read-write transaction. + fn transaction_rw<'s: 't>(&'s self) -> Result; +} + +const MAX_RECOVERABLE_ERROR_RETRY_COUNT: u32 = 3; + +/// Try updating the storage, gracefully handle recoverable errors. +pub fn update_db<'t, S, F>(storage: &'t S, f: F) -> Result<(), storage::Error> +where + S: Transactional<'t> + Send, + F: Fn(&mut >::TransactionRw) -> Result<(), storage::Error>, +{ + let mut recoverable_errors = 0; + loop { + let res = || -> Result<(), storage::Error> { + let mut tx = storage.transaction_rw()?; + f(&mut tx)?; + tx.commit() + }(); + + match res { + Ok(()) => return Ok(()), + err @ Err(storage::Error::Recoverable(_)) => { + recoverable_errors += 1; + if recoverable_errors >= MAX_RECOVERABLE_ERROR_RETRY_COUNT { + return err; + } + } + err @ Err(storage::Error::Fatal(_)) => return err, + } + } +} diff --git a/p2p/src/peer_manager/peerdb_common/storage_impl.rs b/p2p/src/peer_manager/peerdb_common/storage_impl.rs new file mode 100644 index 0000000000..be6a025a2d --- /dev/null +++ b/p2p/src/peer_manager/peerdb_common/storage_impl.rs @@ -0,0 +1,72 @@ +// Copyright (c) 2021-2023 RBB S.r.l +// opensource@mintlayer.org +// SPDX-License-Identifier: MIT +// Licensed under the MIT License; +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://github.com/mintlayer/mintlayer-core/blob/master/LICENSE +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use storage::schema::Schema; + +use super::{TransactionRo, TransactionRw, Transactional}; + +pub struct StorageImpl(storage::Storage); + +impl StorageImpl { + pub fn new(storage: B) -> crate::Result { + let store = storage::Storage::<_, Sch>::new(storage)?; + Ok(Self(store)) + } +} + +impl<'tx, B: storage::Backend + 'tx, Sch: Schema> Transactional<'tx> for StorageImpl { + type TransactionRo = StorageTxRo<'tx, B, Sch>; + type TransactionRw = StorageTxRw<'tx, B, Sch>; + + fn transaction_ro<'st: 'tx>(&'st self) -> Result { + self.0.transaction_ro().map(StorageTxRo) + } + + fn transaction_rw<'st: 'tx>(&'st self) -> Result { + self.0.transaction_rw(None).map(StorageTxRw) + } +} + +pub struct StorageTxRo<'st, B: storage::Backend, Sch: Schema>(storage::TransactionRo<'st, B, Sch>); + +impl<'st, B: storage::Backend, Sch: Schema> StorageTxRo<'st, B, Sch> { + pub fn storage(&self) -> &storage::TransactionRo<'st, B, Sch> { + &self.0 + } +} + +impl<'st, B: storage::Backend, Sch: Schema> TransactionRo for StorageTxRo<'st, B, Sch> { + fn close(self) { + self.0.close() + } +} + +pub struct StorageTxRw<'st, B: storage::Backend, Sch: Schema>(storage::TransactionRw<'st, B, Sch>); + +impl<'st, B: storage::Backend, Sch: Schema> StorageTxRw<'st, B, Sch> { + pub fn storage(&mut self) -> &mut storage::TransactionRw<'st, B, Sch> { + &mut self.0 + } +} + +impl<'st, B: storage::Backend, Sch: Schema> TransactionRw for StorageTxRw<'st, B, Sch> { + fn abort(self) { + self.0.abort() + } + + fn commit(self) -> Result<(), storage::Error> { + self.0.commit() + } +}