Skip to content

Commit c330f54

Browse files
Merge pull request #1231 from mintlayer/peer_db_code_dedup
Some code deduplication related to peer db storage
2 parents 38039e0 + 0458a5d commit c330f54

File tree

14 files changed

+256
-229
lines changed

14 files changed

+256
-229
lines changed

dns_server/Cargo.toml

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,15 @@ storage = { path = "../storage" }
1717
storage-lmdb = { path = "../storage/lmdb" }
1818
utils = { path = '../utils' }
1919

20+
async-trait.workspace = true
21+
clap = { workspace = true, features = ["derive"] }
22+
directories.workspace = true
2023
futures = { workspace = true }
24+
parity-scale-codec.workspace = true
25+
thiserror.workspace = true
2126
tokio = { workspace = true, default-features = false }
22-
2327
trust-dns-client.workspace = true
2428
trust-dns-server.workspace = true
25-
thiserror.workspace = true
26-
async-trait.workspace = true
27-
parity-scale-codec.workspace = true
28-
clap = { workspace = true, features = ["derive"] }
29-
directories.workspace = true
3029

3130
[dev-dependencies]
3231
p2p-test-utils = { path = "../p2p/p2p-test-utils" }

dns_server/src/crawler_p2p/crawler_manager/mod.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,10 @@ use p2p::{
3333
types::{ConnectivityEvent, SyncingEvent},
3434
ConnectivityService, NetworkingService, SyncingEventReceiver,
3535
},
36-
peer_manager::ip_or_socket_address_to_peer_address,
36+
peer_manager::{
37+
ip_or_socket_address_to_peer_address,
38+
peerdb_common::{storage::update_db, TransactionRo, TransactionRw},
39+
},
3740
types::{
3841
ip_or_socket_address::IpOrSocketAddress, peer_address::PeerAddress, peer_id::PeerId,
3942
socket_address::SocketAddress, IsGlobalIp,
@@ -43,10 +46,7 @@ use tokio::sync::mpsc;
4346

4447
use crate::{dns_server::DnsServerCommand, error::DnsServerError};
4548

46-
use self::storage::{
47-
DnsServerStorage, DnsServerStorageRead, DnsServerStorageWrite, DnsServerTransactionRo,
48-
DnsServerTransactionRw,
49-
};
49+
use self::storage::{DnsServerStorage, DnsServerStorageRead, DnsServerStorageWrite};
5050

5151
use super::crawler::{Crawler, CrawlerCommand, CrawlerEvent};
5252

@@ -299,11 +299,11 @@ where
299299

300300
match (old_state.is_persistent(), new_state.is_persistent()) {
301301
(false, true) => {
302-
storage::update_db(storage, |tx| tx.add_address(&address.to_string()))
302+
update_db(storage, |tx| tx.add_address(&address.to_string()))
303303
.expect("update_db must succeed (add_address)");
304304
}
305305
(true, false) => {
306-
storage::update_db(storage, |tx| tx.del_address(&address.to_string()))
306+
update_db(storage, |tx| tx.del_address(&address.to_string()))
307307
.expect("update_db must succeed (del_address)");
308308
}
309309
_ => {}

dns_server/src/crawler_p2p/crawler_manager/storage.rs

Lines changed: 20 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
// See the License for the specific language governing permissions and
1414
// limitations under the License.
1515

16+
use p2p::peer_manager::peerdb_common::{TransactionRo, TransactionRw, Transactional};
17+
1618
pub trait DnsServerStorageRead {
1719
fn get_version(&self) -> Result<Option<u32>, storage::Error>;
1820

@@ -27,58 +29,27 @@ pub trait DnsServerStorageWrite {
2729
fn del_address(&mut self, address: &str) -> Result<(), storage::Error>;
2830
}
2931

30-
pub trait DnsServerTransactionRo: DnsServerStorageRead {
31-
fn close(self);
32-
}
33-
34-
pub trait DnsServerTransactionRw: DnsServerStorageWrite {
35-
fn abort(self);
36-
37-
fn commit(self) -> Result<(), storage::Error>;
38-
}
39-
40-
/// Support for transactions over blockchain storage
41-
pub trait DnsServerTransactional<'t> {
42-
/// Associated read-only transaction type.
43-
type TransactionRo: DnsServerTransactionRo + 't;
32+
// Note: here we want to say something like:
33+
// pub trait DnsServerStorage: for<'t> Transactional<'t> + Send
34+
// where for<'t> <Self as Transactional<'t>>::TransactionRo: DnsServerStorageRead,
35+
// for<'t> <Self as Transactional<'t>>::TransactionRw: DnsServerStorageWrite {}
36+
// But currently Rust would require us to duplicate the "where" constrains in all places
37+
// where DnsServerStorage is used, so we use this "Helper" approach instead.
38+
pub trait DnsServerStorage: for<'t> DnsServerStorageHelper<'t> + Send {}
4439

45-
/// Associated read-write transaction type.
46-
type TransactionRw: DnsServerTransactionRw + 't;
47-
48-
/// Start a read-only transaction.
49-
fn transaction_ro<'s: 't>(&'s self) -> Result<Self::TransactionRo, storage::Error>;
50-
51-
/// Start a read-write transaction.
52-
fn transaction_rw<'s: 't>(&'s self) -> Result<Self::TransactionRw, storage::Error>;
40+
pub trait DnsServerStorageHelper<'t>:
41+
Transactional<'t, TransactionRo = Self::TxRo, TransactionRw = Self::TxRw>
42+
{
43+
type TxRo: TransactionRo + DnsServerStorageRead + 't;
44+
type TxRw: TransactionRw + DnsServerStorageWrite + 't;
5345
}
5446

55-
pub trait DnsServerStorage: for<'tx> DnsServerTransactional<'tx> + Send {}
56-
57-
const MAX_RECOVERABLE_ERROR_RETRY_COUNT: u32 = 3;
58-
59-
/// Try update storage, gracefully handle recoverable errors
60-
pub fn update_db<S, F>(storage: &S, f: F) -> Result<(), storage::Error>
47+
impl<'t, T> DnsServerStorageHelper<'t> for T
6148
where
62-
S: DnsServerStorage,
63-
F: Fn(&mut <S as DnsServerTransactional<'_>>::TransactionRw) -> Result<(), storage::Error>,
49+
T: Transactional<'t>,
50+
Self::TransactionRo: DnsServerStorageRead + 't,
51+
Self::TransactionRw: DnsServerStorageWrite + 't,
6452
{
65-
let mut recoverable_errors = 0;
66-
loop {
67-
let res = || -> Result<(), storage::Error> {
68-
let mut tx = storage.transaction_rw()?;
69-
f(&mut tx)?;
70-
tx.commit()
71-
}();
72-
73-
match res {
74-
Ok(()) => return Ok(()),
75-
Err(storage::Error::Recoverable(e)) => {
76-
recoverable_errors += 1;
77-
if recoverable_errors >= MAX_RECOVERABLE_ERROR_RETRY_COUNT {
78-
return Err(storage::Error::Recoverable(e));
79-
}
80-
}
81-
Err(storage::Error::Fatal(e)) => return Err(storage::Error::Fatal(e)),
82-
}
83-
}
53+
type TxRo = Self::TransactionRo;
54+
type TxRw = Self::TransactionRw;
8455
}

dns_server/src/crawler_p2p/crawler_manager/storage_impl.rs

Lines changed: 10 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,8 @@
1313
// See the License for the specific language governing permissions and
1414
// limitations under the License.
1515

16-
use super::storage::{
17-
DnsServerStorage, DnsServerStorageRead, DnsServerStorageWrite, DnsServerTransactionRo,
18-
DnsServerTransactionRw, DnsServerTransactional,
19-
};
16+
use super::storage::{DnsServerStorage, DnsServerStorageRead, DnsServerStorageWrite};
17+
use p2p::peer_manager::peerdb_common::storage_impl::{StorageImpl, StorageTxRo, StorageTxRw};
2018
use serialization::{encoded::Encoded, DecodeAll, Encode};
2119

2220
type ValueId = u32;
@@ -34,76 +32,39 @@ storage::decl_schema! {
3432

3533
const VALUE_ID_VERSION: ValueId = 1;
3634

37-
pub struct DnsServerStoreTxRo<'st, B: storage::Backend>(storage::TransactionRo<'st, B, Schema>);
35+
type DnsServerStoreTxRo<'st, B> = StorageTxRo<'st, B, Schema>;
36+
type DnsServerStoreTxRw<'st, B> = StorageTxRw<'st, B, Schema>;
3837

39-
pub struct DnsServerStoreTxRw<'st, B: storage::Backend>(storage::TransactionRw<'st, B, Schema>);
40-
41-
impl<'tx, B: storage::Backend + 'tx> DnsServerTransactional<'tx> for DnsServerStorageImpl<B> {
42-
type TransactionRo = DnsServerStoreTxRo<'tx, B>;
43-
type TransactionRw = DnsServerStoreTxRw<'tx, B>;
44-
45-
fn transaction_ro<'st: 'tx>(&'st self) -> Result<Self::TransactionRo, storage::Error> {
46-
self.0.transaction_ro().map(DnsServerStoreTxRo)
47-
}
48-
49-
fn transaction_rw<'st: 'tx>(&'st self) -> Result<Self::TransactionRw, storage::Error> {
50-
self.0.transaction_rw(None).map(DnsServerStoreTxRw)
51-
}
52-
}
38+
pub type DnsServerStorageImpl<B> = StorageImpl<B, Schema>;
5339

5440
impl<B: storage::Backend + 'static> DnsServerStorage for DnsServerStorageImpl<B> {}
5541

56-
pub struct DnsServerStorageImpl<T: storage::Backend>(storage::Storage<T, Schema>);
57-
58-
impl<B: storage::Backend> DnsServerStorageImpl<B> {
59-
pub fn new(storage: B) -> Result<Self, storage::Error> {
60-
let store = storage::Storage::<_, Schema>::new(storage)?;
61-
Ok(Self(store))
62-
}
63-
}
64-
6542
impl<'st, B: storage::Backend> DnsServerStorageWrite for DnsServerStoreTxRw<'st, B> {
6643
fn set_version(&mut self, version: u32) -> Result<(), storage::Error> {
67-
self.0.get_mut::<DBValue, _>().put(VALUE_ID_VERSION, version.encode())
44+
self.storage().get_mut::<DBValue, _>().put(VALUE_ID_VERSION, version.encode())
6845
}
6946

7047
fn add_address(&mut self, address: &str) -> Result<(), storage::Error> {
71-
self.0.get_mut::<DBAddresses, _>().put(address, ())
48+
self.storage().get_mut::<DBAddresses, _>().put(address, ())
7249
}
7350

7451
fn del_address(&mut self, address: &str) -> Result<(), storage::Error> {
75-
self.0.get_mut::<DBAddresses, _>().del(address)
76-
}
77-
}
78-
79-
impl<'st, B: storage::Backend> DnsServerTransactionRw for DnsServerStoreTxRw<'st, B> {
80-
fn abort(self) {
81-
self.0.abort()
82-
}
83-
84-
fn commit(self) -> Result<(), storage::Error> {
85-
self.0.commit()
52+
self.storage().get_mut::<DBAddresses, _>().del(address)
8653
}
8754
}
8855

8956
impl<'st, B: storage::Backend> DnsServerStorageRead for DnsServerStoreTxRo<'st, B> {
9057
fn get_version(&self) -> Result<Option<u32>, storage::Error> {
91-
let map = self.0.get::<DBValue, _>();
58+
let map = self.storage().get::<DBValue, _>();
9259
let vec_opt = map.get(VALUE_ID_VERSION)?.as_ref().map(Encoded::decode);
9360
Ok(vec_opt.map(|vec| {
9461
u32::decode_all(&mut vec.as_ref()).expect("db values to be encoded correctly")
9562
}))
9663
}
9764

9865
fn get_addresses(&self) -> Result<Vec<String>, storage::Error> {
99-
let map = self.0.get::<DBAddresses, _>();
66+
let map = self.storage().get::<DBAddresses, _>();
10067
let iter = map.prefix_iter_decoded(&())?.map(|(addr, ())| addr);
10168
Ok(iter.collect::<Vec<_>>())
10269
}
10370
}
104-
105-
impl<'st, B: storage::Backend> DnsServerTransactionRo for DnsServerStoreTxRo<'st, B> {
106-
fn close(self) {
107-
self.0.close()
108-
}
109-
}

dns_server/src/crawler_p2p/crawler_manager/tests/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,11 @@ mod mock_manager;
1717

1818
use std::time::Duration;
1919

20-
use p2p::types::socket_address::SocketAddress;
20+
use p2p::{peer_manager::peerdb_common::Transactional, types::socket_address::SocketAddress};
2121

2222
use crate::{
2323
crawler_p2p::crawler_manager::{
24-
storage::{DnsServerStorageRead, DnsServerTransactional},
24+
storage::DnsServerStorageRead,
2525
tests::mock_manager::{advance_time, test_crawler},
2626
},
2727
dns_server::DnsServerCommand,

p2p/src/peer_manager/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
pub mod address_groups;
1919
pub mod peer_context;
2020
pub mod peerdb;
21+
pub mod peerdb_common;
2122
mod peers_eviction;
2223

2324
use std::{

p2p/src/peer_manager/peerdb/mod.rs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,10 @@ use self::{
4949
storage_load::LoadedStorage,
5050
};
5151

52-
use super::{address_groups::AddressGroup, ip_or_socket_address_to_peer_address};
52+
use super::{
53+
address_groups::AddressGroup, ip_or_socket_address_to_peer_address,
54+
peerdb_common::storage::update_db,
55+
};
5356

5457
pub struct PeerDb<S> {
5558
/// P2P configuration
@@ -213,7 +216,7 @@ impl<S: PeerDbStorage> PeerDb<S> {
213216
let banned = now <= *banned_till;
214217

215218
if !banned {
216-
storage::update_db(&self.storage, |tx| {
219+
update_db(&self.storage, |tx| {
217220
tx.del_banned_address(&address.to_string())
218221
})
219222
.expect("removing banned address is expected to succeed");
@@ -284,13 +287,13 @@ impl<S: PeerDbStorage> PeerDb<S> {
284287

285288
match (is_persistent_old, is_persistent_new) {
286289
(false, true) => {
287-
storage::update_db(&self.storage, |tx| {
290+
update_db(&self.storage, |tx| {
288291
tx.add_known_address(&address.to_string())
289292
})
290293
.expect("adding address expected to succeed (peer_connected)");
291294
}
292295
(true, false) => {
293-
storage::update_db(&self.storage, |tx| {
296+
update_db(&self.storage, |tx| {
294297
tx.del_known_address(&address.to_string())
295298
})
296299
.expect("adding address expected to succeed (peer_connected)");
@@ -326,7 +329,7 @@ impl<S: PeerDbStorage> PeerDb<S> {
326329
pub fn ban(&mut self, address: BannableAddress) {
327330
let ban_till = self.time_getter.get_time() + *self.p2p_config.ban_duration;
328331

329-
storage::update_db(&self.storage, |tx| {
332+
update_db(&self.storage, |tx| {
330333
tx.add_banned_address(&address.to_string(), ban_till)
331334
})
332335
.expect("adding banned address is expected to succeed (ban_peer)");
@@ -335,7 +338,7 @@ impl<S: PeerDbStorage> PeerDb<S> {
335338
}
336339

337340
pub fn unban(&mut self, address: &BannableAddress) {
338-
storage::update_db(&self.storage, |tx| {
341+
update_db(&self.storage, |tx| {
339342
tx.del_banned_address(&address.to_string())
340343
})
341344
.expect("adding banned address is expected to succeed (ban_peer)");
@@ -351,7 +354,7 @@ impl<S: PeerDbStorage> PeerDb<S> {
351354
if self.anchor_addresses == anchor_addresses {
352355
return;
353356
}
354-
storage::update_db(&self.storage, |tx| {
357+
update_db(&self.storage, |tx| {
355358
for address in self.anchor_addresses.difference(&anchor_addresses) {
356359
log::debug!("remove anchor peer {address}");
357360
tx.del_anchor_address(&address.to_string())?;

0 commit comments

Comments
 (0)