Skip to content

Commit ec1616d

Browse files
committed
core, eth, internal: split txIndexProgress from the main loop
1 parent a92dadc commit ec1616d

File tree

12 files changed

+57
-73
lines changed

12 files changed

+57
-73
lines changed

core/blockchain_reader.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package core
1818

1919
import (
20-
"context"
2120
"errors"
2221

2322
"github.com/ethereum/go-ethereum/common"
@@ -299,8 +298,8 @@ func (bc *BlockChain) GetTransactionLookup(hash common.Hash) (*rawdb.LegacyTxLoo
299298
}
300299

301300
// TxIndexDone returns true if the transaction indexer has finished indexing.
302-
func (bc *BlockChain) TxIndexDone(ctx context.Context) bool {
303-
progress, err := bc.TxIndexProgress(ctx)
301+
func (bc *BlockChain) TxIndexDone() bool {
302+
progress, err := bc.TxIndexProgress()
304303
if err != nil {
305304
// No error is returned if the transaction indexing progress is unreachable
306305
// due to unexpected internal errors. In such cases, it is impossible to
@@ -403,11 +402,11 @@ func (bc *BlockChain) GetVMConfig() *vm.Config {
403402
}
404403

405404
// TxIndexProgress returns the transaction indexing progress.
406-
func (bc *BlockChain) TxIndexProgress(ctx context.Context) (TxIndexProgress, error) {
405+
func (bc *BlockChain) TxIndexProgress() (TxIndexProgress, error) {
407406
if bc.txIndexer == nil {
408407
return TxIndexProgress{}, errors.New("tx indexer is not enabled")
409408
}
410-
return bc.txIndexer.txIndexProgress(ctx)
409+
return bc.txIndexer.txIndexProgress(), nil
411410
}
412411

413412
// HistoryPruningCutoff returns the configured history pruning point.

core/txindexer.go

Lines changed: 31 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,8 @@
1717
package core
1818

1919
import (
20-
"context"
21-
"errors"
2220
"fmt"
21+
"sync/atomic"
2322

2423
"github.com/ethereum/go-ethereum/common"
2524
"github.com/ethereum/go-ethereum/core/rawdb"
@@ -48,26 +47,35 @@ type txIndexer struct {
4847
// and all others shouldn't.
4948
limit uint64
5049

50+
// The current tail of the indexed transactions, null indicates
51+
// that no transactions have been indexed yet.
52+
//
53+
// This field is accessed by both the indexer and the indexing
54+
// progress queries.
55+
tail atomic.Pointer[uint64]
56+
5157
// cutoff denotes the block number before which the chain segment should
5258
// be pruned and not available locally.
53-
cutoff uint64
54-
db ethdb.Database
55-
progress chan chan TxIndexProgress
56-
term chan chan struct{}
57-
closed chan struct{}
59+
cutoff uint64
60+
chain *BlockChain
61+
db ethdb.Database
62+
term chan chan struct{}
63+
closed chan struct{}
5864
}
5965

6066
// newTxIndexer initializes the transaction indexer.
6167
func newTxIndexer(limit uint64, chain *BlockChain) *txIndexer {
6268
cutoff, _ := chain.HistoryPruningCutoff()
6369
indexer := &txIndexer{
64-
limit: limit,
65-
cutoff: cutoff,
66-
db: chain.db,
67-
progress: make(chan chan TxIndexProgress),
68-
term: make(chan chan struct{}),
69-
closed: make(chan struct{}),
70+
limit: limit,
71+
cutoff: cutoff,
72+
chain: chain,
73+
db: chain.db,
74+
term: make(chan chan struct{}),
75+
closed: make(chan struct{}),
7076
}
77+
indexer.tail.Store(rawdb.ReadTxIndexTail(chain.db))
78+
7179
go indexer.loop(chain)
7280

7381
var msg string
@@ -217,10 +225,9 @@ func (indexer *txIndexer) loop(chain *BlockChain) {
217225

218226
// Listening to chain events and manipulate the transaction indexes.
219227
var (
220-
stop chan struct{} // Non-nil if background routine is active
221-
done chan struct{} // Non-nil if background routine is active
222-
head = indexer.resolveHead() // The latest announced chain head
223-
lastTail = rawdb.ReadTxIndexTail(indexer.db) // The oldest indexed block, nil means nothing indexed
228+
stop chan struct{} // Non-nil if background routine is active
229+
done chan struct{} // Non-nil if background routine is active
230+
head = indexer.resolveHead() // The latest announced chain head
224231

225232
headCh = make(chan ChainHeadEvent)
226233
sub = chain.SubscribeChainHeadEvent(headCh)
@@ -245,13 +252,10 @@ func (indexer *txIndexer) loop(chain *BlockChain) {
245252
done = make(chan struct{})
246253
go indexer.run(h.Header.Number.Uint64(), stop, done)
247254
}
248-
head = h.Header.Number.Uint64()
249255
case <-done:
250256
stop = nil
251257
done = nil
252-
lastTail = rawdb.ReadTxIndexTail(indexer.db)
253-
case ch := <-indexer.progress:
254-
ch <- indexer.report(head, lastTail)
258+
indexer.tail.Store(rawdb.ReadTxIndexTail(indexer.db))
255259
case ch := <-indexer.term:
256260
if stop != nil {
257261
close(stop)
@@ -302,25 +306,12 @@ func (indexer *txIndexer) report(head uint64, tail *uint64) TxIndexProgress {
302306
}
303307
}
304308

305-
// txIndexProgress retrieves the tx indexing progress, or an error if the
306-
// background tx indexer is already stopped.
307-
func (indexer *txIndexer) txIndexProgress(ctx context.Context) (TxIndexProgress, error) {
308-
ch := make(chan TxIndexProgress, 1)
309-
select {
310-
case indexer.progress <- ch:
311-
select {
312-
case prog := <-ch:
313-
return prog, nil
314-
case <-ctx.Done():
315-
// Since the channel is buffered the loop will not block
316-
// eventually when it prepares the response.
317-
return TxIndexProgress{}, ctx.Err()
318-
}
319-
case <-indexer.closed:
320-
return TxIndexProgress{}, errors.New("indexer is closed")
321-
case <-ctx.Done():
322-
return TxIndexProgress{}, ctx.Err()
323-
}
309+
// txIndexProgress retrieves the transaction indexing progress. The reported
310+
// progress may slightly lag behind the actual indexing state, as the tail is
311+
// only updated at the end of each indexing operation. However, this delay is
312+
// considered acceptable.
313+
func (indexer *txIndexer) txIndexProgress() TxIndexProgress {
314+
return indexer.report(indexer.resolveHead(), indexer.tail.Load())
324315
}
325316

326317
// close shutdown the indexer. Safe to be called for multiple times.

core/txindexer_test.go

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -121,9 +121,8 @@ func TestTxIndexer(t *testing.T) {
121121

122122
// Index the initial blocks from ancient store
123123
indexer := &txIndexer{
124-
limit: 0,
125-
db: db,
126-
progress: make(chan chan TxIndexProgress),
124+
limit: 0,
125+
db: db,
127126
}
128127
for i, limit := range c.limits {
129128
indexer.limit = limit
@@ -241,9 +240,8 @@ func TestTxIndexerRepair(t *testing.T) {
241240

242241
// Index the initial blocks from ancient store
243242
indexer := &txIndexer{
244-
limit: c.limit,
245-
db: db,
246-
progress: make(chan chan TxIndexProgress),
243+
limit: c.limit,
244+
db: db,
247245
}
248246
indexer.run(chainHead, make(chan struct{}), make(chan struct{}))
249247

@@ -432,13 +430,9 @@ func TestTxIndexerReport(t *testing.T) {
432430

433431
// Index the initial blocks from ancient store
434432
indexer := &txIndexer{
435-
limit: c.limit,
436-
cutoff: c.cutoff,
437-
db: db,
438-
progress: make(chan chan TxIndexProgress),
439-
}
440-
if c.tail != nil {
441-
rawdb.WriteTxIndexTail(db, *c.tail)
433+
limit: c.limit,
434+
cutoff: c.cutoff,
435+
db: db,
442436
}
443437
p := indexer.report(c.head, c.tail)
444438
if p.Indexed != c.expIndexed {

eth/api_backend.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -361,8 +361,8 @@ func (b *EthAPIBackend) GetTransaction(txHash common.Hash) (bool, *types.Transac
361361
}
362362

363363
// TxIndexDone returns true if the transaction indexer has finished indexing.
364-
func (b *EthAPIBackend) TxIndexDone(ctx context.Context) bool {
365-
return b.eth.blockchain.TxIndexDone(ctx)
364+
func (b *EthAPIBackend) TxIndexDone() bool {
365+
return b.eth.blockchain.TxIndexDone()
366366
}
367367

368368
func (b *EthAPIBackend) GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error) {
@@ -391,7 +391,7 @@ func (b *EthAPIBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.S
391391

392392
func (b *EthAPIBackend) SyncProgress(ctx context.Context) ethereum.SyncProgress {
393393
prog := b.eth.Downloader().Progress()
394-
if txProg, err := b.eth.blockchain.TxIndexProgress(ctx); err == nil {
394+
if txProg, err := b.eth.blockchain.TxIndexProgress(); err == nil {
395395
prog.TxIndexFinishedBlocks = txProg.Indexed
396396
prog.TxIndexRemainingBlocks = txProg.Remaining
397397
}

eth/downloader/api.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ func (api *DownloaderAPI) eventLoop() {
7777

7878
getProgress = func() ethereum.SyncProgress {
7979
prog := api.d.Progress()
80-
if txProg, err := api.chain.TxIndexProgress(context.Background()); err == nil {
80+
if txProg, err := api.chain.TxIndexProgress(); err == nil {
8181
prog.TxIndexFinishedBlocks = txProg.Indexed
8282
prog.TxIndexRemainingBlocks = txProg.Remaining
8383
}

eth/tracers/api.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ type Backend interface {
8383
BlockByHash(ctx context.Context, hash common.Hash) (*types.Block, error)
8484
BlockByNumber(ctx context.Context, number rpc.BlockNumber) (*types.Block, error)
8585
GetTransaction(txHash common.Hash) (bool, *types.Transaction, common.Hash, uint64, uint64)
86-
TxIndexDone(ctx context.Context) bool
86+
TxIndexDone() bool
8787
RPCGasCap() uint64
8888
ChainConfig() *params.ChainConfig
8989
Engine() consensus.Engine
@@ -862,7 +862,7 @@ func (api *API) TraceTransaction(ctx context.Context, hash common.Hash, config *
862862
found, _, blockHash, blockNumber, index := api.backend.GetTransaction(hash)
863863
if !found {
864864
// Warn in case tx indexer is not done.
865-
if !api.backend.TxIndexDone(ctx) {
865+
if !api.backend.TxIndexDone() {
866866
return nil, ethapi.NewTxIndexingError()
867867
}
868868
// Only mined txes are supported

eth/tracers/api_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ func (b *testBackend) GetTransaction(txHash common.Hash) (bool, *types.Transacti
121121
return tx != nil, tx, hash, blockNumber, index
122122
}
123123

124-
func (b *testBackend) TxIndexDone(ctx context.Context) bool {
124+
func (b *testBackend) TxIndexDone() bool {
125125
return true
126126
}
127127

ethclient/ethclient_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ func newTestBackend(config *node.Config) (*node.Node, []*types.Block, error) {
119119
}
120120
// Ensure the tx indexing is fully generated
121121
for ; ; time.Sleep(time.Millisecond * 100) {
122-
progress, err := ethservice.BlockChain().TxIndexProgress(context.Background())
122+
progress, err := ethservice.BlockChain().TxIndexProgress()
123123
if err == nil && progress.Done() {
124124
break
125125
}

internal/ethapi/api.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1340,7 +1340,7 @@ func (api *TransactionAPI) GetTransactionByHash(ctx context.Context, hash common
13401340
return NewRPCPendingTransaction(tx, api.b.CurrentHeader(), api.b.ChainConfig()), nil
13411341
}
13421342
// If also not in the pool there is a chance the tx indexer is still in progress.
1343-
if !api.b.TxIndexDone(ctx) {
1343+
if !api.b.TxIndexDone() {
13441344
return nil, NewTxIndexingError()
13451345
}
13461346
// If the transaction is not found in the pool and the indexer is done, return nil
@@ -1362,7 +1362,7 @@ func (api *TransactionAPI) GetRawTransactionByHash(ctx context.Context, hash com
13621362
return tx.MarshalBinary()
13631363
}
13641364
// If also not in the pool there is a chance the tx indexer is still in progress.
1365-
if !api.b.TxIndexDone(ctx) {
1365+
if !api.b.TxIndexDone() {
13661366
return nil, NewTxIndexingError()
13671367
}
13681368
// If the transaction is not found in the pool and the indexer is done, return nil
@@ -1376,7 +1376,7 @@ func (api *TransactionAPI) GetTransactionReceipt(ctx context.Context, hash commo
13761376
found, tx, blockHash, blockNumber, index := api.b.GetTransaction(hash)
13771377
if !found {
13781378
// Make sure indexer is done.
1379-
if !api.b.TxIndexDone(ctx) {
1379+
if !api.b.TxIndexDone() {
13801380
return nil, NewTxIndexingError()
13811381
}
13821382
// No such tx.
@@ -1786,7 +1786,7 @@ func (api *DebugAPI) GetRawTransaction(ctx context.Context, hash common.Hash) (h
17861786
return tx.MarshalBinary()
17871787
}
17881788
// If also not in the pool there is a chance the tx indexer is still in progress.
1789-
if !api.b.TxIndexDone(ctx) {
1789+
if !api.b.TxIndexDone() {
17901790
return nil, NewTxIndexingError()
17911791
}
17921792
// Transaction is not found in the pool and the indexer is done.

internal/ethapi/api_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -595,7 +595,7 @@ func (b testBackend) GetTransaction(txHash common.Hash) (bool, *types.Transactio
595595
tx, blockHash, blockNumber, index := rawdb.ReadTransaction(b.db, txHash)
596596
return true, tx, blockHash, blockNumber, index
597597
}
598-
func (b testBackend) TxIndexDone(ctx context.Context) bool {
598+
func (b testBackend) TxIndexDone() bool {
599599
return true
600600
}
601601
func (b testBackend) GetPoolTransactions() (types.Transactions, error) { panic("implement me") }

internal/ethapi/backend.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ type Backend interface {
7575
// Transaction pool API
7676
SendTx(ctx context.Context, signedTx *types.Transaction) error
7777
GetTransaction(txHash common.Hash) (bool, *types.Transaction, common.Hash, uint64, uint64)
78-
TxIndexDone(ctx context.Context) bool
78+
TxIndexDone() bool
7979
GetPoolTransactions() (types.Transactions, error)
8080
GetPoolTransaction(txHash common.Hash) *types.Transaction
8181
GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error)

internal/ethapi/transaction_args_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -383,7 +383,7 @@ func (b *backendMock) SendTx(ctx context.Context, signedTx *types.Transaction) e
383383
func (b *backendMock) GetTransaction(txHash common.Hash) (bool, *types.Transaction, common.Hash, uint64, uint64) {
384384
return false, nil, [32]byte{}, 0, 0
385385
}
386-
func (b *backendMock) TxIndexDone(ctx context.Context) bool { return true }
386+
func (b *backendMock) TxIndexDone() bool { return true }
387387
func (b *backendMock) GetPoolTransactions() (types.Transactions, error) { return nil, nil }
388388
func (b *backendMock) GetPoolTransaction(txHash common.Hash) *types.Transaction { return nil }
389389
func (b *backendMock) GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error) {

0 commit comments

Comments
 (0)