diff --git a/finality-aleph/src/validator_network/incoming.rs b/finality-aleph/src/validator_network/incoming.rs index b45bd7135a..a8ec06bad4 100644 --- a/finality-aleph/src/validator_network/incoming.rs +++ b/finality-aleph/src/validator_network/incoming.rs @@ -1,14 +1,12 @@ use std::fmt::{Display, Error as FmtError, Formatter}; -use aleph_primitives::AuthorityId; -use futures::channel::{mpsc, oneshot}; +use futures::channel::mpsc; use log::{debug, info}; use crate::{ crypto::AuthorityPen, validator_network::{ - protocol_negotiation::{protocol, ProtocolNegotiationError}, - protocols::ProtocolError, + protocols::{protocol, ProtocolError, ProtocolNegotiationError, ResultForService}, Data, Splittable, }, }; @@ -43,7 +41,7 @@ impl From for IncomingError { async fn manage_incoming( authority_pen: AuthorityPen, stream: S, - result_for_parent: mpsc::UnboundedSender<(AuthorityId, oneshot::Sender<()>)>, + result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, ) -> Result<(), IncomingError> { debug!(target: "validator-network", "Performing incoming protocol negotiation."); @@ -62,7 +60,7 @@ async fn manage_incoming( pub async fn incoming( authority_pen: AuthorityPen, stream: S, - result_for_parent: mpsc::UnboundedSender<(AuthorityId, oneshot::Sender<()>)>, + result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, ) { let addr = stream.peer_address_info(); diff --git a/finality-aleph/src/validator_network/manager.rs b/finality-aleph/src/validator_network/manager/mod.rs similarity index 93% rename from finality-aleph/src/validator_network/manager.rs rename to finality-aleph/src/validator_network/manager/mod.rs index c765c33a78..d890cd16c8 100644 --- a/finality-aleph/src/validator_network/manager.rs +++ b/finality-aleph/src/validator_network/manager/mod.rs @@ -4,18 +4,10 @@ use std::{ }; use aleph_primitives::AuthorityId; -use futures::channel::{mpsc, oneshot}; +use futures::channel::mpsc; use crate::{network::PeerId, validator_network::Data}; -/// Network component responsible for holding the list of peers that we -/// want to connect to, and managing the established connections. -pub struct Manager { - addresses: HashMap>, - outgoing: HashMap>, - incoming: HashMap>, -} - /// Error during sending data through the Manager #[derive(Debug, PartialEq, Eq)] pub enum SendError { @@ -35,6 +27,25 @@ impl Display for SendError { } } +/// Possible results of adding connections. +#[derive(Debug, PartialEq, Eq)] +pub enum AddResult { + /// We do not want to maintain a connection with this peer. + Uninterested, + /// Connection added. + Added, + /// Old connection replaced with new one. + Replaced, +} + +/// Network component responsible for holding the list of peers that we +/// want to connect to, and managing the established connections. +pub struct Manager { + addresses: HashMap>, + outgoing: HashMap>, + incoming: HashMap>, +} + struct ManagerStatus { wanted_peers: usize, both_ways_peers: HashSet, @@ -48,7 +59,7 @@ impl ManagerStatus { let incoming: HashSet<_> = manager .incoming .iter() - .filter(|(_, exit)| !exit.is_canceled()) + .filter(|(_, exit)| !exit.is_closed()) .map(|(k, _)| k.clone()) .collect(); let outgoing: HashSet<_> = manager @@ -149,17 +160,6 @@ impl Display for ManagerStatus { } } -/// Possible results of adding connections. -#[derive(Debug, PartialEq, Eq)] -pub enum AddResult { - /// We do not want to maintain a connection with this peer. - Uninterested, - /// Connection added. - Added, - /// Old connection replaced with new one. - Replaced, -} - impl Manager { /// Create a new Manager with empty list of peers. pub fn new() -> Self { @@ -202,7 +202,11 @@ impl Manager { /// Add an established incoming connection with a known peer, /// but only if the peer is on the list of peers that we want to stay connected with. - pub fn add_incoming(&mut self, peer_id: AuthorityId, exit: oneshot::Sender<()>) -> AddResult { + pub fn add_incoming( + &mut self, + peer_id: AuthorityId, + exit: mpsc::UnboundedSender, + ) -> AddResult { use AddResult::*; if !self.addresses.contains_key(&peer_id) { return Uninterested; @@ -240,10 +244,7 @@ impl Manager { #[cfg(test)] mod tests { - use futures::{ - channel::{mpsc, oneshot}, - StreamExt, - }; + use futures::{channel::mpsc, StreamExt}; use super::{AddResult::*, Manager, SendError}; use crate::validator_network::mock::key; @@ -319,27 +320,27 @@ mod tests { String::from("a/b/c"), String::from("43.43.43.43:43000"), ]; - let (tx, rx) = oneshot::channel(); + let (tx, mut rx) = mpsc::unbounded(); // try add unknown peer assert_eq!(manager.add_incoming(peer_id.clone(), tx), Uninterested); // rx should fail - assert!(rx.await.is_err()); + assert!(rx.try_next().expect("channel should be closed").is_none()); // add peer, this time for real assert!(manager.add_peer(peer_id.clone(), addresses.clone())); - let (tx, mut rx) = oneshot::channel(); + let (tx, mut rx) = mpsc::unbounded(); // should just add assert_eq!(manager.add_incoming(peer_id.clone(), tx), Added); // the exit channel should be open - assert!(rx.try_recv().is_ok()); - let (tx, mut rx2) = oneshot::channel(); + assert!(rx.try_next().is_err()); + let (tx, mut rx2) = mpsc::unbounded(); // should replace now assert_eq!(manager.add_incoming(peer_id.clone(), tx), Replaced); // receiving should fail on old, but work on new channel - assert!(rx.try_recv().is_err()); - assert!(rx2.try_recv().is_ok()); + assert!(rx.try_next().expect("channel should be closed").is_none()); + assert!(rx2.try_next().is_err()); // remove peer manager.remove_peer(&peer_id); // receiving should fail - assert!(rx2.try_recv().is_err()); + assert!(rx2.try_next().expect("channel should be closed").is_none()); } } diff --git a/finality-aleph/src/validator_network/mod.rs b/finality-aleph/src/validator_network/mod.rs index 1afa9a8d56..b9f3d2524b 100644 --- a/finality-aleph/src/validator_network/mod.rs +++ b/finality-aleph/src/validator_network/mod.rs @@ -5,15 +5,12 @@ use codec::Codec; use sp_core::crypto::KeyTypeId; use tokio::io::{AsyncRead, AsyncWrite}; -mod handshake; -mod heartbeat; mod incoming; mod io; mod manager; #[cfg(test)] pub mod mock; mod outgoing; -mod protocol_negotiation; mod protocols; mod service; diff --git a/finality-aleph/src/validator_network/outgoing.rs b/finality-aleph/src/validator_network/outgoing.rs index 344155b4b0..37ea94b668 100644 --- a/finality-aleph/src/validator_network/outgoing.rs +++ b/finality-aleph/src/validator_network/outgoing.rs @@ -8,8 +8,9 @@ use tokio::time::{sleep, Duration}; use crate::{ crypto::AuthorityPen, validator_network::{ - protocol_negotiation::{protocol, ProtocolNegotiationError}, - protocols::ProtocolError, + protocols::{ + protocol, ConnectionType, ProtocolError, ProtocolNegotiationError, ResultForService, + }, ConnectionInfo, Data, Dialer, PeerAddressInfo, }, }; @@ -44,7 +45,8 @@ async fn manage_outgoing>( peer_id: AuthorityId, mut dialer: ND, addresses: Vec, - result_for_parent: mpsc::UnboundedSender<(AuthorityId, Option>)>, + result_for_parent: mpsc::UnboundedSender>, + data_for_user: mpsc::UnboundedSender, ) -> Result<(), OutgoingError> { debug!(target: "validator-network", "Trying to connect to {}.", peer_id); let stream = dialer @@ -58,7 +60,13 @@ async fn manage_outgoing>( .map_err(|e| OutgoingError::ProtocolNegotiation(peer_address_info.clone(), e))?; debug!(target: "validator-network", "Negotiated protocol, running."); protocol - .manage_outgoing(stream, authority_pen, peer_id, result_for_parent) + .manage_outgoing( + stream, + authority_pen, + peer_id, + result_for_parent, + data_for_user, + ) .await .map_err(|e| OutgoingError::Protocol(peer_address_info.clone(), e)) } @@ -73,7 +81,8 @@ pub async fn outgoing>( peer_id: AuthorityId, dialer: ND, addresses: Vec, - result_for_parent: mpsc::UnboundedSender<(AuthorityId, Option>)>, + result_for_parent: mpsc::UnboundedSender>, + data_for_user: mpsc::UnboundedSender, ) { if let Err(e) = manage_outgoing( authority_pen, @@ -81,12 +90,16 @@ pub async fn outgoing>( dialer, addresses.clone(), result_for_parent.clone(), + data_for_user, ) .await { info!(target: "validator-network", "Outgoing connection to {} {:?} failed: {}, will retry after {}s.", peer_id, addresses, e, RETRY_DELAY.as_secs()); sleep(RETRY_DELAY).await; - if result_for_parent.unbounded_send((peer_id, None)).is_err() { + if result_for_parent + .unbounded_send((peer_id, None, ConnectionType::LegacyOutgoing)) + .is_err() + { debug!(target: "validator-network", "Could not send the closing message, we've probably been terminated by the parent service."); } } diff --git a/finality-aleph/src/validator_network/handshake.rs b/finality-aleph/src/validator_network/protocols/handshake.rs similarity index 100% rename from finality-aleph/src/validator_network/handshake.rs rename to finality-aleph/src/validator_network/protocols/handshake.rs diff --git a/finality-aleph/src/validator_network/protocols/mod.rs b/finality-aleph/src/validator_network/protocols/mod.rs new file mode 100644 index 0000000000..97f2ad4ed3 --- /dev/null +++ b/finality-aleph/src/validator_network/protocols/mod.rs @@ -0,0 +1,143 @@ +use std::fmt::{Display, Error as FmtError, Formatter}; + +use aleph_primitives::AuthorityId; +use futures::channel::mpsc; + +use crate::{ + crypto::AuthorityPen, + validator_network::{ + io::{ReceiveError, SendError}, + Data, Splittable, + }, +}; + +mod handshake; +mod negotiation; +mod v0; + +use handshake::HandshakeError; +pub use negotiation::{protocol, ProtocolNegotiationError}; + +pub type Version = u32; + +/// The types of connections needed for backwards compatibility with the legacy two connections +/// protocol. Remove after it's no longer needed. +#[derive(PartialEq, Debug, Eq, Clone, Copy)] +pub enum ConnectionType { + LegacyIncoming, + LegacyOutgoing, +} + +/// What connections send back to the service after they become established. Starts with a peer id +/// of the remote node, followed by a channel for sending data to that node, with None if the +/// connection was unsuccessful and should be reestablished. Finally a marker for legacy +/// compatibility. +pub type ResultForService = ( + AuthorityId, + Option>, + ConnectionType, +); + +/// Defines the protocol for communication. +#[derive(Debug, PartialEq, Eq)] +pub enum Protocol { + /// The first version of the protocol, with unidirectional connections. + V0, +} + +/// Protocol error. +#[derive(Debug)] +pub enum ProtocolError { + /// Error during performing a handshake. + HandshakeError(HandshakeError), + /// Sending failed. + SendError(SendError), + /// Receiving failed. + ReceiveError(ReceiveError), + /// Heartbeat stopped. + CardiacArrest, + /// Channel to the parent service closed. + NoParentConnection, + /// Data channel closed. + NoUserConnection, +} + +impl Display for ProtocolError { + fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> { + use ProtocolError::*; + match self { + HandshakeError(e) => write!(f, "handshake error: {}", e), + SendError(e) => write!(f, "send error: {}", e), + ReceiveError(e) => write!(f, "receive error: {}", e), + CardiacArrest => write!(f, "heartbeat stopped"), + NoParentConnection => write!(f, "cannot send result to service"), + NoUserConnection => write!(f, "cannot send data to user"), + } + } +} + +impl From for ProtocolError { + fn from(e: HandshakeError) -> Self { + ProtocolError::HandshakeError(e) + } +} + +impl From for ProtocolError { + fn from(e: SendError) -> Self { + ProtocolError::SendError(e) + } +} + +impl From for ProtocolError { + fn from(e: ReceiveError) -> Self { + ProtocolError::ReceiveError(e) + } +} + +impl Protocol { + /// Minimal supported protocol version. + const MIN_VERSION: Version = 0; + + /// Maximal supported protocol version. + const MAX_VERSION: Version = 0; + + /// Launches the proper variant of the protocol (receiver half). + pub async fn manage_incoming( + &self, + stream: S, + authority_pen: AuthorityPen, + result_for_service: mpsc::UnboundedSender>, + data_for_user: mpsc::UnboundedSender, + ) -> Result<(), ProtocolError> { + use Protocol::*; + match self { + V0 => v0::incoming(stream, authority_pen, result_for_service, data_for_user).await, + } + } + + /// Launches the proper variant of the protocol (sender half). + pub async fn manage_outgoing( + &self, + stream: S, + authority_pen: AuthorityPen, + peer_id: AuthorityId, + result_for_service: mpsc::UnboundedSender>, + _data_for_user: mpsc::UnboundedSender, + ) -> Result<(), ProtocolError> { + use Protocol::*; + match self { + V0 => v0::outgoing(stream, authority_pen, peer_id, result_for_service).await, + } + } +} + +impl TryFrom for Protocol { + type Error = Version; + + fn try_from(version: Version) -> Result { + match version { + 0 => Ok(Protocol::V0), + unknown_version => Err(unknown_version), + } + } +} diff --git a/finality-aleph/src/validator_network/protocol_negotiation.rs b/finality-aleph/src/validator_network/protocols/negotiation.rs similarity index 91% rename from finality-aleph/src/validator_network/protocol_negotiation.rs rename to finality-aleph/src/validator_network/protocols/negotiation.rs index 6413dfc63b..aa6ff97f89 100644 --- a/finality-aleph/src/validator_network/protocol_negotiation.rs +++ b/finality-aleph/src/validator_network/protocols/negotiation.rs @@ -8,17 +8,13 @@ use tokio::{ time::{timeout, Duration}, }; -use crate::validator_network::protocols::Protocol; +use crate::validator_network::protocols::{Protocol, Version}; -pub type ProtocolVersion = u32; - -const MIN_SUPPORTED_PROTOCOL: ProtocolVersion = 0; -const MAX_SUPPORTED_PROTOCOL: ProtocolVersion = 0; const PROTOCOL_NEGOTIATION_TIMEOUT: Duration = Duration::from_secs(5); /// A range of supported protocols, will fail to decode if the range is empty. #[derive(Clone, Debug, PartialEq, Eq)] -pub struct ProtocolsRange(ProtocolVersion, ProtocolVersion); +pub struct ProtocolsRange(Version, Version); impl Display for ProtocolsRange { fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> { @@ -27,7 +23,7 @@ impl Display for ProtocolsRange { } const fn supported_protocol_range() -> ProtocolsRange { - ProtocolsRange(MIN_SUPPORTED_PROTOCOL, MAX_SUPPORTED_PROTOCOL) + ProtocolsRange(Protocol::MIN_VERSION, Protocol::MAX_VERSION) } /// What went wrong when negotiating a protocol. @@ -36,7 +32,7 @@ pub enum ProtocolNegotiationError { ConnectionClosed, InvalidRange(ProtocolsRange), ProtocolMismatch(ProtocolsRange, ProtocolsRange), - BadChoice(ProtocolVersion), + BadChoice(Version), TimedOut, } @@ -74,12 +70,8 @@ impl ProtocolsRange { fn decode(encoded: &[u8; 8]) -> Result { let result = ProtocolsRange( - ProtocolVersion::from_le_bytes( - encoded[0..4].try_into().expect("this is literally 4 bytes"), - ), - ProtocolVersion::from_le_bytes( - encoded[4..8].try_into().expect("this is literally 4 bytes"), - ), + Version::from_le_bytes(encoded[0..4].try_into().expect("this is literally 4 bytes")), + Version::from_le_bytes(encoded[4..8].try_into().expect("this is literally 4 bytes")), ); match result.valid() { true => Ok(result), @@ -103,9 +95,11 @@ fn maximum_of_intersection( range1: ProtocolsRange, range2: ProtocolsRange, ) -> Result { - intersection(range1, range2).map(|intersection| match intersection.1 { - 0 => Ok(Protocol::V0), - unknown_version => Err(ProtocolNegotiationError::BadChoice(unknown_version)), + intersection(range1, range2).map(|intersection| { + intersection + .1 + .try_into() + .map_err(ProtocolNegotiationError::BadChoice) })? } diff --git a/finality-aleph/src/validator_network/heartbeat.rs b/finality-aleph/src/validator_network/protocols/v0/heartbeat.rs similarity index 100% rename from finality-aleph/src/validator_network/heartbeat.rs rename to finality-aleph/src/validator_network/protocols/v0/heartbeat.rs diff --git a/finality-aleph/src/validator_network/protocols.rs b/finality-aleph/src/validator_network/protocols/v0/mod.rs similarity index 77% rename from finality-aleph/src/validator_network/protocols.rs rename to finality-aleph/src/validator_network/protocols/v0/mod.rs index 54581c20bb..1f80de2baf 100644 --- a/finality-aleph/src/validator_network/protocols.rs +++ b/finality-aleph/src/validator_network/protocols/v0/mod.rs @@ -1,78 +1,23 @@ -use std::fmt::{Display, Error as FmtError, Formatter}; - use aleph_primitives::AuthorityId; -use futures::{ - channel::{mpsc, oneshot}, - StreamExt, -}; +use futures::{channel::mpsc, StreamExt}; use log::{debug, info, trace}; use tokio::io::{AsyncRead, AsyncWrite}; use crate::{ crypto::AuthorityPen, validator_network::{ - handshake::{v0_handshake_incoming, v0_handshake_outgoing, HandshakeError}, - heartbeat::{heartbeat_receiver, heartbeat_sender}, - io::{receive_data, send_data, ReceiveError, SendError}, + io::{receive_data, send_data}, + protocols::{ + handshake::{v0_handshake_incoming, v0_handshake_outgoing}, + ConnectionType, ProtocolError, ResultForService, + }, Data, Splittable, }, }; -/// Defines the protocol for communication. -#[derive(Debug, PartialEq, Eq)] -pub enum Protocol { - /// The current version of the protocol. - V0, -} +mod heartbeat; -/// Protocol error. -#[derive(Debug)] -pub enum ProtocolError { - /// Error during performing a handshake. - HandshakeError(HandshakeError), - /// Sending failed. - SendError(SendError), - /// Receiving failed. - ReceiveError(ReceiveError), - /// Heartbeat stopped. - CardiacArrest, - /// Channel to the parent service closed. - NoParentConnection, - /// Data channel closed. - NoUserConnection, -} - -impl Display for ProtocolError { - fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> { - use ProtocolError::*; - match self { - HandshakeError(e) => write!(f, "handshake error: {}", e), - SendError(e) => write!(f, "send error: {}", e), - ReceiveError(e) => write!(f, "receive error: {}", e), - CardiacArrest => write!(f, "heartbeat stopped"), - NoParentConnection => write!(f, "cannot send result to service"), - NoUserConnection => write!(f, "cannot send data to user"), - } - } -} - -impl From for ProtocolError { - fn from(e: HandshakeError) -> Self { - ProtocolError::HandshakeError(e) - } -} - -impl From for ProtocolError { - fn from(e: SendError) -> Self { - ProtocolError::SendError(e) - } -} - -impl From for ProtocolError { - fn from(e: ReceiveError) -> Self { - ProtocolError::ReceiveError(e) - } -} +use heartbeat::{heartbeat_receiver, heartbeat_sender}; /// Receives data from the parent service and sends it over the network. /// Exits when the parent channel is closed, or if the network connection is broken. @@ -91,18 +36,22 @@ async fn sending( /// Performs the handshake, and then keeps sending data received from the parent service. /// Exits on parent request, or in case of broken or dead network connection. -async fn v0_outgoing( +pub async fn outgoing( stream: S, authority_pen: AuthorityPen, peer_id: AuthorityId, - result_for_parent: mpsc::UnboundedSender<(AuthorityId, Option>)>, + result_for_parent: mpsc::UnboundedSender>, ) -> Result<(), ProtocolError> { trace!(target: "validator-network", "Extending hand to {}.", peer_id); let (sender, receiver) = v0_handshake_outgoing(stream, authority_pen, peer_id.clone()).await?; info!(target: "validator-network", "Outgoing handshake with {} finished successfully.", peer_id); - let (data_for_network, data_from_user) = mpsc::unbounded::(); + let (data_for_network, data_from_user) = mpsc::unbounded(); result_for_parent - .unbounded_send((peer_id.clone(), Some(data_for_network))) + .unbounded_send(( + peer_id.clone(), + Some(data_for_network), + ConnectionType::LegacyOutgoing, + )) .map_err(|_| ProtocolError::NoParentConnection)?; let sending = sending(sender, data_from_user); @@ -134,19 +83,23 @@ async fn receiving( /// Performs the handshake, and then keeps sending data received from the network to the parent service. /// Exits on parent request, or in case of broken or dead network connection. -async fn v0_incoming( +pub async fn incoming( stream: S, authority_pen: AuthorityPen, - result_for_parent: mpsc::UnboundedSender<(AuthorityId, oneshot::Sender<()>)>, + result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, ) -> Result<(), ProtocolError> { trace!(target: "validator-network", "Waiting for extended hand..."); let (sender, receiver, peer_id) = v0_handshake_incoming(stream, authority_pen).await?; info!(target: "validator-network", "Incoming handshake with {} finished successfully.", peer_id); - let (tx_exit, exit) = oneshot::channel(); + let (tx_exit, mut exit) = mpsc::unbounded(); result_for_parent - .unbounded_send((peer_id.clone(), tx_exit)) + .unbounded_send(( + peer_id.clone(), + Some(tx_exit), + ConnectionType::LegacyIncoming, + )) .map_err(|_| ProtocolError::NoParentConnection)?; let receiving = receiving(receiver, data_for_user); @@ -157,37 +110,7 @@ async fn v0_incoming( tokio::select! { _ = heartbeat => return Err(ProtocolError::CardiacArrest), result = receiving => return result, - _ = exit => return Ok(()), - } - } -} - -impl Protocol { - /// Launches the proper variant of the protocol (receiver half). - pub async fn manage_incoming( - &self, - stream: S, - authority_pen: AuthorityPen, - result_for_service: mpsc::UnboundedSender<(AuthorityId, oneshot::Sender<()>)>, - data_for_user: mpsc::UnboundedSender, - ) -> Result<(), ProtocolError> { - use Protocol::*; - match self { - V0 => v0_incoming(stream, authority_pen, result_for_service, data_for_user).await, - } - } - - /// Launches the proper variant of the protocol (sender half). - pub async fn manage_outgoing( - &self, - stream: S, - authority_pen: AuthorityPen, - peer_id: AuthorityId, - result_for_service: mpsc::UnboundedSender<(AuthorityId, Option>)>, - ) -> Result<(), ProtocolError> { - use Protocol::*; - match self { - V0 => v0_outgoing(stream, authority_pen, peer_id, result_for_service).await, + _ = exit.next() => return Ok(()), } } } @@ -196,15 +119,16 @@ impl Protocol { mod tests { use aleph_primitives::AuthorityId; use futures::{ - channel::{mpsc, mpsc::UnboundedReceiver, oneshot}, + channel::{mpsc, mpsc::UnboundedReceiver}, pin_mut, FutureExt, StreamExt, }; - use super::{Protocol, ProtocolError}; + use super::{incoming, outgoing, ProtocolError}; use crate::{ crypto::AuthorityPen, validator_network::{ mock::{key, MockSplittable}, + protocols::{ConnectionType, ResultForService}, Data, }, }; @@ -217,24 +141,23 @@ mod tests { impl futures::Future>, impl futures::Future>, UnboundedReceiver, - UnboundedReceiver<(AuthorityId, oneshot::Sender<()>)>, - UnboundedReceiver<(AuthorityId, Option>)>, + UnboundedReceiver>, + UnboundedReceiver>, ) { let (stream_incoming, stream_outgoing) = MockSplittable::new(4096); let (id_incoming, pen_incoming) = key().await; let (id_outgoing, pen_outgoing) = key().await; assert_ne!(id_incoming, id_outgoing); - let (incoming_result_for_service, result_from_incoming) = - mpsc::unbounded::<(AuthorityId, oneshot::Sender<()>)>(); + let (incoming_result_for_service, result_from_incoming) = mpsc::unbounded(); let (outgoing_result_for_service, result_from_outgoing) = mpsc::unbounded(); let (data_for_user, data_from_incoming) = mpsc::unbounded::(); - let incoming_handle = Protocol::V0.manage_incoming( + let incoming_handle = incoming( stream_incoming, pen_incoming.clone(), incoming_result_for_service, data_for_user, ); - let outgoing_handle = Protocol::V0.manage_outgoing( + let outgoing_handle = outgoing( stream_outgoing, pen_outgoing.clone(), id_incoming.clone(), @@ -274,7 +197,8 @@ mod tests { _ = &mut incoming_handle => panic!("incoming process unexpectedly finished"), _ = &mut outgoing_handle => panic!("outgoing process unexpectedly finished"), result = result_from_outgoing.next() => { - let (_, maybe_data_for_outgoing) = result.expect("outgoing should have resturned Some"); + let (_, maybe_data_for_outgoing, connection_type) = result.expect("the chennel shouldn't be dropped"); + assert_eq!(connection_type, ConnectionType::LegacyOutgoing); let data_for_outgoing = maybe_data_for_outgoing.expect("successfully connected"); data_for_outgoing .unbounded_send(vec![4, 3, 43]) @@ -323,7 +247,8 @@ mod tests { _ = &mut outgoing_handle => panic!("outgoing process unexpectedly finished"), received = result_from_incoming.next() => { // we drop the exit oneshot channel, thus finishing incoming_handle - let (received_id, _) = received.expect("should receive"); + let (received_id, _, connection_type) = received.expect("the chennel shouldn't be dropped"); + assert_eq!(connection_type, ConnectionType::LegacyIncoming); assert_eq!(received_id, id_outgoing); }, }; @@ -382,7 +307,8 @@ mod tests { _ = &mut incoming_handle => panic!("incoming process unexpectedly finished"), _ = &mut outgoing_handle => panic!("outgoing process unexpectedly finished"), result = result_from_outgoing.next() => { - let (_, maybe_data_for_outgoing) = result.expect("outgoing should have resturned Some"); + let (_, maybe_data_for_outgoing, connection_type) = result.expect("the chennel shouldn't be dropped"); + assert_eq!(connection_type, ConnectionType::LegacyOutgoing); let data_for_outgoing = maybe_data_for_outgoing.expect("successfully connected"); data_for_outgoing .unbounded_send(vec![2, 1, 3, 7]) @@ -436,11 +362,12 @@ mod tests { ) = prepare::>().await; let incoming_handle = incoming_handle.fuse(); pin_mut!(incoming_handle); - let (_, _exit) = tokio::select! { + let (_, _exit, connection_type) = tokio::select! { _ = &mut incoming_handle => panic!("incoming process unexpectedly finished"), _ = outgoing_handle => panic!("outgoing process unexpectedly finished"), out = result_from_incoming.next() => out.expect("should receive"), }; + assert_eq!(connection_type, ConnectionType::LegacyIncoming); // outgoing_handle got consumed by tokio::select!, the sender is dead match incoming_handle.await { Err(ProtocolError::ReceiveError(_)) => (), @@ -485,11 +412,12 @@ mod tests { ) = prepare::>().await; let outgoing_handle = outgoing_handle.fuse(); pin_mut!(outgoing_handle); - let (_, _exit) = tokio::select! { + let (_, _exit, connection_type) = tokio::select! { _ = incoming_handle => panic!("incoming process unexpectedly finished"), _ = &mut outgoing_handle => panic!("outgoing process unexpectedly finished"), out = result_from_incoming.next() => out.expect("should receive"), }; + assert_eq!(connection_type, ConnectionType::LegacyIncoming); // incoming_handle got consumed by tokio::select!, the receiver is dead match outgoing_handle.await { // We never get the SendError variant here, because we did not send anything @@ -515,11 +443,12 @@ mod tests { ) = prepare::>().await; let outgoing_handle = outgoing_handle.fuse(); pin_mut!(outgoing_handle); - let (_, _exit) = tokio::select! { + let (_, _exit, connection_type) = tokio::select! { _ = incoming_handle => panic!("incoming process unexpectedly finished"), _ = &mut outgoing_handle => panic!("outgoing process unexpectedly finished"), out = result_from_incoming.next() => out.expect("should receive"), }; + assert_eq!(connection_type, ConnectionType::LegacyIncoming); match outgoing_handle.await { Err(ProtocolError::CardiacArrest) => (), Err(e) => panic!("unexpected error: {}", e), diff --git a/finality-aleph/src/validator_network/service.rs b/finality-aleph/src/validator_network/service.rs index 298d8d8158..75b9198e98 100644 --- a/finality-aleph/src/validator_network/service.rs +++ b/finality-aleph/src/validator_network/service.rs @@ -14,6 +14,7 @@ use crate::{ incoming::incoming, manager::{AddResult, Manager}, outgoing::outgoing, + protocols::ResultForService, Data, Dialer, Listener, Network, }, SpawnTaskHandle, STATUS_REPORT_INTERVAL, @@ -116,20 +117,30 @@ impl, NL: Listener> Service, - result_for_parent: mpsc::UnboundedSender<(AuthorityId, Option>)>, + result_for_parent: mpsc::UnboundedSender>, ) { let authority_pen = self.authority_pen.clone(); let dialer = self.dialer.clone(); + // This isn't really currently used, but soon will be. + let next_to_interface = self.next_to_interface.clone(); self.spawn_handle .spawn("aleph/validator_network_outgoing", None, async move { - outgoing(authority_pen, peer_id, dialer, addresses, result_for_parent).await; + outgoing( + authority_pen, + peer_id, + dialer, + addresses, + result_for_parent, + next_to_interface, + ) + .await; }); } fn spawn_new_incoming( &self, stream: NL::Connection, - result_for_parent: mpsc::UnboundedSender<(AuthorityId, oneshot::Sender<()>)>, + result_for_parent: mpsc::UnboundedSender>, ) { let authority_pen = self.authority_pen.clone(); let next_to_interface = self.next_to_interface.clone(); @@ -183,7 +194,7 @@ impl, NL: Listener> Service { + Some((peer_id, Some(exit), _)) = incoming_workers.next() => { use AddResult::*; match self.manager.add_incoming(peer_id.clone(), exit) { Uninterested => info!(target: "validator-network", "Peer {} connected to us despite out lack of interest.", peer_id), @@ -193,7 +204,7 @@ impl, NL: Listener> Service { + Some((peer_id, maybe_data_for_network, _)) = outgoing_workers.next() => { use AddResult::*; if let Some(addresses) = self.manager.peer_addresses(&peer_id) { match maybe_data_for_network {