
A0-1585: Replace AuthorityId with parameter #748


Merged: 5 commits, Nov 25, 2022
17 changes: 10 additions & 7 deletions finality-aleph/src/network/service.rs
@@ -4,7 +4,6 @@ use std::{
iter,
};

use aleph_primitives::AuthorityId;
use futures::{channel::mpsc, StreamExt};
use log::{debug, error, info, trace, warn};
use sc_service::SpawnTaskHandle;
@@ -16,7 +15,7 @@ use crate::{
AddressedData, ConnectionCommand, Data, Event, EventStream, Multiaddress, Network,
NetworkSender, Protocol,
},
validator_network::Network as ValidatorNetwork,
validator_network::{Network as ValidatorNetwork, PublicKey},
STATUS_REPORT_INTERVAL,
};

@@ -33,9 +32,11 @@ pub struct Service<
N: Network,
D: Data,
VD: Data,
A: Data + Multiaddress<PeerId = AuthorityId>,
VN: ValidatorNetwork<A, VD>,
> {
A: Data + Multiaddress,
VN: ValidatorNetwork<A::PeerId, A, VD>,
> where
A::PeerId: PublicKey,
{
network: N,
validator_network: VN,
data_from_user: mpsc::UnboundedReceiver<AddressedData<VD, A::PeerId>>,
@@ -85,9 +86,11 @@ impl<
N: Network,
D: Data,
VD: Data,
A: Data + Multiaddress<PeerId = AuthorityId>,
VN: ValidatorNetwork<A, VD>,
A: Data + Multiaddress,
VN: ValidatorNetwork<A::PeerId, A, VD>,
> Service<N, D, VD, A, VN>
where
A::PeerId: PublicKey,
{
pub fn new(
network: N,
4 changes: 2 additions & 2 deletions finality-aleph/src/nodes/validator_node.rs
@@ -22,8 +22,8 @@ use crate::{
ConsensusParty, ConsensusPartyParams,
},
session_map::{AuthorityProviderImpl, FinalityNotificatorImpl, SessionMapUpdater},
tcp_network::new_tcp_network,
validator_network::{Service, KEY_TYPE},
tcp_network::{new_tcp_network, KEY_TYPE},
validator_network::Service,
AlephConfig,
};

28 changes: 27 additions & 1 deletion finality-aleph/src/tcp_network.rs
@@ -3,16 +3,20 @@ use std::{io::Result as IoResult, net::ToSocketAddrs as _};
use aleph_primitives::AuthorityId;
use codec::{Decode, Encode};
use log::info;
use sp_core::crypto::KeyTypeId;
use tokio::net::{
tcp::{OwnedReadHalf, OwnedWriteHalf},
TcpListener, TcpStream, ToSocketAddrs,
};

use crate::{
crypto::{verify, AuthorityPen, Signature},
network::{Multiaddress, NetworkIdentity, PeerId},
validator_network::{ConnectionInfo, Dialer, Listener, Splittable},
validator_network::{ConnectionInfo, Dialer, Listener, PublicKey, SecretKey, Splittable},
};

pub const KEY_TYPE: KeyTypeId = KeyTypeId(*b"a0vn");

impl ConnectionInfo for TcpStream {
fn peer_address_info(&self) -> String {
match self.peer_addr() {
@@ -66,6 +70,28 @@ impl Listener for TcpListener {

impl PeerId for AuthorityId {}

impl PublicKey for AuthorityId {
type Signature = Signature;

fn verify(&self, message: &[u8], signature: &Self::Signature) -> bool {
verify(self, message, signature)
}
}

#[async_trait::async_trait]
impl SecretKey for AuthorityPen {
type Signature = Signature;
type PublicKey = AuthorityId;

async fn sign(&self, message: &[u8]) -> Self::Signature {
AuthorityPen::sign(self, message).await
}

fn public_key(&self) -> Self::PublicKey {
self.authority_id()
}
}

/// A representation of a single TCP address with an associated peer ID.
#[derive(Debug, Hash, Encode, Decode, Clone, PartialEq, Eq)]
pub struct TcpMultiaddress {
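These impls are what let the existing Aleph types plug into the new generic crypto traits defined in crypto.rs below. As a quick sketch of the property they are expected to uphold (a hypothetical helper, not code from this PR):

```rust
use crate::validator_network::{PublicKey, SecretKey};

// Illustrative only: the sign/verify round trip the new trait impls should
// satisfy. With the code above, `SK` can be `AuthorityPen` and
// `SK::PublicKey` the matching `AuthorityId`.
async fn signature_round_trip<SK: SecretKey>(secret_key: &SK, message: &[u8]) -> bool {
    let signature = secret_key.sign(message).await;
    secret_key.public_key().verify(message, &signature)
}
```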
2 changes: 1 addition & 1 deletion finality-aleph/src/testing/mocks/validator_network.rs
@@ -59,7 +59,7 @@ pub struct MockNetwork<D: Data> {
}

#[async_trait::async_trait]
impl<D: Data> Network<MockMultiaddress, D> for MockNetwork<D> {
impl<D: Data> Network<AuthorityId, MockMultiaddress, D> for MockNetwork<D> {
fn add_connection(&mut self, peer: AuthorityId, addresses: Vec<MockMultiaddress>) {
self.add_connection.send((peer, addresses));
}
26 changes: 26 additions & 0 deletions finality-aleph/src/validator_network/crypto.rs
@@ -0,0 +1,26 @@
use std::{fmt::Display, hash::Hash};

use codec::Codec;

/// A public key for signature verification.
pub trait PublicKey:
Send + Sync + Eq + Clone + AsRef<[u8]> + Display + Hash + Codec + 'static
{
type Signature: Send + Sync + Clone + Codec;

/// Verify whether the message has been signed with the associated private key.
fn verify(&self, message: &[u8], signature: &Self::Signature) -> bool;
}

/// Secret key for signing messages, with an associated public key.
#[async_trait::async_trait]
pub trait SecretKey: Clone + Send + Sync + 'static {
type Signature: Send + Sync + Clone + Codec;
type PublicKey: PublicKey<Signature = Self::Signature>;

/// Produce a signature for the provided message.
async fn sign(&self, message: &[u8]) -> Self::Signature;

/// Return the associated public key.
fn public_key(&self) -> Self::PublicKey;
}
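For orientation, a minimal test-only implementation of these two traits could look roughly like the following. Everything below is illustrative: the byte-array id and the concatenation "signature" are assumptions, not types from this PR, but they satisfy the bounds defined above.

```rust
use std::fmt::{Display, Formatter, Result as FmtResult};

use codec::{Decode, Encode};

// Hypothetical test-only key pair: the "public key" is a raw byte id and the
// "signature" is just the message prefixed with that id. Insecure by design;
// it only exercises the trait surface defined above.
#[derive(Clone, PartialEq, Eq, Hash, Encode, Decode, Debug)]
struct MockPublicKey([u8; 4]);

impl AsRef<[u8]> for MockPublicKey {
    fn as_ref(&self) -> &[u8] {
        &self.0
    }
}

impl Display for MockPublicKey {
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
        write!(f, "MockPublicKey({:?})", self.0)
    }
}

impl PublicKey for MockPublicKey {
    type Signature = Vec<u8>;

    fn verify(&self, message: &[u8], signature: &Self::Signature) -> bool {
        // Valid iff the signature is the key id followed by the message.
        signature.len() == 4 + message.len()
            && &signature[..4] == &self.0[..]
            && &signature[4..] == message
    }
}

#[derive(Clone)]
struct MockSecretKey(MockPublicKey);

#[async_trait::async_trait]
impl SecretKey for MockSecretKey {
    type Signature = Vec<u8>;
    type PublicKey = MockPublicKey;

    async fn sign(&self, message: &[u8]) -> Self::Signature {
        // Prefix the message with the key id, matching `verify` above.
        let mut signature = self.0.as_ref().to_vec();
        signature.extend_from_slice(message);
        signature
    }

    fn public_key(&self) -> Self::PublicKey {
        self.0.clone()
    }
}
```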
41 changes: 19 additions & 22 deletions finality-aleph/src/validator_network/incoming.rs
@@ -3,20 +3,17 @@ use std::fmt::{Display, Error as FmtError, Formatter};
use futures::channel::mpsc;
use log::{debug, info};

use crate::{
crypto::AuthorityPen,
validator_network::{
protocols::{protocol, ProtocolError, ProtocolNegotiationError, ResultForService},
Data, Splittable,
},
use crate::validator_network::{
protocols::{protocol, ProtocolError, ProtocolNegotiationError, ResultForService},
Data, PublicKey, SecretKey, Splittable,
};

enum IncomingError {
enum IncomingError<PK: PublicKey> {
ProtocolNegotiationError(ProtocolNegotiationError),
ProtocolError(ProtocolError),
ProtocolError(ProtocolError<PK>),
}

impl Display for IncomingError {
impl<PK: PublicKey> Display for IncomingError<PK> {
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
use IncomingError::*;
match self {
@@ -26,45 +23,45 @@ impl Display for IncomingError {
}
}

impl From<ProtocolNegotiationError> for IncomingError {
impl<PK: PublicKey> From<ProtocolNegotiationError> for IncomingError<PK> {
fn from(e: ProtocolNegotiationError) -> Self {
IncomingError::ProtocolNegotiationError(e)
}
}

impl From<ProtocolError> for IncomingError {
fn from(e: ProtocolError) -> Self {
impl<PK: PublicKey> From<ProtocolError<PK>> for IncomingError<PK> {
fn from(e: ProtocolError<PK>) -> Self {
IncomingError::ProtocolError(e)
}
}

async fn manage_incoming<D: Data, S: Splittable>(
authority_pen: AuthorityPen,
async fn manage_incoming<SK: SecretKey, D: Data, S: Splittable>(
secret_key: SK,
stream: S,
result_for_parent: mpsc::UnboundedSender<ResultForService<D>>,
result_for_parent: mpsc::UnboundedSender<ResultForService<SK::PublicKey, D>>,
data_for_user: mpsc::UnboundedSender<D>,
) -> Result<(), IncomingError> {
) -> Result<(), IncomingError<SK::PublicKey>> {
debug!(target: "validator-network", "Performing incoming protocol negotiation.");
let (stream, protocol) = protocol(stream).await?;
debug!(target: "validator-network", "Negotiated protocol, running.");
Ok(protocol
.manage_incoming(stream, authority_pen, result_for_parent, data_for_user)
.manage_incoming(stream, secret_key, result_for_parent, data_for_user)
.await?)
}

/// Manage an incoming connection. After the handshake it will send the recognized AuthorityId to
/// Manage an incoming connection. After the handshake it will send the recognized PublicKey to
/// the parent, together with an exit channel for this process. When this channel is dropped the
/// process ends. Whenever data arrives on this connection it will be passed to the user. Any
/// failures in receiving data result in the process stopping, we assume the other side will
/// reestablish it if necessary.
pub async fn incoming<D: Data, S: Splittable>(
authority_pen: AuthorityPen,
pub async fn incoming<SK: SecretKey, D: Data, S: Splittable>(
secret_key: SK,
stream: S,
result_for_parent: mpsc::UnboundedSender<ResultForService<D>>,
result_for_parent: mpsc::UnboundedSender<ResultForService<SK::PublicKey, D>>,
data_for_user: mpsc::UnboundedSender<D>,
) {
let addr = stream.peer_address_info();
if let Err(e) = manage_incoming(authority_pen, stream, result_for_parent, data_for_user).await {
if let Err(e) = manage_incoming(secret_key, stream, result_for_parent, data_for_user).await {
info!(target: "validator-network", "Incoming connection from {} failed: {}.", addr, e);
}
}
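The "exit channel" mentioned in the doc comment is the usual drop-to-terminate pattern: the spawned connection task holds the receiving end and stops as soon as the parent drops the sender it received alongside the recognized public key. A generic sketch of that pattern (names and channel types here are illustrative, not the PR's actual ones):

```rust
use futures::{channel::mpsc, StreamExt};

// Illustrative only: the connection task runs until either the peer stream
// ends, the user side goes away, or the parent drops the exit sender, in
// which case `exit.next()` resolves to `None` and the loop breaks.
async fn run_connection<D>(
    mut data_from_peer: mpsc::UnboundedReceiver<D>,
    data_for_user: mpsc::UnboundedSender<D>,
    mut exit: mpsc::UnboundedReceiver<()>,
) {
    loop {
        tokio::select! {
            maybe_data = data_from_peer.next() => match maybe_data {
                Some(data) => {
                    // Any failure to pass data along stops the connection.
                    if data_for_user.unbounded_send(data).is_err() {
                        break;
                    }
                }
                None => break,
            },
            _ = exit.next() => break,
        }
    }
}
```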
36 changes: 18 additions & 18 deletions finality-aleph/src/validator_network/manager/direction.rs
@@ -3,16 +3,14 @@ use std::{
ops::BitXor,
};

use aleph_primitives::AuthorityId;

use crate::validator_network::Data;
use crate::validator_network::{Data, PublicKey};

/// Data about peers we know and whether we should connect to them or they to us. For the former
/// case also keeps the peers' addresses.
pub struct DirectedPeers<A: Data> {
own_id: AuthorityId,
outgoing: HashMap<AuthorityId, Vec<A>>,
incoming: HashSet<AuthorityId>,
pub struct DirectedPeers<PK: PublicKey, A: Data> {
own_id: PK,
outgoing: HashMap<PK, Vec<A>>,
incoming: HashSet<PK>,
}

/// Whether we should call the remote or the other way around. We xor the peer ids and based on the
@@ -29,9 +27,9 @@ fn should_we_call(own_id: &[u8], remote_id: &[u8]) -> bool {
}
}

impl<A: Data> DirectedPeers<A> {
impl<PK: PublicKey, A: Data> DirectedPeers<PK, A> {
/// Create a new set of peers directed using our own peer id.
pub fn new(own_id: AuthorityId) -> Self {
pub fn new(own_id: PK) -> Self {
DirectedPeers {
own_id,
outgoing: HashMap::new(),
@@ -44,7 +42,7 @@ impl<A: Data> DirectedPeers<A> {
/// Returns whether we should start attempts at connecting with the peer, which is the case
/// exactly when the peer is one with which we should attempt connections AND it was added for
/// the first time.
pub fn add_peer(&mut self, peer_id: AuthorityId, addresses: Vec<A>) -> bool {
pub fn add_peer(&mut self, peer_id: PK, addresses: Vec<A>) -> bool {
match should_we_call(self.own_id.as_ref(), peer_id.as_ref()) {
true => self.outgoing.insert(peer_id, addresses).is_none(),
false => {
@@ -57,28 +55,28 @@
}

/// Return the addresses of the given peer, or None if we shouldn't attempt connecting with the peer.
pub fn peer_addresses(&self, peer_id: &AuthorityId) -> Option<Vec<A>> {
pub fn peer_addresses(&self, peer_id: &PK) -> Option<Vec<A>> {
self.outgoing.get(peer_id).cloned()
}

/// Whether we should be maintaining a connection with this peer.
pub fn interested(&self, peer_id: &AuthorityId) -> bool {
pub fn interested(&self, peer_id: &PK) -> bool {
self.incoming.contains(peer_id) || self.outgoing.contains_key(peer_id)
}

/// Iterator over the peers we want connections from.
pub fn incoming_peers(&self) -> impl Iterator<Item = &AuthorityId> {
pub fn incoming_peers(&self) -> impl Iterator<Item = &PK> {
self.incoming.iter()
}

/// Iterator over the peers we want to connect to.
pub fn outgoing_peers(&self) -> impl Iterator<Item = &AuthorityId> {
pub fn outgoing_peers(&self) -> impl Iterator<Item = &PK> {
self.outgoing.keys()
}

/// Remove a peer from the list of peers that we want to stay connected with, whether the
/// connection was supposed to be incoming or outgoing.
pub fn remove_peer(&mut self, peer_id: &AuthorityId) {
pub fn remove_peer(&mut self, peer_id: &PK) {
self.incoming.remove(peer_id);
self.outgoing.remove(peer_id);
}
@@ -93,7 +91,7 @@ mod tests {

type Address = String;

async fn container_with_id() -> (DirectedPeers<Address>, AuthorityId) {
async fn container_with_id() -> (DirectedPeers<AuthorityId, Address>, AuthorityId) {
let (id, _) = key().await;
let container = DirectedPeers::new(id.clone());
(container, id)
@@ -118,7 +116,7 @@
);
}

async fn container_with_added_connecting_peer() -> (DirectedPeers<Address>, AuthorityId) {
async fn container_with_added_connecting_peer(
) -> (DirectedPeers<AuthorityId, Address>, AuthorityId) {
let (mut container0, id0) = container_with_id().await;
let (mut container1, id1) = container_with_id().await;
let addresses = some_addresses();
@@ -131,7 +130,8 @@
}
}

async fn container_with_added_nonconnecting_peer() -> (DirectedPeers<Address>, AuthorityId) {
async fn container_with_added_nonconnecting_peer(
) -> (DirectedPeers<AuthorityId, Address>, AuthorityId) {
let (mut container0, id0) = container_with_id().await;
let (mut container1, id1) = container_with_id().await;
let addresses = some_addresses();
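The tie-breaking helper `should_we_call` is mostly folded out of the diff above. Its contract is that both peers evaluate it locally over the same pair of keys and exactly one of them decides to dial. Below is a sketch of a rule with that property, following the xor idea from the comment; it is an assumption about the shape, not the actual implementation.

```rust
// Hypothetical sketch, not the real should_we_call: it combines a symmetric
// xor-parity bit with the antisymmetric ordering of the two ids, so the two
// peers always reach opposite conclusions and exactly one of them dials.
fn should_we_call_sketch(own_id: &[u8], remote_id: &[u8]) -> bool {
    let parity = own_id
        .iter()
        .chain(remote_id.iter())
        .fold(0u8, |acc, byte| acc ^ byte)
        .count_ones()
        % 2
        == 0;
    // Swapping own_id and remote_id leaves `parity` unchanged but flips the
    // comparison, so the result differs between the two peers.
    parity ^ (own_id < remote_id)
}
```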