From 8b1664dac816e13b5b40c5de253c6850663beb8d Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Mon, 18 Dec 2023 15:39:52 +0800 Subject: [PATCH 01/11] core, core/rawdb, eth/sync: no tx indexing during snap sync --- core/blockchain.go | 117 ++++++++++-------------------- core/rawdb/accessors_chain.go | 17 ----- core/rawdb/chain_iterator.go | 52 +++++++------ core/rawdb/chain_iterator_test.go | 10 +-- core/rawdb/database.go | 1 - core/rawdb/schema.go | 2 + eth/sync.go | 18 ----- 7 files changed, 74 insertions(+), 143 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index f458da82573e..ea11ba1e6011 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1155,14 +1155,13 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ // Ensure genesis is in ancients. if first.NumberU64() == 1 { if frozen, _ := bc.db.Ancients(); frozen == 0 { - b := bc.genesisBlock td := bc.genesisBlock.Difficulty() - writeSize, err := rawdb.WriteAncientBlocks(bc.db, []*types.Block{b}, []types.Receipts{nil}, td) - size += writeSize + writeSize, err := rawdb.WriteAncientBlocks(bc.db, []*types.Block{bc.genesisBlock}, []types.Receipts{nil}, td) if err != nil { log.Error("Error writing genesis to ancients", "err", err) return 0, err } + size += writeSize log.Info("Wrote genesis to ancients") } } @@ -1176,44 +1175,11 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ // Write all chain data to ancients. td := bc.GetTd(first.Hash(), first.NumberU64()) writeSize, err := rawdb.WriteAncientBlocks(bc.db, blockChain, receiptChain, td) - size += writeSize if err != nil { log.Error("Error importing chain data to ancients", "err", err) return 0, err } - - // Write tx indices if any condition is satisfied: - // * If user requires to reserve all tx indices(txlookuplimit=0) - // * If all ancient tx indices are required to be reserved(txlookuplimit is even higher than ancientlimit) - // * If block number is large enough to be regarded as a recent block - // It means blocks below the ancientLimit-txlookupLimit won't be indexed. - // - // But if the `TxIndexTail` is not nil, e.g. Geth is initialized with - // an external ancient database, during the setup, blockchain will start - // a background routine to re-indexed all indices in [ancients - txlookupLimit, ancients) - // range. In this case, all tx indices of newly imported blocks should be - // generated. - batch := bc.db.NewBatch() - for i, block := range blockChain { - if bc.txLookupLimit == 0 || ancientLimit <= bc.txLookupLimit || block.NumberU64() >= ancientLimit-bc.txLookupLimit { - rawdb.WriteTxLookupEntriesByBlock(batch, block) - } else if rawdb.ReadTxIndexTail(bc.db) != nil { - rawdb.WriteTxLookupEntriesByBlock(batch, block) - } - stats.processed++ - - if batch.ValueSize() > ethdb.IdealBatchSize || i == len(blockChain)-1 { - size += int64(batch.ValueSize()) - if err = batch.Write(); err != nil { - snapBlock := bc.CurrentSnapBlock().Number.Uint64() - if _, err := bc.db.TruncateHead(snapBlock + 1); err != nil { - log.Error("Can't truncate ancient store after failed insert", "err", err) - } - return 0, err - } - batch.Reset() - } - } + size += writeSize // Sync the ancient store explicitly to ensure all data has been flushed to disk. if err := bc.db.Sync(); err != nil { @@ -1231,8 +1197,10 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ } // Delete block data from the main database. - batch.Reset() - canonHashes := make(map[common.Hash]struct{}) + var ( + batch = bc.db.NewBatch() + canonHashes = make(map[common.Hash]struct{}) + ) for _, block := range blockChain { canonHashes[block.Hash()] = struct{}{} if block.NumberU64() == 0 { @@ -1255,8 +1223,10 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ // writeLive writes blockchain and corresponding receipt chain into active store. writeLive := func(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) { - skipPresenceCheck := false - batch := bc.db.NewBatch() + var ( + skipPresenceCheck = false + batch = bc.db.NewBatch() + ) for i, block := range blockChain { // Short circuit insertion if shutting down or processing failed if bc.insertStopped() { @@ -1281,11 +1251,10 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ // Write all the data out into the database rawdb.WriteBody(batch, block.Hash(), block.NumberU64(), block.Body()) rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receiptChain[i]) - rawdb.WriteTxLookupEntriesByBlock(batch, block) // Always write tx indices for live blocks, we assume they are needed // Write everything belongs to the blocks into the database. So that - // we can ensure all components of body is completed(body, receipts, - // tx indexes) + // we can ensure all components of body is completed(body, receipts) + // except transaction indexes(will be created once sync is finished). if batch.ValueSize() >= ethdb.IdealBatchSize { if err := batch.Write(); err != nil { return 0, err @@ -1317,19 +1286,6 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ return n, err } } - // Write the tx index tail (block number from where we index) before write any live blocks - if len(liveBlocks) > 0 && liveBlocks[0].NumberU64() == ancientLimit+1 { - // The tx index tail can only be one of the following two options: - // * 0: all ancient blocks have been indexed - // * ancient-limit: the indices of blocks before ancient-limit are ignored - if tail := rawdb.ReadTxIndexTail(bc.db); tail == nil { - if bc.txLookupLimit == 0 || ancientLimit <= bc.txLookupLimit { - rawdb.WriteTxIndexTail(bc.db, 0) - } else { - rawdb.WriteTxIndexTail(bc.db, ancientLimit-bc.txLookupLimit) - } - } - } if len(liveBlocks) > 0 { if n, err := writeLive(liveBlocks, liveReceipts); err != nil { if err == errInsertionInterrupted { @@ -1338,13 +1294,14 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ return n, err } } - - head := blockChain[len(blockChain)-1] - context := []interface{}{ - "count", stats.processed, "elapsed", common.PrettyDuration(time.Since(start)), - "number", head.Number(), "hash", head.Hash(), "age", common.PrettyAge(time.Unix(int64(head.Time()), 0)), - "size", common.StorageSize(size), - } + var ( + head = blockChain[len(blockChain)-1] + context = []interface{}{ + "count", stats.processed, "elapsed", common.PrettyDuration(time.Since(start)), + "number", head.Number(), "hash", head.Hash(), "age", common.PrettyAge(time.Unix(int64(head.Time()), 0)), + "size", common.StorageSize(size), + } + ) if stats.ignored > 0 { context = append(context, []interface{}{"ignored", stats.ignored}...) } @@ -1360,7 +1317,6 @@ func (bc *BlockChain) writeBlockWithoutState(block *types.Block, td *big.Int) (e if bc.insertStopped() { return errInsertionInterrupted } - batch := bc.db.NewBatch() rawdb.WriteTd(batch, block.Hash(), block.NumberU64(), td) rawdb.WriteBlock(batch, block) @@ -2427,23 +2383,24 @@ func (bc *BlockChain) skipBlock(err error, it *insertIterator) bool { func (bc *BlockChain) indexBlocks(tail *uint64, head uint64, done chan struct{}) { defer func() { close(done) }() - // If head is 0, it means the chain is just initialized and no blocks are inserted, - // so don't need to indexing anything. + // If head is 0, it means the chain is just initialized and no blocks are + // inserted, so don't need to index anything. if head == 0 { return } - // The tail flag is not existent, it means the node is just initialized - // and all blocks(may from ancient store) are not indexed yet. + // and all blocks in the chain (part of them may from ancient store) are + // not indexed yet, index the chain according to the configuration then. if tail == nil { from := uint64(0) if bc.txLookupLimit != 0 && head >= bc.txLookupLimit { from = head - bc.txLookupLimit + 1 } - rawdb.IndexTransactions(bc.db, from, head+1, bc.quit) + rawdb.IndexTransactions(bc.db, from, head+1, bc.quit, true) return } - // The tail flag is existent, but the whole chain is required to be indexed. + // The tail flag is existent (which means indexes in [tail, head] should be + // present), while the whole chain are requested for indexing. if bc.txLookupLimit == 0 || head < bc.txLookupLimit { if *tail > 0 { // It can happen when chain is rewound to a historical point which @@ -2453,17 +2410,18 @@ func (bc *BlockChain) indexBlocks(tail *uint64, head uint64, done chan struct{}) if end > head+1 { end = head + 1 } - rawdb.IndexTransactions(bc.db, 0, end, bc.quit) + rawdb.IndexTransactions(bc.db, 0, end, bc.quit, true) } return } - // Update the transaction index to the new chain state + // The tail flag is existent, adjust the index range according to configuration + // and latest head. if head-bc.txLookupLimit+1 < *tail { // Reindex a part of missing indices and rewind index tail to HEAD-limit - rawdb.IndexTransactions(bc.db, head-bc.txLookupLimit+1, *tail, bc.quit) + rawdb.IndexTransactions(bc.db, head-bc.txLookupLimit+1, *tail, bc.quit, true) } else { // Unindex a part of stale indices and forward index tail to HEAD-limit - rawdb.UnindexTransactions(bc.db, *tail, head-bc.txLookupLimit+1, bc.quit) + rawdb.UnindexTransactions(bc.db, *tail, head-bc.txLookupLimit+1, bc.quit, false) } } @@ -2492,14 +2450,13 @@ func (bc *BlockChain) maintainTxIndex() { defer sub.Unsubscribe() log.Info("Initialized transaction indexer", "limit", bc.TxLookupLimit()) - // Launch the initial processing if chain is not empty. This step is - // useful in these scenarios that chain has no progress and indexer - // is never triggered. - if head := rawdb.ReadHeadBlock(bc.db); head != nil { + // Launch the initial processing if chain is not empty (head != genesis). + // This step is useful in these scenarios that chain has no progress and + // indexer is never triggered. + if head := rawdb.ReadHeadBlock(bc.db); head != nil && head.Number().Uint64() != 0 { done = make(chan struct{}) go bc.indexBlocks(rawdb.ReadTxIndexTail(bc.db), head.NumberU64(), done) } - for { select { case head := <-headCh: diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go index d9a89fe90c99..964b3a311de5 100644 --- a/core/rawdb/accessors_chain.go +++ b/core/rawdb/accessors_chain.go @@ -278,23 +278,6 @@ func WriteTxIndexTail(db ethdb.KeyValueWriter, number uint64) { } } -// ReadFastTxLookupLimit retrieves the tx lookup limit used in fast sync. -func ReadFastTxLookupLimit(db ethdb.KeyValueReader) *uint64 { - data, _ := db.Get(fastTxLookupLimitKey) - if len(data) != 8 { - return nil - } - number := binary.BigEndian.Uint64(data) - return &number -} - -// WriteFastTxLookupLimit stores the txlookup limit used in fast sync into database. -func WriteFastTxLookupLimit(db ethdb.KeyValueWriter, number uint64) { - if err := db.Put(fastTxLookupLimitKey, encodeBlockNumber(number)); err != nil { - log.Crit("Failed to store transaction lookup limit for fast sync", "err", err) - } -} - // ReadHeaderRange returns the rlp-encoded headers, starting at 'number', and going // backwards towards genesis. This method assumes that the caller already has // placed a cap on count, to prevent DoS issues. diff --git a/core/rawdb/chain_iterator.go b/core/rawdb/chain_iterator.go index 56bb15b718ad..759e5913d13f 100644 --- a/core/rawdb/chain_iterator.go +++ b/core/rawdb/chain_iterator.go @@ -178,7 +178,7 @@ func iterateTransactions(db ethdb.Database, from uint64, to uint64, reverse bool // // There is a passed channel, the whole procedure will be interrupted if any // signal received. -func indexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, hook func(uint64) bool) { +func indexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, hook func(uint64) bool, report bool) { // short circuit for invalid range if from >= to { return @@ -188,13 +188,13 @@ func indexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan batch = db.NewBatch() start = time.Now() logged = start.Add(-7 * time.Second) + // Since we iterate in reverse, we expect the first number to come // in to be [to-1]. Therefore, setting lastNum to means that the - // prqueue gap-evaluation will work correctly - lastNum = to - queue = prque.New[int64, *blockTxHashes](nil) - // for stats reporting - blocks, txs = 0, 0 + // queue gap-evaluation will work correctly + lastNum = to + queue = prque.New[int64, *blockTxHashes](nil) + blocks, txs = 0, 0 // for stats reporting ) for chanDelivery := range hashesCh { // Push the delivery into the queue and process contiguous ranges. @@ -240,11 +240,15 @@ func indexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan log.Crit("Failed writing batch to db", "error", err) return } + logger := log.Debug + if report { + logger = log.Info + } select { case <-interrupt: - log.Debug("Transaction indexing interrupted", "blocks", blocks, "txs", txs, "tail", lastNum, "elapsed", common.PrettyDuration(time.Since(start))) + logger("Transaction indexing interrupted", "blocks", blocks, "txs", txs, "tail", lastNum, "elapsed", common.PrettyDuration(time.Since(start))) default: - log.Debug("Indexed transactions", "blocks", blocks, "txs", txs, "tail", lastNum, "elapsed", common.PrettyDuration(time.Since(start))) + logger("Indexed transactions", "blocks", blocks, "txs", txs, "tail", lastNum, "elapsed", common.PrettyDuration(time.Since(start))) } } @@ -257,20 +261,20 @@ func indexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan // // There is a passed channel, the whole procedure will be interrupted if any // signal received. -func IndexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}) { - indexTransactions(db, from, to, interrupt, nil) +func IndexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, report bool) { + indexTransactions(db, from, to, interrupt, nil, report) } // indexTransactionsForTesting is the internal debug version with an additional hook. func indexTransactionsForTesting(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, hook func(uint64) bool) { - indexTransactions(db, from, to, interrupt, hook) + indexTransactions(db, from, to, interrupt, hook, false) } // unindexTransactions removes txlookup indices of the specified block range. // // There is a passed channel, the whole procedure will be interrupted if any // signal received. -func unindexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, hook func(uint64) bool) { +func unindexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, hook func(uint64) bool, report bool) { // short circuit for invalid range if from >= to { return @@ -280,12 +284,12 @@ func unindexTransactions(db ethdb.Database, from uint64, to uint64, interrupt ch batch = db.NewBatch() start = time.Now() logged = start.Add(-7 * time.Second) + // we expect the first number to come in to be [from]. Therefore, setting - // nextNum to from means that the prqueue gap-evaluation will work correctly - nextNum = from - queue = prque.New[int64, *blockTxHashes](nil) - // for stats reporting - blocks, txs = 0, 0 + // nextNum to from means that the queue gap-evaluation will work correctly + nextNum = from + queue = prque.New[int64, *blockTxHashes](nil) + blocks, txs = 0, 0 // for stats reporting ) // Otherwise spin up the concurrent iterator and unindexer for delivery := range hashesCh { @@ -332,11 +336,15 @@ func unindexTransactions(db ethdb.Database, from uint64, to uint64, interrupt ch log.Crit("Failed writing batch to db", "error", err) return } + logger := log.Debug + if report { + logger = log.Info + } select { case <-interrupt: - log.Debug("Transaction unindexing interrupted", "blocks", blocks, "txs", txs, "tail", to, "elapsed", common.PrettyDuration(time.Since(start))) + logger("Transaction unindexing interrupted", "blocks", blocks, "txs", txs, "tail", to, "elapsed", common.PrettyDuration(time.Since(start))) default: - log.Debug("Unindexed transactions", "blocks", blocks, "txs", txs, "tail", to, "elapsed", common.PrettyDuration(time.Since(start))) + logger("Unindexed transactions", "blocks", blocks, "txs", txs, "tail", to, "elapsed", common.PrettyDuration(time.Since(start))) } } @@ -345,11 +353,11 @@ func unindexTransactions(db ethdb.Database, from uint64, to uint64, interrupt ch // // There is a passed channel, the whole procedure will be interrupted if any // signal received. -func UnindexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}) { - unindexTransactions(db, from, to, interrupt, nil) +func UnindexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, report bool) { + unindexTransactions(db, from, to, interrupt, nil, report) } // unindexTransactionsForTesting is the internal debug version with an additional hook. func unindexTransactionsForTesting(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, hook func(uint64) bool) { - unindexTransactions(db, from, to, interrupt, hook) + unindexTransactions(db, from, to, interrupt, hook, false) } diff --git a/core/rawdb/chain_iterator_test.go b/core/rawdb/chain_iterator_test.go index 9580cd92a873..78b0a82e10fe 100644 --- a/core/rawdb/chain_iterator_test.go +++ b/core/rawdb/chain_iterator_test.go @@ -162,18 +162,18 @@ func TestIndexTransactions(t *testing.T) { t.Fatalf("Transaction tail mismatch") } } - IndexTransactions(chainDb, 5, 11, nil) + IndexTransactions(chainDb, 5, 11, nil, false) verify(5, 11, true, 5) verify(0, 5, false, 5) - IndexTransactions(chainDb, 0, 5, nil) + IndexTransactions(chainDb, 0, 5, nil, false) verify(0, 11, true, 0) - UnindexTransactions(chainDb, 0, 5, nil) + UnindexTransactions(chainDb, 0, 5, nil, false) verify(5, 11, true, 5) verify(0, 5, false, 5) - UnindexTransactions(chainDb, 5, 11, nil) + UnindexTransactions(chainDb, 5, 11, nil, false) verify(0, 11, false, 11) // Testing corner cases @@ -190,7 +190,7 @@ func TestIndexTransactions(t *testing.T) { }) verify(9, 11, true, 9) verify(0, 9, false, 9) - IndexTransactions(chainDb, 0, 9, nil) + IndexTransactions(chainDb, 0, 9, nil, false) signal = make(chan struct{}) var once2 sync.Once diff --git a/core/rawdb/database.go b/core/rawdb/database.go index 18b5bccb517c..27a9ec7412ca 100644 --- a/core/rawdb/database.go +++ b/core/rawdb/database.go @@ -657,7 +657,6 @@ func ReadChainMetadata(db ethdb.KeyValueStore) [][]string { {"snapshotRecoveryNumber", pp(ReadSnapshotRecoveryNumber(db))}, {"snapshotRoot", fmt.Sprintf("%v", ReadSnapshotRoot(db))}, {"txIndexTail", pp(ReadTxIndexTail(db))}, - {"fastTxLookupLimit", pp(ReadFastTxLookupLimit(db))}, } if b := ReadSkeletonSyncStatus(db); b != nil { data = append(data, []string{"SkeletonSyncStatus", string(b)}) diff --git a/core/rawdb/schema.go b/core/rawdb/schema.go index be037235533a..11cf5b40fef6 100644 --- a/core/rawdb/schema.go +++ b/core/rawdb/schema.go @@ -80,6 +80,8 @@ var ( txIndexTailKey = []byte("TransactionIndexTail") // fastTxLookupLimitKey tracks the transaction lookup limit during fast sync. + // This flag is deprecated, it's kept to avoid reporting errors when inspect + // database. fastTxLookupLimitKey = []byte("FastTransactionLookupLimit") // badBlockKey tracks the list of bad blocks seen by local diff --git a/eth/sync.go b/eth/sync.go index c7ba7c93d6e9..c2a0f453bf78 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -228,24 +228,6 @@ func (cs *chainSyncer) startSync(op *chainSyncOp) { // doSync synchronizes the local blockchain with a remote peer. func (h *handler) doSync(op *chainSyncOp) error { - if op.mode == downloader.SnapSync { - // Before launch the snap sync, we have to ensure user uses the same - // txlookup limit. - // The main concern here is: during the snap sync Geth won't index the - // block(generate tx indices) before the HEAD-limit. But if user changes - // the limit in the next snap sync(e.g. user kill Geth manually and - // restart) then it will be hard for Geth to figure out the oldest block - // has been indexed. So here for the user-experience wise, it's non-optimal - // that user can't change limit during the snap sync. If changed, Geth - // will just blindly use the original one. - limit := h.chain.TxLookupLimit() - if stored := rawdb.ReadFastTxLookupLimit(h.database); stored == nil { - rawdb.WriteFastTxLookupLimit(h.database, limit) - } else if *stored != limit { - h.chain.SetTxLookupLimit(*stored) - log.Warn("Update txLookup limit", "provided", limit, "updated", *stored) - } - } // Run the sync cycle, and disable snap sync if we're past the pivot block err := h.downloader.LegacySync(op.peer.ID(), op.head, op.td, h.chain.Config().TerminalTotalDifficulty, op.mode) if err != nil { From 076ba509be195662c8fe84216d697eb5325fa2c0 Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Tue, 19 Dec 2023 15:18:06 +0800 Subject: [PATCH 02/11] core, eth, internal: return tx indexing progress --- core/blockchain.go | 73 ++++++++++++++++++++++++++++++++++----- core/blockchain_reader.go | 32 ++++++++++++----- eth/api_backend.go | 11 ++++-- internal/ethapi/api.go | 35 +++++++------------ 4 files changed, 110 insertions(+), 41 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index ea11ba1e6011..8a3620b2afee 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -185,6 +185,29 @@ func DefaultCacheConfigWithScheme(scheme string) *CacheConfig { return &config } +// txLookup is wrapper over transaction lookup along with the corresponding +// transaction itself. +type txLookup struct { + lookup *rawdb.LegacyTxLookupEntry + transaction *types.Transaction +} + +// txIndexProgress is the struct describing the progress for transaction indexing. +type txIndexProgress struct { + tail uint64 // the oldest block indexed for transactions + head uint64 // the latest block indexed for transactions + limit uint64 // the number of blocks required for transaction indexing(0 means the whole chain) +} + +// Error implements Error returning the progress in string format. +func (prog txIndexProgress) Error() string { + limit := "entire chain" + if prog.limit != 0 { + limit = fmt.Sprintf("last %d blocks", prog.limit) + } + return fmt.Sprintf("index-tail: %d, index-head: %d, limit: %s", prog.tail, prog.head, limit) +} + // BlockChain represents the canonical chain given a database with a genesis // block. The Blockchain manages chain imports, reverts, chain reorganisations. // @@ -242,15 +265,16 @@ type BlockChain struct { bodyRLPCache *lru.Cache[common.Hash, rlp.RawValue] receiptsCache *lru.Cache[common.Hash, []*types.Receipt] blockCache *lru.Cache[common.Hash, *types.Block] - txLookupCache *lru.Cache[common.Hash, *rawdb.LegacyTxLookupEntry] + txLookupCache *lru.Cache[common.Hash, txLookup] // future blocks are blocks added for later processing futureBlocks *lru.Cache[common.Hash, *types.Block] - wg sync.WaitGroup // - quit chan struct{} // shutdown signal, closed in Stop. - stopping atomic.Bool // false if chain is running, true when stopped - procInterrupt atomic.Bool // interrupt signaler for block processing + wg sync.WaitGroup + quit chan struct{} // shutdown signal, closed in Stop. + stopping atomic.Bool // false if chain is running, true when stopped + procInterrupt atomic.Bool // interrupt signaler for block processing + txIndexProgCh chan chan txIndexProgress // chan for querying the progress of transaction indexing engine consensus.Engine validator Validator // Block and state validator interface @@ -297,8 +321,9 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis bodyRLPCache: lru.NewCache[common.Hash, rlp.RawValue](bodyCacheLimit), receiptsCache: lru.NewCache[common.Hash, []*types.Receipt](receiptsCacheLimit), blockCache: lru.NewCache[common.Hash, *types.Block](blockCacheLimit), - txLookupCache: lru.NewCache[common.Hash, *rawdb.LegacyTxLookupEntry](txLookupCacheLimit), + txLookupCache: lru.NewCache[common.Hash, txLookup](txLookupCacheLimit), futureBlocks: lru.NewCache[common.Hash, *types.Block](maxFutureBlocks), + txIndexProgCh: make(chan chan txIndexProgress), engine: engine, vmConfig: vmConfig, } @@ -2425,6 +2450,34 @@ func (bc *BlockChain) indexBlocks(tail *uint64, head uint64, done chan struct{}) } } +// reportTxIndexProgress returns the tx indexing progress. +func (bc *BlockChain) reportTxIndexProgress(head uint64) txIndexProgress { + tail := rawdb.ReadTxIndexTail(bc.db) + if tail == nil { + return txIndexProgress{ + tail: 0, // not indexed yet + head: 0, // not indexed yet + limit: bc.txLookupLimit, + } + } + return txIndexProgress{ + tail: *tail, + head: head, + limit: bc.txLookupLimit, + } +} + +// askTxIndexProgress retrieves the tx indexing progress. +func (bc *BlockChain) askTxIndexProgress() (txIndexProgress, error) { + ch := make(chan txIndexProgress, 1) + select { + case bc.txIndexProgCh <- ch: + return <-ch, nil + case <-bc.quit: + return txIndexProgress{}, errors.New("blockchain is closed") + } +} + // maintainTxIndex is responsible for the construction and deletion of the // transaction index. // @@ -2440,8 +2493,9 @@ func (bc *BlockChain) maintainTxIndex() { // Listening to chain events and manipulate the transaction indexes. var ( - done chan struct{} // Non-nil if background unindexing or reindexing routine is active. - headCh = make(chan ChainHeadEvent, 1) // Buffered to avoid locking up the event feed + done chan struct{} // Non-nil if background unindexing or reindexing routine is active. + lastHead uint64 // The latest announced chain head (whose tx indexes are assumed created) + headCh = make(chan ChainHeadEvent, 1) // Buffered to avoid locking up the event feed ) sub := bc.SubscribeChainHeadEvent(headCh) if sub == nil { @@ -2464,8 +2518,11 @@ func (bc *BlockChain) maintainTxIndex() { done = make(chan struct{}) go bc.indexBlocks(rawdb.ReadTxIndexTail(bc.db), head.Block.NumberU64(), done) } + lastHead = head.Block.NumberU64() case <-done: done = nil + case ch := <-bc.txIndexProgCh: + ch <- bc.reportTxIndexProgress(lastHead) case <-bc.quit: if done != nil { log.Info("Waiting background transaction indexer to exit") diff --git a/core/blockchain_reader.go b/core/blockchain_reader.go index 466a86c14415..9457d1e67234 100644 --- a/core/blockchain_reader.go +++ b/core/blockchain_reader.go @@ -254,20 +254,34 @@ func (bc *BlockChain) GetAncestor(hash common.Hash, number, ancestor uint64, max return bc.hc.GetAncestor(hash, number, ancestor, maxNonCanonical) } -// GetTransactionLookup retrieves the lookup associate with the given transaction -// hash from the cache or database. -func (bc *BlockChain) GetTransactionLookup(hash common.Hash) *rawdb.LegacyTxLookupEntry { +// GetTransactionLookup retrieves the lookup along with the transaction itself +// associate with the given transaction hash. A non-nil error will be returned +// if the transaction is not found. +func (bc *BlockChain) GetTransactionLookup(hash common.Hash) (*rawdb.LegacyTxLookupEntry, *types.Transaction, error) { // Short circuit if the txlookup already in the cache, retrieve otherwise - if lookup, exist := bc.txLookupCache.Get(hash); exist { - return lookup + if item, exist := bc.txLookupCache.Get(hash); exist { + return item.lookup, item.transaction, nil } tx, blockHash, blockNumber, txIndex := rawdb.ReadTransaction(bc.db, hash) if tx == nil { - return nil + // The transaction can either be non-existent, or just not indexed + // yet. Return the tx indexing progress as well for better UX. + progress, err := bc.askTxIndexProgress() + if err != nil { + return nil, nil, err + } + return nil, nil, progress + } + lookup := &rawdb.LegacyTxLookupEntry{ + BlockHash: blockHash, + BlockIndex: blockNumber, + Index: txIndex, } - lookup := &rawdb.LegacyTxLookupEntry{BlockHash: blockHash, BlockIndex: blockNumber, Index: txIndex} - bc.txLookupCache.Add(hash, lookup) - return lookup + bc.txLookupCache.Add(hash, txLookup{ + lookup: lookup, + transaction: tx, + }) + return lookup, tx, nil } // GetTd retrieves a block's total difficulty in the canonical chain from the diff --git a/eth/api_backend.go b/eth/api_backend.go index bc8398d217a1..3cc22dcfe0b4 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -19,6 +19,7 @@ package eth import ( "context" "errors" + "fmt" "math/big" "time" @@ -308,9 +309,15 @@ func (b *EthAPIBackend) GetPoolTransaction(hash common.Hash) *types.Transaction return b.eth.txPool.Get(hash) } +// GetTransaction retrieves the lookup along with the transaction itself associate +// with the given transaction hash. A non-nil error will be returned if the +// transaction is not found. func (b *EthAPIBackend) GetTransaction(ctx context.Context, txHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error) { - tx, blockHash, blockNumber, index := rawdb.ReadTransaction(b.eth.ChainDb(), txHash) - return tx, blockHash, blockNumber, index, nil + lookup, tx, err := b.eth.blockchain.GetTransactionLookup(txHash) + if err != nil { + return nil, common.Hash{}, 0, 0, fmt.Errorf("tx is not existent or not indexed, %w", err) + } + return tx, lookup.BlockHash, lookup.BlockIndex, lookup.Index, nil } func (b *EthAPIBackend) GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error) { diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index ee479d7139ab..bef323c09012 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -1654,22 +1654,17 @@ func (s *TransactionAPI) GetTransactionByHash(ctx context.Context, hash common.H // Try to return an already finalized transaction tx, blockHash, blockNumber, index, err := s.b.GetTransaction(ctx, hash) if err != nil { - return nil, err - } - if tx != nil { - header, err := s.b.HeaderByHash(ctx, blockHash) - if err != nil { - return nil, err + // No finalized transaction, try to retrieve it from the pool + if tx := s.b.GetPoolTransaction(hash); tx != nil { + return NewRPCPendingTransaction(tx, s.b.CurrentHeader(), s.b.ChainConfig()), nil } - return newRPCTransaction(tx, blockHash, blockNumber, header.Time, index, header.BaseFee, s.b.ChainConfig()), nil + return nil, err } - // No finalized transaction, try to retrieve it from the pool - if tx := s.b.GetPoolTransaction(hash); tx != nil { - return NewRPCPendingTransaction(tx, s.b.CurrentHeader(), s.b.ChainConfig()), nil + header, err := s.b.HeaderByHash(ctx, blockHash) + if err != nil { + return nil, err } - - // Transaction unknown, return as such - return nil, nil + return newRPCTransaction(tx, blockHash, blockNumber, header.Time, index, header.BaseFee, s.b.ChainConfig()), nil } // GetRawTransactionByHash returns the bytes of the transaction for the given hash. @@ -1677,12 +1672,8 @@ func (s *TransactionAPI) GetRawTransactionByHash(ctx context.Context, hash commo // Retrieve a finalized transaction, or a pooled otherwise tx, _, _, _, err := s.b.GetTransaction(ctx, hash) if err != nil { - return nil, err - } - if tx == nil { if tx = s.b.GetPoolTransaction(hash); tx == nil { - // Transaction not found anywhere, abort - return nil, nil + return nil, err } } // Serialize to RLP and return @@ -1692,10 +1683,10 @@ func (s *TransactionAPI) GetRawTransactionByHash(ctx context.Context, hash commo // GetTransactionReceipt returns the transaction receipt for the given transaction hash. func (s *TransactionAPI) GetTransactionReceipt(ctx context.Context, hash common.Hash) (map[string]interface{}, error) { tx, blockHash, blockNumber, index, err := s.b.GetTransaction(ctx, hash) - if tx == nil || err != nil { - // When the transaction doesn't exist, the RPC method should return JSON null - // as per specification. - return nil, nil + if err != nil { + // When the transaction doesn't exist or is not indexed yet, + // the RPC method should return JSON null as per specification. + return nil, err } header, err := s.b.HeaderByHash(ctx, blockHash) if err != nil { From 93d702975691e388b889bc8672bf724b6e6b724a Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Wed, 20 Dec 2023 20:56:33 +0800 Subject: [PATCH 03/11] core: remove useless test --- core/blockchain_test.go | 85 ----------------------------------------- 1 file changed, 85 deletions(-) diff --git a/core/blockchain_test.go b/core/blockchain_test.go index bc6f8112f015..c1286705b62f 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -2822,91 +2822,6 @@ func TestTransactionIndices(t *testing.T) { } } -func TestSkipStaleTxIndicesInSnapSync(t *testing.T) { - testSkipStaleTxIndicesInSnapSync(t, rawdb.HashScheme) - testSkipStaleTxIndicesInSnapSync(t, rawdb.PathScheme) -} - -func testSkipStaleTxIndicesInSnapSync(t *testing.T, scheme string) { - // Configure and generate a sample block chain - var ( - key, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") - address = crypto.PubkeyToAddress(key.PublicKey) - funds = big.NewInt(100000000000000000) - gspec = &Genesis{Config: params.TestChainConfig, Alloc: GenesisAlloc{address: {Balance: funds}}} - signer = types.LatestSigner(gspec.Config) - ) - _, blocks, receipts := GenerateChainWithGenesis(gspec, ethash.NewFaker(), 128, func(i int, block *BlockGen) { - tx, err := types.SignTx(types.NewTransaction(block.TxNonce(address), common.Address{0x00}, big.NewInt(1000), params.TxGas, block.header.BaseFee, nil), signer, key) - if err != nil { - panic(err) - } - block.AddTx(tx) - }) - - check := func(tail *uint64, chain *BlockChain) { - stored := rawdb.ReadTxIndexTail(chain.db) - if tail == nil && stored != nil { - t.Fatalf("Oldest indexded block mismatch, want nil, have %d", *stored) - } - if tail != nil && *stored != *tail { - t.Fatalf("Oldest indexded block mismatch, want %d, have %d", *tail, *stored) - } - if tail != nil { - for i := *tail; i <= chain.CurrentBlock().Number.Uint64(); i++ { - block := rawdb.ReadBlock(chain.db, rawdb.ReadCanonicalHash(chain.db, i), i) - if block.Transactions().Len() == 0 { - continue - } - for _, tx := range block.Transactions() { - if index := rawdb.ReadTxLookupEntry(chain.db, tx.Hash()); index == nil { - t.Fatalf("Miss transaction indice, number %d hash %s", i, tx.Hash().Hex()) - } - } - } - for i := uint64(0); i < *tail; i++ { - block := rawdb.ReadBlock(chain.db, rawdb.ReadCanonicalHash(chain.db, i), i) - if block.Transactions().Len() == 0 { - continue - } - for _, tx := range block.Transactions() { - if index := rawdb.ReadTxLookupEntry(chain.db, tx.Hash()); index != nil { - t.Fatalf("Transaction indice should be deleted, number %d hash %s", i, tx.Hash().Hex()) - } - } - } - } - } - - ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false) - if err != nil { - t.Fatalf("failed to create temp freezer db: %v", err) - } - defer ancientDb.Close() - - // Import all blocks into ancient db, only HEAD-32 indices are kept. - l := uint64(32) - chain, err := NewBlockChain(ancientDb, DefaultCacheConfigWithScheme(scheme), gspec, nil, ethash.NewFaker(), vm.Config{}, nil, &l) - if err != nil { - t.Fatalf("failed to create tester chain: %v", err) - } - defer chain.Stop() - - headers := make([]*types.Header, len(blocks)) - for i, block := range blocks { - headers[i] = block.Header() - } - if n, err := chain.InsertHeaderChain(headers); err != nil { - t.Fatalf("failed to insert header %d: %v", n, err) - } - // The indices before ancient-N(32) should be ignored. After that all blocks should be indexed. - if n, err := chain.InsertReceiptChain(blocks, receipts, 64); err != nil { - t.Fatalf("block %d: failed to insert into chain: %v", n, err) - } - tail := uint64(32) - check(&tail, chain) -} - // Benchmarks large blocks with value transfers to non-existing accounts func benchmarkLargeNumberOfValueToNonexisting(b *testing.B, numTxs, numBlocks int, recipientFn func(uint64) common.Address, dataFn func(uint64) []byte) { var ( From 0f3c1d0d48922e2430042670133e765ae65c7c77 Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Thu, 21 Dec 2023 11:00:27 +0800 Subject: [PATCH 04/11] core: track the processed ancient blocks --- core/blockchain.go | 1 + 1 file changed, 1 insertion(+) diff --git a/core/blockchain.go b/core/blockchain.go index 8a3620b2afee..4195bcc95f0a 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1243,6 +1243,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ if err := batch.Write(); err != nil { return 0, err } + stats.processed += int32(len(blockChain)) return 0, nil } From 64196f9657926b752b99fed768e68211a93dfb8e Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Thu, 18 Jan 2024 16:04:28 +0800 Subject: [PATCH 05/11] eth, core, internal, graphql: return error only if indexing is not finished --- core/blockchain.go | 39 +++++++++++++++++++------------------- core/blockchain_reader.go | 27 +++++++++++++++++++------- eth/api_backend.go | 21 ++++++++++++++------ eth/tracers/api.go | 6 +++--- graphql/graphql.go | 4 ++-- internal/ethapi/api.go | 26 ++++++++++++------------- internal/ethapi/backend.go | 2 +- 7 files changed, 72 insertions(+), 53 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 4195bcc95f0a..fb7adf244435 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -186,7 +186,7 @@ func DefaultCacheConfigWithScheme(scheme string) *CacheConfig { } // txLookup is wrapper over transaction lookup along with the corresponding -// transaction itself. +// transaction object. type txLookup struct { lookup *rawdb.LegacyTxLookupEntry transaction *types.Transaction @@ -194,18 +194,17 @@ type txLookup struct { // txIndexProgress is the struct describing the progress for transaction indexing. type txIndexProgress struct { - tail uint64 // the oldest block indexed for transactions - head uint64 // the latest block indexed for transactions - limit uint64 // the number of blocks required for transaction indexing(0 means the whole chain) + head uint64 // the current chain head + indexed uint64 // the number of blocks have been indexed + limit uint64 // the number of blocks required for transaction indexing(0 means the whole chain) } -// Error implements Error returning the progress in string format. -func (prog txIndexProgress) Error() string { - limit := "entire chain" - if prog.limit != 0 { - limit = fmt.Sprintf("last %d blocks", prog.limit) +// done returns an indicator if the transaction indexing is finished. +func (prog txIndexProgress) done() bool { + if prog.limit == 0 { + return prog.indexed == (prog.head + 1) // genesis included } - return fmt.Sprintf("index-tail: %d, index-head: %d, limit: %s", prog.tail, prog.head, limit) + return prog.indexed >= prog.limit } // BlockChain represents the canonical chain given a database with a genesis @@ -2453,18 +2452,17 @@ func (bc *BlockChain) indexBlocks(tail *uint64, head uint64, done chan struct{}) // reportTxIndexProgress returns the tx indexing progress. func (bc *BlockChain) reportTxIndexProgress(head uint64) txIndexProgress { - tail := rawdb.ReadTxIndexTail(bc.db) - if tail == nil { - return txIndexProgress{ - tail: 0, // not indexed yet - head: 0, // not indexed yet - limit: bc.txLookupLimit, - } + var ( + indexed uint64 + tail = rawdb.ReadTxIndexTail(bc.db) + ) + if tail != nil { + indexed = head - *tail + 1 } return txIndexProgress{ - tail: *tail, - head: head, - limit: bc.txLookupLimit, + head: head, + indexed: indexed, + limit: bc.txLookupLimit, } } @@ -2510,6 +2508,7 @@ func (bc *BlockChain) maintainTxIndex() { // indexer is never triggered. if head := rawdb.ReadHeadBlock(bc.db); head != nil && head.Number().Uint64() != 0 { done = make(chan struct{}) + lastHead = head.Number().Uint64() go bc.indexBlocks(rawdb.ReadTxIndexTail(bc.db), head.NumberU64(), done) } for { diff --git a/core/blockchain_reader.go b/core/blockchain_reader.go index 9457d1e67234..b761876b3b10 100644 --- a/core/blockchain_reader.go +++ b/core/blockchain_reader.go @@ -17,6 +17,7 @@ package core import ( + "errors" "math/big" "github.com/ethereum/go-ethereum/common" @@ -254,9 +255,16 @@ func (bc *BlockChain) GetAncestor(hash common.Hash, number, ancestor uint64, max return bc.hc.GetAncestor(hash, number, ancestor, maxNonCanonical) } -// GetTransactionLookup retrieves the lookup along with the transaction itself -// associate with the given transaction hash. A non-nil error will be returned -// if the transaction is not found. +// GetTransactionLookup retrieves the lookup along with the transaction +// itself associate with the given transaction hash. +// +// An error will be returned if the transaction is not found, and background +// indexing for transactions is still in progress. The transaction might be +// reachable shortly once it's indexed. +// +// A null will be returned in the transaction is not found and background +// transaction indexing is already finished. The transaction is not existent +// from the node's perspective. func (bc *BlockChain) GetTransactionLookup(hash common.Hash) (*rawdb.LegacyTxLookupEntry, *types.Transaction, error) { // Short circuit if the txlookup already in the cache, retrieve otherwise if item, exist := bc.txLookupCache.Get(hash); exist { @@ -264,13 +272,18 @@ func (bc *BlockChain) GetTransactionLookup(hash common.Hash) (*rawdb.LegacyTxLoo } tx, blockHash, blockNumber, txIndex := rawdb.ReadTransaction(bc.db, hash) if tx == nil { - // The transaction can either be non-existent, or just not indexed - // yet. Return the tx indexing progress as well for better UX. progress, err := bc.askTxIndexProgress() if err != nil { - return nil, nil, err + return nil, nil, nil + } + // The transaction indexing is not finished yet, returning an + // error to explicitly indicate it. + if !progress.done() { + return nil, nil, errors.New("transaction is still indexing") } - return nil, nil, progress + // The transaction is already indexed, the transaction is either + // not existent or not in the range of index, returning null. + return nil, nil, nil } lookup := &rawdb.LegacyTxLookupEntry{ BlockHash: blockHash, diff --git a/eth/api_backend.go b/eth/api_backend.go index 3cc22dcfe0b4..c2019c3dd394 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -19,7 +19,6 @@ package eth import ( "context" "errors" - "fmt" "math/big" "time" @@ -310,14 +309,24 @@ func (b *EthAPIBackend) GetPoolTransaction(hash common.Hash) *types.Transaction } // GetTransaction retrieves the lookup along with the transaction itself associate -// with the given transaction hash. A non-nil error will be returned if the -// transaction is not found. -func (b *EthAPIBackend) GetTransaction(ctx context.Context, txHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error) { +// with the given transaction hash. +// +// An error will be returned if the transaction is not found, and background +// indexing for transactions is still in progress. The error is used to indicate the +// scenario explicitly that the transaction might be reachable shortly. +// +// A null will be returned in the transaction is not found and background transaction +// indexing is already finished. The transaction is not existent from the perspective +// of node. +func (b *EthAPIBackend) GetTransaction(ctx context.Context, txHash common.Hash) (bool, *types.Transaction, common.Hash, uint64, uint64, error) { lookup, tx, err := b.eth.blockchain.GetTransactionLookup(txHash) if err != nil { - return nil, common.Hash{}, 0, 0, fmt.Errorf("tx is not existent or not indexed, %w", err) + return false, nil, common.Hash{}, 0, 0, err + } + if lookup == nil || tx == nil { + return false, nil, common.Hash{}, 0, 0, nil } - return tx, lookup.BlockHash, lookup.BlockIndex, lookup.Index, nil + return true, tx, lookup.BlockHash, lookup.BlockIndex, lookup.Index, nil } func (b *EthAPIBackend) GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error) { diff --git a/eth/tracers/api.go b/eth/tracers/api.go index 7c0028601dfb..67b7d21a40ff 100644 --- a/eth/tracers/api.go +++ b/eth/tracers/api.go @@ -80,7 +80,7 @@ type Backend interface { HeaderByNumber(ctx context.Context, number rpc.BlockNumber) (*types.Header, error) BlockByHash(ctx context.Context, hash common.Hash) (*types.Block, error) BlockByNumber(ctx context.Context, number rpc.BlockNumber) (*types.Block, error) - GetTransaction(ctx context.Context, txHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error) + GetTransaction(ctx context.Context, txHash common.Hash) (bool, *types.Transaction, common.Hash, uint64, uint64, error) RPCGasCap() uint64 ChainConfig() *params.ChainConfig Engine() consensus.Engine @@ -826,12 +826,12 @@ func containsTx(block *types.Block, hash common.Hash) bool { // TraceTransaction returns the structured logs created during the execution of EVM // and returns them as a JSON object. func (api *API) TraceTransaction(ctx context.Context, hash common.Hash, config *TraceConfig) (interface{}, error) { - tx, blockHash, blockNumber, index, err := api.backend.GetTransaction(ctx, hash) + found, _, blockHash, blockNumber, index, err := api.backend.GetTransaction(ctx, hash) if err != nil { return nil, err } // Only mined txes are supported - if tx == nil { + if !found { return nil, errTxNotFound } // It shouldn't happen in practice. diff --git a/graphql/graphql.go b/graphql/graphql.go index 49be23af69dd..e76ddac6bc88 100644 --- a/graphql/graphql.go +++ b/graphql/graphql.go @@ -230,8 +230,8 @@ func (t *Transaction) resolve(ctx context.Context) (*types.Transaction, *Block) return t.tx, t.block } // Try to return an already finalized transaction - tx, blockHash, _, index, err := t.r.backend.GetTransaction(ctx, t.hash) - if err == nil && tx != nil { + found, tx, blockHash, _, index, _ := t.r.backend.GetTransaction(ctx, t.hash) + if found { t.tx = tx blockNrOrHash := rpc.BlockNumberOrHashWithHash(blockHash, false) t.block = &Block{ diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index bef323c09012..6d33f2053d9e 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -1652,8 +1652,8 @@ func (s *TransactionAPI) GetTransactionCount(ctx context.Context, address common // GetTransactionByHash returns the transaction for the given hash func (s *TransactionAPI) GetTransactionByHash(ctx context.Context, hash common.Hash) (*RPCTransaction, error) { // Try to return an already finalized transaction - tx, blockHash, blockNumber, index, err := s.b.GetTransaction(ctx, hash) - if err != nil { + found, tx, blockHash, blockNumber, index, err := s.b.GetTransaction(ctx, hash) + if !found { // No finalized transaction, try to retrieve it from the pool if tx := s.b.GetPoolTransaction(hash); tx != nil { return NewRPCPendingTransaction(tx, s.b.CurrentHeader(), s.b.ChainConfig()), nil @@ -1670,8 +1670,8 @@ func (s *TransactionAPI) GetTransactionByHash(ctx context.Context, hash common.H // GetRawTransactionByHash returns the bytes of the transaction for the given hash. func (s *TransactionAPI) GetRawTransactionByHash(ctx context.Context, hash common.Hash) (hexutil.Bytes, error) { // Retrieve a finalized transaction, or a pooled otherwise - tx, _, _, _, err := s.b.GetTransaction(ctx, hash) - if err != nil { + found, tx, _, _, _, err := s.b.GetTransaction(ctx, hash) + if !found { if tx = s.b.GetPoolTransaction(hash); tx == nil { return nil, err } @@ -1682,11 +1682,12 @@ func (s *TransactionAPI) GetRawTransactionByHash(ctx context.Context, hash commo // GetTransactionReceipt returns the transaction receipt for the given transaction hash. func (s *TransactionAPI) GetTransactionReceipt(ctx context.Context, hash common.Hash) (map[string]interface{}, error) { - tx, blockHash, blockNumber, index, err := s.b.GetTransaction(ctx, hash) + found, tx, blockHash, blockNumber, index, err := s.b.GetTransaction(ctx, hash) if err != nil { - // When the transaction doesn't exist or is not indexed yet, - // the RPC method should return JSON null as per specification. - return nil, err + return nil, err // transaction is not fully indexed + } + if !found { + return nil, nil // transaction is not existent or reachable } header, err := s.b.HeaderByHash(ctx, blockHash) if err != nil { @@ -2076,14 +2077,11 @@ func (api *DebugAPI) GetRawReceipts(ctx context.Context, blockNrOrHash rpc.Block // GetRawTransaction returns the bytes of the transaction for the given hash. func (s *DebugAPI) GetRawTransaction(ctx context.Context, hash common.Hash) (hexutil.Bytes, error) { // Retrieve a finalized transaction, or a pooled otherwise - tx, _, _, _, err := s.b.GetTransaction(ctx, hash) - if err != nil { - return nil, err - } - if tx == nil { + found, tx, _, _, _, err := s.b.GetTransaction(ctx, hash) + if !found { if tx = s.b.GetPoolTransaction(hash); tx == nil { // Transaction not found anywhere, abort - return nil, nil + return nil, err } } return tx.MarshalBinary() diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go index 50f338f5cab3..5f408ba20ba5 100644 --- a/internal/ethapi/backend.go +++ b/internal/ethapi/backend.go @@ -75,7 +75,7 @@ type Backend interface { // Transaction pool API SendTx(ctx context.Context, signedTx *types.Transaction) error - GetTransaction(ctx context.Context, txHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error) + GetTransaction(ctx context.Context, txHash common.Hash) (bool, *types.Transaction, common.Hash, uint64, uint64, error) GetPoolTransactions() (types.Transactions, error) GetPoolTransaction(txHash common.Hash) *types.Transaction GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error) From 4e0eb8b24d31b83f6b6b0f260bbdb5ca830dadb3 Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Fri, 19 Jan 2024 14:06:18 +0800 Subject: [PATCH 06/11] internal, eth, core: return RPC error --- core/blockchain_reader.go | 2 +- eth/tracers/api.go | 2 +- eth/tracers/api_test.go | 4 +- internal/ethapi/api.go | 57 ++++++----------- internal/ethapi/api_test.go | 4 +- internal/ethapi/errors.go | 78 ++++++++++++++++++++++++ internal/ethapi/transaction_args_test.go | 4 +- 7 files changed, 103 insertions(+), 48 deletions(-) create mode 100644 internal/ethapi/errors.go diff --git a/core/blockchain_reader.go b/core/blockchain_reader.go index b761876b3b10..7e4eb8daa020 100644 --- a/core/blockchain_reader.go +++ b/core/blockchain_reader.go @@ -279,7 +279,7 @@ func (bc *BlockChain) GetTransactionLookup(hash common.Hash) (*rawdb.LegacyTxLoo // The transaction indexing is not finished yet, returning an // error to explicitly indicate it. if !progress.done() { - return nil, nil, errors.New("transaction is still indexing") + return nil, nil, errors.New("transaction indexing still in progress") } // The transaction is already indexed, the transaction is either // not existent or not in the range of index, returning null. diff --git a/eth/tracers/api.go b/eth/tracers/api.go index 67b7d21a40ff..4d4428f6c63b 100644 --- a/eth/tracers/api.go +++ b/eth/tracers/api.go @@ -828,7 +828,7 @@ func containsTx(block *types.Block, hash common.Hash) bool { func (api *API) TraceTransaction(ctx context.Context, hash common.Hash, config *TraceConfig) (interface{}, error) { found, _, blockHash, blockNumber, index, err := api.backend.GetTransaction(ctx, hash) if err != nil { - return nil, err + return nil, ethapi.NewTxIndexingError() } // Only mined txes are supported if !found { diff --git a/eth/tracers/api_test.go b/eth/tracers/api_test.go index 49c3ebb67d7f..c72e5eaaac31 100644 --- a/eth/tracers/api_test.go +++ b/eth/tracers/api_test.go @@ -113,9 +113,9 @@ func (b *testBackend) BlockByNumber(ctx context.Context, number rpc.BlockNumber) return b.chain.GetBlockByNumber(uint64(number)), nil } -func (b *testBackend) GetTransaction(ctx context.Context, txHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error) { +func (b *testBackend) GetTransaction(ctx context.Context, txHash common.Hash) (bool, *types.Transaction, common.Hash, uint64, uint64, error) { tx, hash, blockNumber, index := rawdb.ReadTransaction(b.chaindb, txHash) - return tx, hash, blockNumber, index, nil + return true, tx, hash, blockNumber, index, nil } func (b *testBackend) RPCGasCap() uint64 { diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index 6d33f2053d9e..f2ba2b4aac04 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -27,7 +27,6 @@ import ( "github.com/davecgh/go-spew/spew" "github.com/ethereum/go-ethereum/accounts" - "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/accounts/keystore" "github.com/ethereum/go-ethereum/accounts/scwallet" "github.com/ethereum/go-ethereum/common" @@ -1133,37 +1132,6 @@ func DoCall(ctx context.Context, b Backend, args TransactionArgs, blockNrOrHash return doCall(ctx, b, args, state, header, overrides, blockOverrides, timeout, globalGasCap) } -func newRevertError(revert []byte) *revertError { - err := vm.ErrExecutionReverted - - reason, errUnpack := abi.UnpackRevert(revert) - if errUnpack == nil { - err = fmt.Errorf("%w: %v", vm.ErrExecutionReverted, reason) - } - return &revertError{ - error: err, - reason: hexutil.Encode(revert), - } -} - -// revertError is an API error that encompasses an EVM revertal with JSON error -// code and a binary data blob. -type revertError struct { - error - reason string // revert reason hex encoded -} - -// ErrorCode returns the JSON error code for a revertal. -// See: https://github.com/ethereum/wiki/wiki/JSON-RPC-Error-Codes-Improvement-Proposal -func (e *revertError) ErrorCode() int { - return 3 -} - -// ErrorData returns the hex encoded revert reason. -func (e *revertError) ErrorData() interface{} { - return e.reason -} - // Call executes the given transaction on the state for the given block number. // // Additionally, the caller can specify a batch of contract for fields overriding. @@ -1658,7 +1626,10 @@ func (s *TransactionAPI) GetTransactionByHash(ctx context.Context, hash common.H if tx := s.b.GetPoolTransaction(hash); tx != nil { return NewRPCPendingTransaction(tx, s.b.CurrentHeader(), s.b.ChainConfig()), nil } - return nil, err + if err == nil { + return nil, nil + } + return nil, NewTxIndexingError() } header, err := s.b.HeaderByHash(ctx, blockHash) if err != nil { @@ -1672,11 +1643,14 @@ func (s *TransactionAPI) GetRawTransactionByHash(ctx context.Context, hash commo // Retrieve a finalized transaction, or a pooled otherwise found, tx, _, _, _, err := s.b.GetTransaction(ctx, hash) if !found { - if tx = s.b.GetPoolTransaction(hash); tx == nil { - return nil, err + if tx = s.b.GetPoolTransaction(hash); tx != nil { + return tx.MarshalBinary() } + if err == nil { + return nil, nil + } + return nil, NewTxIndexingError() } - // Serialize to RLP and return return tx.MarshalBinary() } @@ -1684,7 +1658,7 @@ func (s *TransactionAPI) GetRawTransactionByHash(ctx context.Context, hash commo func (s *TransactionAPI) GetTransactionReceipt(ctx context.Context, hash common.Hash) (map[string]interface{}, error) { found, tx, blockHash, blockNumber, index, err := s.b.GetTransaction(ctx, hash) if err != nil { - return nil, err // transaction is not fully indexed + return nil, NewTxIndexingError() // transaction is not fully indexed } if !found { return nil, nil // transaction is not existent or reachable @@ -2079,10 +2053,13 @@ func (s *DebugAPI) GetRawTransaction(ctx context.Context, hash common.Hash) (hex // Retrieve a finalized transaction, or a pooled otherwise found, tx, _, _, _, err := s.b.GetTransaction(ctx, hash) if !found { - if tx = s.b.GetPoolTransaction(hash); tx == nil { - // Transaction not found anywhere, abort - return nil, err + if tx = s.b.GetPoolTransaction(hash); tx != nil { + return tx.MarshalBinary() + } + if err == nil { + return nil, nil } + return nil, NewTxIndexingError() } return tx.MarshalBinary() } diff --git a/internal/ethapi/api_test.go b/internal/ethapi/api_test.go index fd6865019365..623aa1fe42a7 100644 --- a/internal/ethapi/api_test.go +++ b/internal/ethapi/api_test.go @@ -583,9 +583,9 @@ func (b testBackend) SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) even func (b testBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error { panic("implement me") } -func (b testBackend) GetTransaction(ctx context.Context, txHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error) { +func (b testBackend) GetTransaction(ctx context.Context, txHash common.Hash) (bool, *types.Transaction, common.Hash, uint64, uint64, error) { tx, blockHash, blockNumber, index := rawdb.ReadTransaction(b.db, txHash) - return tx, blockHash, blockNumber, index, nil + return true, tx, blockHash, blockNumber, index, nil } func (b testBackend) GetPoolTransactions() (types.Transactions, error) { panic("implement me") } func (b testBackend) GetPoolTransaction(txHash common.Hash) *types.Transaction { panic("implement me") } diff --git a/internal/ethapi/errors.go b/internal/ethapi/errors.go new file mode 100644 index 000000000000..6171cc4d6b91 --- /dev/null +++ b/internal/ethapi/errors.go @@ -0,0 +1,78 @@ +// Copyright 2024 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package ethapi + +import ( + "fmt" + + "github.com/ethereum/go-ethereum/accounts/abi" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/core/vm" +) + +// revertError is an API error that encompasses an EVM revert with JSON error +// code and a binary data blob. +type revertError struct { + error + reason string // revert reason hex encoded +} + +// ErrorCode returns the JSON error code for a revert. +// See: https://github.com/ethereum/wiki/wiki/JSON-RPC-Error-Codes-Improvement-Proposal +func (e *revertError) ErrorCode() int { + return 3 +} + +// ErrorData returns the hex encoded revert reason. +func (e *revertError) ErrorData() interface{} { + return e.reason +} + +// newRevertError creates a revertError instance with the provided revert data. +func newRevertError(revert []byte) *revertError { + err := vm.ErrExecutionReverted + + reason, errUnpack := abi.UnpackRevert(revert) + if errUnpack == nil { + err = fmt.Errorf("%w: %v", vm.ErrExecutionReverted, reason) + } + return &revertError{ + error: err, + reason: hexutil.Encode(revert), + } +} + +// TxIndexingError is an API error that indicates the transaction indexing is not +// fully finished yet with JSON error code and a binary data blob. +type TxIndexingError struct{} + +// NewTxIndexingError creates a TxIndexingError instance. +func NewTxIndexingError() *TxIndexingError { return &TxIndexingError{} } + +// Error implement error interface, returning the error message. +func (e *TxIndexingError) Error() string { + return "transaction indexing is in progress" +} + +// ErrorCode returns the JSON error code for a revert. +// See: https://github.com/ethereum/wiki/wiki/JSON-RPC-Error-Codes-Improvement-Proposal +func (e *TxIndexingError) ErrorCode() int { + return 3 // TODO tbd +} + +// ErrorData returns the hex encoded revert reason. +func (e *TxIndexingError) ErrorData() interface{} { return "transaction indexing is in progress" } diff --git a/internal/ethapi/transaction_args_test.go b/internal/ethapi/transaction_args_test.go index 8651da402040..f0fdb6d8ee2d 100644 --- a/internal/ethapi/transaction_args_test.go +++ b/internal/ethapi/transaction_args_test.go @@ -379,8 +379,8 @@ func (b *backendMock) SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) eve return nil } func (b *backendMock) SendTx(ctx context.Context, signedTx *types.Transaction) error { return nil } -func (b *backendMock) GetTransaction(ctx context.Context, txHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error) { - return nil, [32]byte{}, 0, 0, nil +func (b *backendMock) GetTransaction(ctx context.Context, txHash common.Hash) (bool, *types.Transaction, common.Hash, uint64, uint64, error) { + return false, nil, [32]byte{}, 0, 0, nil } func (b *backendMock) GetPoolTransactions() (types.Transactions, error) { return nil, nil } func (b *backendMock) GetPoolTransaction(txHash common.Hash) *types.Transaction { return nil } From a4bb72ec3e818f31e4bf5eda9c5d538c2008267d Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Fri, 19 Jan 2024 15:10:01 +0800 Subject: [PATCH 07/11] core, eth, graphql, interfaces: improve eth.syncing --- core/blockchain.go | 58 +++++++++++++++++++++++---------------- core/blockchain_reader.go | 10 +++++-- core/blockchain_test.go | 10 +++++++ eth/api_backend.go | 7 ++++- eth/backend.go | 2 +- eth/downloader/api.go | 45 ++++++++++++++++++++++++++++-- eth/tracers/api_test.go | 2 +- ethstats/ethstats.go | 4 +-- graphql/graphql.go | 10 ++++++- interfaces.go | 12 ++++++++ internal/ethapi/api.go | 34 ++++++++++++----------- 11 files changed, 143 insertions(+), 51 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index fb7adf244435..e4136249320b 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -192,19 +192,15 @@ type txLookup struct { transaction *types.Transaction } -// txIndexProgress is the struct describing the progress for transaction indexing. -type txIndexProgress struct { - head uint64 // the current chain head - indexed uint64 // the number of blocks have been indexed - limit uint64 // the number of blocks required for transaction indexing(0 means the whole chain) +// TxIndexProgress is the struct describing the progress for transaction indexing. +type TxIndexProgress struct { + Indexed uint64 // number of blocks whose transactions are indexed + Remaining uint64 // number of blocks whose transactions are not indexed yet } -// done returns an indicator if the transaction indexing is finished. -func (prog txIndexProgress) done() bool { - if prog.limit == 0 { - return prog.indexed == (prog.head + 1) // genesis included - } - return prog.indexed >= prog.limit +// Done returns an indicator if the transaction indexing is finished. +func (prog TxIndexProgress) Done() bool { + return prog.Remaining == 0 } // BlockChain represents the canonical chain given a database with a genesis @@ -248,6 +244,7 @@ type BlockChain struct { chainHeadFeed event.Feed logsFeed event.Feed blockProcFeed event.Feed + txIndexFeed event.Feed scope event.SubscriptionScope genesisBlock *types.Block @@ -273,7 +270,7 @@ type BlockChain struct { quit chan struct{} // shutdown signal, closed in Stop. stopping atomic.Bool // false if chain is running, true when stopped procInterrupt atomic.Bool // interrupt signaler for block processing - txIndexProgCh chan chan txIndexProgress // chan for querying the progress of transaction indexing + txIndexProgCh chan chan TxIndexProgress // chan for querying the progress of transaction indexing engine consensus.Engine validator Validator // Block and state validator interface @@ -322,7 +319,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis blockCache: lru.NewCache[common.Hash, *types.Block](blockCacheLimit), txLookupCache: lru.NewCache[common.Hash, txLookup](txLookupCacheLimit), futureBlocks: lru.NewCache[common.Hash, *types.Block](maxFutureBlocks), - txIndexProgCh: make(chan chan txIndexProgress), + txIndexProgCh: make(chan chan TxIndexProgress), engine: engine, vmConfig: vmConfig, } @@ -2451,29 +2448,38 @@ func (bc *BlockChain) indexBlocks(tail *uint64, head uint64, done chan struct{}) } // reportTxIndexProgress returns the tx indexing progress. -func (bc *BlockChain) reportTxIndexProgress(head uint64) txIndexProgress { +func (bc *BlockChain) reportTxIndexProgress(head uint64) TxIndexProgress { var ( - indexed uint64 - tail = rawdb.ReadTxIndexTail(bc.db) + remaining uint64 + tail = rawdb.ReadTxIndexTail(bc.db) ) + total := bc.txLookupLimit + if bc.txLookupLimit == 0 { + total = head + 1 // genesis included + } + var indexed uint64 if tail != nil { indexed = head - *tail + 1 } - return txIndexProgress{ - head: head, - indexed: indexed, - limit: bc.txLookupLimit, + // The value of indexed might be larger than total if some blocks need + // to unindexed, avoiding a negative remaining. + if indexed < total { + remaining = total - indexed + } + return TxIndexProgress{ + Indexed: indexed, + Remaining: remaining, } } -// askTxIndexProgress retrieves the tx indexing progress. -func (bc *BlockChain) askTxIndexProgress() (txIndexProgress, error) { - ch := make(chan txIndexProgress, 1) +// TxIndexProgress retrieves the tx indexing progress. +func (bc *BlockChain) TxIndexProgress() (TxIndexProgress, error) { + ch := make(chan TxIndexProgress, 1) select { case bc.txIndexProgCh <- ch: return <-ch, nil case <-bc.quit: - return txIndexProgress{}, errors.New("blockchain is closed") + return TxIndexProgress{}, errors.New("blockchain is closed") } } @@ -2521,6 +2527,10 @@ func (bc *BlockChain) maintainTxIndex() { lastHead = head.Block.NumberU64() case <-done: done = nil + + if bc.reportTxIndexProgress(lastHead).Done() { + bc.txIndexFeed.Send(true) + } case ch := <-bc.txIndexProgCh: ch <- bc.reportTxIndexProgress(lastHead) case <-bc.quit: diff --git a/core/blockchain_reader.go b/core/blockchain_reader.go index 7e4eb8daa020..fbec721e49bc 100644 --- a/core/blockchain_reader.go +++ b/core/blockchain_reader.go @@ -272,13 +272,13 @@ func (bc *BlockChain) GetTransactionLookup(hash common.Hash) (*rawdb.LegacyTxLoo } tx, blockHash, blockNumber, txIndex := rawdb.ReadTransaction(bc.db, hash) if tx == nil { - progress, err := bc.askTxIndexProgress() + progress, err := bc.TxIndexProgress() if err != nil { return nil, nil, nil } // The transaction indexing is not finished yet, returning an // error to explicitly indicate it. - if !progress.done() { + if !progress.Done() { return nil, nil, errors.New("transaction indexing still in progress") } // The transaction is already indexed, the transaction is either @@ -444,3 +444,9 @@ func (bc *BlockChain) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscript func (bc *BlockChain) SubscribeBlockProcessingEvent(ch chan<- bool) event.Subscription { return bc.scope.Track(bc.blockProcFeed.Subscribe(ch)) } + +// SubscribeTxIndexEvent registers a subscription of bool where true means +// transaction indexing has finished. +func (bc *BlockChain) SubscribeTxIndexEvent(ch chan<- bool) event.Subscription { + return bc.scope.Track(bc.txIndexFeed.Subscribe(ch)) +} diff --git a/core/blockchain_test.go b/core/blockchain_test.go index c1286705b62f..71260e44a096 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -4075,6 +4075,12 @@ func TestTxIndexer(t *testing.T) { } verifyRange(db, *tail, 128, true) } + verifyProgress := func(chain *BlockChain) { + prog := chain.reportTxIndexProgress(128) + if !prog.Done() { + t.Fatalf("Expect fully indexed") + } + } var cases = []struct { limitA uint64 @@ -4204,19 +4210,23 @@ func TestTxIndexer(t *testing.T) { chain, _ := NewBlockChain(db, nil, gspec, nil, engine, vm.Config{}, nil, &c.limitA) chain.indexBlocks(nil, 128, make(chan struct{})) verify(db, c.tailA) + verifyProgress(chain) chain.SetTxLookupLimit(c.limitB) chain.indexBlocks(rawdb.ReadTxIndexTail(db), 128, make(chan struct{})) verify(db, c.tailB) + verifyProgress(chain) chain.SetTxLookupLimit(c.limitC) chain.indexBlocks(rawdb.ReadTxIndexTail(db), 128, make(chan struct{})) verify(db, c.tailC) + verifyProgress(chain) // Recover all indexes chain.SetTxLookupLimit(0) chain.indexBlocks(rawdb.ReadTxIndexTail(db), 128, make(chan struct{})) verify(db, 0) + verifyProgress(chain) chain.Stop() db.Close() diff --git a/eth/api_backend.go b/eth/api_backend.go index c2019c3dd394..0edcce5c8789 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -354,7 +354,12 @@ func (b *EthAPIBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.S } func (b *EthAPIBackend) SyncProgress() ethereum.SyncProgress { - return b.eth.Downloader().Progress() + prog := b.eth.Downloader().Progress() + if txProg, err := b.eth.blockchain.TxIndexProgress(); err == nil { + prog.TxIndexFinishedBlocks = txProg.Indexed + prog.TxIndexRemainingBlocks = txProg.Remaining + } + return prog } func (b *EthAPIBackend) SuggestGasTipCap(ctx context.Context) (*big.Int, error) { diff --git a/eth/backend.go b/eth/backend.go index 774ffaf24877..aff23a910bcb 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -322,7 +322,7 @@ func (s *Ethereum) APIs() []rpc.API { Service: NewMinerAPI(s), }, { Namespace: "eth", - Service: downloader.NewDownloaderAPI(s.handler.downloader, s.eventMux), + Service: downloader.NewDownloaderAPI(s.handler.downloader, s.blockchain, s.eventMux), }, { Namespace: "admin", Service: NewAdminAPI(s), diff --git a/eth/downloader/api.go b/eth/downloader/api.go index 606c6d4e7ec1..6d6a3f943a90 100644 --- a/eth/downloader/api.go +++ b/eth/downloader/api.go @@ -21,6 +21,7 @@ import ( "sync" "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/rpc" ) @@ -29,6 +30,7 @@ import ( // It offers only methods that operates on data that can be available to anyone without security risks. type DownloaderAPI struct { d *Downloader + chain *core.BlockChain mux *event.TypeMux installSyncSubscription chan chan interface{} uninstallSyncSubscription chan *uninstallSyncSubscriptionRequest @@ -38,9 +40,10 @@ type DownloaderAPI struct { // listens for events from the downloader through the global event mux. In case it receives one of // these events it broadcasts it to all syncing subscriptions that are installed through the // installSyncSubscription channel. -func NewDownloaderAPI(d *Downloader, m *event.TypeMux) *DownloaderAPI { +func NewDownloaderAPI(d *Downloader, chain *core.BlockChain, m *event.TypeMux) *DownloaderAPI { api := &DownloaderAPI{ d: d, + chain: chain, mux: m, installSyncSubscription: make(chan chan interface{}), uninstallSyncSubscription: make(chan *uninstallSyncSubscriptionRequest), @@ -57,7 +60,10 @@ func (api *DownloaderAPI) eventLoop() { var ( sub = api.mux.Subscribe(StartEvent{}, DoneEvent{}, FailedEvent{}) syncSubscriptions = make(map[chan interface{}]struct{}) + txIndexCh = make(chan bool, 1) ) + txIndexSub := api.chain.SubscribeTxIndexEvent(txIndexCh) + defer txIndexSub.Unsubscribe() for { select { @@ -70,21 +76,54 @@ func (api *DownloaderAPI) eventLoop() { if event == nil { return } - var notification interface{} + switch event.Data.(type) { case StartEvent: + prog := api.d.Progress() + txProg, err := api.chain.TxIndexProgress() + if err == nil { + prog.TxIndexFinishedBlocks = txProg.Indexed + prog.TxIndexRemainingBlocks = txProg.Remaining + } notification = &SyncingResult{ Syncing: true, - Status: api.d.Progress(), + Status: prog, } case DoneEvent, FailedEvent: notification = false + + txProg, err := api.chain.TxIndexProgress() + if err == nil && !txProg.Done() { + prog := api.d.Progress() + prog.TxIndexFinishedBlocks = txProg.Indexed + prog.TxIndexRemainingBlocks = txProg.Remaining + notification = &SyncingResult{ + Syncing: true, + Status: prog, + } + } } // broadcast for c := range syncSubscriptions { c <- notification } + case status := <-txIndexCh: + if !status { + continue + } + prog := api.d.Progress() + txProg, err := api.chain.TxIndexProgress() + if err == nil && !txProg.Done() { + prog.TxIndexFinishedBlocks = txProg.Indexed + prog.TxIndexRemainingBlocks = txProg.Remaining + } + if !prog.Done() { + continue + } + for c := range syncSubscriptions { + c <- true + } } } } diff --git a/eth/tracers/api_test.go b/eth/tracers/api_test.go index c72e5eaaac31..8aaa20fce536 100644 --- a/eth/tracers/api_test.go +++ b/eth/tracers/api_test.go @@ -115,7 +115,7 @@ func (b *testBackend) BlockByNumber(ctx context.Context, number rpc.BlockNumber) func (b *testBackend) GetTransaction(ctx context.Context, txHash common.Hash) (bool, *types.Transaction, common.Hash, uint64, uint64, error) { tx, hash, blockNumber, index := rawdb.ReadTransaction(b.chaindb, txHash) - return true, tx, hash, blockNumber, index, nil + return tx != nil, tx, hash, blockNumber, index, nil } func (b *testBackend) RPCGasCap() uint64 { diff --git a/ethstats/ethstats.go b/ethstats/ethstats.go index 75d0faac54e9..29559991be3f 100644 --- a/ethstats/ethstats.go +++ b/ethstats/ethstats.go @@ -792,7 +792,7 @@ func (s *Service) reportStats(conn *connWrapper) error { } sync := fullBackend.SyncProgress() - syncing = fullBackend.CurrentHeader().Number.Uint64() >= sync.HighestBlock + syncing = !sync.Done() price, _ := fullBackend.SuggestGasTipCap(context.Background()) gasprice = int(price.Uint64()) @@ -801,7 +801,7 @@ func (s *Service) reportStats(conn *connWrapper) error { } } else { sync := s.backend.SyncProgress() - syncing = s.backend.CurrentHeader().Number.Uint64() >= sync.HighestBlock + syncing = !sync.Done() } // Assemble the node stats and send it to the server log.Trace("Sending node details to ethstats") diff --git a/graphql/graphql.go b/graphql/graphql.go index e76ddac6bc88..bf65b6544cc5 100644 --- a/graphql/graphql.go +++ b/graphql/graphql.go @@ -1509,6 +1509,12 @@ func (s *SyncState) HealingTrienodes() hexutil.Uint64 { func (s *SyncState) HealingBytecode() hexutil.Uint64 { return hexutil.Uint64(s.progress.HealingBytecode) } +func (s *SyncState) TxIndexFinishedBlocks() hexutil.Uint64 { + return hexutil.Uint64(s.progress.TxIndexFinishedBlocks) +} +func (s *SyncState) TxIndexRemainingBlocks() hexutil.Uint64 { + return hexutil.Uint64(s.progress.TxIndexRemainingBlocks) +} // Syncing returns false in case the node is currently not syncing with the network. It can be up-to-date or has not // yet received the latest block headers from its pears. In case it is synchronizing: @@ -1527,11 +1533,13 @@ func (s *SyncState) HealingBytecode() hexutil.Uint64 { // - healedBytecodeBytes: number of bytecodes persisted to disk // - healingTrienodes: number of state trie nodes pending // - healingBytecode: number of bytecodes pending +// - txIndexFinishedBlocks: number of blocks whose transactions are indexed +// - txIndexRemainingBlocks: number of blocks whose transactions are not indexed yet func (r *Resolver) Syncing() (*SyncState, error) { progress := r.backend.SyncProgress() // Return not syncing if the synchronisation already completed - if progress.CurrentBlock >= progress.HighestBlock { + if progress.Done() { return nil, nil } // Otherwise gather the block sync stats diff --git a/interfaces.go b/interfaces.go index 1892309ed316..c6aee295ee56 100644 --- a/interfaces.go +++ b/interfaces.go @@ -120,6 +120,18 @@ type SyncProgress struct { HealingTrienodes uint64 // Number of state trie nodes pending HealingBytecode uint64 // Number of bytecodes pending + + // "transaction indexing" fields + TxIndexFinishedBlocks uint64 // Number of blocks whose transactions are already indexed + TxIndexRemainingBlocks uint64 // Number of blocks whose transactions are not indexed yet +} + +// Done returns the indicator if the initial sync is finished or not. +func (prog SyncProgress) Done() bool { + if prog.CurrentBlock < prog.HighestBlock { + return false + } + return prog.TxIndexRemainingBlocks == 0 } // ChainSyncReader wraps access to the node's current sync status. If there's no diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index f2ba2b4aac04..78522c4f73a0 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -133,26 +133,28 @@ func (s *EthereumAPI) Syncing() (interface{}, error) { progress := s.b.SyncProgress() // Return not syncing if the synchronisation already completed - if progress.CurrentBlock >= progress.HighestBlock { + if progress.Done() { return false, nil } // Otherwise gather the block sync stats return map[string]interface{}{ - "startingBlock": hexutil.Uint64(progress.StartingBlock), - "currentBlock": hexutil.Uint64(progress.CurrentBlock), - "highestBlock": hexutil.Uint64(progress.HighestBlock), - "syncedAccounts": hexutil.Uint64(progress.SyncedAccounts), - "syncedAccountBytes": hexutil.Uint64(progress.SyncedAccountBytes), - "syncedBytecodes": hexutil.Uint64(progress.SyncedBytecodes), - "syncedBytecodeBytes": hexutil.Uint64(progress.SyncedBytecodeBytes), - "syncedStorage": hexutil.Uint64(progress.SyncedStorage), - "syncedStorageBytes": hexutil.Uint64(progress.SyncedStorageBytes), - "healedTrienodes": hexutil.Uint64(progress.HealedTrienodes), - "healedTrienodeBytes": hexutil.Uint64(progress.HealedTrienodeBytes), - "healedBytecodes": hexutil.Uint64(progress.HealedBytecodes), - "healedBytecodeBytes": hexutil.Uint64(progress.HealedBytecodeBytes), - "healingTrienodes": hexutil.Uint64(progress.HealingTrienodes), - "healingBytecode": hexutil.Uint64(progress.HealingBytecode), + "startingBlock": hexutil.Uint64(progress.StartingBlock), + "currentBlock": hexutil.Uint64(progress.CurrentBlock), + "highestBlock": hexutil.Uint64(progress.HighestBlock), + "syncedAccounts": hexutil.Uint64(progress.SyncedAccounts), + "syncedAccountBytes": hexutil.Uint64(progress.SyncedAccountBytes), + "syncedBytecodes": hexutil.Uint64(progress.SyncedBytecodes), + "syncedBytecodeBytes": hexutil.Uint64(progress.SyncedBytecodeBytes), + "syncedStorage": hexutil.Uint64(progress.SyncedStorage), + "syncedStorageBytes": hexutil.Uint64(progress.SyncedStorageBytes), + "healedTrienodes": hexutil.Uint64(progress.HealedTrienodes), + "healedTrienodeBytes": hexutil.Uint64(progress.HealedTrienodeBytes), + "healedBytecodes": hexutil.Uint64(progress.HealedBytecodes), + "healedBytecodeBytes": hexutil.Uint64(progress.HealedBytecodeBytes), + "healingTrienodes": hexutil.Uint64(progress.HealingTrienodes), + "healingBytecode": hexutil.Uint64(progress.HealingBytecode), + "txIndexFinishedBlocks": hexutil.Uint64(progress.TxIndexFinishedBlocks), + "txIndexRemainingBlocks": hexutil.Uint64(progress.TxIndexRemainingBlocks), }, nil } From 811ed9207eddfd38fbba1ef148aa358135999353 Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Fri, 19 Jan 2024 16:30:14 +0800 Subject: [PATCH 08/11] core, eth: more fixes --- core/blockchain.go | 4 +++ eth/downloader/api.go | 80 +++++++++++++++++++++++-------------------- 2 files changed, 47 insertions(+), 37 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index e4136249320b..764189dddc69 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -2528,6 +2528,10 @@ func (bc *BlockChain) maintainTxIndex() { case <-done: done = nil + // WARNING, the event will be fired for each signal once the + // transaction indexing is finished. Subscribers need to manage + // the event stream by themselves. It's recommended to unsubscribe + // once the event is received. if bc.reportTxIndexProgress(lastHead).Done() { bc.txIndexFeed.Send(true) } diff --git a/eth/downloader/api.go b/eth/downloader/api.go index 6d6a3f943a90..ef61a9b9f29c 100644 --- a/eth/downloader/api.go +++ b/eth/downloader/api.go @@ -26,8 +26,9 @@ import ( "github.com/ethereum/go-ethereum/rpc" ) -// DownloaderAPI provides an API which gives information about the current synchronisation status. -// It offers only methods that operates on data that can be available to anyone without security risks. +// DownloaderAPI provides an API which gives information about the current +// synchronisation status. It offers only methods that operates on data that +// can be available to anyone without security risks. type DownloaderAPI struct { d *Downloader chain *core.BlockChain @@ -48,27 +49,41 @@ func NewDownloaderAPI(d *Downloader, chain *core.BlockChain, m *event.TypeMux) * installSyncSubscription: make(chan chan interface{}), uninstallSyncSubscription: make(chan *uninstallSyncSubscriptionRequest), } - go api.eventLoop() - return api } -// eventLoop runs a loop until the event mux closes. It will install and uninstall new -// sync subscriptions and broadcasts sync status updates to the installed sync subscriptions. +// eventLoop runs a loop until the event mux closes. It will install and uninstall +// new sync subscriptions and broadcasts sync status updates to the installed sync +// subscriptions. +// +// The sync status pushed to subscriptions can a stream like: +// >>> {Syncing: true, Progress: {...}} +// >>> {false} +// +// If the node is already synced up, then only a single event will be pushed {false}. func (api *DownloaderAPI) eventLoop() { var ( - sub = api.mux.Subscribe(StartEvent{}, DoneEvent{}, FailedEvent{}) + sub = api.mux.Subscribe(StartEvent{}) syncSubscriptions = make(map[chan interface{}]struct{}) txIndexCh = make(chan bool, 1) + done bool ) txIndexSub := api.chain.SubscribeTxIndexEvent(txIndexCh) - defer txIndexSub.Unsubscribe() + defer func() { + if txIndexSub == nil { + return + } + txIndexSub.Unsubscribe() + }() for { select { case i := <-api.installSyncSubscription: syncSubscriptions[i] = struct{}{} + if done { + i <- false + } case u := <-api.uninstallSyncSubscription: delete(syncSubscriptions, u.c) close(u.uninstalled) @@ -76,53 +91,44 @@ func (api *DownloaderAPI) eventLoop() { if event == nil { return } - var notification interface{} - switch event.Data.(type) { case StartEvent: prog := api.d.Progress() - txProg, err := api.chain.TxIndexProgress() - if err == nil { + if txProg, err := api.chain.TxIndexProgress(); err == nil { prog.TxIndexFinishedBlocks = txProg.Indexed prog.TxIndexRemainingBlocks = txProg.Remaining } - notification = &SyncingResult{ + notification := &SyncingResult{ Syncing: true, Status: prog, } - case DoneEvent, FailedEvent: - notification = false - - txProg, err := api.chain.TxIndexProgress() - if err == nil && !txProg.Done() { - prog := api.d.Progress() - prog.TxIndexFinishedBlocks = txProg.Indexed - prog.TxIndexRemainingBlocks = txProg.Remaining - notification = &SyncingResult{ - Syncing: true, - Status: prog, - } + // broadcast + for c := range syncSubscriptions { + c <- notification } } - // broadcast - for c := range syncSubscriptions { - c <- notification - } - case status := <-txIndexCh: - if !status { + case synced := <-txIndexCh: + if !synced { continue } prog := api.d.Progress() - txProg, err := api.chain.TxIndexProgress() - if err == nil && !txProg.Done() { - prog.TxIndexFinishedBlocks = txProg.Indexed - prog.TxIndexRemainingBlocks = txProg.Remaining - } if !prog.Done() { continue } + txProg, err := api.chain.TxIndexProgress() + if err != nil || !txProg.Done() { + continue + } for c := range syncSubscriptions { - c <- true + c <- false + } + done = true + + // Unsubscribe the tx indexing events as the whole + // state sync is already finished. + if txIndexSub != nil { + txIndexSub.Unsubscribe() + txIndexSub = nil } } } From aa21c1f2f46b86290d75f3916938bef5fa8b31c9 Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Mon, 22 Jan 2024 12:08:54 +0800 Subject: [PATCH 09/11] core, eth: minor fixes --- core/blockchain.go | 28 +++++++++---------- core/blockchain_reader.go | 6 ----- eth/downloader/api.go | 57 +++++++++++++++++++-------------------- 3 files changed, 40 insertions(+), 51 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 764189dddc69..f67f071e3688 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -244,7 +244,6 @@ type BlockChain struct { chainHeadFeed event.Feed logsFeed event.Feed blockProcFeed event.Feed - txIndexFeed event.Feed scope event.SubscriptionScope genesisBlock *types.Block @@ -267,10 +266,12 @@ type BlockChain struct { futureBlocks *lru.Cache[common.Hash, *types.Block] wg sync.WaitGroup - quit chan struct{} // shutdown signal, closed in Stop. - stopping atomic.Bool // false if chain is running, true when stopped - procInterrupt atomic.Bool // interrupt signaler for block processing - txIndexProgCh chan chan TxIndexProgress // chan for querying the progress of transaction indexing + quit chan struct{} // shutdown signal, closed in Stop. + stopping atomic.Bool // false if chain is running, true when stopped + procInterrupt atomic.Bool // interrupt signaler for block processing + + txIndexRunning bool // flag if the background tx indexer is activated + txIndexProgCh chan chan TxIndexProgress // chan for querying the progress of transaction indexing engine consensus.Engine validator Validator // Block and state validator interface @@ -487,6 +488,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis // Start tx indexer/unindexer if required. if txLookupLimit != nil { bc.txLookupLimit = *txLookupLimit + bc.txIndexRunning = true bc.wg.Add(1) go bc.maintainTxIndex() @@ -2462,7 +2464,7 @@ func (bc *BlockChain) reportTxIndexProgress(head uint64) TxIndexProgress { indexed = head - *tail + 1 } // The value of indexed might be larger than total if some blocks need - // to unindexed, avoiding a negative remaining. + // to be unindexed, avoiding a negative remaining. if indexed < total { remaining = total - indexed } @@ -2472,8 +2474,12 @@ func (bc *BlockChain) reportTxIndexProgress(head uint64) TxIndexProgress { } } -// TxIndexProgress retrieves the tx indexing progress. +// TxIndexProgress retrieves the tx indexing progress, or an error if the +// background tx indexer is not activated or already stopped. func (bc *BlockChain) TxIndexProgress() (TxIndexProgress, error) { + if !bc.txIndexRunning { + return TxIndexProgress{}, errors.New("tx indexer is not activated") + } ch := make(chan TxIndexProgress, 1) select { case bc.txIndexProgCh <- ch: @@ -2527,14 +2533,6 @@ func (bc *BlockChain) maintainTxIndex() { lastHead = head.Block.NumberU64() case <-done: done = nil - - // WARNING, the event will be fired for each signal once the - // transaction indexing is finished. Subscribers need to manage - // the event stream by themselves. It's recommended to unsubscribe - // once the event is received. - if bc.reportTxIndexProgress(lastHead).Done() { - bc.txIndexFeed.Send(true) - } case ch := <-bc.txIndexProgCh: ch <- bc.reportTxIndexProgress(lastHead) case <-bc.quit: diff --git a/core/blockchain_reader.go b/core/blockchain_reader.go index fbec721e49bc..059232946086 100644 --- a/core/blockchain_reader.go +++ b/core/blockchain_reader.go @@ -444,9 +444,3 @@ func (bc *BlockChain) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscript func (bc *BlockChain) SubscribeBlockProcessingEvent(ch chan<- bool) event.Subscription { return bc.scope.Track(bc.blockProcFeed.Subscribe(ch)) } - -// SubscribeTxIndexEvent registers a subscription of bool where true means -// transaction indexing has finished. -func (bc *BlockChain) SubscribeTxIndexEvent(ch chan<- bool) event.Subscription { - return bc.scope.Track(bc.txIndexFeed.Subscribe(ch)) -} diff --git a/eth/downloader/api.go b/eth/downloader/api.go index ef61a9b9f29c..1e06aa9e1deb 100644 --- a/eth/downloader/api.go +++ b/eth/downloader/api.go @@ -19,6 +19,7 @@ package downloader import ( "context" "sync" + "time" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/core" @@ -57,25 +58,33 @@ func NewDownloaderAPI(d *Downloader, chain *core.BlockChain, m *event.TypeMux) * // new sync subscriptions and broadcasts sync status updates to the installed sync // subscriptions. // -// The sync status pushed to subscriptions can a stream like: +// The sync status pushed to subscriptions can be a stream like: // >>> {Syncing: true, Progress: {...}} // >>> {false} // -// If the node is already synced up, then only a single event will be pushed {false}. +// If the node is already synced up, then only a single event subscribers will +// receive is {false}. func (api *DownloaderAPI) eventLoop() { var ( sub = api.mux.Subscribe(StartEvent{}) syncSubscriptions = make(map[chan interface{}]struct{}) - txIndexCh = make(chan bool, 1) - done bool - ) - txIndexSub := api.chain.SubscribeTxIndexEvent(txIndexCh) - defer func() { - if txIndexSub == nil { - return + checkInterval = time.Second * 30 + checkTimer = time.NewTimer(checkInterval) + + // status flags + started bool + done bool + + getProgress = func() ethereum.SyncProgress { + prog := api.d.Progress() + if txProg, err := api.chain.TxIndexProgress(); err == nil { + prog.TxIndexFinishedBlocks = txProg.Indexed + prog.TxIndexRemainingBlocks = txProg.Remaining + } + return prog } - txIndexSub.Unsubscribe() - }() + ) + defer checkTimer.Stop() for { select { @@ -93,11 +102,7 @@ func (api *DownloaderAPI) eventLoop() { } switch event.Data.(type) { case StartEvent: - prog := api.d.Progress() - if txProg, err := api.chain.TxIndexProgress(); err == nil { - prog.TxIndexFinishedBlocks = txProg.Indexed - prog.TxIndexRemainingBlocks = txProg.Remaining - } + prog := getProgress() notification := &SyncingResult{ Syncing: true, Status: prog, @@ -106,30 +111,22 @@ func (api *DownloaderAPI) eventLoop() { for c := range syncSubscriptions { c <- notification } + started = true } - case synced := <-txIndexCh: - if !synced { + case <-checkTimer.C: + if !started { + checkTimer.Reset(checkInterval) continue } - prog := api.d.Progress() + prog := getProgress() if !prog.Done() { - continue - } - txProg, err := api.chain.TxIndexProgress() - if err != nil || !txProg.Done() { + checkTimer.Reset(checkInterval) continue } for c := range syncSubscriptions { c <- false } done = true - - // Unsubscribe the tx indexing events as the whole - // state sync is already finished. - if txIndexSub != nil { - txIndexSub.Unsubscribe() - txIndexSub = nil - } } } } From 9f13d29432044ed6116cd896d6d4509cfbf05446 Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Mon, 22 Jan 2024 12:17:30 +0800 Subject: [PATCH 10/11] eth/downloader: better ux --- eth/downloader/api.go | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/eth/downloader/api.go b/eth/downloader/api.go index 1e06aa9e1deb..f09122904c4c 100644 --- a/eth/downloader/api.go +++ b/eth/downloader/api.go @@ -68,7 +68,7 @@ func (api *DownloaderAPI) eventLoop() { var ( sub = api.mux.Subscribe(StartEvent{}) syncSubscriptions = make(map[chan interface{}]struct{}) - checkInterval = time.Second * 30 + checkInterval = time.Second * 60 checkTimer = time.NewTimer(checkInterval) // status flags @@ -102,15 +102,6 @@ func (api *DownloaderAPI) eventLoop() { } switch event.Data.(type) { case StartEvent: - prog := getProgress() - notification := &SyncingResult{ - Syncing: true, - Status: prog, - } - // broadcast - for c := range syncSubscriptions { - c <- notification - } started = true } case <-checkTimer.C: @@ -120,6 +111,13 @@ func (api *DownloaderAPI) eventLoop() { } prog := getProgress() if !prog.Done() { + notification := &SyncingResult{ + Syncing: true, + Status: prog, + } + for c := range syncSubscriptions { + c <- notification + } checkTimer.Reset(checkInterval) continue } From 8f2509305a8f98f77ace145d78523f69e18317aa Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Mon, 22 Jan 2024 14:02:06 +0800 Subject: [PATCH 11/11] internal/jsre/deps: fix web3 resolver --- internal/jsre/deps/web3.js | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/jsre/deps/web3.js b/internal/jsre/deps/web3.js index f23c65584c32..6ccf09b1cc3a 100644 --- a/internal/jsre/deps/web3.js +++ b/internal/jsre/deps/web3.js @@ -3961,6 +3961,8 @@ var outputSyncingFormatter = function(result) { result.healedBytecodeBytes = utils.toDecimal(result.healedBytecodeBytes); result.healingTrienodes = utils.toDecimal(result.healingTrienodes); result.healingBytecode = utils.toDecimal(result.healingBytecode); + result.txIndexFinishedBlocks = utils.toDecimal(result.txIndexFinishedBlocks); + result.txIndexRemainingBlocks = utils.toDecimal(result.txIndexRemainingBlocks); return result; };