Skip to content

Commit 42f9f1f

Browse files
committed
core/state: convert prefetcher to concurrent per-trie loader
1 parent 1e1865b commit 42f9f1f

File tree

9 files changed

+384
-281
lines changed

9 files changed

+384
-281
lines changed

accounts/abi/bind/backends/simulated.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,10 +125,9 @@ func (b *SimulatedBackend) Rollback() {
125125

126126
func (b *SimulatedBackend) rollback() {
127127
blocks, _ := core.GenerateChain(b.config, b.blockchain.CurrentBlock(), ethash.NewFaker(), b.database, 1, func(int, *core.BlockGen) {})
128-
stateDB, _ := b.blockchain.State()
129128

130129
b.pendingBlock = blocks[0]
131-
b.pendingState, _ = state.New(b.pendingBlock.Root(), stateDB.Database(), nil)
130+
b.pendingState, _ = state.New(b.pendingBlock.Root(), b.blockchain.StateCache(), nil)
132131
}
133132

134133
// stateByBlockNumber retrieves a state by a given blocknumber.

core/blockchain.go

Lines changed: 10 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -201,12 +201,11 @@ type BlockChain struct {
201201
running int32 // 0 if chain is running, 1 when stopped
202202
procInterrupt int32 // interrupt signaler for block processing
203203

204-
engine consensus.Engine
205-
validator Validator // Block and state validator interface
206-
triePrefetcher *state.TriePrefetcher // Trie prefetcher interface
207-
prefetcher Prefetcher
208-
processor Processor // Block transaction processor interface
209-
vmConfig vm.Config
204+
engine consensus.Engine
205+
validator Validator // Block and state validator interface
206+
prefetcher Prefetcher
207+
processor Processor // Block transaction processor interface
208+
vmConfig vm.Config
210209

211210
shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block.
212211
terminateInsert func(common.Hash, uint64) bool // Testing hook used to terminate ancient receipt chain insertion.
@@ -250,15 +249,6 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
250249
}
251250
bc.validator = NewBlockValidator(chainConfig, bc, engine)
252251
bc.prefetcher = newStatePrefetcher(chainConfig, bc, engine)
253-
tp := state.NewTriePrefetcher(bc.stateCache)
254-
255-
bc.wg.Add(1)
256-
go func() {
257-
tp.Loop()
258-
bc.wg.Done()
259-
}()
260-
bc.triePrefetcher = tp
261-
262252
bc.processor = NewStateProcessor(chainConfig, bc, engine)
263253

264254
var err error
@@ -1001,9 +991,6 @@ func (bc *BlockChain) Stop() {
1001991
bc.scope.Close()
1002992
close(bc.quit)
1003993
bc.StopInsert()
1004-
if bc.triePrefetcher != nil {
1005-
bc.triePrefetcher.Close()
1006-
}
1007994
bc.wg.Wait()
1008995

1009996
// Ensure that the entirety of the state snapshot is journalled to disk.
@@ -1870,16 +1857,20 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
18701857
parent = bc.GetHeader(block.ParentHash(), block.NumberU64()-1)
18711858
}
18721859
statedb, err := state.New(parent.Root, bc.stateCache, bc.snaps)
1873-
statedb.UsePrefetcher(bc.triePrefetcher)
18741860
if err != nil {
18751861
return it.index, err
18761862
}
1863+
// Enable prefetching to pull in trie node paths while processing transactions
1864+
statedb.StartPrefetcher("chain")
1865+
defer statedb.StopPrefetcher() // stopped on write anyway, defer meant to catch early error returns
1866+
18771867
// If we have a followup block, run that against the current state to pre-cache
18781868
// transactions and probabilistically some of the account/storage trie nodes.
18791869
var followupInterrupt uint32
18801870
if !bc.cacheConfig.TrieCleanNoPrefetch {
18811871
if followup, err := it.peek(); followup != nil && err == nil {
18821872
throwaway, _ := state.New(parent.Root, bc.stateCache, bc.snaps)
1873+
18831874
go func(start time.Time, followup *types.Block, throwaway *state.StateDB, interrupt *uint32) {
18841875
bc.prefetcher.Prefetch(followup, throwaway, bc.vmConfig, &followupInterrupt)
18851876

@@ -1933,7 +1924,6 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
19331924
if err != nil {
19341925
return it.index, err
19351926
}
1936-
19371927
// Update the metrics touched during block commit
19381928
accountCommitTimer.Update(statedb.AccountCommits) // Account commits are complete, we can mark them
19391929
storageCommitTimer.Update(statedb.StorageCommits) // Storage commits are complete, we can mark them

core/state/state_object.go

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ func (s *stateObject) getTrie(db Database) Trie {
162162
if s.data.Root != emptyRoot && s.db.prefetcher != nil {
163163
// When the miner is creating the pending state, there is no
164164
// prefetcher
165-
s.trie = s.db.prefetcher.GetTrie(s.data.Root)
165+
s.trie = s.db.prefetcher.trie(s.data.Root)
166166
}
167167
if s.trie == nil {
168168
var err error
@@ -309,14 +309,16 @@ func (s *stateObject) setState(key, value common.Hash) {
309309

310310
// finalise moves all dirty storage slots into the pending area to be hashed or
311311
// committed later. It is invoked at the end of every transaction.
312-
func (s *stateObject) finalise() {
313-
trieChanges := make([]common.Hash, 0, len(s.dirtyStorage))
312+
func (s *stateObject) finalise(prefetch bool) {
313+
slotsToPrefetch := make([][]byte, 0, len(s.dirtyStorage))
314314
for key, value := range s.dirtyStorage {
315315
s.pendingStorage[key] = value
316-
trieChanges = append(trieChanges, key)
316+
if value != s.originStorage[key] {
317+
slotsToPrefetch = append(slotsToPrefetch, common.CopyBytes(key[:])) // Copy needed for closure
318+
}
317319
}
318-
if len(trieChanges) > 0 && s.db.prefetcher != nil && s.data.Root != emptyRoot {
319-
s.db.prefetcher.PrefetchStorage(s.data.Root, trieChanges)
320+
if s.db.prefetcher != nil && prefetch && len(slotsToPrefetch) > 0 && s.data.Root != emptyRoot {
321+
s.db.prefetcher.prefetch(s.data.Root, slotsToPrefetch)
320322
}
321323
if len(s.dirtyStorage) > 0 {
322324
s.dirtyStorage = make(Storage)
@@ -327,7 +329,7 @@ func (s *stateObject) finalise() {
327329
// It will return nil if the trie has not been loaded and no changes have been made
328330
func (s *stateObject) updateTrie(db Database) Trie {
329331
// Make sure all dirty slots are finalized into the pending storage area
330-
s.finalise()
332+
s.finalise(false) // Don't prefetch any more, pull directly if need be
331333
if len(s.pendingStorage) == 0 {
332334
return s.trie
333335
}
@@ -340,6 +342,8 @@ func (s *stateObject) updateTrie(db Database) Trie {
340342
// Insert all the pending updates into the trie
341343
tr := s.getTrie(db)
342344
hasher := s.db.hasher
345+
346+
usedStorage := make([][]byte, 0, len(s.pendingStorage))
343347
for key, value := range s.pendingStorage {
344348
// Skip noop changes, persist actual changes
345349
if value == s.originStorage[key] {
@@ -366,6 +370,10 @@ func (s *stateObject) updateTrie(db Database) Trie {
366370
}
367371
storage[crypto.HashData(hasher, key[:])] = v // v will be nil if value is 0x00
368372
}
373+
usedStorage = append(usedStorage, common.CopyBytes(key[:])) // Copy needed for closure
374+
}
375+
if s.db.prefetcher != nil {
376+
s.db.prefetcher.used(s.data.Root, usedStorage)
369377
}
370378
if len(s.pendingStorage) > 0 {
371379
s.pendingStorage = make(Storage)

core/state/state_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ func TestSnapshot2(t *testing.T) {
170170
state.setStateObject(so0)
171171

172172
root, _ := state.Commit(false)
173-
state.Reset(root)
173+
state, _ = New(root, state.db, state.snaps)
174174

175175
// and one with deleted == true
176176
so1 := state.getStateObject(stateobjaddr1)

core/state/statedb.go

Lines changed: 68 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ func (n *proofList) Delete(key []byte) error {
6363
// * Accounts
6464
type StateDB struct {
6565
db Database
66-
prefetcher *TriePrefetcher
66+
prefetcher *triePrefetcher
6767
originalRoot common.Hash // The pre-state root, before any changes were made
6868
trie Trie
6969
hasher crypto.KeccakState
@@ -149,10 +149,25 @@ func New(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, error)
149149
return sdb, nil
150150
}
151151

152-
func (s *StateDB) UsePrefetcher(prefetcher *TriePrefetcher) {
153-
if prefetcher != nil {
154-
s.prefetcher = prefetcher
155-
s.prefetcher.Resume(s.originalRoot)
152+
// StartPrefetcher initializes a new trie prefetcher to pull in nodes from the
153+
// state trie concurrently while the state is mutated so that when we reach the
154+
// commit phase, most of the needed data is already hot.
155+
func (s *StateDB) StartPrefetcher(namespace string) {
156+
if s.prefetcher != nil {
157+
s.prefetcher.close()
158+
s.prefetcher = nil
159+
}
160+
if s.snap != nil {
161+
s.prefetcher = newTriePrefetcher(s.db, s.originalRoot, namespace)
162+
}
163+
}
164+
165+
// StopPrefetcher terminates a running prefetcher and reports any leftover stats
166+
// from the gathered metrics.
167+
func (s *StateDB) StopPrefetcher() {
168+
if s.prefetcher != nil {
169+
s.prefetcher.close()
170+
s.prefetcher = nil
156171
}
157172
}
158173

@@ -167,37 +182,6 @@ func (s *StateDB) Error() error {
167182
return s.dbErr
168183
}
169184

170-
// Reset clears out all ephemeral state objects from the state db, but keeps
171-
// the underlying state trie to avoid reloading data for the next operations.
172-
func (s *StateDB) Reset(root common.Hash) error {
173-
tr, err := s.db.OpenTrie(root)
174-
if err != nil {
175-
return err
176-
}
177-
s.trie = tr
178-
s.stateObjects = make(map[common.Address]*stateObject)
179-
s.stateObjectsPending = make(map[common.Address]struct{})
180-
s.stateObjectsDirty = make(map[common.Address]struct{})
181-
s.thash = common.Hash{}
182-
s.bhash = common.Hash{}
183-
s.txIndex = 0
184-
s.logs = make(map[common.Hash][]*types.Log)
185-
s.logSize = 0
186-
s.preimages = make(map[common.Hash][]byte)
187-
s.clearJournalAndRefund()
188-
189-
if s.snaps != nil {
190-
s.snapAccounts, s.snapDestructs, s.snapStorage = nil, nil, nil
191-
if s.snap = s.snaps.Snapshot(root); s.snap != nil {
192-
s.snapDestructs = make(map[common.Hash]struct{})
193-
s.snapAccounts = make(map[common.Hash][]byte)
194-
s.snapStorage = make(map[common.Hash]map[common.Hash][]byte)
195-
}
196-
}
197-
s.accessList = newAccessList()
198-
return nil
199-
}
200-
201185
func (s *StateDB) AddLog(log *types.Log) {
202186
s.journal.append(addLogChange{txhash: s.thash})
203187

@@ -737,6 +721,13 @@ func (s *StateDB) Copy() *StateDB {
737721
// However, it doesn't cost us much to copy an empty list, so we do it anyway
738722
// to not blow up if we ever decide copy it in the middle of a transaction
739723
state.accessList = s.accessList.Copy()
724+
725+
// If there's a prefetcher running, make an inactive copy of it that can
726+
// only access data but does not actively preload (since the user will not
727+
// know that they need to explicitly terminate an active copy).
728+
if s.prefetcher != nil {
729+
state.prefetcher = s.prefetcher.copy()
730+
}
740731
return state
741732
}
742733

@@ -773,7 +764,7 @@ func (s *StateDB) GetRefund() uint64 {
773764
// the journal as well as the refunds. Finalise, however, will not push any updates
774765
// into the tries just yet. Only IntermediateRoot or Commit will do that.
775766
func (s *StateDB) Finalise(deleteEmptyObjects bool) {
776-
var addressesToPrefetch []common.Address
767+
addressesToPrefetch := make([][]byte, 0, len(s.journal.dirties))
777768
for addr := range s.journal.dirties {
778769
obj, exist := s.stateObjects[addr]
779770
if !exist {
@@ -798,21 +789,19 @@ func (s *StateDB) Finalise(deleteEmptyObjects bool) {
798789
delete(s.snapStorage, obj.addrHash) // Clear out any previously updated storage data (may be recreated via a ressurrect)
799790
}
800791
} else {
801-
obj.finalise()
792+
obj.finalise(true) // Prefetch slots in the background
802793
}
803794
s.stateObjectsPending[addr] = struct{}{}
804795
s.stateObjectsDirty[addr] = struct{}{}
796+
805797
// At this point, also ship the address off to the precacher. The precacher
806798
// will start loading tries, and when the change is eventually committed,
807799
// the commit-phase will be a lot faster
808-
if s.prefetcher != nil {
809-
addressesToPrefetch = append(addressesToPrefetch, addr)
810-
}
800+
addressesToPrefetch = append(addressesToPrefetch, common.CopyBytes(addr[:])) // Copy needed for closure
811801
}
812-
if s.prefetcher != nil {
813-
s.prefetcher.PrefetchAddresses(addressesToPrefetch)
802+
if s.prefetcher != nil && len(addressesToPrefetch) > 0 {
803+
s.prefetcher.prefetch(s.originalRoot, addressesToPrefetch)
814804
}
815-
816805
// Invalidate journal because reverting across transactions is not allowed.
817806
s.clearJournalAndRefund()
818807
}
@@ -824,29 +813,49 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash {
824813
// Finalise all the dirty storage states and write them into the tries
825814
s.Finalise(deleteEmptyObjects)
826815

827-
// Now we're about to start to write changes to the trie. The trie is so
828-
// far _untouched_. We can check with the prefetcher, if it can give us
829-
// a trie which has the same root, but also has some content loaded into it.
830-
// If so, use that one instead.
816+
// If there was a trie prefetcher operating, it gets aborted and irrevocably
817+
// modified after we start retrieving tries. Remove it from the statedb after
818+
// this round of use.
819+
//
820+
// This is weird pre-byzantium since the first tx runs with a prefetcher and
821+
// the remainder without, but pre-byzantium even the initial prefetcher is
822+
// useless, so no sleep lost.
823+
prefetcher := s.prefetcher
831824
if s.prefetcher != nil {
832-
s.prefetcher.Pause()
833-
// We only want to do this _once_, if someone calls IntermediateRoot again,
834-
// we shouldn't fetch the trie again
835-
if s.originalRoot != (common.Hash{}) {
836-
if trie := s.prefetcher.GetTrie(s.originalRoot); trie != nil {
837-
s.trie = trie
838-
}
839-
s.originalRoot = common.Hash{}
825+
defer func() {
826+
s.prefetcher.close()
827+
s.prefetcher = nil
828+
}()
829+
}
830+
// Although naively it makes sense to retrieve the account trie and then do
831+
// the contract storage and account updates sequentially, that short circuits
832+
// the account prefetcher. Instead, let's process all the storage updates
833+
// first, giving the account prefeches just a few more milliseconds of time
834+
// to pull useful data from disk.
835+
for addr := range s.stateObjectsPending {
836+
if obj := s.stateObjects[addr]; !obj.deleted {
837+
obj.updateRoot(s.db)
838+
}
839+
}
840+
// Now we're about to start to write changes to the trie. The trie is so far
841+
// _untouched_. We can check with the prefetcher, if it can give us a trie
842+
// which has the same root, but also has some content loaded into it.
843+
if prefetcher != nil {
844+
if trie := prefetcher.trie(s.originalRoot); trie != nil {
845+
s.trie = trie
840846
}
841847
}
848+
usedAddrs := make([][]byte, 0, len(s.stateObjectsPending))
842849
for addr := range s.stateObjectsPending {
843-
obj := s.stateObjects[addr]
844-
if obj.deleted {
850+
if obj := s.stateObjects[addr]; obj.deleted {
845851
s.deleteStateObject(obj)
846852
} else {
847-
obj.updateRoot(s.db)
848853
s.updateStateObject(obj)
849854
}
855+
usedAddrs = append(usedAddrs, common.CopyBytes(addr[:])) // Copy needed for closure
856+
}
857+
if prefetcher != nil {
858+
prefetcher.used(s.originalRoot, usedAddrs)
850859
}
851860
if len(s.stateObjectsPending) > 0 {
852861
s.stateObjectsPending = make(map[common.Address]struct{})

core/state/statedb_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -474,7 +474,7 @@ func TestTouchDelete(t *testing.T) {
474474
s := newStateTest()
475475
s.state.GetOrNewStateObject(common.Address{})
476476
root, _ := s.state.Commit(false)
477-
s.state.Reset(root)
477+
s.state, _ = New(root, s.state.db, s.state.snaps)
478478

479479
snapshot := s.state.Snapshot()
480480
s.state.AddBalance(common.Address{}, new(big.Int))
@@ -676,7 +676,7 @@ func TestDeleteCreateRevert(t *testing.T) {
676676
state.SetBalance(addr, big.NewInt(1))
677677

678678
root, _ := state.Commit(false)
679-
state.Reset(root)
679+
state, _ = New(root, state.db, state.snaps)
680680

681681
// Simulate self-destructing in one transaction, then create-reverting in another
682682
state.Suicide(addr)
@@ -688,7 +688,7 @@ func TestDeleteCreateRevert(t *testing.T) {
688688

689689
// Commit the entire state and make sure we don't crash and have the correct state
690690
root, _ = state.Commit(true)
691-
state.Reset(root)
691+
state, _ = New(root, state.db, state.snaps)
692692

693693
if state.getStateObject(addr) != nil {
694694
t.Fatalf("self-destructed contract came alive")

0 commit comments

Comments
 (0)