diff --git a/common/types.go b/common/types.go
index cdcc6c20ad53..86239eedd5e6 100644
--- a/common/types.go
+++ b/common/types.go
@@ -368,3 +368,11 @@ func (ma *MixedcaseAddress) ValidChecksum() bool {
 func (ma *MixedcaseAddress) Original() string {
 	return ma.original
 }
+
+type TriePrefetcher interface {
+	Pause()
+	Reset(number uint64, root Hash)
+	PrefetchAddress(addr Address)
+	PrefetchStorage(root Hash, slots []Hash)
+	Close()
+}
diff --git a/core/blockchain.go b/core/blockchain.go
index 0d1c27f95e64..3b9e2ed7764e 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -178,9 +178,9 @@ type BlockChain struct {
 	wg            sync.WaitGroup // chain processing wait group for shutting down
 
 	engine     consensus.Engine
-	validator  Validator  // Block and state validator interface
-	prefetcher Prefetcher // Block state prefetcher interface
-	processor  Processor  // Block transaction processor interface
+	validator  Validator             // Block and state validator interface
+	prefetcher common.TriePrefetcher // Trie prefetcher interface
+	processor  Processor             // Block transaction processor interface
 	vmConfig   vm.Config
 
 	badBlocks *lru.Cache // Bad block cache
@@ -228,7 +228,11 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
 		badBlocks:     badBlocks,
 	}
 	bc.validator = NewBlockValidator(chainConfig, bc, engine)
-	bc.prefetcher = newStatePrefetcher(chainConfig, bc, engine)
+	//bc.prefetcher = newStatePrefetcher(chainConfig, bc, engine)
+	tp := newTriePrefetcher(bc.stateCache)
+	go tp.loop()
+	bc.prefetcher = tp
+
 	bc.processor = NewStateProcessor(chainConfig, bc, engine)
 
 	var err error
@@ -866,6 +870,9 @@ func (bc *BlockChain) Stop() {
 	atomic.StoreInt32(&bc.procInterrupt, 1)
 
 	bc.wg.Wait()
+	if bc.prefetcher != nil {
+		bc.prefetcher.Close()
+	}
 
 	// Ensure that the entirety of the state snapshot is journalled to disk.
 	var snapBase common.Hash
@@ -1690,31 +1697,33 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
 			parent = bc.GetHeader(block.ParentHash(), block.NumberU64()-1)
 		}
 		statedb, err := state.New(parent.Root, bc.stateCache, bc.snaps)
 		if err != nil {
 			return it.index, err
 		}
+		bc.prefetcher.Reset(block.NumberU64(), parent.Root)
+		statedb.UsePrefetcher(bc.prefetcher)
 		// If we have a followup block, run that against the current state to pre-cache
 		// transactions and probabilistically some of the account/storage trie nodes.
-		var followupInterrupt uint32
-		if !bc.cacheConfig.TrieCleanNoPrefetch {
-			if followup, err := it.peek(); followup != nil && err == nil {
-				throwaway, _ := state.New(parent.Root, bc.stateCache, bc.snaps)
-				go func(start time.Time, followup *types.Block, throwaway *state.StateDB, interrupt *uint32) {
-					bc.prefetcher.Prefetch(followup, throwaway, bc.vmConfig, &followupInterrupt)
-
-					blockPrefetchExecuteTimer.Update(time.Since(start))
-					if atomic.LoadUint32(interrupt) == 1 {
-						blockPrefetchInterruptMeter.Mark(1)
-					}
-				}(time.Now(), followup, throwaway, &followupInterrupt)
-			}
-		}
+		//var followupInterrupt uint32
+		//if !bc.cacheConfig.TrieCleanNoPrefetch {
+		//	if followup, err := it.peek(); followup != nil && err == nil {
+		//		throwaway, _ := state.New(parent.Root, bc.stateCache, bc.snaps)
+		//		go func(start time.Time, followup *types.Block, throwaway *state.StateDB, interrupt *uint32) {
+		//			bc.prefetcher.Prefetch(followup, throwaway, bc.vmConfig, &followupInterrupt)
+		//
+		//			blockPrefetchExecuteTimer.Update(time.Since(start))
+		//			if atomic.LoadUint32(interrupt) == 1 {
+		//				blockPrefetchInterruptMeter.Mark(1)
+		//			}
+		//		}(time.Now(), followup, throwaway, &followupInterrupt)
+		//	}
+		//}
 		// Process block using the parent state as reference point
 		substart := time.Now()
 		receipts, logs, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig)
 		if err != nil {
 			bc.reportBlock(block, receipts, err)
-			atomic.StoreUint32(&followupInterrupt, 1)
+			//atomic.StoreUint32(&followupInterrupt, 1)
 			return it.index, err
 		}
 		// Update the metrics touched during block processing
@@ -1724,7 +1733,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
 		storageUpdateTimer.Update(statedb.StorageUpdates)             // Storage updates are complete, we can mark them
 		snapshotAccountReadTimer.Update(statedb.SnapshotAccountReads) // Account reads are complete, we can mark them
 		snapshotStorageReadTimer.Update(statedb.SnapshotStorageReads) // Storage reads are complete, we can mark them
-
+		bc.prefetcher.Pause()
 		triehash := statedb.AccountHashes + statedb.StorageHashes // Save to not double count in validation
 		trieproc := statedb.SnapshotAccountReads + statedb.AccountReads + statedb.AccountUpdates
 		trieproc += statedb.SnapshotStorageReads + statedb.StorageReads + statedb.StorageUpdates
@@ -1735,7 +1744,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
 		substart = time.Now()
 		if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil {
 			bc.reportBlock(block, receipts, err)
-			atomic.StoreUint32(&followupInterrupt, 1)
+			//atomic.StoreUint32(&followupInterrupt, 1)
 			return it.index, err
 		}
 		proctime := time.Since(start)
@@ -1749,7 +1758,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
 		// Write the block to the chain and get the status.
 		substart = time.Now()
 		status, err := bc.writeBlockWithState(block, receipts, logs, statedb, false)
-		atomic.StoreUint32(&followupInterrupt, 1)
+		//atomic.StoreUint32(&followupInterrupt, 1)
 		if err != nil {
 			return it.index, err
 		}
diff --git a/core/state/database.go b/core/state/database.go
index ecc2c134da6c..3dd3bab84f08 100644
--- a/core/state/database.go
+++ b/core/state/database.go
@@ -121,12 +121,20 @@ type cachingDB struct {
 
 // OpenTrie opens the main account trie at a specific root hash.
 func (db *cachingDB) OpenTrie(root common.Hash) (Trie, error) {
-	return trie.NewSecure(root, db.db)
+	tr, err := trie.NewSecure(root, db.db)
+	if err != nil {
+		return nil, err
+	}
+	return tr, nil
 }
 
 // OpenStorageTrie opens the storage trie of an account.
 func (db *cachingDB) OpenStorageTrie(addrHash, root common.Hash) (Trie, error) {
-	return trie.NewSecure(root, db.db)
+	tr, err := trie.NewSecure(root, db.db)
+	if err != nil {
+		return nil, err
+	}
+	return tr, nil
 }
 
 // CopyTrie returns an independent copy of the given trie.
diff --git a/core/state/state_object.go b/core/state/state_object.go
index 0833f2b0a866..591159414914 100644
--- a/core/state/state_object.go
+++ b/core/state/state_object.go
@@ -197,12 +197,24 @@ func (s *stateObject) GetCommittedState(db Database, key common.Hash) common.Has
 	}
 	// If no live objects are available, attempt to use snapshots
 	var (
-		enc []byte
-		err error
+		enc   []byte
+		err   error
+		meter *time.Duration
 	)
+	readStart := time.Now()
+	if metrics.EnabledExpensive {
+		// If the snap is 'under construction', the first lookup may fail. If that
+		// happens, we don't want to double-count the time elapsed. Thus this
+		// dance with the metering.
+		defer func() {
+			if meter != nil {
+				*meter += time.Since(readStart)
+			}
+		}()
+	}
 	if s.db.snap != nil {
 		if metrics.EnabledExpensive {
-			defer func(start time.Time) { s.db.SnapshotStorageReads += time.Since(start) }(time.Now())
+			meter = &s.db.SnapshotStorageReads
 		}
 		// If the object was destructed in *this* block (and potentially resurrected),
 		// the storage has been cleared out, and we should *not* consult the previous
@@ -217,8 +229,14 @@ func (s *stateObject) GetCommittedState(db Database, key common.Hash) common.Has
 	}
 	// If snapshot unavailable or reading from it failed, load from the database
 	if s.db.snap == nil || err != nil {
+		if meter != nil {
+			// If we already spent time checking the snapshot, account for it
+			// and reset the readStart
+			*meter += time.Since(readStart)
+			readStart = time.Now()
+		}
 		if metrics.EnabledExpensive {
-			defer func(start time.Time) { s.db.StorageReads += time.Since(start) }(time.Now())
+			meter = &s.db.StorageReads
 		}
 		if enc, err = s.getTrie(db).TryGet(key[:]); err != nil {
 			s.setError(err)
@@ -283,8 +301,13 @@ func (s *stateObject) setState(key, value common.Hash) {
 // finalise moves all dirty storage slots into the pending area to be hashed or
 // committed later. It is invoked at the end of every transaction.
 func (s *stateObject) finalise() {
+	trieChanges := make([]common.Hash, 0, len(s.dirtyStorage))
 	for key, value := range s.dirtyStorage {
 		s.pendingStorage[key] = value
+		trieChanges = append(trieChanges, key)
+	}
+	if len(trieChanges) > 0 && s.db.prefetcher != nil {
+		s.db.prefetcher.PrefetchStorage(s.data.Root, trieChanges)
 	}
 	if len(s.dirtyStorage) > 0 {
 		s.dirtyStorage = make(Storage)
@@ -303,18 +326,11 @@ func (s *stateObject) updateTrie(db Database) Trie {
 	if metrics.EnabledExpensive {
 		defer func(start time.Time) { s.db.StorageUpdates += time.Since(start) }(time.Now())
 	}
-	// Retrieve the snapshot storage map for the object
+	// The snapshot storage map for the object
 	var storage map[common.Hash][]byte
-	if s.db.snap != nil {
-		// Retrieve the old storage map, if available, create a new one otherwise
-		storage = s.db.snapStorage[s.addrHash]
-		if storage == nil {
-			storage = make(map[common.Hash][]byte)
-			s.db.snapStorage[s.addrHash] = storage
-		}
-	}
 	// Insert all the pending updates into the trie
 	tr := s.getTrie(db)
+	hasher := s.db.hasher
 	for key, value := range s.pendingStorage {
 		// Skip noop changes, persist actual changes
 		if value == s.originStorage[key] {
@@ -331,8 +347,15 @@ func (s *stateObject) updateTrie(db Database) Trie {
 			s.setError(tr.TryUpdate(key[:], v))
 		}
 		// If state snapshotting is active, cache the data til commit
-		if storage != nil {
-			storage[crypto.Keccak256Hash(key[:])] = v // v will be nil if value is 0x00
+		if s.db.snap != nil {
+			if storage == nil {
+				// Retrieve the old storage map, if available, create a new one otherwise
+				if storage = s.db.snapStorage[s.addrHash]; storage == nil {
+					storage = make(map[common.Hash][]byte)
+					s.db.snapStorage[s.addrHash] = storage
+				}
+			}
+			storage[crypto.HashData(hasher, key[:])] = v // v will be nil if value is 0x00
 		}
 	}
 	if len(s.pendingStorage) > 0 {
diff --git a/core/state/statedb.go b/core/state/statedb.go
index 3f96e8707ed0..cca6abfb3f0b 100644
--- a/core/state/statedb.go
+++ b/core/state/statedb.go
@@ -64,8 +64,10 @@ func (n *proofList) Delete(key []byte) error {
 // * Contracts
 // * Accounts
 type StateDB struct {
-	db   Database
-	trie Trie
+	db         Database
+	prefetcher common.TriePrefetcher
+	trie       Trie
+	hasher     crypto.KeccakHasher
 
 	snaps *snapshot.Tree
 	snap  snapshot.Snapshot
@@ -131,6 +133,7 @@ func New(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, error)
 		logs:                make(map[common.Hash][]*types.Log),
 		preimages:           make(map[common.Hash][]byte),
 		journal:             newJournal(),
+		hasher:              crypto.NewKeccakHasher(),
 	}
 	if sdb.snaps != nil {
 		if sdb.snap = sdb.snaps.Snapshot(root); sdb.snap != nil {
@@ -142,6 +145,10 @@ func New(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, error)
 	return sdb, nil
 }
 
+func (s *StateDB) UsePrefetcher(prefetcher common.TriePrefetcher) {
+	s.prefetcher = prefetcher
+}
+
 // setError remembers the first non-nil error it is called with.
 func (s *StateDB) setError(err error) {
 	if s.dbErr == nil {
@@ -516,7 +523,7 @@ func (s *StateDB) getDeletedStateObject(addr common.Address) *stateObject {
 			defer func(start time.Time) { s.SnapshotAccountReads += time.Since(start) }(time.Now())
 		}
 		var acc *snapshot.Account
-		if acc, err = s.snap.Account(crypto.Keccak256Hash(addr[:])); err == nil {
+		if acc, err = s.snap.Account(crypto.HashData(s.hasher, addr[:])); err == nil {
 			if acc == nil {
 				return nil
 			}
@@ -648,6 +655,7 @@ func (s *StateDB) Copy() *StateDB {
 		logSize:             s.logSize,
 		preimages:           make(map[common.Hash][]byte, len(s.preimages)),
 		journal:             newJournal(),
+		hasher:              crypto.NewKeccakHasher(),
 	}
 	// Copy the dirty states, logs, and preimages
 	for addr := range s.journal.dirties {
@@ -755,6 +763,12 @@ func (s *StateDB) Finalise(deleteEmptyObjects bool) {
 		}
 		s.stateObjectsPending[addr] = struct{}{}
 		s.stateObjectsDirty[addr] = struct{}{}
+		// At this point, also ship the address off to the precacher. The precacher
+		// will start loading tries, and when the change is eventually committed,
+		// the commit-phase will be a lot faster
+		if s.prefetcher != nil {
+			s.prefetcher.PrefetchAddress(addr)
+		}
 	}
 	// Invalidate journal because reverting across transactions is not allowed.
 	s.clearJournalAndRefund()
diff --git a/core/trie_prefetcher.go b/core/trie_prefetcher.go
new file mode 100644
index 000000000000..27cfa7fe76b6
--- /dev/null
+++ b/core/trie_prefetcher.go
@@ -0,0 +1,182 @@
+// Copyright 2020 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package core
+
+import (
+	"sync/atomic"
+
+	"github.com/ethereum/go-ethereum/common"
+	"github.com/ethereum/go-ethereum/core/state"
+	"github.com/ethereum/go-ethereum/log"
+	"github.com/ethereum/go-ethereum/metrics"
+)
+
+var (
+	triePrefetchFetchMeter = metrics.NewRegisteredMeter("trie/prefetch/fetch", nil)
+	triePrefetchSkipMeter  = metrics.NewRegisteredMeter("trie/prefetch/skip", nil)
+	triePrefetchDropMeter  = metrics.NewRegisteredMeter("trie/prefetch/drop", nil)
+)
+
+// triePrefetcher is an active prefetcher, which receives accounts or storage
+// items on two channels, and does trie-loading of the items.
+// The goal is to get as much useful content into the caches as possible
+type triePrefetcher struct {
+	cmdCh   chan (command)
+	abortCh chan (struct{})
+	db      state.Database
+	stale   uint64
+}
+
+func newTriePrefetcher(db state.Database) *triePrefetcher {
+	return &triePrefetcher{
+		cmdCh:   make(chan command, 200),
+		abortCh: make(chan struct{}),
+		db:      db,
+	}
+}
+
+type command struct {
+	root    *common.Hash
+	address *common.Address
+	slots   []common.Hash
+}
+
+func (p *triePrefetcher) loop() {
+	var (
+		tr          state.Trie
+		err         error
+		currentRoot common.Hash
+		// Some tracking of performance
+		skipped int64
+		fetched int64
+	)
+	for {
+		select {
+		case cmd := <-p.cmdCh:
+			// New roots are sent synchronously
+			if cmd.root != nil && cmd.slots == nil {
+				// Update metrics at new block events
+				triePrefetchFetchMeter.Mark(fetched)
+				fetched = 0
+				triePrefetchSkipMeter.Mark(skipped)
+				skipped = 0
+				// New root and number
+				currentRoot = *cmd.root
+				tr, err = p.db.OpenTrie(currentRoot)
+				if err != nil {
+					log.Warn("trie prefetcher failed opening trie", "root", currentRoot, "err", err)
+				}
+				// Open for business again
+				atomic.StoreUint64(&p.stale, 0)
+				continue
+			}
+			// Don't get stuck precaching on old blocks
+			if atomic.LoadUint64(&p.stale) == 1 {
+				if nSlots := len(cmd.slots); nSlots > 0 {
+					skipped += int64(nSlots)
+				} else {
+					skipped++
+				}
+				// Keep reading until we're in step with the chain
+				continue
+			}
+			// It's either storage slots or an account
+			if cmd.slots != nil {
+				storageTrie, err := p.db.OpenTrie(*cmd.root)
+				if err != nil {
+					log.Warn("trie prefetcher failed opening storage trie", "root", *cmd.root, "err", err)
+					skipped += int64(len(cmd.slots))
+					continue
+				}
+				for i, key := range cmd.slots {
+					storageTrie.TryGet(key[:])
+					fetched++
+					// Abort if we fall behind
+					if atomic.LoadUint64(&p.stale) == 1 {
+						skipped += int64(len(cmd.slots[i:]))
+						break
+					}
+				}
+			} else { // an account
+				if tr == nil {
+					skipped++
+					continue
+				}
+				// We're in sync with the chain, do preloading
+				if cmd.address != nil {
+					fetched++
+					addr := *cmd.address
+					tr.TryGet(addr[:])
+				}
+			}
+		case <-p.abortCh:
+			return
+		}
+	}
+}
+
+// Close stops the prefetcher
+func (p *triePrefetcher) Close() {
+	p.abortCh <- struct{}{}
+}
+
+// Reset prevents the prefetcher from entering a state where it is
+// behind the actual block processing.
+// It causes any existing (stale) work to be ignored, and the prefetcher will skip ahead
+// to current tasks
+func (p *triePrefetcher) Reset(number uint64, root common.Hash) {
+	// Set staleness
+	atomic.StoreUint64(&p.stale, 1)
+	// Do a synced send, so we're sure it punches through any old (now stale) commands
+	cmd := command{
+		root: &root,
+	}
+	p.cmdCh <- cmd
+}
+
+func (p *triePrefetcher) Pause() {
+	// Set staleness
+	atomic.StoreUint64(&p.stale, 1)
+}
+
+// PrefetchAddress adds an address for prefetching
+func (p *triePrefetcher) PrefetchAddress(addr common.Address) {
+	cmd := command{
+		address: &addr,
+	}
+	// We do an async send here, to not cause the caller to block
+	select {
+	case p.cmdCh <- cmd:
+	default:
+		triePrefetchDropMeter.Mark(1)
+	}
+}
+
+// PrefetchStorage adds a storage root and a set of keys for prefetching
+func (p *triePrefetcher) PrefetchStorage(root common.Hash, slots []common.Hash) {
+	cmd := command{
+		root:  &root,
+		slots: slots,
+	}
+	// We do an async send here, to not cause the caller to block
+	select {
+	case p.cmdCh <- cmd:
+	default:
+		triePrefetchDropMeter.Mark(int64(len(slots)))
+	}
+
+}
diff --git a/crypto/crypto.go b/crypto/crypto.go
index 1f43ad15e847..663331637caf 100644
--- a/crypto/crypto.go
+++ b/crypto/crypto.go
@@ -24,6 +24,7 @@ import (
 	"encoding/hex"
 	"errors"
 	"fmt"
+	"hash"
 	"io"
 	"io/ioutil"
 	"math/big"
@@ -51,6 +52,27 @@ var (
 
 var errInvalidPubkey = errors.New("invalid secp256k1 public key")
 
+// KeccakHasher wraps sha3.state. In addition to the usual hash methods, it also supports
+// Read to get a variable amount of data from the hash state. Read is faster than Sum
+// because it doesn't copy the internal state, but also modifies the internal state.
+type KeccakHasher interface {
+	hash.Hash
+	Read([]byte) (int, error)
+}
+
+func NewKeccakHasher() KeccakHasher {
+	return sha3.NewLegacyKeccak256().(KeccakHasher)
+}
+
+// HashData hashes the provided data and returns a 32 byte hash
+// This method is not threadsafe
+func HashData(kh KeccakHasher, data []byte) (h common.Hash) {
+	kh.Reset()
+	kh.Write(data)
+	kh.Read(h[:])
+	return h
+}
+
 // Keccak256 calculates and returns the Keccak256 hash of the input data.
 func Keccak256(data ...[]byte) []byte {
 	d := sha3.NewLegacyKeccak256()
diff --git a/crypto/crypto_test.go b/crypto/crypto_test.go
index f71ae8232ad6..0a781a754ff7 100644
--- a/crypto/crypto_test.go
+++ b/crypto/crypto_test.go
@@ -42,6 +42,13 @@ func TestKeccak256Hash(t *testing.T) {
 	checkhash(t, "Sha3-256-array", func(in []byte) []byte { h := Keccak256Hash(in); return h[:] }, msg, exp)
 }
 
+func TestKeccak256Hasher(t *testing.T) {
+	msg := []byte("abc")
+	exp, _ := hex.DecodeString("4e03657aea45a94fc7d47ba826c8d667c0d1e6e33a64a036ec44f58fa12d6c45")
+	hasher := NewKeccakHasher()
+	checkhash(t, "Sha3-256-array", func(in []byte) []byte { h := HashData(hasher, in); return h[:] }, msg, exp)
+}
+
 func TestToECDSAErrors(t *testing.T) {
 	if _, err := HexToECDSA("0000000000000000000000000000000000000000000000000000000000000000"); err == nil {
 		t.Fatal("HexToECDSA should've returned error")
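Note on the call order implied by the patch (not part of the diff itself): the chain resets the prefetcher to the parent root at the start of each block, the state database feeds it touched accounts and dirty storage slots asynchronously while transactions execute, Pause stops it before the tries are hashed, and Close shuts it down with the chain. The self-contained sketch below only illustrates that lifecycle against a hypothetical no-op implementation; the Hash/Address aliases and stubTriePrefetcher type are invented for the example and do not appear in the change.

package main

import "fmt"

// Hash and Address stand in for common.Hash and common.Address in this sketch.
type Hash [32]byte
type Address [20]byte

// TriePrefetcher mirrors the interface added to common/types.go.
type TriePrefetcher interface {
	Pause()
	Reset(number uint64, root Hash)
	PrefetchAddress(addr Address)
	PrefetchStorage(root Hash, slots []Hash)
	Close()
}

// stubTriePrefetcher is a hypothetical no-op implementation, used only to show
// the calling convention seen in core/blockchain.go and core/state/statedb.go.
type stubTriePrefetcher struct{}

func (s *stubTriePrefetcher) Pause()                         { fmt.Println("pause before hashing") }
func (s *stubTriePrefetcher) Reset(number uint64, root Hash) { fmt.Println("reset to block", number) }
func (s *stubTriePrefetcher) PrefetchAddress(addr Address)   { fmt.Println("prefetch account") }
func (s *stubTriePrefetcher) PrefetchStorage(root Hash, slots []Hash) {
	fmt.Println("prefetch", len(slots), "storage slots")
}
func (s *stubTriePrefetcher) Close() { fmt.Println("close") }

func main() {
	var p TriePrefetcher = &stubTriePrefetcher{}

	// Per block: point the prefetcher at the parent root, feed it the addresses
	// and dirty slots observed while transactions run, then pause it before the
	// state root is computed so it stops competing for the trie database.
	p.Reset(1, Hash{})
	p.PrefetchAddress(Address{})
	p.PrefetchStorage(Hash{}, []Hash{{}, {}})
	p.Pause()

	// On chain shutdown (BlockChain.Stop in the diff) the prefetcher is closed.
	p.Close()
}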
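The crypto changes reuse a single Keccak state and drain it with Read instead of Sum, avoiding the state copy and the per-call allocation of a fresh hasher. A minimal standalone sketch of the same trick is below, assuming the golang.org/x/crypto/sha3 package that go-ethereum's crypto package already imports; the local keccakState interface simply mirrors the KeccakHasher interface from the diff, and the expected digest is the "abc" vector from TestKeccak256Hasher.

package main

import (
	"encoding/hex"
	"fmt"
	"hash"

	"golang.org/x/crypto/sha3"
)

// keccakState mirrors the KeccakHasher interface from the diff: a hash.Hash
// that also exposes Read, which the sha3 Keccak256 state satisfies.
type keccakState interface {
	hash.Hash
	Read([]byte) (int, error)
}

func main() {
	h := sha3.NewLegacyKeccak256().(keccakState)

	var out [32]byte
	h.Reset()              // the hasher can be reused across calls (not thread-safe)
	h.Write([]byte("abc"))
	h.Read(out[:])         // Read skips the internal state copy that Sum performs

	fmt.Println(hex.EncodeToString(out[:]))
	// Expected: 4e03657aea45a94fc7d47ba826c8d667c0d1e6e33a64a036ec44f58fa12d6c45
}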