Skip to content

Snapshotter: Improve updates #20796

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,3 +368,11 @@ func (ma *MixedcaseAddress) ValidChecksum() bool {
func (ma *MixedcaseAddress) Original() string {
return ma.original
}

type TriePrefetcher interface {
Pause()
Reset(number uint64, root Hash)
PrefetchAddress(addr Address)
PrefetchStorage(root Hash, slots []Hash)
Close()
}
53 changes: 31 additions & 22 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,9 @@ type BlockChain struct {
wg sync.WaitGroup // chain processing wait group for shutting down

engine consensus.Engine
validator Validator // Block and state validator interface
prefetcher Prefetcher // Block state prefetcher interface
processor Processor // Block transaction processor interface
validator Validator // Block and state validator interface
prefetcher common.TriePrefetcher // Trie prefetcher interface
processor Processor // Block transaction processor interface
vmConfig vm.Config

badBlocks *lru.Cache // Bad block cache
Expand Down Expand Up @@ -228,7 +228,11 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
badBlocks: badBlocks,
}
bc.validator = NewBlockValidator(chainConfig, bc, engine)
bc.prefetcher = newStatePrefetcher(chainConfig, bc, engine)
//bc.prefetcher = newStatePrefetcher(chainConfig, bc, engine)
tp := newTriePrefetcher(bc.stateCache)
go tp.loop()
bc.prefetcher = tp

bc.processor = NewStateProcessor(chainConfig, bc, engine)

var err error
Expand Down Expand Up @@ -866,6 +870,9 @@ func (bc *BlockChain) Stop() {
atomic.StoreInt32(&bc.procInterrupt, 1)

bc.wg.Wait()
if bc.prefetcher != nil {
bc.prefetcher.Close()
}

// Ensure that the entirety of the state snapshot is journalled to disk.
var snapBase common.Hash
Expand Down Expand Up @@ -1690,31 +1697,33 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
parent = bc.GetHeader(block.ParentHash(), block.NumberU64()-1)
}
statedb, err := state.New(parent.Root, bc.stateCache, bc.snaps)
bc.prefetcher.Reset(block.NumberU64(), parent.Root)
statedb.UsePrefetcher(bc.prefetcher)
if err != nil {
return it.index, err
}
// If we have a followup block, run that against the current state to pre-cache
// transactions and probabilistically some of the account/storage trie nodes.
var followupInterrupt uint32
if !bc.cacheConfig.TrieCleanNoPrefetch {
if followup, err := it.peek(); followup != nil && err == nil {
throwaway, _ := state.New(parent.Root, bc.stateCache, bc.snaps)
go func(start time.Time, followup *types.Block, throwaway *state.StateDB, interrupt *uint32) {
bc.prefetcher.Prefetch(followup, throwaway, bc.vmConfig, &followupInterrupt)

blockPrefetchExecuteTimer.Update(time.Since(start))
if atomic.LoadUint32(interrupt) == 1 {
blockPrefetchInterruptMeter.Mark(1)
}
}(time.Now(), followup, throwaway, &followupInterrupt)
}
}
//var followupInterrupt uint32
//if !bc.cacheConfig.TrieCleanNoPrefetch {
// if followup, err := it.peek(); followup != nil && err == nil {
// throwaway, _ := state.New(parent.Root, bc.stateCache, bc.snaps)
// go func(start time.Time, followup *types.Block, throwaway *state.StateDB, interrupt *uint32) {
// bc.prefetcher.Prefetch(followup, throwaway, bc.vmConfig, &followupInterrupt)
//
// blockPrefetchExecuteTimer.Update(time.Since(start))
// if atomic.LoadUint32(interrupt) == 1 {
// blockPrefetchInterruptMeter.Mark(1)
// }
// }(time.Now(), followup, throwaway, &followupInterrupt)
// }
//}
// Process block using the parent state as reference point
substart := time.Now()
receipts, logs, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig)
if err != nil {
bc.reportBlock(block, receipts, err)
atomic.StoreUint32(&followupInterrupt, 1)
//atomic.StoreUint32(&followupInterrupt, 1)
return it.index, err
}
// Update the metrics touched during block processing
Expand All @@ -1724,7 +1733,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
storageUpdateTimer.Update(statedb.StorageUpdates) // Storage updates are complete, we can mark them
snapshotAccountReadTimer.Update(statedb.SnapshotAccountReads) // Account reads are complete, we can mark them
snapshotStorageReadTimer.Update(statedb.SnapshotStorageReads) // Storage reads are complete, we can mark them

bc.prefetcher.Pause()
triehash := statedb.AccountHashes + statedb.StorageHashes // Save to not double count in validation
trieproc := statedb.SnapshotAccountReads + statedb.AccountReads + statedb.AccountUpdates
trieproc += statedb.SnapshotStorageReads + statedb.StorageReads + statedb.StorageUpdates
Expand All @@ -1735,7 +1744,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
substart = time.Now()
if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil {
bc.reportBlock(block, receipts, err)
atomic.StoreUint32(&followupInterrupt, 1)
//atomic.StoreUint32(&followupInterrupt, 1)
return it.index, err
}
proctime := time.Since(start)
Expand All @@ -1749,7 +1758,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
// Write the block to the chain and get the status.
substart = time.Now()
status, err := bc.writeBlockWithState(block, receipts, logs, statedb, false)
atomic.StoreUint32(&followupInterrupt, 1)
//atomic.StoreUint32(&followupInterrupt, 1)
if err != nil {
return it.index, err
}
Expand Down
12 changes: 10 additions & 2 deletions core/state/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,20 @@ type cachingDB struct {

// OpenTrie opens the main account trie at a specific root hash.
func (db *cachingDB) OpenTrie(root common.Hash) (Trie, error) {
return trie.NewSecure(root, db.db)
tr, err := trie.NewSecure(root, db.db)
if err != nil {
return nil, err
}
return tr, nil
}

// OpenStorageTrie opens the storage trie of an account.
func (db *cachingDB) OpenStorageTrie(addrHash, root common.Hash) (Trie, error) {
return trie.NewSecure(root, db.db)
tr, err := trie.NewSecure(root, db.db)
if err != nil {
return nil, err
}
return tr, nil
}

// CopyTrie returns an independent copy of the given trie.
Expand Down
53 changes: 38 additions & 15 deletions core/state/state_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,12 +197,24 @@ func (s *stateObject) GetCommittedState(db Database, key common.Hash) common.Has
}
// If no live objects are available, attempt to use snapshots
var (
enc []byte
err error
enc []byte
err error
meter *time.Duration
)
readStart := time.Now()
if metrics.EnabledExpensive {
// If the snap is 'under construction', the first lookup may fail. If that
// happens, we don't want to double-count the time elapsed. Thus this
// dance with the metering.
defer func() {
if meter != nil {
*meter += time.Since(readStart)
}
}()
}
if s.db.snap != nil {
if metrics.EnabledExpensive {
defer func(start time.Time) { s.db.SnapshotStorageReads += time.Since(start) }(time.Now())
meter = &s.db.SnapshotStorageReads
}
// If the object was destructed in *this* block (and potentially resurrected),
// the storage has been cleared out, and we should *not* consult the previous
Expand All @@ -217,8 +229,14 @@ func (s *stateObject) GetCommittedState(db Database, key common.Hash) common.Has
}
// If snapshot unavailable or reading from it failed, load from the database
if s.db.snap == nil || err != nil {
if meter != nil {
// If we already spent time checking the snapshot, account for it
// and reset the readStart
*meter += time.Since(readStart)
readStart = time.Now()
}
if metrics.EnabledExpensive {
defer func(start time.Time) { s.db.StorageReads += time.Since(start) }(time.Now())
meter = &s.db.StorageReads
}
if enc, err = s.getTrie(db).TryGet(key[:]); err != nil {
s.setError(err)
Expand Down Expand Up @@ -283,8 +301,13 @@ func (s *stateObject) setState(key, value common.Hash) {
// finalise moves all dirty storage slots into the pending area to be hashed or
// committed later. It is invoked at the end of every transaction.
func (s *stateObject) finalise() {
trieChanges := make([]common.Hash, 0, len(s.dirtyStorage))
for key, value := range s.dirtyStorage {
s.pendingStorage[key] = value
trieChanges = append(trieChanges, key)
}
if len(trieChanges) > 0 && s.db.prefetcher != nil {
s.db.prefetcher.PrefetchStorage(s.data.Root, trieChanges)
}
if len(s.dirtyStorage) > 0 {
s.dirtyStorage = make(Storage)
Expand All @@ -303,18 +326,11 @@ func (s *stateObject) updateTrie(db Database) Trie {
if metrics.EnabledExpensive {
defer func(start time.Time) { s.db.StorageUpdates += time.Since(start) }(time.Now())
}
// Retrieve the snapshot storage map for the object
// The snapshot storage map for the object
var storage map[common.Hash][]byte
if s.db.snap != nil {
// Retrieve the old storage map, if available, create a new one otherwise
storage = s.db.snapStorage[s.addrHash]
if storage == nil {
storage = make(map[common.Hash][]byte)
s.db.snapStorage[s.addrHash] = storage
}
}
// Insert all the pending updates into the trie
tr := s.getTrie(db)
hasher := s.db.hasher
for key, value := range s.pendingStorage {
// Skip noop changes, persist actual changes
if value == s.originStorage[key] {
Expand All @@ -331,8 +347,15 @@ func (s *stateObject) updateTrie(db Database) Trie {
s.setError(tr.TryUpdate(key[:], v))
}
// If state snapshotting is active, cache the data til commit
if storage != nil {
storage[crypto.Keccak256Hash(key[:])] = v // v will be nil if value is 0x00
if s.db.snap != nil {
if storage == nil {
// Retrieve the old storage map, if available, create a new one otherwise
if storage = s.db.snapStorage[s.addrHash]; storage == nil {
storage = make(map[common.Hash][]byte)
s.db.snapStorage[s.addrHash] = storage
}
}
storage[crypto.HashData(hasher, key[:])] = v // v will be nil if value is 0x00
}
}
if len(s.pendingStorage) > 0 {
Expand Down
20 changes: 17 additions & 3 deletions core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,10 @@ func (n *proofList) Delete(key []byte) error {
// * Contracts
// * Accounts
type StateDB struct {
db Database
trie Trie
db Database
prefetcher common.TriePrefetcher
trie Trie
hasher crypto.KeccakHasher

snaps *snapshot.Tree
snap snapshot.Snapshot
Expand Down Expand Up @@ -131,6 +133,7 @@ func New(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, error)
logs: make(map[common.Hash][]*types.Log),
preimages: make(map[common.Hash][]byte),
journal: newJournal(),
hasher: crypto.NewKeccakHasher(),
}
if sdb.snaps != nil {
if sdb.snap = sdb.snaps.Snapshot(root); sdb.snap != nil {
Expand All @@ -142,6 +145,10 @@ func New(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, error)
return sdb, nil
}

func (s *StateDB) UsePrefetcher(prefetcher common.TriePrefetcher){
s.prefetcher = prefetcher
}

// setError remembers the first non-nil error it is called with.
func (s *StateDB) setError(err error) {
if s.dbErr == nil {
Expand Down Expand Up @@ -516,7 +523,7 @@ func (s *StateDB) getDeletedStateObject(addr common.Address) *stateObject {
defer func(start time.Time) { s.SnapshotAccountReads += time.Since(start) }(time.Now())
}
var acc *snapshot.Account
if acc, err = s.snap.Account(crypto.Keccak256Hash(addr[:])); err == nil {
if acc, err = s.snap.Account(crypto.HashData(s.hasher, addr[:])); err == nil {
if acc == nil {
return nil
}
Expand Down Expand Up @@ -648,6 +655,7 @@ func (s *StateDB) Copy() *StateDB {
logSize: s.logSize,
preimages: make(map[common.Hash][]byte, len(s.preimages)),
journal: newJournal(),
hasher: crypto.NewKeccakHasher(),
}
// Copy the dirty states, logs, and preimages
for addr := range s.journal.dirties {
Expand Down Expand Up @@ -755,6 +763,12 @@ func (s *StateDB) Finalise(deleteEmptyObjects bool) {
}
s.stateObjectsPending[addr] = struct{}{}
s.stateObjectsDirty[addr] = struct{}{}
// At this point, also ship the address off to the precacher. The precacher
// will start loading tries, and when the change is eventually committed,
// the commit-phase will be a lot faster
if s.prefetcher != nil{
s.prefetcher.PrefetchAddress(addr)
}
}
// Invalidate journal because reverting across transactions is not allowed.
s.clearJournalAndRefund()
Expand Down
Loading