Skip to content

A0-1576: Move data network into its own module #809

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions finality-aleph/src/abft/current.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::{
},
crypto::Signature,
data_io::{AlephData, OrderedDataInterpreter},
network::DataNetwork,
network::data::Network,
oneshot,
party::{
backup::ABFTBackup,
Expand All @@ -27,7 +27,7 @@ pub const VERSION: u32 = 1;
pub fn run_member<
B: Block,
C: HeaderBackend<B> + Send + 'static,
ADN: DataNetwork<CurrentNetworkData<B>> + 'static,
ADN: Network<CurrentNetworkData<B>> + 'static,
>(
subtask_common: SubtaskCommon,
multikeychain: Keychain,
Expand Down
4 changes: 2 additions & 2 deletions finality-aleph/src/abft/legacy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{
NetworkWrapper, SpawnHandleT,
},
data_io::{AlephData, OrderedDataInterpreter},
network::DataNetwork,
network::data::Network,
oneshot,
party::{
backup::ABFTBackup,
Expand All @@ -26,7 +26,7 @@ pub const VERSION: u32 = 0;
pub fn run_member<
B: Block,
C: HeaderBackend<B> + Send + 'static,
ADN: DataNetwork<LegacyNetworkData<B>> + 'static,
ADN: Network<LegacyNetworkData<B>> + 'static,
>(
subtask_common: SubtaskCommon,
multikeychain: Keychain,
Expand Down
12 changes: 6 additions & 6 deletions finality-aleph/src/abft/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
abft::SignatureSet,
crypto::Signature,
data_io::{AlephData, AlephNetworkMessage},
network::{Data, DataNetwork},
network::{data::Network, Data},
Hasher, Recipient,
};

Expand All @@ -34,12 +34,12 @@ impl<B: Block> AlephNetworkMessage<B>
}

/// A wrapper needed only because of type system theoretical constraints. Sadness.
pub struct NetworkWrapper<D: Data, DN: DataNetwork<D>> {
pub struct NetworkWrapper<D: Data, DN: Network<D>> {
inner: DN,
_phantom: PhantomData<D>,
}

impl<D: Data, DN: DataNetwork<D>> From<DN> for NetworkWrapper<D, DN> {
impl<D: Data, DN: Network<D>> From<DN> for NetworkWrapper<D, DN> {
fn from(inner: DN) -> Self {
NetworkWrapper {
inner,
Expand All @@ -48,7 +48,7 @@ impl<D: Data, DN: DataNetwork<D>> From<DN> for NetworkWrapper<D, DN> {
}
}

impl<D: Data, DN: DataNetwork<D>> NetworkWrapper<D, DN> {
impl<D: Data, DN: Network<D>> NetworkWrapper<D, DN> {
fn send<R>(&self, data: D, recipient: R)
where
R: Into<Recipient>,
Expand All @@ -64,7 +64,7 @@ impl<D: Data, DN: DataNetwork<D>> NetworkWrapper<D, DN> {
}

#[async_trait::async_trait]
impl<D: Data, DN: DataNetwork<D>> current_aleph_bft::Network<D> for NetworkWrapper<D, DN> {
impl<D: Data, DN: Network<D>> current_aleph_bft::Network<D> for NetworkWrapper<D, DN> {
fn send(&self, data: D, recipient: current_aleph_bft::Recipient) {
NetworkWrapper::send(self, data, recipient)
}
Expand All @@ -75,7 +75,7 @@ impl<D: Data, DN: DataNetwork<D>> current_aleph_bft::Network<D> for NetworkWrapp
}

#[async_trait::async_trait]
impl<D: Data, DN: DataNetwork<D>> legacy_aleph_bft::Network<D> for NetworkWrapper<D, DN> {
impl<D: Data, DN: Network<D>> legacy_aleph_bft::Network<D> for NetworkWrapper<D, DN> {
fn send(&self, data: D, recipient: legacy_aleph_bft::Recipient) {
NetworkWrapper::send(self, data, recipient)
}
Expand Down
25 changes: 14 additions & 11 deletions finality-aleph/src/aggregation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ use crate::{
crypto::Signature,
metrics::Checkpoint,
mpsc,
network::{Data, DataNetwork, SendError},
network::{
data::{Network, SendError},
Data,
},
Keychain, Metrics,
};

Expand Down Expand Up @@ -50,8 +53,8 @@ pub type CurrentAggregator<'a, B, N> = current_aleph_aggregator::IO<
enum EitherAggregator<'a, B, CN, LN>
where
B: Block,
LN: DataNetwork<LegacyRmcNetworkData<B>>,
CN: DataNetwork<CurrentRmcNetworkData<B>>,
LN: Network<LegacyRmcNetworkData<B>>,
CN: Network<CurrentRmcNetworkData<B>>,
<B as Block>::Hash: AsRef<[u8]>,
{
Current(CurrentAggregator<'a, B, CN>),
Expand All @@ -63,8 +66,8 @@ where
pub struct Aggregator<'a, B, CN, LN>
where
B: Block,
LN: DataNetwork<LegacyRmcNetworkData<B>>,
CN: DataNetwork<CurrentRmcNetworkData<B>>,
LN: Network<LegacyRmcNetworkData<B>>,
CN: Network<CurrentRmcNetworkData<B>>,
<B as Block>::Hash: AsRef<[u8]>,
{
agg: EitherAggregator<'a, B, CN, LN>,
Expand All @@ -73,8 +76,8 @@ where
impl<'a, B, CN, LN> Aggregator<'a, B, CN, LN>
where
B: Block,
LN: DataNetwork<LegacyRmcNetworkData<B>>,
CN: DataNetwork<CurrentRmcNetworkData<B>>,
LN: Network<LegacyRmcNetworkData<B>>,
CN: Network<CurrentRmcNetworkData<B>>,
<B as Block>::Hash: AsRef<[u8]>,
{
pub fn new_legacy(
Expand Down Expand Up @@ -163,9 +166,9 @@ where
}
}

pub struct NetworkWrapper<D: Data, N: DataNetwork<D>>(N, PhantomData<D>);
pub struct NetworkWrapper<D: Data, N: Network<D>>(N, PhantomData<D>);

impl<D: Data, N: DataNetwork<D>> NetworkWrapper<D, N> {
impl<D: Data, N: Network<D>> NetworkWrapper<D, N> {
pub fn new(network: N) -> Self {
Self(network, PhantomData)
}
Expand All @@ -186,7 +189,7 @@ impl<H: Debug + Hash + Eq + Debug + Copy> current_aleph_aggregator::Metrics<H> f
#[async_trait::async_trait]
impl<T, D> legacy_aleph_aggregator::ProtocolSink<D> for NetworkWrapper<D, T>
where
T: DataNetwork<D>,
T: Network<D>,
D: Data,
{
async fn next(&mut self) -> Option<D> {
Expand All @@ -207,7 +210,7 @@ where
#[async_trait::async_trait]
impl<T, D> current_aleph_aggregator::ProtocolSink<D> for NetworkWrapper<D, T>
where
T: DataNetwork<D>,
T: Network<D>,
D: Data,
{
async fn next(&mut self) -> Option<D> {
Expand Down
12 changes: 9 additions & 3 deletions finality-aleph/src/data_io/data_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,13 @@ use crate::{
status_provider::get_proposal_status,
AlephNetworkMessage,
},
network::{ComponentNetwork, DataNetwork, ReceiverComponent, RequestBlocks, SimpleNetwork},
network::{
data::{
component::{Network as ComponentNetwork, Receiver, SimpleNetwork},
Network as DataNetwork,
},
RequestBlocks,
},
BlockHashNum, SessionBoundaries,
};

Expand Down Expand Up @@ -174,7 +180,7 @@ where
RB: RequestBlocks<B> + 'static,
Message:
AlephNetworkMessage<B> + std::fmt::Debug + Send + Sync + Clone + codec::Codec + 'static,
R: ReceiverComponent<Message>,
R: Receiver<Message>,
{
next_free_id: MessageId,
pending_proposals: HashMap<AlephProposal<B>, PendingProposalInfo<B>>,
Expand All @@ -201,7 +207,7 @@ where
RB: RequestBlocks<B> + 'static,
Message:
AlephNetworkMessage<B> + std::fmt::Debug + Send + Sync + Clone + codec::Codec + 'static,
R: ReceiverComponent<Message>,
R: Receiver<Message>,
{
/// Returns a struct to be run and a network that outputs messages filtered as appropriate
pub fn new<N: ComponentNetwork<Message, R = R>>(
Expand Down
2 changes: 1 addition & 1 deletion finality-aleph/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use tokio::time::Duration;
use crate::{
abft::{CurrentNetworkData, LegacyNetworkData},
aggregation::{CurrentRmcNetworkData, LegacyRmcNetworkData},
network::{protocol_name, Split},
network::{data::split::Split, protocol_name},
session::{
first_block_of_session, last_block_of_session, session_id_from_block_num,
SessionBoundaries, SessionId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ use futures::{channel::mpsc, StreamExt};
use log::warn;

use crate::{
network::{Data, DataNetwork, SendError},
network::{
data::{Network as DataNetwork, SendError},
Data,
},
Recipient,
};

Expand Down Expand Up @@ -185,8 +188,11 @@ mod tests {
use super::{DataNetwork, NetworkMap, Receiver, Sender};
use crate::{
network::{
component::{Network, ReceiverMap, SenderMap},
Data, SendError,
data::{
component::{Network, ReceiverMap, SenderMap},
SendError,
},
Data,
},
Recipient,
};
Expand Down
18 changes: 18 additions & 0 deletions finality-aleph/src/network/data/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
//! Abstraction over an abstract network sending data to a set of nodes.
use crate::{abft::Recipient, network::Data};

pub mod component;
pub mod split;

/// Returned when something went wrong when sending data using a Network.
#[derive(Debug)]
pub enum SendError {
SendFailed,
}

/// A generic interface for sending and receiving data.
#[async_trait::async_trait]
pub trait Network<D: Data>: Send + Sync {
fn send(&self, data: D, recipient: Recipient) -> Result<(), SendError>;
async fn next(&mut self) -> Option<D>;
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@ use tokio::sync::Mutex;

use crate::{
network::{
ComponentNetwork, ComponentNetworkExt, Data, ReceiverComponent, SendError, SenderComponent,
SimpleNetwork,
data::{
component::{Network, NetworkExt, Receiver, Sender, SimpleNetwork},
SendError,
},
Data,
},
Recipient, Version, Versioned,
};
Expand Down Expand Up @@ -65,7 +68,7 @@ impl<A: Data, B: Data> Convert for ToRightSplitConvert<A, B> {
struct SplitSender<
LeftData: Data,
RightData: Data,
S: SenderComponent<Split<LeftData, RightData>>,
S: Sender<Split<LeftData, RightData>>,
Conv: Convert,
> {
sender: S,
Expand All @@ -75,9 +78,9 @@ struct SplitSender<
impl<
LeftData: Data,
RightData: Data,
S: SenderComponent<Split<LeftData, RightData>>,
S: Sender<Split<LeftData, RightData>>,
Conv: Convert<To = Split<LeftData, RightData>> + Clone + Send + Sync,
> SenderComponent<Conv::From> for SplitSender<LeftData, RightData, S, Conv>
> Sender<Conv::From> for SplitSender<LeftData, RightData, S, Conv>
where
<Conv as Convert>::From: Data,
<Conv as Convert>::To: Data,
Expand All @@ -96,7 +99,7 @@ type RightSender<LeftData, RightData, S> =
struct SplitReceiver<
LeftData: Data,
RightData: Data,
R: ReceiverComponent<Split<LeftData, RightData>>,
R: Receiver<Split<LeftData, RightData>>,
TranslatedData: Data,
> {
receiver: Arc<Mutex<R>>,
Expand All @@ -110,9 +113,9 @@ struct SplitReceiver<
impl<
LeftData: Data,
RightData: Data,
R: ReceiverComponent<Split<LeftData, RightData>>,
R: Receiver<Split<LeftData, RightData>>,
TranslatedData: Data,
> ReceiverComponent<TranslatedData> for SplitReceiver<LeftData, RightData, R, TranslatedData>
> Receiver<TranslatedData> for SplitReceiver<LeftData, RightData, R, TranslatedData>
{
async fn next(&mut self) -> Option<TranslatedData> {
loop {
Expand All @@ -137,7 +140,7 @@ type RightReceiver<LeftData, RightData, R> = SplitReceiver<LeftData, RightData,
async fn forward_or_wait<
LeftData: Data,
RightData: Data,
R: ReceiverComponent<Split<LeftData, RightData>>,
R: Receiver<Split<LeftData, RightData>>,
>(
receiver: &Arc<Mutex<R>>,
left_sender: &mpsc::UnboundedSender<LeftData>,
Expand Down Expand Up @@ -169,7 +172,7 @@ async fn forward_or_wait<
}
}

fn split_sender<LeftData: Data, RightData: Data, S: SenderComponent<Split<LeftData, RightData>>>(
fn split_sender<LeftData: Data, RightData: Data, S: Sender<Split<LeftData, RightData>>>(
sender: S,
) -> (
LeftSender<LeftData, RightData, S>,
Expand All @@ -187,11 +190,7 @@ fn split_sender<LeftData: Data, RightData: Data, S: SenderComponent<Split<LeftDa
)
}

fn split_receiver<
LeftData: Data,
RightData: Data,
R: ReceiverComponent<Split<LeftData, RightData>>,
>(
fn split_receiver<LeftData: Data, RightData: Data, R: Receiver<Split<LeftData, RightData>>>(
receiver: R,
left_name: &'static str,
right_name: &'static str,
Expand Down Expand Up @@ -229,14 +228,11 @@ fn split_receiver<
///
/// The main example for now is creating an `aleph_bft::Network` and a separate one for accumulating
/// signatures for justifications.
pub fn split<LeftData: Data, RightData: Data, CN: ComponentNetwork<Split<LeftData, RightData>>>(
pub fn split<LeftData: Data, RightData: Data, CN: Network<Split<LeftData, RightData>>>(
network: CN,
left_name: &'static str,
right_name: &'static str,
) -> (
impl ComponentNetworkExt<LeftData>,
impl ComponentNetworkExt<RightData>,
) {
) -> (impl NetworkExt<LeftData>, impl NetworkExt<RightData>) {
let (sender, receiver) = network.into();
let (left_sender, right_sender) = split_sender(sender);
let (left_receiver, right_receiver) = split_receiver(receiver, left_name, right_name);
Expand Down
Loading