Skip to content

core/txpool, eth, miner: pre-filter dynamic fees during pending tx retrieval #29005

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 27 additions & 3 deletions core/txpool/blobpool/blobpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,8 +436,10 @@ func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserve txpool.Addres
// Close closes down the underlying persistent store.
func (p *BlobPool) Close() error {
var errs []error
if err := p.limbo.Close(); err != nil {
errs = append(errs, err)
if p.limbo != nil { // Close might be invoked due to error in constructor, before p,limbo is set
if err := p.limbo.Close(); err != nil {
errs = append(errs, err)
}
}
if err := p.store.Close(); err != nil {
errs = append(errs, err)
Expand Down Expand Up @@ -1441,7 +1443,10 @@ func (p *BlobPool) drop() {

// Pending retrieves all currently processable transactions, grouped by origin
// account and sorted by nonce.
func (p *BlobPool) Pending(enforceTips bool) map[common.Address][]*txpool.LazyTransaction {
//
// The transactions can also be pre-filtered by the dynamic fee components to
// reduce allocations and load on downstream subsystems.
func (p *BlobPool) Pending(minTip *uint256.Int, baseFee *uint256.Int, blobFee *uint256.Int) map[common.Address][]*txpool.LazyTransaction {
// Track the amount of time waiting to retrieve the list of pending blob txs
// from the pool and the amount of time actually spent on assembling the data.
// The latter will be pretty much moot, but we've kept it to have symmetric
Expand All @@ -1459,6 +1464,25 @@ func (p *BlobPool) Pending(enforceTips bool) map[common.Address][]*txpool.LazyTr
for addr, txs := range p.index {
var lazies []*txpool.LazyTransaction
for _, tx := range txs {
// If transaction filtering was requested, discard badly priced ones
if minTip != nil && baseFee != nil {
if tx.execFeeCap.Lt(baseFee) {
break // basefee too low, cannot be included, discard rest of txs from the account
}
tip := new(uint256.Int).Sub(tx.execFeeCap, baseFee)
if tip.Gt(tx.execTipCap) {
tip = tx.execTipCap
}
if tip.Lt(minTip) {
break // allowed or remaining tip too low, cannot be included, discard rest of txs from the account
}
}
if blobFee != nil {
if tx.blobFeeCap.Lt(blobFee) {
break // blobfee too low, cannot be included, discard rest of txs from the account
}
}
// Transaction was accepted according to the filter, append to the pending list
lazies = append(lazies, &txpool.LazyTransaction{
Pool: p,
Hash: tx.hash,
Expand Down
26 changes: 18 additions & 8 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,24 +518,34 @@ func (pool *LegacyPool) ContentFrom(addr common.Address) ([]*types.Transaction,
}

// Pending retrieves all currently processable transactions, grouped by origin
// account and sorted by nonce. The returned transaction set is a copy and can be
// freely modified by calling code.
// account and sorted by nonce.
//
// The enforceTips parameter can be used to do an extra filtering on the pending
// transactions and only return those whose **effective** tip is large enough in
// the next pending execution environment.
func (pool *LegacyPool) Pending(enforceTips bool) map[common.Address][]*txpool.LazyTransaction {
// The transactions can also be pre-filtered by the dynamic fee components to
// reduce allocations and load on downstream subsystems.
func (pool *LegacyPool) Pending(minTip *uint256.Int, baseFee *uint256.Int, blobFee *uint256.Int) map[common.Address][]*txpool.LazyTransaction {
pool.mu.Lock()
defer pool.mu.Unlock()

// Convert the new uint256.Int types to the old big.Int ones used by the legacy pool
var (
minTipBig *big.Int
baseFeeBig *big.Int
)
if minTip != nil {
minTipBig = minTip.ToBig()
}
if baseFee != nil {
baseFeeBig = baseFee.ToBig()
}

pending := make(map[common.Address][]*txpool.LazyTransaction, len(pool.pending))
for addr, list := range pool.pending {
txs := list.Flatten()

// If the miner requests tip enforcement, cap the lists now
if enforceTips && !pool.locals.contains(addr) {
if minTipBig != nil && !pool.locals.contains(addr) {
for i, tx := range txs {
if tx.EffectiveGasTipIntCmp(pool.gasTip.Load().ToBig(), pool.priced.urgent.baseFee) < 0 {
if tx.EffectiveGasTipIntCmp(minTipBig, baseFeeBig) < 0 {
txs = txs[:i]
break
}
Expand Down
6 changes: 5 additions & 1 deletion core/txpool/subpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event"
"github.com/holiman/uint256"
)

// LazyTransaction contains a small subset of the transaction properties that is
Expand Down Expand Up @@ -114,7 +115,10 @@ type SubPool interface {

// Pending retrieves all currently processable transactions, grouped by origin
// account and sorted by nonce.
Pending(enforceTips bool) map[common.Address][]*LazyTransaction
//
// The transactions can also be pre-filtered by the dynamic fee components to
// reduce allocations and load on downstream subsystems.
Pending(minTip *uint256.Int, baseFee *uint256.Int, blobFee *uint256.Int) map[common.Address][]*LazyTransaction

// SubscribeTransactions subscribes to new transaction events. The subscriber
// can decide whether to receive notifications only for newly seen transactions
Expand Down
8 changes: 6 additions & 2 deletions core/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/holiman/uint256"
)

// TxStatus is the current status of a transaction as seen by the pool.
Expand Down Expand Up @@ -353,10 +354,13 @@ func (p *TxPool) Add(txs []*types.Transaction, local bool, sync bool) []error {

// Pending retrieves all currently processable transactions, grouped by origin
// account and sorted by nonce.
func (p *TxPool) Pending(enforceTips bool) map[common.Address][]*LazyTransaction {
//
// The transactions can also be pre-filtered by the dynamic fee components to
// reduce allocations and load on downstream subsystems.
func (p *TxPool) Pending(minTip *uint256.Int, baseFee *uint256.Int, blobFee *uint256.Int) map[common.Address][]*LazyTransaction {
txs := make(map[common.Address][]*LazyTransaction)
for _, subpool := range p.subpools {
for addr, set := range subpool.Pending(enforceTips) {
for addr, set := range subpool.Pending(minTip, baseFee, blobFee) {
txs[addr] = set
}
}
Expand Down
2 changes: 1 addition & 1 deletion eth/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction)
}

func (b *EthAPIBackend) GetPoolTransactions() (types.Transactions, error) {
pending := b.eth.txPool.Pending(false)
pending := b.eth.txPool.Pending(nil, nil, nil)
var txs types.Transactions
for _, batch := range pending {
for _, lazy := range batch {
Expand Down
4 changes: 2 additions & 2 deletions eth/catalyst/simulated_beacon.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ func (c *SimulatedBeacon) Rollback() {

// Fork sets the head to the provided hash.
func (c *SimulatedBeacon) Fork(parentHash common.Hash) error {
if len(c.eth.TxPool().Pending(false)) != 0 {
if len(c.eth.TxPool().Pending(nil, nil, nil)) != 0 {
return errors.New("pending block dirty")
}
parent := c.eth.BlockChain().GetBlockByHash(parentHash)
Expand All @@ -275,7 +275,7 @@ func (c *SimulatedBeacon) Fork(parentHash common.Hash) error {

// AdjustTime creates a new block with an adjusted timestamp.
func (c *SimulatedBeacon) AdjustTime(adjustment time.Duration) error {
if len(c.eth.TxPool().Pending(false)) != 0 {
if len(c.eth.TxPool().Pending(nil, nil, nil)) != 0 {
return errors.New("could not adjust time on non-empty block")
}
parent := c.eth.BlockChain().CurrentBlock()
Expand Down
3 changes: 2 additions & 1 deletion eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/triedb/pathdb"
"github.com/holiman/uint256"
)

const (
Expand Down Expand Up @@ -73,7 +74,7 @@ type txPool interface {

// Pending should return pending transactions.
// The slice should be modifiable by the caller.
Pending(enforceTips bool) map[common.Address][]*txpool.LazyTransaction
Pending(minTip *uint256.Int, baseFee *uint256.Int, blobFee *uint256.Int) map[common.Address][]*txpool.LazyTransaction

// SubscribeTransactions subscribes to new transaction events. The subscriber
// can decide whether to receive notifications only for newly seen transactions
Expand Down
3 changes: 2 additions & 1 deletion eth/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/params"
"github.com/holiman/uint256"
)

var (
Expand Down Expand Up @@ -92,7 +93,7 @@ func (p *testTxPool) Add(txs []*types.Transaction, local bool, sync bool) []erro
}

// Pending returns all the transactions known to the pool
func (p *testTxPool) Pending(enforceTips bool) map[common.Address][]*txpool.LazyTransaction {
func (p *testTxPool) Pending(minTip *uint256.Int, baseFee *uint256.Int, blobFee *uint256.Int) map[common.Address][]*txpool.LazyTransaction {
p.lock.RLock()
defer p.lock.RUnlock()

Expand Down
2 changes: 1 addition & 1 deletion eth/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ const (
// syncTransactions starts sending all currently pending transactions to the given peer.
func (h *handler) syncTransactions(p *eth.Peer) {
var hashes []common.Hash
for _, batch := range h.txpool.Pending(false) {
for _, batch := range h.txpool.Pending(nil, nil, nil) {
for _, tx := range batch {
hashes = append(hashes, tx.Hash)
}
Expand Down
20 changes: 15 additions & 5 deletions miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/trie"
"github.com/holiman/uint256"
)

const (
Expand Down Expand Up @@ -999,7 +1000,20 @@ func (w *worker) prepareWork(genParams *generateParams) (*environment, error) {
// into the given sealing block. The transaction selection and ordering strategy can
// be customized with the plugin in the future.
func (w *worker) fillTransactions(interrupt *atomic.Int32, env *environment) error {
pending := w.eth.TxPool().Pending(true)
w.mu.RLock()
tip := w.tip
w.mu.RUnlock()

// Retrieve the pending transactions pre-filtered by the 1559/4844 dynamic fees
var baseFee *uint256.Int
if env.header.BaseFee != nil {
baseFee = uint256.MustFromBig(env.header.BaseFee)
}
var blobFee *uint256.Int
if env.header.ExcessBlobGas != nil {
blobFee = uint256.MustFromBig(eip4844.CalcBlobFee(*env.header.ExcessBlobGas))
}
pending := w.eth.TxPool().Pending(uint256.MustFromBig(tip), baseFee, blobFee)

// Split the pending transactions into locals and remotes.
localTxs, remoteTxs := make(map[common.Address][]*txpool.LazyTransaction), pending
Expand All @@ -1011,10 +1025,6 @@ func (w *worker) fillTransactions(interrupt *atomic.Int32, env *environment) err
}

// Fill the block with all available pending transactions.
w.mu.RLock()
tip := w.tip
w.mu.RUnlock()

if len(localTxs) > 0 {
txs := newTransactionsByPriceAndNonce(env.signer, localTxs, env.header.BaseFee)
if err := w.commitTransactions(env, txs, interrupt, new(big.Int)); err != nil {
Expand Down