Skip to content

eth, core: track block propagation #1078

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Mar 5, 2024
Merged
1 change: 1 addition & 0 deletions builder/files/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ syncmode = "full"
# json = false
# backtrace = ""
# debug = true
# enable-block-tracking = false

[p2p]
# maxpeers = 1
Expand Down
1 change: 1 addition & 0 deletions core/types/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ type Block struct {
// inter-peer block relay.
ReceivedAt time.Time
ReceivedFrom interface{}
AnnouncedAt *time.Time
}

// "external" block encoding. used for eth protocol, etc.
Expand Down
9 changes: 5 additions & 4 deletions docs/cli/example_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ devfakeauthor = false # Run miner without validator set authorization
"32000000" = "0x875500011e5eecc0c554f95d07b31cf59df4ca2505f4dbbfffa7d4e4da917c68"

[log]
vmodule = "" # Per-module verbosity: comma-separated list of <pattern>=<level> (e.g. eth/*=5,p2p=4)
json = false # Format logs with JSON
backtrace = "" # Request a stack trace at a specific logging statement (e.g. "block.go:271")
debug = true # Prepends log messages with call-site location (file and line number) - {requires some effort}
vmodule = "" # Per-module verbosity: comma-separated list of <pattern>=<level> (e.g. eth/*=5,p2p=4)
json = false # Format logs with JSON
backtrace = "" # Request a stack trace at a specific logging statement (e.g. "block.go:271")
debug = true # Prepends log messages with call-site location (file and line number)
enable-block-tracking = false # Enables additional logging of information collected while tracking block lifecycle

[p2p]
maxpeers = 50 # Maximum number of network peers (network disabled if set to 0)
Expand Down
2 changes: 2 additions & 0 deletions docs/cli/server.md
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,8 @@ The ```bor server``` command runs the Bor client.

- ```log.debug```: Prepends log messages with call-site location (file and line number) (default: false)

- ```log.enable-block-tracking```: Enables additional logging of information collected while tracking block lifecycle (default: false)

- ```log.json```: Format logs with JSON (default: false)

- ```vmodule```: Per-module verbosity: comma-separated list of <pattern>=<level> (e.g. eth/*=5,p2p=4)
Expand Down
25 changes: 13 additions & 12 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,18 +272,19 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
// Permit the downloader to use the trie cache allowance during fast sync
cacheLimit := cacheConfig.TrieCleanLimit + cacheConfig.TrieDirtyLimit + cacheConfig.SnapshotLimit
if eth.handler, err = newHandler(&handlerConfig{
Database: chainDb,
Chain: eth.blockchain,
TxPool: eth.txPool,
Merger: eth.merger,
Network: config.NetworkId,
Sync: config.SyncMode,
BloomCache: uint64(cacheLimit),
EventMux: eth.eventMux,
RequiredBlocks: config.RequiredBlocks,
EthAPI: blockChainAPI,
checker: checker,
txArrivalWait: eth.p2pServer.TxArrivalWait,
Database: chainDb,
Chain: eth.blockchain,
TxPool: eth.txPool,
Merger: eth.merger,
Network: config.NetworkId,
Sync: config.SyncMode,
BloomCache: uint64(cacheLimit),
EventMux: eth.eventMux,
RequiredBlocks: config.RequiredBlocks,
EthAPI: blockChainAPI,
checker: checker,
txArrivalWait: eth.p2pServer.TxArrivalWait,
enableBlockTracking: eth.config.EnableBlockTracking,
}); err != nil {
return nil, err
}
Expand Down
3 changes: 3 additions & 0 deletions eth/ethconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,9 @@ type Config struct {

// OverrideVerkle (TODO: remove after the fork)
OverrideVerkle *big.Int `toml:",omitempty"`

// EnableBlockTracking allows logging of information collected while tracking block lifecycle
EnableBlockTracking bool
}

// CreateConsensusEngine creates a consensus engine for the given chain configuration.
Expand Down
109 changes: 71 additions & 38 deletions eth/fetcher/block_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,18 +114,20 @@ type blockAnnounce struct {

// headerFilterTask represents a batch of headers needing fetcher filtering.
type headerFilterTask struct {
peer string // The source peer of block headers
headers []*types.Header // Collection of headers to filter
time time.Time // Arrival time of the headers
peer string // The source peer of block headers
headers []*types.Header // Collection of headers to filter
time time.Time // Arrival time of the headers
announcedTime time.Time // Announcement time of the headers
}

// bodyFilterTask represents a batch of block bodies (transactions and uncles)
// needing fetcher filtering.
type bodyFilterTask struct {
peer string // The source peer of block bodies
transactions [][]*types.Transaction // Collection of transactions per block bodies
uncles [][]*types.Header // Collection of uncles per block bodies
time time.Time // Arrival time of the blocks' contents
peer string // The source peer of block bodies
transactions [][]*types.Transaction // Collection of transactions per block bodies
uncles [][]*types.Header // Collection of uncles per block bodies
time time.Time // Arrival time of the blocks' contents
announcedTime time.Time
}

// blockOrHeaderInject represents a schedules import operation.
Expand Down Expand Up @@ -197,34 +199,38 @@ type BlockFetcher struct {
fetchingHook func([]common.Hash) // Method to call upon starting a block (eth/61) or header (eth/62) fetch
completingHook func([]common.Hash) // Method to call upon starting a block body fetch (eth/62)
importedHook func(*types.Header, *types.Block) // Method to call upon successful header or block import (both eth/61 and eth/62)

// Logging
enableBlockTracking bool // Whether to log block tracking information
}

// NewBlockFetcher creates a block fetcher to retrieve blocks based on hash announcements.
func NewBlockFetcher(light bool, getHeader HeaderRetrievalFn, getBlock blockRetrievalFn, verifyHeader headerVerifierFn, broadcastBlock blockBroadcasterFn, chainHeight chainHeightFn, insertHeaders headersInsertFn, insertChain chainInsertFn, dropPeer peerDropFn) *BlockFetcher {
func NewBlockFetcher(light bool, getHeader HeaderRetrievalFn, getBlock blockRetrievalFn, verifyHeader headerVerifierFn, broadcastBlock blockBroadcasterFn, chainHeight chainHeightFn, insertHeaders headersInsertFn, insertChain chainInsertFn, dropPeer peerDropFn, enableBlockTracking bool) *BlockFetcher {
return &BlockFetcher{
light: light,
notify: make(chan *blockAnnounce),
inject: make(chan *blockOrHeaderInject),
headerFilter: make(chan chan *headerFilterTask),
bodyFilter: make(chan chan *bodyFilterTask),
done: make(chan common.Hash),
quit: make(chan struct{}),
announces: make(map[string]int),
announced: make(map[common.Hash][]*blockAnnounce),
fetching: make(map[common.Hash]*blockAnnounce),
fetched: make(map[common.Hash][]*blockAnnounce),
completing: make(map[common.Hash]*blockAnnounce),
queue: prque.New[int64, *blockOrHeaderInject](nil),
queues: make(map[string]int),
queued: make(map[common.Hash]*blockOrHeaderInject),
getHeader: getHeader,
getBlock: getBlock,
verifyHeader: verifyHeader,
broadcastBlock: broadcastBlock,
chainHeight: chainHeight,
insertHeaders: insertHeaders,
insertChain: insertChain,
dropPeer: dropPeer,
light: light,
notify: make(chan *blockAnnounce),
inject: make(chan *blockOrHeaderInject),
headerFilter: make(chan chan *headerFilterTask),
bodyFilter: make(chan chan *bodyFilterTask),
done: make(chan common.Hash),
quit: make(chan struct{}),
announces: make(map[string]int),
announced: make(map[common.Hash][]*blockAnnounce),
fetching: make(map[common.Hash]*blockAnnounce),
fetched: make(map[common.Hash][]*blockAnnounce),
completing: make(map[common.Hash]*blockAnnounce),
queue: prque.New[int64, *blockOrHeaderInject](nil),
queues: make(map[string]int),
queued: make(map[common.Hash]*blockOrHeaderInject),
getHeader: getHeader,
getBlock: getBlock,
verifyHeader: verifyHeader,
broadcastBlock: broadcastBlock,
chainHeight: chainHeight,
insertHeaders: insertHeaders,
insertChain: insertChain,
dropPeer: dropPeer,
enableBlockTracking: enableBlockTracking,
}
}

Expand Down Expand Up @@ -276,7 +282,7 @@ func (f *BlockFetcher) Enqueue(peer string, block *types.Block) error {

// FilterHeaders extracts all the headers that were explicitly requested by the fetcher,
// returning those that should be handled differently.
func (f *BlockFetcher) FilterHeaders(peer string, headers []*types.Header, time time.Time) []*types.Header {
func (f *BlockFetcher) FilterHeaders(peer string, headers []*types.Header, time time.Time, announcedAt time.Time) []*types.Header {
log.Trace("Filtering headers", "peer", peer, "headers", len(headers))

// Send the filter channel to the fetcher
Expand All @@ -289,7 +295,7 @@ func (f *BlockFetcher) FilterHeaders(peer string, headers []*types.Header, time
}
// Request the filtering of the header list
select {
case filter <- &headerFilterTask{peer: peer, headers: headers, time: time}:
case filter <- &headerFilterTask{peer: peer, headers: headers, time: time, announcedTime: announcedAt}:
case <-f.quit:
return nil
}
Expand All @@ -304,7 +310,7 @@ func (f *BlockFetcher) FilterHeaders(peer string, headers []*types.Header, time

// FilterBodies extracts all the block bodies that were explicitly requested by
// the fetcher, returning those that should be handled differently.
func (f *BlockFetcher) FilterBodies(peer string, transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time) ([][]*types.Transaction, [][]*types.Header) {
func (f *BlockFetcher) FilterBodies(peer string, transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time, announcedAt time.Time) ([][]*types.Transaction, [][]*types.Header) {
log.Trace("Filtering bodies", "peer", peer, "txs", len(transactions), "uncles", len(uncles))

// Send the filter channel to the fetcher
Expand All @@ -317,7 +323,7 @@ func (f *BlockFetcher) FilterBodies(peer string, transactions [][]*types.Transac
}
// Request the filtering of the body list
select {
case filter <- &bodyFilterTask{peer: peer, transactions: transactions, uncles: uncles, time: time}:
case filter <- &bodyFilterTask{peer: peer, transactions: transactions, uncles: uncles, time: time, announcedTime: announcedAt}:
case <-f.quit:
return nil, nil
}
Expand Down Expand Up @@ -480,7 +486,7 @@ func (f *BlockFetcher) loop() {
log.Trace("Fetching scheduled headers", "peer", peer, "list", hashes)

// Create a closure of the fetch and schedule in on a new thread
fetchHeader, hashes := f.fetching[hashes[0]].fetchHeader, hashes
fetchHeader, hashes, announcedAt := f.fetching[hashes[0]].fetchHeader, hashes, f.fetching[hashes[0]].time
go func(peer string) {
if f.fetchingHook != nil {
f.fetchingHook(hashes)
Expand All @@ -504,7 +510,7 @@ func (f *BlockFetcher) loop() {
select {
case res := <-resCh:
res.Done <- nil
f.FilterHeaders(peer, *res.Res.(*eth.BlockHeadersPacket), time.Now().Add(res.Time))
f.FilterHeaders(peer, *res.Res.(*eth.BlockHeadersPacket), time.Now(), announcedAt)

case <-timeout.C:
// The peer didn't respond in time. The request
Expand Down Expand Up @@ -547,6 +553,7 @@ func (f *BlockFetcher) loop() {

fetchBodies := f.completing[hashes[0]].fetchBodies
bodyFetchMeter.Mark(int64(len(hashes)))
announcedAt := f.completing[hashes[0]].time

go func(peer string, hashes []common.Hash) {
resCh := make(chan *eth.Response)
Expand All @@ -565,7 +572,7 @@ func (f *BlockFetcher) loop() {
res.Done <- nil
// Ignoring withdrawals here, since the block fetcher is not used post-merge.
txs, uncles, _ := res.Res.(*eth.BlockBodiesPacket).Unpack()
f.FilterBodies(peer, txs, uncles, time.Now())
f.FilterBodies(peer, txs, uncles, time.Now(), announcedAt)

case <-timeout.C:
// The peer didn't respond in time. The request
Expand Down Expand Up @@ -631,6 +638,7 @@ func (f *BlockFetcher) loop() {

block := types.NewBlockWithHeader(header)
block.ReceivedAt = task.time
block.AnnouncedAt = &task.announcedTime

complete = append(complete, block)
f.completing[hash] = announce
Expand Down Expand Up @@ -725,6 +733,7 @@ func (f *BlockFetcher) loop() {
if f.getBlock(hash) == nil {
block := types.NewBlockWithHeader(announce.header).WithBody(task.transactions[i], task.uncles[i])
block.ReceivedAt = task.time
block.AnnouncedAt = &task.announcedTime
blocks = append(blocks, block)
} else {
f.forgetHash(hash)
Expand Down Expand Up @@ -923,6 +932,30 @@ func (f *BlockFetcher) importBlocks(peer string, block *types.Block) {
log.Debug("Propagated block import failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
return
}

if f.enableBlockTracking {
// Log the insertion event
var (
msg string
delayInMs uint64
prettyDelay common.PrettyDuration
)

if block.AnnouncedAt != nil {
msg = "[block tracker] Inserted new block with announcement"
delayInMs = uint64(time.Since(*block.AnnouncedAt).Milliseconds())
prettyDelay = common.PrettyDuration(time.Since(*block.AnnouncedAt))
} else {
msg = "[block tracker] Inserted new block without announcement"
delayInMs = uint64(time.Since(block.ReceivedAt).Milliseconds())
prettyDelay = common.PrettyDuration(time.Since(block.ReceivedAt))
}

totalDelay := uint64(time.Now().UnixMilli()) - block.Time()*1000

log.Info(msg, "number", block.Number().Uint64(), "hash", hash, "delayInMs", delayInMs, "delay", prettyDelay, "totalDelay", totalDelay)
}

// If import succeeded, broadcast the block
blockAnnounceOutTimer.UpdateSince(block.ReceivedAt)

Expand Down
2 changes: 1 addition & 1 deletion eth/fetcher/block_fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func newTester(light bool) *fetcherTester {
blocks: map[common.Hash]*types.Block{genesis.Hash(): genesis},
drops: make(map[string]bool),
}
tester.fetcher = NewBlockFetcher(light, tester.getHeader, tester.getBlock, tester.verifyHeader, tester.broadcastBlock, tester.chainHeight, tester.insertHeaders, tester.insertChain, tester.dropPeer)
tester.fetcher = NewBlockFetcher(light, tester.getHeader, tester.getBlock, tester.verifyHeader, tester.broadcastBlock, tester.chainHeight, tester.insertHeaders, tester.insertChain, tester.dropPeer, false)
tester.fetcher.Start()

return tester
Expand Down
61 changes: 35 additions & 26 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,18 +85,19 @@ type txPool interface {
// handlerConfig is the collection of initialization parameters to create a full
// node network handler.
type handlerConfig struct {
Database ethdb.Database // Database for direct sync insertions
Chain *core.BlockChain // Blockchain to serve data from
TxPool txPool // Transaction pool to propagate from
Merger *consensus.Merger // The manager for eth1/2 transition
Network uint64 // Network identifier to adfvertise
Sync downloader.SyncMode // Whether to snap or full sync
BloomCache uint64 // Megabytes to alloc for snap sync bloom
EventMux *event.TypeMux // Legacy event mux, deprecate for `feed`
txArrivalWait time.Duration // Maximum duration to wait for an announced tx before requesting it
checker ethereum.ChainValidator
RequiredBlocks map[uint64]common.Hash // Hard coded map of required block hashes for sync challenges
EthAPI *ethapi.BlockChainAPI // EthAPI to interact
Database ethdb.Database // Database for direct sync insertions
Chain *core.BlockChain // Blockchain to serve data from
TxPool txPool // Transaction pool to propagate from
Merger *consensus.Merger // The manager for eth1/2 transition
Network uint64 // Network identifier to adfvertise
Sync downloader.SyncMode // Whether to snap or full sync
BloomCache uint64 // Megabytes to alloc for snap sync bloom
EventMux *event.TypeMux // Legacy event mux, deprecate for `feed`
txArrivalWait time.Duration // Maximum duration to wait for an announced tx before requesting it
checker ethereum.ChainValidator
RequiredBlocks map[uint64]common.Hash // Hard coded map of required block hashes for sync challenges
EthAPI *ethapi.BlockChainAPI // EthAPI to interact
enableBlockTracking bool // Whether to log block tracking information
}

type handler struct {
Expand Down Expand Up @@ -126,6 +127,8 @@ type handler struct {

requiredBlocks map[uint64]common.Hash

enableBlockTracking bool

// channels for fetcher, syncer, txsyncLoop
quitSync chan struct{}

Expand All @@ -144,19 +147,20 @@ func newHandler(config *handlerConfig) (*handler, error) {
}

h := &handler{
networkID: config.Network,
forkFilter: forkid.NewFilter(config.Chain),
eventMux: config.EventMux,
database: config.Database,
txpool: config.TxPool,
chain: config.Chain,
peers: newPeerSet(),
merger: config.Merger,
ethAPI: config.EthAPI,
requiredBlocks: config.RequiredBlocks,
quitSync: make(chan struct{}),
handlerDoneCh: make(chan struct{}),
handlerStartCh: make(chan struct{}),
networkID: config.Network,
forkFilter: forkid.NewFilter(config.Chain),
eventMux: config.EventMux,
database: config.Database,
txpool: config.TxPool,
chain: config.Chain,
peers: newPeerSet(),
merger: config.Merger,
ethAPI: config.EthAPI,
requiredBlocks: config.RequiredBlocks,
enableBlockTracking: config.enableBlockTracking,
quitSync: make(chan struct{}),
handlerDoneCh: make(chan struct{}),
handlerStartCh: make(chan struct{}),
}
if config.Sync == downloader.FullSync {
// The database seems empty as the current block is the genesis. Yet the snap
Expand Down Expand Up @@ -295,7 +299,7 @@ func newHandler(config *handlerConfig) (*handler, error) {

return n, err
}
h.blockFetcher = fetcher.NewBlockFetcher(false, nil, h.chain.GetBlockByHash, validator, h.BroadcastBlock, heighter, nil, inserter, h.removePeer)
h.blockFetcher = fetcher.NewBlockFetcher(false, nil, h.chain.GetBlockByHash, validator, h.BroadcastBlock, heighter, nil, inserter, h.removePeer, h.enableBlockTracking)

fetchTx := func(peer string, hashes []common.Hash) error {
p := h.peers.peer(peer)
Expand Down Expand Up @@ -688,6 +692,11 @@ func (h *handler) minedBroadcastLoop() {

for obj := range h.minedBlockSub.Chan() {
if ev, ok := obj.Data.(core.NewMinedBlockEvent); ok {
if h.enableBlockTracking {
delayInMs := uint64(time.Now().UnixMilli()) - ev.Block.Time()*1000
delayInNs := uint64(time.Now().UnixNano()) - ev.Block.Time()*1000000000
log.Info("[block tracker] Broadcasting mined block", "number", ev.Block.NumberU64(), "hash", ev.Block.Hash(), "delayInMs", delayInMs, "delayInNs", delayInNs, "blockTime", ev.Block.Time(), "now", time.Now().Unix())
}
h.BroadcastBlock(ev.Block, true) // First propagate block to peers
h.BroadcastBlock(ev.Block, false) // Only then announce to the rest
}
Expand Down
Loading