Skip to content

Remove BannableAddress generic (p2p) #1117

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

use std::{
collections::BTreeMap,
net::{IpAddr, SocketAddr},
net::SocketAddr,
sync::{Arc, Mutex},
time::Duration,
};
Expand Down Expand Up @@ -117,7 +117,6 @@ pub struct MockSyncingEventReceiver {}
impl NetworkingService for MockNetworkingService {
type Transport = ();
type Address = SocketAddr;
type BannableAddress = IpAddr;
type ConnectivityHandle = MockConnectivityHandle;
type MessagingHandle = ();
type SyncingEventReceiver = MockSyncingEventReceiver;
Expand Down
16 changes: 4 additions & 12 deletions p2p/benches/benches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{
collections::BTreeSet,
net::{IpAddr, SocketAddr},
sync::Arc,
};
use std::{collections::BTreeSet, net::SocketAddr, sync::Arc};

use common::chain::config::create_unit_test_config;
use criterion::{criterion_group, criterion_main, Criterion};
Expand All @@ -34,13 +30,9 @@ pub fn peer_db(c: &mut Criterion) {
let db_store = peerdb_inmemory_store();
let chain_config = create_unit_test_config();
let p2p_config = Arc::new(test_p2p_config());
let mut peerdb = PeerDb::<SocketAddr, IpAddr, _>::new(
&chain_config,
p2p_config,
Default::default(),
db_store,
)
.unwrap();
let mut peerdb =
PeerDb::<SocketAddr, _>::new(&chain_config, p2p_config, Default::default(), db_store)
.unwrap();

for _ in 0..100000 {
peerdb.peer_discovered(TestTcpAddressMaker::new());
Expand Down
10 changes: 6 additions & 4 deletions p2p/src/interface/p2p_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
use std::sync::Arc;

use common::chain::SignedTransaction;
use p2p_types::{ip_or_socket_address::IpOrSocketAddress, p2p_event::P2pEvent};
use p2p_types::{
bannable_address::BannableAddress, ip_or_socket_address::IpOrSocketAddress, p2p_event::P2pEvent,
};

use crate::{interface::types::ConnectedPeer, types::peer_id::PeerId};

Expand All @@ -25,9 +27,9 @@ pub trait P2pInterface: Send + Sync {
async fn connect(&mut self, addr: IpOrSocketAddress) -> crate::Result<()>;
async fn disconnect(&mut self, peer_id: PeerId) -> crate::Result<()>;

async fn list_banned(&mut self) -> crate::Result<Vec<String>>;
async fn ban(&mut self, addr: String) -> crate::Result<()>;
async fn unban(&mut self, addr: String) -> crate::Result<()>;
async fn list_banned(&mut self) -> crate::Result<Vec<BannableAddress>>;
async fn ban(&mut self, addr: BannableAddress) -> crate::Result<()>;
async fn unban(&mut self, addr: BannableAddress) -> crate::Result<()>;

async fn get_peer_count(&self) -> crate::Result<usize>;
async fn get_bind_addresses(&self) -> crate::Result<Vec<String>>;
Expand Down
20 changes: 7 additions & 13 deletions p2p/src/interface/p2p_interface_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ use std::sync::Arc;

use common::chain::SignedTransaction;
use mempool::TxOrigin;
use p2p_types::ip_or_socket_address::IpOrSocketAddress;
use p2p_types::{bannable_address::BannableAddress, ip_or_socket_address::IpOrSocketAddress};

use crate::{
error::{ConversionError, P2pError},
error::P2pError,
interface::{p2p_interface::P2pInterface, types::ConnectedPeer},
net::NetworkingService,
types::peer_id::PeerId,
Expand All @@ -31,7 +31,7 @@ use crate::{
#[async_trait::async_trait]
impl<T> P2pInterface for P2p<T>
where
T: NetworkingService,
T: NetworkingService + Send + Sync,
T::MessagingHandle: MessagingService,
{
async fn connect(&mut self, addr: IpOrSocketAddress) -> crate::Result<()> {
Expand All @@ -50,29 +50,23 @@ where
rx.await?
}

async fn list_banned(&mut self) -> crate::Result<Vec<String>> {
async fn list_banned(&mut self) -> crate::Result<Vec<BannableAddress>> {
let (tx, rx) = oneshot_nofail::channel();
self.tx_peer_manager
.send(PeerManagerEvent::ListBanned(tx))
.map_err(|_| P2pError::ChannelClosed)?;
let list = rx.await?;
Ok(list.iter().map(ToString::to_string).collect())
Ok(list)
}
async fn ban(&mut self, addr: String) -> crate::Result<()> {
async fn ban(&mut self, addr: BannableAddress) -> crate::Result<()> {
let (tx, rx) = oneshot_nofail::channel();
let addr = addr
.parse::<T::BannableAddress>()
.map_err(|_| P2pError::ConversionError(ConversionError::InvalidAddress(addr)))?;
self.tx_peer_manager
.send(PeerManagerEvent::Ban(addr, tx))
.map_err(|_| P2pError::ChannelClosed)?;
rx.await?
}
async fn unban(&mut self, addr: String) -> crate::Result<()> {
async fn unban(&mut self, addr: BannableAddress) -> crate::Result<()> {
let (tx, rx) = oneshot_nofail::channel();
let addr = addr
.parse::<T::BannableAddress>()
.map_err(|_| P2pError::ConversionError(ConversionError::InvalidAddress(addr)))?;
self.tx_peer_manager
.send(PeerManagerEvent::Unban(addr, tx))
.map_err(|_| P2pError::ChannelClosed)?;
Expand Down
8 changes: 4 additions & 4 deletions p2p/src/interface/p2p_interface_impl_delegation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::{
};

use common::chain::SignedTransaction;
use p2p_types::ip_or_socket_address::IpOrSocketAddress;
use p2p_types::{bannable_address::BannableAddress, ip_or_socket_address::IpOrSocketAddress};

use crate::{types::peer_id::PeerId, P2pEvent};

Expand All @@ -37,13 +37,13 @@ impl<T: Deref<Target = dyn P2pInterface> + DerefMut<Target = dyn P2pInterface> +
self.deref_mut().disconnect(peer_id).await
}

async fn list_banned(&mut self) -> crate::Result<Vec<String>> {
async fn list_banned(&mut self) -> crate::Result<Vec<BannableAddress>> {
self.deref_mut().list_banned().await
}
async fn ban(&mut self, addr: String) -> crate::Result<()> {
async fn ban(&mut self, addr: BannableAddress) -> crate::Result<()> {
self.deref_mut().ban(addr).await
}
async fn unban(&mut self, addr: String) -> crate::Result<()> {
async fn unban(&mut self, addr: BannableAddress) -> crate::Result<()> {
self.deref_mut().unban(addr).await
}

Expand Down
8 changes: 6 additions & 2 deletions p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub use crate::{
};

use std::{
marker::PhantomData,
net::{Ipv4Addr, Ipv6Addr, SocketAddr},
sync::Arc,
};
Expand Down Expand Up @@ -78,7 +79,7 @@ pub type Result<T> = core::result::Result<T, P2pError>;

struct P2p<T: NetworkingService> {
/// A sender for the peer manager events.
pub tx_peer_manager: mpsc::UnboundedSender<PeerManagerEvent<T>>,
pub tx_peer_manager: mpsc::UnboundedSender<PeerManagerEvent>,
mempool_handle: MempoolHandle,

backend_shutdown_sender: oneshot::Sender<()>,
Expand All @@ -92,11 +93,13 @@ struct P2p<T: NetworkingService> {
sync_manager_task: JoinHandle<()>,

subscribers_sender: mpsc::UnboundedSender<P2pEventHandler>,

_phantom: PhantomData<T>,
}

impl<T> P2p<T>
where
T: 'static + NetworkingService + Send,
T: 'static + NetworkingService + Send + Sync,
T::ConnectivityHandle: ConnectivityService<T>,
T::MessagingHandle: MessagingService,
T::SyncingEventReceiver: SyncingEventReceiver,
Expand Down Expand Up @@ -198,6 +201,7 @@ where
peer_manager_task,
sync_manager_task,
subscribers_sender,
_phantom: PhantomData,
})
}

Expand Down
1 change: 0 additions & 1 deletion p2p/src/net/default_backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ pub struct SyncingReceiver {
impl<T: TransportSocket> NetworkingService for DefaultNetworkingService<T> {
type Transport = T;
type Address = T::Address;
type BannableAddress = T::BannableAddress;
type ConnectivityHandle = ConnectivityHandle<Self, T>;
type MessagingHandle = MessagingHandle<T>;
type SyncingEventReceiver = SyncingReceiver;
Expand Down
1 change: 0 additions & 1 deletion p2p/src/net/default_backend/transport/impls/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ impl MpscChannelTransport {
#[async_trait]
impl TransportSocket for MpscChannelTransport {
type Address = SocketAddr;
type BannableAddress = IpAddr;
type Listener = ChannelListener;
type Stream = ChannelStream;

Expand Down
6 changes: 1 addition & 5 deletions p2p/src/net/default_backend/transport/impls/socks5.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{
net::{IpAddr, SocketAddr},
sync::Arc,
};
use std::{net::SocketAddr, sync::Arc};

use async_trait::async_trait;
use futures::future::BoxFuture;
Expand Down Expand Up @@ -48,7 +45,6 @@ impl Socks5TransportSocket {
#[async_trait]
impl TransportSocket for Socks5TransportSocket {
type Address = SocketAddr;
type BannableAddress = IpAddr;
type Listener = Socks5TransportListener;
type Stream = Socks5TransportStream;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ impl TestTransport {
#[async_trait]
impl TransportSocket for TestTransport {
type Address = <MpscChannelTransport as TransportSocket>::Address;
type BannableAddress = <MpscChannelTransport as TransportSocket>::BannableAddress;
type Listener = TestListener;
type Stream = <MpscChannelTransport as TransportSocket>::Stream;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ impl<S: StreamAdapter<T::Stream>, T: TransportSocket> TransportSocket
for WrappedTransportSocket<S, T>
{
type Address = T::Address;
type BannableAddress = T::BannableAddress;
type Listener = AdaptedListener<S, T>;
type Stream = S::Stream;

Expand Down
10 changes: 4 additions & 6 deletions p2p/src/net/default_backend/transport/impls/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};

use async_trait::async_trait;
use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt};
use p2p_types::bannable_address::BannableAddress;
use tokio::net::{TcpListener, TcpStream};

use crate::{
Expand Down Expand Up @@ -66,7 +67,6 @@ impl TcpTransportSocket {
#[async_trait]
impl TransportSocket for TcpTransportSocket {
type Address = SocketAddr;
type BannableAddress = IpAddr;
type Listener = TcpTransportListener;
type Stream = TcpTransportStream;

Expand Down Expand Up @@ -156,10 +156,8 @@ impl TransportListener for TcpTransportListener {
}

impl AsBannableAddress for SocketAddr {
type BannableAddress = IpAddr;

fn as_bannable(&self) -> Self::BannableAddress {
self.ip()
fn as_bannable(&self) -> BannableAddress {
BannableAddress::new(self.ip())
}
}

Expand Down
5 changes: 1 addition & 4 deletions p2p/src/net/default_backend/transport/traits/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,7 @@ pub trait TransportSocket: Send + Sync + 'static {
+ Sync
+ ToString
+ FromStr
+ AsBannableAddress<BannableAddress = Self::BannableAddress>;

/// A bannable address format.
type BannableAddress: Clone + Debug + Eq + Ord + Send + ToString + FromStr;
+ AsBannableAddress;

/// A listener type (or acceptor as per boost terminology).
type Listener: TransportListener<Stream = Self::Stream, Address = Self::Address>;
Expand Down
13 changes: 3 additions & 10 deletions p2p/src/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::{fmt::Debug, hash::Hash, str::FromStr, sync::Arc};

use async_trait::async_trait;
use common::time_getter::TimeGetter;
use p2p_types::bannable_address::BannableAddress;
use tokio::{
sync::{mpsc, oneshot},
task::JoinHandle,
Expand Down Expand Up @@ -63,13 +64,7 @@ pub trait NetworkingService {
+ ToString
+ FromStr
+ TransportAddress
+ AsBannableAddress<BannableAddress = Self::BannableAddress>;

/// An address type that can be banned.
///
/// Usually it is part of the `NetworkingService::Address`. For example for a socket address
/// that consists of an IP address and a port we want to ban the IP address.
type BannableAddress: Clone + Debug + Eq + Ord + Send + ToString + FromStr;
+ AsBannableAddress;

/// Handle for sending/receiving connectivity-related events
type ConnectivityHandle: Send;
Expand Down Expand Up @@ -158,8 +153,6 @@ pub trait SyncingEventReceiver {
/// `SocketAddr` contains a port in addition to an IP address and we want to ban only the latter
/// one.
pub trait AsBannableAddress {
type BannableAddress;

/// Returns a bannable part of an address.
fn as_bannable(&self) -> Self::BannableAddress;
fn as_bannable(&self) -> BannableAddress;
}
8 changes: 4 additions & 4 deletions p2p/src/peer_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ where
peer_connectivity_handle: T::ConnectivityHandle,

/// RX channel for receiving control events
rx_peer_manager: mpsc::UnboundedReceiver<PeerManagerEvent<T>>,
rx_peer_manager: mpsc::UnboundedReceiver<PeerManagerEvent>,

/// Hashmap of pending outbound connections
pending_outbound_connects:
Expand All @@ -138,7 +138,7 @@ where
peers: BTreeMap<PeerId, PeerContext<T::Address>>,

/// Peer database
peerdb: peerdb::PeerDb<T::Address, T::BannableAddress, S>,
peerdb: peerdb::PeerDb<T::Address, S>,

/// List of connected peers that subscribed to PeerAddresses topic
subscribed_to_peer_addresses: BTreeSet<PeerId>,
Expand Down Expand Up @@ -168,7 +168,7 @@ where
chain_config: Arc<ChainConfig>,
p2p_config: Arc<P2pConfig>,
handle: T::ConnectivityHandle,
rx_peer_manager: mpsc::UnboundedReceiver<PeerManagerEvent<T>>,
rx_peer_manager: mpsc::UnboundedReceiver<PeerManagerEvent>,
time_getter: TimeGetter,
peerdb_storage: S,
) -> crate::Result<Self> {
Expand Down Expand Up @@ -966,7 +966,7 @@ where
/// Handle control event.
///
/// Handle events from an outside controller (rpc, for example) that sets/gets values for PeerManager.
fn handle_control_event(&mut self, event: PeerManagerEvent<T>) {
fn handle_control_event(&mut self, event: PeerManagerEvent) {
match event {
PeerManagerEvent::Connect(address, response) => {
let address_res =
Expand Down
Loading