
Commit d73297f

feat(l1): add rpc endpoint admin_peers (#2732)
**Motivation**

Support rpc endpoint `admin_peers`

**Description**

* Add rpc endpoint `admin_peers`
* Track inbound connections
* Store the peer's node version when starting a connection
* Add a `peer_handler: PeerHandler` field to `RpcContext` so we can access peers from the rpc
* (Misc) `Syncer` & `SyncManager` now receive a `PeerHandler` upon creation instead of a `KademliaTable`
* (Misc) Fix a common typo ("wether" -> "whether") across the project

Data missing compared to the geth implementation:

* The local address of each connection
* Whether a connection is trusted or static (we have no notion of this yet)

Closes #2671
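For context, here is a minimal sketch of how a client might invoke the new endpoint. The request body is standard JSON-RPC 2.0; the response fields described in the comment are an assumption based on geth's `admin_peers` output and may differ in ethrex.

```rust
// Hypothetical client-side call shape for admin_peers (not ethrex code).
use serde_json::json;

fn main() {
    // Standard JSON-RPC 2.0 request a client would POST to the node's HTTP RPC port.
    let request = json!({
        "jsonrpc": "2.0",
        "method": "admin_peers",
        "params": [],
        "id": 1
    });
    println!("{request}");

    // Assumed response shape, loosely following geth's admin_peers: one entry per
    // connected peer with its enode URL, client version, negotiated capabilities and
    // whether the connection is inbound. As noted above, the local address and the
    // trusted/static flags are not reported yet.
}
```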
1 parent c716b18 commit d73297f

File tree

17 files changed: +214 -25 lines changed


cmd/ef_tests/blockchain/types.rs

Lines changed: 1 addition & 1 deletion
@@ -53,7 +53,7 @@ pub struct BlobSchedule {
 }
 
 impl TestUnit {
-    /// Checks wether a test has a block where the inner_block is none.
+    /// Checks whether a test has a block where the inner_block is none.
     /// These tests only check for failures in decoding invalid rlp and expect an exception.
     pub fn is_rlp_only_test(&self) -> bool {
         let mut is_rlp_only = false;

cmd/ethrex/initializers.rs

Lines changed: 5 additions & 1 deletion
@@ -10,6 +10,7 @@ use ethrex_blockchain::Blockchain;
 use ethrex_p2p::{
     kademlia::KademliaTable,
     network::{public_key_from_signing_key, P2PContext},
+    peer_handler::PeerHandler,
     sync_manager::SyncManager,
     types::{Node, NodeRecord},
 };
@@ -134,9 +135,11 @@ pub async fn init_rpc_api(
     tracker: TaskTracker,
     #[cfg(feature = "l2")] rollup_store: StoreRollup,
 ) {
+    let peer_handler = PeerHandler::new(peer_table);
+
     // Create SyncManager
     let syncer = SyncManager::new(
-        peer_table.clone(),
+        peer_handler.clone(),
         opts.syncmode.clone(),
         cancel_token,
         blockchain.clone(),
@@ -153,6 +156,7 @@ pub async fn init_rpc_api(
         local_p2p_node,
         local_node_record,
         syncer,
+        peer_handler,
         get_client_version(),
         #[cfg(feature = "based")]
         get_gateway_http_client(&l2_opts.based_opts),

crates/blockchain/payload.rs

Lines changed: 1 addition & 1 deletion
@@ -401,7 +401,7 @@ impl Blockchain {
             // TODO: maybe fetch hash too when filtering mempool so we don't have to compute it here (we can do this in the same refactor as adding timestamp)
             let tx_hash = head_tx.tx.compute_hash();
 
-            // Check wether the tx is replay-protected
+            // Check whether the tx is replay-protected
             if head_tx.tx.protected() && !chain_config.is_eip155_activated(context.block_number()) {
                 // Ignore replay protected tx & all txs from the sender
                 // Pull transaction from the mempool

crates/common/trie/trie.rs

Lines changed: 3 additions & 3 deletions
@@ -715,7 +715,7 @@ mod test {
 
     #[test]
     // The previous test needs to sort the input values in order to get rid of duplicate entries, leading to ordered insertions
-    // This check has a fixed way of determining wether a value should be removed but doesn't require ordered insertions
+    // This check has a fixed way of determining whether a value should be removed but doesn't require ordered insertions
     fn proptest_get_insert_with_removals_unsorted(data in btree_set(vec(any::<u8>(), 5..100), 1..100)) {
         let mut trie = Trie::new_temp();
         // Remove all values that have an odd first value
@@ -787,7 +787,7 @@ mod test {
 
     #[test]
     // The previous test needs to sort the input values in order to get rid of duplicate entries, leading to ordered insertions
-    // This check has a fixed way of determining wether a value should be removed but doesn't require ordered insertions
+    // This check has a fixed way of determining whether a value should be removed but doesn't require ordered insertions
     fn proptest_compare_hash_with_removals_unsorted(data in btree_set(vec(any::<u8>(), 5..100), 1..100)) {
         let mut trie = Trie::new_temp();
         let mut cita_trie = cita_trie();
@@ -876,7 +876,7 @@ mod test {
 
     #[test]
     // The previous test needs to sort the input values in order to get rid of duplicate entries, leading to ordered insertions
-    // This check has a fixed way of determining wether a value should be removed but doesn't require ordered insertions
+    // This check has a fixed way of determining whether a value should be removed but doesn't require ordered insertions
     fn proptest_compare_proof_with_removals_unsorted(data in btree_set(vec(any::<u8>(), 5..100), 1..100)) {
         let mut trie = Trie::new_temp();
         let mut cita_trie = cita_trie();

crates/l2/sequencer/block_producer/payload_builder.rs

Lines changed: 1 addition & 1 deletion
@@ -135,7 +135,7 @@ pub async fn fill_transactions(
         // TODO: maybe fetch hash too when filtering mempool so we don't have to compute it here (we can do this in the same refactor as adding timestamp)
         let tx_hash = head_tx.tx.compute_hash();
 
-        // Check wether the tx is replay-protected
+        // Check whether the tx is replay-protected
         if head_tx.tx.protected() && !chain_config.is_eip155_activated(context.block_number()) {
             // Ignore replay protected tx & all txs from the sender
             // Pull transaction from the mempool

crates/networking/p2p/kademlia.rs

Lines changed: 9 additions & 2 deletions
@@ -232,7 +232,7 @@ impl KademliaTable {
     }
 
     /// Returns an iterator for all peers in the table that match the filter
-    fn filter_peers<'a>(
+    pub fn filter_peers<'a>(
         &'a self,
         filter: &'a dyn Fn(&'a PeerData) -> bool,
     ) -> impl Iterator<Item = &'a PeerData> {
@@ -297,17 +297,20 @@ impl KademliaTable {
     /// Set the sender end of the channel between the kademlia table and the peer's active connection
     /// Set the peer's supported capabilities
    /// This function should be called each time a connection is established so the backend can send requests to the peers
+    /// Receives a boolean indicating if the connection is inbound (aka if it was started by the peer and not by this node)
     pub(crate) fn init_backend_communication(
         &mut self,
         node_id: H256,
         channels: PeerChannels,
         capabilities: Vec<Capability>,
+        inbound: bool,
     ) {
         let peer = self.get_by_node_id_mut(node_id);
         if let Some(peer) = peer {
             peer.channels = Some(channels);
             peer.supported_capabilities = capabilities;
             peer.is_connected = true;
+            peer.is_connection_inbound = inbound;
         } else {
             debug!(
                 "[PEERS] Peer with node_id {:?} not found in the kademlia table when trying to init backend communication",
@@ -354,9 +357,12 @@ pub struct PeerData {
     pub revalidation: Option<bool>,
     /// communication channels between the peer data and its active connection
     pub channels: Option<PeerChannels>,
-    /// Starts as false when a node is added. Set to true when a connection si active. When a
+    /// Starts as false when a node is added. Set to true when a connection becomes active. When a
     /// connection fails, the peer record is removed, so no need to set it to false.
     pub is_connected: bool,
+    /// Set to true if the connection is inbound (aka the connection was started by the peer and not by this node)
+    /// It is only valid as long as is_connected is true
+    pub is_connection_inbound: bool,
 }
 
 impl PeerData {
@@ -375,6 +381,7 @@ impl PeerData {
             channels: None,
             supported_capabilities: vec![],
             is_connected: false,
+            is_connection_inbound: false,
         }
     }
 
crates/networking/p2p/network.rs

Lines changed: 2 additions & 2 deletions
@@ -138,7 +138,7 @@ fn listener(tcp_addr: SocketAddr) -> Result<TcpListener, io::Error> {
 async fn handle_peer_as_receiver(context: P2PContext, peer_addr: SocketAddr, stream: TcpStream) {
     let table = context.table.clone();
     match handshake::as_receiver(context, peer_addr, stream).await {
-        Ok(mut conn) => conn.start(table).await,
+        Ok(mut conn) => conn.start(table, true).await,
         Err(e) => {
             debug!("Error creating tcp connection with peer at {peer_addr}: {e}")
         }
@@ -157,7 +157,7 @@ pub async fn handle_peer_as_initiator(context: P2PContext, node: Node) {
     };
     let table = context.table.clone();
     match handshake::as_initiator(context, node.clone(), stream).await {
-        Ok(mut conn) => conn.start(table).await,
+        Ok(mut conn) => conn.start(table, false).await,
         Err(e) => {
             log_peer_error(&node, &format!("Error creating tcp connection {e}"));
             table.lock().await.replace_peer(node.node_id());

crates/networking/p2p/peer_handler.rs

Lines changed: 21 additions & 1 deletion
@@ -11,7 +11,7 @@ use ethrex_trie::{verify_range, Node};
 use tokio::sync::Mutex;
 
 use crate::{
-    kademlia::{KademliaTable, PeerChannels},
+    kademlia::{KademliaTable, PeerChannels, PeerData},
     rlpx::{
         eth::{
             blocks::{
@@ -58,6 +58,13 @@ impl PeerHandler {
     pub fn new(peer_table: Arc<Mutex<KademliaTable>>) -> PeerHandler {
         Self { peer_table }
     }
+
+    /// Creates a dummy PeerHandler for tests where interacting with peers is not needed
+    /// This should only be used in tests as it won't be able to interact with the node's connected peers
+    pub fn dummy() -> PeerHandler {
+        let dummy_peer_table = Arc::new(Mutex::new(KademliaTable::new(Default::default())));
+        PeerHandler::new(dummy_peer_table)
+    }
     /// Returns the channel ends to an active peer connection that supports the given capability
     /// The peer is selected randomly, and doesn't guarantee that the selected peer is not currently busy
     /// If no peer is found, this method will try again after 10 seconds
@@ -637,4 +644,17 @@ impl PeerHandler {
         }
         None
     }
+
+    /// Returns the PeerData for each connected Peer
+    /// Returns None if it fails to aquire the lock on the kademlia table
+    pub fn read_connected_peers(&self) -> Option<Vec<PeerData>> {
+        Some(
+            self.peer_table
+                .try_lock()
+                .ok()?
+                .filter_peers(&|peer| peer.is_connected)
+                .cloned()
+                .collect::<Vec<_>>(),
+        )
+    }
 }
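A brief, hypothetical usage sketch of the two new helpers, assuming it lives in a `#[cfg(test)]` module inside the p2p crate. It relies only on the signatures shown above: `dummy()` and `read_connected_peers()` returning `Option<Vec<PeerData>>`.

```rust
// Hypothetical test-style usage of the new helpers (not part of this commit).
use crate::peer_handler::PeerHandler;

#[test]
fn dummy_handler_reports_no_connected_peers() {
    // A dummy handler wraps a fresh, empty kademlia table.
    let handler = PeerHandler::dummy();

    // read_connected_peers returns Some(vec) when the table lock is free and
    // None when it is contended; with a dummy handler it should be Some([]).
    let peers = handler.read_connected_peers();
    assert_eq!(peers.map(|p| p.len()), Some(0));
}
```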

crates/networking/p2p/rlpx/connection.rs

Lines changed: 8 additions & 1 deletion
@@ -143,7 +143,11 @@ impl<S: AsyncWrite + AsyncRead + std::marker::Unpin> RLPxConnection<S> {
 
     /// Handshake already performed, now it starts a peer connection.
     /// It runs in it's own task and blocks until the connection is dropped
-    pub async fn start(&mut self, table: Arc<Mutex<crate::kademlia::KademliaTable>>) {
+    pub async fn start(
+        &mut self,
+        table: Arc<Mutex<crate::kademlia::KademliaTable>>,
+        inbound: bool,
+    ) {
         log_peer_debug(&self.node, "Starting RLPx connection");
 
         if let Err(reason) = self.post_handshake_checks(table.clone()).await {
@@ -174,6 +178,7 @@ impl<S: AsyncWrite + AsyncRead + std::marker::Unpin> RLPxConnection<S> {
                 self.node.node_id(),
                 peer_channels,
                 self.capabilities.clone(),
+                inbound,
             );
         }
         if let Err(e) = self.connection_loop(sender, receiver).await {
@@ -295,6 +300,8 @@ impl<S: AsyncWrite + AsyncRead + std::marker::Unpin> RLPxConnection<S> {
                     self.negotiated_snap_version = negotiated_snap_cap.version;
                 }
 
+                self.node.version = Some(hello_message.client_id);
+
                 Ok(())
             }
             Message::Disconnect(disconnect) => {

crates/networking/p2p/rlpx/p2p.rs

Lines changed: 10 additions & 0 deletions
@@ -12,6 +12,7 @@ use ethrex_rlp::{
     error::{RLPDecodeError, RLPEncodeError},
 };
 use k256::PublicKey;
+use serde::Serialize;
 
 pub const CAP_P2P_5: Capability = Capability::p2p(5);
 pub const CAP_ETH_68: Capability = Capability::eth(68);
@@ -69,6 +70,15 @@ impl RLPDecode for Capability {
     }
 }
 
+impl Serialize for Capability {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        serializer.serialize_str(&format!("{}/{}", self.protocol, self.version))
+    }
+}
+
 #[derive(Debug)]
 pub(crate) struct HelloMessage {
     pub(crate) capabilities: Vec<Capability>,
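The custom `Serialize` impl renders a capability as a `protocol/version` string (e.g. `"eth/68"`), which is what JSON consumers of `admin_peers` would see. Below is a self-contained sketch of the same pattern, using a stand-in struct since `Capability`'s fields are crate-internal.

```rust
// Stand-in type mirroring the Serialize pattern from the diff above.
use serde::Serialize;

struct Cap {
    protocol: &'static str,
    version: u8,
}

impl Serialize for Cap {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        // Same "protocol/version" string form as Capability's impl.
        serializer.serialize_str(&format!("{}/{}", self.protocol, self.version))
    }
}

fn main() {
    let cap = Cap { protocol: "eth", version: 68 };
    // Prints "eth/68" (quoted, since it serializes as a JSON string).
    println!("{}", serde_json::to_string(&cap).unwrap());
}
```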

crates/networking/p2p/sync.rs

Lines changed: 5 additions & 9 deletions
@@ -26,17 +26,14 @@ use std::{
 };
 use storage_healing::storage_healer;
 use tokio::{
-    sync::{mpsc::error::SendError, Mutex},
+    sync::mpsc::error::SendError,
     time::{Duration, Instant},
 };
 use tokio_util::sync::CancellationToken;
 use tracing::{debug, error, info, warn};
 use trie_rebuild::TrieRebuilder;
 
-use crate::{
-    kademlia::KademliaTable,
-    peer_handler::{BlockRequestOrder, PeerHandler, HASH_MAX, MAX_BLOCK_BODIES_TO_REQUEST},
-};
+use crate::peer_handler::{BlockRequestOrder, PeerHandler, HASH_MAX, MAX_BLOCK_BODIES_TO_REQUEST};
 
 /// The minimum amount of blocks from the head that we want to full sync during a snap sync
 const MIN_FULL_BLOCKS: usize = 64;
@@ -95,14 +92,14 @@ pub struct Syncer {
 
 impl Syncer {
     pub fn new(
-        peer_table: Arc<Mutex<KademliaTable>>,
+        peers: PeerHandler,
         snap_enabled: Arc<AtomicBool>,
         cancel_token: CancellationToken,
         blockchain: Arc<Blockchain>,
     ) -> Self {
         Self {
             snap_enabled,
-            peers: PeerHandler::new(peer_table),
+            peers,
             last_snap_pivot: 0,
             trie_rebuilder: None,
             cancel_token,
@@ -113,10 +110,9 @@ impl Syncer {
     /// Creates a dummy Syncer for tests where syncing is not needed
     /// This should only be used in tests as it won't be able to connect to the p2p network
     pub fn dummy() -> Self {
-        let dummy_peer_table = Arc::new(Mutex::new(KademliaTable::new(Default::default())));
         Self {
             snap_enabled: Arc::new(AtomicBool::new(false)),
-            peers: PeerHandler::new(dummy_peer_table),
+            peers: PeerHandler::dummy(),
             last_snap_pivot: 0,
             trie_rebuilder: None,
             // This won't be used

crates/networking/p2p/sync_manager.rs

Lines changed: 3 additions & 3 deletions
@@ -14,7 +14,7 @@ use tokio_util::sync::CancellationToken;
 use tracing::{info, warn};
 
 use crate::{
-    kademlia::KademliaTable,
+    peer_handler::PeerHandler,
     sync::{SyncMode, Syncer},
 };
 
@@ -31,15 +31,15 @@ pub struct SyncManager {
 
 impl SyncManager {
     pub async fn new(
-        peer_table: Arc<Mutex<KademliaTable>>,
+        peer_handler: PeerHandler,
         sync_mode: SyncMode,
         cancel_token: CancellationToken,
         blockchain: Arc<Blockchain>,
         store: Store,
     ) -> Self {
         let snap_enabled = Arc::new(AtomicBool::new(matches!(sync_mode, SyncMode::Snap)));
         let syncer = Arc::new(Mutex::new(Syncer::new(
-            peer_table,
+            peer_handler,
             snap_enabled.clone(),
             cancel_token,
             blockchain,

crates/networking/p2p/types.rs

Lines changed: 2 additions & 0 deletions
@@ -65,6 +65,7 @@ pub struct Node {
     pub udp_port: u16,
     pub tcp_port: u16,
     pub public_key: H512,
+    pub version: Option<String>,
     node_id: OnceLock<H256>,
 }
 
@@ -119,6 +120,7 @@ impl Node {
             udp_port,
             tcp_port,
             public_key,
+            version: None,
             node_id: OnceLock::new(),
         }
     }

crates/networking/rpc/admin/mod.rs

Lines changed: 2 additions & 0 deletions
@@ -5,6 +5,8 @@ use serde_json::Value;
 use std::collections::HashMap;
 
 use crate::{rpc::NodeData, utils::RpcErr};
+mod peers;
+pub use peers::peers;
 
 #[derive(Serialize, Debug)]
 struct NodeInfo {
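The new `peers.rs` module is part of this commit but its contents are not shown in the hunks above. The sketch below is a plausible shape only: `RpcContext`, `RpcErr`, the `peer_handler` field, and `read_connected_peers` all appear in this commit, but the handler signature, error handling, and the exact JSON fields are assumptions.

```rust
// Hypothetical sketch of admin/peers.rs; signatures and field names are guesses.
use serde_json::{json, Value};

use crate::{rpc::RpcContext, utils::RpcErr};

pub fn peers(context: &RpcContext) -> Result<Value, RpcErr> {
    // read_connected_peers returns None when the kademlia table lock is busy;
    // falling back to an empty list keeps this sketch simple.
    let peers = context
        .peer_handler
        .read_connected_peers()
        .unwrap_or_default();

    // Map each connected PeerData to a JSON object. The real handler would also
    // report the enode URL and the client version stored on the Node, and it
    // benefits from the Capability Serialize impl added in rlpx/p2p.rs.
    let entries: Vec<Value> = peers
        .iter()
        .map(|peer| {
            json!({
                "caps": peer.supported_capabilities,
                "inbound": peer.is_connection_inbound,
            })
        })
        .collect();

    Ok(Value::Array(entries))
}
```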
