Skip to content

Beacon sync maintenance update and header download fix #3269

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
May 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 53 additions & 101 deletions execution_chain/core/chain/header_chain_cache.nim
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,6 @@
## the `FC` module, the header append process must be finished by a dedicated
## commit statement `commit()`.
##
## Temporary extra:
##
## * There is a function provided combining `importBlocks()` and `forkChoice()`
## as a wrapper around `importBlock()` followed by occasional update of the
## base value of the `FC` module proper using `forkChoice()`.
##

{.push raises:[].}

Expand All @@ -73,7 +67,8 @@ import
"../.."/[common, db/core_db, db/storage_types],
../../db/[kvt, kvt_cf],
../../db/kvt/[kvt_utils, kvt_tx_frame],
./forked_chain/[chain_branch, chain_desc, block_quarantine], ./forked_chain
./forked_chain,
./forked_chain/[chain_branch, chain_desc, block_quarantine]

logScope:
topics = "hc-cache"
Expand Down Expand Up @@ -191,31 +186,42 @@ proc delInfo(db: KvtTxRef) =


proc putHeader(db: KvtTxRef; h: Header) =
## Store rlp encoded header
## Store the argument `header` indexed by block number, and the hash lookup
## of the parent header.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why the number of the parent header and not the current one?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems to avoid computing a hash, cc: @jangko

let data = encodePayload(h)
db.put(beaconHeaderKey(h.number).toOpenArray, data).isOkOr:
raiseAssert RaisePfx & "put(header) failed: " & $error

db.put(genericHashKey(h.parentHash).toOpenArray, (h.number-1).toBytesBE).isOkOr:
raiseAssert RaisePfx & "put(hash->number) failed: " & $error
let parNumData = (h.number-1).toBytesBE
db.put(genericHashKey(h.parentHash).toOpenArray, parNumData).isOkOr:
raiseAssert RaisePfx & "put(number-1) failed: " & $error


proc getNumber(db: KvtTxRef, hash: Hash32): Opt[BlockNumber] =
let number = db.get(genericHashKey(hash).toOpenArray).valueOr:
return err()
ok(uint64.fromBytesBE(number))

proc getHeader(db: KvtTxRef; bn: BlockNumber): Opt[Header] =
## Retrieve some header from cache
let data = db.get(beaconHeaderKey(bn).toOpenArray).valueOr:
return err()
ok decodePayload(data, Header)

proc delHeader(db: KvtTxRef; bn: BlockNumber) =
## Remove header from cache
let h = db.getHeader(bn).valueOr:
raiseAssert RaisePfx & "getHeader failed"
discard db.del(beaconHeaderKey(bn).toOpenArray)
discard db.del(genericHashKey(h.parentHash).toOpenArray)
proc getHeader(db: KvtTxRef; hash: Hash32): Opt[Header] =
## Variant of `getHeader()`
db.getHeader ?db.getNumber(hash)

proc getNumber(db: KvtTxRef, hash: Hash32): Opt[BlockNumber] =
let number = db.get(genericHashKey(hash).toOpenArray).valueOr:
return err()
ok(uint64.fromBytesBE(number))

proc delHeader(db: KvtTxRef; bn: BlockNumber) =
## Remove header from cache, ignore non-existing entries
let
bnKey = beaconHeaderKey(bn)
rc = db.get(bnKey.toOpenArray)
discard db.del(bnKey.toOpenArray)
if rc.isOk:
let h = decodePayload(rc.value, Header)
discard db.del(genericHashKey(h.parentHash).toOpenArray)

# ----------------------

Expand All @@ -241,10 +247,6 @@ proc persistClear(hc: HeaderChainRef) =
# Private functions
# ------------------------------------------------------------------------------

func baseNum(hc: HeaderChainRef): BlockNumber =
## Aka `hc.chain.baseNumber()` (avoiding `forked_chain` import)
hc.chain.baseBranch.tailNumber

func expectingMode(
hc: HeaderChainRef;
mode: HeaderChainMode;
Expand Down Expand Up @@ -273,51 +275,11 @@ proc tryFcParent(hc: HeaderChainRef; hdr: Header): HeaderChainMode =

# Ignore `hdr` unless its block number equals the one of the base (note
# that this function is called with decreasing block numbers.)
let baseNum = hc.baseNum()
let baseNum = hc.chain.baseNumber()
if baseNum + 1 < hdr.number:
return collecting # inconclusive

# The block number of `hdr` must not go below the `base`. It cannot be
# handled even if it has a parent on the block chain (but not on the
# `FC` module.)
#
# Rationale:
# This situataion might arise by means of `CL` requests via RPC updating
# the `base` concurrently to a syncer client. This can only happen if the
# base is near the canonical head. Which in turn means that the header
# chain is short.
#
# Reversing the antecedent (above `base`) is avoided. This goes with the
# argument that completely re-syncing a short header chain is worth it
# comparing additional administrative costs (for a syncer client of that
# module) of handling backward moves of the antecedent.
#
if hdr.number <= baseNum:
return orphan # beyond reach

# Now: baseNum + 1 == hdr.number
#
# This is the last stop (with least possible block number) where the
# `hdr` could have a parent on the `FC` module which obviously failed
# (i.e. `base` is not parent of `hdr`.)
if hc.chain.latestFinalizedBlockNumber <= baseNum:
# So, if resolved at all then `finalised` is ancestor of, or equal to
# `base`. In any case, `hdr` has no parent on the canonical branch up
# to `base`. So it is on another branch.
return orphan # maybe on the wrong branch

# Now `base` and `finalised` (and all ancestors of either) are on the
# the canonical branch of the DAG generated by the `CL` logic.
#
# So is `hdr` as its block number is `baseNum+1` at most the one of
# `finalised`. And both are on the header chain.
#
# So `base` and `hdr` are also on the canonical chain with block number
# distance 1. But `hdr` has no parent on the `FC` module -- oops.
#
# Impossible situation!
raiseAssert RaisePfx & "Base " & baseNum.bnStr & " was expected " &
"to be parent header chain item " & hdr.number.bnStr
return orphan # maybe on the wrong branch

# ------------------------------------------------------------------------------
# Private fork choice call back function
Expand All @@ -334,7 +296,9 @@ proc resolveFinHash(hc: HeaderChainRef, f: Hash32) =
return

if hc.chain.tryUpdatePendingFCU(f, number):
debug "PendingFCU resolved to block number", hash = f.short, number = number.bnStr
debug "PendingFCU resolved to block number",
hash = f.short,
number = number.bnStr

proc headUpdateFromCL(hc: HeaderChainRef; h: Header; f: Hash32) =
## Call back function to register new/prevously-unknown FC updates.
Expand All @@ -343,7 +307,7 @@ proc headUpdateFromCL(hc: HeaderChainRef; h: Header; f: Hash32) =
## client app.
##
if f != zeroHash32 and # finalised hash is set
hc.baseNum + 1 < h.number: # otherwise useless
hc.chain.baseNumber() + 1 < h.number: # otherwise useless

if hc.session.mode == closed:
# Set new session environment
Expand Down Expand Up @@ -394,6 +358,7 @@ proc stop*(hc: HeaderChainRef) =
## destructor `destroy()`.
##
hc.chain.com.headerChainUpdate = HeaderChainUpdateCB(nil)
hc.chain.com.resolveFinHash = ResolveFinHashCB(nil)
hc.notify = HeaderChainNotifyCB(nil)

proc start*(hc: HeaderChainRef; notify: HeaderChainNotifyCB) =
Expand Down Expand Up @@ -497,8 +462,8 @@ proc put*(
return ok() # nothing to do

debug "HC updated",
minNum=rev[^1].number,
maxNum=rev[0].number,
minNum=rev[^1].bnStr,
maxNum=rev[0].bnStr,
numHeaders=rev.len

# Check whether argument list closes up to headers chain
Expand Down Expand Up @@ -555,7 +520,7 @@ proc put*(
if hc.chain.tryUpdatePendingFCU(hash, hdr.number):
debug "PendingFCU resolved to block number",
hash=hash.short,
number=hdr.number.bnStr
number=hdr.bnStr

# Check whether `hdr` has a parent on the `FC` module.
let newMode = hc.tryFcParent(hdr)
Expand Down Expand Up @@ -594,35 +559,26 @@ proc commit*(hc: HeaderChainRef): Result[void,string] =
hc.session.mode = locked # update internal state
return ok()

let baseNum = hc.baseNum
if baseNum < hc.chain.latestFinalizedBlockNumber:
#
# So the `finalised` hash was resolved (otherwise it would have a zero
# block number.)
#
# There are two segments of the canonical chain, `base..finalised` and
# `ante..finalised` (the latter possibly degraded) which both share at
# least `finalised` on the header chain cache.
#
# On the intersection of `ante..finalised` and `base..finalised` there is
# a header with a parent on the `FC` module. Note that the intersecion is
# fully part of the header chain cache where the most senior element can
# be discarded. Neither `base` nor `ante` have a parent on the `FC` module.
#
let startHere = max(baseNum, hc.session.ante.number) + 1

# Find out where the parent to some `FC` module header is.
for bn in startHere .. hc.chain.latestFinalizedBlockNumber:
let newAnte = hc.kvt.getHeader(bn).valueOr:
raiseAssert RaisePfx & "getHeader(" & bn.bnStr & ") failed"
if hc.chain.hashToBlock.hasKey(newAnte.parentHash):
hc.session.ante = newAnte # update header chain
block assignFinalisedChild:
# Use `finalised` only if it is on the header chain as well
let fin = hc.kvt.getHeader(hc.chain.pendingFCU).valueOr:
break assignFinalisedChild

if hc.chain.baseNumber() < fin.number:
# Now, there are two segments of the canonical chain, `base..finalised`
# on# the `FC` module and `ante..finalised` (maybe degraded) on the
# header chain cache.
#
# So `finalised` is on the header chain cache and has a parent on the
# `FC` module.
if hc.chain.hashToBlock.hasKey(hc.chain.pendingFCU):
hc.session.ante = fin
hc.session.mode = locked # update internal state
return ok()

# Impossible situation!
raiseAssert RaisePfx & "No parent on FC module for anu of " &
startHere.bnStr & ".." & hc.chain.latestFinalizedBlockNumber.bnStr
# Impossible situation!
raiseAssert RaisePfx &
"Missing finalised " & fin.bnStr & " parent on FC module"

hc.session.mode = orphan
err("Parent on FC module has been lost: obsolete branch segment")
Expand Down Expand Up @@ -656,10 +612,6 @@ func latestConsHeadNumber*(hc: HeaderChainRef): BlockNumber =
##
hc.session.consHeadNum

func latestNum*(hc: HeaderChainRef): BlockNumber =
## Aka `hc.chain.latestNumber()` (avoiding `forked_chain` import)
hc.chain.activeBranch.headNumber

# ------------------------------------------------------------------------------
# Public debugging helpers
# ------------------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion execution_chain/sync/beacon.nim
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ proc runSetup(ctx: BeaconCtxRef): bool =
proc runRelease(ctx: BeaconCtxRef) =
worker.release(ctx, "RunRelease")

proc runDaemon(ctx: BeaconCtxRef) {.async: (raises: [CancelledError]).} =
proc runDaemon(ctx: BeaconCtxRef) {.async: (raises: []).} =
await worker.runDaemon(ctx, "RunDaemon")

proc runTicker(ctx: BeaconCtxRef) =
Expand Down
4 changes: 2 additions & 2 deletions execution_chain/sync/beacon/worker.nim
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ proc napUnlessSomethingToFetch(
try:
await sleepAsync workerIdleWaitInterval
except CancelledError:
buddy.ctrl.zombie = buddy.infectedByTVirus
buddy.ctrl.stopped = true
return true
else:
# Returning `false` => no need to check for shutdown
Expand Down Expand Up @@ -115,7 +115,7 @@ proc runTicker*(ctx: BeaconCtxRef; info: static[string]) =
proc runDaemon*(
ctx: BeaconCtxRef;
info: static[string];
) {.async: (raises: [CancelledError]).} =
) {.async: (raises: []).} =
## Global background job that will be re-started as long as the variable
## `ctx.daemon` is set `true` which corresponds to `ctx.hibernating` set
## to false`.
Expand Down
31 changes: 18 additions & 13 deletions execution_chain/sync/beacon/worker/blocks_staged.nim
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ proc updateBuddyErrorState(buddy: BeaconBuddyRef) =
fetchBodiesProcessErrThresholdCount < buddy.only.nBdyProcErrors:

# Make sure that this peer does not immediately reconnect
buddy.ctrl.zombie = buddy.infectedByTVirus
buddy.ctrl.zombie = true

# ------------------------------------------------------------------------------
# Private function(s)
Expand Down Expand Up @@ -327,7 +327,7 @@ proc blocksStagedImport*(
ctx: BeaconCtxRef;
info: static[string];
): Future[bool]
{.async: (raises: [CancelledError]).} =
{.async: (raises: []).} =
## Import/execute blocks record from staged queue
##
let qItem = ctx.blk.staged.ge(0).valueOr:
Expand Down Expand Up @@ -363,17 +363,22 @@ proc blocksStagedImport*(
nthBn=nBn.bnStr, nthHash=ctx.getNthHash(qItem.data, n).short
continue

(await ctx.chain.importBlock(qItem.data.blocks[n])).isOkOr:
# The way out here is simply to re-compile the block queue. At any
# point, the `FC` module data area might have been moved to a new
# canonical branch.
#
ctx.poolMode = true
warn info & ": import block error (reorg triggered)", n, iv,
B=ctx.chain.baseNumber.bnStr, L=ctx.chain.latestNumber.bnStr,
nthBn=nBn.bnStr, nthHash=ctx.getNthHash(qItem.data, n).short,
`error`=error
maxImport = nBn
try:
(await ctx.chain.importBlock(qItem.data.blocks[n])).isOkOr:
# The way out here is simply to re-compile the block queue. At any
# point, the `FC` module data area might have been moved to a new
# canonical branch.
#
ctx.poolMode = true
warn info & ": import block error (reorg triggered)", n, iv,
B=ctx.chain.baseNumber.bnStr, L=ctx.chain.latestNumber.bnStr,
nthBn=nBn.bnStr, nthHash=ctx.getNthHash(qItem.data, n).short,
`error`=error
maxImport = nBn
break importLoop
# isOk => continue
except CancelledError:
maxImport = nBn # shutdown?
break importLoop

# Allow pseudo/async thread switch.
Expand Down
7 changes: 4 additions & 3 deletions execution_chain/sync/beacon/worker/blocks_staged/bodies.nim
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ import
func bdyErrors*(buddy: BeaconBuddyRef): string =
$buddy.only.nBdyRespErrors & "/" & $buddy.only.nBdyProcErrors

proc fetchRegisterError*(buddy: BeaconBuddyRef) =
proc fetchRegisterError*(buddy: BeaconBuddyRef, slowPeer = false) =
buddy.only.nBdyRespErrors.inc
if fetchBodiesReqErrThresholdCount < buddy.only.nBdyRespErrors:
buddy.ctrl.zombie = buddy.infectedByTVirus # abandon slow peer
if 1 < buddy.ctx.pool.nBuddies or not slowPeer:
buddy.ctrl.zombie = true # abandon slow peer unless last one

proc bodiesFetch*(
buddy: BeaconBuddyRef;
Expand Down Expand Up @@ -80,7 +81,7 @@ proc bodiesFetch*(
# mimimum share of the number of requested headers expected, typically 10%.
if fetchBodiesReqErrThresholdZombie < elapsed or
b.len.uint64 * 100 < nReq.uint64 * fetchBodiesReqMinResponsePC:
buddy.fetchRegisterError()
buddy.fetchRegisterError(slowPeer=true)
else:
buddy.only.nBdyRespErrors = 0 # reset error count

Expand Down
Loading
Loading