Skip to content

core: reduce load on txindexer from API #31752

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
May 5, 2025
54 changes: 24 additions & 30 deletions core/blockchain_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,42 +270,20 @@ func (bc *BlockChain) GetAncestor(hash common.Hash, number, ancestor uint64, max
// GetTransactionLookup retrieves the lookup along with the transaction
// itself associate with the given transaction hash.
//
// An error will be returned if the transaction is not found, and background
// indexing for transactions is still in progress. The transaction might be
// reachable shortly once it's indexed.
//
// A null will be returned in the transaction is not found and background
// transaction indexing is already finished. The transaction is not existent
// from the node's perspective.
func (bc *BlockChain) GetTransactionLookup(hash common.Hash) (*rawdb.LegacyTxLookupEntry, *types.Transaction, error) {
// A null will be returned if the transaction is not found. This can be due to
// the transaction indexer not being finished. The caller must explicitly check
// the indexer progress.
func (bc *BlockChain) GetTransactionLookup(hash common.Hash) (*rawdb.LegacyTxLookupEntry, *types.Transaction) {
bc.txLookupLock.RLock()
defer bc.txLookupLock.RUnlock()

// Short circuit if the txlookup already in the cache, retrieve otherwise
if item, exist := bc.txLookupCache.Get(hash); exist {
return item.lookup, item.transaction, nil
return item.lookup, item.transaction
}
tx, blockHash, blockNumber, txIndex := rawdb.ReadTransaction(bc.db, hash)
if tx == nil {
progress, err := bc.TxIndexProgress()
if err != nil {
// No error is returned if the transaction indexing progress is unreachable
// due to unexpected internal errors. In such cases, it is impossible to
// determine whether the transaction does not exist or has simply not been
// indexed yet without a progress marker.
//
// In such scenarios, the transaction is treated as unreachable, though
// this is clearly an unintended and unexpected situation.
return nil, nil, nil
}
// The transaction indexing is not finished yet, returning an
// error to explicitly indicate it.
if !progress.Done() {
return nil, nil, errors.New("transaction indexing still in progress")
}
// The transaction is already indexed, the transaction is either
// not existent or not in the range of index, returning null.
return nil, nil, nil
return nil, nil
}
lookup := &rawdb.LegacyTxLookupEntry{
BlockHash: blockHash,
Expand All @@ -316,7 +294,23 @@ func (bc *BlockChain) GetTransactionLookup(hash common.Hash) (*rawdb.LegacyTxLoo
lookup: lookup,
transaction: tx,
})
return lookup, tx, nil
return lookup, tx
}

// TxIndexDone returns true if the transaction indexer has finished indexing.
func (bc *BlockChain) TxIndexDone() bool {
progress, err := bc.TxIndexProgress()
if err != nil {
// No error is returned if the transaction indexing progress is unreachable
// due to unexpected internal errors. In such cases, it is impossible to
// determine whether the transaction does not exist or has simply not been
// indexed yet without a progress marker.
//
// In such scenarios, the transaction is treated as unreachable, though
// this is clearly an unintended and unexpected situation.
return true
}
return progress.Done()
}

// HasState checks if state trie is fully present in the database or not.
Expand Down Expand Up @@ -412,7 +406,7 @@ func (bc *BlockChain) TxIndexProgress() (TxIndexProgress, error) {
if bc.txIndexer == nil {
return TxIndexProgress{}, errors.New("tx indexer is not enabled")
}
return bc.txIndexer.txIndexProgress()
return bc.txIndexer.txIndexProgress(), nil
}

// HistoryPruningCutoff returns the configured history pruning point.
Expand Down
59 changes: 32 additions & 27 deletions core/txindexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
package core

import (
"errors"
"fmt"
"sync/atomic"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
Expand Down Expand Up @@ -47,26 +47,35 @@ type txIndexer struct {
// and all others shouldn't.
limit uint64

// The current tail of the indexed transactions, null indicates
// that no transactions have been indexed yet.
//
// This field is accessed by both the indexer and the indexing
// progress queries.
tail atomic.Pointer[uint64]

// cutoff denotes the block number before which the chain segment should
// be pruned and not available locally.
cutoff uint64
db ethdb.Database
progress chan chan TxIndexProgress
term chan chan struct{}
closed chan struct{}
cutoff uint64
chain *BlockChain
db ethdb.Database
term chan chan struct{}
closed chan struct{}
}

// newTxIndexer initializes the transaction indexer.
func newTxIndexer(limit uint64, chain *BlockChain) *txIndexer {
cutoff, _ := chain.HistoryPruningCutoff()
indexer := &txIndexer{
limit: limit,
cutoff: cutoff,
db: chain.db,
progress: make(chan chan TxIndexProgress),
term: make(chan chan struct{}),
closed: make(chan struct{}),
limit: limit,
cutoff: cutoff,
chain: chain,
db: chain.db,
term: make(chan chan struct{}),
closed: make(chan struct{}),
}
indexer.tail.Store(rawdb.ReadTxIndexTail(chain.db))

go indexer.loop(chain)

var msg string
Expand Down Expand Up @@ -154,6 +163,7 @@ func (indexer *txIndexer) repair(head uint64) {
// A crash may occur between the two delete operations,
// potentially leaving dangling indexes in the database.
// However, this is considered acceptable.
indexer.tail.Store(nil)
rawdb.DeleteTxIndexTail(indexer.db)
rawdb.DeleteAllTxLookupEntries(indexer.db, nil)
log.Warn("Purge transaction indexes", "head", head, "tail", *tail)
Expand All @@ -174,6 +184,7 @@ func (indexer *txIndexer) repair(head uint64) {
// Traversing the database directly within the transaction
// index namespace might be slow and expensive, but we
// have no choice.
indexer.tail.Store(nil)
rawdb.DeleteTxIndexTail(indexer.db)
rawdb.DeleteAllTxLookupEntries(indexer.db, nil)
log.Warn("Purge transaction indexes", "head", head, "cutoff", indexer.cutoff)
Expand All @@ -187,6 +198,7 @@ func (indexer *txIndexer) repair(head uint64) {
// A crash may occur between the two delete operations,
// potentially leaving dangling indexes in the database.
// However, this is considered acceptable.
indexer.tail.Store(&indexer.cutoff)
rawdb.WriteTxIndexTail(indexer.db, indexer.cutoff)
rawdb.DeleteAllTxLookupEntries(indexer.db, func(txhash common.Hash, blob []byte) bool {
n := rawdb.DecodeTxLookupEntry(blob, indexer.db)
Expand Down Expand Up @@ -243,12 +255,10 @@ func (indexer *txIndexer) loop(chain *BlockChain) {
done = make(chan struct{})
go indexer.run(h.Header.Number.Uint64(), stop, done)
}
head = h.Header.Number.Uint64()
case <-done:
stop = nil
done = nil
case ch := <-indexer.progress:
ch <- indexer.report(head)
indexer.tail.Store(rawdb.ReadTxIndexTail(indexer.db))
case ch := <-indexer.term:
if stop != nil {
close(stop)
Expand All @@ -264,7 +274,7 @@ func (indexer *txIndexer) loop(chain *BlockChain) {
}

// report returns the tx indexing progress.
func (indexer *txIndexer) report(head uint64) TxIndexProgress {
func (indexer *txIndexer) report(head uint64, tail *uint64) TxIndexProgress {
// Special case if the head is even below the cutoff,
// nothing to index.
if head < indexer.cutoff {
Expand All @@ -284,7 +294,6 @@ func (indexer *txIndexer) report(head uint64) TxIndexProgress {
}
// Compute how many blocks have been indexed
var indexed uint64
tail := rawdb.ReadTxIndexTail(indexer.db)
if tail != nil {
indexed = head - *tail + 1
}
Expand All @@ -300,16 +309,12 @@ func (indexer *txIndexer) report(head uint64) TxIndexProgress {
}
}

// txIndexProgress retrieves the tx indexing progress, or an error if the
// background tx indexer is already stopped.
func (indexer *txIndexer) txIndexProgress() (TxIndexProgress, error) {
ch := make(chan TxIndexProgress, 1)
select {
case indexer.progress <- ch:
return <-ch, nil
case <-indexer.closed:
return TxIndexProgress{}, errors.New("indexer is closed")
}
// txIndexProgress retrieves the transaction indexing progress. The reported
// progress may slightly lag behind the actual indexing state, as the tail is
// only updated at the end of each indexing operation. However, this delay is
// considered acceptable.
func (indexer *txIndexer) txIndexProgress() TxIndexProgress {
return indexer.report(indexer.resolveHead(), indexer.tail.Load())
}

// close shutdown the indexer. Safe to be called for multiple times.
Expand Down
22 changes: 8 additions & 14 deletions core/txindexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,8 @@ func TestTxIndexer(t *testing.T) {

// Index the initial blocks from ancient store
indexer := &txIndexer{
limit: 0,
db: db,
progress: make(chan chan TxIndexProgress),
limit: 0,
db: db,
}
for i, limit := range c.limits {
indexer.limit = limit
Expand Down Expand Up @@ -241,9 +240,8 @@ func TestTxIndexerRepair(t *testing.T) {

// Index the initial blocks from ancient store
indexer := &txIndexer{
limit: c.limit,
db: db,
progress: make(chan chan TxIndexProgress),
limit: c.limit,
db: db,
}
indexer.run(chainHead, make(chan struct{}), make(chan struct{}))

Expand Down Expand Up @@ -432,15 +430,11 @@ func TestTxIndexerReport(t *testing.T) {

// Index the initial blocks from ancient store
indexer := &txIndexer{
limit: c.limit,
cutoff: c.cutoff,
db: db,
progress: make(chan chan TxIndexProgress),
limit: c.limit,
cutoff: c.cutoff,
db: db,
}
if c.tail != nil {
rawdb.WriteTxIndexTail(db, *c.tail)
}
p := indexer.report(c.head)
p := indexer.report(c.head, c.tail)
if p.Indexed != c.expIndexed {
t.Fatalf("Unexpected indexed: %d, expected: %d", p.Indexed, c.expIndexed)
}
Expand Down
28 changes: 13 additions & 15 deletions eth/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,22 +349,20 @@ func (b *EthAPIBackend) GetPoolTransaction(hash common.Hash) *types.Transaction
// GetTransaction retrieves the lookup along with the transaction itself associate
// with the given transaction hash.
//
// An error will be returned if the transaction is not found, and background
// indexing for transactions is still in progress. The error is used to indicate the
// scenario explicitly that the transaction might be reachable shortly.
//
// A null will be returned in the transaction is not found and background transaction
// indexing is already finished. The transaction is not existent from the perspective
// of node.
func (b *EthAPIBackend) GetTransaction(ctx context.Context, txHash common.Hash) (bool, *types.Transaction, common.Hash, uint64, uint64, error) {
lookup, tx, err := b.eth.blockchain.GetTransactionLookup(txHash)
if err != nil {
return false, nil, common.Hash{}, 0, 0, err
}
// A null will be returned if the transaction is not found. The transaction is not
// existent from the node's perspective. This can be due to the transaction indexer
// not being finished. The caller must explicitly check the indexer progress.
func (b *EthAPIBackend) GetTransaction(txHash common.Hash) (bool, *types.Transaction, common.Hash, uint64, uint64) {
lookup, tx := b.eth.blockchain.GetTransactionLookup(txHash)
if lookup == nil || tx == nil {
return false, nil, common.Hash{}, 0, 0, nil
return false, nil, common.Hash{}, 0, 0
}
return true, tx, lookup.BlockHash, lookup.BlockIndex, lookup.Index, nil
return true, tx, lookup.BlockHash, lookup.BlockIndex, lookup.Index
}

// TxIndexDone returns true if the transaction indexer has finished indexing.
func (b *EthAPIBackend) TxIndexDone() bool {
return b.eth.blockchain.TxIndexDone()
}

func (b *EthAPIBackend) GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error) {
Expand All @@ -391,7 +389,7 @@ func (b *EthAPIBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.S
return b.eth.txPool.SubscribeTransactions(ch, true)
}

func (b *EthAPIBackend) SyncProgress() ethereum.SyncProgress {
func (b *EthAPIBackend) SyncProgress(ctx context.Context) ethereum.SyncProgress {
prog := b.eth.Downloader().Progress()
if txProg, err := b.eth.blockchain.TxIndexProgress(); err == nil {
prog.TxIndexFinishedBlocks = txProg.Indexed
Expand Down
14 changes: 8 additions & 6 deletions eth/tracers/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ type Backend interface {
HeaderByNumber(ctx context.Context, number rpc.BlockNumber) (*types.Header, error)
BlockByHash(ctx context.Context, hash common.Hash) (*types.Block, error)
BlockByNumber(ctx context.Context, number rpc.BlockNumber) (*types.Block, error)
GetTransaction(ctx context.Context, txHash common.Hash) (bool, *types.Transaction, common.Hash, uint64, uint64, error)
GetTransaction(txHash common.Hash) (bool, *types.Transaction, common.Hash, uint64, uint64)
TxIndexDone() bool
RPCGasCap() uint64
ChainConfig() *params.ChainConfig
Engine() consensus.Engine
Expand Down Expand Up @@ -858,12 +859,13 @@ func containsTx(block *types.Block, hash common.Hash) bool {
// TraceTransaction returns the structured logs created during the execution of EVM
// and returns them as a JSON object.
func (api *API) TraceTransaction(ctx context.Context, hash common.Hash, config *TraceConfig) (interface{}, error) {
found, _, blockHash, blockNumber, index, err := api.backend.GetTransaction(ctx, hash)
if err != nil {
return nil, ethapi.NewTxIndexingError()
}
// Only mined txes are supported
found, _, blockHash, blockNumber, index := api.backend.GetTransaction(hash)
if !found {
// Warn in case tx indexer is not done.
if !api.backend.TxIndexDone() {
return nil, ethapi.NewTxIndexingError()
}
// Only mined txes are supported
return nil, errTxNotFound
}
// It shouldn't happen in practice.
Expand Down
8 changes: 6 additions & 2 deletions eth/tracers/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,13 @@ func (b *testBackend) BlockByNumber(ctx context.Context, number rpc.BlockNumber)
return b.chain.GetBlockByNumber(uint64(number)), nil
}

func (b *testBackend) GetTransaction(ctx context.Context, txHash common.Hash) (bool, *types.Transaction, common.Hash, uint64, uint64, error) {
func (b *testBackend) GetTransaction(txHash common.Hash) (bool, *types.Transaction, common.Hash, uint64, uint64) {
tx, hash, blockNumber, index := rawdb.ReadTransaction(b.chaindb, txHash)
return tx != nil, tx, hash, blockNumber, index, nil
return tx != nil, tx, hash, blockNumber, index
}

func (b *testBackend) TxIndexDone() bool {
return true
}

func (b *testBackend) RPCGasCap() uint64 {
Expand Down
6 changes: 3 additions & 3 deletions ethstats/ethstats.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ type backend interface {
CurrentHeader() *types.Header
HeaderByNumber(ctx context.Context, number rpc.BlockNumber) (*types.Header, error)
Stats() (pending int, queued int)
SyncProgress() ethereum.SyncProgress
SyncProgress(ctx context.Context) ethereum.SyncProgress
}

// fullNodeBackend encompasses the functionality necessary for a full node
Expand Down Expand Up @@ -766,7 +766,7 @@ func (s *Service) reportStats(conn *connWrapper) error {
)
// check if backend is a full node
if fullBackend, ok := s.backend.(fullNodeBackend); ok {
sync := fullBackend.SyncProgress()
sync := fullBackend.SyncProgress(context.Background())
syncing = !sync.Done()

price, _ := fullBackend.SuggestGasTipCap(context.Background())
Expand All @@ -775,7 +775,7 @@ func (s *Service) reportStats(conn *connWrapper) error {
gasprice += int(basefee.Uint64())
}
} else {
sync := s.backend.SyncProgress()
sync := s.backend.SyncProgress(context.Background())
syncing = !sync.Done()
}
// Assemble the node stats and send it to the server
Expand Down
6 changes: 3 additions & 3 deletions graphql/graphql.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func (t *Transaction) resolve(ctx context.Context) (*types.Transaction, *Block)
return t.tx, t.block
}
// Try to return an already finalized transaction
found, tx, blockHash, _, index, _ := t.r.backend.GetTransaction(ctx, t.hash)
found, tx, blockHash, _, index := t.r.backend.GetTransaction(t.hash)
if found {
t.tx = tx
blockNrOrHash := rpc.BlockNumberOrHashWithHash(blockHash, false)
Expand Down Expand Up @@ -1530,8 +1530,8 @@ func (s *SyncState) TxIndexRemainingBlocks() hexutil.Uint64 {
// - healingBytecode: number of bytecodes pending
// - txIndexFinishedBlocks: number of blocks whose transactions are indexed
// - txIndexRemainingBlocks: number of blocks whose transactions are not indexed yet
func (r *Resolver) Syncing() (*SyncState, error) {
progress := r.backend.SyncProgress()
func (r *Resolver) Syncing(ctx context.Context) (*SyncState, error) {
progress := r.backend.SyncProgress(ctx)

// Return not syncing if the synchronisation already completed
if progress.Done() {
Expand Down
Loading