Skip to content

[1.2.0] P2P: BP gossip connections #1616

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jun 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 27 additions & 6 deletions plugins/net_plugin/include/eosio/net_plugin/auto_bp_peering.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <eosio/chain/controller.hpp>
#include <eosio/chain/producer_schedule.hpp>

#include <boost/unordered/unordered_flat_set.hpp>
#include <boost/algorithm/string.hpp>
#include <boost/range/adaptor/transformed.hpp>

Expand All @@ -27,6 +28,8 @@ class bp_connection_manager {
static constexpr fc::microseconds my_bp_gossip_peer_expiration = fc::minutes(30); // resend my bp_peer info every 30 minutes
static constexpr fc::microseconds bp_gossip_peer_expiration_variance = bp_gossip_peer_expiration + fc::minutes(15);

using address_set_t = boost::unordered_flat_set<std::string>;

gossip_bp_index_t gossip_bps;

struct bp_gossip_endpoint_t {
Expand Down Expand Up @@ -471,8 +474,8 @@ class bp_connection_manager {
return false;
}

flat_set<std::string> find_gossip_bp_addresses(const name_set_t& accounts, const char* desc) const {
flat_set<std::string> addresses;
address_set_t find_gossip_bp_addresses(const name_set_t& accounts, const char* desc) const {
address_set_t addresses;
fc::lock_guard g(gossip_bps.mtx);
const auto& prod_idx = gossip_bps.index.get<by_producer>();
for (const auto& account : accounts) {
Expand All @@ -489,11 +492,22 @@ class bp_connection_manager {
return addresses;
}

// Collect the advertised server endpoint of every gossip BP peer currently known,
// regardless of whether its producer is in the active schedule.
// `desc` tags the debug log line for each endpoint; the returned set is de-duplicated.
address_set_t all_gossip_bp_addresses(const char* desc) const {
   address_set_t endpoints;
   fc::lock_guard guard(gossip_bps.mtx);
   const auto& producers = gossip_bps.index.get<by_producer>();
   for (const auto& entry : producers) {
      fc_dlog(self()->get_logger(), "${d} gossip bp peer ${p}", ("d", desc)("p", entry.server_endpoint()));
      endpoints.insert(entry.server_endpoint());
   }
   return endpoints;
}

// thread-safe
void connect_to_active_bp_peers() {
// do not hold mutexes when calling resolve_and_connect which acquires connections mutex since other threads
// can be holding connections mutex when trying to acquire these mutexes
flat_set<std::string> addresses;
address_set_t addresses;
{
fc::lock_guard gm(mtx);
active_bps = active_bp_accounts(active_schedule);
Expand Down Expand Up @@ -524,7 +538,7 @@ class bp_connection_manager {

// do not hold mutexes when calling resolve_and_connect which acquires connections mutex since other threads
// can be holding connections mutex when trying to acquire these mutexes
flat_set<std::string> addresses = find_gossip_bp_addresses(pending_connections, "connect");
address_set_t addresses = find_gossip_bp_addresses(pending_connections, "connect");
for (const auto& add : addresses) {
self()->connections.resolve_and_connect(add, self()->get_first_p2p_address());
}
Expand Down Expand Up @@ -573,9 +587,16 @@ class bp_connection_manager {
std::inserter(peers_to_drop, peers_to_drop.end()));
fc_dlog(self()->get_logger(), "peers to drop: ${p}", ("p", to_string(peers_to_drop)));

flat_set<std::string> addresses = find_gossip_bp_addresses(peers_to_drop, "disconnect");
// if we dropped out of active schedule then disconnect from all
bool disconnect_from_all = !config.my_bp_gossip_accounts.empty() &&
std::all_of(config.my_bp_gossip_accounts.begin(), config.my_bp_gossip_accounts.end(),
[&](const auto& e) { return peers_to_drop.contains(e.first); });

address_set_t addresses = disconnect_from_all
? all_gossip_bp_addresses("disconnect")
: find_gossip_bp_addresses(peers_to_drop, "disconnect");
for (const auto& add : addresses) {
self()->connections.disconnect(add);
self()->connections.disconnect_gossip_connection(add);
}

active_schedule_version = schedule.version;
Expand Down
14 changes: 14 additions & 0 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ namespace eosio {
string connect(const string& host, const string& p2p_address);
string resolve_and_connect(const string& host, const string& p2p_address);
string disconnect(const string& host);
void disconnect_gossip_connection(const string& host);
void close_all();

std::optional<connection_status> status(const string& host) const;
Expand Down Expand Up @@ -4832,6 +4833,19 @@ namespace eosio {
return true;
}

// Close and remove a gossip-established connection to `host`.
// Connections explicitly configured via p2p-peer-address are left untouched:
// the operator requested those regardless of BP gossip state.
void connections_manager::disconnect_gossip_connection(const string& host) {
   std::lock_guard g( connections_mtx );
   if (supplied_peers.contains(host)) // do not disconnect if a p2p-peer-address
      return;
   auto& host_index = connections.get<by_host>();
   auto found = host_index.find( host );
   if (found == host_index.end())
      return;
   fc_ilog( logger, "disconnecting: ${cid}", ("cid", found->c->connection_id) );
   found->c->close();
   connections.erase(found);
}

// called by API
string connections_manager::disconnect( const string& host ) {
std::lock_guard g( connections_mtx );
Expand Down
7 changes: 4 additions & 3 deletions plugins/net_plugin/tests/auto_bp_peering_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ struct mock_connections_manager {
std::vector<std::shared_ptr<mock_connection>> connections;

std::function<void(std::string, std::string)> resolve_and_connect;
std::function<void(std::string)> disconnect;
std::function<void(std::string)> disconnect_gossip_connection;

uint32_t get_max_client_count() const { return max_client_count; }

Expand Down Expand Up @@ -186,6 +186,7 @@ BOOST_AUTO_TEST_CASE(test_on_pending_schedule) {
BOOST_TEST(plugin.pending_bps == producers_minus_prodkt);

// all connect to bp peers should be invoked
std::ranges::sort(connected_hosts);
BOOST_CHECK_EQUAL(connected_hosts, peer_addresses);

BOOST_CHECK_EQUAL(plugin.pending_schedule_version, 1u);
Expand All @@ -211,7 +212,7 @@ BOOST_AUTO_TEST_CASE(test_on_active_schedule1) {
plugin.connections.resolve_and_connect = [](std::string host, std::string p2p_address) {};

std::vector<std::string> disconnected_hosts;
plugin.connections.disconnect = [&disconnected_hosts](std::string host) { disconnected_hosts.push_back(host); };
plugin.connections.disconnect_gossip_connection = [&disconnected_hosts](std::string host) { disconnected_hosts.push_back(host); };

// make sure nothing happens when it is not in_sync
plugin.lib_catchup = true;
Expand Down Expand Up @@ -242,7 +243,7 @@ BOOST_AUTO_TEST_CASE(test_on_active_schedule2) {
plugin.set_active_bps( { "proda"_n, "prodh"_n, "prodn"_n, "prodt"_n } );
plugin.connections.resolve_and_connect = [](std::string host, std::string p2p_address) {};
std::vector<std::string> disconnected_hosts;
plugin.connections.disconnect = [&disconnected_hosts](std::string host) { disconnected_hosts.push_back(host); };
plugin.connections.disconnect_gossip_connection = [&disconnected_hosts](std::string host) { disconnected_hosts.push_back(host); };

// when pending and active schedules are changed simultaneously
plugin.lib_catchup = false;
Expand Down
7 changes: 7 additions & 0 deletions tests/TestHarness/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -725,6 +725,13 @@ def getBlockProducer(self, timeout=None, waitForBlock=True, exitOnError=True, bl
return None
return NodeosQueries.getBlockAttribute(block, "producer", blockNum, exitOnError=exitOnError)

def getProducerSchedule(self):
    """Return the producer names in this node's active producer schedule.

    Queries the chain API's get_producer_schedule endpoint and extracts the
    producer_name of each entry, preserving the schedule's order.
    """
    response = self.processUrllibRequest("chain", "get_producer_schedule")
    active = response["payload"]["active"]["producers"]
    return [entry["producer_name"] for entry in active]

def getNextCleanProductionCycle(self, trans):
rounds=21*12*2 # max time to ensure that at least 2/3+1 of producers x blocks per producer x at least 2 times
if trans is not None:
Expand Down
167 changes: 116 additions & 51 deletions tests/auto_bp_gossip_peering_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,20 @@
import copy
import signal

from TestHarness import Cluster, TestHelper, Utils, WalletMgr, createAccountKeys
from TestHarness import Cluster, TestHelper, Utils, WalletMgr, CORE_SYMBOL, createAccountKeys

###############################################################
# auto_bp_gossip_peering_test
#
# This test sets up a cluster with 21 producers nodeos, each nodeos is configured with only one producer and only
# This test sets up a cluster with 21 producers nodeos, each nodeos is configured with only one producer and only
# connects to the bios node. Moreover, each producer nodeos is also configured with a p2p-bp-gossip-endpoint so that
# each one can automatically establish p2p connections to other bps. Test verifies connections are established when
# producer schedule is active.
#
# Test then changes the producer schedule and verifies that connections change appropriately.
# Also verifies manual connections are maintained and that non-producers disconnect from producers when they are no
# longer in the schedule.
#
###############################################################

Print = Utils.Print
Expand Down Expand Up @@ -64,6 +68,14 @@ def getHostName(nodeId):
accounts=createAccountKeys(21)
if accounts is None:
Utils.errorExit("FAILURE - create keys")
voteAccounts=createAccountKeys(5)
if voteAccounts is None:
Utils.errorExit("FAILURE - create keys")
voteAccounts[0].name="tester111111"
voteAccounts[1].name="tester222222"
voteAccounts[2].name="tester333333"
voteAccounts[3].name="tester444444"
voteAccounts[4].name="tester555555"

if walletMgr.launch() is False:
errorExit("Failed to stand up keosd.")
Expand Down Expand Up @@ -96,26 +108,45 @@ def getHostName(nodeId):
Print("Creating wallet \"%s\"" % (testWalletName))
walletAccounts=copy.deepcopy(cluster.defProducerAccounts)
testWallet = walletMgr.create(testWalletName, walletAccounts.values())
walletMgr.importKeys(voteAccounts, testWallet)
all_acc = accounts + list( cluster.defProducerAccounts.values() )
for account in all_acc:
Print("Importing keys for account %s into wallet %s." % (account.name, testWallet.name))
if not walletMgr.importKey(account, testWallet):
errorExit("Failed to import key for account %s" % (account.name))

for i in range(0, producerNodes):
node=cluster.getNode(i)
node.producers=Cluster.parseProducers(i)
for prod in node.producers:
trans=cluster.biosNode.regproducer(cluster.defProducerAccounts[prod], "http::/mysite.com", 0,
waitForTransBlock=False, silentErrors=False)
Print("Setup vote accounts so they can vote")
# create accounts via eosio as otherwise a bid is needed
for account in voteAccounts:
Print("Create new account %s via %s" % (account.name, cluster.eosioAccount.name))
trans=cluster.biosNode.createInitializeAccount(account, cluster.eosioAccount, stakedDeposit=0, waitForTransBlock=False, stakeNet=1000, stakeCPU=1000, buyRAM=1000, exitOnError=True)
cluster.biosNode.waitForTransactionInBlock(trans['transaction_id'])
transferAmount="100000000.0000 {0}".format(CORE_SYMBOL)
Print("Transfer funds %s from account %s to %s" % (transferAmount, cluster.eosioAccount.name, account.name))
trans=cluster.biosNode.transferFunds(cluster.eosioAccount, account, transferAmount, "test transfer", waitForTransBlock=False)
cluster.biosNode.waitForTransactionInBlock(trans['transaction_id'])
trans=cluster.biosNode.delegatebw(account, 20000000.0000, 20000000.0000, waitForTransBlock=False, exitOnError=False)

Print("regpeerkey for all the producers")
for nodeId in range(0, producerNodes):
producer_name = "defproducer" + chr(ord('a') + nodeId)
a = accounts[nodeId]
node = cluster.getNode(nodeId)

success, trans = cluster.biosNode.pushMessage('eosio', 'regpeerkey', f'{{"proposer_finalizer_name":"{producer_name}","key":"{a.activePublicKey}"}}', f'-p {producer_name}@active')
assert(success)

# wait for regpeerkey to be final
for nodeId in range(0, producerNodes):
Utils.Print("Wait for last regpeerkey to be final on ", nodeId)
cluster.getNode(nodeId).waitForTransFinalization(trans['transaction_id'])

# relaunch with p2p-bp-gossip-endpoint
Print("relaunch with p2p-bp-gossip-endpoint to enable BP gossip")
for nodeId in range(0, producerNodes):
Utils.Print(f"Relaunch node {nodeId} with p2p-bp-gossip-endpoint")
node = cluster.getNode(nodeId)
Expand All @@ -125,64 +156,98 @@ def getHostName(nodeId):
if not node.relaunch(chainArg=f" --enable-stale-production --p2p-bp-gossip-endpoint {producer_name},{server_address},127.0.0.1"):
errorExit(f"Failed to relaunch node {nodeId}")

# give time for messages to be gossiped around
Print("Wait for messages to be gossiped")
cluster.getNode(producerNodes-1).waitForHeadToAdvance(blocksToAdvance=60)
blockNum = cluster.getNode(0).getBlockNum()
for nodeId in range(0, producerNodes):
Utils.Print(f"Wait for block ${blockNum} on node ", nodeId)
cluster.getNode(nodeId).waitForBlock(blockNum)

# retrieve the producer stable producer schedule
scheduled_producers = []
schedule = cluster.nodes[0].processUrllibRequest("chain", "get_producer_schedule")
for prod in schedule["payload"]["active"]["producers"]:
scheduled_producers.append(prod["producer_name"])
scheduled_producers.sort()

connection_failure = False
for nodeId in range(0, producerNodes):
# retrieve the connections in each node and check if each connects to the other bps in the schedule
connections = cluster.nodes[nodeId].processUrllibRequest("net", "connections")
if Utils.Debug: Utils.Print(f"v1/net/connections: {connections}")
bp_peers = cluster.nodes[nodeId].processUrllibRequest("net", "bp_gossip_peers")
if Utils.Debug: Utils.Print(f"v1/net/bp_gossip_peers: {bp_peers}")
peers = []
for conn in connections["payload"]:
if conn["is_socket_open"] is False:
continue
peer_addr = conn["peer"]
if len(peer_addr) == 0:
if len(conn["last_handshake"]["p2p_address"]) == 0:
def verifyGossipConnections(scheduled_producers):
assert(len(scheduled_producers) > 0)
scheduled_producers.sort()
connection_failure = False
for nodeId in range(0, producerNodes):
name = "defproducer" + chr(ord('a') + nodeId)
if name not in scheduled_producers:
break
# retrieve the connections in each node and check if each connects to the other bps in the schedule
connections = cluster.nodes[nodeId].processUrllibRequest("net", "connections")
if Utils.Debug: Utils.Print(f"v1/net/connections: {connections}")
bp_peers = cluster.nodes[nodeId].processUrllibRequest("net", "bp_gossip_peers")
if Utils.Debug: Utils.Print(f"v1/net/bp_gossip_peers: {bp_peers}")
peers = []
for conn in connections["payload"]:
if conn["is_socket_open"] is False:
continue
peer_addr = conn["last_handshake"]["p2p_address"].split()[0]
if peer_names[peer_addr] != "bios" and peer_addr != getHostName(nodeId):
if conn["is_bp_peer"]:
peers.append(peer_names[peer_addr])

if not peers:
Utils.Print(f"ERROR: found no connected peers for node {nodeId}")
connection_failure = True
break
name = "defproducer" + chr(ord('a') + nodeId)
peers.append(name) # add ourselves so matches schedule_producers
peers = list(set(peers))
peers.sort()
if peers != scheduled_producers:
Utils.Print(f"ERROR: expect {name} has connections to {scheduled_producers}, got connections to {peers}")
connection_failure = True
break
num_peers_found = 0
for p in bp_peers["payload"]:
if p["producer_name"] not in peers:
Utils.Print(f"ERROR: expect bp peer {p} in peer list")
peer_addr = conn["peer"]
if len(peer_addr) == 0:
if len(conn["last_handshake"]["p2p_address"]) == 0:
continue
peer_addr = conn["last_handshake"]["p2p_address"].split()[0]
if peer_names[peer_addr] != "bios" and peer_addr != getHostName(nodeId):
if conn["is_bp_peer"]:
peers.append(peer_names[peer_addr])

if not peers:
Utils.Print(f"ERROR: found no connected peers for node {nodeId}")
connection_failure = True
break
else:
num_peers_found += 1
peers.append(name) # add ourselves so matches schedule_producers
peers = list(set(peers))
peers.sort()
if peers != scheduled_producers:
Utils.Print(f"ERROR: expect {name} has connections to {scheduled_producers}, got connections to {peers}")
connection_failure = True
break
num_peers_found = 0
for p in bp_peers["payload"]:
if p["producer_name"] not in peers:
Utils.Print(f"ERROR: expect bp peer {p} in peer list")
connection_failure = True
break
else:
num_peers_found += 1

assert(num_peers_found == len(peers))
return not connection_failure

Print("Verify gossip connections")
scheduled_producers = cluster.nodes[0].getProducerSchedule()
success = verifyGossipConnections(scheduled_producers)
assert(success)

Print("Manual connect node_03 defproducerd to node_04 defproducere")
cluster.nodes[3].processUrllibRequest("net", "connect", payload="localhost:9880", exitOnError=True)

Print("Set new producers b,h,m,r")
for account in voteAccounts:
trans=cluster.biosNode.vote(account, ["defproducerb", "defproducerh", "defproducerm", "defproducerr"], silentErrors=False, exitOnError=True)
cluster.biosNode.getNextCleanProductionCycle(trans)

Print("Verify new gossip connections")
scheduled_producers = cluster.nodes[0].getProducerSchedule()
Print(f"Scheduled producers: {scheduled_producers}")
assert(len(scheduled_producers) == 4)
assert("defproducerb" in scheduled_producers and "defproducerh" in scheduled_producers and "defproducerm" in scheduled_producers and "defproducerr" in scheduled_producers)
success = verifyGossipConnections(scheduled_producers)
assert(success)

Print("Verify manual connection still connected")
connections = cluster.nodes[3].processUrllibRequest("net", "connections")
if Utils.Debug: Utils.Print(f"v1/net/connections: {connections}")
found = []
for conn in connections["payload"]:
if conn["is_socket_open"] is False:
continue
peer_addr = conn["peer"]
found.append(peer_names[peer_addr])

assert(num_peers_found == len(peers))
Print(f"Found connections of Node_03: {found}")
assert(len(found) == 2)
assert("bios" in found and "defproducere" in found)

testSuccessful = not connection_failure
testSuccessful = success

finally:
TestHelper.shutdown(
Expand Down
Loading