Skip to content

A0-1455: Enable two-way connections #721

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Nov 10, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
318 changes: 318 additions & 0 deletions finality-aleph/src/validator_network/manager/legacy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,318 @@
use std::{
collections::{HashMap, HashSet},
fmt::{Display, Error as FmtError, Formatter},
};

use aleph_primitives::AuthorityId;
use futures::channel::mpsc;

use crate::{
network::PeerId,
validator_network::{
manager::{AddResult, SendError},
Data,
},
};

/// Network component responsible for holding the list of peers that we
/// want to connect to, and managing the established connections.
pub struct Manager<A: Data, D: Data> {
addresses: HashMap<AuthorityId, Vec<A>>,
outgoing: HashMap<AuthorityId, mpsc::UnboundedSender<D>>,
incoming: HashMap<AuthorityId, mpsc::UnboundedSender<D>>,
}

struct ManagerStatus {
wanted_peers: usize,
both_ways_peers: HashSet<AuthorityId>,
outgoing_peers: HashSet<AuthorityId>,
incoming_peers: HashSet<AuthorityId>,
missing_peers: HashSet<AuthorityId>,
}

impl ManagerStatus {
fn new<A: Data, D: Data>(manager: &Manager<A, D>) -> Self {
let incoming: HashSet<_> = manager
.incoming
.iter()
.filter(|(_, exit)| !exit.is_closed())
.map(|(k, _)| k.clone())
.collect();
let outgoing: HashSet<_> = manager
.outgoing
.iter()
.filter(|(_, exit)| !exit.is_closed())
.map(|(k, _)| k.clone())
.collect();

let both_ways = incoming.intersection(&outgoing).cloned().collect();
let incoming: HashSet<_> = incoming.difference(&both_ways).cloned().collect();
let outgoing: HashSet<_> = outgoing.difference(&both_ways).cloned().collect();
let missing = manager
.addresses
.keys()
.filter(|a| !both_ways.contains(a) && !incoming.contains(a) && !outgoing.contains(a))
.cloned()
.collect();

ManagerStatus {
wanted_peers: manager.addresses.len(),
both_ways_peers: both_ways,
incoming_peers: incoming,
outgoing_peers: outgoing,
missing_peers: missing,
}
}
}

impl Display for ManagerStatus {
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
if self.wanted_peers == 0 {
return write!(f, "not maintaining any connections; ");
}

write!(f, "target - {:?} connections; ", self.wanted_peers)?;

if self.both_ways_peers.is_empty() && self.incoming_peers.is_empty() {
write!(f, "WARNING! No incoming peers even though we expected tham, maybe connecting to us is impossible; ")?;
}

if !self.both_ways_peers.is_empty() {
let peers = self
.both_ways_peers
.iter()
.map(|authority_id| authority_id.to_short_string())
.collect::<Vec<_>>()
.join(", ");
write!(
f,
"both ways - {:?} [{}]; ",
self.both_ways_peers.len(),
peers,
)?;
}

if !self.incoming_peers.is_empty() {
let peers = self
.incoming_peers
.iter()
.map(|authority_id| authority_id.to_short_string())
.collect::<Vec<_>>()
.join(", ");
write!(
f,
"incoming only - {:?} [{}]; ",
self.incoming_peers.len(),
peers
)?;
}

if !self.outgoing_peers.is_empty() {
let peers = self
.outgoing_peers
.iter()
.map(|authority_id| authority_id.to_short_string())
.collect::<Vec<_>>()
.join(", ");
write!(
f,
"outgoing only - {:?} [{}];",
self.outgoing_peers.len(),
peers
)?;
}

if !self.missing_peers.is_empty() {
let peers = self
.missing_peers
.iter()
.map(|authority_id| authority_id.to_short_string())
.collect::<Vec<_>>()
.join(", ");
write!(f, "missing - {:?} [{}];", self.missing_peers.len(), peers)?;
}

Ok(())
}
}

impl<A: Data, D: Data> Manager<A, D> {
/// Create a new Manager with empty list of peers.
pub fn new() -> Self {
Manager {
addresses: HashMap::new(),
outgoing: HashMap::new(),
incoming: HashMap::new(),
}
}

/// Add a peer to the list of peers we want to stay connected to, or
/// update the list of addresses if the peer was already added.
/// Returns whether this peer is a new peer.
pub fn add_peer(&mut self, peer_id: AuthorityId, addresses: Vec<A>) -> bool {
self.addresses.insert(peer_id, addresses).is_none()
}

/// Return Option containing addresses of the given peer, or None if
/// the peer is unknown.
pub fn peer_addresses(&self, peer_id: &AuthorityId) -> Option<Vec<A>> {
self.addresses.get(peer_id).cloned()
}

/// Add an established outgoing connection with a known peer,
/// but only if the peer is on the list of peers that we want to stay connected with.
pub fn add_outgoing(
&mut self,
peer_id: AuthorityId,
data_for_network: mpsc::UnboundedSender<D>,
) -> AddResult {
use AddResult::*;
if !self.addresses.contains_key(&peer_id) {
return Uninterested;
}
match self.outgoing.insert(peer_id, data_for_network) {
Some(_) => Replaced,
None => Added,
}
}

/// Add an established incoming connection with a known peer,
/// but only if the peer is on the list of peers that we want to stay connected with.
pub fn add_incoming(
&mut self,
peer_id: AuthorityId,
exit: mpsc::UnboundedSender<D>,
) -> AddResult {
use AddResult::*;
if !self.addresses.contains_key(&peer_id) {
return Uninterested;
};
match self.incoming.insert(peer_id, exit) {
Some(_) => Replaced,
None => Added,
}
}

/// Remove a peer from the list of peers that we want to stay connected with.
/// Close any incoming and outgoing connections that were established.
pub fn remove_peer(&mut self, peer_id: &AuthorityId) {
self.addresses.remove(peer_id);
self.incoming.remove(peer_id);
self.outgoing.remove(peer_id);
}

/// Send data to a peer.
/// Returns error if there is no outgoing connection to the peer,
/// or if the connection is dead.
pub fn send_to(&mut self, peer_id: &AuthorityId, data: D) -> Result<(), SendError> {
self.outgoing
.get(peer_id)
.ok_or(SendError::PeerNotFound)?
.unbounded_send(data)
.map_err(|_| SendError::ConnectionClosed)
}

/// A status of the manager, to be displayed somewhere.
pub fn status_report(&self) -> impl Display {
ManagerStatus::new(self)
}
}

#[cfg(test)]
mod tests {
use futures::{channel::mpsc, StreamExt};

use super::{AddResult::*, Manager, SendError};
use crate::validator_network::mock::key;

type Data = String;
type Address = String;

#[tokio::test]
async fn add_remove() {
let mut manager = Manager::<Address, Data>::new();
let (peer_id, _) = key().await;
let (peer_id_b, _) = key().await;
let addresses = vec![
String::from(""),
String::from("a/b/c"),
String::from("43.43.43.43:43000"),
];
// add new peer - returns true
assert!(manager.add_peer(peer_id.clone(), addresses.clone()));
// add known peer - returns false
assert!(!manager.add_peer(peer_id.clone(), addresses.clone()));
// get address
assert_eq!(manager.peer_addresses(&peer_id), Some(addresses));
// try to get address of an unknown peer
assert_eq!(manager.peer_addresses(&peer_id_b), None);
// remove peer
manager.remove_peer(&peer_id);
// try to get address of removed peer
assert_eq!(manager.peer_addresses(&peer_id), None);
}

#[tokio::test]
async fn outgoing() {
let mut manager = Manager::<Address, Data>::new();
let data = String::from("DATA");
let (peer_id, _) = key().await;
let addresses = vec![
String::from(""),
String::from("a/b/c"),
String::from("43.43.43.43:43000"),
];
let (tx, _rx) = mpsc::unbounded();
// try add unknown peer
manager.add_outgoing(peer_id.clone(), tx);
// sending should fail
assert_eq!(
manager.send_to(&peer_id, data.clone()),
Err(SendError::PeerNotFound)
);
// add peer, this time for real
assert!(manager.add_peer(peer_id.clone(), addresses.clone()));
let (tx, mut rx) = mpsc::unbounded();
assert_eq!(manager.add_outgoing(peer_id.clone(), tx), Added);
// send and receive
assert!(manager.send_to(&peer_id, data.clone()).is_ok());
assert_eq!(data, rx.next().await.expect("should receive"));
// remove peer
manager.remove_peer(&peer_id);
// receiving should fail
assert!(rx.next().await.is_none());
}

#[tokio::test]
async fn incoming() {
let mut manager = Manager::<Address, Data>::new();
let (peer_id, _) = key().await;
let addresses = vec![
String::from(""),
String::from("a/b/c"),
String::from("43.43.43.43:43000"),
];
let (tx, mut rx) = mpsc::unbounded();
// try add unknown peer
assert_eq!(manager.add_incoming(peer_id.clone(), tx), Uninterested);
// rx should fail
assert!(rx.try_next().expect("channel should be closed").is_none());
// add peer, this time for real
assert!(manager.add_peer(peer_id.clone(), addresses.clone()));
let (tx, mut rx) = mpsc::unbounded();
// should just add
assert_eq!(manager.add_incoming(peer_id.clone(), tx), Added);
// the exit channel should be open
assert!(rx.try_next().is_err());
let (tx, mut rx2) = mpsc::unbounded();
// should replace now
assert_eq!(manager.add_incoming(peer_id.clone(), tx), Replaced);
// receiving should fail on old, but work on new channel
assert!(rx.try_next().expect("channel should be closed").is_none());
assert!(rx2.try_next().is_err());
// remove peer
manager.remove_peer(&peer_id);
// receiving should fail
assert!(rx2.try_next().expect("channel should be closed").is_none());
}
}
Loading