Skip to content

Commit dda4412

Browse files
committed
eth/handler, broadcast: optimize tx broadcast mechanism
1 parent 83e4c49 commit dda4412

File tree

2 files changed

+23
-28
lines changed

2 files changed

+23
-28
lines changed

eth/handler.go

Lines changed: 19 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -456,36 +456,32 @@ func (h *handler) BroadcastBlock(block *types.Block, propagate bool) {
456456
}
457457
}
458458

459-
// BroadcastTransactions will propagate a batch of transactions to all peers which are not known to
459+
// BroadcastTransactions will propagate a batch of transactions
460+
// - To a square root of all peers
461+
// - And, separately, as announcements to all peers which are not known to
460462
// already have the given transaction.
461-
func (h *handler) BroadcastTransactions(txs types.Transactions, propagate bool) {
463+
func (h *handler) BroadcastTransactions(txs types.Transactions) {
462464
var (
463-
txset = make(map[*ethPeer][]common.Hash)
464-
annos = make(map[*ethPeer][]common.Hash)
465+
txset = make(map[*ethPeer][]common.Hash) // Set peer->hash to transfer directly
466+
annos = make(map[*ethPeer][]common.Hash) // Set peer->hash to announce
465467
)
466468
// Broadcast transactions to a batch of peers not knowing about it
467-
if propagate {
468-
for _, tx := range txs {
469-
peers := h.peers.peersWithoutTransaction(tx.Hash())
470-
471-
// Send the block to a subset of our peers
472-
transfer := peers[:int(math.Sqrt(float64(len(peers))))]
473-
for _, peer := range transfer {
474-
txset[peer] = append(txset[peer], tx.Hash())
475-
}
476-
log.Trace("Broadcast transaction", "hash", tx.Hash(), "recipients", len(transfer))
477-
}
478-
for peer, hashes := range txset {
479-
peer.AsyncSendTransactions(hashes)
480-
}
481-
return
482-
}
483-
// Otherwise only broadcast the announcement to peers
484469
for _, tx := range txs {
485470
peers := h.peers.peersWithoutTransaction(tx.Hash())
486-
for _, peer := range peers {
471+
// Send the tx unconditionally to a subset of our peers
472+
numDirect := int(math.Sqrt(float64(len(peers))))
473+
for _, peer := range peers[:numDirect] {
474+
txset[peer] = append(txset[peer], tx.Hash())
475+
}
476+
// For the remaining peers, send announcement only
477+
for _, peer := range peers[numDirect:] {
487478
annos[peer] = append(annos[peer], tx.Hash())
488479
}
480+
log.Trace("Broadcast transaction", "hash", tx.Hash(), "direct", numDirect,
481+
"announce", len(peers)-numDirect)
482+
}
483+
for peer, hashes := range txset {
484+
peer.AsyncSendTransactions(hashes)
489485
}
490486
for peer, hashes := range annos {
491487
if peer.Version() >= eth.ETH65 {
@@ -515,8 +511,7 @@ func (h *handler) txBroadcastLoop() {
515511
for {
516512
select {
517513
case event := <-h.txsCh:
518-
h.BroadcastTransactions(event.Txs, true) // First propagate transactions to peers
519-
h.BroadcastTransactions(event.Txs, false) // Only then announce to the rest
514+
h.BroadcastTransactions(event.Txs)
520515

521516
case <-h.txsSub.Err():
522517
return

eth/protocols/eth/broadcast.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -142,18 +142,18 @@ func (p *Peer) announceTransactions() {
142142
if done == nil && len(queue) > 0 {
143143
// Pile transaction hashes until we reach our allowed network limit
144144
var (
145-
hashes []common.Hash
145+
i int
146146
pending []common.Hash
147147
size common.StorageSize
148148
)
149-
for i := 0; i < len(queue) && size < maxTxPacketSize; i++ {
149+
for i = 0; i < len(queue) && size < maxTxPacketSize; i++ {
150150
if p.txpool.Get(queue[i]) != nil {
151151
pending = append(pending, queue[i])
152152
size += common.HashLength
153153
}
154-
hashes = append(hashes, queue[i])
155154
}
156-
queue = queue[:copy(queue, queue[len(hashes):])]
155+
// Shift and trim queue
156+
queue = queue[:copy(queue, queue[i:])]
157157

158158
// If there's anything available to transfer, fire up an async writer
159159
if len(pending) > 0 {

0 commit comments

Comments
 (0)