Skip to content

Commit 3ed1810

Browse files
committed
core/blockchain, core/state: implement new trie prefetcher
1 parent 3f4bc34 commit 3ed1810

File tree

6 files changed

+247
-24
lines changed

6 files changed

+247
-24
lines changed

common/types.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -368,3 +368,11 @@ func (ma *MixedcaseAddress) ValidChecksum() bool {
368368
func (ma *MixedcaseAddress) Original() string {
369369
return ma.original
370370
}
371+
372+
type TriePrefetcher interface {
373+
Pause()
374+
Reset(number uint64, root Hash)
375+
PrefetchAddress(addr Address)
376+
PrefetchStorage(root Hash, slots []Hash)
377+
Close()
378+
}

core/blockchain.go

Lines changed: 31 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -178,9 +178,9 @@ type BlockChain struct {
178178
wg sync.WaitGroup // chain processing wait group for shutting down
179179

180180
engine consensus.Engine
181-
validator Validator // Block and state validator interface
182-
prefetcher Prefetcher // Block state prefetcher interface
183-
processor Processor // Block transaction processor interface
181+
validator Validator // Block and state validator interface
182+
prefetcher common.TriePrefetcher // Trie prefetcher interface
183+
processor Processor // Block transaction processor interface
184184
vmConfig vm.Config
185185

186186
badBlocks *lru.Cache // Bad block cache
@@ -228,7 +228,11 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
228228
badBlocks: badBlocks,
229229
}
230230
bc.validator = NewBlockValidator(chainConfig, bc, engine)
231-
bc.prefetcher = newStatePrefetcher(chainConfig, bc, engine)
231+
//bc.prefetcher = newStatePrefetcher(chainConfig, bc, engine)
232+
tp := newTriePrefetcher(bc.stateCache)
233+
go tp.loop()
234+
bc.prefetcher = tp
235+
232236
bc.processor = NewStateProcessor(chainConfig, bc, engine)
233237

234238
var err error
@@ -866,6 +870,9 @@ func (bc *BlockChain) Stop() {
866870
atomic.StoreInt32(&bc.procInterrupt, 1)
867871

868872
bc.wg.Wait()
873+
if bc.prefetcher != nil {
874+
bc.prefetcher.Close()
875+
}
869876

870877
// Ensure that the entirety of the state snapshot is journalled to disk.
871878
var snapBase common.Hash
@@ -1690,31 +1697,33 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
16901697
parent = bc.GetHeader(block.ParentHash(), block.NumberU64()-1)
16911698
}
16921699
statedb, err := state.New(parent.Root, bc.stateCache, bc.snaps)
1700+
bc.prefetcher.Reset(block.NumberU64(), parent.Root)
1701+
statedb.UsePrefetcher(bc.prefetcher)
16931702
if err != nil {
16941703
return it.index, err
16951704
}
16961705
// If we have a followup block, run that against the current state to pre-cache
16971706
// transactions and probabilistically some of the account/storage trie nodes.
1698-
var followupInterrupt uint32
1699-
if !bc.cacheConfig.TrieCleanNoPrefetch {
1700-
if followup, err := it.peek(); followup != nil && err == nil {
1701-
throwaway, _ := state.New(parent.Root, bc.stateCache, bc.snaps)
1702-
go func(start time.Time, followup *types.Block, throwaway *state.StateDB, interrupt *uint32) {
1703-
bc.prefetcher.Prefetch(followup, throwaway, bc.vmConfig, &followupInterrupt)
1704-
1705-
blockPrefetchExecuteTimer.Update(time.Since(start))
1706-
if atomic.LoadUint32(interrupt) == 1 {
1707-
blockPrefetchInterruptMeter.Mark(1)
1708-
}
1709-
}(time.Now(), followup, throwaway, &followupInterrupt)
1710-
}
1711-
}
1707+
//var followupInterrupt uint32
1708+
//if !bc.cacheConfig.TrieCleanNoPrefetch {
1709+
// if followup, err := it.peek(); followup != nil && err == nil {
1710+
// throwaway, _ := state.New(parent.Root, bc.stateCache, bc.snaps)
1711+
// go func(start time.Time, followup *types.Block, throwaway *state.StateDB, interrupt *uint32) {
1712+
// bc.prefetcher.Prefetch(followup, throwaway, bc.vmConfig, &followupInterrupt)
1713+
//
1714+
// blockPrefetchExecuteTimer.Update(time.Since(start))
1715+
// if atomic.LoadUint32(interrupt) == 1 {
1716+
// blockPrefetchInterruptMeter.Mark(1)
1717+
// }
1718+
// }(time.Now(), followup, throwaway, &followupInterrupt)
1719+
// }
1720+
//}
17121721
// Process block using the parent state as reference point
17131722
substart := time.Now()
17141723
receipts, logs, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig)
17151724
if err != nil {
17161725
bc.reportBlock(block, receipts, err)
1717-
atomic.StoreUint32(&followupInterrupt, 1)
1726+
//atomic.StoreUint32(&followupInterrupt, 1)
17181727
return it.index, err
17191728
}
17201729
// Update the metrics touched during block processing
@@ -1724,7 +1733,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
17241733
storageUpdateTimer.Update(statedb.StorageUpdates) // Storage updates are complete, we can mark them
17251734
snapshotAccountReadTimer.Update(statedb.SnapshotAccountReads) // Account reads are complete, we can mark them
17261735
snapshotStorageReadTimer.Update(statedb.SnapshotStorageReads) // Storage reads are complete, we can mark them
1727-
1736+
bc.prefetcher.Pause()
17281737
triehash := statedb.AccountHashes + statedb.StorageHashes // Save to not double count in validation
17291738
trieproc := statedb.SnapshotAccountReads + statedb.AccountReads + statedb.AccountUpdates
17301739
trieproc += statedb.SnapshotStorageReads + statedb.StorageReads + statedb.StorageUpdates
@@ -1735,7 +1744,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
17351744
substart = time.Now()
17361745
if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil {
17371746
bc.reportBlock(block, receipts, err)
1738-
atomic.StoreUint32(&followupInterrupt, 1)
1747+
//atomic.StoreUint32(&followupInterrupt, 1)
17391748
return it.index, err
17401749
}
17411750
proctime := time.Since(start)
@@ -1749,7 +1758,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
17491758
// Write the block to the chain and get the status.
17501759
substart = time.Now()
17511760
status, err := bc.writeBlockWithState(block, receipts, logs, statedb, false)
1752-
atomic.StoreUint32(&followupInterrupt, 1)
1761+
//atomic.StoreUint32(&followupInterrupt, 1)
17531762
if err != nil {
17541763
return it.index, err
17551764
}

core/state/database.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,12 +121,20 @@ type cachingDB struct {
121121

122122
// OpenTrie opens the main account trie at a specific root hash.
123123
func (db *cachingDB) OpenTrie(root common.Hash) (Trie, error) {
124-
return trie.NewSecure(root, db.db)
124+
tr, err := trie.NewSecure(root, db.db)
125+
if err != nil {
126+
return nil, err
127+
}
128+
return tr, nil
125129
}
126130

127131
// OpenStorageTrie opens the storage trie of an account.
128132
func (db *cachingDB) OpenStorageTrie(addrHash, root common.Hash) (Trie, error) {
129-
return trie.NewSecure(root, db.db)
133+
tr, err := trie.NewSecure(root, db.db)
134+
if err != nil {
135+
return nil, err
136+
}
137+
return tr, nil
130138
}
131139

132140
// CopyTrie returns an independent copy of the given trie.

core/state/state_object.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,8 +301,13 @@ func (s *stateObject) setState(key, value common.Hash) {
301301
// finalise moves all dirty storage slots into the pending area to be hashed or
302302
// committed later. It is invoked at the end of every transaction.
303303
func (s *stateObject) finalise() {
304+
trieChanges := make([]common.Hash, 0, len(s.dirtyStorage))
304305
for key, value := range s.dirtyStorage {
305306
s.pendingStorage[key] = value
307+
trieChanges = append(trieChanges, key)
308+
}
309+
if len(trieChanges) > 0 && s.db.prefetcher != nil {
310+
s.db.prefetcher.PrefetchStorage(s.data.Root, trieChanges)
306311
}
307312
if len(s.dirtyStorage) > 0 {
308313
s.dirtyStorage = make(Storage)

core/state/statedb.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ func (n *proofList) Delete(key []byte) error {
6565
// * Accounts
6666
type StateDB struct {
6767
db Database
68+
prefetcher common.TriePrefetcher
6869
trie Trie
6970
hasher crypto.KeccakHasher
7071

@@ -144,6 +145,10 @@ func New(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, error)
144145
return sdb, nil
145146
}
146147

148+
func (s *StateDB) UsePrefetcher(prefetcher common.TriePrefetcher){
149+
s.prefetcher = prefetcher
150+
}
151+
147152
// setError remembers the first non-nil error it is called with.
148153
func (s *StateDB) setError(err error) {
149154
if s.dbErr == nil {
@@ -758,6 +763,12 @@ func (s *StateDB) Finalise(deleteEmptyObjects bool) {
758763
}
759764
s.stateObjectsPending[addr] = struct{}{}
760765
s.stateObjectsDirty[addr] = struct{}{}
766+
// At this point, also ship the address off to the precacher. The precacher
767+
// will start loading tries, and when the change is eventually committed,
768+
// the commit-phase will be a lot faster
769+
if s.prefetcher != nil{
770+
s.prefetcher.PrefetchAddress(addr)
771+
}
761772
}
762773
// Invalidate journal because reverting across transactions is not allowed.
763774
s.clearJournalAndRefund()

core/trie_prefetcher.go

Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
// Copyright 2020 The go-ethereum Authors
2+
// This file is part of the go-ethereum library.
3+
//
4+
// The go-ethereum library is free software: you can redistribute it and/or modify
5+
// it under the terms of the GNU Lesser General Public License as published by
6+
// the Free Software Foundation, either version 3 of the License, or
7+
// (at your option) any later version.
8+
//
9+
// The go-ethereum library is distributed in the hope that it will be useful,
10+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
// GNU Lesser General Public License for more details.
13+
//
14+
// You should have received a copy of the GNU Lesser General Public License
15+
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
16+
17+
package core
18+
19+
import (
20+
"sync/atomic"
21+
22+
"github.com/ethereum/go-ethereum/common"
23+
"github.com/ethereum/go-ethereum/core/state"
24+
"github.com/ethereum/go-ethereum/log"
25+
"github.com/ethereum/go-ethereum/metrics"
26+
)
27+
28+
var (
29+
triePrefetchFetchMeter = metrics.NewRegisteredMeter("trie/prefetch/fetch", nil)
30+
triePrefetchSkipMeter = metrics.NewRegisteredMeter("trie/prefetch/skip", nil)
31+
triePrefetchDropMeter = metrics.NewRegisteredMeter("trie/prefetch/drop", nil)
32+
)
33+
34+
// triePrefetcher is an active prefetcher, which receives accounts or storage
35+
// items on two channels, and does trie-loading of the items.
36+
// The goal is to get as much useful content into the caches as possible
37+
type triePrefetcher struct {
38+
cmdCh chan (command)
39+
abortCh chan (struct{})
40+
db state.Database
41+
stale uint64
42+
}
43+
44+
func newTriePrefetcher(db state.Database) *triePrefetcher {
45+
return &triePrefetcher{
46+
cmdCh: make(chan command, 200),
47+
abortCh: make(chan struct{}),
48+
db: db,
49+
}
50+
}
51+
52+
type command struct {
53+
root *common.Hash
54+
address *common.Address
55+
slots []common.Hash
56+
}
57+
58+
func (p *triePrefetcher) loop() {
59+
var (
60+
tr state.Trie
61+
err error
62+
currentRoot common.Hash
63+
// Some tracking of performance
64+
skipped int64
65+
fetched int64
66+
)
67+
for {
68+
select {
69+
case cmd := <-p.cmdCh:
70+
// New roots are sent synchoronously
71+
if cmd.root != nil && cmd.slots == nil {
72+
// Update metrics at new block events
73+
triePrefetchFetchMeter.Mark(fetched)
74+
fetched = 0
75+
triePrefetchSkipMeter.Mark(skipped)
76+
skipped = 0
77+
// New root and number
78+
currentRoot = *cmd.root
79+
tr, err = p.db.OpenTrie(currentRoot)
80+
if err != nil {
81+
log.Warn("trie prefetcher failed opening trie", "root", currentRoot, "err", err)
82+
}
83+
// Open for business again
84+
atomic.StoreUint64(&p.stale, 0)
85+
continue
86+
}
87+
// Don't get stuck precaching on old blocks
88+
if atomic.LoadUint64(&p.stale) == 1 {
89+
if nSlots := len(cmd.slots); nSlots > 0 {
90+
skipped += int64(nSlots)
91+
} else {
92+
skipped++
93+
}
94+
// Keep reading until we're in step with the chain
95+
continue
96+
}
97+
// It's either storage slots or an account
98+
if cmd.slots != nil {
99+
storageTrie, err := p.db.OpenTrie(*cmd.root)
100+
if err != nil {
101+
log.Warn("trie prefetcher failed opening storage trie", "root", *cmd.root, "err", err)
102+
skipped += int64(len(cmd.slots))
103+
continue
104+
}
105+
for i, key := range cmd.slots {
106+
storageTrie.TryGet(key[:])
107+
fetched++
108+
// Abort if we fall behind
109+
if atomic.LoadUint64(&p.stale) == 1 {
110+
skipped += int64(len(cmd.slots[i:]))
111+
break
112+
}
113+
}
114+
} else { // an account
115+
if tr == nil {
116+
skipped++
117+
continue
118+
}
119+
// We're in sync with the chain, do preloading
120+
if cmd.address != nil {
121+
fetched++
122+
addr := *cmd.address
123+
tr.TryGet(addr[:])
124+
}
125+
}
126+
case <-p.abortCh:
127+
return
128+
}
129+
}
130+
}
131+
132+
// Close stops the prefetcher
133+
func (p *triePrefetcher) Close() {
134+
p.abortCh <- struct{}{}
135+
}
136+
137+
// Reset prevent the prefetcher from entering a state where it is
138+
// behind the actual block processing.
139+
// It causes any existing (stale) work to be ignored, and the prefetcher will skip ahead
140+
// to current tasks
141+
func (p *triePrefetcher) Reset(number uint64, root common.Hash) {
142+
// Set staleness
143+
atomic.StoreUint64(&p.stale, 1)
144+
// Do a synced send, so we're sure it punches through any old (now stale) commands
145+
cmd := command{
146+
root: &root,
147+
}
148+
p.cmdCh <- cmd
149+
}
150+
151+
func (p *triePrefetcher) Pause() {
152+
// Set staleness
153+
atomic.StoreUint64(&p.stale, 1)
154+
}
155+
156+
// PrefetchAddress adds an address for prefetching
157+
func (p *triePrefetcher) PrefetchAddress(addr common.Address) {
158+
cmd := command{
159+
address: &addr,
160+
}
161+
// We do an async send here, to not cause the caller to block
162+
select {
163+
case p.cmdCh <- cmd:
164+
default:
165+
triePrefetchDropMeter.Mark(1)
166+
}
167+
}
168+
169+
// PrefetchStorage adds a storage root and a set of keys for prefetching
170+
func (p *triePrefetcher) PrefetchStorage(root common.Hash, slots []common.Hash) {
171+
cmd := command{
172+
root: &root,
173+
slots: slots,
174+
}
175+
// We do an async send here, to not cause the caller to block
176+
select {
177+
case p.cmdCh <- cmd:
178+
default:
179+
triePrefetchDropMeter.Mark(int64(len(slots)))
180+
}
181+
182+
}

0 commit comments

Comments
 (0)