Skip to content

Commit 8228b54

Browse files
authored
A0-1455: Add future protocol and decision process for connection direction (#716)
* Add future protocol and decision process for connection direction Not used for now, that'll be in the next PR. * Small review comments * Massively improve xor computation * Naming in tests
1 parent 28e3a04 commit 8228b54

File tree

4 files changed

+659
-0
lines changed

4 files changed

+659
-0
lines changed
Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,206 @@
1+
use std::{
2+
collections::{HashMap, HashSet},
3+
ops::BitXor,
4+
};
5+
6+
use aleph_primitives::AuthorityId;
7+
8+
use crate::validator_network::Data;
9+
10+
/// Data about peers we know and whether we should connect to them or they to us. For the former
11+
/// case also keeps the peers' addresses.
12+
pub struct DirectedPeers<A: Data> {
13+
own_id: AuthorityId,
14+
outgoing: HashMap<AuthorityId, Vec<A>>,
15+
incoming: HashSet<AuthorityId>,
16+
}
17+
18+
/// Whether we should call the remote or the other way around. We xor the peer ids and based on the
19+
/// parity of the sum of bits of the result decide whether the caller should be the smaller or
20+
/// greated lexicographically. They are never equal, because cryptography.
21+
fn should_we_call(own_id: &[u8], remote_id: &[u8]) -> bool {
22+
let xor_sum_parity = (own_id.iter().fold(0u8, BitXor::bitxor)
23+
^ remote_id.iter().fold(0u8, BitXor::bitxor))
24+
.count_ones()
25+
% 2;
26+
match xor_sum_parity == 0 {
27+
true => own_id < remote_id,
28+
false => own_id > remote_id,
29+
}
30+
}
31+
32+
impl<A: Data> DirectedPeers<A> {
33+
/// Create a new set of peers directed using our own peer id.
34+
pub fn new(own_id: AuthorityId) -> Self {
35+
DirectedPeers {
36+
own_id,
37+
outgoing: HashMap::new(),
38+
incoming: HashSet::new(),
39+
}
40+
}
41+
42+
/// Add a peer to the list of peers we want to stay connected to, or
43+
/// update the list of addresses if the peer was already added.
44+
/// Returns whether we should start attempts at connecting with the peer, which is the case
45+
/// exactly when the peer is one with which we should attempt connections AND it was added for
46+
/// the first time.
47+
pub fn add_peer(&mut self, peer_id: AuthorityId, addresses: Vec<A>) -> bool {
48+
match should_we_call(self.own_id.as_ref(), peer_id.as_ref()) {
49+
true => self.outgoing.insert(peer_id, addresses).is_none(),
50+
false => {
51+
// We discard the addresses here, as we will never want to call this peer anyway,
52+
// so we don't need them.
53+
self.incoming.insert(peer_id);
54+
false
55+
}
56+
}
57+
}
58+
59+
/// Return the addresses of the given peer, or None if we shouldn't attempt connecting with the peer.
60+
pub fn peer_addresses(&self, peer_id: &AuthorityId) -> Option<Vec<A>> {
61+
self.outgoing.get(peer_id).cloned()
62+
}
63+
64+
/// Whether we should be maintaining a connection with this peer.
65+
pub fn interested(&self, peer_id: &AuthorityId) -> bool {
66+
self.incoming.contains(peer_id) || self.outgoing.contains_key(peer_id)
67+
}
68+
69+
/// Iterator over the peers we want connections from.
70+
pub fn incoming_peers(&self) -> impl Iterator<Item = &AuthorityId> {
71+
self.incoming.iter()
72+
}
73+
74+
/// Iterator over the peers we want to connect to.
75+
pub fn outgoing_peers(&self) -> impl Iterator<Item = &AuthorityId> {
76+
self.outgoing.keys()
77+
}
78+
79+
/// Remove a peer from the list of peers that we want to stay connected with, whether the
80+
/// connection was supposed to be incoming or outgoing.
81+
pub fn remove_peer(&mut self, peer_id: &AuthorityId) {
82+
self.incoming.remove(peer_id);
83+
self.outgoing.remove(peer_id);
84+
}
85+
}
86+
87+
#[cfg(test)]
88+
mod tests {
89+
use aleph_primitives::AuthorityId;
90+
91+
use super::DirectedPeers;
92+
use crate::validator_network::mock::key;
93+
94+
type Address = String;
95+
96+
async fn container_with_id() -> (DirectedPeers<Address>, AuthorityId) {
97+
let (id, _) = key().await;
98+
let container = DirectedPeers::new(id.clone());
99+
(container, id)
100+
}
101+
102+
fn some_addresses() -> Vec<Address> {
103+
vec![
104+
String::from(""),
105+
String::from("a/b/c"),
106+
String::from("43.43.43.43:43000"),
107+
]
108+
}
109+
110+
#[tokio::test]
111+
async fn exactly_one_direction_attempts_connections() {
112+
let (mut container0, id0) = container_with_id().await;
113+
let (mut container1, id1) = container_with_id().await;
114+
let addresses = some_addresses();
115+
assert!(
116+
container0.add_peer(id1, addresses.clone())
117+
!= container1.add_peer(id0, addresses.clone())
118+
);
119+
}
120+
121+
async fn container_with_added_connecting_peer() -> (DirectedPeers<Address>, AuthorityId) {
122+
let (mut container0, id0) = container_with_id().await;
123+
let (mut container1, id1) = container_with_id().await;
124+
let addresses = some_addresses();
125+
match container0.add_peer(id1.clone(), addresses.clone()) {
126+
true => (container0, id1),
127+
false => {
128+
container1.add_peer(id0.clone(), addresses);
129+
(container1, id0)
130+
}
131+
}
132+
}
133+
134+
async fn container_with_added_nonconnecting_peer() -> (DirectedPeers<Address>, AuthorityId) {
135+
let (mut container0, id0) = container_with_id().await;
136+
let (mut container1, id1) = container_with_id().await;
137+
let addresses = some_addresses();
138+
match container0.add_peer(id1.clone(), addresses.clone()) {
139+
false => (container0, id1),
140+
true => {
141+
container1.add_peer(id0.clone(), addresses);
142+
(container1, id0)
143+
}
144+
}
145+
}
146+
147+
#[tokio::test]
148+
async fn no_connecting_on_subsequent_add() {
149+
let (mut container0, id1) = container_with_added_connecting_peer().await;
150+
let addresses = some_addresses();
151+
assert!(!container0.add_peer(id1, addresses));
152+
}
153+
154+
#[tokio::test]
155+
async fn peer_addresses_when_connecting() {
156+
let (container0, id1) = container_with_added_connecting_peer().await;
157+
assert!(container0.peer_addresses(&id1).is_some());
158+
}
159+
160+
#[tokio::test]
161+
async fn no_peer_addresses_when_nonconnecting() {
162+
let (container0, id1) = container_with_added_nonconnecting_peer().await;
163+
assert!(container0.peer_addresses(&id1).is_none());
164+
}
165+
166+
#[tokio::test]
167+
async fn interested_in_connecting() {
168+
let (container0, id1) = container_with_added_connecting_peer().await;
169+
assert!(container0.interested(&id1));
170+
}
171+
172+
#[tokio::test]
173+
async fn interested_in_nonconnecting() {
174+
let (container0, id1) = container_with_added_nonconnecting_peer().await;
175+
assert!(container0.interested(&id1));
176+
}
177+
178+
#[tokio::test]
179+
async fn uninterested_in_unknown() {
180+
let (container0, _) = container_with_id().await;
181+
let (_, id1) = container_with_id().await;
182+
assert!(!container0.interested(&id1));
183+
}
184+
185+
#[tokio::test]
186+
async fn connecting_are_outgoing() {
187+
let (container0, id1) = container_with_added_connecting_peer().await;
188+
assert_eq!(container0.outgoing_peers().collect::<Vec<_>>(), vec![&id1]);
189+
assert_eq!(container0.incoming_peers().next(), None);
190+
}
191+
192+
#[tokio::test]
193+
async fn nonconnecting_are_incoming() {
194+
let (container0, id1) = container_with_added_nonconnecting_peer().await;
195+
assert_eq!(container0.incoming_peers().collect::<Vec<_>>(), vec![&id1]);
196+
assert_eq!(container0.outgoing_peers().next(), None);
197+
}
198+
199+
#[tokio::test]
200+
async fn uninterested_in_removed() {
201+
let (mut container0, id1) = container_with_added_connecting_peer().await;
202+
assert!(container0.interested(&id1));
203+
container0.remove_peer(&id1);
204+
assert!(!container0.interested(&id1));
205+
}
206+
}

finality-aleph/src/validator_network/manager/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@ use futures::channel::mpsc;
88

99
use crate::{network::PeerId, validator_network::Data};
1010

11+
#[allow(dead_code)]
12+
mod direction;
13+
1114
/// Error during sending data through the Manager
1215
#[derive(Debug, PartialEq, Eq)]
1316
pub enum SendError {

finality-aleph/src/validator_network/protocols/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ use crate::{
1414
mod handshake;
1515
mod negotiation;
1616
mod v0;
17+
#[allow(dead_code)]
18+
mod v1;
1719

1820
use handshake::HandshakeError;
1921
pub use negotiation::{protocol, ProtocolNegotiationError};
@@ -24,6 +26,7 @@ pub type Version = u32;
2426
/// protocol. Remove after it's no longer needed.
2527
#[derive(PartialEq, Debug, Eq, Clone, Copy)]
2628
pub enum ConnectionType {
29+
New,
2730
LegacyIncoming,
2831
LegacyOutgoing,
2932
}

0 commit comments

Comments
 (0)