Skip to content

Commit e01096f

Browse files
authored
eth/handler, broadcast: optimize tx broadcast mechanism (#22176)
This PR optimizes the broadcast loop. Instead of iterating twice through a given set of transactions to weed out which peers have and which do not have a tx, to send/announce transactions, we do it only once.
1 parent 1489c3f commit e01096f

File tree

2 files changed

+36
-32
lines changed

2 files changed

+36
-32
lines changed

eth/handler.go

Lines changed: 30 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -456,44 +456,51 @@ func (h *handler) BroadcastBlock(block *types.Block, propagate bool) {
456456
}
457457
}
458458

459-
// BroadcastTransactions will propagate a batch of transactions to all peers which are not known to
459+
// BroadcastTransactions will propagate a batch of transactions
460+
// - To a square root of all peers
461+
// - And, separately, as announcements to all peers which are not known to
460462
// already have the given transaction.
461-
func (h *handler) BroadcastTransactions(txs types.Transactions, propagate bool) {
463+
func (h *handler) BroadcastTransactions(txs types.Transactions) {
462464
var (
463-
txset = make(map[*ethPeer][]common.Hash)
464-
annos = make(map[*ethPeer][]common.Hash)
465+
annoCount int // Count of announcements made
466+
annoPeers int
467+
directCount int // Count of the txs sent directly to peers
468+
directPeers int // Count of the peers that were sent transactions directly
469+
470+
txset = make(map[*ethPeer][]common.Hash) // Set peer->hash to transfer directly
471+
annos = make(map[*ethPeer][]common.Hash) // Set peer->hash to announce
472+
465473
)
466474
// Broadcast transactions to a batch of peers not knowing about it
467-
if propagate {
468-
for _, tx := range txs {
469-
peers := h.peers.peersWithoutTransaction(tx.Hash())
470-
471-
// Send the block to a subset of our peers
472-
transfer := peers[:int(math.Sqrt(float64(len(peers))))]
473-
for _, peer := range transfer {
474-
txset[peer] = append(txset[peer], tx.Hash())
475-
}
476-
log.Trace("Broadcast transaction", "hash", tx.Hash(), "recipients", len(transfer))
477-
}
478-
for peer, hashes := range txset {
479-
peer.AsyncSendTransactions(hashes)
480-
}
481-
return
482-
}
483-
// Otherwise only broadcast the announcement to peers
484475
for _, tx := range txs {
485476
peers := h.peers.peersWithoutTransaction(tx.Hash())
486-
for _, peer := range peers {
477+
// Send the tx unconditionally to a subset of our peers
478+
numDirect := int(math.Sqrt(float64(len(peers))))
479+
for _, peer := range peers[:numDirect] {
480+
txset[peer] = append(txset[peer], tx.Hash())
481+
}
482+
// For the remaining peers, send announcement only
483+
for _, peer := range peers[numDirect:] {
487484
annos[peer] = append(annos[peer], tx.Hash())
488485
}
489486
}
487+
for peer, hashes := range txset {
488+
directPeers++
489+
directCount += len(hashes)
490+
peer.AsyncSendTransactions(hashes)
491+
}
490492
for peer, hashes := range annos {
493+
annoPeers++
494+
annoCount += len(hashes)
491495
if peer.Version() >= eth.ETH65 {
492496
peer.AsyncSendPooledTransactionHashes(hashes)
493497
} else {
494498
peer.AsyncSendTransactions(hashes)
495499
}
496500
}
501+
log.Debug("Transaction broadcast", "txs", len(txs),
502+
"announce packs", annoPeers, "announced hashes", annoCount,
503+
"tx packs", directPeers, "broadcast txs", directCount)
497504
}
498505

499506
// minedBroadcastLoop sends mined blocks to connected peers.
@@ -511,13 +518,10 @@ func (h *handler) minedBroadcastLoop() {
511518
// txBroadcastLoop announces new transactions to connected peers.
512519
func (h *handler) txBroadcastLoop() {
513520
defer h.wg.Done()
514-
515521
for {
516522
select {
517523
case event := <-h.txsCh:
518-
h.BroadcastTransactions(event.Txs, true) // First propagate transactions to peers
519-
h.BroadcastTransactions(event.Txs, false) // Only then announce to the rest
520-
524+
h.BroadcastTransactions(event.Txs)
521525
case <-h.txsSub.Err():
522526
return
523527
}

eth/protocols/eth/broadcast.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -142,18 +142,18 @@ func (p *Peer) announceTransactions() {
142142
if done == nil && len(queue) > 0 {
143143
// Pile transaction hashes until we reach our allowed network limit
144144
var (
145-
hashes []common.Hash
145+
count int
146146
pending []common.Hash
147147
size common.StorageSize
148148
)
149-
for i := 0; i < len(queue) && size < maxTxPacketSize; i++ {
150-
if p.txpool.Get(queue[i]) != nil {
151-
pending = append(pending, queue[i])
149+
for count = 0; count < len(queue) && size < maxTxPacketSize; count++ {
150+
if p.txpool.Get(queue[count]) != nil {
151+
pending = append(pending, queue[count])
152152
size += common.HashLength
153153
}
154-
hashes = append(hashes, queue[i])
155154
}
156-
queue = queue[:copy(queue, queue[len(hashes):])]
155+
// Shift and trim queue
156+
queue = queue[:copy(queue, queue[count:])]
157157

158158
// If there's anything available to transfer, fire up an async writer
159159
if len(pending) > 0 {

0 commit comments

Comments
 (0)