Skip to content

Commit c559aac

Browse files
ryanschneiderfirmianavan
authored andcommitted
core: use a wrapped map to remove contention in TxPool.Get. (ethereum#16670)
* core: use a wrapped `map` and `sync.RWMutex` for `TxPool.all` to remove contention in `TxPool.Get`. * core: Remove redundant `txLookup.Find` and improve comments on txLookup methods.
1 parent a322f3f commit c559aac

File tree

3 files changed

+132
-69
lines changed

3 files changed

+132
-69
lines changed

core/tx_list.go

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -397,13 +397,13 @@ func (h *priceHeap) Pop() interface{} {
397397
// txPricedList is a price-sorted heap to allow operating on transactions pool
398398
// contents in a price-incrementing way.
399399
type txPricedList struct {
400-
all *map[common.Hash]*types.Transaction // Pointer to the map of all transactions
401-
items *priceHeap // Heap of prices of all the stored transactions
402-
stales int // Number of stale price points to (re-heap trigger)
400+
all *txLookup // Pointer to the map of all transactions
401+
items *priceHeap // Heap of prices of all the stored transactions
402+
stales int // Number of stale price points to (re-heap trigger)
403403
}
404404

405405
// newTxPricedList creates a new price-sorted transaction heap.
406-
func newTxPricedList(all *map[common.Hash]*types.Transaction) *txPricedList {
406+
func newTxPricedList(all *txLookup) *txPricedList {
407407
return &txPricedList{
408408
all: all,
409409
items: new(priceHeap),
@@ -425,12 +425,13 @@ func (l *txPricedList) Removed() {
425425
return
426426
}
427427
// Seems we've reached a critical number of stale transactions, reheap
428-
reheap := make(priceHeap, 0, len(*l.all))
428+
reheap := make(priceHeap, 0, l.all.Count())
429429

430430
l.stales, l.items = 0, &reheap
431-
for _, tx := range *l.all {
431+
l.all.Range(func(hash common.Hash, tx *types.Transaction) bool {
432432
*l.items = append(*l.items, tx)
433-
}
433+
return true
434+
})
434435
heap.Init(l.items)
435436
}
436437

@@ -443,7 +444,7 @@ func (l *txPricedList) Cap(threshold *big.Int, local *accountSet) types.Transact
443444
for len(*l.items) > 0 {
444445
// Discard stale transactions if found during cleanup
445446
tx := heap.Pop(l.items).(*types.Transaction)
446-
if _, ok := (*l.all)[tx.Hash()]; !ok {
447+
if l.all.Get(tx.Hash()) == nil {
447448
l.stales--
448449
continue
449450
}
@@ -475,7 +476,7 @@ func (l *txPricedList) Underpriced(tx *types.Transaction, local *accountSet) boo
475476
// Discard stale price points if found at the heap start
476477
for len(*l.items) > 0 {
477478
head := []*types.Transaction(*l.items)[0]
478-
if _, ok := (*l.all)[head.Hash()]; !ok {
479+
if l.all.Get(head.Hash()) == nil {
479480
l.stales--
480481
heap.Pop(l.items)
481482
continue
@@ -500,7 +501,7 @@ func (l *txPricedList) Discard(count int, local *accountSet) types.Transactions
500501
for len(*l.items) > 0 && count > 0 {
501502
// Discard stale transactions if found during cleanup
502503
tx := heap.Pop(l.items).(*types.Transaction)
503-
if _, ok := (*l.all)[tx.Hash()]; !ok {
504+
if l.all.Get(tx.Hash()) == nil {
504505
l.stales--
505506
continue
506507
}

core/tx_pool.go

Lines changed: 96 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -200,11 +200,11 @@ type TxPool struct {
200200
locals *accountSet // Set of local transaction to exempt from eviction rules
201201
journal *txJournal // Journal of local transaction to back up to disk
202202

203-
pending map[common.Address]*txList // All currently processable transactions
204-
queue map[common.Address]*txList // Queued but non-processable transactions
205-
beats map[common.Address]time.Time // Last heartbeat from each known account
206-
all map[common.Hash]*types.Transaction // All transactions to allow lookups
207-
priced *txPricedList // All transactions sorted by price
203+
pending map[common.Address]*txList // All currently processable transactions
204+
queue map[common.Address]*txList // Queued but non-processable transactions
205+
beats map[common.Address]time.Time // Last heartbeat from each known account
206+
all *txLookup // All transactions to allow lookups
207+
priced *txPricedList // All transactions sorted by price
208208

209209
wg sync.WaitGroup // for shutdown sync
210210

@@ -226,12 +226,12 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
226226
pending: make(map[common.Address]*txList),
227227
queue: make(map[common.Address]*txList),
228228
beats: make(map[common.Address]time.Time),
229-
all: make(map[common.Hash]*types.Transaction),
229+
all: newTxLookup(),
230230
chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize),
231231
gasPrice: new(big.Int).SetUint64(config.PriceLimit),
232232
}
233233
pool.locals = newAccountSet(pool.signer)
234-
pool.priced = newTxPricedList(&pool.all)
234+
pool.priced = newTxPricedList(pool.all)
235235
pool.reset(nil, chain.CurrentBlock().Header())
236236

237237
// If local transactions and journaling is enabled, load from disk
@@ -605,7 +605,7 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
605605
func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
606606
// If the transaction is already known, discard it
607607
hash := tx.Hash()
608-
if pool.all[hash] != nil {
608+
if pool.all.Get(hash) != nil {
609609
log.Trace("Discarding already known transaction", "hash", hash)
610610
return false, fmt.Errorf("known transaction: %x", hash)
611611
}
@@ -616,15 +616,15 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
616616
return false, err
617617
}
618618
// If the transaction pool is full, discard underpriced transactions
619-
if uint64(len(pool.all)) >= pool.config.GlobalSlots+pool.config.GlobalQueue {
619+
if uint64(pool.all.Count()) >= pool.config.GlobalSlots+pool.config.GlobalQueue {
620620
// If the new transaction is underpriced, don't accept it
621621
if !local && pool.priced.Underpriced(tx, pool.locals) {
622622
log.Trace("Discarding underpriced transaction", "hash", hash, "price", tx.GasPrice())
623623
underpricedTxCounter.Inc(1)
624624
return false, ErrUnderpriced
625625
}
626626
// New transaction is better than our worse ones, make room for it
627-
drop := pool.priced.Discard(len(pool.all)-int(pool.config.GlobalSlots+pool.config.GlobalQueue-1), pool.locals)
627+
drop := pool.priced.Discard(pool.all.Count()-int(pool.config.GlobalSlots+pool.config.GlobalQueue-1), pool.locals)
628628
for _, tx := range drop {
629629
log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "price", tx.GasPrice())
630630
underpricedTxCounter.Inc(1)
@@ -642,11 +642,11 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
642642
}
643643
// New transaction is better, replace old one
644644
if old != nil {
645-
delete(pool.all, old.Hash())
645+
pool.all.Remove(old.Hash())
646646
pool.priced.Removed()
647647
pendingReplaceCounter.Inc(1)
648648
}
649-
pool.all[tx.Hash()] = tx
649+
pool.all.Add(tx)
650650
pool.priced.Put(tx)
651651
pool.journalTx(from, tx)
652652

@@ -689,12 +689,12 @@ func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) (bool, er
689689
}
690690
// Discard any previous transaction and mark this
691691
if old != nil {
692-
delete(pool.all, old.Hash())
692+
pool.all.Remove(old.Hash())
693693
pool.priced.Removed()
694694
queuedReplaceCounter.Inc(1)
695695
}
696-
if pool.all[hash] == nil {
697-
pool.all[hash] = tx
696+
if pool.all.Get(hash) == nil {
697+
pool.all.Add(tx)
698698
pool.priced.Put(tx)
699699
}
700700
return old != nil, nil
@@ -726,22 +726,22 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T
726726
inserted, old := list.Add(tx, pool.config.PriceBump)
727727
if !inserted {
728728
// An older transaction was better, discard this
729-
delete(pool.all, hash)
729+
pool.all.Remove(hash)
730730
pool.priced.Removed()
731731

732732
pendingDiscardCounter.Inc(1)
733733
return false
734734
}
735735
// Otherwise discard any previous transaction and mark this
736736
if old != nil {
737-
delete(pool.all, old.Hash())
737+
pool.all.Remove(old.Hash())
738738
pool.priced.Removed()
739739

740740
pendingReplaceCounter.Inc(1)
741741
}
742742
// Failsafe to work around direct pending inserts (tests)
743-
if pool.all[hash] == nil {
744-
pool.all[hash] = tx
743+
if pool.all.Get(hash) == nil {
744+
pool.all.Add(tx)
745745
pool.priced.Put(tx)
746746
}
747747
// Set the potentially new pending nonce and notify any subsystems of the new tx
@@ -840,7 +840,7 @@ func (pool *TxPool) Status(hashes []common.Hash) []TxStatus {
840840

841841
status := make([]TxStatus, len(hashes))
842842
for i, hash := range hashes {
843-
if tx := pool.all[hash]; tx != nil {
843+
if tx := pool.all.Get(hash); tx != nil {
844844
from, _ := types.Sender(pool.signer, tx) // already validated
845845
if pool.pending[from] != nil && pool.pending[from].txs.items[tx.Nonce()] != nil {
846846
status[i] = TxStatusPending
@@ -855,24 +855,21 @@ func (pool *TxPool) Status(hashes []common.Hash) []TxStatus {
855855
// Get returns a transaction if it is contained in the pool
856856
// and nil otherwise.
857857
func (pool *TxPool) Get(hash common.Hash) *types.Transaction {
858-
pool.mu.RLock()
859-
defer pool.mu.RUnlock()
860-
861-
return pool.all[hash]
858+
return pool.all.Get(hash)
862859
}
863860

864861
// removeTx removes a single transaction from the queue, moving all subsequent
865862
// transactions back to the future queue.
866863
func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) {
867864
// Fetch the transaction we wish to delete
868-
tx, ok := pool.all[hash]
869-
if !ok {
865+
tx := pool.all.Get(hash)
866+
if tx == nil {
870867
return
871868
}
872869
addr, _ := types.Sender(pool.signer, tx) // already validated during insertion
873870

874871
// Remove it from the list of known transactions
875-
delete(pool.all, hash)
872+
pool.all.Remove(hash)
876873
if outofbound {
877874
pool.priced.Removed()
878875
}
@@ -928,15 +925,15 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
928925
for _, tx := range list.Forward(pool.currentState.GetNonce(addr)) {
929926
hash := tx.Hash()
930927
log.Trace("Removed old queued transaction", "hash", hash)
931-
delete(pool.all, hash)
928+
pool.all.Remove(hash)
932929
pool.priced.Removed()
933930
}
934931
// Drop all transactions that are too costly (low balance or out of gas)
935932
drops, _ := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
936933
for _, tx := range drops {
937934
hash := tx.Hash()
938935
log.Trace("Removed unpayable queued transaction", "hash", hash)
939-
delete(pool.all, hash)
936+
pool.all.Remove(hash)
940937
pool.priced.Removed()
941938
queuedNofundsCounter.Inc(1)
942939
}
@@ -952,7 +949,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
952949
if !pool.locals.contains(addr) {
953950
for _, tx := range list.Cap(int(pool.config.AccountQueue)) {
954951
hash := tx.Hash()
955-
delete(pool.all, hash)
952+
pool.all.Remove(hash)
956953
pool.priced.Removed()
957954
queuedRateLimitCounter.Inc(1)
958955
log.Trace("Removed cap-exceeding queued transaction", "hash", hash)
@@ -1001,7 +998,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
1001998
for _, tx := range list.Cap(list.Len() - 1) {
1002999
// Drop the transaction from the global pools too
10031000
hash := tx.Hash()
1004-
delete(pool.all, hash)
1001+
pool.all.Remove(hash)
10051002
pool.priced.Removed()
10061003

10071004
// Update the account nonce to the dropped transaction
@@ -1023,7 +1020,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
10231020
for _, tx := range list.Cap(list.Len() - 1) {
10241021
// Drop the transaction from the global pools too
10251022
hash := tx.Hash()
1026-
delete(pool.all, hash)
1023+
pool.all.Remove(hash)
10271024
pool.priced.Removed()
10281025

10291026
// Update the account nonce to the dropped transaction
@@ -1092,15 +1089,15 @@ func (pool *TxPool) demoteUnexecutables() {
10921089
for _, tx := range list.Forward(nonce) {
10931090
hash := tx.Hash()
10941091
log.Trace("Removed old pending transaction", "hash", hash)
1095-
delete(pool.all, hash)
1092+
pool.all.Remove(hash)
10961093
pool.priced.Removed()
10971094
}
10981095
// Drop all transactions that are too costly (low balance or out of gas), and queue any invalids back for later
10991096
drops, invalids := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
11001097
for _, tx := range drops {
11011098
hash := tx.Hash()
11021099
log.Trace("Removed unpayable pending transaction", "hash", hash)
1103-
delete(pool.all, hash)
1100+
pool.all.Remove(hash)
11041101
pool.priced.Removed()
11051102
pendingNofundsCounter.Inc(1)
11061103
}
@@ -1172,3 +1169,68 @@ func (as *accountSet) containsTx(tx *types.Transaction) bool {
11721169
func (as *accountSet) add(addr common.Address) {
11731170
as.accounts[addr] = struct{}{}
11741171
}
1172+
1173+
// txLookup is used internally by TxPool to track transactions while allowing lookup without
1174+
// mutex contention.
1175+
//
1176+
// Note, although this type is properly protected against concurrent access, it
1177+
// is **not** a type that should ever be mutated or even exposed outside of the
1178+
// transaction pool, since its internal state is tightly coupled with the pools
1179+
// internal mechanisms. The sole purpose of the type is to permit out-of-bound
1180+
// peeking into the pool in TxPool.Get without having to acquire the widely scoped
1181+
// TxPool.mu mutex.
1182+
type txLookup struct {
1183+
all map[common.Hash]*types.Transaction
1184+
lock sync.RWMutex
1185+
}
1186+
1187+
// newTxLookup returns a new txLookup structure.
1188+
func newTxLookup() *txLookup {
1189+
return &txLookup{
1190+
all: make(map[common.Hash]*types.Transaction),
1191+
}
1192+
}
1193+
1194+
// Range calls f on each key and value present in the map.
1195+
func (t *txLookup) Range(f func(hash common.Hash, tx *types.Transaction) bool) {
1196+
t.lock.RLock()
1197+
defer t.lock.RUnlock()
1198+
1199+
for key, value := range t.all {
1200+
if !f(key, value) {
1201+
break
1202+
}
1203+
}
1204+
}
1205+
1206+
// Get returns a transaction if it exists in the lookup, or nil if not found.
1207+
func (t *txLookup) Get(hash common.Hash) *types.Transaction {
1208+
t.lock.RLock()
1209+
defer t.lock.RUnlock()
1210+
1211+
return t.all[hash]
1212+
}
1213+
1214+
// Count returns the current number of items in the lookup.
1215+
func (t *txLookup) Count() int {
1216+
t.lock.RLock()
1217+
defer t.lock.RUnlock()
1218+
1219+
return len(t.all)
1220+
}
1221+
1222+
// Add adds a transaction to the lookup.
1223+
func (t *txLookup) Add(tx *types.Transaction) {
1224+
t.lock.Lock()
1225+
defer t.lock.Unlock()
1226+
1227+
t.all[tx.Hash()] = tx
1228+
}
1229+
1230+
// Remove removes a transaction from the lookup.
1231+
func (t *txLookup) Remove(hash common.Hash) {
1232+
t.lock.Lock()
1233+
defer t.lock.Unlock()
1234+
1235+
delete(t.all, hash)
1236+
}

0 commit comments

Comments
 (0)