Skip to content

Commit 7e79254

Browse files
eth/protocols/eth: implement eth/69 (#29158)
This PR implements eth/69. This protocol version drops the bloom filter from receipts messages, reducing the amount of data needed for a sync by ~530GB (2.3B txs * 256 byte) uncompressed. Compressed this will be reduced to ~100GB The new version also changes the Status message and introduces the BlockRangeUpdate message to relay information about the available history range. --------- Co-authored-by: Felix Lange <[email protected]>
1 parent 892a661 commit 7e79254

35 files changed

+1513
-389
lines changed

cmd/devp2p/internal/ethtest/conn.go

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -66,10 +66,9 @@ func (s *Suite) dialAs(key *ecdsa.PrivateKey) (*Conn, error) {
6666
return nil, err
6767
}
6868
conn.caps = []p2p.Cap{
69-
{Name: "eth", Version: 67},
70-
{Name: "eth", Version: 68},
69+
{Name: "eth", Version: 69},
7170
}
72-
conn.ourHighestProtoVersion = 68
71+
conn.ourHighestProtoVersion = 69
7372
return &conn, nil
7473
}
7574

@@ -156,7 +155,7 @@ func (c *Conn) ReadEth() (any, error) {
156155
var msg any
157156
switch int(code) {
158157
case eth.StatusMsg:
159-
msg = new(eth.StatusPacket)
158+
msg = new(eth.StatusPacket69)
160159
case eth.GetBlockHeadersMsg:
161160
msg = new(eth.GetBlockHeadersPacket)
162161
case eth.BlockHeadersMsg:
@@ -231,7 +230,7 @@ func (c *Conn) ReadSnap() (any, error) {
231230

232231
// peer performs both the protocol handshake and the status message
233232
// exchange with the node in order to peer with it.
234-
func (c *Conn) peer(chain *Chain, status *eth.StatusPacket) error {
233+
func (c *Conn) peer(chain *Chain, status *eth.StatusPacket69) error {
235234
if err := c.handshake(); err != nil {
236235
return fmt.Errorf("handshake failed: %v", err)
237236
}
@@ -304,7 +303,7 @@ func (c *Conn) negotiateEthProtocol(caps []p2p.Cap) {
304303
}
305304

306305
// statusExchange performs a `Status` message exchange with the given node.
307-
func (c *Conn) statusExchange(chain *Chain, status *eth.StatusPacket) error {
306+
func (c *Conn) statusExchange(chain *Chain, status *eth.StatusPacket69) error {
308307
loop:
309308
for {
310309
code, data, err := c.Read()
@@ -313,12 +312,16 @@ loop:
313312
}
314313
switch code {
315314
case eth.StatusMsg + protoOffset(ethProto):
316-
msg := new(eth.StatusPacket)
315+
msg := new(eth.StatusPacket69)
317316
if err := rlp.DecodeBytes(data, &msg); err != nil {
318317
return fmt.Errorf("error decoding status packet: %w", err)
319318
}
320-
if have, want := msg.Head, chain.blocks[chain.Len()-1].Hash(); have != want {
321-
return fmt.Errorf("wrong head block in status, want: %#x (block %d) have %#x",
319+
if have, want := msg.LatestBlock, chain.blocks[chain.Len()-1].NumberU64(); have != want {
320+
return fmt.Errorf("wrong head block in status, want: %d, have %d",
321+
want, have)
322+
}
323+
if have, want := msg.LatestBlockHash, chain.blocks[chain.Len()-1].Hash(); have != want {
324+
return fmt.Errorf("wrong head block in status, want: %#x (block %d) have %#x",
322325
want, chain.blocks[chain.Len()-1].NumberU64(), have)
323326
}
324327
if have, want := msg.ForkID, chain.ForkID(); !reflect.DeepEqual(have, want) {
@@ -348,13 +351,14 @@ loop:
348351
}
349352
if status == nil {
350353
// default status message
351-
status = &eth.StatusPacket{
354+
status = &eth.StatusPacket69{
352355
ProtocolVersion: uint32(c.negotiatedProtoVersion),
353356
NetworkID: chain.config.ChainID.Uint64(),
354-
TD: chain.TD(),
355-
Head: chain.blocks[chain.Len()-1].Hash(),
356357
Genesis: chain.blocks[0].Hash(),
357358
ForkID: chain.ForkID(),
359+
EarliestBlock: 0,
360+
LatestBlock: chain.blocks[chain.Len()-1].NumberU64(),
361+
LatestBlockHash: chain.blocks[chain.Len()-1].Hash(),
358362
}
359363
}
360364
if err := c.Write(ethProto, eth.StatusMsg, status); err != nil {

cmd/devp2p/internal/ethtest/protocol.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ const (
3232
// Unexported devp2p protocol lengths from p2p package.
3333
const (
3434
baseProtoLen = 16
35-
ethProtoLen = 17
35+
ethProtoLen = 18
3636
snapProtoLen = 8
3737
)
3838

cmd/devp2p/internal/ethtest/suite.go

Lines changed: 72 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,9 @@ func (s *Suite) EthTests() []utesting.Test {
7474
{Name: "SimultaneousRequests", Fn: s.TestSimultaneousRequests},
7575
{Name: "SameRequestID", Fn: s.TestSameRequestID},
7676
{Name: "ZeroRequestID", Fn: s.TestZeroRequestID},
77-
// get block bodies
77+
// get history
7878
{Name: "GetBlockBodies", Fn: s.TestGetBlockBodies},
79+
{Name: "GetReceipts", Fn: s.TestGetReceipts},
7980
// // malicious handshakes + status
8081
{Name: "MaliciousHandshake", Fn: s.TestMaliciousHandshake},
8182
// test transactions
@@ -418,6 +419,51 @@ func (s *Suite) TestGetBlockBodies(t *utesting.T) {
418419
}
419420
}
420421

422+
func (s *Suite) TestGetReceipts(t *utesting.T) {
423+
t.Log(`This test sends GetReceipts requests to the node for known blocks in the test chain.`)
424+
425+
conn, err := s.dial()
426+
if err != nil {
427+
t.Fatalf("dial failed: %v", err)
428+
}
429+
defer conn.Close()
430+
if err := conn.peer(s.chain, nil); err != nil {
431+
t.Fatalf("peering failed: %v", err)
432+
}
433+
434+
// Find some blocks containing receipts.
435+
var hashes = make([]common.Hash, 0, 3)
436+
for i := range s.chain.Len() {
437+
block := s.chain.GetBlock(i)
438+
if len(block.Transactions()) > 0 {
439+
hashes = append(hashes, block.Hash())
440+
}
441+
if len(hashes) == cap(hashes) {
442+
break
443+
}
444+
}
445+
446+
// Create block bodies request.
447+
req := &eth.GetReceiptsPacket{
448+
RequestId: 66,
449+
GetReceiptsRequest: (eth.GetReceiptsRequest)(hashes),
450+
}
451+
if err := conn.Write(ethProto, eth.GetReceiptsMsg, req); err != nil {
452+
t.Fatalf("could not write to connection: %v", err)
453+
}
454+
// Wait for response.
455+
resp := new(eth.ReceiptsPacket[*eth.ReceiptList69])
456+
if err := conn.ReadMsg(ethProto, eth.ReceiptsMsg, &resp); err != nil {
457+
t.Fatalf("error reading block bodies msg: %v", err)
458+
}
459+
if got, want := resp.RequestId, req.RequestId; got != want {
460+
t.Fatalf("unexpected request id in respond", got, want)
461+
}
462+
if len(resp.List) != len(req.GetReceiptsRequest) {
463+
t.Fatalf("wrong bodies in response: expected %d bodies, got %d", len(req.GetReceiptsRequest), len(resp.List))
464+
}
465+
}
466+
421467
// randBuf makes a random buffer size kilobytes large.
422468
func randBuf(size int) []byte {
423469
buf := make([]byte, size*1024)
@@ -500,6 +546,31 @@ func (s *Suite) TestMaliciousHandshake(t *utesting.T) {
500546
}
501547
}
502548

549+
func (s *Suite) TestInvalidBlockRangeUpdate(t *utesting.T) {
550+
t.Log(`This test sends an invalid BlockRangeUpdate message to the node and expects to be disconnected.`)
551+
552+
conn, err := s.dial()
553+
if err != nil {
554+
t.Fatalf("dial failed: %v", err)
555+
}
556+
defer conn.Close()
557+
if err := conn.peer(s.chain, nil); err != nil {
558+
t.Fatalf("peering failed: %v", err)
559+
}
560+
561+
conn.Write(ethProto, eth.BlockRangeUpdateMsg, &eth.BlockRangeUpdatePacket{
562+
EarliestBlock: 10,
563+
LatestBlock: 8,
564+
LatestBlockHash: s.chain.GetBlock(8).Hash(),
565+
})
566+
567+
if code, _, err := conn.Read(); err != nil {
568+
t.Fatalf("expected disconnect, got err: %v", err)
569+
} else if code != discMsg {
570+
t.Fatalf("expected disconnect message, got msg code %d", code)
571+
}
572+
}
573+
503574
func (s *Suite) TestTransaction(t *utesting.T) {
504575
t.Log(`This test sends a valid transaction to the node and checks if the
505576
transaction gets propagated.`)

cmd/utils/cmd.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,8 @@ func ImportHistory(chain *core.BlockChain, dir string, network string) error {
309309
if err != nil {
310310
return fmt.Errorf("error reading receipts %d: %w", it.Number(), err)
311311
}
312-
if _, err := chain.InsertReceiptChain([]*types.Block{block}, []types.Receipts{receipts}, 2^64-1); err != nil {
312+
encReceipts := types.EncodeBlockReceiptLists([]types.Receipts{receipts})
313+
if _, err := chain.InsertReceiptChain([]*types.Block{block}, encReceipts, 2^64-1); err != nil {
313314
return fmt.Errorf("error inserting body %d: %w", it.Number(), err)
314315
}
315316
imported += 1

core/blockchain.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1309,12 +1309,11 @@ const (
13091309
//
13101310
// The optional ancientLimit can also be specified and chain segment before that
13111311
// will be directly stored in the ancient, getting rid of the chain migration.
1312-
func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain []types.Receipts, ancientLimit uint64) (int, error) {
1312+
func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain []rlp.RawValue, ancientLimit uint64) (int, error) {
13131313
// Verify the supplied headers before insertion without lock
13141314
var headers []*types.Header
13151315
for _, block := range blockChain {
13161316
headers = append(headers, block.Header())
1317-
13181317
// Here we also validate that blob transactions in the block do not
13191318
// contain a sidecar. While the sidecar does not affect the block hash
13201319
// or tx hash, sending blobs within a block is not allowed.
@@ -1357,11 +1356,11 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
13571356
//
13581357
// this function only accepts canonical chain data. All side chain will be reverted
13591358
// eventually.
1360-
writeAncient := func(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) {
1359+
writeAncient := func(blockChain types.Blocks, receiptChain []rlp.RawValue) (int, error) {
13611360
// Ensure genesis is in the ancient store
13621361
if blockChain[0].NumberU64() == 1 {
13631362
if frozen, _ := bc.db.Ancients(); frozen == 0 {
1364-
writeSize, err := rawdb.WriteAncientBlocks(bc.db, []*types.Block{bc.genesisBlock}, []types.Receipts{nil})
1363+
writeSize, err := rawdb.WriteAncientBlocks(bc.db, []*types.Block{bc.genesisBlock}, []rlp.RawValue{rlp.EmptyList})
13651364
if err != nil {
13661365
log.Error("Error writing genesis to ancients", "err", err)
13671366
return 0, err
@@ -1404,7 +1403,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
14041403
// existing local chain segments (reorg around the chain tip). The reorganized part
14051404
// will be included in the provided chain segment, and stale canonical markers will be
14061405
// silently rewritten. Therefore, no explicit reorg logic is needed.
1407-
writeLive := func(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) {
1406+
writeLive := func(blockChain types.Blocks, receiptChain []rlp.RawValue) (int, error) {
14081407
var (
14091408
skipPresenceCheck = false
14101409
batch = bc.db.NewBatch()
@@ -1429,7 +1428,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
14291428
// Write all the data out into the database
14301429
rawdb.WriteCanonicalHash(batch, block.Hash(), block.NumberU64())
14311430
rawdb.WriteBlock(batch, block)
1432-
rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receiptChain[i])
1431+
rawdb.WriteRawReceipts(batch, block.Hash(), block.NumberU64(), receiptChain[i])
14331432

14341433
// Write everything belongs to the blocks into the database. So that
14351434
// we can ensure all components of body is completed(body, receipts)
@@ -2650,7 +2649,7 @@ func (bc *BlockChain) InsertHeadersBeforeCutoff(headers []*types.Header) (int, e
26502649
first = headers[0].Number.Uint64()
26512650
)
26522651
if first == 1 && frozen == 0 {
2653-
_, err := rawdb.WriteAncientBlocks(bc.db, []*types.Block{bc.genesisBlock}, []types.Receipts{nil})
2652+
_, err := rawdb.WriteAncientBlocks(bc.db, []*types.Block{bc.genesisBlock}, []rlp.RawValue{rlp.EmptyList})
26542653
if err != nil {
26552654
log.Error("Error writing genesis to ancients", "err", err)
26562655
return 0, err

core/blockchain_reader.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -234,12 +234,22 @@ func (bc *BlockChain) GetReceiptsByHash(hash common.Hash) types.Receipts {
234234
return receipts
235235
}
236236

237-
func (bc *BlockChain) GetRawReceiptsByHash(hash common.Hash) types.Receipts {
237+
// GetRawReceipts retrieves the receipts for all transactions in a given block
238+
// without deriving the internal fields and the Bloom.
239+
func (bc *BlockChain) GetRawReceipts(hash common.Hash, number uint64) types.Receipts {
240+
if receipts, ok := bc.receiptsCache.Get(hash); ok {
241+
return receipts
242+
}
243+
return rawdb.ReadRawReceipts(bc.db, hash, number)
244+
}
245+
246+
// GetReceiptsRLP retrieves the receipts of a block.
247+
func (bc *BlockChain) GetReceiptsRLP(hash common.Hash) rlp.RawValue {
238248
number := rawdb.ReadHeaderNumber(bc.db, hash)
239249
if number == nil {
240250
return nil
241251
}
242-
return rawdb.ReadRawReceipts(bc.db, hash, *number)
252+
return rawdb.ReadReceiptsRLP(bc.db, hash, *number)
243253
}
244254

245255
// GetUnclesInChain retrieves all the uncles from a given block backwards until

core/blockchain_test.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -734,7 +734,7 @@ func testFastVsFullChains(t *testing.T, scheme string) {
734734
fast, _ := NewBlockChain(fastDb, DefaultCacheConfigWithScheme(scheme), gspec, nil, ethash.NewFaker(), vm.Config{}, nil)
735735
defer fast.Stop()
736736

737-
if n, err := fast.InsertReceiptChain(blocks, receipts, 0); err != nil {
737+
if n, err := fast.InsertReceiptChain(blocks, types.EncodeBlockReceiptLists(receipts), 0); err != nil {
738738
t.Fatalf("failed to insert receipt %d: %v", n, err)
739739
}
740740
// Freezer style fast import the chain.
@@ -747,7 +747,7 @@ func testFastVsFullChains(t *testing.T, scheme string) {
747747
ancient, _ := NewBlockChain(ancientDb, DefaultCacheConfigWithScheme(scheme), gspec, nil, ethash.NewFaker(), vm.Config{}, nil)
748748
defer ancient.Stop()
749749

750-
if n, err := ancient.InsertReceiptChain(blocks, receipts, uint64(len(blocks)/2)); err != nil {
750+
if n, err := ancient.InsertReceiptChain(blocks, types.EncodeBlockReceiptLists(receipts), uint64(len(blocks)/2)); err != nil {
751751
t.Fatalf("failed to insert receipt %d: %v", n, err)
752752
}
753753

@@ -871,7 +871,7 @@ func testLightVsFastVsFullChainHeads(t *testing.T, scheme string) {
871871
fast, _ := NewBlockChain(fastDb, DefaultCacheConfigWithScheme(scheme), gspec, nil, ethash.NewFaker(), vm.Config{}, nil)
872872
defer fast.Stop()
873873

874-
if n, err := fast.InsertReceiptChain(blocks, receipts, 0); err != nil {
874+
if n, err := fast.InsertReceiptChain(blocks, types.EncodeBlockReceiptLists(receipts), 0); err != nil {
875875
t.Fatalf("failed to insert receipt %d: %v", n, err)
876876
}
877877
assert(t, "fast", fast, height, height, 0)
@@ -884,7 +884,7 @@ func testLightVsFastVsFullChainHeads(t *testing.T, scheme string) {
884884
ancient, _ := NewBlockChain(ancientDb, DefaultCacheConfigWithScheme(scheme), gspec, nil, ethash.NewFaker(), vm.Config{}, nil)
885885
defer ancient.Stop()
886886

887-
if n, err := ancient.InsertReceiptChain(blocks, receipts, uint64(3*len(blocks)/4)); err != nil {
887+
if n, err := ancient.InsertReceiptChain(blocks, types.EncodeBlockReceiptLists(receipts), uint64(3*len(blocks)/4)); err != nil {
888888
t.Fatalf("failed to insert receipt %d: %v", n, err)
889889
}
890890
assert(t, "ancient", ancient, height, height, 0)
@@ -1696,7 +1696,7 @@ func testBlockchainRecovery(t *testing.T, scheme string) {
16961696
defer ancientDb.Close()
16971697
ancient, _ := NewBlockChain(ancientDb, DefaultCacheConfigWithScheme(scheme), gspec, nil, ethash.NewFaker(), vm.Config{}, nil)
16981698

1699-
if n, err := ancient.InsertReceiptChain(blocks, receipts, uint64(3*len(blocks)/4)); err != nil {
1699+
if n, err := ancient.InsertReceiptChain(blocks, types.EncodeBlockReceiptLists(receipts), uint64(3*len(blocks)/4)); err != nil {
17001700
t.Fatalf("failed to insert receipt %d: %v", n, err)
17011701
}
17021702
rawdb.WriteLastPivotNumber(ancientDb, blocks[len(blocks)-1].NumberU64()) // Force fast sync behavior
@@ -1991,7 +1991,7 @@ func testInsertKnownChainData(t *testing.T, typ string, scheme string) {
19911991
}
19921992
} else if typ == "receipts" {
19931993
inserter = func(blocks []*types.Block, receipts []types.Receipts) error {
1994-
_, err = chain.InsertReceiptChain(blocks, receipts, 0)
1994+
_, err = chain.InsertReceiptChain(blocks, types.EncodeBlockReceiptLists(receipts), 0)
19951995
return err
19961996
}
19971997
asserter = func(t *testing.T, block *types.Block) {
@@ -2157,7 +2157,7 @@ func testInsertKnownChainDataWithMerging(t *testing.T, typ string, mergeHeight i
21572157
}
21582158
} else if typ == "receipts" {
21592159
inserter = func(blocks []*types.Block, receipts []types.Receipts) error {
2160-
_, err = chain.InsertReceiptChain(blocks, receipts, 0)
2160+
_, err = chain.InsertReceiptChain(blocks, types.EncodeBlockReceiptLists(receipts), 0)
21612161
return err
21622162
}
21632163
asserter = func(t *testing.T, block *types.Block) {
@@ -4205,10 +4205,10 @@ func testChainReorgSnapSync(t *testing.T, ancientLimit uint64) {
42054205
chain, _ := NewBlockChain(db, DefaultCacheConfigWithScheme(rawdb.PathScheme), gspec, nil, beacon.New(ethash.NewFaker()), vm.Config{}, nil)
42064206
defer chain.Stop()
42074207

4208-
if n, err := chain.InsertReceiptChain(blocks, receipts, ancientLimit); err != nil {
4208+
if n, err := chain.InsertReceiptChain(blocks, types.EncodeBlockReceiptLists(receipts), ancientLimit); err != nil {
42094209
t.Fatalf("failed to insert receipt %d: %v", n, err)
42104210
}
4211-
if n, err := chain.InsertReceiptChain(chainA, receiptsA, ancientLimit); err != nil {
4211+
if n, err := chain.InsertReceiptChain(chainA, types.EncodeBlockReceiptLists(receiptsA), ancientLimit); err != nil {
42124212
t.Fatalf("failed to insert receipt %d: %v", n, err)
42134213
}
42144214
// If the common ancestor is below the ancient limit, rewind the chain head.
@@ -4218,7 +4218,7 @@ func testChainReorgSnapSync(t *testing.T, ancientLimit uint64) {
42184218
rawdb.WriteLastPivotNumber(db, ancestor)
42194219
chain.SetHead(ancestor)
42204220
}
4221-
if n, err := chain.InsertReceiptChain(chainB, receiptsB, ancientLimit); err != nil {
4221+
if n, err := chain.InsertReceiptChain(chainB, types.EncodeBlockReceiptLists(receiptsB), ancientLimit); err != nil {
42224222
t.Fatalf("failed to insert receipt %d: %v", n, err)
42234223
}
42244224
head := chain.CurrentSnapBlock()
@@ -4336,7 +4336,7 @@ func testInsertChainWithCutoff(t *testing.T, cutoff uint64, ancientLimit uint64,
43364336
if n, err := chain.InsertHeadersBeforeCutoff(headersBefore); err != nil {
43374337
t.Fatalf("failed to insert headers before cutoff %d: %v", n, err)
43384338
}
4339-
if n, err := chain.InsertReceiptChain(blocksAfter, receiptsAfter, ancientLimit); err != nil {
4339+
if n, err := chain.InsertReceiptChain(blocksAfter, types.EncodeBlockReceiptLists(receiptsAfter), ancientLimit); err != nil {
43404340
t.Fatalf("failed to insert receipt %d: %v", n, err)
43414341
}
43424342
headSnap := chain.CurrentSnapBlock()

core/filtermaps/chain_view.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ type blockchain interface {
2929
GetHeader(hash common.Hash, number uint64) *types.Header
3030
GetCanonicalHash(number uint64) common.Hash
3131
GetReceiptsByHash(hash common.Hash) types.Receipts
32-
GetRawReceiptsByHash(hash common.Hash) types.Receipts
32+
GetRawReceipts(hash common.Hash, number uint64) types.Receipts
3333
}
3434

3535
// ChainView represents an immutable view of a chain with a block id and a set
@@ -117,7 +117,7 @@ func (cv *ChainView) RawReceipts(number uint64) types.Receipts {
117117
log.Error("Chain view: block hash unavailable", "number", number, "head", cv.headNumber)
118118
return nil
119119
}
120-
return cv.chain.GetRawReceiptsByHash(blockHash)
120+
return cv.chain.GetRawReceipts(blockHash, number)
121121
}
122122

123123
// SharedRange returns the block range shared by two chain views.

core/filtermaps/indexer_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -515,7 +515,7 @@ func (tc *testChain) GetReceiptsByHash(hash common.Hash) types.Receipts {
515515
return tc.receipts[hash]
516516
}
517517

518-
func (tc *testChain) GetRawReceiptsByHash(hash common.Hash) types.Receipts {
518+
func (tc *testChain) GetRawReceipts(hash common.Hash, number uint64) types.Receipts {
519519
tc.lock.RLock()
520520
defer tc.lock.RUnlock()
521521

0 commit comments

Comments
 (0)