From c1bbe42ea950733fb2a815de445a1dccb98ee5f8 Mon Sep 17 00:00:00 2001 From: Jordan Hrycaj Date: Tue, 6 May 2025 11:36:13 +0100 Subject: [PATCH 1/8] Remove cruft --- execution_chain/core/chain/header_chain_cache.nim | 4 ---- 1 file changed, 4 deletions(-) diff --git a/execution_chain/core/chain/header_chain_cache.nim b/execution_chain/core/chain/header_chain_cache.nim index 7f105d0d13..08ad08b221 100644 --- a/execution_chain/core/chain/header_chain_cache.nim +++ b/execution_chain/core/chain/header_chain_cache.nim @@ -656,10 +656,6 @@ func latestConsHeadNumber*(hc: HeaderChainRef): BlockNumber = ## hc.session.consHeadNum -func latestNum*(hc: HeaderChainRef): BlockNumber = - ## Aka `hc.chain.latestNumber()` (avoiding `forked_chain` import) - hc.chain.activeBranch.headNumber - # ------------------------------------------------------------------------------ # Public debugging helpers # ------------------------------------------------------------------------------ From ab2085de0e7ec960214542afb865cab0fea5e425 Mon Sep 17 00:00:00 2001 From: Jordan Hrycaj Date: Tue, 6 May 2025 16:08:43 +0100 Subject: [PATCH 2/8] Cosmetics, docu and logging updates, etc. --- .../core/chain/header_chain_cache.nim | 53 ++++++++++--------- 1 file changed, 27 insertions(+), 26 deletions(-) diff --git a/execution_chain/core/chain/header_chain_cache.nim b/execution_chain/core/chain/header_chain_cache.nim index 08ad08b221..21852e2962 100644 --- a/execution_chain/core/chain/header_chain_cache.nim +++ b/execution_chain/core/chain/header_chain_cache.nim @@ -56,12 +56,6 @@ ## the `FC` module, the header append process must be finished by a dedicated ## commit statement `commit()`. ## -## Temporary extra: -## -## * There is a function provided combining `importBlocks()` and `forkChoice()` -## as a wrapper around `importBlock()` followed by occasional update of the -## base value of the `FC` module proper using `forkChoice()`. -## {.push raises:[].} @@ -73,7 +67,8 @@ import "../.."/[common, db/core_db, db/storage_types], ../../db/[kvt, kvt_cf], ../../db/kvt/[kvt_utils, kvt_tx_frame], - ./forked_chain/[chain_branch, chain_desc, block_quarantine], ./forked_chain + ./forked_chain, + ./forked_chain/[chain_branch, chain_desc, block_quarantine] logScope: topics = "hc-cache" @@ -191,13 +186,21 @@ proc delInfo(db: KvtTxRef) = proc putHeader(db: KvtTxRef; h: Header) = - ## Store rlp encoded header + ## Store the argument `header` indexed by block number, and the hash lookup + ## of the parent header. 
let data = encodePayload(h) db.put(beaconHeaderKey(h.number).toOpenArray, data).isOkOr: raiseAssert RaisePfx & "put(header) failed: " & $error - db.put(genericHashKey(h.parentHash).toOpenArray, (h.number-1).toBytesBE).isOkOr: - raiseAssert RaisePfx & "put(hash->number) failed: " & $error + let parNumData = (h.number-1).toBytesBE + db.put(genericHashKey(h.parentHash).toOpenArray, parNumData).isOkOr: + raiseAssert RaisePfx & "put(number-1) failed: " & $error + + +proc getNumber(db: KvtTxRef, hash: Hash32): Opt[BlockNumber] = + let number = db.get(genericHashKey(hash).toOpenArray).valueOr: + return err() + ok(uint64.fromBytesBE(number)) proc getHeader(db: KvtTxRef; bn: BlockNumber): Opt[Header] = ## Retrieve some header from cache @@ -205,6 +208,11 @@ proc getHeader(db: KvtTxRef; bn: BlockNumber): Opt[Header] = return err() ok decodePayload(data, Header) +proc getHeader(db: KvtTxRef; hash: Hash32): Opt[Header] = + ## Variant of `getHeader()` + db.getHeader ?db.getNumber(hash) + + proc delHeader(db: KvtTxRef; bn: BlockNumber) = ## Remove header from cache let h = db.getHeader(bn).valueOr: @@ -212,11 +220,6 @@ proc delHeader(db: KvtTxRef; bn: BlockNumber) = discard db.del(beaconHeaderKey(bn).toOpenArray) discard db.del(genericHashKey(h.parentHash).toOpenArray) -proc getNumber(db: KvtTxRef, hash: Hash32): Opt[BlockNumber] = - let number = db.get(genericHashKey(hash).toOpenArray).valueOr: - return err() - ok(uint64.fromBytesBE(number)) - # ---------------------- proc persistInfo(hc: HeaderChainRef) = @@ -241,10 +244,6 @@ proc persistClear(hc: HeaderChainRef) = # Private functions # ------------------------------------------------------------------------------ -func baseNum(hc: HeaderChainRef): BlockNumber = - ## Aka `hc.chain.baseNumber()` (avoiding `forked_chain` import) - hc.chain.baseBranch.tailNumber - func expectingMode( hc: HeaderChainRef; mode: HeaderChainMode; @@ -273,7 +272,7 @@ proc tryFcParent(hc: HeaderChainRef; hdr: Header): HeaderChainMode = # Ignore `hdr` unless its block number equals the one of the base (note # that this function is called with decreasing block numbers.) - let baseNum = hc.baseNum() + let baseNum = hc.chain.baseNumber() if baseNum + 1 < hdr.number: return collecting # inconclusive @@ -334,7 +333,9 @@ proc resolveFinHash(hc: HeaderChainRef, f: Hash32) = return if hc.chain.tryUpdatePendingFCU(f, number): - debug "PendingFCU resolved to block number", hash = f.short, number = number.bnStr + debug "PendingFCU resolved to block number", + hash = f.short, + number = number.bnStr proc headUpdateFromCL(hc: HeaderChainRef; h: Header; f: Hash32) = ## Call back function to register new/prevously-unknown FC updates. @@ -343,7 +344,7 @@ proc headUpdateFromCL(hc: HeaderChainRef; h: Header; f: Hash32) = ## client app. ## if f != zeroHash32 and # finalised hash is set - hc.baseNum + 1 < h.number: # otherwise useless + hc.chain.baseNumber() + 1 < h.number: # otherwise useless if hc.session.mode == closed: # Set new session environment @@ -497,8 +498,8 @@ proc put*( return ok() # nothing to do debug "HC updated", - minNum=rev[^1].number, - maxNum=rev[0].number, + minNum=rev[^1].bnStr, + maxNum=rev[0].bnStr, numHeaders=rev.len # Check whether argument list closes up to headers chain @@ -555,7 +556,7 @@ proc put*( if hc.chain.tryUpdatePendingFCU(hash, hdr.number): debug "PendingFCU resolved to block number", hash=hash.short, - number=hdr.number.bnStr + number=hdr.bnStr # Check whether `hdr` has a parent on the `FC` module. 
let newMode = hc.tryFcParent(hdr) @@ -594,7 +595,7 @@ proc commit*(hc: HeaderChainRef): Result[void,string] = hc.session.mode = locked # update internal state return ok() - let baseNum = hc.baseNum + let baseNum = hc.chain.baseNumber() if baseNum < hc.chain.latestFinalizedBlockNumber: # # So the `finalised` hash was resolved (otherwise it would have a zero From b20dfe34689056e51e1dfa0b489fe37064678794 Mon Sep 17 00:00:00 2001 From: Jordan Hrycaj Date: Wed, 7 May 2025 12:30:46 +0100 Subject: [PATCH 3/8] Header Cache: Fix header chain cache `stop()` function why: Must unlink closure that refers to the `hc` descriptor. `stop()` is also used in the destructor. --- execution_chain/core/chain/header_chain_cache.nim | 1 + 1 file changed, 1 insertion(+) diff --git a/execution_chain/core/chain/header_chain_cache.nim b/execution_chain/core/chain/header_chain_cache.nim index 21852e2962..6c5b0fb18a 100644 --- a/execution_chain/core/chain/header_chain_cache.nim +++ b/execution_chain/core/chain/header_chain_cache.nim @@ -395,6 +395,7 @@ proc stop*(hc: HeaderChainRef) = ## destructor `destroy()`. ## hc.chain.com.headerChainUpdate = HeaderChainUpdateCB(nil) + hc.chain.com.resolveFinHash = ResolveFinHashCB(nil) hc.notify = HeaderChainNotifyCB(nil) proc start*(hc: HeaderChainRef; notify: HeaderChainNotifyCB) = From 78574ed5cf75a9f0adffeeeed3c083cf290e9119 Mon Sep 17 00:00:00 2001 From: Jordan Hrycaj Date: Wed, 7 May 2025 12:16:08 +0100 Subject: [PATCH 4/8] Header Cache: Remove assert for missing entry to be deleted why: Might crash repeatedly while debugging. --- execution_chain/core/chain/header_chain_cache.nim | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/execution_chain/core/chain/header_chain_cache.nim b/execution_chain/core/chain/header_chain_cache.nim index 6c5b0fb18a..b21a647182 100644 --- a/execution_chain/core/chain/header_chain_cache.nim +++ b/execution_chain/core/chain/header_chain_cache.nim @@ -214,11 +214,14 @@ proc getHeader(db: KvtTxRef; hash: Hash32): Opt[Header] = proc delHeader(db: KvtTxRef; bn: BlockNumber) = - ## Remove header from cache - let h = db.getHeader(bn).valueOr: - raiseAssert RaisePfx & "getHeader failed" - discard db.del(beaconHeaderKey(bn).toOpenArray) - discard db.del(genericHashKey(h.parentHash).toOpenArray) + ## Remove header from cache, ignore non-existing entries + let + bnKey = beaconHeaderKey(bn) + rc = db.get(bnKey.toOpenArray) + discard db.del(bnKey.toOpenArray) + if rc.isOk: + let h = decodePayload(rc.value, Header) + discard db.del(genericHashKey(h.parentHash).toOpenArray) # ---------------------- From 0e7f1a662f0bd294e799dc5f4d353e2f34ea7592 Mon Sep 17 00:00:00 2001 From: Jordan Hrycaj Date: Tue, 6 May 2025 18:16:15 +0100 Subject: [PATCH 5/8] Header Cache: Update header chain cache `FC` parent detection why: Previously, `finalised` was local and static. So it was known that the chain `antecedent..finalised` was a segment on the canonical chain shared with the header chain cache. This was exploited in some fringe cases for `FC` parent detection when there was some interference with the FCU calls from the `CL` via RPC. Now, the `finalised` entry is maintained outside. So the assumption that `antecedent..finalised` is on the header chain cache does not necessarily hold anymore. 
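For reviewers, the new acceptance rule in `commit()` boils down to the
following sketch (simplified from the diff below, using the names of the
module; the real code additionally updates the session mode and asserts
the impossible case):

  # `finalised` is only trusted as the new antecedent if it is present on
  # the header chain cache AND known to the `FC` module; otherwise the
  # session degrades to `orphan`.
  let fin = hc.kvt.getHeader(hc.chain.pendingFCU).valueOr:
    return                              # not on the cache => no shortcut
  if hc.chain.baseNumber() < fin.number and
     hc.chain.hashToBlock.hasKey(hc.chain.pendingFCU):
    hc.session.ante = fin               # cache segment now links to `FC`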
--- .../core/chain/header_chain_cache.nim | 85 ++++--------------- 1 file changed, 18 insertions(+), 67 deletions(-) diff --git a/execution_chain/core/chain/header_chain_cache.nim b/execution_chain/core/chain/header_chain_cache.nim index b21a647182..bb0756968c 100644 --- a/execution_chain/core/chain/header_chain_cache.nim +++ b/execution_chain/core/chain/header_chain_cache.nim @@ -279,47 +279,7 @@ proc tryFcParent(hc: HeaderChainRef; hdr: Header): HeaderChainMode = if baseNum + 1 < hdr.number: return collecting # inconclusive - # The block number of `hdr` must not go below the `base`. It cannot be - # handled even if it has a parent on the block chain (but not on the - # `FC` module.) - # - # Rationale: - # This situataion might arise by means of `CL` requests via RPC updating - # the `base` concurrently to a syncer client. This can only happen if the - # base is near the canonical head. Which in turn means that the header - # chain is short. - # - # Reversing the antecedent (above `base`) is avoided. This goes with the - # argument that completely re-syncing a short header chain is worth it - # comparing additional administrative costs (for a syncer client of that - # module) of handling backward moves of the antecedent. - # - if hdr.number <= baseNum: - return orphan # beyond reach - - # Now: baseNum + 1 == hdr.number - # - # This is the last stop (with least possible block number) where the - # `hdr` could have a parent on the `FC` module which obviously failed - # (i.e. `base` is not parent of `hdr`.) - if hc.chain.latestFinalizedBlockNumber <= baseNum: - # So, if resolved at all then `finalised` is ancestor of, or equal to - # `base`. In any case, `hdr` has no parent on the canonical branch up - # to `base`. So it is on another branch. - return orphan # maybe on the wrong branch - - # Now `base` and `finalised` (and all ancestors of either) are on the - # the canonical branch of the DAG generated by the `CL` logic. - # - # So is `hdr` as its block number is `baseNum+1` at most the one of - # `finalised`. And both are on the header chain. - # - # So `base` and `hdr` are also on the canonical chain with block number - # distance 1. But `hdr` has no parent on the `FC` module -- oops. - # - # Impossible situation! - raiseAssert RaisePfx & "Base " & baseNum.bnStr & " was expected " & - "to be parent header chain item " & hdr.number.bnStr + return orphan # maybe on the wrong branch # ------------------------------------------------------------------------------ # Private fork choice call back function @@ -599,35 +559,26 @@ proc commit*(hc: HeaderChainRef): Result[void,string] = hc.session.mode = locked # update internal state return ok() - let baseNum = hc.chain.baseNumber() - if baseNum < hc.chain.latestFinalizedBlockNumber: - # - # So the `finalised` hash was resolved (otherwise it would have a zero - # block number.) - # - # There are two segments of the canonical chain, `base..finalised` and - # `ante..finalised` (the latter possibly degraded) which both share at - # least `finalised` on the header chain cache. - # - # On the intersection of `ante..finalised` and `base..finalised` there is - # a header with a parent on the `FC` module. Note that the intersecion is - # fully part of the header chain cache where the most senior element can - # be discarded. Neither `base` nor `ante` have a parent on the `FC` module. - # - let startHere = max(baseNum, hc.session.ante.number) + 1 - - # Find out where the parent to some `FC` module header is. - for bn in startHere .. 
hc.chain.latestFinalizedBlockNumber: - let newAnte = hc.kvt.getHeader(bn).valueOr: - raiseAssert RaisePfx & "getHeader(" & bn.bnStr & ") failed" - if hc.chain.hashToBlock.hasKey(newAnte.parentHash): - hc.session.ante = newAnte # update header chain + block assignFinalisedChild: + # Use `finalised` only if it is on the header chain as well + let fin = hc.kvt.getHeader(hc.chain.pendingFCU).valueOr: + break assignFinalisedChild + + if hc.chain.baseNumber() < fin.number: + # Now, there are two segments of the canonical chain, `base..finalised` + # on# the `FC` module and `ante..finalised` (maybe degraded) on the + # header chain cache. + # + # So `finalised` is on the header chain cache and has a parent on the + # `FC` module. + if hc.chain.hashToBlock.hasKey(hc.chain.pendingFCU): + hc.session.ante = fin hc.session.mode = locked # update internal state return ok() - # Impossible situation! - raiseAssert RaisePfx & "No parent on FC module for anu of " & - startHere.bnStr & ".." & hc.chain.latestFinalizedBlockNumber.bnStr + # Impossible situation! + raiseAssert RaisePfx & + "Missing finalised " & fin.bnStr & " parent on FC module" hc.session.mode = orphan err("Parent on FC module has been lost: obsolete branch segment") From f6ea16dd27e03b02e2ae28f065ecd8c0ef79a20a Mon Sep 17 00:00:00 2001 From: Jordan Hrycaj Date: Wed, 7 May 2025 16:06:19 +0100 Subject: [PATCH 6/8] Syncer: Handle `CancelledError` in block importer function why: Previously, the `importBlock()` function was deterministic, i.e. non-async. This has changed. Now, this function returns a `Future` which can throw an `CancelledError` exception. --- execution_chain/sync/beacon.nim | 2 +- execution_chain/sync/beacon/worker.nim | 2 +- .../sync/beacon/worker/blocks_staged.nim | 29 +++++++++++-------- execution_chain/sync/sync_sched.nim | 4 +-- 4 files changed, 21 insertions(+), 16 deletions(-) diff --git a/execution_chain/sync/beacon.nim b/execution_chain/sync/beacon.nim index bc6ae6b16d..e3d43d3935 100644 --- a/execution_chain/sync/beacon.nim +++ b/execution_chain/sync/beacon.nim @@ -35,7 +35,7 @@ proc runSetup(ctx: BeaconCtxRef): bool = proc runRelease(ctx: BeaconCtxRef) = worker.release(ctx, "RunRelease") -proc runDaemon(ctx: BeaconCtxRef) {.async: (raises: [CancelledError]).} = +proc runDaemon(ctx: BeaconCtxRef) {.async: (raises: []).} = await worker.runDaemon(ctx, "RunDaemon") proc runTicker(ctx: BeaconCtxRef) = diff --git a/execution_chain/sync/beacon/worker.nim b/execution_chain/sync/beacon/worker.nim index ede1340133..1fdfb9c2a4 100644 --- a/execution_chain/sync/beacon/worker.nim +++ b/execution_chain/sync/beacon/worker.nim @@ -115,7 +115,7 @@ proc runTicker*(ctx: BeaconCtxRef; info: static[string]) = proc runDaemon*( ctx: BeaconCtxRef; info: static[string]; - ) {.async: (raises: [CancelledError]).} = + ) {.async: (raises: []).} = ## Global background job that will be re-started as long as the variable ## `ctx.daemon` is set `true` which corresponds to `ctx.hibernating` set ## to false`. 
diff --git a/execution_chain/sync/beacon/worker/blocks_staged.nim b/execution_chain/sync/beacon/worker/blocks_staged.nim index d8802a1a36..b05bb0a66d 100644 --- a/execution_chain/sync/beacon/worker/blocks_staged.nim +++ b/execution_chain/sync/beacon/worker/blocks_staged.nim @@ -327,7 +327,7 @@ proc blocksStagedImport*( ctx: BeaconCtxRef; info: static[string]; ): Future[bool] - {.async: (raises: [CancelledError]).} = + {.async: (raises: []).} = ## Import/execute blocks record from staged queue ## let qItem = ctx.blk.staged.ge(0).valueOr: @@ -363,17 +363,22 @@ proc blocksStagedImport*( nthBn=nBn.bnStr, nthHash=ctx.getNthHash(qItem.data, n).short continue - (await ctx.chain.importBlock(qItem.data.blocks[n])).isOkOr: - # The way out here is simply to re-compile the block queue. At any - # point, the `FC` module data area might have been moved to a new - # canonical branch. - # - ctx.poolMode = true - warn info & ": import block error (reorg triggered)", n, iv, - B=ctx.chain.baseNumber.bnStr, L=ctx.chain.latestNumber.bnStr, - nthBn=nBn.bnStr, nthHash=ctx.getNthHash(qItem.data, n).short, - `error`=error - maxImport = nBn + try: + (await ctx.chain.importBlock(qItem.data.blocks[n])).isOkOr: + # The way out here is simply to re-compile the block queue. At any + # point, the `FC` module data area might have been moved to a new + # canonical branch. + # + ctx.poolMode = true + warn info & ": import block error (reorg triggered)", n, iv, + B=ctx.chain.baseNumber.bnStr, L=ctx.chain.latestNumber.bnStr, + nthBn=nBn.bnStr, nthHash=ctx.getNthHash(qItem.data, n).short, + `error`=error + maxImport = nBn + break importLoop + # isOk => continue + except CancelledError: + maxImport = nBn # shutdown? break importLoop # Allow pseudo/async thread switch. diff --git a/execution_chain/sync/sync_sched.nim b/execution_chain/sync/sync_sched.nim index 4a6a97608f..3fc78f547d 100644 --- a/execution_chain/sync/sync_sched.nim +++ b/execution_chain/sync/sync_sched.nim @@ -165,7 +165,7 @@ proc key(peer: Peer): ENode = # Private functions # ------------------------------------------------------------------------------ -proc terminate[S,W](dsc: RunnerSyncRef[S,W]) {.async.} = +proc terminate[S,W](dsc: RunnerSyncRef[S,W]) {.async: (raises: []).} = ## Request termination and wait for sub-tasks to finish mixin runRelease @@ -208,7 +208,7 @@ proc terminate[S,W](dsc: RunnerSyncRef[S,W]) {.async.} = dsc.runCtrl = terminated -proc daemonLoop[S,W](dsc: RunnerSyncRef[S,W]) {.async: (raises: [CancelledError]).} = +proc daemonLoop[S,W](dsc: RunnerSyncRef[S,W]) {.async: (raises: []).} = mixin runDaemon if dsc.ctx.daemon and dsc.runCtrl == running: From a78a87163bfdce6db6b5ac540b3923e58afa5f5e Mon Sep 17 00:00:00 2001 From: Jordan Hrycaj Date: Thu, 8 May 2025 11:52:05 +0100 Subject: [PATCH 7/8] Syncer: Keep the last peer if labelled `slow` but zombify on other errors why: PR #3204 introduced the concept of generally keeping the last peer even if syncing becomes nearly useless. This was an attempt to exploit the last peer even if it is labelled `slow`. This generality led to sort of syncer looping when fetching from a peer that repeatedly causes a disconnect exception so long as the peer is fully discarded by the p2p lib. This has been changed to keeping a `slow` sync peer if it is the last one, but zombify (i.e. discard and keep from reconnecting for a while) otherwise. 
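In code, the new policy amounts to the following predicate (illustration
only; `shouldZombify` is a made-up helper name, the diff below inlines the
equivalent condition at the two error-registration sites):

  func shouldZombify(nBuddies: int; slowPeer: bool): bool =
    ## Hard errors always lead to zombification; a merely slow peer is
    ## only zombified while there are other peers left to sync from.
    not slowPeer or 1 < nBuddies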
--- execution_chain/sync/beacon/worker.nim | 2 +- execution_chain/sync/beacon/worker/blocks_staged.nim | 2 +- .../sync/beacon/worker/blocks_staged/bodies.nim | 7 ++++--- .../sync/beacon/worker/headers_staged/headers.nim | 5 +++-- .../sync/beacon/worker/headers_staged/staged_collect.nim | 2 +- execution_chain/sync/beacon/worker_desc.nim | 7 ------- 6 files changed, 10 insertions(+), 15 deletions(-) diff --git a/execution_chain/sync/beacon/worker.nim b/execution_chain/sync/beacon/worker.nim index 1fdfb9c2a4..667dad54c4 100644 --- a/execution_chain/sync/beacon/worker.nim +++ b/execution_chain/sync/beacon/worker.nim @@ -35,7 +35,7 @@ proc napUnlessSomethingToFetch( try: await sleepAsync workerIdleWaitInterval except CancelledError: - buddy.ctrl.zombie = buddy.infectedByTVirus + buddy.ctrl.stopped = true return true else: # Returning `false` => no need to check for shutdown diff --git a/execution_chain/sync/beacon/worker/blocks_staged.nim b/execution_chain/sync/beacon/worker/blocks_staged.nim index b05bb0a66d..fa12eb4f60 100644 --- a/execution_chain/sync/beacon/worker/blocks_staged.nim +++ b/execution_chain/sync/beacon/worker/blocks_staged.nim @@ -42,7 +42,7 @@ proc updateBuddyErrorState(buddy: BeaconBuddyRef) = fetchBodiesProcessErrThresholdCount < buddy.only.nBdyProcErrors: # Make sure that this peer does not immediately reconnect - buddy.ctrl.zombie = buddy.infectedByTVirus + buddy.ctrl.zombie = true # ------------------------------------------------------------------------------ # Private function(s) diff --git a/execution_chain/sync/beacon/worker/blocks_staged/bodies.nim b/execution_chain/sync/beacon/worker/blocks_staged/bodies.nim index b3f2ca6268..5ee8adb09c 100644 --- a/execution_chain/sync/beacon/worker/blocks_staged/bodies.nim +++ b/execution_chain/sync/beacon/worker/blocks_staged/bodies.nim @@ -24,10 +24,11 @@ import func bdyErrors*(buddy: BeaconBuddyRef): string = $buddy.only.nBdyRespErrors & "/" & $buddy.only.nBdyProcErrors -proc fetchRegisterError*(buddy: BeaconBuddyRef) = +proc fetchRegisterError*(buddy: BeaconBuddyRef, slowPeer = false) = buddy.only.nBdyRespErrors.inc if fetchBodiesReqErrThresholdCount < buddy.only.nBdyRespErrors: - buddy.ctrl.zombie = buddy.infectedByTVirus # abandon slow peer + if 1 < buddy.ctx.pool.nBuddies or not slowPeer: + buddy.ctrl.zombie = true # abandon slow peer unless last one proc bodiesFetch*( buddy: BeaconBuddyRef; @@ -80,7 +81,7 @@ proc bodiesFetch*( # mimimum share of the number of requested headers expected, typically 10%. 
if fetchBodiesReqErrThresholdZombie < elapsed or b.len.uint64 * 100 < nReq.uint64 * fetchBodiesReqMinResponsePC: - buddy.fetchRegisterError() + buddy.fetchRegisterError(slowPeer=true) else: buddy.only.nBdyRespErrors = 0 # reset error count diff --git a/execution_chain/sync/beacon/worker/headers_staged/headers.nim b/execution_chain/sync/beacon/worker/headers_staged/headers.nim index 1dba7cbfed..d32c3e59dd 100644 --- a/execution_chain/sync/beacon/worker/headers_staged/headers.nim +++ b/execution_chain/sync/beacon/worker/headers_staged/headers.nim @@ -21,10 +21,11 @@ import # Private functions # ------------------------------------------------------------------------------ -proc registerError(buddy: BeaconBuddyRef) = +proc registerError(buddy: BeaconBuddyRef, slowPeer = false) = buddy.incHdrRespErrors() if fetchHeadersReqErrThresholdCount < buddy.nHdrRespErrors: - buddy.ctrl.zombie = buddy.infectedByTVirus # abandon slow peer + if 1 < buddy.ctx.pool.nBuddies or not slowPeer: + buddy.ctrl.zombie = true # abandon slow peer unless last one # ------------------------------------------------------------------------------ # Public debugging & logging helpers diff --git a/execution_chain/sync/beacon/worker/headers_staged/staged_collect.nim b/execution_chain/sync/beacon/worker/headers_staged/staged_collect.nim index fa0fe8a7e0..36aa54603b 100644 --- a/execution_chain/sync/beacon/worker/headers_staged/staged_collect.nim +++ b/execution_chain/sync/beacon/worker/headers_staged/staged_collect.nim @@ -37,7 +37,7 @@ proc updateBuddyErrorState(buddy: BeaconBuddyRef) = fetchHeadersProcessErrThresholdCount < buddy.nHdrProcErrors: # Make sure that this peer does not immediately reconnect - buddy.ctrl.zombie = buddy.infectedByTVirus + buddy.ctrl.zombie = true proc updateBuddyProcError(buddy: BeaconBuddyRef) = buddy.incHdrProcErrors() diff --git a/execution_chain/sync/beacon/worker_desc.nim b/execution_chain/sync/beacon/worker_desc.nim index 8f1122530d..01cef7a39a 100644 --- a/execution_chain/sync/beacon/worker_desc.nim +++ b/execution_chain/sync/beacon/worker_desc.nim @@ -186,13 +186,6 @@ proc `hibernate=`*(ctx: BeaconCtxRef; val: bool) = # ----- -func infectedByTVirus*(buddy: BeaconBuddyRef): bool = - ## T-Virus: A series of mutant Progenitor virus strains developed - ## by Umbrella Corporation. - ## Keep the sole survivor of virus outbreak, we still can sync - ## from it even though it has become mindless. - buddy.ctx.pool.nBuddies > 1 - proc nHdrRespErrors*(buddy: BeaconBuddyRef): int = ## Getter, returns the number of `resp` errors for argument `buddy` buddy.only.nHdrRespErrors.int From edd5f49b614d02eba6932d250ec17a04f0fd609a Mon Sep 17 00:00:00 2001 From: Jordan Hrycaj Date: Wed, 7 May 2025 10:24:45 +0100 Subject: [PATCH 8/8] Syncer: Make sure that at most one peer fetches headers deterministically why: This feature was accidentally removed in PR #3221. It has been re-added with a fat explaining comment. The problem was that under some circumstances, two peers were able to pretend to deterministically (i.e. addressed by hash) fetch different header ranges while in reality, they fetched the same range. This led the syncer to assume wrongly that all was downloaded while it was not. also: The fetch routine double checks that the block number at least of the first header is correct. So, fetching by hash (i.e. deterministically) will check that hash and received block number expect expectations. 
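In the collector loop this comes down to the following test (sketch only,
names as in `headers_staged.nim` from the diff below):

  # Only the peer that can append directly below the cache antecedent
  # (`dangling`) fetches deterministically; any other peer sees a gap
  # between `top` and `dangling` and falls back to opportunistic,
  # block number based fetching.
  let top = ctx.headersUnprocAvailTop() + 1
  if top < ctx.dangling.number:
    break    # not this peer's turn for the deterministic fetch

The added check in `headersFetchReversed()` then verifies that the lowest
received header matches the requested range, i.e. `h[^1].number ==
ivReq.minPt`, before the batch is accepted.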
--- .../sync/beacon/worker/headers_staged.nim | 35 ++++++++++++++++++- .../beacon/worker/headers_staged/headers.nim | 9 +++++ 2 files changed, 43 insertions(+), 1 deletion(-) diff --git a/execution_chain/sync/beacon/worker/headers_staged.nim b/execution_chain/sync/beacon/worker/headers_staged.nim index 5eeb431d2e..209b138a99 100644 --- a/execution_chain/sync/beacon/worker/headers_staged.nim +++ b/execution_chain/sync/beacon/worker/headers_staged.nim @@ -57,9 +57,42 @@ proc headersStagedCollect*( nOpportunistic = 0 # ditto block fetchHeadersBody: - + # # Start deterministically. Explicitely fetch/append by parent hash. + # + # Exactly one peer can fetch deterministically (i.e. hash based) and + # store headers directly on the header chain cache. All other peers fetch + # opportunistcally (i.e. block number based) and queue the headers for + # later serialisation. while true: + let top = ctx.headersUnprocAvailTop() + 1 + # + # A deterministic fetch can directly append to the lower end `dangling` + # of header chain cache. So this criteria is unique at a given time + # and when an interval is taken out of the `unproc` pool: + # :: + # ------------------| unproc pool + # |-------| block interval to fetch next + # |---------- already stored headers on cache + # top + # dangling + # + # After claiming the block interval that will be processed next for the + # deterministic fetch, the situation looks like + # :: + # ---------| unproc pool + # |-------| block interval to fetch next + # |---------- already stored headers on cache + # top dangling + # + # so any other peer arriving here will see a gap between `top` and + # `dangling` which will lead them to fetch opportunistcally. + if top < ctx.dangling.number: + break + + # Throw away overlap (should not happen anyway) + if ctx.dangling.number < top: + discard ctx.headersUnprocFetch(top-ctx.dangling.number).expect("iv") let # Reserve the full range of block numbers so they can be appended in a diff --git a/execution_chain/sync/beacon/worker/headers_staged/headers.nim b/execution_chain/sync/beacon/worker/headers_staged/headers.nim index d32c3e59dd..8fcaabb7a1 100644 --- a/execution_chain/sync/beacon/worker/headers_staged/headers.nim +++ b/execution_chain/sync/beacon/worker/headers_staged/headers.nim @@ -111,6 +111,15 @@ proc headersFetchReversed*( ctrl=buddy.ctrl.state, hdrErrors=buddy.hdrErrors return err() + # Verify that first block number matches + if h[^1].number != ivReq.minPt: + buddy.registerError() + trace trEthRecvReceivedBlockHeaders, peer, nReq=req.maxResults, + hash=topHash.toStr, ivReqMinPt=ivReq.minPt.bnStr, ivRespMinPt=h[^1].bnStr, + nResp=h.len, elapsed=elapsed.toStr, + ctrl=buddy.ctrl.state, hdrErrors=buddy.hdrErrors + return err() + # Ban an overly slow peer for a while when seen in a row. Also there is a # mimimum share of the number of requested headers expected, typically 10%. if fetchHeadersReqErrThresholdZombie < elapsed or