From 3270aade1847f87bd4fdc0733770e9dbc45a8760 Mon Sep 17 00:00:00 2001 From: Zsolt Felfoldi Date: Sun, 10 Jan 2021 18:22:08 +0100 Subject: [PATCH 1/7] eth: moved eth.Config to eth/protocols/eth --- cmd/faucet/faucet.go | 2 +- cmd/geth/config.go | 2 +- cmd/utils/flags.go | 5 +++-- cmd/utils/flags_legacy.go | 2 +- console/console_test.go | 7 ++++--- eth/backend.go | 8 ++++---- eth/{ => protocols/eth}/config.go | 0 eth/{ => protocols/eth}/gen_config.go | 0 ethclient/ethclient_test.go | 3 ++- graphql/graphql_test.go | 5 +++-- les/api_test.go | 5 +++-- les/client.go | 7 ++++--- les/commons.go | 2 +- les/costtracker.go | 2 +- les/server.go | 5 +++-- les/test_helper.go | 5 +++-- mobile/geth.go | 2 +- 17 files changed, 35 insertions(+), 27 deletions(-) rename eth/{ => protocols/eth}/config.go (100%) rename eth/{ => protocols/eth}/gen_config.go (100%) diff --git a/cmd/faucet/faucet.go b/cmd/faucet/faucet.go index b9c4e1819a06..157b08c96b3f 100644 --- a/cmd/faucet/faucet.go +++ b/cmd/faucet/faucet.go @@ -47,8 +47,8 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/eth/downloader" + "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/ethstats" "github.com/ethereum/go-ethereum/les" diff --git a/cmd/geth/config.go b/cmd/geth/config.go index bf1fc55b172a..2c55040b6d74 100644 --- a/cmd/geth/config.go +++ b/cmd/geth/config.go @@ -27,7 +27,7 @@ import ( "gopkg.in/urfave/cli.v1" "github.com/ethereum/go-ethereum/cmd/utils" - "github.com/ethereum/go-ethereum/eth" + "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/internal/ethapi" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/node" diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 684e3428ba22..e34b08e6e6a8 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -42,9 +42,10 @@ import ( "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/eth" + ethereum "github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/eth/gasprice" + "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethstats" "github.com/ethereum/go-ethereum/graphql" @@ -1722,7 +1723,7 @@ func RegisterEthService(stack *node.Node, cfg *eth.Config) ethapi.Backend { } return backend.ApiBackend } - backend, err := eth.New(stack, cfg) + backend, err := ethereum.New(stack, cfg) if err != nil { Fatalf("Failed to register the Ethereum service: %v", err) } diff --git a/cmd/utils/flags_legacy.go b/cmd/utils/flags_legacy.go index 1376d47c0554..9484d190cbed 100644 --- a/cmd/utils/flags_legacy.go +++ b/cmd/utils/flags_legacy.go @@ -20,7 +20,7 @@ import ( "fmt" "strings" - "github.com/ethereum/go-ethereum/eth" + "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/node" "gopkg.in/urfave/cli.v1" ) diff --git a/console/console_test.go b/console/console_test.go index 68c03d108d00..224e18d992e0 100644 --- a/console/console_test.go +++ b/console/console_test.go @@ -30,7 +30,8 @@ import ( "github.com/ethereum/go-ethereum/consensus/ethash" "github.com/ethereum/go-ethereum/console/prompt" "github.com/ethereum/go-ethereum/core" - "github.com/ethereum/go-ethereum/eth" + ethereum "github.com/ethereum/go-ethereum/eth" + "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/internal/jsre" "github.com/ethereum/go-ethereum/miner" "github.com/ethereum/go-ethereum/node" @@ -77,7 +78,7 @@ func (p *hookedPrompter) SetWordCompleter(completer prompt.WordCompleter) {} type tester struct { workspace string stack *node.Node - ethereum *eth.Ethereum + ethereum *ethereum.Ethereum console *Console input *hookedPrompter output *bytes.Buffer @@ -109,7 +110,7 @@ func newTester(t *testing.T, confOverride func(*eth.Config)) *tester { if confOverride != nil { confOverride(ethConf) } - ethBackend, err := eth.New(stack, ethConf) + ethBackend, err := ethereum.New(stack, ethConf) if err != nil { t.Fatalf("failed to register Ethereum protocol: %v", err) } diff --git a/eth/backend.go b/eth/backend.go index c1732d3ceb6a..45a5028e0d0d 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -57,7 +57,7 @@ import ( // Ethereum implements the Ethereum full node service. type Ethereum struct { - config *Config + config *eth.Config // Handlers txPool *core.TxPool @@ -93,7 +93,7 @@ type Ethereum struct { // New creates a new Ethereum object (including the // initialisation of the common Ethereum object) -func New(stack *node.Node, config *Config) (*Ethereum, error) { +func New(stack *node.Node, config *eth.Config) (*Ethereum, error) { // Ensure configuration values are compatible and sane if config.SyncMode == downloader.LightSync { return nil, errors.New("can't run eth.Ethereum in light sync mode, use les.LightEthereum") @@ -102,8 +102,8 @@ func New(stack *node.Node, config *Config) (*Ethereum, error) { return nil, fmt.Errorf("invalid sync mode %d", config.SyncMode) } if config.Miner.GasPrice == nil || config.Miner.GasPrice.Cmp(common.Big0) <= 0 { - log.Warn("Sanitizing invalid miner gas price", "provided", config.Miner.GasPrice, "updated", DefaultConfig.Miner.GasPrice) - config.Miner.GasPrice = new(big.Int).Set(DefaultConfig.Miner.GasPrice) + log.Warn("Sanitizing invalid miner gas price", "provided", config.Miner.GasPrice, "updated", eth.DefaultConfig.Miner.GasPrice) + config.Miner.GasPrice = new(big.Int).Set(eth.DefaultConfig.Miner.GasPrice) } if config.NoPruning && config.TrieDirtyCache > 0 { if config.SnapshotCache > 0 { diff --git a/eth/config.go b/eth/protocols/eth/config.go similarity index 100% rename from eth/config.go rename to eth/protocols/eth/config.go diff --git a/eth/gen_config.go b/eth/protocols/eth/gen_config.go similarity index 100% rename from eth/gen_config.go rename to eth/protocols/eth/gen_config.go diff --git a/ethclient/ethclient_test.go b/ethclient/ethclient_test.go index d700022e8fad..c53d8644d34f 100644 --- a/ethclient/ethclient_test.go +++ b/ethclient/ethclient_test.go @@ -34,6 +34,7 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/eth" + ethp "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rpc" @@ -195,7 +196,7 @@ func newTestBackend(t *testing.T) (*node.Node, []*types.Block) { t.Fatalf("can't create new node: %v", err) } // Create Ethereum Service - config := ð.Config{Genesis: genesis} + config := ðp.Config{Genesis: genesis} config.Ethash.PowMode = ethash.ModeFake ethservice, err := eth.New(n, config) if err != nil { diff --git a/graphql/graphql_test.go b/graphql/graphql_test.go index fc62da1813a2..7d55774b2555 100644 --- a/graphql/graphql_test.go +++ b/graphql/graphql_test.go @@ -27,7 +27,8 @@ import ( "github.com/ethereum/go-ethereum/consensus/ethash" "github.com/ethereum/go-ethereum/core" - "github.com/ethereum/go-ethereum/eth" + ethereum "github.com/ethereum/go-ethereum/eth" + "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/params" ) @@ -202,7 +203,7 @@ func createGQLService(t *testing.T, stack *node.Node, endpoint string) { TrieTimeout: 60 * time.Minute, SnapshotCache: 5, } - ethBackend, err := eth.New(stack, ethConf) + ethBackend, err := ethereum.New(stack, ethConf) if err != nil { t.Fatalf("could not create eth backend: %v", err) } diff --git a/les/api_test.go b/les/api_test.go index 2895264f67ed..9b530cc8e41f 100644 --- a/les/api_test.go +++ b/les/api_test.go @@ -31,8 +31,9 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/consensus/ethash" - "github.com/ethereum/go-ethereum/eth" + ethereum "github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/eth/downloader" + "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/les/flowcontrol" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/node" @@ -503,7 +504,7 @@ func newLesServerService(ctx *adapters.ServiceContext, stack *node.Node) (node.L config.SyncMode = downloader.FullSync config.LightServ = testServerCapacity config.LightPeers = testMaxClients - ethereum, err := eth.New(stack, &config) + ethereum, err := ethereum.New(stack, &config) if err != nil { return nil, err } diff --git a/les/client.go b/les/client.go index 47997a098b30..d3438bb16264 100644 --- a/les/client.go +++ b/les/client.go @@ -30,10 +30,11 @@ import ( "github.com/ethereum/go-ethereum/core/bloombits" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/eth" + ethx "github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/eth/filters" "github.com/ethereum/go-ethereum/eth/gasprice" + "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/internal/ethapi" lpc "github.com/ethereum/go-ethereum/les/lespay/client" @@ -104,9 +105,9 @@ func New(stack *node.Node, config *eth.Config) (*LightEthereum, error) { eventMux: stack.EventMux(), reqDist: newRequestDistributor(peers, &mclock.System{}), accountManager: stack.AccountManager(), - engine: eth.CreateConsensusEngine(stack, chainConfig, &config.Ethash, nil, false, chainDb), + engine: ethx.CreateConsensusEngine(stack, chainConfig, &config.Ethash, nil, false, chainDb), bloomRequests: make(chan chan *bloombits.Retrieval), - bloomIndexer: eth.NewBloomIndexer(chainDb, params.BloomBitsBlocksClient, params.HelperTrieConfirmations), + bloomIndexer: ethx.NewBloomIndexer(chainDb, params.BloomBitsBlocksClient, params.HelperTrieConfirmations), valueTracker: lpc.NewValueTracker(lespayDb, &mclock.System{}, requestList, time.Minute, 1/float64(time.Hour), 1/float64(time.Hour*100), 1/float64(time.Hour*1000)), p2pServer: stack.Server(), } diff --git a/les/commons.go b/les/commons.go index 003e196d2b82..0a4364b22ea3 100644 --- a/les/commons.go +++ b/les/commons.go @@ -25,7 +25,7 @@ import ( "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/eth" + "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/les/checkpointoracle" diff --git a/les/costtracker.go b/les/costtracker.go index 0558779bc5e6..e16a3885126c 100644 --- a/les/costtracker.go +++ b/les/costtracker.go @@ -24,7 +24,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common/mclock" - "github.com/ethereum/go-ethereum/eth" + "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/les/flowcontrol" "github.com/ethereum/go-ethereum/log" diff --git a/les/server.go b/les/server.go index cbedce136c35..cb99367c72c3 100644 --- a/les/server.go +++ b/les/server.go @@ -22,7 +22,8 @@ import ( "time" "github.com/ethereum/go-ethereum/common/mclock" - "github.com/ethereum/go-ethereum/eth" + ethx "github.com/ethereum/go-ethereum/eth" + "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/les/flowcontrol" lps "github.com/ethereum/go-ethereum/les/lespay/server" "github.com/ethereum/go-ethereum/light" @@ -75,7 +76,7 @@ type LesServer struct { p2pSrv *p2p.Server } -func NewLesServer(node *node.Node, e *eth.Ethereum, config *eth.Config) (*LesServer, error) { +func NewLesServer(node *node.Node, e *ethx.Ethereum, config *eth.Config) (*LesServer, error) { ns := nodestate.NewNodeStateMachine(nil, nil, mclock.System{}, serverSetup) // Collect les protocol version information supported by local node. lesTopics := make([]discv5.Topic, len(AdvertiseProtocolVersions)) diff --git a/les/test_helper.go b/les/test_helper.go index d108a8dacefb..ea93574ae845 100644 --- a/les/test_helper.go +++ b/les/test_helper.go @@ -38,7 +38,8 @@ import ( "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/eth" + ethx "github.com/ethereum/go-ethereum/eth" + "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/les/checkpointoracle" @@ -163,7 +164,7 @@ func prepare(n int, backend *backends.SimulatedBackend) { func testIndexers(db ethdb.Database, odr light.OdrBackend, config *light.IndexerConfig, disablePruning bool) []*core.ChainIndexer { var indexers [3]*core.ChainIndexer indexers[0] = light.NewChtIndexer(db, odr, config.ChtSize, config.ChtConfirms, disablePruning) - indexers[1] = eth.NewBloomIndexer(db, config.BloomSize, config.BloomConfirms) + indexers[1] = ethx.NewBloomIndexer(db, config.BloomSize, config.BloomConfirms) indexers[2] = light.NewBloomTrieIndexer(db, odr, config.BloomSize, config.BloomTrieSize, disablePruning) // make bloomTrieIndexer as a child indexer of bloom indexer. indexers[1].AddChildIndexer(indexers[2]) diff --git a/mobile/geth.go b/mobile/geth.go index b561e33675df..d4c6153b9df2 100644 --- a/mobile/geth.go +++ b/mobile/geth.go @@ -25,8 +25,8 @@ import ( "path/filepath" "github.com/ethereum/go-ethereum/core" - "github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/eth/downloader" + "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/ethstats" "github.com/ethereum/go-ethereum/internal/debug" From 26682ba41158ea5d9e55f0580e8e277e602858c8 Mon Sep 17 00:00:00 2001 From: Zsolt Felfoldi Date: Sun, 10 Jan 2021 18:32:26 +0100 Subject: [PATCH 2/7] les: replaced direct *eth.Ethereum ref with interface --- les/server.go | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/les/server.go b/les/server.go index cb99367c72c3..038ead28d615 100644 --- a/les/server.go +++ b/les/server.go @@ -22,8 +22,9 @@ import ( "time" "github.com/ethereum/go-ethereum/common/mclock" - ethx "github.com/ethereum/go-ethereum/eth" + "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/eth/protocols/eth" + "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/les/flowcontrol" lps "github.com/ethereum/go-ethereum/les/lespay/server" "github.com/ethereum/go-ethereum/light" @@ -76,7 +77,16 @@ type LesServer struct { p2pSrv *p2p.Server } -func NewLesServer(node *node.Node, e *ethx.Ethereum, config *eth.Config) (*LesServer, error) { +type ethBackend interface { + ArchiveMode() bool + BlockChain() *core.BlockChain + BloomIndexer() *core.ChainIndexer + ChainDb() ethdb.Database + Synced() bool + TxPool() *core.TxPool +} + +func NewLesServer(node *node.Node, e ethBackend, config *eth.Config) (*LesServer, error) { ns := nodestate.NewNodeStateMachine(nil, nil, mclock.System{}, serverSetup) // Collect les protocol version information supported by local node. lesTopics := make([]discv5.Topic, len(AdvertiseProtocolVersions)) From 8a0f254b1f80eae9bf9f7412a259f30b0a3b0b66 Mon Sep 17 00:00:00 2001 From: Zsolt Felfoldi Date: Sun, 10 Jan 2021 18:42:55 +0100 Subject: [PATCH 3/7] eth, core: moved bloom indexer creation to core --- core/bloombits.go | 92 ++++++++++++++++++++++++++++++++++++++++++++++ eth/backend.go | 2 +- eth/bloombits.go | 69 ---------------------------------- les/client.go | 2 +- les/test_helper.go | 3 +- 5 files changed, 95 insertions(+), 73 deletions(-) create mode 100644 core/bloombits.go diff --git a/core/bloombits.go b/core/bloombits.go new file mode 100644 index 000000000000..856746a1c088 --- /dev/null +++ b/core/bloombits.go @@ -0,0 +1,92 @@ +// Copyright 2021 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package core + +import ( + "context" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/bitutil" + "github.com/ethereum/go-ethereum/core/bloombits" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethdb" +) + +const ( + // bloomThrottling is the time to wait between processing two consecutive index + // sections. It's useful during chain upgrades to prevent disk overload. + bloomThrottling = 100 * time.Millisecond +) + +// BloomIndexer implements a core.ChainIndexer, building up a rotated bloom bits index +// for the Ethereum header bloom filters, permitting blazing fast filtering. +type BloomIndexer struct { + size uint64 // section size to generate bloombits for + db ethdb.Database // database instance to write index data and metadata into + gen *bloombits.Generator // generator to rotate the bloom bits crating the bloom index + section uint64 // Section is the section number being processed currently + head common.Hash // Head is the hash of the last header processed +} + +// NewBloomIndexer returns a chain indexer that generates bloom bits data for the +// canonical chain for fast logs filtering. +func NewBloomIndexer(db ethdb.Database, size, confirms uint64) *ChainIndexer { + backend := &BloomIndexer{ + db: db, + size: size, + } + table := rawdb.NewTable(db, string(rawdb.BloomBitsIndexPrefix)) + + return NewChainIndexer(db, table, backend, size, confirms, bloomThrottling, "bloombits") +} + +// Reset implements core.ChainIndexerBackend, starting a new bloombits index +// section. +func (b *BloomIndexer) Reset(ctx context.Context, section uint64, lastSectionHead common.Hash) error { + gen, err := bloombits.NewGenerator(uint(b.size)) + b.gen, b.section, b.head = gen, section, common.Hash{} + return err +} + +// Process implements core.ChainIndexerBackend, adding a new header's bloom into +// the index. +func (b *BloomIndexer) Process(ctx context.Context, header *types.Header) error { + b.gen.AddBloom(uint(header.Number.Uint64()-b.section*b.size), header.Bloom) + b.head = header.Hash() + return nil +} + +// Commit implements core.ChainIndexerBackend, finalizing the bloom section and +// writing it out into the database. +func (b *BloomIndexer) Commit() error { + batch := b.db.NewBatch() + for i := 0; i < types.BloomBitLength; i++ { + bits, err := b.gen.Bitset(uint(i)) + if err != nil { + return err + } + rawdb.WriteBloomBits(batch, uint(i), b.section, b.head, bitutil.CompressBytes(bits)) + } + return batch.Write() +} + +// Prune returns an empty error since we don't support pruning here. +func (b *BloomIndexer) Prune(threshold uint64) error { + return nil +} diff --git a/eth/backend.go b/eth/backend.go index 45a5028e0d0d..a3c2ed2a37b6 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -138,7 +138,7 @@ func New(stack *node.Node, config *eth.Config) (*Ethereum, error) { gasPrice: config.Miner.GasPrice, etherbase: config.Miner.Etherbase, bloomRequests: make(chan chan *bloombits.Retrieval), - bloomIndexer: NewBloomIndexer(chainDb, params.BloomBitsBlocks, params.BloomConfirms), + bloomIndexer: core.NewBloomIndexer(chainDb, params.BloomBitsBlocks, params.BloomConfirms), p2pServer: stack.Server(), } diff --git a/eth/bloombits.go b/eth/bloombits.go index bd34bd7b69d5..0cb7050d2327 100644 --- a/eth/bloombits.go +++ b/eth/bloombits.go @@ -17,16 +17,10 @@ package eth import ( - "context" "time" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/bitutil" - "github.com/ethereum/go-ethereum/core" - "github.com/ethereum/go-ethereum/core/bloombits" "github.com/ethereum/go-ethereum/core/rawdb" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/ethdb" ) const ( @@ -78,66 +72,3 @@ func (eth *Ethereum) startBloomHandlers(sectionSize uint64) { }() } } - -const ( - // bloomThrottling is the time to wait between processing two consecutive index - // sections. It's useful during chain upgrades to prevent disk overload. - bloomThrottling = 100 * time.Millisecond -) - -// BloomIndexer implements a core.ChainIndexer, building up a rotated bloom bits index -// for the Ethereum header bloom filters, permitting blazing fast filtering. -type BloomIndexer struct { - size uint64 // section size to generate bloombits for - db ethdb.Database // database instance to write index data and metadata into - gen *bloombits.Generator // generator to rotate the bloom bits crating the bloom index - section uint64 // Section is the section number being processed currently - head common.Hash // Head is the hash of the last header processed -} - -// NewBloomIndexer returns a chain indexer that generates bloom bits data for the -// canonical chain for fast logs filtering. -func NewBloomIndexer(db ethdb.Database, size, confirms uint64) *core.ChainIndexer { - backend := &BloomIndexer{ - db: db, - size: size, - } - table := rawdb.NewTable(db, string(rawdb.BloomBitsIndexPrefix)) - - return core.NewChainIndexer(db, table, backend, size, confirms, bloomThrottling, "bloombits") -} - -// Reset implements core.ChainIndexerBackend, starting a new bloombits index -// section. -func (b *BloomIndexer) Reset(ctx context.Context, section uint64, lastSectionHead common.Hash) error { - gen, err := bloombits.NewGenerator(uint(b.size)) - b.gen, b.section, b.head = gen, section, common.Hash{} - return err -} - -// Process implements core.ChainIndexerBackend, adding a new header's bloom into -// the index. -func (b *BloomIndexer) Process(ctx context.Context, header *types.Header) error { - b.gen.AddBloom(uint(header.Number.Uint64()-b.section*b.size), header.Bloom) - b.head = header.Hash() - return nil -} - -// Commit implements core.ChainIndexerBackend, finalizing the bloom section and -// writing it out into the database. -func (b *BloomIndexer) Commit() error { - batch := b.db.NewBatch() - for i := 0; i < types.BloomBitLength; i++ { - bits, err := b.gen.Bitset(uint(i)) - if err != nil { - return err - } - rawdb.WriteBloomBits(batch, uint(i), b.section, b.head, bitutil.CompressBytes(bits)) - } - return batch.Write() -} - -// Prune returns an empty error since we don't support pruning here. -func (b *BloomIndexer) Prune(threshold uint64) error { - return nil -} diff --git a/les/client.go b/les/client.go index d3438bb16264..7408de9a6de1 100644 --- a/les/client.go +++ b/les/client.go @@ -107,7 +107,7 @@ func New(stack *node.Node, config *eth.Config) (*LightEthereum, error) { accountManager: stack.AccountManager(), engine: ethx.CreateConsensusEngine(stack, chainConfig, &config.Ethash, nil, false, chainDb), bloomRequests: make(chan chan *bloombits.Retrieval), - bloomIndexer: ethx.NewBloomIndexer(chainDb, params.BloomBitsBlocksClient, params.HelperTrieConfirmations), + bloomIndexer: core.NewBloomIndexer(chainDb, params.BloomBitsBlocksClient, params.HelperTrieConfirmations), valueTracker: lpc.NewValueTracker(lespayDb, &mclock.System{}, requestList, time.Minute, 1/float64(time.Hour), 1/float64(time.Hour*100), 1/float64(time.Hour*1000)), p2pServer: stack.Server(), } diff --git a/les/test_helper.go b/les/test_helper.go index ea93574ae845..27fbace7fc94 100644 --- a/les/test_helper.go +++ b/les/test_helper.go @@ -38,7 +38,6 @@ import ( "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" - ethx "github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" @@ -164,7 +163,7 @@ func prepare(n int, backend *backends.SimulatedBackend) { func testIndexers(db ethdb.Database, odr light.OdrBackend, config *light.IndexerConfig, disablePruning bool) []*core.ChainIndexer { var indexers [3]*core.ChainIndexer indexers[0] = light.NewChtIndexer(db, odr, config.ChtSize, config.ChtConfirms, disablePruning) - indexers[1] = ethx.NewBloomIndexer(db, config.BloomSize, config.BloomConfirms) + indexers[1] = core.NewBloomIndexer(db, config.BloomSize, config.BloomConfirms) indexers[2] = light.NewBloomTrieIndexer(db, odr, config.BloomSize, config.BloomTrieSize, disablePruning) // make bloomTrieIndexer as a child indexer of bloom indexer. indexers[1].AddChildIndexer(indexers[2]) From 6cd7b63960b53e8a6ad74a2327926b1308ba2f78 Mon Sep 17 00:00:00 2001 From: Zsolt Felfoldi Date: Sun, 10 Jan 2021 18:55:48 +0100 Subject: [PATCH 4/7] les: pass reference to CreateConsensusEngine --- cmd/faucet/faucet.go | 3 ++- cmd/utils/flags.go | 2 +- les/api_test.go | 2 +- les/client.go | 9 ++++++--- mobile/geth.go | 3 ++- 5 files changed, 12 insertions(+), 7 deletions(-) diff --git a/cmd/faucet/faucet.go b/cmd/faucet/faucet.go index 157b08c96b3f..1fefa5c769a9 100644 --- a/cmd/faucet/faucet.go +++ b/cmd/faucet/faucet.go @@ -47,6 +47,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" + ethereum "github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/ethclient" @@ -254,7 +255,7 @@ func newFaucet(genesis *core.Genesis, port int, enodes []*discv5.Node, network u cfg.Genesis = genesis utils.SetDNSDiscoveryDefaults(&cfg, genesis.ToBlock(nil).Hash()) - lesBackend, err := les.New(stack, &cfg) + lesBackend, err := les.New(stack, &cfg, ethereum.CreateConsensusEngine) if err != nil { return nil, fmt.Errorf("Failed to register the Ethereum service: %w", err) } diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index e34b08e6e6a8..ffae713c0212 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -1717,7 +1717,7 @@ func SetDNSDiscoveryDefaults(cfg *eth.Config, genesis common.Hash) { // RegisterEthService adds an Ethereum client to the stack. func RegisterEthService(stack *node.Node, cfg *eth.Config) ethapi.Backend { if cfg.SyncMode == downloader.LightSync { - backend, err := les.New(stack, cfg) + backend, err := les.New(stack, cfg, ethereum.CreateConsensusEngine) if err != nil { Fatalf("Failed to register the Ethereum service: %v", err) } diff --git a/les/api_test.go b/les/api_test.go index 9b530cc8e41f..7e4d6d122a68 100644 --- a/les/api_test.go +++ b/les/api_test.go @@ -496,7 +496,7 @@ func newLesClientService(ctx *adapters.ServiceContext, stack *node.Node) (node.L config := eth.DefaultConfig config.SyncMode = downloader.LightSync config.Ethash.PowMode = ethash.ModeFake - return New(stack, &config) + return New(stack, &config, ethereum.CreateConsensusEngine) } func newLesServerService(ctx *adapters.ServiceContext, stack *node.Node) (node.Lifecycle, error) { diff --git a/les/client.go b/les/client.go index 7408de9a6de1..e3a4ce45fcfa 100644 --- a/les/client.go +++ b/les/client.go @@ -26,15 +26,16 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/consensus" + "github.com/ethereum/go-ethereum/consensus/ethash" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/bloombits" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/types" - ethx "github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/eth/filters" "github.com/ethereum/go-ethereum/eth/gasprice" "github.com/ethereum/go-ethereum/eth/protocols/eth" + "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/internal/ethapi" lpc "github.com/ethereum/go-ethereum/les/lespay/client" @@ -75,8 +76,10 @@ type LightEthereum struct { p2pServer *p2p.Server } +type consensusCreatorFn func(stack *node.Node, chainConfig *params.ChainConfig, config *ethash.Config, notify []string, noverify bool, db ethdb.Database) consensus.Engine + // New creates an instance of the light client. -func New(stack *node.Node, config *eth.Config) (*LightEthereum, error) { +func New(stack *node.Node, config *eth.Config, cc consensusCreatorFn) (*LightEthereum, error) { chainDb, err := stack.OpenDatabase("lightchaindata", config.DatabaseCache, config.DatabaseHandles, "eth/db/chaindata/") if err != nil { return nil, err @@ -105,7 +108,7 @@ func New(stack *node.Node, config *eth.Config) (*LightEthereum, error) { eventMux: stack.EventMux(), reqDist: newRequestDistributor(peers, &mclock.System{}), accountManager: stack.AccountManager(), - engine: ethx.CreateConsensusEngine(stack, chainConfig, &config.Ethash, nil, false, chainDb), + engine: cc(stack, chainConfig, &config.Ethash, nil, false, chainDb), bloomRequests: make(chan chan *bloombits.Retrieval), bloomIndexer: core.NewBloomIndexer(chainDb, params.BloomBitsBlocksClient, params.HelperTrieConfirmations), valueTracker: lpc.NewValueTracker(lespayDb, &mclock.System{}, requestList, time.Minute, 1/float64(time.Hour), 1/float64(time.Hour*100), 1/float64(time.Hour*1000)), diff --git a/mobile/geth.go b/mobile/geth.go index d4c6153b9df2..59a6f784985a 100644 --- a/mobile/geth.go +++ b/mobile/geth.go @@ -25,6 +25,7 @@ import ( "path/filepath" "github.com/ethereum/go-ethereum/core" + ethereum "github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/ethclient" @@ -187,7 +188,7 @@ func NewNode(datadir string, config *NodeConfig) (stack *Node, _ error) { ethConf.SyncMode = downloader.LightSync ethConf.NetworkId = uint64(config.EthereumNetworkID) ethConf.DatabaseCache = config.EthereumDatabaseCache - lesBackend, err := les.New(rawStack, ðConf) + lesBackend, err := les.New(rawStack, ðConf, ethereum.CreateConsensusEngine) if err != nil { return nil, fmt.Errorf("ethereum init: %v", err) } From dadca59afdc236f9ffa2d6d538536fb229356a44 Mon Sep 17 00:00:00 2001 From: Zsolt Felfoldi Date: Sun, 10 Jan 2021 20:28:17 +0100 Subject: [PATCH 5/7] les: renamed cc to mkEngineFn --- les/client.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/les/client.go b/les/client.go index e3a4ce45fcfa..7ca2e646bf35 100644 --- a/les/client.go +++ b/les/client.go @@ -79,7 +79,7 @@ type LightEthereum struct { type consensusCreatorFn func(stack *node.Node, chainConfig *params.ChainConfig, config *ethash.Config, notify []string, noverify bool, db ethdb.Database) consensus.Engine // New creates an instance of the light client. -func New(stack *node.Node, config *eth.Config, cc consensusCreatorFn) (*LightEthereum, error) { +func New(stack *node.Node, config *eth.Config, mkEngineFn consensusCreatorFn) (*LightEthereum, error) { chainDb, err := stack.OpenDatabase("lightchaindata", config.DatabaseCache, config.DatabaseHandles, "eth/db/chaindata/") if err != nil { return nil, err @@ -108,7 +108,7 @@ func New(stack *node.Node, config *eth.Config, cc consensusCreatorFn) (*LightEth eventMux: stack.EventMux(), reqDist: newRequestDistributor(peers, &mclock.System{}), accountManager: stack.AccountManager(), - engine: cc(stack, chainConfig, &config.Ethash, nil, false, chainDb), + engine: mkEngineFn(stack, chainConfig, &config.Ethash, nil, false, chainDb), bloomRequests: make(chan chan *bloombits.Retrieval), bloomIndexer: core.NewBloomIndexer(chainDb, params.BloomBitsBlocksClient, params.HelperTrieConfirmations), valueTracker: lpc.NewValueTracker(lespayDb, &mclock.System{}, requestList, time.Minute, 1/float64(time.Hour), 1/float64(time.Hour*100), 1/float64(time.Hour*1000)), From 335d024f9b27af3a1570070677293e437bbb6bec Mon Sep 17 00:00:00 2001 From: Zsolt Felfoldi Date: Wed, 13 Jan 2021 14:20:57 +0100 Subject: [PATCH 6/7] les: refactored server handler --- les/handler_test.go | 34 +- les/peer.go | 4 +- les/protocol.go | 8 - les/server_handler.go | 790 ++++++++--------------------------------- les/server_requests.go | 539 ++++++++++++++++++++++++++++ 5 files changed, 710 insertions(+), 665 deletions(-) create mode 100644 les/server_requests.go diff --git a/les/handler_test.go b/les/handler_test.go index 04277f661b55..3010fb7cdef6 100644 --- a/les/handler_test.go +++ b/les/handler_test.go @@ -64,27 +64,27 @@ func testGetBlockHeaders(t *testing.T, protocol int) { // Create a batch of tests for various scenarios limit := uint64(MaxHeaderFetch) tests := []struct { - query *getBlockHeadersData // The query to execute for header retrieval + query *GetBlockHeadersReq // The query to execute for header retrieval expect []common.Hash // The hashes of the block whose headers are expected }{ // A single random block should be retrievable by hash and number too { - &getBlockHeadersData{Origin: hashOrNumber{Hash: bc.GetBlockByNumber(limit / 2).Hash()}, Amount: 1}, + &GetBlockHeadersReq{Origin: hashOrNumber{Hash: bc.GetBlockByNumber(limit / 2).Hash()}, Amount: 1}, []common.Hash{bc.GetBlockByNumber(limit / 2).Hash()}, }, { - &getBlockHeadersData{Origin: hashOrNumber{Number: limit / 2}, Amount: 1}, + &GetBlockHeadersReq{Origin: hashOrNumber{Number: limit / 2}, Amount: 1}, []common.Hash{bc.GetBlockByNumber(limit / 2).Hash()}, }, // Multiple headers should be retrievable in both directions { - &getBlockHeadersData{Origin: hashOrNumber{Number: limit / 2}, Amount: 3}, + &GetBlockHeadersReq{Origin: hashOrNumber{Number: limit / 2}, Amount: 3}, []common.Hash{ bc.GetBlockByNumber(limit / 2).Hash(), bc.GetBlockByNumber(limit/2 + 1).Hash(), bc.GetBlockByNumber(limit/2 + 2).Hash(), }, }, { - &getBlockHeadersData{Origin: hashOrNumber{Number: limit / 2}, Amount: 3, Reverse: true}, + &GetBlockHeadersReq{Origin: hashOrNumber{Number: limit / 2}, Amount: 3, Reverse: true}, []common.Hash{ bc.GetBlockByNumber(limit / 2).Hash(), bc.GetBlockByNumber(limit/2 - 1).Hash(), @@ -93,14 +93,14 @@ func testGetBlockHeaders(t *testing.T, protocol int) { }, // Multiple headers with skip lists should be retrievable { - &getBlockHeadersData{Origin: hashOrNumber{Number: limit / 2}, Skip: 3, Amount: 3}, + &GetBlockHeadersReq{Origin: hashOrNumber{Number: limit / 2}, Skip: 3, Amount: 3}, []common.Hash{ bc.GetBlockByNumber(limit / 2).Hash(), bc.GetBlockByNumber(limit/2 + 4).Hash(), bc.GetBlockByNumber(limit/2 + 8).Hash(), }, }, { - &getBlockHeadersData{Origin: hashOrNumber{Number: limit / 2}, Skip: 3, Amount: 3, Reverse: true}, + &GetBlockHeadersReq{Origin: hashOrNumber{Number: limit / 2}, Skip: 3, Amount: 3, Reverse: true}, []common.Hash{ bc.GetBlockByNumber(limit / 2).Hash(), bc.GetBlockByNumber(limit/2 - 4).Hash(), @@ -109,26 +109,26 @@ func testGetBlockHeaders(t *testing.T, protocol int) { }, // The chain endpoints should be retrievable { - &getBlockHeadersData{Origin: hashOrNumber{Number: 0}, Amount: 1}, + &GetBlockHeadersReq{Origin: hashOrNumber{Number: 0}, Amount: 1}, []common.Hash{bc.GetBlockByNumber(0).Hash()}, }, { - &getBlockHeadersData{Origin: hashOrNumber{Number: bc.CurrentBlock().NumberU64()}, Amount: 1}, + &GetBlockHeadersReq{Origin: hashOrNumber{Number: bc.CurrentBlock().NumberU64()}, Amount: 1}, []common.Hash{bc.CurrentBlock().Hash()}, }, // Ensure protocol limits are honored //{ - // &getBlockHeadersData{Origin: hashOrNumber{Number: bc.CurrentBlock().NumberU64() - 1}, Amount: limit + 10, Reverse: true}, + // &GetBlockHeadersReq{Origin: hashOrNumber{Number: bc.CurrentBlock().NumberU64() - 1}, Amount: limit + 10, Reverse: true}, // []common.Hash{}, //}, // Check that requesting more than available is handled gracefully { - &getBlockHeadersData{Origin: hashOrNumber{Number: bc.CurrentBlock().NumberU64() - 4}, Skip: 3, Amount: 3}, + &GetBlockHeadersReq{Origin: hashOrNumber{Number: bc.CurrentBlock().NumberU64() - 4}, Skip: 3, Amount: 3}, []common.Hash{ bc.GetBlockByNumber(bc.CurrentBlock().NumberU64() - 4).Hash(), bc.GetBlockByNumber(bc.CurrentBlock().NumberU64()).Hash(), }, }, { - &getBlockHeadersData{Origin: hashOrNumber{Number: 4}, Skip: 3, Amount: 3, Reverse: true}, + &GetBlockHeadersReq{Origin: hashOrNumber{Number: 4}, Skip: 3, Amount: 3, Reverse: true}, []common.Hash{ bc.GetBlockByNumber(4).Hash(), bc.GetBlockByNumber(0).Hash(), @@ -136,13 +136,13 @@ func testGetBlockHeaders(t *testing.T, protocol int) { }, // Check that requesting more than available is handled gracefully, even if mid skip { - &getBlockHeadersData{Origin: hashOrNumber{Number: bc.CurrentBlock().NumberU64() - 4}, Skip: 2, Amount: 3}, + &GetBlockHeadersReq{Origin: hashOrNumber{Number: bc.CurrentBlock().NumberU64() - 4}, Skip: 2, Amount: 3}, []common.Hash{ bc.GetBlockByNumber(bc.CurrentBlock().NumberU64() - 4).Hash(), bc.GetBlockByNumber(bc.CurrentBlock().NumberU64() - 1).Hash(), }, }, { - &getBlockHeadersData{Origin: hashOrNumber{Number: 4}, Skip: 2, Amount: 3, Reverse: true}, + &GetBlockHeadersReq{Origin: hashOrNumber{Number: 4}, Skip: 2, Amount: 3, Reverse: true}, []common.Hash{ bc.GetBlockByNumber(4).Hash(), bc.GetBlockByNumber(1).Hash(), @@ -150,10 +150,10 @@ func testGetBlockHeaders(t *testing.T, protocol int) { }, // Check that non existing headers aren't returned { - &getBlockHeadersData{Origin: hashOrNumber{Hash: unknown}, Amount: 1}, + &GetBlockHeadersReq{Origin: hashOrNumber{Hash: unknown}, Amount: 1}, []common.Hash{}, }, { - &getBlockHeadersData{Origin: hashOrNumber{Number: bc.CurrentBlock().NumberU64() + 1}, Amount: 1}, + &GetBlockHeadersReq{Origin: hashOrNumber{Number: bc.CurrentBlock().NumberU64() + 1}, Amount: 1}, []common.Hash{}, }, } @@ -609,7 +609,7 @@ func TestStopResumeLes3(t *testing.T) { header := server.handler.blockchain.CurrentHeader() req := func() { reqID++ - sendRequest(server.peer.app, GetBlockHeadersMsg, reqID, &getBlockHeadersData{Origin: hashOrNumber{Hash: header.Hash()}, Amount: 1}) + sendRequest(server.peer.app, GetBlockHeadersMsg, reqID, &GetBlockHeadersReq{Origin: hashOrNumber{Hash: header.Hash()}, Amount: 1}) } for i := 1; i <= 5; i++ { // send requests while we still have enough buffer and expect a response diff --git a/les/peer.go b/les/peer.go index 0e2ed52c124b..a43c3bc6a927 100644 --- a/les/peer.go +++ b/les/peer.go @@ -432,14 +432,14 @@ func (p *serverPeer) sendRequest(msgcode, reqID uint64, data interface{}, amount // specified header query, based on the hash of an origin block. func (p *serverPeer) requestHeadersByHash(reqID uint64, origin common.Hash, amount int, skip int, reverse bool) error { p.Log().Debug("Fetching batch of headers", "count", amount, "fromhash", origin, "skip", skip, "reverse", reverse) - return p.sendRequest(GetBlockHeadersMsg, reqID, &getBlockHeadersData{Origin: hashOrNumber{Hash: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse}, amount) + return p.sendRequest(GetBlockHeadersMsg, reqID, &GetBlockHeadersReq{Origin: hashOrNumber{Hash: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse}, amount) } // requestHeadersByNumber fetches a batch of blocks' headers corresponding to the // specified header query, based on the number of an origin block. func (p *serverPeer) requestHeadersByNumber(reqID, origin uint64, amount int, skip int, reverse bool) error { p.Log().Debug("Fetching batch of headers", "count", amount, "fromnum", origin, "skip", skip, "reverse", reverse) - return p.sendRequest(GetBlockHeadersMsg, reqID, &getBlockHeadersData{Origin: hashOrNumber{Number: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse}, amount) + return p.sendRequest(GetBlockHeadersMsg, reqID, &GetBlockHeadersReq{Origin: hashOrNumber{Number: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse}, amount) } // requestBodies fetches a batch of blocks' bodies corresponding to the hashes diff --git a/les/protocol.go b/les/protocol.go index 39d9f5152fd8..a201bb2d9522 100644 --- a/les/protocol.go +++ b/les/protocol.go @@ -229,14 +229,6 @@ type blockInfo struct { Td *big.Int // Total difficulty of one particular block being announced } -// getBlockHeadersData represents a block header query. -type getBlockHeadersData struct { - Origin hashOrNumber // Block from which to retrieve headers - Amount uint64 // Maximum number of headers to retrieve - Skip uint64 // Blocks to skip between consecutive headers - Reverse bool // Query direction (false = rising towards latest, true = falling towards genesis) -} - // hashOrNumber is a combined field for specifying an origin block. type hashOrNumber struct { Hash common.Hash // Block hash from which to retrieve headers (excludes Number) diff --git a/les/server_handler.go b/les/server_handler.go index 2316c9c5a4c0..820b688b4b2e 100644 --- a/les/server_handler.go +++ b/les/server_handler.go @@ -18,8 +18,6 @@ package les import ( "crypto/ecdsa" - "encoding/binary" - "encoding/json" "errors" "sync" "sync/atomic" @@ -223,646 +221,165 @@ func (h *serverHandler) handleMsg(p *clientPeer, wg *sync.WaitGroup) error { } defer msg.Discard() - var ( - maxCost uint64 - task *servingTask - ) p.responseCount++ responseCount := p.responseCount - // accept returns an indicator whether the request can be served. - // If so, deduct the max cost from the flow control buffer. - accept := func(reqID, reqCnt, maxCnt uint64) bool { - // Short circuit if the peer is already frozen or the request is invalid. - inSizeCost := h.server.costTracker.realCost(0, msg.Size, 0) - if p.isFrozen() || reqCnt == 0 || reqCnt > maxCnt { - p.fcClient.OneTimeCost(inSizeCost) - return false - } - // Prepaid max cost units before request been serving. - maxCost = p.fcCosts.getMaxCost(msg.Code, reqCnt) - accepted, bufShort, priority := p.fcClient.AcceptRequest(reqID, responseCount, maxCost) - if !accepted { - p.freeze() - p.Log().Error("Request came too early", "remaining", common.PrettyDuration(time.Duration(bufShort*1000000/p.fcParams.MinRecharge))) - p.fcClient.OneTimeCost(inSizeCost) - return false - } - // Create a multi-stage task, estimate the time it takes for the task to - // execute, and cache it in the request service queue. - factor := h.server.costTracker.globalFactor() - if factor < 0.001 { - factor = 1 - p.Log().Error("Invalid global cost factor", "factor", factor) - } - maxTime := uint64(float64(maxCost) / factor) - task = h.server.servingQueue.newTask(p, maxTime, priority) - if task.start() { - return true - } - p.fcClient.RequestProcessed(reqID, responseCount, maxCost, inSizeCost) - return false - } - // sendResponse sends back the response and updates the flow control statistic. - sendResponse := func(reqID, amount uint64, reply *reply, servingTime uint64) { - p.responseLock.Lock() - defer p.responseLock.Unlock() - // Short circuit if the client is already frozen. - if p.isFrozen() { - realCost := h.server.costTracker.realCost(servingTime, msg.Size, 0) - p.fcClient.RequestProcessed(reqID, responseCount, maxCost, realCost) - return - } - // Positive correction buffer value with real cost. - var replySize uint32 - if reply != nil { - replySize = reply.size() - } - var realCost uint64 - if h.server.costTracker.testing { - realCost = maxCost // Assign a fake cost for testing purpose - } else { - realCost = h.server.costTracker.realCost(servingTime, msg.Size, replySize) - if realCost > maxCost { - realCost = maxCost - } - } - bv := p.fcClient.RequestProcessed(reqID, responseCount, maxCost, realCost) - if amount != 0 { - // Feed cost tracker request serving statistic. - h.server.costTracker.updateStats(msg.Code, amount, servingTime, realCost) - // Reduce priority "balance" for the specific peer. - p.balance.RequestServed(realCost) - } - if reply != nil { - p.queueSend(func() { - if err := reply.send(bv); err != nil { - select { - case p.errCh <- err: - default: - } - } - }) + var ( + req struct { + ID uint64 + Data rlp.RawValue } + hreq handlerRequest + decodeErr error + ) + if err := msg.Decode(&req); err != nil { + clientErrorMeter.Mark(1) + return errResp(ErrDecode, "%v: %v", msg, err) } + switch msg.Code { case GetBlockHeadersMsg: p.Log().Trace("Received block header request") - if metrics.EnabledExpensive { - miscInHeaderPacketsMeter.Mark(1) - miscInHeaderTrafficMeter.Mark(int64(msg.Size)) - } - var req struct { - ReqID uint64 - Query getBlockHeadersData - } - if err := msg.Decode(&req); err != nil { - clientErrorMeter.Mark(1) - return errResp(ErrDecode, "%v: %v", msg, err) - } - query := req.Query - if accept(req.ReqID, query.Amount, MaxHeaderFetch) { - wg.Add(1) - go func() { - defer wg.Done() - hashMode := query.Origin.Hash != (common.Hash{}) - first := true - maxNonCanonical := uint64(100) - - // Gather headers until the fetch or network limits is reached - var ( - bytes common.StorageSize - headers []*types.Header - unknown bool - ) - for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit { - if !first && !task.waitOrStop() { - sendResponse(req.ReqID, 0, nil, task.servingTime) - return - } - // Retrieve the next header satisfying the query - var origin *types.Header - if hashMode { - if first { - origin = h.blockchain.GetHeaderByHash(query.Origin.Hash) - if origin != nil { - query.Origin.Number = origin.Number.Uint64() - } - } else { - origin = h.blockchain.GetHeader(query.Origin.Hash, query.Origin.Number) - } - } else { - origin = h.blockchain.GetHeaderByNumber(query.Origin.Number) - } - if origin == nil { - break - } - headers = append(headers, origin) - bytes += estHeaderRlpSize - - // Advance to the next header of the query - switch { - case hashMode && query.Reverse: - // Hash based traversal towards the genesis block - ancestor := query.Skip + 1 - if ancestor == 0 { - unknown = true - } else { - query.Origin.Hash, query.Origin.Number = h.blockchain.GetAncestor(query.Origin.Hash, query.Origin.Number, ancestor, &maxNonCanonical) - unknown = query.Origin.Hash == common.Hash{} - } - case hashMode && !query.Reverse: - // Hash based traversal towards the leaf block - var ( - current = origin.Number.Uint64() - next = current + query.Skip + 1 - ) - if next <= current { - infos, _ := json.MarshalIndent(p.Peer.Info(), "", " ") - p.Log().Warn("GetBlockHeaders skip overflow attack", "current", current, "skip", query.Skip, "next", next, "attacker", infos) - unknown = true - } else { - if header := h.blockchain.GetHeaderByNumber(next); header != nil { - nextHash := header.Hash() - expOldHash, _ := h.blockchain.GetAncestor(nextHash, next, query.Skip+1, &maxNonCanonical) - if expOldHash == query.Origin.Hash { - query.Origin.Hash, query.Origin.Number = nextHash, next - } else { - unknown = true - } - } else { - unknown = true - } - } - case query.Reverse: - // Number based traversal towards the genesis block - if query.Origin.Number >= query.Skip+1 { - query.Origin.Number -= query.Skip + 1 - } else { - unknown = true - } - - case !query.Reverse: - // Number based traversal towards the leaf block - query.Origin.Number += query.Skip + 1 - } - first = false - } - reply := p.replyBlockHeaders(req.ReqID, headers) - sendResponse(req.ReqID, query.Amount, reply, task.done()) - if metrics.EnabledExpensive { - miscOutHeaderPacketsMeter.Mark(1) - miscOutHeaderTrafficMeter.Mark(int64(reply.size())) - miscServingTimeHeaderTimer.Update(time.Duration(task.servingTime)) - } - }() - } + r := &GetBlockHeadersReq{} + decodeErr = rlp.DecodeBytes(req.Data, r) + hreq = r case GetBlockBodiesMsg: p.Log().Trace("Received block bodies request") - if metrics.EnabledExpensive { - miscInBodyPacketsMeter.Mark(1) - miscInBodyTrafficMeter.Mark(int64(msg.Size)) - } - var req struct { - ReqID uint64 - Hashes []common.Hash - } - if err := msg.Decode(&req); err != nil { - clientErrorMeter.Mark(1) - return errResp(ErrDecode, "msg %v: %v", msg, err) - } - var ( - bytes int - bodies []rlp.RawValue - ) - reqCnt := len(req.Hashes) - if accept(req.ReqID, uint64(reqCnt), MaxBodyFetch) { - wg.Add(1) - go func() { - defer wg.Done() - for i, hash := range req.Hashes { - if i != 0 && !task.waitOrStop() { - sendResponse(req.ReqID, 0, nil, task.servingTime) - return - } - if bytes >= softResponseLimit { - break - } - body := h.blockchain.GetBodyRLP(hash) - if body == nil { - p.bumpInvalid() - continue - } - bodies = append(bodies, body) - bytes += len(body) - } - reply := p.replyBlockBodiesRLP(req.ReqID, bodies) - sendResponse(req.ReqID, uint64(reqCnt), reply, task.done()) - if metrics.EnabledExpensive { - miscOutBodyPacketsMeter.Mark(1) - miscOutBodyTrafficMeter.Mark(int64(reply.size())) - miscServingTimeBodyTimer.Update(time.Duration(task.servingTime)) - } - }() - } + r := GetBlockBodiesReq{} + decodeErr = rlp.DecodeBytes(req.Data, &r) + hreq = r case GetCodeMsg: p.Log().Trace("Received code request") - if metrics.EnabledExpensive { - miscInCodePacketsMeter.Mark(1) - miscInCodeTrafficMeter.Mark(int64(msg.Size)) - } - var req struct { - ReqID uint64 - Reqs []CodeReq - } - if err := msg.Decode(&req); err != nil { - clientErrorMeter.Mark(1) - return errResp(ErrDecode, "msg %v: %v", msg, err) - } - var ( - bytes int - data [][]byte - ) - reqCnt := len(req.Reqs) - if accept(req.ReqID, uint64(reqCnt), MaxCodeFetch) { - wg.Add(1) - go func() { - defer wg.Done() - for i, request := range req.Reqs { - if i != 0 && !task.waitOrStop() { - sendResponse(req.ReqID, 0, nil, task.servingTime) - return - } - // Look up the root hash belonging to the request - header := h.blockchain.GetHeaderByHash(request.BHash) - if header == nil { - p.Log().Warn("Failed to retrieve associate header for code", "hash", request.BHash) - p.bumpInvalid() - continue - } - // Refuse to search stale state data in the database since looking for - // a non-exist key is kind of expensive. - local := h.blockchain.CurrentHeader().Number.Uint64() - if !h.server.archiveMode && header.Number.Uint64()+core.TriesInMemory <= local { - p.Log().Debug("Reject stale code request", "number", header.Number.Uint64(), "head", local) - p.bumpInvalid() - continue - } - triedb := h.blockchain.StateCache().TrieDB() - - account, err := h.getAccount(triedb, header.Root, common.BytesToHash(request.AccKey)) - if err != nil { - p.Log().Warn("Failed to retrieve account for code", "block", header.Number, "hash", header.Hash(), "account", common.BytesToHash(request.AccKey), "err", err) - p.bumpInvalid() - continue - } - code, err := h.blockchain.StateCache().ContractCode(common.BytesToHash(request.AccKey), common.BytesToHash(account.CodeHash)) - if err != nil { - p.Log().Warn("Failed to retrieve account code", "block", header.Number, "hash", header.Hash(), "account", common.BytesToHash(request.AccKey), "codehash", common.BytesToHash(account.CodeHash), "err", err) - continue - } - // Accumulate the code and abort if enough data was retrieved - data = append(data, code) - if bytes += len(code); bytes >= softResponseLimit { - break - } - } - reply := p.replyCode(req.ReqID, data) - sendResponse(req.ReqID, uint64(reqCnt), reply, task.done()) - if metrics.EnabledExpensive { - miscOutCodePacketsMeter.Mark(1) - miscOutCodeTrafficMeter.Mark(int64(reply.size())) - miscServingTimeCodeTimer.Update(time.Duration(task.servingTime)) - } - }() - } + r := GetCodeReq{} + decodeErr = rlp.DecodeBytes(req.Data, &r) + hreq = r case GetReceiptsMsg: p.Log().Trace("Received receipts request") - if metrics.EnabledExpensive { - miscInReceiptPacketsMeter.Mark(1) - miscInReceiptTrafficMeter.Mark(int64(msg.Size)) - } - var req struct { - ReqID uint64 - Hashes []common.Hash - } - if err := msg.Decode(&req); err != nil { - clientErrorMeter.Mark(1) - return errResp(ErrDecode, "msg %v: %v", msg, err) - } - var ( - bytes int - receipts []rlp.RawValue - ) - reqCnt := len(req.Hashes) - if accept(req.ReqID, uint64(reqCnt), MaxReceiptFetch) { - wg.Add(1) - go func() { - defer wg.Done() - for i, hash := range req.Hashes { - if i != 0 && !task.waitOrStop() { - sendResponse(req.ReqID, 0, nil, task.servingTime) - return - } - if bytes >= softResponseLimit { - break - } - // Retrieve the requested block's receipts, skipping if unknown to us - results := h.blockchain.GetReceiptsByHash(hash) - if results == nil { - if header := h.blockchain.GetHeaderByHash(hash); header == nil || header.ReceiptHash != types.EmptyRootHash { - p.bumpInvalid() - continue - } - } - // If known, encode and queue for response packet - if encoded, err := rlp.EncodeToBytes(results); err != nil { - log.Error("Failed to encode receipt", "err", err) - } else { - receipts = append(receipts, encoded) - bytes += len(encoded) - } - } - reply := p.replyReceiptsRLP(req.ReqID, receipts) - sendResponse(req.ReqID, uint64(reqCnt), reply, task.done()) - if metrics.EnabledExpensive { - miscOutReceiptPacketsMeter.Mark(1) - miscOutReceiptTrafficMeter.Mark(int64(reply.size())) - miscServingTimeReceiptTimer.Update(time.Duration(task.servingTime)) - } - }() - } + r := GetReceiptsReq{} + decodeErr = rlp.DecodeBytes(req.Data, &r) + hreq = r case GetProofsV2Msg: p.Log().Trace("Received les/2 proofs request") - if metrics.EnabledExpensive { - miscInTrieProofPacketsMeter.Mark(1) - miscInTrieProofTrafficMeter.Mark(int64(msg.Size)) - } - var req struct { - ReqID uint64 - Reqs []ProofReq - } - if err := msg.Decode(&req); err != nil { - clientErrorMeter.Mark(1) - return errResp(ErrDecode, "msg %v: %v", msg, err) - } - // Gather state data until the fetch or network limits is reached - var ( - lastBHash common.Hash - root common.Hash - header *types.Header - ) - reqCnt := len(req.Reqs) - if accept(req.ReqID, uint64(reqCnt), MaxProofsFetch) { - wg.Add(1) - go func() { - defer wg.Done() - nodes := light.NewNodeSet() - - for i, request := range req.Reqs { - if i != 0 && !task.waitOrStop() { - sendResponse(req.ReqID, 0, nil, task.servingTime) - return - } - // Look up the root hash belonging to the request - if request.BHash != lastBHash { - root, lastBHash = common.Hash{}, request.BHash - - if header = h.blockchain.GetHeaderByHash(request.BHash); header == nil { - p.Log().Warn("Failed to retrieve header for proof", "hash", request.BHash) - p.bumpInvalid() - continue - } - // Refuse to search stale state data in the database since looking for - // a non-exist key is kind of expensive. - local := h.blockchain.CurrentHeader().Number.Uint64() - if !h.server.archiveMode && header.Number.Uint64()+core.TriesInMemory <= local { - p.Log().Debug("Reject stale trie request", "number", header.Number.Uint64(), "head", local) - p.bumpInvalid() - continue - } - root = header.Root - } - // If a header lookup failed (non existent), ignore subsequent requests for the same header - if root == (common.Hash{}) { - p.bumpInvalid() - continue - } - // Open the account or storage trie for the request - statedb := h.blockchain.StateCache() - - var trie state.Trie - switch len(request.AccKey) { - case 0: - // No account key specified, open an account trie - trie, err = statedb.OpenTrie(root) - if trie == nil || err != nil { - p.Log().Warn("Failed to open storage trie for proof", "block", header.Number, "hash", header.Hash(), "root", root, "err", err) - continue - } - default: - // Account key specified, open a storage trie - account, err := h.getAccount(statedb.TrieDB(), root, common.BytesToHash(request.AccKey)) - if err != nil { - p.Log().Warn("Failed to retrieve account for proof", "block", header.Number, "hash", header.Hash(), "account", common.BytesToHash(request.AccKey), "err", err) - p.bumpInvalid() - continue - } - trie, err = statedb.OpenStorageTrie(common.BytesToHash(request.AccKey), account.Root) - if trie == nil || err != nil { - p.Log().Warn("Failed to open storage trie for proof", "block", header.Number, "hash", header.Hash(), "account", common.BytesToHash(request.AccKey), "root", account.Root, "err", err) - continue - } - } - // Prove the user's request from the account or stroage trie - if err := trie.Prove(request.Key, request.FromLevel, nodes); err != nil { - p.Log().Warn("Failed to prove state request", "block", header.Number, "hash", header.Hash(), "err", err) - continue - } - if nodes.DataSize() >= softResponseLimit { - break - } - } - reply := p.replyProofsV2(req.ReqID, nodes.NodeList()) - sendResponse(req.ReqID, uint64(reqCnt), reply, task.done()) - if metrics.EnabledExpensive { - miscOutTrieProofPacketsMeter.Mark(1) - miscOutTrieProofTrafficMeter.Mark(int64(reply.size())) - miscServingTimeTrieProofTimer.Update(time.Duration(task.servingTime)) - } - }() - } + r := GetProofsReq{} + decodeErr = rlp.DecodeBytes(req.Data, &r) + hreq = r case GetHelperTrieProofsMsg: p.Log().Trace("Received helper trie proof request") - if metrics.EnabledExpensive { - miscInHelperTriePacketsMeter.Mark(1) - miscInHelperTrieTrafficMeter.Mark(int64(msg.Size)) - } - var req struct { - ReqID uint64 - Reqs []HelperTrieReq - } - if err := msg.Decode(&req); err != nil { - clientErrorMeter.Mark(1) - return errResp(ErrDecode, "msg %v: %v", msg, err) - } - // Gather state data until the fetch or network limits is reached - var ( - auxBytes int - auxData [][]byte - ) - reqCnt := len(req.Reqs) - if accept(req.ReqID, uint64(reqCnt), MaxHelperTrieProofsFetch) { - wg.Add(1) - go func() { - defer wg.Done() - var ( - lastIdx uint64 - lastType uint - root common.Hash - auxTrie *trie.Trie - ) - nodes := light.NewNodeSet() - for i, request := range req.Reqs { - if i != 0 && !task.waitOrStop() { - sendResponse(req.ReqID, 0, nil, task.servingTime) - return - } - if auxTrie == nil || request.Type != lastType || request.TrieIdx != lastIdx { - auxTrie, lastType, lastIdx = nil, request.Type, request.TrieIdx - - var prefix string - if root, prefix = h.getHelperTrie(request.Type, request.TrieIdx); root != (common.Hash{}) { - auxTrie, _ = trie.New(root, trie.NewDatabase(rawdb.NewTable(h.chainDb, prefix))) - } - } - if request.AuxReq == auxRoot { - var data []byte - if root != (common.Hash{}) { - data = root[:] - } - auxData = append(auxData, data) - auxBytes += len(data) - } else { - if auxTrie != nil { - auxTrie.Prove(request.Key, request.FromLevel, nodes) - } - if request.AuxReq != 0 { - data := h.getAuxiliaryHeaders(request) - auxData = append(auxData, data) - auxBytes += len(data) - } - } - if nodes.DataSize()+auxBytes >= softResponseLimit { - break - } - } - reply := p.replyHelperTrieProofs(req.ReqID, HelperTrieResps{Proofs: nodes.NodeList(), AuxData: auxData}) - sendResponse(req.ReqID, uint64(reqCnt), reply, task.done()) - if metrics.EnabledExpensive { - miscOutHelperTriePacketsMeter.Mark(1) - miscOutHelperTrieTrafficMeter.Mark(int64(reply.size())) - miscServingTimeHelperTrieTimer.Update(time.Duration(task.servingTime)) - } - }() - } + r := GetHelperTrieProofsReq{} + decodeErr = rlp.DecodeBytes(req.Data, &r) + hreq = r case SendTxV2Msg: p.Log().Trace("Received new transactions") - if metrics.EnabledExpensive { - miscInTxsPacketsMeter.Mark(1) - miscInTxsTrafficMeter.Mark(int64(msg.Size)) - } - var req struct { - ReqID uint64 - Txs []*types.Transaction - } - if err := msg.Decode(&req); err != nil { - clientErrorMeter.Mark(1) - return errResp(ErrDecode, "msg %v: %v", msg, err) - } - reqCnt := len(req.Txs) - if accept(req.ReqID, uint64(reqCnt), MaxTxSend) { - wg.Add(1) - go func() { - defer wg.Done() - stats := make([]light.TxStatus, len(req.Txs)) - for i, tx := range req.Txs { - if i != 0 && !task.waitOrStop() { - return - } - hash := tx.Hash() - stats[i] = h.txStatus(hash) - if stats[i].Status == core.TxStatusUnknown { - addFn := h.txpool.AddRemotes - // Add txs synchronously for testing purpose - if h.addTxsSync { - addFn = h.txpool.AddRemotesSync - } - if errs := addFn([]*types.Transaction{tx}); errs[0] != nil { - stats[i].Error = errs[0].Error() - continue - } - stats[i] = h.txStatus(hash) - } - } - reply := p.replyTxStatus(req.ReqID, stats) - sendResponse(req.ReqID, uint64(reqCnt), reply, task.done()) - if metrics.EnabledExpensive { - miscOutTxsPacketsMeter.Mark(1) - miscOutTxsTrafficMeter.Mark(int64(reply.size())) - miscServingTimeTxTimer.Update(time.Duration(task.servingTime)) - } - }() - } + r := SendTxReq{} + decodeErr = rlp.DecodeBytes(req.Data, &r) + hreq = r case GetTxStatusMsg: p.Log().Trace("Received transaction status query request") - if metrics.EnabledExpensive { - miscInTxStatusPacketsMeter.Mark(1) - miscInTxStatusTrafficMeter.Mark(int64(msg.Size)) - } - var req struct { - ReqID uint64 - Hashes []common.Hash - } - if err := msg.Decode(&req); err != nil { - clientErrorMeter.Mark(1) - return errResp(ErrDecode, "msg %v: %v", msg, err) - } - reqCnt := len(req.Hashes) - if accept(req.ReqID, uint64(reqCnt), MaxTxStatus) { - wg.Add(1) - go func() { - defer wg.Done() - stats := make([]light.TxStatus, len(req.Hashes)) - for i, hash := range req.Hashes { - if i != 0 && !task.waitOrStop() { - sendResponse(req.ReqID, 0, nil, task.servingTime) - return - } - stats[i] = h.txStatus(hash) - } - reply := p.replyTxStatus(req.ReqID, stats) - sendResponse(req.ReqID, uint64(reqCnt), reply, task.done()) - if metrics.EnabledExpensive { - miscOutTxStatusPacketsMeter.Mark(1) - miscOutTxStatusTrafficMeter.Mark(int64(reply.size())) - miscServingTimeTxStatusTimer.Update(time.Duration(task.servingTime)) - } - }() - } + r := GetTxStatusReq{} + decodeErr = rlp.DecodeBytes(req.Data, &r) + hreq = r default: p.Log().Trace("Received invalid message", "code", msg.Code) clientErrorMeter.Mark(1) return errResp(ErrInvalidMsgCode, "%v", msg.Code) } + + if metrics.EnabledExpensive { + hreq.InMetrics(int64(msg.Size)) + } + if decodeErr != nil { + clientErrorMeter.Mark(1) + return errResp(ErrDecode, "%v: %v", msg, decodeErr) + } + reqCnt, maxCnt := hreq.ReqAmount() + + // Short circuit if the peer is already frozen or the request is invalid. + inSizeCost := h.server.costTracker.realCost(0, msg.Size, 0) + if p.isFrozen() || reqCnt == 0 || reqCnt > maxCnt { + p.fcClient.OneTimeCost(inSizeCost) + return nil + } + // Prepaid max cost units before request been serving. + maxCost := p.fcCosts.getMaxCost(msg.Code, reqCnt) + accepted, bufShort, priority := p.fcClient.AcceptRequest(req.ID, responseCount, maxCost) + if !accepted { + p.freeze() + p.Log().Error("Request came too early", "remaining", common.PrettyDuration(time.Duration(bufShort*1000000/p.fcParams.MinRecharge))) + p.fcClient.OneTimeCost(inSizeCost) + return nil + } + // Create a multi-stage task, estimate the time it takes for the task to + // execute, and cache it in the request service queue. + factor := h.server.costTracker.globalFactor() + if factor < 0.001 { + factor = 1 + p.Log().Error("Invalid global cost factor", "factor", factor) + } + maxTime := uint64(float64(maxCost) / factor) + task := h.server.servingQueue.newTask(p, maxTime, priority) + if task.start() { + wg.Add(1) + go func() { + defer wg.Done() + reply := hreq.Serve(h, req.ID, p, task.waitOrStop) + if reply != nil { + task.done() + } + + p.responseLock.Lock() + defer p.responseLock.Unlock() + + // Short circuit if the client is already frozen. + if p.isFrozen() { + realCost := h.server.costTracker.realCost(task.servingTime, msg.Size, 0) + p.fcClient.RequestProcessed(req.ID, responseCount, maxCost, realCost) + return + } + // Positive correction buffer value with real cost. + var replySize uint32 + if reply != nil { + replySize = reply.size() + } + var realCost uint64 + if h.server.costTracker.testing { + realCost = maxCost // Assign a fake cost for testing purpose + } else { + realCost = h.server.costTracker.realCost(task.servingTime, msg.Size, replySize) + if realCost > maxCost { + realCost = maxCost + } + } + bv := p.fcClient.RequestProcessed(req.ID, responseCount, maxCost, realCost) + if reply != nil { + // Feed cost tracker request serving statistic. + h.server.costTracker.updateStats(msg.Code, reqCnt, task.servingTime, realCost) + // Reduce priority "balance" for the specific peer. + p.balance.RequestServed(realCost) + p.queueSend(func() { + if err := reply.send(bv); err != nil { + select { + case p.errCh <- err: + default: + } + } + }) + if metrics.EnabledExpensive { + hreq.OutMetrics(int64(replySize), time.Duration(task.servingTime)) + } + } + }() + } else { + p.fcClient.RequestProcessed(req.ID, responseCount, maxCost, inSizeCost) + } + // If the client has made too much invalid request(e.g. request a non-existent data), // reject them to prevent SPAM attack. if p.getInvalid() > maxRequestErrors { @@ -872,8 +389,24 @@ func (h *serverHandler) handleMsg(p *clientPeer, wg *sync.WaitGroup) error { return nil } +func (h *serverHandler) BlockChain() *core.BlockChain { + return h.blockchain +} + +func (h *serverHandler) TxPool() *core.TxPool { + return h.txpool +} + +func (h *serverHandler) ArchiveMode() bool { + return h.server.archiveMode +} + +func (h *serverHandler) AddTxsSync() bool { + return h.addTxsSync +} + // getAccount retrieves an account from the state based on root. -func (h *serverHandler) getAccount(triedb *trie.Database, root, hash common.Hash) (state.Account, error) { +func getAccount(triedb *trie.Database, root, hash common.Hash) (state.Account, error) { trie, err := trie.New(root, triedb) if err != nil { return state.Account{}, err @@ -890,43 +423,24 @@ func (h *serverHandler) getAccount(triedb *trie.Database, root, hash common.Hash } // getHelperTrie returns the post-processed trie root for the given trie ID and section index -func (h *serverHandler) getHelperTrie(typ uint, index uint64) (common.Hash, string) { +func (h *serverHandler) GetHelperTrie(typ uint, index uint64) *trie.Trie { + var ( + root common.Hash + prefix string + ) switch typ { case htCanonical: sectionHead := rawdb.ReadCanonicalHash(h.chainDb, (index+1)*h.server.iConfig.ChtSize-1) - return light.GetChtRoot(h.chainDb, index, sectionHead), light.ChtTablePrefix + root, prefix = light.GetChtRoot(h.chainDb, index, sectionHead), light.ChtTablePrefix case htBloomBits: sectionHead := rawdb.ReadCanonicalHash(h.chainDb, (index+1)*h.server.iConfig.BloomTrieSize-1) - return light.GetBloomTrieRoot(h.chainDb, index, sectionHead), light.BloomTrieTablePrefix - } - return common.Hash{}, "" -} - -// getAuxiliaryHeaders returns requested auxiliary headers for the CHT request. -func (h *serverHandler) getAuxiliaryHeaders(req HelperTrieReq) []byte { - if req.Type == htCanonical && req.AuxReq == auxHeader && len(req.Key) == 8 { - blockNum := binary.BigEndian.Uint64(req.Key) - hash := rawdb.ReadCanonicalHash(h.chainDb, blockNum) - return rawdb.ReadHeaderRLP(h.chainDb, hash, blockNum) + root, prefix = light.GetBloomTrieRoot(h.chainDb, index, sectionHead), light.BloomTrieTablePrefix } - return nil -} - -// txStatus returns the status of a specified transaction. -func (h *serverHandler) txStatus(hash common.Hash) light.TxStatus { - var stat light.TxStatus - // Looking the transaction in txpool first. - stat.Status = h.txpool.Status([]common.Hash{hash})[0] - - // If the transaction is unknown to the pool, try looking it up locally. - if stat.Status == core.TxStatusUnknown { - lookup := h.blockchain.GetTransactionLookup(hash) - if lookup != nil { - stat.Status = core.TxStatusIncluded - stat.Lookup = lookup - } + if root == (common.Hash{}) { + return nil } - return stat + trie, _ := trie.New(root, trie.NewDatabase(rawdb.NewTable(h.chainDb, prefix))) + return trie } // broadcastLoop broadcasts new block information to all connected light diff --git a/les/server_requests.go b/les/server_requests.go new file mode 100644 index 000000000000..3dda06730b35 --- /dev/null +++ b/les/server_requests.go @@ -0,0 +1,539 @@ +// Copyright 2021 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package les + +import ( + "encoding/binary" + "encoding/json" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/state" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/light" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/trie" +) + +type serverBackend interface { + ArchiveMode() bool + AddTxsSync() bool + BlockChain() *core.BlockChain + TxPool() *core.TxPool + GetHelperTrie(typ uint, index uint64) *trie.Trie +} + +type handlerRequest interface { + ReqAmount() (uint64, uint64) + InMetrics(size int64) + OutMetrics(size int64, servingTime time.Duration) + Serve(b serverBackend, reqID uint64, p *clientPeer, waitOrStop func() bool) *reply +} + +type GetBlockHeadersReq struct { + Origin hashOrNumber // Block from which to retrieve headers + Amount uint64 // Maximum number of headers to retrieve + Skip uint64 // Blocks to skip between consecutive headers + Reverse bool // Query direction (false = rising towards latest, true = falling towards genesis) +} + +func (r *GetBlockHeadersReq) ReqAmount() (uint64, uint64) { return r.Amount, MaxHeaderFetch } + +func (r *GetBlockHeadersReq) InMetrics(size int64) { + miscInHeaderPacketsMeter.Mark(1) + miscInHeaderTrafficMeter.Mark(size) +} + +func (r *GetBlockHeadersReq) OutMetrics(size int64, servingTime time.Duration) { + miscOutHeaderPacketsMeter.Mark(1) + miscOutHeaderTrafficMeter.Mark(size) + miscServingTimeHeaderTimer.Update(servingTime) +} + +func (r *GetBlockHeadersReq) Serve(backend serverBackend, reqID uint64, p *clientPeer, waitOrStop func() bool) *reply { + bc := backend.BlockChain() + hashMode := r.Origin.Hash != (common.Hash{}) + first := true + maxNonCanonical := uint64(100) + + // Gather headers until the fetch or network limits is reached + var ( + bytes common.StorageSize + headers []*types.Header + unknown bool + ) + for !unknown && len(headers) < int(r.Amount) && bytes < softResponseLimit { + if !first && !waitOrStop() { + return nil + } + // Retrieve the next header satisfying the r + var origin *types.Header + if hashMode { + if first { + origin = bc.GetHeaderByHash(r.Origin.Hash) + if origin != nil { + r.Origin.Number = origin.Number.Uint64() + } + } else { + origin = bc.GetHeader(r.Origin.Hash, r.Origin.Number) + } + } else { + origin = bc.GetHeaderByNumber(r.Origin.Number) + } + if origin == nil { + break + } + headers = append(headers, origin) + bytes += estHeaderRlpSize + + // Advance to the next header of the r + switch { + case hashMode && r.Reverse: + // Hash based traversal towards the genesis block + ancestor := r.Skip + 1 + if ancestor == 0 { + unknown = true + } else { + r.Origin.Hash, r.Origin.Number = bc.GetAncestor(r.Origin.Hash, r.Origin.Number, ancestor, &maxNonCanonical) + unknown = r.Origin.Hash == common.Hash{} + } + case hashMode && !r.Reverse: + // Hash based traversal towards the leaf block + var ( + current = origin.Number.Uint64() + next = current + r.Skip + 1 + ) + if next <= current { + infos, _ := json.MarshalIndent(p.Peer.Info(), "", " ") + p.Log().Warn("GetBlockHeaders skip overflow attack", "current", current, "skip", r.Skip, "next", next, "attacker", infos) + unknown = true + } else { + if header := bc.GetHeaderByNumber(next); header != nil { + nextHash := header.Hash() + expOldHash, _ := bc.GetAncestor(nextHash, next, r.Skip+1, &maxNonCanonical) + if expOldHash == r.Origin.Hash { + r.Origin.Hash, r.Origin.Number = nextHash, next + } else { + unknown = true + } + } else { + unknown = true + } + } + case r.Reverse: + // Number based traversal towards the genesis block + if r.Origin.Number >= r.Skip+1 { + r.Origin.Number -= r.Skip + 1 + } else { + unknown = true + } + + case !r.Reverse: + // Number based traversal towards the leaf block + r.Origin.Number += r.Skip + 1 + } + first = false + } + return p.replyBlockHeaders(reqID, headers) +} + +type GetBlockBodiesReq []common.Hash + +func (r GetBlockBodiesReq) ReqAmount() (uint64, uint64) { return uint64(len(r)), MaxBodyFetch } + +func (r GetBlockBodiesReq) InMetrics(size int64) { + miscInBodyPacketsMeter.Mark(1) + miscInBodyTrafficMeter.Mark(size) +} + +func (r GetBlockBodiesReq) OutMetrics(size int64, servingTime time.Duration) { + miscOutBodyPacketsMeter.Mark(1) + miscOutBodyTrafficMeter.Mark(size) + miscServingTimeBodyTimer.Update(servingTime) +} + +func (r GetBlockBodiesReq) Serve(backend serverBackend, reqID uint64, p *clientPeer, waitOrStop func() bool) *reply { + var ( + bytes int + bodies []rlp.RawValue + ) + bc := backend.BlockChain() + for i, hash := range r { + if i != 0 && !waitOrStop() { + return nil + } + if bytes >= softResponseLimit { + break + } + body := bc.GetBodyRLP(hash) + if body == nil { + p.bumpInvalid() + continue + } + bodies = append(bodies, body) + bytes += len(body) + } + return p.replyBlockBodiesRLP(reqID, bodies) +} + +type GetCodeReq []CodeReq + +func (r GetCodeReq) ReqAmount() (uint64, uint64) { return uint64(len(r)), MaxCodeFetch } + +func (r GetCodeReq) InMetrics(size int64) { + miscInCodePacketsMeter.Mark(1) + miscInCodeTrafficMeter.Mark(size) +} + +func (r GetCodeReq) OutMetrics(size int64, servingTime time.Duration) { + miscOutCodePacketsMeter.Mark(1) + miscOutCodeTrafficMeter.Mark(size) + miscServingTimeCodeTimer.Update(servingTime) +} + +func (r GetCodeReq) Serve(backend serverBackend, reqID uint64, p *clientPeer, waitOrStop func() bool) *reply { + var ( + bytes int + data [][]byte + ) + bc := backend.BlockChain() + for i, request := range r { + if i != 0 && !waitOrStop() { + return nil + } + // Look up the root hash belonging to the request + header := bc.GetHeaderByHash(request.BHash) + if header == nil { + p.Log().Warn("Failed to retrieve associate header for code", "hash", request.BHash) + p.bumpInvalid() + continue + } + // Refuse to search stale state data in the database since looking for + // a non-exist key is kind of expensive. + local := bc.CurrentHeader().Number.Uint64() + if !backend.ArchiveMode() && header.Number.Uint64()+core.TriesInMemory <= local { + p.Log().Debug("Reject stale code request", "number", header.Number.Uint64(), "head", local) + p.bumpInvalid() + continue + } + triedb := bc.StateCache().TrieDB() + + account, err := getAccount(triedb, header.Root, common.BytesToHash(request.AccKey)) + if err != nil { + p.Log().Warn("Failed to retrieve account for code", "block", header.Number, "hash", header.Hash(), "account", common.BytesToHash(request.AccKey), "err", err) + p.bumpInvalid() + continue + } + code, err := bc.StateCache().ContractCode(common.BytesToHash(request.AccKey), common.BytesToHash(account.CodeHash)) + if err != nil { + p.Log().Warn("Failed to retrieve account code", "block", header.Number, "hash", header.Hash(), "account", common.BytesToHash(request.AccKey), "codehash", common.BytesToHash(account.CodeHash), "err", err) + continue + } + // Accumulate the code and abort if enough data was retrieved + data = append(data, code) + if bytes += len(code); bytes >= softResponseLimit { + break + } + } + return p.replyCode(reqID, data) +} + +type GetReceiptsReq []common.Hash + +func (r GetReceiptsReq) ReqAmount() (uint64, uint64) { return uint64(len(r)), MaxReceiptFetch } + +func (r GetReceiptsReq) InMetrics(size int64) { + miscInReceiptPacketsMeter.Mark(1) + miscInReceiptTrafficMeter.Mark(size) +} + +func (r GetReceiptsReq) OutMetrics(size int64, servingTime time.Duration) { + miscOutReceiptPacketsMeter.Mark(1) + miscOutReceiptTrafficMeter.Mark(size) + miscServingTimeReceiptTimer.Update(servingTime) +} + +func (r GetReceiptsReq) Serve(backend serverBackend, reqID uint64, p *clientPeer, waitOrStop func() bool) *reply { + var ( + bytes int + receipts []rlp.RawValue + ) + bc := backend.BlockChain() + for i, hash := range r { + if i != 0 && !waitOrStop() { + return nil + } + if bytes >= softResponseLimit { + break + } + // Retrieve the requested block's receipts, skipping if unknown to us + results := bc.GetReceiptsByHash(hash) + if results == nil { + if header := bc.GetHeaderByHash(hash); header == nil || header.ReceiptHash != types.EmptyRootHash { + p.bumpInvalid() + continue + } + } + // If known, encode and queue for response packet + if encoded, err := rlp.EncodeToBytes(results); err != nil { + log.Error("Failed to encode receipt", "err", err) + } else { + receipts = append(receipts, encoded) + bytes += len(encoded) + } + } + return p.replyReceiptsRLP(reqID, receipts) +} + +type GetProofsReq []ProofReq + +func (r GetProofsReq) ReqAmount() (uint64, uint64) { return uint64(len(r)), MaxProofsFetch } + +func (r GetProofsReq) InMetrics(size int64) { + miscInTrieProofPacketsMeter.Mark(1) + miscInTrieProofTrafficMeter.Mark(size) +} + +func (r GetProofsReq) OutMetrics(size int64, servingTime time.Duration) { + miscOutTrieProofPacketsMeter.Mark(1) + miscOutTrieProofTrafficMeter.Mark(size) + miscServingTimeTrieProofTimer.Update(servingTime) +} + +func (r GetProofsReq) Serve(backend serverBackend, reqID uint64, p *clientPeer, waitOrStop func() bool) *reply { + var ( + lastBHash common.Hash + root common.Hash + header *types.Header + err error + ) + bc := backend.BlockChain() + nodes := light.NewNodeSet() + + for i, request := range r { + if i != 0 && !waitOrStop() { + return nil + } + // Look up the root hash belonging to the request + if request.BHash != lastBHash { + root, lastBHash = common.Hash{}, request.BHash + + if header = bc.GetHeaderByHash(request.BHash); header == nil { + p.Log().Warn("Failed to retrieve header for proof", "hash", request.BHash) + p.bumpInvalid() + continue + } + // Refuse to search stale state data in the database since looking for + // a non-exist key is kind of expensive. + local := bc.CurrentHeader().Number.Uint64() + if !backend.ArchiveMode() && header.Number.Uint64()+core.TriesInMemory <= local { + p.Log().Debug("Reject stale trie request", "number", header.Number.Uint64(), "head", local) + p.bumpInvalid() + continue + } + root = header.Root + } + // If a header lookup failed (non existent), ignore subsequent requests for the same header + if root == (common.Hash{}) { + p.bumpInvalid() + continue + } + // Open the account or storage trie for the request + statedb := bc.StateCache() + + var trie state.Trie + switch len(request.AccKey) { + case 0: + // No account key specified, open an account trie + trie, err = statedb.OpenTrie(root) + if trie == nil || err != nil { + p.Log().Warn("Failed to open storage trie for proof", "block", header.Number, "hash", header.Hash(), "root", root, "err", err) + continue + } + default: + // Account key specified, open a storage trie + account, err := getAccount(statedb.TrieDB(), root, common.BytesToHash(request.AccKey)) + if err != nil { + p.Log().Warn("Failed to retrieve account for proof", "block", header.Number, "hash", header.Hash(), "account", common.BytesToHash(request.AccKey), "err", err) + p.bumpInvalid() + continue + } + trie, err = statedb.OpenStorageTrie(common.BytesToHash(request.AccKey), account.Root) + if trie == nil || err != nil { + p.Log().Warn("Failed to open storage trie for proof", "block", header.Number, "hash", header.Hash(), "account", common.BytesToHash(request.AccKey), "root", account.Root, "err", err) + continue + } + } + // Prove the user's request from the account or stroage trie + if err := trie.Prove(request.Key, request.FromLevel, nodes); err != nil { + p.Log().Warn("Failed to prove state request", "block", header.Number, "hash", header.Hash(), "err", err) + continue + } + if nodes.DataSize() >= softResponseLimit { + break + } + } + return p.replyProofsV2(reqID, nodes.NodeList()) +} + +type GetHelperTrieProofsReq []HelperTrieReq + +func (r GetHelperTrieProofsReq) ReqAmount() (uint64, uint64) { + return uint64(len(r)), MaxHelperTrieProofsFetch +} + +func (r GetHelperTrieProofsReq) InMetrics(size int64) { + miscInHelperTriePacketsMeter.Mark(1) + miscInHelperTrieTrafficMeter.Mark(size) +} + +func (r GetHelperTrieProofsReq) OutMetrics(size int64, servingTime time.Duration) { + miscOutHelperTriePacketsMeter.Mark(1) + miscOutHelperTrieTrafficMeter.Mark(size) + miscServingTimeHelperTrieTimer.Update(servingTime) +} + +func (r GetHelperTrieProofsReq) Serve(backend serverBackend, reqID uint64, p *clientPeer, waitOrStop func() bool) *reply { + var ( + lastIdx uint64 + lastType uint + root common.Hash + auxTrie *trie.Trie + auxBytes int + auxData [][]byte + ) + bc := backend.BlockChain() + nodes := light.NewNodeSet() + for i, request := range r { + if i != 0 && !waitOrStop() { + return nil + } + if auxTrie == nil || request.Type != lastType || request.TrieIdx != lastIdx { + lastType, lastIdx = request.Type, request.TrieIdx + auxTrie = backend.GetHelperTrie(request.Type, request.TrieIdx) + } + if request.AuxReq == auxRoot { + var data []byte + if root != (common.Hash{}) { + data = root[:] + } + auxData = append(auxData, data) + auxBytes += len(data) + } else { + if auxTrie != nil { + auxTrie.Prove(request.Key, request.FromLevel, nodes) + } + if request.Type == htCanonical && request.AuxReq == auxHeader && len(request.Key) == 8 { + header := bc.GetHeaderByNumber(binary.BigEndian.Uint64(request.Key)) + data, err := rlp.EncodeToBytes(header) + if err != nil { + log.Error("Failed to encode header", "err", err) + } + auxData = append(auxData, data) + auxBytes += len(data) + } + } + if nodes.DataSize()+auxBytes >= softResponseLimit { + break + } + } + return p.replyHelperTrieProofs(reqID, HelperTrieResps{Proofs: nodes.NodeList(), AuxData: auxData}) +} + +type SendTxReq []*types.Transaction + +func (r SendTxReq) ReqAmount() (uint64, uint64) { return uint64(len(r)), MaxTxSend } + +func (r SendTxReq) InMetrics(size int64) { + miscInTxsPacketsMeter.Mark(1) + miscInTxsTrafficMeter.Mark(size) +} + +func (r SendTxReq) OutMetrics(size int64, servingTime time.Duration) { + miscOutTxsPacketsMeter.Mark(1) + miscOutTxsTrafficMeter.Mark(size) + miscServingTimeTxTimer.Update(servingTime) +} + +func (r SendTxReq) Serve(backend serverBackend, reqID uint64, p *clientPeer, waitOrStop func() bool) *reply { + stats := make([]light.TxStatus, len(r)) + for i, tx := range r { + if i != 0 && !waitOrStop() { + return nil + } + hash := tx.Hash() + stats[i] = txStatus(backend, hash) + if stats[i].Status == core.TxStatusUnknown { + addFn := backend.TxPool().AddRemotes + // Add txs synchronously for testing purpose + if backend.AddTxsSync() { + addFn = backend.TxPool().AddRemotesSync + } + if errs := addFn([]*types.Transaction{tx}); errs[0] != nil { + stats[i].Error = errs[0].Error() + continue + } + stats[i] = txStatus(backend, hash) + } + } + return p.replyTxStatus(reqID, stats) +} + +type GetTxStatusReq []common.Hash + +func (r GetTxStatusReq) ReqAmount() (uint64, uint64) { return uint64(len(r)), MaxTxStatus } + +func (r GetTxStatusReq) InMetrics(size int64) { + miscInTxStatusPacketsMeter.Mark(1) + miscInTxStatusTrafficMeter.Mark(size) +} + +func (r GetTxStatusReq) OutMetrics(size int64, servingTime time.Duration) { + miscOutTxStatusPacketsMeter.Mark(1) + miscOutTxStatusTrafficMeter.Mark(size) + miscServingTimeTxStatusTimer.Update(servingTime) +} + +func (r GetTxStatusReq) Serve(backend serverBackend, reqID uint64, p *clientPeer, waitOrStop func() bool) *reply { + stats := make([]light.TxStatus, len(r)) + for i, hash := range r { + if i != 0 && !waitOrStop() { + return nil + } + stats[i] = txStatus(backend, hash) + } + return p.replyTxStatus(reqID, stats) +} + +// txStatus returns the status of a specified transaction. +func txStatus(b serverBackend, hash common.Hash) light.TxStatus { + var stat light.TxStatus + // Looking the transaction in txpool first. + stat.Status = b.TxPool().Status([]common.Hash{hash})[0] + + // If the transaction is unknown to the pool, try looking it up locally. + if stat.Status == core.TxStatusUnknown { + lookup := b.BlockChain().GetTransactionLookup(hash) + if lookup != nil { + stat.Status = core.TxStatusIncluded + stat.Lookup = lookup + } + } + return stat +} From 76e813be5bd3d6d5afeda72ca2c3a98da61739be Mon Sep 17 00:00:00 2001 From: Zsolt Felfoldi Date: Thu, 14 Jan 2021 16:31:44 +0100 Subject: [PATCH 7/7] tests/fuzzers/les: add fuzzer for les server handler --- les/server_handler.go | 2 +- les/server_requests.go | 2 +- les/test_helper.go | 4 + tests/fuzzers/les/debug/main.go | 41 +++++ tests/fuzzers/les/les-fuzzer.go | 305 ++++++++++++++++++++++++++++++++ 5 files changed, 352 insertions(+), 2 deletions(-) create mode 100644 tests/fuzzers/les/debug/main.go create mode 100644 tests/fuzzers/les/les-fuzzer.go diff --git a/les/server_handler.go b/les/server_handler.go index 820b688b4b2e..d406d3712bdc 100644 --- a/les/server_handler.go +++ b/les/server_handler.go @@ -229,7 +229,7 @@ func (h *serverHandler) handleMsg(p *clientPeer, wg *sync.WaitGroup) error { ID uint64 Data rlp.RawValue } - hreq handlerRequest + hreq HandlerRequest decodeErr error ) if err := msg.Decode(&req); err != nil { diff --git a/les/server_requests.go b/les/server_requests.go index 3dda06730b35..4d258d660ba8 100644 --- a/les/server_requests.go +++ b/les/server_requests.go @@ -39,7 +39,7 @@ type serverBackend interface { GetHelperTrie(typ uint, index uint64) *trie.Trie } -type handlerRequest interface { +type HandlerRequest interface { ReqAmount() (uint64, uint64) InMetrics(size int64) OutMetrics(size int64, servingTime time.Duration) diff --git a/les/test_helper.go b/les/test_helper.go index 27fbace7fc94..332474e15e96 100644 --- a/les/test_helper.go +++ b/les/test_helper.go @@ -586,3 +586,7 @@ func newClientServerEnv(t *testing.T, blocks int, protocol int, callback indexer } return s, c, teardown } + +func NewFuzzerPeer(version int) *clientPeer { + return newClientPeer(version, 0, p2p.NewPeer(enode.ID{}, "", nil), nil) +} diff --git a/tests/fuzzers/les/debug/main.go b/tests/fuzzers/les/debug/main.go new file mode 100644 index 000000000000..09e087d4c88a --- /dev/null +++ b/tests/fuzzers/les/debug/main.go @@ -0,0 +1,41 @@ +// Copyright 2020 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package main + +import ( + "fmt" + "io/ioutil" + "os" + + "github.com/ethereum/go-ethereum/tests/fuzzers/les" +) + +func main() { + if len(os.Args) != 2 { + fmt.Fprintf(os.Stderr, "Usage: debug \n") + fmt.Fprintf(os.Stderr, "Example\n") + fmt.Fprintf(os.Stderr, " $ debug ../crashers/4bbef6857c733a87ecf6fd8b9e7238f65eb9862a\n") + os.Exit(1) + } + crasher := os.Args[1] + data, err := ioutil.ReadFile(crasher) + if err != nil { + fmt.Fprintf(os.Stderr, "error loading crasher %v: %v", crasher, err) + os.Exit(1) + } + les.Fuzz(data) +} diff --git a/tests/fuzzers/les/les-fuzzer.go b/tests/fuzzers/les/les-fuzzer.go new file mode 100644 index 000000000000..676b2f2f4529 --- /dev/null +++ b/tests/fuzzers/les/les-fuzzer.go @@ -0,0 +1,305 @@ +package les + +import ( + "bytes" + "encoding/binary" + "io" + "math/big" + "math/rand" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/consensus/ethash" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/core/vm" + "github.com/ethereum/go-ethereum/crypto" + l "github.com/ethereum/go-ethereum/les" + "github.com/ethereum/go-ethereum/params" + "github.com/ethereum/go-ethereum/trie" + //fuzz "github.com/google/gofuzz" +) + +var ( + bankKey, _ = crypto.GenerateKey() + bankAddr = crypto.PubkeyToAddress(bankKey.PublicKey) + bankFunds = big.NewInt(1000000000000000000) + + userKey1, _ = crypto.GenerateKey() + userKey2, _ = crypto.GenerateKey() + userAddr1 = crypto.PubkeyToAddress(userKey1.PublicKey) + userAddr2 = crypto.PubkeyToAddress(userKey2.PublicKey) + + testContractAddr common.Address + testContractCode = common.Hex2Bytes("606060405260cc8060106000396000f360606040526000357c01000000000000000000000000000000000000000000000000000000009004806360cd2685146041578063c16431b914606b57603f565b005b6055600480803590602001909190505060a9565b6040518082815260200191505060405180910390f35b60886004808035906020019091908035906020019091905050608a565b005b80600060005083606481101560025790900160005b50819055505b5050565b6000600060005082606481101560025790900160005b5054905060c7565b91905056") +) + +func makeChain(n int) (bc *core.BlockChain, addrHashes, txHashes []common.Hash) { + db := rawdb.NewMemoryDatabase() + gspec := core.Genesis{ + Config: params.TestChainConfig, + Alloc: core.GenesisAlloc{bankAddr: {Balance: bankFunds}}, + GasLimit: 100000000, + } + genesis := gspec.MustCommit(db) + signer := types.HomesteadSigner{} + blocks, _ := core.GenerateChain(gspec.Config, genesis, ethash.NewFaker(), db, n, + func(i int, gen *core.BlockGen) { + var ( + tx *types.Transaction + addr common.Address + ) + nonce := uint64(i) + if i%16 == 0 { + tx, _ = types.SignTx(types.NewContractCreation(nonce, big.NewInt(0), 200000, big.NewInt(0), testContractCode), signer, bankKey) + addr = crypto.CreateAddress(userAddr1, nonce) + } else { + key, _ := crypto.GenerateKey() + addr = crypto.PubkeyToAddress(key.PublicKey) + tx, _ = types.SignTx(types.NewTransaction(nonce, addr, big.NewInt(10000), params.TxGas, big.NewInt(100000000000), nil), signer, bankKey) + } + gen.AddTx(tx) + addrHashes = append(addrHashes, crypto.Keccak256Hash(addr[:])) + txHashes = append(txHashes, tx.Hash()) + }) + bc, _ = core.NewBlockChain(db, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil, nil) + + if _, err := bc.InsertChain(blocks); err != nil { + panic(err) + } + return +} + +type fuzzer struct { + chain *core.BlockChain + pool *core.TxPool + trie *trie.Trie + addr, txs []common.Hash + + input io.Reader + exhausted bool +} + +func newFuzzer(input []byte) *fuzzer { + f := &fuzzer{ + input: bytes.NewReader(input), + } + f.chain, f.addr, f.txs = makeChain(int(f.randomByte())) + f.pool = core.NewTxPool(core.DefaultTxPoolConfig, params.TestChainConfig, f.chain) + f.trie, _ = trie.New(common.Hash{}, trie.NewDatabase(rawdb.NewMemoryDatabase())) + r := rand.New(rand.NewSource(42)) + for i := 0; i < 100; i++ { + key := make([]byte, r.Intn(32)+1) + r.Read(key) + value := make([]byte, r.Intn(64)) + r.Read(value) + f.trie.Update(key, value) + } + return f +} + +func (f *fuzzer) read(size int) []byte { + out := make([]byte, size) + if _, err := f.input.Read(out); err != nil { + f.exhausted = true + } + return out +} + +func (f *fuzzer) randomByte() byte { + d := f.read(1) + return d[0] +} + +func (f *fuzzer) randomBool() bool { + d := f.read(1) + return d[0]&1 == 1 +} + +func (f *fuzzer) randomInt(max int) int { + if max == 0 { + return 0 + } + if max <= 256 { + return int(f.randomByte()) % max + } + var a uint16 + if err := binary.Read(f.input, binary.LittleEndian, &a); err != nil { + f.exhausted = true + } + return int(a % uint16(max)) +} + +func (f *fuzzer) randomX(max int) uint64 { + var a uint16 + if err := binary.Read(f.input, binary.LittleEndian, &a); err != nil { + f.exhausted = true + } + if a < 0x8000 { + return uint64(a%uint16(max+1)) - 1 + } + return (uint64(1)<<(a%64+1) - 1) & (uint64(a) * 343897772345826595) +} + +func (f *fuzzer) randomBlockHash() common.Hash { + b := f.randomByte() + h := f.chain.GetCanonicalHash(uint64(b)) + if h == (common.Hash{}) { + if b&1 == 1 { + h[0] = b + } + } + return h +} + +func (f *fuzzer) randomAddrKey() []byte { + i := int(f.randomByte()) + if i < len(f.addr) { + return f.addr[i].Bytes() + } else { + h := make([]byte, int(f.randomByte())) + if i < len(h) { + h[i] = byte(i) + 1 + } + return h + } +} + +func (f *fuzzer) randomTxHash() common.Hash { + i := int(f.randomByte()) + if i < len(f.txs) { + return f.txs[i] + } else { + var h common.Hash + if i&1 == 1 { + h[0] = byte(i) + } + return h + } +} + +func (f *fuzzer) randomBytes(maxLen int) []byte { + return f.read(f.randomInt(maxLen + 1)) +} + +func (f *fuzzer) BlockChain() *core.BlockChain { + return f.chain +} + +func (f *fuzzer) TxPool() *core.TxPool { + return f.pool +} + +func (f *fuzzer) ArchiveMode() bool { + return false +} + +func (f *fuzzer) AddTxsSync() bool { + return false +} + +func (f *fuzzer) GetHelperTrie(typ uint, index uint64) *trie.Trie { + if index < 2 { + return f.trie + } + return nil +} + +func (f *fuzzer) doFuzz(req l.HandlerRequest) { + defer f.chain.Stop() + //fuzz.NewFromGoFuzz(input).Fuzz(req) + peer := l.NewFuzzerPeer(3) + req.Serve(f, 42, peer, func() bool { return true }) +} + +func Fuzz(input []byte) int { + f := newFuzzer(input) + if f.exhausted { + return -1 + } + for !f.exhausted { + switch f.randomInt(8) { + case 0: + req := &l.GetBlockHeadersReq{ + Amount: f.randomX(l.MaxHeaderFetch + 1), + Skip: f.randomX(10), + Reverse: f.randomBool(), + } + if f.randomBool() { + req.Origin.Hash = f.randomBlockHash() + } else { + req.Origin.Number = uint64(f.randomByte()) + } + f.doFuzz(req) + + case 1: + req := make(l.GetBlockBodiesReq, f.randomInt(l.MaxBodyFetch+1)) + for i := range req { + req[i] = f.randomBlockHash() + } + f.doFuzz(req) + + case 2: + req := make(l.GetCodeReq, f.randomInt(l.MaxCodeFetch+1)) + for i := range req { + req[i] = l.CodeReq{ + BHash: f.randomBlockHash(), + AccKey: f.randomAddrKey(), + } + } + f.doFuzz(req) + + case 3: + req := make(l.GetReceiptsReq, f.randomInt(l.MaxReceiptFetch+1)) + for i := range req { + req[i] = f.randomBlockHash() + } + f.doFuzz(req) + + case 4: + req := make(l.GetProofsReq, f.randomInt(l.MaxProofsFetch+1)) + for i := range req { + req[i] = l.ProofReq{ + BHash: f.randomBlockHash(), + AccKey: f.randomAddrKey(), + Key: f.randomAddrKey(), + FromLevel: uint(f.randomX(3)), + } + } + f.doFuzz(req) + + case 5: + req := make(l.GetHelperTrieProofsReq, f.randomInt(l.MaxHelperTrieProofsFetch+1)) + for i := range req { + req[i] = l.HelperTrieReq{ + Type: uint(f.randomX(3)), + TrieIdx: f.randomX(3), + Key: f.randomAddrKey(), + FromLevel: uint(f.randomX(3)), + AuxReq: uint(f.randomX(3)), + } + } + f.doFuzz(req) + + case 6: + req := make(l.SendTxReq, f.randomInt(l.MaxTxSend+1)) + signer := types.HomesteadSigner{} + for i := range req { + nonce := uint64(f.randomByte()) + if nonce%1 == 0 { + nonce = uint64(len(f.txs)) + } + req[i], _ = types.SignTx(types.NewTransaction(nonce, common.Address{}, big.NewInt(10000), params.TxGas, big.NewInt(1000000000*int64(f.randomByte())), nil), signer, bankKey) + } + f.doFuzz(req) + + case 7: + req := make(l.GetTxStatusReq, f.randomInt(l.MaxTxStatus+1)) + for i := range req { + req[i] = f.randomTxHash() + } + f.doFuzz(req) + } + } + return 0 +}