Skip to content

Commit ae0b6b2

Browse files
committed
address review
1 parent 0384495 commit ae0b6b2

File tree

7 files changed

+82
-26
lines changed

7 files changed

+82
-26
lines changed

Cargo.lock

Lines changed: 12 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ fnv = "1"
124124
fs2 = "0.4"
125125
futures = "0.3"
126126
hex = "0.4"
127+
hashlink = "0.9.0"
127128
hyper = "1"
128129
itertools = "0.10"
129130
lazy_static = "1"

beacon_node/lighthouse_network/gossipsub/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ tracing = "0.1.37"
3838
void = "1.0.2"
3939

4040
prometheus-client = "0.22.0"
41-
lru.workspace = true
41+
hashlink.workspace = true
4242

4343
[dev-dependencies]
4444
quickcheck = { workspace = true }

beacon_node/lighthouse_network/gossipsub/src/behaviour.rs

Lines changed: 52 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -25,18 +25,16 @@ use std::{
2525
collections::{BTreeSet, HashMap},
2626
fmt,
2727
net::IpAddr,
28-
num::NonZeroUsize,
2928
task::{Context, Poll},
30-
time::Duration,
29+
time::{Duration, Instant},
3130
};
3231

3332
use futures::StreamExt;
3433
use futures_ticker::Ticker;
35-
use lru::LruCache;
34+
use hashlink::LinkedHashMap;
3635
use prometheus_client::registry::Registry;
3736
use rand::{seq::SliceRandom, thread_rng};
3837

39-
use instant::Instant;
4038
use libp2p::core::{multiaddr::Protocol::Ip4, multiaddr::Protocol::Ip6, Endpoint, Multiaddr};
4139
use libp2p::identity::Keypair;
4240
use libp2p::identity::PeerId;
@@ -78,8 +76,11 @@ use std::{cmp::Ordering::Equal, fmt::Debug};
7876
#[cfg(test)]
7977
mod tests;
8078

81-
/// IDONTWANT Cache capacity.
82-
const IDONTWANT_CAP: usize = 100;
79+
/// IDONTWANT cache capacity.
80+
const IDONTWANT_CAP: usize = 10_000;
81+
82+
/// IDONTWANT timeout before removal.
83+
const IDONTWANT_TIMEOUT: Duration = Duration::new(3, 0);
8384

8485
/// Determines if published messages should be signed or not.
8586
///
@@ -1377,6 +1378,11 @@ where
13771378
"IWANT: Peer has asked for message too many times; ignoring request"
13781379
);
13791380
} else if let Some(peer) = &mut self.connected_peers.get_mut(peer_id) {
1381+
if peer.dont_send.get(&id).is_some() {
1382+
tracing::debug!(%peer_id, message=%id, "Peer already sent IDONTWANT for this message");
1383+
continue;
1384+
}
1385+
13801386
tracing::debug!(peer=%peer_id, "IWANT: Sending cached messages to peer");
13811387
if peer
13821388
.sender
@@ -1970,7 +1976,10 @@ where
19701976
}
19711977
// if the mesh needs peers add the peer to the mesh
19721978
if !self.explicit_peers.contains(propagation_source)
1973-
&& matches!(peer.kind, PeerKind::Gossipsubv1_1 | PeerKind::Gossipsub)
1979+
&& matches!(
1980+
peer.kind,
1981+
PeerKind::Gossipsubv1_1 | PeerKind::Gossipsub | PeerKind::Gossipsubv1_2
1982+
)
19741983
&& !Self::score_below_threshold_from_scores(
19751984
&self.peer_score,
19761985
propagation_source,
@@ -2485,6 +2494,17 @@ where
24852494
}
24862495
self.failed_messages.shrink_to_fit();
24872496

2497+
// Clear stale IDONTWANTs.
2498+
for peer in self.connected_peers.values_mut() {
2499+
while let Some((_front, instant)) = peer.dont_send.front() {
2500+
if (*instant + IDONTWANT_TIMEOUT) >= Instant::now() {
2501+
break;
2502+
} else {
2503+
peer.dont_send.pop_front();
2504+
}
2505+
}
2506+
}
2507+
24882508
tracing::debug!("Completed Heartbeat");
24892509
if let Some(metrics) = self.metrics.as_mut() {
24902510
let duration = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
@@ -2677,9 +2697,18 @@ where
26772697
return;
26782698
};
26792699

2680-
let recipient_peers = mesh_peers.iter().filter(|peer_id| {
2681-
*peer_id != propagation_source && Some(*peer_id) != message.source.as_ref()
2682-
});
2700+
let iwant_peers = self
2701+
.peer_score
2702+
.as_ref()
2703+
.map(|(_peer_score, .., gossip_promises)| gossip_promises.peers_for_message(msg_id))
2704+
.unwrap_or(vec![]);
2705+
2706+
let recipient_peers = mesh_peers
2707+
.iter()
2708+
.chain(iwant_peers.iter())
2709+
.filter(|peer_id| {
2710+
*peer_id != propagation_source && Some(*peer_id) != message.source.as_ref()
2711+
});
26832712

26842713
for peer_id in recipient_peers {
26852714
let Some(peer) = self.connected_peers.get_mut(peer_id) else {
@@ -2689,7 +2718,7 @@ where
26892718
};
26902719

26912720
// Only gossipsub 1.2 peers support IDONTWANT.
2692-
if peer.kind == PeerKind::Gossipsubv1_2 {
2721+
if peer.kind != PeerKind::Gossipsubv1_2 {
26932722
continue;
26942723
}
26952724

@@ -3121,7 +3150,7 @@ where
31213150
connections: vec![],
31223151
sender: RpcSender::new(self.config.connection_handler_queue_len()),
31233152
topics: Default::default(),
3124-
dont_send: LruCache::new(NonZeroUsize::new(IDONTWANT_CAP).unwrap()),
3153+
dont_send: LinkedHashMap::new(),
31253154
});
31263155
// Add the new connection
31273156
connected_peer.connections.push(connection_id);
@@ -3152,7 +3181,7 @@ where
31523181
connections: vec![],
31533182
sender: RpcSender::new(self.config.connection_handler_queue_len()),
31543183
topics: Default::default(),
3155-
dont_send: LruCache::new(NonZeroUsize::new(IDONTWANT_CAP).unwrap()),
3184+
dont_send: LinkedHashMap::new(),
31563185
});
31573186
// Add the new connection
31583187
connected_peer.connections.push(connection_id);
@@ -3319,7 +3348,11 @@ where
33193348
continue;
33203349
};
33213350
for message_id in message_ids {
3322-
peer.dont_send.push(message_id, ());
3351+
peer.dont_send.insert(message_id, Instant::now());
3352+
// Don't exceed capacity.
3353+
if peer.dont_send.len() > IDONTWANT_CAP {
3354+
peer.dont_send.pop_front();
3355+
}
33233356
}
33243357
}
33253358
}
@@ -3472,7 +3505,11 @@ fn get_random_peers_dynamic(
34723505
.iter()
34733506
.filter(|(_, p)| p.topics.contains(topic_hash))
34743507
.filter(|(peer_id, _)| f(peer_id))
3475-
.filter(|(_, p)| p.kind == PeerKind::Gossipsub || p.kind == PeerKind::Gossipsubv1_1)
3508+
.filter(|(_, p)| {
3509+
p.kind == PeerKind::Gossipsub
3510+
|| p.kind == PeerKind::Gossipsubv1_1
3511+
|| p.kind == PeerKind::Gossipsubv1_2
3512+
})
34763513
.map(|(peer_id, _)| *peer_id)
34773514
.collect::<Vec<PeerId>>();
34783515

beacon_node/lighthouse_network/gossipsub/src/gossip_promises.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,14 @@ impl GossipPromises {
4141
self.promises.contains_key(message)
4242
}
4343

44+
///Get the peers we sent IWANT the input message id.
45+
pub(crate) fn peers_for_message(&self, message_id: &MessageId) -> Vec<PeerId> {
46+
self.promises
47+
.get(message_id)
48+
.map(|peers| peers.keys().copied().collect())
49+
.unwrap_or(vec![])
50+
}
51+
4452
/// Track a promise to deliver a message from a list of [`MessageId`]s we are requesting.
4553
pub(crate) fn add_promise(&mut self, peer: PeerId, messages: &[MessageId], expires: Instant) {
4654
for message_id in messages {

beacon_node/lighthouse_network/gossipsub/src/protocol.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ pub(crate) const SIGNING_PREFIX: &[u8] = b"libp2p-pubsub:";
4242

4343
pub(crate) const GOSSIPSUB_1_2_0_PROTOCOL: ProtocolId = ProtocolId {
4444
protocol: StreamProtocol::new("/meshsub/1.2.0"),
45-
kind: PeerKind::Gossipsubv1_1,
45+
kind: PeerKind::Gossipsubv1_2,
4646
};
4747
pub(crate) const GOSSIPSUB_1_1_0_PROTOCOL: ProtocolId = ProtocolId {
4848
protocol: StreamProtocol::new("/meshsub/1.1.0"),

beacon_node/lighthouse_network/gossipsub/src/types.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,17 +25,18 @@ use async_channel::{Receiver, Sender};
2525
use futures::stream::Peekable;
2626
use futures::{Future, Stream, StreamExt};
2727
use futures_timer::Delay;
28+
use hashlink::LinkedHashMap;
2829
use instant::Duration;
2930
use libp2p::identity::PeerId;
3031
use libp2p::swarm::ConnectionId;
31-
use lru::LruCache;
3232
use prometheus_client::encoding::EncodeLabelValue;
3333
use quick_protobuf::MessageWrite;
3434
use std::collections::BTreeSet;
3535
use std::fmt::Debug;
3636
use std::sync::atomic::{AtomicUsize, Ordering};
3737
use std::sync::Arc;
3838
use std::task::{Context, Poll};
39+
use std::time::Instant;
3940
use std::{fmt, pin::Pin};
4041

4142
use crate::rpc_proto::proto;
@@ -123,7 +124,7 @@ pub(crate) struct PeerConnections {
123124
/// Subscribed topics.
124125
pub(crate) topics: BTreeSet<TopicHash>,
125126
/// Don't send messages.
126-
pub(crate) dont_send: LruCache<MessageId, ()>,
127+
pub(crate) dont_send: LinkedHashMap<MessageId, Instant>,
127128
}
128129

129130
/// Describes the types of peers that can exist in the gossipsub context.
@@ -300,10 +301,10 @@ pub struct Prune {
300301
pub(crate) backoff: Option<u64>,
301302
}
302303

303-
/// The node requests us to not forward message ids (peer_id + sequence _number) - IDontWant control message.
304+
/// The node requests us to not forward message ids - IDontWant control message.
304305
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
305306
pub struct IDontWant {
306-
/// A list of known message ids (peer_id + sequence _number) as a string.
307+
/// A list of known message ids.
307308
pub(crate) message_ids: Vec<MessageId>,
308309
}
309310

@@ -568,10 +569,10 @@ impl From<Rpc> for proto::RPC {
568569
control.prune.push(rpc_prune);
569570
}
570571
ControlAction::IDontWant(IDontWant { message_ids }) => {
571-
let rpc_iwant = proto::ControlIDontWant {
572+
let rpc_idontwant = proto::ControlIDontWant {
572573
message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
573574
};
574-
control.idontwant.push(rpc_iwant);
575+
control.idontwant.push(rpc_idontwant);
575576
}
576577
}
577578
}

0 commit comments

Comments
 (0)