Skip to content

Commit 6ec384f

Browse files
authored
A0-1472 Validator Network log addresses (#701)
1 parent 33aca04 commit 6ec384f

File tree

7 files changed

+136
-48
lines changed

7 files changed

+136
-48
lines changed

finality-aleph/src/tcp_network.rs

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,36 @@ use tokio::net::{
1010

1111
use crate::{
1212
network::{Multiaddress, NetworkIdentity, PeerId},
13-
validator_network::{Dialer, Listener, Splittable},
13+
validator_network::{ConnectionInfo, Dialer, Listener, Splittable},
1414
};
1515

16+
impl ConnectionInfo for TcpStream {
17+
fn peer_address_info(&self) -> String {
18+
match self.peer_addr() {
19+
Ok(addr) => addr.to_string(),
20+
Err(e) => format!("unknown address: {}", e),
21+
}
22+
}
23+
}
24+
25+
impl ConnectionInfo for OwnedWriteHalf {
26+
fn peer_address_info(&self) -> String {
27+
match self.peer_addr() {
28+
Ok(addr) => addr.to_string(),
29+
Err(e) => e.to_string(),
30+
}
31+
}
32+
}
33+
34+
impl ConnectionInfo for OwnedReadHalf {
35+
fn peer_address_info(&self) -> String {
36+
match self.peer_addr() {
37+
Ok(addr) => addr.to_string(),
38+
Err(e) => e.to_string(),
39+
}
40+
}
41+
}
42+
1643
impl Splittable for TcpStream {
1744
type Sender = OwnedWriteHalf;
1845
type Receiver = OwnedReadHalf;

finality-aleph/src/testing/mocks/validator_network.rs

Lines changed: 51 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ use crate::{
2626
crypto::AuthorityPen,
2727
network::{mock::Channel, Data, Multiaddress, NetworkIdentity},
2828
validator_network::{
29-
mock::random_keys, Dialer as DialerT, Listener as ListenerT, Network, Service, Splittable,
29+
mock::random_keys, ConnectionInfo, Dialer as DialerT, Listener as ListenerT, Network,
30+
PeerAddressInfo, Service, Splittable,
3031
},
3132
};
3233

@@ -119,6 +120,7 @@ impl<D: Data> MockNetwork<D> {
119120
pub struct UnreliableDuplexStream {
120121
stream: DuplexStream,
121122
counter: Option<usize>,
123+
peer_address: Address,
122124
}
123125

124126
impl AsyncWrite for UnreliableDuplexStream {
@@ -160,33 +162,45 @@ impl AsyncRead for UnreliableDuplexStream {
160162
pub struct UnreliableSplittable {
161163
incoming_data: UnreliableDuplexStream,
162164
outgoing_data: UnreliableDuplexStream,
165+
peer_address: Address,
163166
}
164167

165168
impl UnreliableSplittable {
166169
/// Create a pair of mock splittables connected to each other.
167-
pub fn new(max_buf_size: usize, ends_after: Option<usize>) -> (Self, Self) {
168-
let (in_a, out_b) = duplex(max_buf_size);
169-
let (in_b, out_a) = duplex(max_buf_size);
170+
pub fn new(
171+
max_buf_size: usize,
172+
ends_after: Option<usize>,
173+
l_address: Address,
174+
r_address: Address,
175+
) -> (Self, Self) {
176+
let (l_in, r_out) = duplex(max_buf_size);
177+
let (r_in, l_out) = duplex(max_buf_size);
170178
(
171179
UnreliableSplittable {
172180
incoming_data: UnreliableDuplexStream {
173-
stream: in_a,
181+
stream: l_in,
174182
counter: ends_after,
183+
peer_address: r_address,
175184
},
176185
outgoing_data: UnreliableDuplexStream {
177-
stream: out_a,
186+
stream: l_out,
178187
counter: ends_after,
188+
peer_address: r_address,
179189
},
190+
peer_address: r_address,
180191
},
181192
UnreliableSplittable {
182193
incoming_data: UnreliableDuplexStream {
183-
stream: in_b,
194+
stream: r_in,
184195
counter: ends_after,
196+
peer_address: l_address,
185197
},
186198
outgoing_data: UnreliableDuplexStream {
187-
stream: out_b,
199+
stream: r_out,
188200
counter: ends_after,
201+
peer_address: l_address,
189202
},
203+
peer_address: l_address,
190204
},
191205
)
192206
}
@@ -216,6 +230,18 @@ impl AsyncWrite for UnreliableSplittable {
216230
}
217231
}
218232

233+
impl ConnectionInfo for UnreliableSplittable {
234+
fn peer_address_info(&self) -> PeerAddressInfo {
235+
self.peer_address.to_string()
236+
}
237+
}
238+
239+
impl ConnectionInfo for UnreliableDuplexStream {
240+
fn peer_address_info(&self) -> PeerAddressInfo {
241+
self.peer_address.to_string()
242+
}
243+
}
244+
219245
impl Splittable for UnreliableSplittable {
220246
type Sender = UnreliableDuplexStream;
221247
type Receiver = UnreliableDuplexStream;
@@ -234,7 +260,9 @@ const TWICE_MAX_DATA_SIZE: usize = 32 * 1024 * 1024;
234260

235261
#[derive(Clone)]
236262
pub struct MockDialer {
237-
channel_connect: mpsc::UnboundedSender<(Address, oneshot::Sender<Connection>)>,
263+
// used for logging
264+
own_address: Address,
265+
channel_connect: mpsc::UnboundedSender<(Address, Address, oneshot::Sender<Connection>)>,
238266
}
239267

240268
#[async_trait::async_trait]
@@ -245,7 +273,7 @@ impl DialerT<Address> for MockDialer {
245273
async fn connect(&mut self, addresses: Vec<Address>) -> Result<Self::Connection, Self::Error> {
246274
let (tx, rx) = oneshot::channel();
247275
self.channel_connect
248-
.unbounded_send((addresses[0], tx))
276+
.unbounded_send((self.own_address, addresses[0], tx))
249277
.expect("should send");
250278
Ok(rx.await.expect("should receive"))
251279
}
@@ -266,7 +294,7 @@ impl ListenerT for MockListener {
266294
}
267295

268296
pub struct UnreliableConnectionMaker {
269-
dialers: mpsc::UnboundedReceiver<(Address, oneshot::Sender<Connection>)>,
297+
dialers: mpsc::UnboundedReceiver<(Address, Address, oneshot::Sender<Connection>)>,
270298
listeners: Vec<mpsc::UnboundedSender<Connection>>,
271299
}
272300

@@ -288,6 +316,7 @@ impl UnreliableConnectionMaker {
288316
for id in ids.into_iter() {
289317
let (tx_listener, rx_listener) = mpsc::unbounded();
290318
let dialer = MockDialer {
319+
own_address: addr.get(&id).expect("should be there")[0],
291320
channel_connect: tx_dialer.clone(),
292321
};
293322
let listener = MockListener {
@@ -306,13 +335,19 @@ impl UnreliableConnectionMaker {
306335
pub async fn run(&mut self, connections_end_after: Option<usize>) {
307336
loop {
308337
info!(target: "validator-network", "UnreliableConnectionMaker: waiting for new request...");
309-
let (addr, c) = self.dialers.next().await.expect("should receive");
338+
let (dialer_address, listener_address, c) =
339+
self.dialers.next().await.expect("should receive");
310340
info!(target: "validator-network", "UnreliableConnectionMaker: received request");
311-
let (l_stream, r_stream) = Connection::new(4096, connections_end_after);
341+
let (dialer_stream, listener_stream) = Connection::new(
342+
4096,
343+
connections_end_after,
344+
dialer_address,
345+
listener_address,
346+
);
312347
info!(target: "validator-network", "UnreliableConnectionMaker: sending stream");
313-
c.send(l_stream).expect("should send");
314-
self.listeners[addr as usize]
315-
.unbounded_send(r_stream)
348+
c.send(dialer_stream).expect("should send");
349+
self.listeners[listener_address as usize]
350+
.unbounded_send(listener_stream)
316351
.expect("should send");
317352
}
318353
}

finality-aleph/src/validator_network/incoming.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,8 @@ pub async fn incoming<D: Data, S: Splittable>(
6565
result_for_parent: mpsc::UnboundedSender<(AuthorityId, oneshot::Sender<()>)>,
6666
data_for_user: mpsc::UnboundedSender<D>,
6767
) {
68+
let addr = stream.peer_address_info();
6869
if let Err(e) = manage_incoming(authority_pen, stream, result_for_parent, data_for_user).await {
69-
info!(target: "validator-network", "Incoming connection failed: {}", e);
70+
info!(target: "validator-network", "Incoming connection from {} failed: {}.", addr, e);
7071
}
7172
}

finality-aleph/src/validator_network/mock.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,10 @@ use aleph_primitives::{AuthorityId, KEY_TYPE};
1111
use sp_keystore::{testing::KeyStore, CryptoStore};
1212
use tokio::io::{duplex, AsyncRead, AsyncWrite, DuplexStream, ReadBuf};
1313

14-
use crate::{crypto::AuthorityPen, validator_network::Splittable};
14+
use crate::{
15+
crypto::AuthorityPen,
16+
validator_network::{ConnectionInfo, PeerAddressInfo, Splittable},
17+
};
1518

1619
/// Create a random authority id and pen pair.
1720
pub async fn key() -> (AuthorityId, AuthorityPen) {
@@ -86,6 +89,18 @@ impl AsyncWrite for MockSplittable {
8689
}
8790
}
8891

92+
impl ConnectionInfo for MockSplittable {
93+
fn peer_address_info(&self) -> PeerAddressInfo {
94+
String::from("MOCK_ADDRESS")
95+
}
96+
}
97+
98+
impl ConnectionInfo for DuplexStream {
99+
fn peer_address_info(&self) -> PeerAddressInfo {
100+
String::from("MOCK_ADDRESS")
101+
}
102+
}
103+
89104
impl Splittable for MockSplittable {
90105
type Sender = DuplexStream;
91106
type Receiver = DuplexStream;

finality-aleph/src/validator_network/mod.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,18 @@ pub trait Network<A: Data, D: Data>: Send + 'static {
4949
async fn next(&mut self) -> Option<D>;
5050
}
5151

52+
pub type PeerAddressInfo = String;
53+
54+
/// Reports address of the peer that we are connected to.
55+
pub trait ConnectionInfo {
56+
/// Return the address of the peer that we are connected to.
57+
fn peer_address_info(&self) -> PeerAddressInfo;
58+
}
59+
5260
/// A stream that can be split into a sending and receiving part.
53-
pub trait Splittable: AsyncWrite + AsyncRead + Unpin + Send {
54-
type Sender: AsyncWrite + Unpin + Send;
55-
type Receiver: AsyncRead + Unpin + Send;
61+
pub trait Splittable: AsyncWrite + AsyncRead + ConnectionInfo + Unpin + Send {
62+
type Sender: AsyncWrite + ConnectionInfo + Unpin + Send;
63+
type Receiver: AsyncRead + ConnectionInfo + Unpin + Send;
5664

5765
/// Split into the sending and receiving part.
5866
fn split(self) -> (Self::Sender, Self::Receiver);

finality-aleph/src/validator_network/outgoing.rs

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::fmt::{Display, Error as FmtError, Formatter};
1+
use std::fmt::{Debug, Display, Error as FmtError, Formatter};
22

33
use aleph_primitives::AuthorityId;
44
use futures::channel::mpsc;
@@ -10,39 +10,35 @@ use crate::{
1010
validator_network::{
1111
protocol_negotiation::{protocol, ProtocolNegotiationError},
1212
protocols::ProtocolError,
13-
Data, Dialer,
13+
ConnectionInfo, Data, Dialer, PeerAddressInfo,
1414
},
1515
};
1616

1717
enum OutgoingError<A: Data, ND: Dialer<A>> {
1818
Dial(ND::Error),
19-
ProtocolNegotiation(ProtocolNegotiationError),
20-
Protocol(ProtocolError),
19+
ProtocolNegotiation(PeerAddressInfo, ProtocolNegotiationError),
20+
Protocol(PeerAddressInfo, ProtocolError),
2121
}
2222

2323
impl<A: Data, ND: Dialer<A>> Display for OutgoingError<A, ND> {
2424
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
2525
use OutgoingError::*;
2626
match self {
2727
Dial(e) => write!(f, "dial error: {}", e),
28-
ProtocolNegotiation(e) => write!(f, "protocol negotiation error: {}", e),
29-
Protocol(e) => write!(f, "protocol error: {}", e),
28+
ProtocolNegotiation(addr, e) => write!(
29+
f,
30+
"communication with {} failed, protocol negotiation error: {}",
31+
addr, e
32+
),
33+
Protocol(addr, e) => write!(
34+
f,
35+
"communication with {} failed, protocol error: {}",
36+
addr, e
37+
),
3038
}
3139
}
3240
}
3341

34-
impl<A: Data, ND: Dialer<A>> From<ProtocolNegotiationError> for OutgoingError<A, ND> {
35-
fn from(e: ProtocolNegotiationError) -> Self {
36-
OutgoingError::ProtocolNegotiation(e)
37-
}
38-
}
39-
40-
impl<A: Data, ND: Dialer<A>> From<ProtocolError> for OutgoingError<A, ND> {
41-
fn from(e: ProtocolError) -> Self {
42-
OutgoingError::Protocol(e)
43-
}
44-
}
45-
4642
async fn manage_outgoing<D: Data, A: Data, ND: Dialer<A>>(
4743
authority_pen: AuthorityPen,
4844
peer_id: AuthorityId,
@@ -55,20 +51,24 @@ async fn manage_outgoing<D: Data, A: Data, ND: Dialer<A>>(
5551
.connect(addresses)
5652
.await
5753
.map_err(OutgoingError::Dial)?;
54+
let peer_address_info = stream.peer_address_info();
5855
debug!(target: "validator-network", "Performing outgoing protocol negotiation.");
59-
let (stream, protocol) = protocol(stream).await?;
56+
let (stream, protocol) = protocol(stream)
57+
.await
58+
.map_err(|e| OutgoingError::ProtocolNegotiation(peer_address_info.clone(), e))?;
6059
debug!(target: "validator-network", "Negotiated protocol, running.");
61-
Ok(protocol
60+
protocol
6261
.manage_outgoing(stream, authority_pen, peer_id, result_for_parent)
63-
.await?)
62+
.await
63+
.map_err(|e| OutgoingError::Protocol(peer_address_info.clone(), e))
6464
}
6565

6666
const RETRY_DELAY: Duration = Duration::from_secs(10);
6767

6868
/// Establish an outgoing connection to the provided peer using the dialer and then manage it.
6969
/// While this works it will send any data from the user to the peer. Any failures will be reported
7070
/// to the parent, so that connections can be reestablished if necessary.
71-
pub async fn outgoing<D: Data, A: Data, ND: Dialer<A>>(
71+
pub async fn outgoing<D: Data, A: Data + Debug, ND: Dialer<A>>(
7272
authority_pen: AuthorityPen,
7373
peer_id: AuthorityId,
7474
dialer: ND,
@@ -79,12 +79,12 @@ pub async fn outgoing<D: Data, A: Data, ND: Dialer<A>>(
7979
authority_pen,
8080
peer_id.clone(),
8181
dialer,
82-
addresses,
82+
addresses.clone(),
8383
result_for_parent.clone(),
8484
)
8585
.await
8686
{
87-
info!(target: "validator-network", "Outgoing connection to {} failed: {}, will retry after {}s.", peer_id, e, RETRY_DELAY.as_secs());
87+
info!(target: "validator-network", "Outgoing connection to {} {:?} failed: {}, will retry after {}s.", peer_id, addresses, e, RETRY_DELAY.as_secs());
8888
sleep(RETRY_DELAY).await;
8989
if result_for_parent.unbounded_send((peer_id, None)).is_err() {
9090
debug!(target: "validator-network", "Could not send the closing message, we've probably been terminated by the parent service.");

finality-aleph/src/validator_network/service.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::fmt::Debug;
2+
13
use aleph_primitives::AuthorityId;
24
use futures::{
35
channel::{mpsc, oneshot},
@@ -71,7 +73,7 @@ impl<D: Data, A: Data> Network<A, D> for ServiceInterface<D, A> {
7173
}
7274

7375
/// A service that has to be run for the validator network to work.
74-
pub struct Service<D: Data, A: Data, ND: Dialer<A>, NL: Listener> {
76+
pub struct Service<D: Data, A: Data + Debug, ND: Dialer<A>, NL: Listener> {
7577
commands_from_interface: mpsc::UnboundedReceiver<ServiceCommand<D, A>>,
7678
next_to_interface: mpsc::UnboundedSender<D>,
7779
manager: Manager<A, D>,
@@ -81,7 +83,7 @@ pub struct Service<D: Data, A: Data, ND: Dialer<A>, NL: Listener> {
8183
authority_pen: AuthorityPen,
8284
}
8385

84-
impl<D: Data, A: Data, ND: Dialer<A>, NL: Listener> Service<D, A, ND, NL> {
86+
impl<D: Data, A: Data + Debug, ND: Dialer<A>, NL: Listener> Service<D, A, ND, NL> {
8587
/// Create a new validator network service plus an interface for interacting with it.
8688
pub fn new(
8789
dialer: ND,

0 commit comments

Comments
 (0)