Skip to content

Commit 6c9f040

Browse files
core: transaction pool optimizations (#21328)
* core: added local tx pool test case * core, crypto: various allocation savings regarding tx handling * core/txlist, txpool: save a reheap operation, avoid some bigint allocs Co-authored-by: Marius van der Wijden <[email protected]>
1 parent 5b081ab commit 6c9f040

File tree

3 files changed

+104
-28
lines changed

3 files changed

+104
-28
lines changed

core/tx_list.go

Lines changed: 85 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,30 @@ func (m *txSortedMap) Forward(threshold uint64) types.Transactions {
9999

100100
// Filter iterates over the list of transactions and removes all of them for which
101101
// the specified function evaluates to true.
102+
// Filter, as opposed to 'filter', re-initialises the heap after the operation is done.
103+
// If you want to do several consecutive filterings, it's therefore better to first
104+
// do a .filter(func1) followed by .Filter(func2) or reheap()
102105
func (m *txSortedMap) Filter(filter func(*types.Transaction) bool) types.Transactions {
106+
removed := m.filter(filter)
107+
// If transactions were removed, the heap and cache are ruined
108+
if len(removed) > 0 {
109+
m.reheap()
110+
}
111+
return removed
112+
}
113+
114+
func (m *txSortedMap) reheap() {
115+
*m.index = make([]uint64, 0, len(m.items))
116+
for nonce := range m.items {
117+
*m.index = append(*m.index, nonce)
118+
}
119+
heap.Init(m.index)
120+
m.cache = nil
121+
}
122+
123+
// filter is identical to Filter, but **does not** regenerate the heap. This method
124+
// should only be used if followed immediately by a call to Filter or reheap()
125+
func (m *txSortedMap) filter(filter func(*types.Transaction) bool) types.Transactions {
103126
var removed types.Transactions
104127

105128
// Collect all the transactions to filter out
@@ -109,14 +132,7 @@ func (m *txSortedMap) Filter(filter func(*types.Transaction) bool) types.Transac
109132
delete(m.items, nonce)
110133
}
111134
}
112-
// If transactions were removed, the heap and cache are ruined
113135
if len(removed) > 0 {
114-
*m.index = make([]uint64, 0, len(m.items))
115-
for nonce := range m.items {
116-
*m.index = append(*m.index, nonce)
117-
}
118-
heap.Init(m.index)
119-
120136
m.cache = nil
121137
}
122138
return removed
@@ -197,10 +213,7 @@ func (m *txSortedMap) Len() int {
197213
return len(m.items)
198214
}
199215

200-
// Flatten creates a nonce-sorted slice of transactions based on the loosely
201-
// sorted internal representation. The result of the sorting is cached in case
202-
// it's requested again before any modifications are made to the contents.
203-
func (m *txSortedMap) Flatten() types.Transactions {
216+
func (m *txSortedMap) flatten() types.Transactions {
204217
// If the sorting was not cached yet, create and cache it
205218
if m.cache == nil {
206219
m.cache = make(types.Transactions, 0, len(m.items))
@@ -209,12 +222,27 @@ func (m *txSortedMap) Flatten() types.Transactions {
209222
}
210223
sort.Sort(types.TxByNonce(m.cache))
211224
}
225+
return m.cache
226+
}
227+
228+
// Flatten creates a nonce-sorted slice of transactions based on the loosely
229+
// sorted internal representation. The result of the sorting is cached in case
230+
// it's requested again before any modifications are made to the contents.
231+
func (m *txSortedMap) Flatten() types.Transactions {
212232
// Copy the cache to prevent accidental modifications
213-
txs := make(types.Transactions, len(m.cache))
214-
copy(txs, m.cache)
233+
cache := m.flatten()
234+
txs := make(types.Transactions, len(cache))
235+
copy(txs, cache)
215236
return txs
216237
}
217238

239+
// LastElement returns the last element of a flattened list, thus, the
240+
// transaction with the highest nonce
241+
func (m *txSortedMap) LastElement() *types.Transaction {
242+
cache := m.flatten()
243+
return cache[len(cache)-1]
244+
}
245+
218246
// txList is a "list" of transactions belonging to an account, sorted by account
219247
// nonce. The same type can be used both for storing contiguous transactions for
220248
// the executable/pending queue; and for storing gapped transactions for the non-
@@ -252,7 +280,11 @@ func (l *txList) Add(tx *types.Transaction, priceBump uint64) (bool, *types.Tran
252280
// If there's an older better transaction, abort
253281
old := l.txs.Get(tx.Nonce())
254282
if old != nil {
255-
threshold := new(big.Int).Div(new(big.Int).Mul(old.GasPrice(), big.NewInt(100+int64(priceBump))), big.NewInt(100))
283+
// threshold = oldGP * (100 + priceBump) / 100
284+
a := big.NewInt(100 + int64(priceBump))
285+
a = a.Mul(a, old.GasPrice())
286+
b := big.NewInt(100)
287+
threshold := a.Div(a, b)
256288
// Have to ensure that the new gas price is higher than the old gas
257289
// price as well as checking the percentage threshold to ensure that
258290
// this is accurate for low (Wei-level) gas price replacements
@@ -296,20 +328,25 @@ func (l *txList) Filter(costLimit *big.Int, gasLimit uint64) (types.Transactions
296328
l.gascap = gasLimit
297329

298330
// Filter out all the transactions above the account's funds
299-
removed := l.txs.Filter(func(tx *types.Transaction) bool { return tx.Cost().Cmp(costLimit) > 0 || tx.Gas() > gasLimit })
331+
removed := l.txs.Filter(func(tx *types.Transaction) bool {
332+
return tx.Gas() > gasLimit || tx.Cost().Cmp(costLimit) > 0
333+
})
300334

301-
// If the list was strict, filter anything above the lowest nonce
335+
if len(removed) == 0 {
336+
return nil, nil
337+
}
302338
var invalids types.Transactions
303-
304-
if l.strict && len(removed) > 0 {
339+
// If the list was strict, filter anything above the lowest nonce
340+
if l.strict {
305341
lowest := uint64(math.MaxUint64)
306342
for _, tx := range removed {
307343
if nonce := tx.Nonce(); lowest > nonce {
308344
lowest = nonce
309345
}
310346
}
311-
invalids = l.txs.Filter(func(tx *types.Transaction) bool { return tx.Nonce() > lowest })
347+
invalids = l.txs.filter(func(tx *types.Transaction) bool { return tx.Nonce() > lowest })
312348
}
349+
l.txs.reheap()
313350
return removed, invalids
314351
}
315352

@@ -363,6 +400,12 @@ func (l *txList) Flatten() types.Transactions {
363400
return l.txs.Flatten()
364401
}
365402

403+
// LastElement returns the last element of a flattened list, thus, the
404+
// transaction with the highest nonce
405+
func (l *txList) LastElement() *types.Transaction {
406+
return l.txs.LastElement()
407+
}
408+
366409
// priceHeap is a heap.Interface implementation over transactions for retrieving
367410
// price-sorted transactions to discard when the pool fills up.
368411
type priceHeap []*types.Transaction
@@ -495,8 +538,29 @@ func (l *txPricedList) Underpriced(tx *types.Transaction, local *accountSet) boo
495538
// Discard finds a number of most underpriced transactions, removes them from the
496539
// priced list and returns them for further removal from the entire pool.
497540
func (l *txPricedList) Discard(slots int, local *accountSet) types.Transactions {
498-
drop := make(types.Transactions, 0, slots) // Remote underpriced transactions to drop
499-
save := make(types.Transactions, 0, 64) // Local underpriced transactions to keep
541+
// If we have some local accountset, those will not be discarded
542+
if !local.empty() {
543+
// In case the list is filled to the brim with 'local' txs, we do this
544+
// little check to avoid unpacking / repacking the heap later on, which
545+
// is very expensive
546+
discardable := 0
547+
for _, tx := range *l.items {
548+
if !local.containsTx(tx) {
549+
discardable++
550+
}
551+
if discardable >= slots {
552+
break
553+
}
554+
}
555+
if slots > discardable {
556+
slots = discardable
557+
}
558+
}
559+
if slots == 0 {
560+
return nil
561+
}
562+
drop := make(types.Transactions, 0, slots) // Remote underpriced transactions to drop
563+
save := make(types.Transactions, 0, len(*l.items)-slots) // Local underpriced transactions to keep
500564

501565
for len(*l.items) > 0 && slots > 0 {
502566
// Discard stale transactions if found during cleanup

core/tx_pool.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1059,8 +1059,8 @@ func (pool *TxPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirt
10591059

10601060
// Update all accounts to the latest known pending nonce
10611061
for addr, list := range pool.pending {
1062-
txs := list.Flatten() // Heavy but will be cached and is needed by the miner anyway
1063-
pool.pendingNonces.set(addr, txs[len(txs)-1].Nonce()+1)
1062+
highestPending := list.LastElement()
1063+
pool.pendingNonces.set(addr, highestPending.Nonce()+1)
10641064
}
10651065
pool.mu.Unlock()
10661066

@@ -1457,6 +1457,10 @@ func (as *accountSet) contains(addr common.Address) bool {
14571457
return exist
14581458
}
14591459

1460+
func (as *accountSet) empty() bool {
1461+
return len(as.accounts) == 0
1462+
}
1463+
14601464
// containsTx checks if the sender of a given tx is within the set. If the sender
14611465
// cannot be derived, this method returns false.
14621466
func (as *accountSet) containsTx(tx *types.Transaction) bool {

core/tx_pool_test.go

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1890,11 +1890,15 @@ func benchmarkFuturePromotion(b *testing.B, size int) {
18901890
}
18911891

18921892
// Benchmarks the speed of batched transaction insertion.
1893-
func BenchmarkPoolBatchInsert100(b *testing.B) { benchmarkPoolBatchInsert(b, 100) }
1894-
func BenchmarkPoolBatchInsert1000(b *testing.B) { benchmarkPoolBatchInsert(b, 1000) }
1895-
func BenchmarkPoolBatchInsert10000(b *testing.B) { benchmarkPoolBatchInsert(b, 10000) }
1893+
func BenchmarkPoolBatchInsert100(b *testing.B) { benchmarkPoolBatchInsert(b, 100, false) }
1894+
func BenchmarkPoolBatchInsert1000(b *testing.B) { benchmarkPoolBatchInsert(b, 1000, false) }
1895+
func BenchmarkPoolBatchInsert10000(b *testing.B) { benchmarkPoolBatchInsert(b, 10000, false) }
18961896

1897-
func benchmarkPoolBatchInsert(b *testing.B, size int) {
1897+
func BenchmarkPoolBatchLocalInsert100(b *testing.B) { benchmarkPoolBatchInsert(b, 100, true) }
1898+
func BenchmarkPoolBatchLocalInsert1000(b *testing.B) { benchmarkPoolBatchInsert(b, 1000, true) }
1899+
func BenchmarkPoolBatchLocalInsert10000(b *testing.B) { benchmarkPoolBatchInsert(b, 10000, true) }
1900+
1901+
func benchmarkPoolBatchInsert(b *testing.B, size int, local bool) {
18981902
// Generate a batch of transactions to enqueue into the pool
18991903
pool, key := setupTxPool()
19001904
defer pool.Stop()
@@ -1912,6 +1916,10 @@ func benchmarkPoolBatchInsert(b *testing.B, size int) {
19121916
// Benchmark importing the transactions into the queue
19131917
b.ResetTimer()
19141918
for _, batch := range batches {
1915-
pool.AddRemotes(batch)
1919+
if local {
1920+
pool.AddLocals(batch)
1921+
} else {
1922+
pool.AddRemotes(batch)
1923+
}
19161924
}
19171925
}

0 commit comments

Comments
 (0)