diff --git a/finality-aleph/src/network/mock.rs b/finality-aleph/src/network/mock.rs index 2ac4283397..615df5a350 100644 --- a/finality-aleph/src/network/mock.rs +++ b/finality-aleph/src/network/mock.rs @@ -152,7 +152,7 @@ impl Default for Channel { } } -pub type MockEvent = Event; +pub type MockEvent = Event; pub type MockData = Vec; type MessageForUser = (NetworkData, DataCommand<::PeerId>); @@ -183,7 +183,7 @@ impl MockIO { pub struct MockEventStream(mpsc::UnboundedReceiver); #[async_trait] -impl EventStream for MockEventStream { +impl EventStream for MockEventStream { async fn next_event(&mut self) -> Option { self.0.next().await } diff --git a/finality-aleph/src/network/mod.rs b/finality-aleph/src/network/mod.rs index 5862a55341..2b30ef09e5 100644 --- a/finality-aleph/src/network/mod.rs +++ b/finality-aleph/src/network/mod.rs @@ -89,26 +89,26 @@ pub trait NetworkSender: Send + Sync + 'static { } #[derive(Clone)] -pub enum Event { +pub enum Event { Connected(M), - Disconnected(M::PeerId), - StreamOpened(M::PeerId, Protocol), - StreamClosed(M::PeerId, Protocol), + Disconnected(P), + StreamOpened(P, Protocol), + StreamClosed(P, Protocol), Messages(Vec<(Protocol, Bytes)>), } #[async_trait] -pub trait EventStream { - async fn next_event(&mut self) -> Option>; +pub trait EventStream { + async fn next_event(&mut self) -> Option>; } /// Abstraction over a network. pub trait Network: Clone + Send + Sync + 'static { type SenderError: std::error::Error; type NetworkSender: NetworkSender; - type PeerId: PeerId; - type Multiaddress: Multiaddress; - type EventStream: EventStream; + type PeerId: Clone + Debug + Eq + Hash + Send; + type Multiaddress: Debug + Eq + Hash; + type EventStream: EventStream; /// Returns a stream of events representing what happens on the network. fn event_stream(&self) -> Self::EventStream; diff --git a/finality-aleph/src/network/service.rs b/finality-aleph/src/network/service.rs index 6028945cbd..142e4cfb4c 100644 --- a/finality-aleph/src/network/service.rs +++ b/finality-aleph/src/network/service.rs @@ -187,7 +187,7 @@ impl< fn handle_network_event( &mut self, - event: Event, + event: Event, ) -> Result<(), mpsc::TrySendError>> { use Event::*; match event { diff --git a/finality-aleph/src/substrate_network.rs b/finality-aleph/src/substrate_network.rs index 9638520b29..e4a14d9818 100644 --- a/finality-aleph/src/substrate_network.rs +++ b/finality-aleph/src/substrate_network.rs @@ -1,13 +1,12 @@ use std::{borrow::Cow, collections::HashSet, fmt, iter, pin::Pin, sync::Arc}; use async_trait::async_trait; -use codec::{Decode, Encode}; use futures::stream::{Stream, StreamExt}; use log::error; use sc_consensus::JustificationSyncLink; use sc_network::{ multiaddr::Protocol as MultiaddressProtocol, Event as SubstrateEvent, ExHashT, Multiaddr, - NetworkService, NetworkStateInfo, NetworkSyncForkRequest, PeerId as SubstratePeerId, + NetworkService, NetworkSyncForkRequest, PeerId, }; use sc_network_common::service::{ NetworkEventStream as _, NetworkNotification, NetworkPeers, NotificationSender, @@ -16,10 +15,7 @@ use sp_api::NumberFor; use sp_consensus::SyncOracle; use sp_runtime::traits::Block; -use crate::network::{ - Event, EventStream, Multiaddress as MultiaddressT, Network, NetworkIdentity, NetworkSender, - PeerId as PeerIdT, Protocol, RequestBlocks, -}; +use crate::network::{Event, EventStream, Network, NetworkSender, Protocol, RequestBlocks}; impl RequestBlocks for Arc> { fn request_justification(&self, hash: &B::Hash, number: NumberFor) { @@ -44,146 +40,6 @@ impl RequestBlocks for Arc> { } } -#[derive(PartialEq, Eq, Copy, Clone, Debug, Hash)] -pub struct PeerId(SubstratePeerId); - -impl From for SubstratePeerId { - fn from(wrapper: PeerId) -> Self { - wrapper.0 - } -} - -impl From for PeerId { - fn from(id: SubstratePeerId) -> Self { - PeerId(id) - } -} - -impl Encode for PeerId { - fn using_encoded R>(&self, f: F) -> R { - self.0.to_bytes().using_encoded(f) - } -} - -impl Decode for PeerId { - fn decode(value: &mut I) -> Result { - let bytes = Vec::::decode(value)?; - SubstratePeerId::from_bytes(&bytes) - .map_err(|_| "PeerId not encoded with to_bytes".into()) - .map(|pid| pid.into()) - } -} - -impl fmt::Display for PeerId { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "{}", self.0) - } -} - -impl PeerIdT for PeerId {} - -fn peer_id(protocol: &MultiaddressProtocol<'_>) -> Option { - match protocol { - MultiaddressProtocol::P2p(hashed_peer_id) => { - SubstratePeerId::from_multihash(*hashed_peer_id) - .ok() - .map(PeerId) - } - _ => None, - } -} - -/// A wrapper for the Substrate multiaddress to allow encoding & decoding. -#[derive(Clone, Debug, Hash, PartialEq, Eq)] -pub struct Multiaddress(Multiaddr); - -impl From for Multiaddress { - fn from(addr: Multiaddr) -> Self { - Multiaddress(addr) - } -} - -impl From for Multiaddr { - fn from(addr: Multiaddress) -> Self { - addr.0 - } -} - -impl Encode for Multiaddress { - fn using_encoded R>(&self, f: F) -> R { - self.0.to_vec().using_encoded(f) - } -} - -impl Decode for Multiaddress { - fn decode(value: &mut I) -> Result { - let bytes = Vec::::decode(value)?; - Multiaddr::try_from(bytes) - .map_err(|_| "Multiaddr not encoded as bytes".into()) - .map(|multiaddr| multiaddr.into()) - } -} - -enum CommonPeerId { - Unknown, - Unique(PeerId), - NotUnique, -} - -impl From for Option { - fn from(cpi: CommonPeerId) -> Self { - use CommonPeerId::*; - match cpi { - Unique(peer_id) => Some(peer_id), - Unknown | NotUnique => None, - } - } -} - -impl CommonPeerId { - fn aggregate(self, peer_id: PeerId) -> Self { - use CommonPeerId::*; - match self { - Unknown => Unique(peer_id), - Unique(current_peer_id) => match peer_id == current_peer_id { - true => Unique(current_peer_id), - false => NotUnique, - }, - NotUnique => NotUnique, - } - } -} - -impl MultiaddressT for Multiaddress { - type PeerId = PeerId; - - fn get_peer_id(&self) -> Option { - self.0 - .iter() - .fold( - CommonPeerId::Unknown, - |common_peer_id, protocol| match peer_id(&protocol) { - Some(peer_id) => common_peer_id.aggregate(peer_id), - None => common_peer_id, - }, - ) - .into() - } - - fn add_matching_peer_id(mut self, peer_id: Self::PeerId) -> Option { - match self.get_peer_id() { - Some(peer) => match peer == peer_id { - true => Some(self), - false => None, - }, - None => { - self.0.push(MultiaddressProtocol::P2p(peer_id.0.into())); - Some(self) - } - } - } -} - /// Name of the network protocol used by Aleph Zero. This is how messages /// are subscribed to ensure that we are gossiping and communicating with our /// own network. @@ -267,28 +123,28 @@ impl NetworkSender for SubstrateNetworkSender { type NetworkEventStream = Pin + Send>>; #[async_trait] -impl EventStream for NetworkEventStream { - async fn next_event(&mut self) -> Option> { +impl EventStream for NetworkEventStream { + async fn next_event(&mut self) -> Option> { use Event::*; use SubstrateEvent::*; loop { match self.next().await { Some(event) => match event { SyncConnected { remote } => { - return Some(Connected(Multiaddress( + return Some(Connected( iter::once(MultiaddressProtocol::P2p(remote.into())).collect(), - ))) + )) } - SyncDisconnected { remote } => return Some(Disconnected(remote.into())), + SyncDisconnected { remote } => return Some(Disconnected(remote)), NotificationStreamOpened { remote, protocol, .. } => match to_protocol(protocol.as_ref()) { - Ok(protocol) => return Some(StreamOpened(remote.into(), protocol)), + Ok(protocol) => return Some(StreamOpened(remote, protocol)), Err(_) => continue, }, NotificationStreamClosed { remote, protocol } => { match to_protocol(protocol.as_ref()) { - Ok(protocol) => return Some(StreamClosed(remote.into(), protocol)), + Ok(protocol) => return Some(StreamClosed(remote, protocol)), Err(_) => continue, } } @@ -319,7 +175,7 @@ impl Network for Arc> { type SenderError = SenderError; type NetworkSender = SubstrateNetworkSender; type PeerId = PeerId; - type Multiaddress = Multiaddress; + type Multiaddress = Multiaddr; type EventStream = NetworkEventStream; fn event_stream(&self) -> Self::EventStream { @@ -335,118 +191,22 @@ impl Network for Arc> { // Currently method `notification_sender` does not distinguish whether we are not connected to the peer // or there is no such protocol so we need to have this worthless `SenderError::CannotCreateSender` error here notification_sender: self - .notification_sender(peer_id.into(), protocol_name(&protocol)) + .notification_sender(peer_id, protocol_name(&protocol)) .map_err(|_| SenderError::CannotCreateSender(peer_id, protocol))?, peer_id, }) } fn add_reserved(&self, addresses: HashSet, protocol: Protocol) { - if let Err(e) = self.add_peers_to_reserved_set( - protocol_name(&protocol), - addresses - .into_iter() - .map(|address| address.into()) - .collect(), - ) { + if let Err(e) = self + .add_peers_to_reserved_set(protocol_name(&protocol), addresses.into_iter().collect()) + { error!(target: "aleph-network", "add_reserved failed: {}", e); } } fn remove_reserved(&self, peers: HashSet, protocol: Protocol) { - let addresses = peers.into_iter().map(|peer_id| peer_id.0).collect(); + let addresses = peers.into_iter().collect(); self.remove_peers_from_reserved_set(protocol_name(&protocol), addresses); } } - -impl NetworkIdentity for Arc> { - type PeerId = PeerId; - type Multiaddress = Multiaddress; - - fn identity(&self) -> (Vec, Self::PeerId) { - ( - self.external_addresses() - .into_iter() - .map(|address| address.into()) - .collect(), - self.local_peer_id().into(), - ) - } -} - -#[cfg(test)] -mod tests { - use codec::{Decode, Encode}; - - use super::Multiaddress; - use crate::network::Multiaddress as _; - - fn address(text: &str) -> Multiaddress { - Multiaddress(text.parse().unwrap()) - } - - #[test] - fn non_p2p_addresses_are_not_p2p() { - assert!(address("/dns4/example.com/udt/sctp/5678") - .get_peer_id() - .is_none()); - } - - #[test] - fn p2p_addresses_are_p2p() { - assert!(address( - "/dns4/example.com/tcp/30333/p2p/12D3KooWRkGLz4YbVmrsWK75VjFTs8NvaBu42xhAmQaP4KeJpw1L" - ) - .get_peer_id() - .is_some()); - } - - #[test] - fn non_p2p_address_matches_peer_id() { - let address = address( - "/dns4/example.com/tcp/30333/p2p/12D3KooWRkGLz4YbVmrsWK75VjFTs8NvaBu42xhAmQaP4KeJpw1L", - ); - let peer_id = address.get_peer_id().unwrap(); - let mut peerless_address = address.clone().0; - peerless_address.pop(); - let peerless_address = Multiaddress(peerless_address); - assert!(peerless_address.get_peer_id().is_none()); - assert_eq!( - peerless_address.add_matching_peer_id(peer_id), - Some(address), - ); - } - - #[test] - fn p2p_address_matches_own_peer_id() { - let address = address( - "/dns4/example.com/tcp/30333/p2p/12D3KooWRkGLz4YbVmrsWK75VjFTs8NvaBu42xhAmQaP4KeJpw1L", - ); - let peer_id = address.get_peer_id().unwrap(); - let expected_address = address.clone(); - assert_eq!( - address.add_matching_peer_id(peer_id), - Some(expected_address), - ); - } - - #[test] - fn p2p_address_does_not_match_other_peer_id() { - let nonmatching_address = address( - "/dns4/example.com/tcp/30333/p2p/12D3KooWRkGLz4YbVmrsWK75VjFTs8NvaBu42xhAmQaP4KeJpw1L", - ); - let peer_id = address("/dns4/peer.example.com/tcp/30333/p2p/12D3KooWFVXnvJdPuGnGYMPn5qLQAQYwmRBgo6SmEQsKZSrDoo2k").get_peer_id().unwrap(); - assert!(nonmatching_address.add_matching_peer_id(peer_id).is_none()); - } - - #[test] - fn multiaddr_encode_decode() { - let multiaddr: Multiaddress = address( - "/dns4/example.com/tcp/30333/p2p/12D3KooWRkGLz4YbVmrsWK75VjFTs8NvaBu42xhAmQaP4KeJpw1L", - ); - assert_eq!( - Multiaddress::decode(&mut &multiaddr.encode()[..]).unwrap(), - multiaddr, - ); - } -}