Beacon sync update header and reorg blocks processing #3306

Merged
merged 7 commits into from
May 19, 2025
6 changes: 0 additions & 6 deletions execution_chain/config.nim
@@ -352,12 +352,6 @@ type
"is accepted"
name: "debug-beacon-sync-target-file" .}: Option[InputFile]

beaconSyncBlocksQueueHwm* {.
hidden
desc: "Limit number of blocks on staging queue for beacon sync"
defaultValue: 0
name: "debug-beacon-sync-blocks-queue-hwm" .}: int

rocksdbMaxOpenFiles {.
hidden
defaultValue: defaultMaxOpenFiles
6 changes: 6 additions & 0 deletions execution_chain/core/chain/header_chain_cache.nim
@@ -593,6 +593,12 @@ func head*(hc: HeaderChainRef): Header =
if collecting <= hc.state:
return hc.session.head

func headHash*(hc: HeaderChainRef): Hash32 =
## Getter: hash of `head()`
##
if collecting <= hc.state:
return hc.session.headHash

func antecedent*(hc: HeaderChainRef): Header =
## Getter: bottom of header chain. In case there is no header chain
## initialised, the return value is `Header()` (i.e. the block number
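
The new `headHash()` getter pairs with `head()` above and returns the hash kept in the session rather than recomputing it from the header. Below is a minimal sketch of that caching idea, using stand-in types instead of the real `HeaderChainRef` internals (the `collecting <= hc.state` guard is omitted here):

```nim
# Minimal sketch with stand-in types (not the real HeaderChainRef internals):
# the session keeps the head hash next to the head header, so `headHash()` is
# a cheap field read instead of re-hashing `head()`.
type
  Hash32 = array[32, byte]          # stand-in for the real Hash32
  Header = object
    number: uint64                  # stand-in header
  HcSession = object
    head: Header
    headHash: Hash32                # cached when the head is stored
  HeaderChainStub = ref object
    session: HcSession

func head(hc: HeaderChainStub): Header =
  hc.session.head

func headHash(hc: HeaderChainStub): Hash32 =
  ## Getter: cached hash of `head()`
  hc.session.headHash

when isMainModule:
  var s = HcSession(head: Header(number: 42))
  s.headHash[0] = 0xab              # pretend this was computed once, on store
  let hc = HeaderChainStub(session: s)
  doAssert hc.head().number == 42
  doAssert hc.headHash()[0] == 0xab # reads the cache, no re-hashing
```
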
2 changes: 1 addition & 1 deletion execution_chain/nimbus_execution_client.nim
@@ -117,7 +117,7 @@ proc setupP2P(nimbus: NimbusNode, conf: NimbusConf,

# Always initialise beacon syncer
nimbus.beaconSyncRef = BeaconSyncRef.init(
nimbus.ethNode, nimbus.fc, conf.maxPeers, conf.beaconSyncBlocksQueueHwm)
nimbus.ethNode, nimbus.fc, conf.maxPeers)

# Optional for pre-setting the sync target (i.e. debugging)
if conf.beaconSyncTargetFile.isSome():
2 changes: 0 additions & 2 deletions execution_chain/sync/beacon.nim
@@ -62,11 +62,9 @@ proc init*(
ethNode: EthereumNode;
chain: ForkedChainRef;
maxPeers: int;
blockQueueHwm = 0;
): T =
var desc = T()
desc.initSync(ethNode, maxPeers)
desc.ctx.pool.blkStagedHwm = blockQueueHwm
desc.ctx.pool.chain = chain
desc

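
Together with the call-site change in `nimbus_execution_client.nim` above, the constructor now takes only the node, the chain and the peer limit. The following is a compilable sketch of the same shape, with stand-in types in place of `EthereumNode` and `ForkedChainRef`:

```nim
# Sketch with stand-in types: the constructor no longer takes a
# `blockQueueHwm` argument, so there is no queue limit field left to set.
type
  EthNodeStub = ref object            # stand-in for EthereumNode
  ChainStub = ref object              # stand-in for ForkedChainRef
  PoolStub = object
    chain: ChainStub
  BeaconSyncStub = ref object
    node: EthNodeStub
    maxPeers: int
    pool: PoolStub

proc init(T: type BeaconSyncStub;
          ethNode: EthNodeStub;
          chain: ChainStub;
          maxPeers: int): T =
  var desc = T()
  desc.node = ethNode
  desc.maxPeers = maxPeers
  desc.pool.chain = chain             # no `blkStagedHwm` assignment any more
  desc

when isMainModule:
  let syncer = BeaconSyncStub.init(EthNodeStub(), ChainStub(), 25)
  doAssert syncer.maxPeers == 25
```
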
66 changes: 35 additions & 31 deletions execution_chain/sync/beacon/worker.nim
@@ -24,14 +24,13 @@ import
# Private functions
# ------------------------------------------------------------------------------

proc napUnlessSomethingToFetch(
proc napUnlessSomethingToCollect(
buddy: BeaconBuddyRef;
): Future[bool] {.async: (raises: []).} =
## When idle, save cpu cycles waiting for something to do.
if buddy.ctx.pool.blkImportOk or # currently importing blocks
buddy.ctx.hibernate or # not activated yet?
not (buddy.headersStagedFetchOk() or # something on TODO list
buddy.blocksStagedFetchOk()):
if buddy.ctx.hibernate or # not activated yet?
not (buddy.headersStagedCollectOk() or # something on TODO list
buddy.blocksStagedCollectOk()):
try:
await sleepAsync workerIdleWaitInterval
except CancelledError:
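
The renamed `napUnlessSomethingToCollect` drops the `blkImportOk` guard and naps purely on the two `…CollectOk()` predicates. Here is a self-contained sketch of that nap pattern on top of chronos; the wait constant and the boolean parameters are assumptions, not the real worker symbols:

```nim
# Sketch of the idle-nap pattern (assumed wait constant, stand-in predicates):
# nothing to collect means the worker sleeps one interval and reports `true`,
# so the caller can skip the rest of its loop body.
import chronos

const workerIdleWait = 10.seconds     # assumption; the real interval is a
                                      # constant in the worker config

proc napUnlessSomethingToCollect(
    headersCollectOk, blocksCollectOk: bool;
      ): Future[bool] {.async: (raises: []).} =
  if headersCollectOk or blocksCollectOk:
    return false                      # something on the TODO list
  try:
    await sleepAsync workerIdleWait   # idle: save cpu cycles
  except CancelledError:
    discard
  return true

when isMainModule:
  doAssert not waitFor napUnlessSomethingToCollect(true, false)
```
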
@@ -118,7 +117,7 @@ proc runDaemon*(
) {.async: (raises: []).} =
## Global background job that will be re-started as long as the variable
## `ctx.daemon` is set `true` which corresponds to `ctx.hibernating` set
## to false`.
## to false.
##
## On a fresh start, the flag `ctx.daemon` will not be set `true` before the
## first usable request from the CL (via RPC) stumbles in.
@@ -129,22 +128,14 @@
return

# Execute staged block records.
if ctx.blocksStagedCanImportOk():

block:
# Set flag informing peers to go into idle mode while importing takes
# place. It has been observed that importing blocks and downloading
# at the same time does not work very well, most probably due to high
# system activity while importing. Peers will get lost pretty soon after
# downloading starts if they continue downloading.
ctx.pool.blkImportOk = true
defer: ctx.pool.blkImportOk = false

# Import from staged queue.
while await ctx.blocksStagedImport(info):
if not ctx.daemon or # Implied by external sync shutdown?
ctx.poolMode: # Oops, re-org needed?
return
if ctx.blocksStagedProcessOk():

# Import bodies from the `staged` queue.
discard await ctx.blocksStagedProcess info

if not ctx.daemon or # Implied by external sync shutdown?
ctx.poolMode: # Oops, re-org needed?
return

# At the end of the cycle, leave time to trigger refill headers/blocks
try: await sleepAsync daemonWaitInterval
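
The daemon no longer toggles a `blkImportOk` flag to park the peers; it drains the staged queue via `blocksStagedProcess` whenever `blocksStagedProcessOk()` allows, then sleeps one interval. A hedged sketch of that cycle shape follows, with stand-in context, procs and constants (none of the names below are the real worker symbols):

```nim
# Sketch of the reshaped daemon cycle (stand-in context and procs): process
# staged blocks when allowed, bail out on shutdown or re-org, then pause so
# the peers get a chance to refill the header/block queues.
import chronos

const daemonWait = 10.seconds         # assumption; real value is a worker const

type DaemonCtxStub = ref object
  daemon: bool                        # cleared on external sync shutdown
  poolMode: bool                      # set when a re-org pass is needed
  staged: int                         # stand-in for the staged block queue

proc blocksStagedProcessOk(ctx: DaemonCtxStub): bool =
  0 < ctx.staged

proc blocksStagedProcess(ctx: DaemonCtxStub): Future[bool]
    {.async: (raises: []).} =
  ctx.staged.dec                      # stand-in for importing one queue record
  return true

proc runDaemonCycle(ctx: DaemonCtxStub) {.async: (raises: []).} =
  if ctx.blocksStagedProcessOk():
    discard await ctx.blocksStagedProcess()
    if not ctx.daemon or ctx.poolMode:
      return                          # shutdown or re-org: stop this cycle
  try:
    await sleepAsync daemonWait       # leave time to refill headers/blocks
  except CancelledError:
    discard
```
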
@@ -187,23 +178,36 @@
buddy.only.multiRunIdle = Moment.now() - buddy.only.stoppedMultiRun
buddy.only.nMultiLoop.inc # statistics/debugging

if not await buddy.napUnlessSomethingToFetch():
if not await buddy.napUnlessSomethingToCollect():

# Download and process headers and blocks
while buddy.headersStagedFetchOk():
while buddy.headersStagedCollectOk():

# Collect headers and either stash them on the header chain cache
# directly, or stage then on the header queue to get them serialised,
# later.
if await buddy.headersStagedCollect info:
# directly, or stage on the header queue to get them serialised and
# stashed, later.
await buddy.headersStagedCollect info

# Store headers from the `staged` queue onto the header chain cache.
buddy.headersStagedProcess info
# Store serialised headers from the `staged` queue onto the header
# chain cache.
if not buddy.headersStagedProcess info:
# Need to proceed with another peer (e.g. gap between queue and
# header chain cache.)
break

    # Fetch bodies and combine them with headers into blocks to be staged. These
    # staged blocks are then executed by the daemon process (no `peer` needed.)
while buddy.blocksStagedFetchOk():
discard await buddy.blocksStagedCollect info
while buddy.blocksStagedCollectOk():

# Collect bodies and either import them via `FC` module, or stage on
# the blocks queue to get them serialised and imported, later.
await buddy.blocksStagedCollect info

# Import bodies from the `staged` queue.
if not await buddy.blocksStagedProcess info:
# Need to proceed with another peer (e.g. gap between top imported
# block and blocks queue.)
break

# Note that it is important **not** to leave this function to be
# re-invoked by the scheduler unless necessary. While the time gap
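
The peer loop now follows a collect-then-process shape for both headers and blocks: collection always runs, and a failing process step (a gap that another peer has to fill) breaks out of the loop. Below is a self-contained sketch of that shape with stand-in procs, not the real header/block helpers:

```nim
# Sketch of the collect-then-process loop shape (stand-in procs): collecting
# always runs, but a failed process step means this peer cannot continue and
# the scheduler should hand the work to another peer.
import chronos

type PeerStub = ref object
  collected: int                      # stand-in for the staged queue fill level
  processed: int

proc stagedCollectOk(p: PeerStub): bool =
  p.collected < 3                     # stand-in: stop after a few rounds

proc stagedCollect(p: PeerStub) {.async: (raises: []).} =
  p.collected.inc                     # stand-in: fetch and stage one batch

proc stagedProcess(p: PeerStub): bool =
  if p.processed < 2:                 # stand-in: third round hits a "gap"
    p.processed.inc
    true
  else:
    false

proc runPeerLoop(p: PeerStub) {.async: (raises: []).} =
  while p.stagedCollectOk():
    await p.stagedCollect()           # stash directly or stage for later
    if not p.stagedProcess():
      break                           # gap: proceed with another peer

when isMainModule:
  let peer = PeerStub()
  waitFor peer.runPeerLoop()
  doAssert peer.collected == 3 and peer.processed == 2
```
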