Skip to content

core, eth, les, trie: remove the sync bloom, used by fast sync #24047

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/state/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
)

// NewStateSync create a new state trie download scheduler.
func NewStateSync(root common.Hash, database ethdb.KeyValueReader, bloom *trie.SyncBloom, onLeaf func(paths [][]byte, leaf []byte) error) *trie.Sync {
func NewStateSync(root common.Hash, database ethdb.KeyValueReader, onLeaf func(paths [][]byte, leaf []byte) error) *trie.Sync {
// Register the storage slot callback if the external callback is specified.
var onSlot func(paths [][]byte, hexpath []byte, leaf []byte, parent common.Hash) error
if onLeaf != nil {
Expand All @@ -52,6 +52,6 @@ func NewStateSync(root common.Hash, database ethdb.KeyValueReader, bloom *trie.S
syncer.AddCodeEntry(common.BytesToHash(obj.CodeHash), hexpath, parent)
return nil
}
syncer = trie.NewSync(root, database, onAccount, bloom)
syncer = trie.NewSync(root, database, onAccount)
return syncer
}
13 changes: 6 additions & 7 deletions core/state/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/ethdb/memorydb"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
)
Expand Down Expand Up @@ -134,7 +133,7 @@ func checkStateConsistency(db ethdb.Database, root common.Hash) error {
// Tests that an empty state is not scheduled for syncing.
func TestEmptyStateSync(t *testing.T) {
empty := common.HexToHash("56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421")
sync := NewStateSync(empty, rawdb.NewMemoryDatabase(), trie.NewSyncBloom(1, memorydb.New()), nil)
sync := NewStateSync(empty, rawdb.NewMemoryDatabase(), nil)
if nodes, paths, codes := sync.Missing(1); len(nodes) != 0 || len(paths) != 0 || len(codes) != 0 {
t.Errorf(" content requested for empty state: %v, %v, %v", nodes, paths, codes)
}
Expand Down Expand Up @@ -171,7 +170,7 @@ func testIterativeStateSync(t *testing.T, count int, commit bool, bypath bool) {

// Create a destination state and sync with the scheduler
dstDb := rawdb.NewMemoryDatabase()
sched := NewStateSync(srcRoot, dstDb, trie.NewSyncBloom(1, dstDb), nil)
sched := NewStateSync(srcRoot, dstDb, nil)

nodes, paths, codes := sched.Missing(count)
var (
Expand Down Expand Up @@ -250,7 +249,7 @@ func TestIterativeDelayedStateSync(t *testing.T) {

// Create a destination state and sync with the scheduler
dstDb := rawdb.NewMemoryDatabase()
sched := NewStateSync(srcRoot, dstDb, trie.NewSyncBloom(1, dstDb), nil)
sched := NewStateSync(srcRoot, dstDb, nil)

nodes, _, codes := sched.Missing(0)
queue := append(append([]common.Hash{}, nodes...), codes...)
Expand Down Expand Up @@ -298,7 +297,7 @@ func testIterativeRandomStateSync(t *testing.T, count int) {

// Create a destination state and sync with the scheduler
dstDb := rawdb.NewMemoryDatabase()
sched := NewStateSync(srcRoot, dstDb, trie.NewSyncBloom(1, dstDb), nil)
sched := NewStateSync(srcRoot, dstDb, nil)

queue := make(map[common.Hash]struct{})
nodes, _, codes := sched.Missing(count)
Expand Down Expand Up @@ -348,7 +347,7 @@ func TestIterativeRandomDelayedStateSync(t *testing.T) {

// Create a destination state and sync with the scheduler
dstDb := rawdb.NewMemoryDatabase()
sched := NewStateSync(srcRoot, dstDb, trie.NewSyncBloom(1, dstDb), nil)
sched := NewStateSync(srcRoot, dstDb, nil)

queue := make(map[common.Hash]struct{})
nodes, _, codes := sched.Missing(0)
Expand Down Expand Up @@ -415,7 +414,7 @@ func TestIncompleteStateSync(t *testing.T) {

// Create a destination state and sync with the scheduler
dstDb := rawdb.NewMemoryDatabase()
sched := NewStateSync(srcRoot, dstDb, trie.NewSyncBloom(1, dstDb), nil)
sched := NewStateSync(srcRoot, dstDb, nil)

var added []common.Hash

Expand Down
25 changes: 2 additions & 23 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/trie"
)

var (
Expand Down Expand Up @@ -101,8 +100,7 @@ type Downloader struct {
queue *queue // Scheduler for selecting the hashes to download
peers *peerSet // Set of active peers from which download can proceed

stateDB ethdb.Database // Database to state sync into (and deduplicate via)
stateBloom *trie.SyncBloom // Bloom filter for snap trie node and contract code existence checks
stateDB ethdb.Database // Database to state sync into (and deduplicate via)

// Statistics
syncStatsChainOrigin uint64 // Origin block number where syncing started at
Expand Down Expand Up @@ -203,13 +201,12 @@ type BlockChain interface {
}

// New creates a new downloader to fetch hashes and blocks from remote peers.
func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn) *Downloader {
func New(checkpoint uint64, stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn) *Downloader {
if lightchain == nil {
lightchain = chain
}
dl := &Downloader{
stateDB: stateDb,
stateBloom: stateBloom,
mux: mux,
checkpoint: checkpoint,
queue: newQueue(blockCacheMaxItems, blockCacheInitialItems),
Expand Down Expand Up @@ -365,12 +362,6 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
if atomic.CompareAndSwapInt32(&d.notified, 0, 1) {
log.Info("Block synchronisation started")
}
// If we are already full syncing, but have a snap-sync bloom filter laying
// around, make sure it doesn't use memory any more. This is a special case
// when the user attempts to snap sync a new empty network.
if mode == FullSync && d.stateBloom != nil {
d.stateBloom.Close()
}
// If snap sync was requested, create the snap scheduler and switch to snap
// sync mode. Long term we could drop snap sync or merge the two together,
// but until snap becomes prevalent, we should support both. TODO(karalabe).
Expand Down Expand Up @@ -612,9 +603,6 @@ func (d *Downloader) Terminate() {
default:
close(d.quitCh)
}
if d.stateBloom != nil {
d.stateBloom.Close()
}
d.quitLock.Unlock()

// Cancel any pending download requests
Expand Down Expand Up @@ -1599,15 +1587,6 @@ func (d *Downloader) commitPivotBlock(result *fetchResult) error {
return err
}
atomic.StoreInt32(&d.committed, 1)

// If we had a bloom filter for the state sync, deallocate it now. Note, we only
// deallocate internally, but keep the empty wrapper. This ensures that if we do
// a rollback after committing the pivot and restarting snap sync, we don't end
// up using a nil bloom. Empty bloom is fine, it just returns that it does not
// have the info we need, so reach down to the database instead.
if d.stateBloom != nil {
d.stateBloom.Close()
}
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion eth/downloader/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func newTester() *downloadTester {
chain: chain,
peers: make(map[string]*downloadTesterPeer),
}
tester.downloader = New(0, db, trie.NewSyncBloom(1, db), new(event.TypeMux), tester.chain, nil, tester.dropPeer)
tester.downloader = New(0, db, new(event.TypeMux), tester.chain, nil, tester.dropPeer)
return tester
}

Expand Down
11 changes: 1 addition & 10 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/trie"
)

const (
Expand Down Expand Up @@ -106,7 +105,6 @@ type handler struct {
maxPeers int

downloader *downloader.Downloader
stateBloom *trie.SyncBloom
blockFetcher *fetcher.BlockFetcher
txFetcher *fetcher.TxFetcher
peers *peerSet
Expand Down Expand Up @@ -176,14 +174,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
// Construct the downloader (long sync) and its backing state bloom if snap
// sync is requested. The downloader is responsible for deallocating the state
// bloom when it's done.
// Note: we don't enable it if snap-sync is performed, since it's very heavy
// and the heal-portion of the snap sync is much lighter than snap. What we particularly
// want to avoid, is a 90%-finished (but restarted) snap-sync to begin
// indexing the entire trie
if atomic.LoadUint32(&h.snapSync) == 1 && atomic.LoadUint32(&h.snapSync) == 0 {
h.stateBloom = trie.NewSyncBloom(config.BloomCache, config.Database)
}
h.downloader = downloader.New(h.checkpointNumber, config.Database, h.stateBloom, h.eventMux, h.chain, nil, h.removePeer)
h.downloader = downloader.New(h.checkpointNumber, config.Database, h.eventMux, h.chain, nil, h.removePeer)

// Construct the fetcher (short sync)
validator := func(header *types.Header) error {
Expand Down
6 changes: 2 additions & 4 deletions eth/handler_eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,14 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/trie"
)

// ethHandler implements the eth.Backend interface to handle the various network
// packets that are sent as replies or broadcasts.
type ethHandler handler

func (h *ethHandler) Chain() *core.BlockChain { return h.chain }
func (h *ethHandler) StateBloom() *trie.SyncBloom { return h.stateBloom }
func (h *ethHandler) TxPool() eth.TxPool { return h.txpool }
func (h *ethHandler) Chain() *core.BlockChain { return h.chain }
func (h *ethHandler) TxPool() eth.TxPool { return h.txpool }

// RunPeer is invoked when a peer joins on the `eth` protocol.
func (h *ethHandler) RunPeer(peer *eth.Peer, hand eth.Handler) error {
Expand Down
2 changes: 0 additions & 2 deletions eth/handler_eth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/trie"
)

// testEthHandler is a mock event handler to listen for inbound network requests
Expand All @@ -50,7 +49,6 @@ type testEthHandler struct {
}

func (h *testEthHandler) Chain() *core.BlockChain { panic("no backing chain") }
func (h *testEthHandler) StateBloom() *trie.SyncBloom { panic("no backing state bloom") }
func (h *testEthHandler) TxPool() eth.TxPool { panic("no backing tx pool") }
func (h *testEthHandler) AcceptTxs() bool { return true }
func (h *testEthHandler) RunPeer(*eth.Peer, eth.Handler) error { panic("not used in tests") }
Expand Down
4 changes: 0 additions & 4 deletions eth/protocols/eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/trie"
)

const (
Expand Down Expand Up @@ -69,9 +68,6 @@ type Backend interface {
// Chain retrieves the blockchain object to serve data.
Chain() *core.BlockChain

// StateBloom retrieves the bloom filter - if any - for state trie nodes.
StateBloom() *trie.SyncBloom

// TxPool retrieves the transaction pool object to serve data.
TxPool() TxPool

Expand Down
6 changes: 2 additions & 4 deletions eth/protocols/eth/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/trie"
)

var (
Expand Down Expand Up @@ -91,9 +90,8 @@ func (b *testBackend) close() {
b.chain.Stop()
}

func (b *testBackend) Chain() *core.BlockChain { return b.chain }
func (b *testBackend) StateBloom() *trie.SyncBloom { return nil }
func (b *testBackend) TxPool() TxPool { return b.txpool }
func (b *testBackend) Chain() *core.BlockChain { return b.chain }
func (b *testBackend) TxPool() TxPool { return b.txpool }

func (b *testBackend) RunPeer(peer *Peer, handler Handler) error {
// Normally the backend would do peer mainentance and handshakes. All that
Expand Down
8 changes: 2 additions & 6 deletions eth/protocols/eth/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,13 +164,13 @@ func handleGetNodeData66(backend Backend, msg Decoder, peer *Peer) error {
if err := msg.Decode(&query); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}
response := ServiceGetNodeDataQuery(backend.Chain(), backend.StateBloom(), query.GetNodeDataPacket)
response := ServiceGetNodeDataQuery(backend.Chain(), query.GetNodeDataPacket)
return peer.ReplyNodeData(query.RequestId, response)
}

// ServiceGetNodeDataQuery assembles the response to a node data query. It is
// exposed to allow external packages to test protocol behavior.
func ServiceGetNodeDataQuery(chain *core.BlockChain, bloom *trie.SyncBloom, query GetNodeDataPacket) [][]byte {
func ServiceGetNodeDataQuery(chain *core.BlockChain, query GetNodeDataPacket) [][]byte {
// Gather state data until the fetch or network limits is reached
var (
bytes int
Expand All @@ -182,10 +182,6 @@ func ServiceGetNodeDataQuery(chain *core.BlockChain, bloom *trie.SyncBloom, quer
break
}
// Retrieve the requested state entry
if bloom != nil && !bloom.Contains(hash[:]) {
// Only lookup the trie node if there's chance that we actually have it
continue
}
entry, err := chain.TrieNode(hash)
if len(entry) == 0 || err != nil {
// Read the contract code with prefix only to save unnecessary lookups.
Expand Down
2 changes: 1 addition & 1 deletion eth/protocols/snap/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,7 @@ func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error {
s.lock.Lock()
s.root = root
s.healer = &healTask{
scheduler: state.NewStateSync(root, s.db, nil, s.onHealState),
scheduler: state.NewStateSync(root, s.db, s.onHealState),
trieTasks: make(map[common.Hash]trie.SyncPath),
codeTasks: make(map[common.Hash]struct{}),
}
Expand Down
2 changes: 1 addition & 1 deletion les/client_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func newClientHandler(ulcServers []string, ulcFraction int, checkpoint *params.T
height = (checkpoint.SectionIndex+1)*params.CHTFrequency - 1
}
handler.fetcher = newLightFetcher(backend.blockchain, backend.engine, backend.peers, handler.ulc, backend.chainDb, backend.reqDist, handler.synchronise)
handler.downloader = downloader.New(height, backend.chainDb, nil, backend.eventMux, nil, backend.blockchain, handler.removePeer)
handler.downloader = downloader.New(height, backend.chainDb, backend.eventMux, nil, backend.blockchain, handler.removePeer)
handler.backend.peers.subscribe((*downloaderPeerNotify)(handler))
return handler
}
Expand Down
25 changes: 2 additions & 23 deletions les/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/trie"
)

var (
Expand Down Expand Up @@ -97,8 +96,7 @@ type Downloader struct {
queue *queue // Scheduler for selecting the hashes to download
peers *peerSet // Set of active peers from which download can proceed

stateDB ethdb.Database // Database to state sync into (and deduplicate via)
stateBloom *trie.SyncBloom // Bloom filter for fast trie node and contract code existence checks
stateDB ethdb.Database // Database to state sync into (and deduplicate via)

// Statistics
syncStatsChainOrigin uint64 // Origin block number where syncing started at
Expand Down Expand Up @@ -207,13 +205,12 @@ type BlockChain interface {
}

// New creates a new downloader to fetch hashes and blocks from remote peers.
func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn) *Downloader {
func New(checkpoint uint64, stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn) *Downloader {
if lightchain == nil {
lightchain = chain
}
dl := &Downloader{
stateDB: stateDb,
stateBloom: stateBloom,
mux: mux,
checkpoint: checkpoint,
queue: newQueue(blockCacheMaxItems, blockCacheInitialItems),
Expand Down Expand Up @@ -367,12 +364,6 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
if atomic.CompareAndSwapInt32(&d.notified, 0, 1) {
log.Info("Block synchronisation started")
}
// If we are already full syncing, but have a fast-sync bloom filter laying
// around, make sure it doesn't use memory any more. This is a special case
// when the user attempts to fast sync a new empty network.
if mode == FullSync && d.stateBloom != nil {
d.stateBloom.Close()
}
// If snap sync was requested, create the snap scheduler and switch to fast
// sync mode. Long term we could drop fast sync or merge the two together,
// but until snap becomes prevalent, we should support both. TODO(karalabe).
Expand Down Expand Up @@ -628,9 +619,6 @@ func (d *Downloader) Terminate() {
default:
close(d.quitCh)
}
if d.stateBloom != nil {
d.stateBloom.Close()
}
d.quitLock.Unlock()

// Cancel any pending download requests
Expand Down Expand Up @@ -1930,15 +1918,6 @@ func (d *Downloader) commitPivotBlock(result *fetchResult) error {
return err
}
atomic.StoreInt32(&d.committed, 1)

// If we had a bloom filter for the state sync, deallocate it now. Note, we only
// deallocate internally, but keep the empty wrapper. This ensures that if we do
// a rollback after committing the pivot and restarting fast sync, we don't end
// up using a nil bloom. Empty bloom is fine, it just returns that it does not
// have the info we need, so reach down to the database instead.
if d.stateBloom != nil {
d.stateBloom.Close()
}
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion les/downloader/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func newTester() *downloadTester {
tester.stateDb = rawdb.NewMemoryDatabase()
tester.stateDb.Put(testGenesis.Root().Bytes(), []byte{0x00})

tester.downloader = New(0, tester.stateDb, trie.NewSyncBloom(1, tester.stateDb), new(event.TypeMux), tester, nil, tester.dropPeer)
tester.downloader = New(0, tester.stateDb, new(event.TypeMux), tester, nil, tester.dropPeer)
return tester
}

Expand Down
2 changes: 1 addition & 1 deletion les/downloader/statesync.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ func newStateSync(d *Downloader, root common.Hash) *stateSync {
return &stateSync{
d: d,
root: root,
sched: state.NewStateSync(root, d.stateDB, d.stateBloom, nil),
sched: state.NewStateSync(root, d.stateDB, nil),
keccak: sha3.NewLegacyKeccak256().(crypto.KeccakState),
trieTasks: make(map[common.Hash]*trieTask),
codeTasks: make(map[common.Hash]*codeTask),
Expand Down
Loading