diff --git a/execution_chain/config.nim b/execution_chain/config.nim index 0900335d57..be59d1817b 100644 --- a/execution_chain/config.nim +++ b/execution_chain/config.nim @@ -352,12 +352,6 @@ type "is accepted" name: "debug-beacon-sync-target-file" .}: Option[InputFile] - beaconSyncBlocksQueueHwm* {. - hidden - desc: "Limit number of blocks on staging queue for beacon sync" - defaultValue: 0 - name: "debug-beacon-sync-blocks-queue-hwm" .}: int - rocksdbMaxOpenFiles {. hidden defaultValue: defaultMaxOpenFiles diff --git a/execution_chain/core/chain/header_chain_cache.nim b/execution_chain/core/chain/header_chain_cache.nim index bb0756968c..688649972a 100644 --- a/execution_chain/core/chain/header_chain_cache.nim +++ b/execution_chain/core/chain/header_chain_cache.nim @@ -593,6 +593,12 @@ func head*(hc: HeaderChainRef): Header = if collecting <= hc.state: return hc.session.head +func headHash*(hc: HeaderChainRef): Hash32 = + ## Getter: hash of `head()` + ## + if collecting <= hc.state: + return hc.session.headHash + func antecedent*(hc: HeaderChainRef): Header = ## Getter: bottom of header chain. In case there is no header chain ## initialised, the return value is `Header()` (i.e. the block number diff --git a/execution_chain/nimbus_execution_client.nim b/execution_chain/nimbus_execution_client.nim index 79a957cc84..f781f52ccb 100644 --- a/execution_chain/nimbus_execution_client.nim +++ b/execution_chain/nimbus_execution_client.nim @@ -117,7 +117,7 @@ proc setupP2P(nimbus: NimbusNode, conf: NimbusConf, # Always initialise beacon syncer nimbus.beaconSyncRef = BeaconSyncRef.init( - nimbus.ethNode, nimbus.fc, conf.maxPeers, conf.beaconSyncBlocksQueueHwm) + nimbus.ethNode, nimbus.fc, conf.maxPeers) # Optional for pre-setting the sync target (i.e. debugging) if conf.beaconSyncTargetFile.isSome(): diff --git a/execution_chain/sync/beacon.nim b/execution_chain/sync/beacon.nim index e3d43d3935..e7996039ed 100644 --- a/execution_chain/sync/beacon.nim +++ b/execution_chain/sync/beacon.nim @@ -62,11 +62,9 @@ proc init*( ethNode: EthereumNode; chain: ForkedChainRef; maxPeers: int; - blockQueueHwm = 0; ): T = var desc = T() desc.initSync(ethNode, maxPeers) - desc.ctx.pool.blkStagedHwm = blockQueueHwm desc.ctx.pool.chain = chain desc diff --git a/execution_chain/sync/beacon/worker.nim b/execution_chain/sync/beacon/worker.nim index 667dad54c4..dddbf2441a 100644 --- a/execution_chain/sync/beacon/worker.nim +++ b/execution_chain/sync/beacon/worker.nim @@ -24,14 +24,13 @@ import # Private functions # ------------------------------------------------------------------------------ -proc napUnlessSomethingToFetch( +proc napUnlessSomethingToCollect( buddy: BeaconBuddyRef; ): Future[bool] {.async: (raises: []).} = ## When idle, save cpu cycles waiting for something to do. - if buddy.ctx.pool.blkImportOk or # currently importing blocks - buddy.ctx.hibernate or # not activated yet? - not (buddy.headersStagedFetchOk() or # something on TODO list - buddy.blocksStagedFetchOk()): + if buddy.ctx.hibernate or # not activated yet? + not (buddy.headersStagedCollectOk() or # something on TODO list + buddy.blocksStagedCollectOk()): try: await sleepAsync workerIdleWaitInterval except CancelledError: @@ -118,7 +117,7 @@ proc runDaemon*( ) {.async: (raises: []).} = ## Global background job that will be re-started as long as the variable ## `ctx.daemon` is set `true` which corresponds to `ctx.hibernating` set - ## to false`. + ## to false. 
 ##
 ## On a fresh start, the flag `ctx.daemon` will not be set `true` before the
 ## first usable request from the CL (via RPC) stumbles in.
@@ -129,22 +128,14 @@ proc runDaemon*(
     return
 
   # Execute staged block records.
-  if ctx.blocksStagedCanImportOk():
-
-    block:
-      # Set flag informing peers to go into idle mode while importing takes
-      # place. It has been observed that importing blocks and downloading
-      # at the same time does not work very well, most probably due to high
-      # system activity while importing. Peers will get lost pretty soon after
-      # downloading starts if they continue downloading.
-      ctx.pool.blkImportOk = true
-      defer: ctx.pool.blkImportOk = false
-
-      # Import from staged queue.
-      while await ctx.blocksStagedImport(info):
-        if not ctx.daemon or    # Implied by external sync shutdown?
-           ctx.poolMode:        # Oops, re-org needed?
-          return
+  if ctx.blocksStagedProcessOk():
+
+    # Import bodies from the `staged` queue.
+    discard await ctx.blocksStagedProcess info
+
+    if not ctx.daemon or    # Implied by external sync shutdown?
+       ctx.poolMode:        # Oops, re-org needed?
+      return
 
   # At the end of the cycle, leave time to trigger refill headers/blocks
   try: await sleepAsync daemonWaitInterval
@@ -187,23 +178,36 @@ proc runPeer*(
     buddy.only.multiRunIdle = Moment.now() - buddy.only.stoppedMultiRun
   buddy.only.nMultiLoop.inc          # statistics/debugging
 
-  if not await buddy.napUnlessSomethingToFetch():
+  if not await buddy.napUnlessSomethingToCollect():
 
     # Download and process headers and blocks
-    while buddy.headersStagedFetchOk():
+    while buddy.headersStagedCollectOk():
 
       # Collect headers and either stash them on the header chain cache
-      # directly, or stage then on the header queue to get them serialised,
-      # later.
-      if await buddy.headersStagedCollect info:
+      # directly, or stage them on the header queue to get them serialised
+      # and stashed, later.
+      await buddy.headersStagedCollect info
 
-        # Store headers from the `staged` queue onto the header chain cache.
-        buddy.headersStagedProcess info
+      # Store serialised headers from the `staged` queue onto the header
+      # chain cache.
+      if not buddy.headersStagedProcess info:
+        # Need to proceed with another peer (e.g. gap between queue and
+        # header chain cache.)
+        break
 
     # Fetch bodies and combine them with headers to blocks to be staged. These
     # staged blocks are then excuted by the daemon process (no `peer` needed.)
-    while buddy.blocksStagedFetchOk():
-      discard await buddy.blocksStagedCollect info
+    while buddy.blocksStagedCollectOk():
+
+      # Collect bodies and either import them via `FC` module, or stage on
+      # the blocks queue to get them serialised and imported, later.
+      await buddy.blocksStagedCollect info
+
+      # Import bodies from the `staged` queue.
+      if not await buddy.blocksStagedProcess info:
+        # Need to proceed with another peer (e.g. gap between top imported
+        # block and blocks queue.)
+        break
 
   # Note that it is important **not** to leave this function to be
   # re-invoked by the scheduler unless necessary. 
While the time gap
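The collect/process split above rests on one invariant: staged batches may only be drained while they adjoin the already imported chain top; on a gap the worker backs off so another peer can fetch the missing range first. Below, `blocksStagedProcessImpl` implements this for blocks. A minimal, self-contained Nim sketch of that gate, with a hypothetical `Batch` type and `drainContiguous` helper and `std/heapqueue` standing in for the syncer's sorted staging queue:

import std/heapqueue

type
  Batch = object
    lo, hi: uint64                      # block number range of one batch

func `<`(a, b: Batch): bool = a.lo < b.lo   # queue orders by least block

proc drainContiguous(staged: var HeapQueue[Batch];
                     topImported: var uint64): bool =
  ## Pop and "import" batches while they adjoin `topImported`; return
  ## `false` (i.e. "switch peer") when a gap blocks further processing.
  while staged.len > 0:
    if topImported + 1 < staged[0].lo:
      return false                      # gap -- another peer must fill it
    let batch = staged.pop()
    if topImported < batch.hi:          # ignore overlap with imported part
      topImported = batch.hi            # stand-in for block execution
  true

when isMainModule:
  var q = initHeapQueue[Batch]()
  q.push Batch(lo: 101, hi: 200)
  q.push Batch(lo: 301, hi: 400)        # out of order: 201..300 unfetched
  var top = 100u64
  doAssert not q.drainContiguous(top)   # stops at the gap
  doAssert top == 200
  q.push Batch(lo: 201, hi: 300)        # gap filled by another peer
  doAssert q.drainContiguous(top)
  doAssert top == 400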
diff --git a/execution_chain/sync/beacon/worker/blocks_staged.nim b/execution_chain/sync/beacon/worker/blocks_staged.nim
index 935219f036..fee39f6c5e 100644
--- a/execution_chain/sync/beacon/worker/blocks_staged.nim
+++ b/execution_chain/sync/beacon/worker/blocks_staged.nim
@@ -11,400 +11,255 @@
 {.push raises:[].}
 
 import
-  pkg/[chronicles, chronos],
+  pkg/[chronicles, chronos, results],
   pkg/eth/common,
   pkg/stew/[interval_set, sorted_set],
+  ../../../networking/p2p,
   ../worker_desc,
-  ./blocks_staged/bodies,
-  ../../wire_protocol/types,
-  ./[blocks_unproc, helpers, update]
+  ./blocks_staged/[bodies, staged_blocks],
+  ./[blocks_unproc, helpers]
 
 # ------------------------------------------------------------------------------
-# Private debugging & logging helpers
+# Private function(s)
 # ------------------------------------------------------------------------------
 
-formatIt(Hash32):
-  it.short
-
-# ------------------------------------------------------------------------------
-# Private helpers
-# ------------------------------------------------------------------------------
+proc blocksStagedProcessImpl(
+    ctx: BeaconCtxRef;
+    maybePeer: Opt[Peer];
+    info: static[string];
+      ): Future[bool]
+      {.async: (raises: []).} =
+  ## Import/execute block records from the staged queue.
+  ##
+  ## The function returns `false` if the caller should switch to another
+  ## sync peer, e.g. for directly filling the gap between `topImported`
+  ## and the least staged block number.
+  ##
+  if ctx.blk.staged.len == 0:
+    trace info & ": blocksStagedProcess empty queue", peer=($maybePeer),
+      topImported=ctx.blk.topImported.bnStr, nStagedQ=ctx.blk.staged.len,
+      poolMode=ctx.poolMode, syncState=ctx.pool.lastState,
+      nSyncPeers=ctx.pool.nBuddies
+    return false # switch peer
 
-proc getNthHash(ctx: BeaconCtxRef; blk: BlocksForImport; n: int): Hash32 =
-  ctx.hdrCache.getHash(blk.blocks[n].header.number).valueOr:
-    return zeroHash32
+  var
+    nImported = 0u64   # statistics
+    switchPeer = false # for return code
 
-proc updateBuddyErrorState(buddy: BeaconBuddyRef) =
-  ## Helper/wrapper
-  if ((0 < buddy.only.nBdyRespErrors or
-       0 < buddy.only.nBdyProcErrors) and buddy.ctrl.stopped) or
-     fetchBodiesReqErrThresholdCount < buddy.only.nBdyRespErrors or
-     fetchBodiesProcessErrThresholdCount < buddy.only.nBdyProcErrors:
+  trace info & ": blocksStagedProcess start", peer=($maybePeer),
+    topImported=ctx.blk.topImported.bnStr, nStagedQ=ctx.blk.staged.len,
+    poolMode=ctx.poolMode, syncState=ctx.pool.lastState,
+    nSyncPeers=ctx.pool.nBuddies
 
-    # Make sure that this peer does not immediately reconnect
-    buddy.ctrl.zombie = true
+  var minNum = BlockNumber(0)
+  while ctx.pool.lastState == processingBlocks:
 
-# ------------------------------------------------------------------------------
-# Private function(s)
-# ------------------------------------------------------------------------------
+    # Fetch list with the least block numbers
+    let qItem = ctx.blk.staged.ge(0).valueOr:
+      break # all done
 
-proc fetchAndCheck(
-    buddy: BeaconBuddyRef;
-    ivReq: BnRange;
-    blk: ref BlocksForImport; # update in place
-    info: static[string];
-      ): Future[bool] {.async: (raises: []).} =
+    # Make sure that the lowest block is available, already. Or the other way
+    # round: no unprocessed block number range precedes the least staged block.
+    minNum = qItem.data.blocks[0].header.number
+    if ctx.blk.topImported + 1 < minNum:
+      trace info & ": block queue not ready yet", peer=($maybePeer),
+        topImported=ctx.blk.topImported.bnStr, qItem=qItem.data.blocks.bnStr,
+        nStagedQ=ctx.blk.staged.len, nSyncPeers=ctx.pool.nBuddies
+      switchPeer = true # there is a gap -- come back later
+      break
 
-  let
-    ctx = buddy.ctx
-    offset = blk.blocks.len.uint64
-
-  # Make sure that the block range matches the top
-  doAssert offset == 0 or blk.blocks[offset - 1].header.number+1 == ivReq.minPt
-
-  # Preset/append headers to be completed with bodies. Also collect block hashes
-  # for fetching missing blocks.
-  blk.blocks.setLen(offset + ivReq.len)
-  var request = BlockBodiesRequest(
-    blockHashes: newSeq[Hash32](ivReq.len)
-  )
-  for n in 1u ..< ivReq.len:
-    let header = ctx.hdrCache.get(ivReq.minPt + n).valueOr:
-      # There is nothing one can do here
-      info "Block header missing (reorg triggered)", ivReq, n,
-        nth=(ivReq.minPt + n).bnStr
-      # So require reorg
-      blk.blocks.setLen(offset)
-      ctx.poolMode = true
-      return false
-    request.blockHashes[n - 1] = header.parentHash
-    blk.blocks[offset + n].header = header
-  blk.blocks[offset].header = ctx.hdrCache.get(ivReq.minPt).valueOr:
-    # There is nothing one can do here
-    info "Block header missing (reorg triggered)", ivReq, n=0,
-      nth=ivReq.minPt.bnStr
-    # So require reorg
-    blk.blocks.setLen(offset)
-    ctx.poolMode = true
-    return false
-  request.blockHashes[ivReq.len - 1] =
-    blk.blocks[offset + ivReq.len - 1].header.computeBlockHash
-
-  # Fetch bodies
-  let bodies = block:
-    let rc = await buddy.bodiesFetch(request, info)
-    if rc.isErr:
-      blk.blocks.setLen(offset)
-      return false
-    rc.value
-
-  # Append bodies, note that the bodies are not fully verified here but rather
-  # when they are imported and executed.
-  let nBodies = bodies.len.uint64
-  if nBodies < ivReq.len:
-    blk.blocks.setLen(offset + nBodies)
-  block loop:
-    for n in 0 ..< nBodies:
-      block checkTxLenOk:
-        if blk.blocks[offset + n].header.transactionsRoot != emptyRoot:
-          if 0 < bodies[n].transactions.len:
-            break checkTxLenOk
-        else:
-          if bodies[n].transactions.len == 0:
-            break checkTxLenOk
-        # Oops, cut off the rest
-        blk.blocks.setLen(offset + n)
-        buddy.fetchRegisterError()
-        trace info & ": cut off fetched junk", peer=buddy.peer, ivReq, n,
-          nTxs=bodies[n].transactions.len, nBodies, bdyErrors=buddy.bdyErrors
-        break loop
-
-      blk.blocks[offset + n].transactions = bodies[n].transactions
-      blk.blocks[offset + n].uncles = bodies[n].uncles
-      blk.blocks[offset + n].withdrawals = bodies[n].withdrawals
-
-  if offset < blk.blocks.len.uint64:
-    return true
-
-  buddy.only.nBdyProcErrors.inc
-  return false
+    # Remove from queue
+    discard ctx.blk.staged.delete qItem.key
 
-# ------------------------------------------------------------------------------
-# Public functions
-# ------------------------------------------------------------------------------
+    # Import blocks list
+    await ctx.blocksImport(maybePeer, qItem.data.blocks, info)
 
-func blocksStagedCanImportOk*(ctx: BeaconCtxRef): bool =
-  ## Check whether the queue is at its maximum size so import can start with
-  ## a full queue.
-  ##
-  if ctx.poolMode:
-    # Re-org is scheduled
-    return false
+    # Import probably incomplete, so a partial roll back may be needed
+    let lastBn = qItem.data.blocks[^1].header.number
+    if ctx.blk.topImported < lastBn:
+      ctx.blocksUnprocAppend(ctx.blk.topImported+1, lastBn)
 
-  if 0 < ctx.blk.staged.len:
-    # Start importing if there are no more blocks available. 
So they have - # either been all staged, or are about to be staged. For the latter - # case wait until finished with current block downloads. - if ctx.blocksUnprocAvail() == 0: + nImported += ctx.blk.topImported - minNum + 1 + # End while loop - # Wait until finished with current block downloads - return ctx.blocksBorrowedIsEmpty() + if 0 < nImported: + info "Blocks serialised and imported", + topImported=ctx.blk.topImported.bnStr, nImported, + nStagedQ=ctx.blk.staged.len, nSyncPeers=ctx.pool.nBuddies, switchPeer - # Make sure that the lowest block is available, already. Or the other way - # round: no unprocessed block number range precedes the least staged block. - if ctx.blk.staged.ge(0).value.key < ctx.blocksUnprocTotalBottom(): - # Also suggest importing blocks if there is currently no peer active. - # The `unprocessed` ranges will contain some higher number block ranges, - # but these can be fetched later. - if ctx.pool.nBuddies == 0: - return true - - # If the last peer is labelled `slow` it will be ignored for the sake - # of deciding whether to execute blocks. - # - # As a consequence, the syncer will import blocks immediately allowing - # the syncer to collect more sync peers. - if ctx.pool.nBuddies == 1 and ctx.pool.blkLastSlowPeer.isSome: - return true - - # If importing starts while peers are actively downloading, the system - # tends to loose download peers, most probably due to high system - # activity. - # - # * Typical download time to download and stage a queue record ~15s (raw - # download time typically ranges ~30ms ..~10s) - # - # * Anecdotal time to connect to a new download peer ~5m..~10m - # - # This implies that a staged full queue with 4 records typically does - # not take more than a minute, much less if enough peers are available - # while the penalty of potentially losing peers is a multiple of the - # queue ramp up time. - # - # So importing does not start before the queue is filled up. - if ctx.pool.blkStagedLenHwm <= ctx.blk.staged.len: + elif 0 < ctx.blk.staged.len and not switchPeer: + trace info & ": no blocks unqueued", peer=($maybePeer), + topImported=ctx.blk.topImported.bnStr, nStagedQ=ctx.blk.staged.len, + nSyncPeers=ctx.pool.nBuddies - # Wait until finished with current block downloads - return ctx.blocksBorrowedIsEmpty() + trace info & ": blocksStagedProcess end", peer=($maybePeer), + topImported=ctx.blk.topImported.bnStr, nImported, minNum, + nStagedQ=ctx.blk.staged.len, nSyncPeers=ctx.pool.nBuddies, switchPeer - false + return not switchPeer +# ------------------------------------------------------------------------------ +# Public functions +# ------------------------------------------------------------------------------ -func blocksStagedFetchOk*(buddy: BeaconBuddyRef): bool = - ## Check whether body records can be fetched and stored on the `staged` queue. +func blocksStagedCollectOk*(buddy: BeaconBuddyRef): bool = + ## Check whether body records can be fetched and imported or stored + ## on the `staged` queue. ## if buddy.ctrl.running: - let ctx = buddy.ctx - if not ctx.poolMode: - - if 0 < ctx.blocksUnprocAvail(): - # Fetch if there is space on the queue. - if ctx.blk.staged.len < ctx.pool.blkStagedLenHwm: - return true - - # Make sure that there is no gap at the bottom which needs to be - # fetched regardless of the length of the queue. 
-        if ctx.blocksUnprocAvailBottom() < ctx.blk.staged.ge(0).value.key:
-          return true
+    if 0 < ctx.blocksUnprocAvail() and
+       not ctx.blocksModeStopped():
+      return true
   false
 
+proc blocksStagedProcessOk*(ctx: BeaconCtxRef): bool =
+  ## Check whether import processing is possible
+  ##
+  not ctx.poolMode and
+    0 < ctx.blk.staged.len
 
+# --------------
 
 proc blocksStagedCollect*(
     buddy: BeaconBuddyRef;
     info: static[string];
-      ): Future[bool] {.async: (raises: []).} =
-  ## Collect bodies and stage them.
+      ) {.async: (raises: []).} =
+  ## Collect bodies and import or stage them.
   ##
   let
     ctx = buddy.ctx
     peer = buddy.peer
 
-  if ctx.blocksUnprocAvail() == 0 or              # all done already?
-     ctx.poolMode:                                # reorg mode?
-    return false                                  # nothing to do
-
-  let
-    # Fetch the full range of headers to be completed to blocks
-    iv = ctx.blocksUnprocFetch(nFetchBodiesBatch.uint64).expect "valid interval"
+  if ctx.blocksUnprocIsEmpty():
+    return # no action
 
   var
-    # This value is used for splitting the interval `iv` into
-    # `already-collected + [ivBottom,somePt] + [somePt+1,iv.maxPt]` where the
-    # middle interval `[ivBottom,somePt]` will be fetched from the network.
-    ivBottom = iv.minPt
-
-    # This record will accumulate the fetched headers. It must be on the heap
-    # so that `async` can capture that properly.
-    blk = (ref BlocksForImport)()
-
-    # Flag, not to reset error count
-    haveError = false
-
-  while true:
-    # Extract bottom range interval and fetch/stage it
-    let
-      ivReqMax = if iv.maxPt < ivBottom + nFetchBodiesRequest - 1: iv.maxPt
-                 else: ivBottom + nFetchBodiesRequest - 1
-
-      # Request interval
-      ivReq = BnRange.new(ivBottom, ivReqMax)
-
-      # Current length of the blocks queue. This is used to calculate the
-      # response length from the network.
-      nBlkBlocks = blk.blocks.len
-
-    # Fetch and extend staging record
-    if not await buddy.fetchAndCheck(ivReq, blk, info):
-      if ctx.poolMode:
-        # Reorg requested?
-        ctx.blocksUnprocCommit(iv, iv)
-        return false
-
-      haveError = true
-
-      # Throw away first time block fetch data. Keep other data for a
-      # partially assembled list.
-      if nBlkBlocks == 0:
-        buddy.updateBuddyErrorState()
-
-        if ctx.pool.seenData:
-          trace info & ": current blocks discarded", peer, iv, ivReq,
-            nStaged=ctx.blk.staged.len, ctrl=buddy.ctrl.state,
-            bdyErrors=buddy.bdyErrors
-        else:
-          # Collect peer for detecting cul-de-sac syncing (i.e. non-existing
-          # block chain or similar.) This covers the case when headers are
-          # available but not block bodies.
-          ctx.pool.failedPeers.incl buddy.peerID
-
-          debug info & ": no blocks yet", peer, ctrl=buddy.ctrl.state,
-            failedPeers=ctx.pool.failedPeers.len, bdyErrors=buddy.bdyErrors
-
-        ctx.blocksUnprocCommit(iv, iv)
-        # At this stage allow a task switch so that some other peer might try
-        # to work on the currently returned interval.
-        try: await sleepAsync asyncThreadSwitchTimeSlot
-        except CancelledError: discard
-        return false
-
-      # So there were some bodies downloaded already. Turn back unused data
-      # and proceed with staging.
-      trace info & ": list partially failed", peer, iv, ivReq,
-        unused=BnRange.new(ivBottom,iv.maxPt)
-      # There is some left over to store back
-      ctx.blocksUnprocCommit(iv, ivBottom, iv.maxPt)
-      break
+    nImported = 0u64 # statistics, to be updated
+    nQueued = 0      # ditto
+
+  block fetchBlocksBody:
+    #
+    # Start deterministically. Explicitly fetch/append by parent hash.
+    #
+    # Exactly one peer can fetch and import blocks directly on the `FC`
+    # module. All other peers fetch and queue blocks for later serialisation. 
+    while true:
+      let bottom = ctx.blocksUnprocAvailBottom() - 1
+      #
+      # A direct fetch and blocks import is possible if the next block to
+      # fetch neighbours the already imported blocks ending at `topImported`.
+      # So this criterion is unique at a given time and when an interval is
+      # taken out of the `unproc` pool:
+      # ::
+      #    |------------------        unproc pool
+      #    |-------|                  block interval to fetch next
+      #    ----------|                already imported into `FC` module
+      #    bottom
+      #    topImported
+      #
+      # After claiming the block interval that will be processed next for the
+      # deterministic fetch, the situation for the new `bottom` would look like
+      # ::
+      #            |---------         unproc pool
+      #    |-------|                  block interval to fetch next
+      #    ----------|                already imported into `FC` module
+      #    topImported bottom
+      #
+      if ctx.blk.topImported < bottom:
+        break
+
+      # Throw away overlap (should not happen anyway)
+      if bottom < ctx.blk.topImported:
+        discard ctx.blocksUnprocFetch(ctx.blk.topImported - bottom).expect("iv")
+
+      trace info & ": blocksStagedCollect direct loop", peer,
+        ctrl=buddy.ctrl.state, poolMode=ctx.poolMode,
+        syncState=ctx.pool.lastState, topImported=ctx.blk.topImported.bnStr,
+        bottom=bottom.bnStr
+
+      # Fetch blocks and verify result
+      let blocks = (await buddy.blocksFetch(nFetchBodiesRequest, info)).valueOr:
+        break fetchBlocksBody # done, exit this function
+
+      # Set flag that there were some blocks fetched at all
+      ctx.pool.seenData = true # blocks data exist
+
+      # Import blocks (no staging)
+      await ctx.blocksImport(Opt.some(peer), blocks, info)
+
+      # Import probably incomplete, so a partial roll back may be needed
+      let lastBn = blocks[^1].header.number
+      if ctx.blk.topImported < lastBn:
+        ctx.blocksUnprocAppend(ctx.blk.topImported + 1, lastBn)
+
+      # statistics
+      nImported += ctx.blk.topImported - blocks[0].header.number + 1
+
+      # Buddy might have been cancelled while importing blocks.
+      if buddy.ctrl.stopped or ctx.poolMode:
+        break fetchBlocksBody # done, exit this function
+      # End while: headersUnprocFetch() + blocksImport()
 
-proc blocksStagedImport*(
-    ctx: BeaconCtxRef;
-    info: static[string];
-      ): Future[bool]
-      {.async: (raises: []).} =
-  ## Import/execute blocks record from staged queue
-  ##
-  let qItem = ctx.blk.staged.ge(0).valueOr:
-    # Empty queue
-    return false
+    # Continue fetching blocks and queue them (if any)
+    if ctx.blk.staged.len + ctx.blk.reserveStaged < blocksStagedQueueLengthHwm:
 
-  # Make sure that the lowest block is available, already. 
Or the other way - # round: no unprocessed block number range precedes the least staged block. - let uBottom = ctx.blocksUnprocTotalBottom() - if uBottom < qItem.key: - trace info & ": block queue not ready yet", nSyncPeers=ctx.pool.nBuddies, - unprocBottom=uBottom.bnStr, least=qItem.key.bnStr - return false + # Fetch blocks and verify result + ctx.blk.reserveStaged.inc # Book a slot on `staged` + let rc = await buddy.blocksFetch(nFetchBodiesRequest, info) + ctx.blk.reserveStaged.dec # Free that slot again - # Remove from queue - discard ctx.blk.staged.delete qItem.key + if rc.isErr: + break fetchBlocksBody # done, exit this function - let - nBlocks = qItem.data.blocks.len - iv = BnRange.new(qItem.key, qItem.key + nBlocks.uint64 - 1) - - info "Importing blocks", iv, nBlocks, - base=ctx.chain.baseNumber.bnStr, head=ctx.chain.latestNumber.bnStr, - target=ctx.head.bnStr - - var maxImport = iv.maxPt # tentatively assume all ok - block importLoop: - for n in 0 ..< nBlocks: - let nBn = qItem.data.blocks[n].header.number - if nBn <= ctx.chain.baseNumber: - trace info & ": ignoring block less eq. base", n, iv, - B=ctx.chain.baseNumber.bnStr, L=ctx.chain.latestNumber.bnStr, - nthBn=nBn.bnStr, nthHash=ctx.getNthHash(qItem.data, n).short - continue - - try: - (await ctx.chain.importBlock(qItem.data.blocks[n])).isOkOr: - # The way out here is simply to re-compile the block queue. At any - # point, the `FC` module data area might have been moved to a new - # canonical branch. - # - ctx.poolMode = true - warn info & ": import block error (reorg triggered)", n, iv, - B=ctx.chain.baseNumber.bnStr, L=ctx.chain.latestNumber.bnStr, - nthBn=nBn.bnStr, nthHash=ctx.getNthHash(qItem.data, n).short, - `error`=error - maxImport = nBn - break importLoop - # isOk => continue - except CancelledError: - maxImport = nBn # shutdown? - break importLoop - - # Allow pseudo/async thread switch. - (await ctx.updateAsyncTasks()).isOkOr: - maxImport = nBn # shutdown? - break importLoop - - # Import probably incomplete, so a partial roll back may be needed - if maxImport < iv.maxPt: - ctx.blocksUnprocAppend(maxImport+1, iv.maxPt) - - info "Import done", iv=(iv.minPt, maxImport).bnStr, - nBlocks=(maxImport-iv.minPt+1), nFailed=(iv.maxPt-maxImport), - base=ctx.chain.baseNumber.bnStr, head=ctx.chain.latestNumber.bnStr, - target=ctx.head.bnStr - - return true + let + blocks = rc.value + + # Insert blocks list on the `staged` queue + key = blocks[0].header.number + qItem = ctx.blk.staged.insert(key).valueOr: + raiseAssert info & ": duplicate key on staged queue iv=" & + (key, blocks[^1].header.number).bnStr + + qItem.data.blocks = blocks # store `blocks[]` list + + nQueued += blocks.len # statistics + + # End block: `fetchBlocksBody` + + if nImported == 0 and nQueued == 0: + if not ctx.pool.seenData and + buddy.peerID notin ctx.pool.failedPeers and + buddy.ctrl.stopped: + # Collect peer for detecting cul-de-sac syncing (i.e. non-existing + # block chain or similar.) 
+      ctx.pool.failedPeers.incl buddy.peerID
+
+      debug info & ": no blocks yet", peer, ctrl=buddy.ctrl.state,
+        poolMode=ctx.poolMode, syncState=ctx.pool.lastState,
+        failedPeers=ctx.pool.failedPeers.len, bdyErrors=buddy.bdyErrors
     return
 
+  info "Queued/staged or imported blocks",
+    topImported=ctx.blk.topImported.bnStr,
+    unprocBottom=(if ctx.blocksModeStopped(): "n/a"
+                  else: ctx.blocksUnprocAvailBottom.bnStr),
+    nQueued, nImported, nStagedQ=ctx.blk.staged.len,
+    nSyncPeers=ctx.pool.nBuddies
+
+
+template blocksStagedProcess*(
+    ctx: BeaconCtxRef;
+    info: static[string];
+      ): auto =
+  ctx.blocksStagedProcessImpl(Opt.none(Peer), info)
+
+template blocksStagedProcess*(
+    buddy: BeaconBuddyRef;
+    info: static[string];
+      ): auto =
+  buddy.ctx.blocksStagedProcessImpl(Opt.some(buddy.peer), info)
 
 proc blocksStagedReorg*(ctx: BeaconCtxRef; info: static[string]) =
@@ -429,7 +284,7 @@ proc blocksStagedReorg*(ctx: BeaconCtxRef; info: static[string]) =
 
   # Reset block queues
   debug info & ": Flushing block queues", nUnproc=ctx.blocksUnprocTotal(),
-    nStaged=ctx.blk.staged.len, nReorg=ctx.pool.nReorg
+    nStagedQ=ctx.blk.staged.len, nReorg=ctx.pool.nReorg
 
   ctx.blocksUnprocClear()
   ctx.blk.staged.clear()
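The new `staged_blocks.nim` module below carries over the heuristic body check from the old `fetchAndCheck`: a fetched body is only plausible if its emptiness agrees with what the header's `transactionsRoot` promises, and the fetched list is truncated at the first mismatch. A self-contained sketch of that rule, using mock header/body types where `emptyRoot` stands for the trie root of an empty transaction list:

type
  MockHeader = object
    transactionsRoot: string   # stand-in for Hash32
  MockBody = object
    transactions: seq[string]  # stand-in for the transaction list

const emptyRoot = "empty"      # placeholder constant

func plausible(h: MockHeader, b: MockBody): bool =
  if h.transactionsRoot != emptyRoot:
    b.transactions.len > 0     # non-empty root requires transactions
  else:
    b.transactions.len == 0    # empty root forbids transactions

proc acceptPrefix(headers: seq[MockHeader], bodies: seq[MockBody]): int =
  ## Number of leading bodies to keep; the rest is cut off as junk.
  for n in 0 ..< min(headers.len, bodies.len):
    if not plausible(headers[n], bodies[n]):
      return n
  min(headers.len, bodies.len)

when isMainModule:
  let hdrs = @[MockHeader(transactionsRoot: "abc"),
               MockHeader(transactionsRoot: emptyRoot)]
  doAssert acceptPrefix(hdrs, @[MockBody(transactions: @["tx"]),
                                MockBody()]) == 2
  doAssert acceptPrefix(hdrs, @[MockBody(), MockBody()]) == 0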
diff --git a/execution_chain/sync/beacon/worker/blocks_staged/staged_blocks.nim b/execution_chain/sync/beacon/worker/blocks_staged/staged_blocks.nim
new file mode 100644
index 0000000000..eb4097aa70
--- /dev/null
+++ b/execution_chain/sync/beacon/worker/blocks_staged/staged_blocks.nim
@@ -0,0 +1,211 @@
+# Nimbus
+# Copyright (c) 2023-2025 Status Research & Development GmbH
+# Licensed and distributed under either of
+#  * MIT license (license terms in the root directory or at
+#      https://opensource.org/licenses/MIT).
+#  * Apache v2 license (license terms in the root directory or at
+#      https://www.apache.org/licenses/LICENSE-2.0).
+# at your option. This file may not be copied, modified, or distributed
+# except according to those terms.
+
+{.push raises:[].}
+
+import
+  pkg/[chronicles, chronos, results],
+  pkg/eth/common,
+  pkg/stew/interval_set,
+  ../../../../networking/p2p,
+  ../../../wire_protocol/types,
+  ../../worker_desc,
+  ../[blocks_unproc, helpers, update],
+  ./bodies
+
+# ------------------------------------------------------------------------------
+# Private helpers
+# ------------------------------------------------------------------------------
+
+formatIt(Hash32):
+  it.short
+
+proc getNthHash(ctx: BeaconCtxRef; blocks: seq[EthBlock]; n: int): Hash32 =
+  ctx.hdrCache.getHash(blocks[n].header.number).valueOr:
+    return zeroHash32
+
+# ------------------------------------------------------------------------------
+# Private functions
+# ------------------------------------------------------------------------------
+
+proc blocksFetchCheckImpl(
+    buddy: BeaconBuddyRef;
+    iv: BnRange;
+    info: static[string];
+      ): Future[Opt[seq[EthBlock]]]
+      {.async: (raises: []).} =
+  ## From the p2p/ethXX network fetch the argument range `iv` of block bodies
+  ## and assemble a list of blocks to be returned.
+  ##
+  ## The block bodies are heuristically verified, the headers are taken from
+  ## the header chain cache.
+  ##
+  let
+    ctx = buddy.ctx
+    peer = buddy.peer
+
+  # Preset/append headers to be completed with bodies. Also collect block
+  # hashes for fetching missing blocks.
+  var
+    request = BlockBodiesRequest(blockHashes: newSeqUninit[Hash32](iv.len))
+    blocks = newSeq[EthBlock](iv.len)
+
+  for n in 1u ..< iv.len:
+    let header = ctx.hdrCache.get(iv.minPt + n).valueOr:
+      # There is nothing one can do here
+      info "Block header missing (reorg triggered)", peer, iv, n,
+        nth=(iv.minPt + n).bnStr
+      ctx.poolMode = true # So require reorg
+      return Opt.none(seq[EthBlock])
+    request.blockHashes[n - 1] = header.parentHash
+    blocks[n].header = header
+  blocks[0].header = ctx.hdrCache.get(iv.minPt).valueOr:
+    # There is nothing one can do here
+    info "Block header missing (reorg triggered)", peer, iv, n=0,
+      nth=iv.minPt.bnStr
+    ctx.poolMode = true # So require reorg
+    return Opt.none(seq[EthBlock])
+  request.blockHashes[^1] = blocks[^1].header.computeBlockHash
+
+  # Fetch bodies
+  let bodies = (await buddy.bodiesFetch(request, info)).valueOr:
+    return Opt.none(seq[EthBlock])
+  if buddy.ctrl.stopped:
+    return Opt.none(seq[EthBlock])
+
+  # Append bodies, note that the bodies are not fully verified here but rather
+  # when they are imported and executed.
+  let nBodies = bodies.len.uint64
+  if nBodies < iv.len:
+    blocks.setLen(nBodies)
+  block loop:
+    for n in 0 ..< nBodies:
+      block checkTxLenOk:
+        if blocks[n].header.transactionsRoot != emptyRoot:
+          if 0 < bodies[n].transactions.len:
+            break checkTxLenOk
+        else:
+          if bodies[n].transactions.len == 0:
+            break checkTxLenOk
+        # Oops, cut off the rest
+        blocks.setLen(n) # curb off junk
+        buddy.fetchRegisterError()
+        trace info & ": cut off junk blocks", peer, iv, n,
+          nTxs=bodies[n].transactions.len, nBodies, bdyErrors=buddy.bdyErrors
+        break loop
+
+      blocks[n].transactions = bodies[n].transactions
+      blocks[n].uncles = bodies[n].uncles
+      blocks[n].withdrawals = bodies[n].withdrawals
+
+  if 0 < blocks.len.uint64:
+    return Opt.some(blocks)
+
+  buddy.only.nBdyProcErrors.inc
+  return Opt.none(seq[EthBlock])
+
+# ------------------------------------------------------------------------------
+# Public functions
+# ------------------------------------------------------------------------------
+
+func blocksModeStopped*(ctx: BeaconCtxRef): bool =
+  ## Helper, checks whether there is a general stop condition based on
+  ## state settings (not on sync peer ctrl as `buddy.ctrl.running`.)
+  ctx.poolMode or
+    ctx.pool.lastState != processingBlocks
+
+
+proc blocksFetch*(
+    buddy: BeaconBuddyRef;
+    num: uint;
+    info: static[string];
+      ): Future[Opt[seq[EthBlock]]]
+      {.async: (raises: []).} =
+  ## From the p2p/ethXX network fetch as many blocks as given by argument `num`.
+  let
+    ctx = buddy.ctx
+
+    # Fetch next available interval
+    iv = ctx.blocksUnprocFetch(num).valueOr:
+      return Opt.none(seq[EthBlock])
+
+    # Fetch blocks and verify result
+    rc = await buddy.blocksFetchCheckImpl(iv, info)
+
+  # Commit blocks received
+  if rc.isErr:
+    ctx.blocksUnprocCommit(iv, iv)
+  else:
+    ctx.blocksUnprocCommit(iv, iv.minPt + rc.value.len.uint64, iv.maxPt)
+
+  return rc
+
+
+proc blocksImport*(
+    ctx: BeaconCtxRef;
+    maybePeer: Opt[Peer];
+    blocks: seq[EthBlock];
+    info: static[string];
+      ) {.async: (raises: []).} =
+  ## Import/execute a list of argument blocks. The function sets the global
+  ## block number of the last executed block which might precede the least
+  ## block number from the argument list in case of an error. 
+ ## + let iv = BnRange.new(blocks[0].header.number, blocks[^1].header.number) + doAssert iv.len == blocks.len.uint64 + + trace info & ": Start importing blocks", peer=($maybePeer), iv, + nBlocks=iv.len, base=ctx.chain.baseNumber.bnStr, + head=ctx.chain.latestNumber.bnStr, target=ctx.head.bnStr + + block loop: + for n in 0 ..< blocks.len: + let nBn = blocks[n].header.number + + if nBn <= ctx.chain.baseNumber: + trace info & ": ignoring block less eq. base", n, iv, nBlocks=iv.len, + nthBn=nBn.bnStr, nthHash=ctx.getNthHash(blocks, n), + B=ctx.chain.baseNumber.bnStr, L=ctx.chain.latestNumber.bnStr + + ctx.blk.topImported = nBn # well, not really imported + continue + + try: + (await ctx.chain.importBlock(blocks[n])).isOkOr: + # The way out here is simply to re-compile the block queue. At any + # point, the `FC` module data area might have been moved to a new + # canonical branch. + # + ctx.poolMode = true + warn info & ": import block error (reorg triggered)", n, iv, + nBlocks=iv.len, nthBn=nBn.bnStr, nthHash=ctx.getNthHash(blocks, n), + B=ctx.chain.baseNumber.bnStr, L=ctx.chain.latestNumber.bnStr, + `error`=error + break loop + # isOk => next instruction + except CancelledError: + break loop # shutdown? + + ctx.blk.topImported = nBn # Block imported OK + + # Allow pseudo/async thread switch. + (await ctx.updateAsyncTasks()).isOkOr: + break loop + + info "Imported blocks", iv=(if iv.minPt <= ctx.blk.topImported: + (iv.minPt, ctx.blk.topImported).bnStr else: "n/a"), + nBlocks=(ctx.blk.topImported - iv.minPt + 1), + nFailed=(iv.maxPt - ctx.blk.topImported), + base=ctx.chain.baseNumber.bnStr, head=ctx.chain.latestNumber.bnStr, + target=ctx.head.bnStr + +# ------------------------------------------------------------------------------ +# End +# ------------------------------------------------------------------------------ diff --git a/execution_chain/sync/beacon/worker/headers_staged.nim b/execution_chain/sync/beacon/worker/headers_staged.nim index 209b138a99..8d2ab20e7f 100644 --- a/execution_chain/sync/beacon/worker/headers_staged.nim +++ b/execution_chain/sync/beacon/worker/headers_staged.nim @@ -22,39 +22,35 @@ import # Public functions # ------------------------------------------------------------------------------ -func headersStagedFetchOk*(buddy: BeaconBuddyRef): bool = - # Helper for `worker.nim`, etc. - 0 < buddy.ctx.headersUnprocAvail() and - buddy.ctrl.running and - not buddy.ctx.collectModeStopped() - +func headersStagedCollectOk*(buddy: BeaconBuddyRef): bool = + ## Helper for `worker.nim`, etc. + if buddy.ctrl.running: + let ctx = buddy.ctx + if 0 < ctx.headersUnprocAvail() and + not ctx.collectModeStopped(): + return true + false proc headersStagedCollect*( buddy: BeaconBuddyRef; info: static[string]; - ): Future[bool] {.async: (raises: []).} = + ) {.async: (raises: []).} = ## Collect headers and either stash them on the header chain cache directly, ## or stage then on the header queue to get them serialised, later. The ## header queue serialisation is needed in case of several peers fetching ## and processing headers concurrently. ## - ## If there are headers left to process, tThis function will always stages - ## a header list record on the header queue for serialisation, and returns - ## `true`. - ## - ## Otherwise the function returns `false` if there are no headers left to be - ## processed. 
- ## let ctx = buddy.ctx peer = buddy.peer if ctx.headersUnprocIsEmpty() or ctx.hdrCache.state != collecting: - return false # no action + return # no action + var - nDeterministic = 0u64 # statistics, to be updated - nOpportunistic = 0 # ditto + nStored = 0u64 # statistics, to be updated + nQueued = 0 # ditto block fetchHeadersBody: # @@ -78,7 +74,7 @@ proc headersStagedCollect*( # dangling # # After claiming the block interval that will be processed next for the - # deterministic fetch, the situation looks like + # deterministic fetch, the situation for the new `top` would look like # :: # ---------| unproc pool # |-------| block interval to fetch next @@ -110,16 +106,16 @@ proc headersStagedCollect*( # Check whether there were some headers fetched at all if bottom < iv.maxPt: - nDeterministic += (iv.maxPt - bottom) # statistics + nStored += (iv.maxPt - bottom) # statistics ctx.pool.seenData = true # header data exist # Job might have been cancelled or completed while downloading headers. # If so, no more bookkeeping of headers must take place. The *books* # might have been reset and prepared for the next stage. if ctx.collectModeStopped(): - trace info & ": deterministic headers fetch stopped", peer, iv, - bottom=bottom.bnStr, nDeterministic, syncState=ctx.pool.lastState, - cacheMode=ctx.hdrCache.state + trace info & ": stopped fetching/storing headers", peer, iv, + bottom=bottom.bnStr, nStored, ctrl=buddy.ctrl.state, + syncState=ctx.pool.lastState, cacheMode=ctx.hdrCache.state break fetchHeadersBody # done, exit this function # Commit partially processed block numbers @@ -129,12 +125,11 @@ proc headersStagedCollect*( ctx.headersUnprocCommit(iv) # all headers processed - debug info & ": deterministic headers fetch count", peer, + debug info & ": fetched headers count", peer, unprocTop=ctx.headersUnprocAvailTop.bnStr, D=ctx.dangling.bnStr, - nDeterministic, nStaged=ctx.hdr.staged.len, ctrl=buddy.ctrl.state + nStored, nStagedQ=ctx.hdr.staged.len, ctrl=buddy.ctrl.state - # Buddy might have been cancelled while downloading headers. Still - # bookkeeping (aka commiting unused `iv`) needed to proceed. + # Buddy might have been cancelled while downloading headers. if buddy.ctrl.stopped: break fetchHeadersBody @@ -142,7 +137,8 @@ proc headersStagedCollect*( # Continue opportunistically fetching by block number rather than hash. The # fetched headers need to be staged and checked/serialised later. - block: + if ctx.hdr.staged.len + ctx.hdr.reserveStaged < headersStagedQueueLengthHwm: + let # Comment see deterministic case iv = ctx.headersUnprocFetch(nFetchHeadersBatch).valueOr: @@ -152,19 +148,21 @@ proc headersStagedCollect*( # heap so that `async` can capture that properly. lhc = (ref LinkedHChain)(peerID: buddy.peerID) - # Fetch headers and fill up the headers list of `lhc`. The function - # returns the last unprocessed block number. - bottom = await buddy.collectAndStageOnMemQueue(iv, lhc, info) + # Fetch headers and fill up the headers list of `lhc`. The function + # returns the last unprocessed block number. + ctx.hdr.reserveStaged.inc # Book a slot on `staged` + let bottom = await buddy.collectAndStageOnMemQueue(iv, lhc, info) + ctx.hdr.reserveStaged.dec # Free that slot again - nOpportunistic = lhc.revHdrs.len # statistics + nQueued = lhc.revHdrs.len # statistics # Job might have been cancelled or completed while downloading headers. # If so, no more bookkeeping of headers must take place. The *books* # might have been reset and prepared for the next stage. 
if ctx.collectModeStopped(): - trace info & ": staging headers fetch stopped", peer, iv, - bottom=bottom.bnStr, nDeterministic, syncState=ctx.pool.lastState, - cacheMode=ctx.hdrCache.state + trace info & ": stopped fetching/staging headers", peer, iv, + bottom=bottom.bnStr, nStored, ctrl=buddy.ctrl.state, + syncState=ctx.pool.lastState, cacheMode=ctx.hdrCache.state break fetchHeadersBody # done, exit this function # Store `lhc` chain on the `staged` queue if there is any @@ -183,43 +181,48 @@ proc headersStagedCollect*( # End block: `fetchHeadersBody` - let nHeaders = nDeterministic + nOpportunistic.uint64 - if nHeaders == 0: - if not ctx.pool.seenData: + if nStored == 0 and nQueued == 0: + if not ctx.pool.seenData and + buddy.peerID notin ctx.pool.failedPeers and + buddy.ctrl.stopped: # Collect peer for detecting cul-de-sac syncing (i.e. non-existing # block chain or similar.) ctx.pool.failedPeers.incl buddy.peerID debug info & ": no headers yet", peer, ctrl=buddy.ctrl.state, + cacheMode=ctx.hdrCache.state, syncState=ctx.pool.lastState, failedPeers=ctx.pool.failedPeers.len, hdrErrors=buddy.hdrErrors - return false + return - info "Downloaded headers", + info "Queued/staged or DB/stored headers", unprocTop=(if ctx.collectModeStopped(): "n/a" else: ctx.headersUnprocAvailTop.bnStr), - nHeaders, nStaged=ctx.hdr.staged.len, nSyncPeers=ctx.pool.nBuddies + nQueued, nStored, nStagedQ=ctx.hdr.staged.len, nSyncPeers=ctx.pool.nBuddies - return true - - -proc headersStagedProcess*(buddy: BeaconBuddyRef; info: static[string]) = +proc headersStagedProcess*(buddy: BeaconBuddyRef; info: static[string]): bool = ## Store headers from the `staged` queue onto the header chain cache. ## + ## The function returns `false` if the caller should make sure to allow + ## to switch to another sync peer for deterministically filling the gap + ## between the top of the queue and the `dangling` block number. + ## let ctx = buddy.ctx peer = buddy.peer + if ctx.hdr.staged.len == 0: - return # avoids logging + return false # switch peer var - nProcessed = 0 # statistics + nStored = 0 # statistics + switchPeer = false # for return code while ctx.hdrCache.state == collecting: # Fetch list with largest block numbers let qItem = ctx.hdr.staged.le(high BlockNumber).valueOr: - break # all done + break # all done let minNum = qItem.data.revHdrs[^1].number @@ -227,17 +230,12 @@ proc headersStagedProcess*(buddy: BeaconBuddyRef; info: static[string]) = dangling = ctx.dangling.number if maxNum + 1 < dangling: debug info & ": gap, serialisation postponed", peer, - qItem=qItem.data.bnStr, D=dangling.bnStr, nProcessed, - nStaged=ctx.hdr.staged.len, nSyncPeers=ctx.pool.nBuddies - return # there is a gap -- come back later - - # Overlap must not happen - if maxNum+1 != dangling: - raiseAssert info & ": Overlap" & - " qItem=" & qItem.data.bnStr & " D=" & dangling.bnStr + qItem=qItem.data.bnStr, D=dangling.bnStr, nStored, + nStagedQ=ctx.hdr.staged.len, nSyncPeers=ctx.pool.nBuddies + switchPeer = true # there is a gap -- come back later + break - # Process item from `staged` queue. So it is not needed in the list, - # anymore. 
+ # Remove from queue discard ctx.hdr.staged.delete(qItem.key) # Store headers on database @@ -248,72 +246,46 @@ proc headersStagedProcess*(buddy: BeaconBuddyRef; info: static[string]) = buddy.incHdrProcErrors qItem.data.peerID debug info & ": discarding staged header list", peer, - qItem=qItem.data.bnStr, D=ctx.dangling.bnStr, nProcessed, + qItem=qItem.data.bnStr, D=ctx.dangling.bnStr, nStored, nDiscarded=qItem.data.revHdrs.len, nSyncPeers=ctx.pool.nBuddies, `error`=error - return + switchPeer = true + break # Antecedent `dangling` of the header cache might not be at `revHdrs[^1]`. let revHdrsLen = maxNum - ctx.dangling.number + 1 - nProcessed += revHdrsLen.int # count headers - + nStored += revHdrsLen.int # count headers # End while loop - if headersStagedQueueLengthLwm < ctx.hdr.staged.len: - ctx.poolMode = true + if 0 < nStored: + info "Headers serialised and stored", D=ctx.dangling.bnStr, nStored, + nStagedQ=ctx.hdr.staged.len, nSyncPeers=ctx.pool.nBuddies, switchPeer - debug info & ": headers serialised and stored", peer, D=ctx.dangling.bnStr, - nProcessed, nStagedLists=ctx.hdr.staged.len, nSyncPeers=ctx.pool.nBuddies, - reorgReq=ctx.poolMode + elif 0 < ctx.hdr.staged.len and not switchPeer: + trace info & ": no headers processed", peer, D=ctx.dangling.bnStr, + nStagedQ=ctx.hdr.staged.len, nSyncPeers=ctx.pool.nBuddies + not switchPeer proc headersStagedReorg*(ctx: BeaconCtxRef; info: static[string]) = ## Some pool mode intervention. The effect is that all concurrent peers ## finish up their current work and run this function here (which might - ## do nothing.) This stopping should be enough in most cases to re-organise - ## when re-starting concurrently, again. + ## do nothing.) Pool mode is used to sync peers, e.g. for a forced state + ## change. ## - ## Only when the staged list gets too big it will be cleared to be re-filled - ## again. In therory, this might happen on a really slow lead actor - ## (downloading deterministically by hashes) and many fast opportunistic - ## actors filling the staged queue. - ## - doAssert ctx.headersBorrowedIsEmpty() - - if ctx.hdr.staged.len == 0: - # nothing to do - return - - # Update counter - ctx.pool.nReorg.inc - # Check for cancel request if ctx.pool.lastState == cancelHeaders: + # Update counter + ctx.pool.nReorg.inc + # Reset header queues debug info & ": Flushing header queues", nUnproc=ctx.headersUnprocTotal(), - nStaged=ctx.hdr.staged.len, nReorg=ctx.pool.nReorg + nStagedQ=ctx.hdr.staged.len, nReorg=ctx.pool.nReorg - ctx.headersUnprocClear() + ctx.headersUnprocClear() # clears `unprocessed` and `borrowed` list ctx.hdr.staged.clear() - return - - let nStaged = ctx.hdr.staged.len - if headersStagedQueueLengthHwm < nStaged: - debug info & ": hwm reached, flushing staged queue", - headersStagedQueueLengthLwm, nStaged, nReorg=ctx.pool.nReorg - - # Remove the leading `1 + nStaged - headersStagedQueueLengthLwm` entries - # from list so that the upper `headersStagedQueueLengthLwm-1` entries - # remain. - for _ in 0 .. 
nStaged - headersStagedQueueLengthLwm: - let - qItem = ctx.hdr.staged.ge(BlockNumber 0).expect "valid record" - key = qItem.key - nHeaders = qItem.data.revHdrs.len.uint64 - ctx.headersUnprocAppend(key - nHeaders + 1, key) - discard ctx.hdr.staged.delete key # ------------------------------------------------------------------------------ # End diff --git a/execution_chain/sync/beacon/worker/headers_staged/staged_collect.nim b/execution_chain/sync/beacon/worker/headers_staged/staged_collect.nim index 36aa54603b..9399f50fc8 100644 --- a/execution_chain/sync/beacon/worker/headers_staged/staged_collect.nim +++ b/execution_chain/sync/beacon/worker/headers_staged/staged_collect.nim @@ -82,7 +82,7 @@ func bnStr*(w: LinkedHChain | ref LinkedHChain): string = # ------------------------------------------------------------------------------ func collectModeStopped*(ctx: BeaconCtxRef): bool = - ## Hepler, checks whether there is a general stop conditions based on + ## Helper, checks whether there is a general stop conditions based on ## state settings (not on sync peer ctrl as `buddy.ctrl.running`.) ctx.poolMode or ctx.pool.lastState != collectingHeaders or diff --git a/execution_chain/sync/beacon/worker/helpers.nim b/execution_chain/sync/beacon/worker/helpers.nim index 468161774c..b9f811c7ea 100644 --- a/execution_chain/sync/beacon/worker/helpers.nim +++ b/execution_chain/sync/beacon/worker/helpers.nim @@ -13,9 +13,10 @@ ## Extracted helpers from `worker_desc` (avoids circular import) import - pkg/chronos, + pkg/[chronos, results], pkg/eth/common, pkg/stew/interval_set, + ../../../networking/p2p, ../../../utils/prettify, ../../../utils/utils @@ -54,7 +55,10 @@ func toStr*(h: Hash32): string = else: h.short -proc `$`*(w: Interval[BlockNumber,uint64]): string = +func `$`*(w: Interval[BlockNumber,uint64]): string = w.bnStr +func `$`*(w: Opt[Peer]): string = + if w.isSome: $w.value else: "n/a" + # End diff --git a/execution_chain/sync/beacon/worker/start_stop.nim b/execution_chain/sync/beacon/worker/start_stop.nim index d7e488655d..37276997ef 100644 --- a/execution_chain/sync/beacon/worker/start_stop.nim +++ b/execution_chain/sync/beacon/worker/start_stop.nim @@ -49,21 +49,7 @@ proc setupServices*(ctx: BeaconCtxRef; info: static[string]) = # Start in suspended mode ctx.hibernate = true - # Take it easy and assume that queue records contain full block list (which - # is mostly the case anyway.) So the the staging queue is limited by the - # number of sub-list records rather than the number of accumulated block - # objects. - let hwm = if blocksStagedLwm <= ctx.pool.blkStagedHwm: ctx.pool.blkStagedHwm - else: blocksStagedHwmDefault - ctx.pool.blkStagedLenHwm = (hwm + nFetchBodiesBatch - 1) div nFetchBodiesBatch - - # Set blocks batch import queue size - if ctx.pool.blkStagedHwm != 0: - debug info & ": import block lists queue", limit=ctx.pool.blkStagedLenHwm - ctx.pool.blkStagedHwm = hwm - - # Set up header cache descriptor. This will evenually be integrated - # into `ForkedChainRef` (i.e. `ctx.pool.chain`.) + # Set up header cache descriptor ctx.pool.hdrCache = HeaderChainRef.init(ctx.pool.chain) # Set up the notifier informing when a new syncer session has started. 
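The next hunk collapses the duplicated eth68/eth69 acceptance checks in `startBuddy` into a single `acceptProto` template. A reduced, self-contained sketch of that pattern with mock peer and protocol marker types (the real template additionally requires `peer.state(PROTO).initialized`):

type
  Eth68 = object
  Eth69 = object
  MockPeer = object
    protos: set[uint8]        # which protocol versions are initialised

func supports(p: MockPeer, T: typedesc[Eth68]): bool = 68u8 in p.protos
func supports(p: MockPeer, T: typedesc[Eth69]): bool = 69u8 in p.protos

proc startBuddyLike(peer: MockPeer): bool =
  # One template replaces two near-identical `if` branches.
  template acceptProto(PROTO: type): bool =
    peer.supports(PROTO)
  acceptProto(Eth69) or acceptProto(Eth68)

when isMainModule:
  doAssert startBuddyLike(MockPeer(protos: {69u8}))
  doAssert startBuddyLike(MockPeer(protos: {68u8}))
  doAssert not startBuddyLike(MockPeer(protos: {}))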
@@ -95,19 +81,18 @@ proc startBuddy*(buddy: BeaconBuddyRef): bool = ctx = buddy.ctx peer = buddy.peer - if peer.supports(eth69) and - peer.state(eth69).initialized: - ctx.pool.nBuddies.inc - buddy.initHdrProcErrors() - return true + template acceptProto(PROTO: type): bool = + peer.supports(PROTO) and + peer.state(PROTO).initialized - if peer.supports(eth68) and - peer.state(eth68).initialized: + if acceptProto(eth69) or + acceptProto(eth68): ctx.pool.nBuddies.inc ctx.pool.blkLastSlowPeer = Opt.none(Hash) buddy.initHdrProcErrors() return true + proc stopBuddy*(buddy: BeaconBuddyRef) = buddy.ctx.pool.nBuddies.dec buddy.clearHdrProcErrors() diff --git a/execution_chain/sync/beacon/worker/update.nim b/execution_chain/sync/beacon/worker/update.nim index 34213230ea..0512e716d4 100644 --- a/execution_chain/sync/beacon/worker/update.nim +++ b/execution_chain/sync/beacon/worker/update.nim @@ -107,6 +107,7 @@ proc setupProcessingBlocks(ctx: BeaconCtxRef; info: static[string]) = # Update list of block numbers to process ctx.blocksUnprocSet(d, h) + ctx.blk.topImported = d - 1 # State transition ctx.pool.lastState = processingBlocks @@ -207,7 +208,7 @@ proc updateSyncState*(ctx: BeaconCtxRef; info: static[string]) = info "Sync state changed", prevState, thisState, base=ctx.chain.baseNumber.bnStr, head=ctx.chain.latestNumber.bnStr, - target=ctx.head.bnStr + target=ctx.head.bnStr, targetHash=ctx.headHash.short # Final sync scrum layout reached or inconsistent/impossible state if thisState == idleSyncState: @@ -231,8 +232,8 @@ proc updateFromHibernateSetTarget*( # Update range ctx.headersUnprocSet(b+1, t-1) - info "Activating syncer", base=b.bnStr, - head=ctx.chain.latestNumber.bnStr, target=t.bnStr, + info "Activating syncer", base=b.bnStr, head=ctx.chain.latestNumber.bnStr, + target=t.bnStr, targetHash=ctx.headHash.short, nSyncPeers=ctx.pool.nBuddies return diff --git a/execution_chain/sync/beacon/worker_config.nim b/execution_chain/sync/beacon/worker_config.nim index 6c398f04f0..36b8527e10 100644 --- a/execution_chain/sync/beacon/worker_config.nim +++ b/execution_chain/sync/beacon/worker_config.nim @@ -86,28 +86,19 @@ const ## Length of the request/stage batch. Several headers are consecutively ## fetched and stashed together as a single record on the staged queue. - headersStagedQueueLengthLwm* = 16 + headersStagedQueueLengthHwm* = 8 ## Limit the number of records in the staged headers queue. ## ## Queue entries start accumulating if one peer stalls while fetching the ## top chain so leaving a gap. This gap must be filled first before ## inserting the queue into a contiguous chain of headers. - ## - ## This low-water mark tryggers the system to do some **magic** to mitigate - ## the above problem. Currently the **magic** is to let (pseudo) threads - ## terminate and then restart all over again. - - headersStagedQueueLengthHwm* = 24 - ## If this size is exceeded, the staged queue is flushed and resized to - ## `headersStagedQueueLengthLwm-1` entries. Then contents is re-fetched - ## from scratch. 
# ---------------------- fetchBodiesFailedInitialFailPeersHwm* = 50 ## Similar to `fetchHeadersFailedInitialFailPeersHwm` - nFetchBodiesRequest* = 128 + nFetchBodiesRequest* = 64 ## Similar to `nFetchHeadersRequest` fetchBodiesReqErrThresholdZombie* = chronos.seconds(4) @@ -120,23 +111,11 @@ const fetchBodiesReqMinResponsePC* = 10 ## Similar to `fetchHeadersReqMinResponsePC` - nFetchBodiesBatch* = 3 * nFetchBodiesRequest - ## Similar to `nFetchHeadersBatch` - ## - ## With an average less than 90KiB/block (on `mainnet` at block ~#22m), - ## one arrives at a total of at most 35MiB per block batch. - - blocksStagedHwmDefault* = 8 * nFetchBodiesBatch - ## This is an initialiser value for `blocksStagedHwm`. - ## - ## If the staged block queue exceeds this many number of block objects for + blocksStagedQueueLengthHwm* = 2 + ## If the staged block queue exceeds this many number of queue objects for ## import, no further block objets are added (but the current sub-list is ## completed.) - blocksStagedLwm* = nFetchBodiesBatch - ## Minimal accepted initialisation value for `blocksStagedHwm`. The latter - ## will be initalised with `blocksStagedHwmDefault` if smaller than the LWM. - # ---------------------- static: @@ -144,12 +123,9 @@ static: doAssert 0 < nFetchHeadersRequest doAssert nFetchHeadersRequest <= nFetchHeadersBatch - doAssert 0 < headersStagedQueueLengthLwm - doAssert headersStagedQueueLengthLwm < headersStagedQueueLengthHwm + doAssert 0 < headersStagedQueueLengthHwm doAssert 0 < nFetchBodiesRequest - doAssert nFetchBodiesRequest <= nFetchBodiesBatch - doAssert 0 < blocksStagedLwm - doAssert blocksStagedLwm <= blocksStagedHwmDefault + doAssert 0 < blocksStagedQueueLengthHwm # End diff --git a/execution_chain/sync/beacon/worker_desc.nim b/execution_chain/sync/beacon/worker_desc.nim index d406f7bf27..afc7c15764 100644 --- a/execution_chain/sync/beacon/worker_desc.nim +++ b/execution_chain/sync/beacon/worker_desc.nim @@ -71,12 +71,15 @@ type unprocessed*: BnRangeSet ## Block or header ranges to fetch borrowed*: BnRangeSet ## Fetched/locked ranges staged*: LinkedHChainQueue ## Blocks fetched but not stored yet + reserveStaged*: int ## Pre-book staged slot temporarily BlocksFetchSync* = object ## Block sync staging area unprocessed*: BnRangeSet ## Blocks download requested borrowed*: BnRangeSet ## Fetched/locked fetched ranges + topImported*: BlockNumber ## For syncronising opportunistic import staged*: StagedBlocksQueue ## Blocks ready for import + reserveStaged*: int ## Pre-book staged slot temporarily # ------------------- @@ -104,11 +107,6 @@ type chain*: ForkedChainRef ## Core database, FCU support hdrCache*: HeaderChainRef ## Currently in tandem with `chain` - # Blocks import/execution settings - blkImportOk*: bool ## Don't fetch data while block importing - blkStagedHwm*: int ## Set a `staged` queue limit - blkStagedLenHwm*: int ## Figured out as # staged records - # Info, debugging, and error handling stuff nReorg*: int ## Number of reorg invocations (info only) hdrProcError*: Table[Hash,uint8] ## Some globally accessible header errors @@ -138,6 +136,10 @@ func head*(ctx: BeaconCtxRef): Header = ## Shortcut ctx.hdrCache.head() +func headHash*(ctx: BeaconCtxRef): Hash32 = + ## Shortcut + ctx.hdrCache.headHash() + func dangling*(ctx: BeaconCtxRef): Header = ## Shortcut ctx.hdrCache.antecedent() diff --git a/execution_chain/sync/sync_sched.nim b/execution_chain/sync/sync_sched.nim index 96328f14a1..b6f3bc074c 100644 --- a/execution_chain/sync/sync_sched.nim +++ 
b/execution_chain/sync/sync_sched.nim @@ -186,7 +186,8 @@ proc terminate[S,W](dsc: RunnerSyncRef[S,W]) {.async: (raises: []).} = try: waitFor sleepAsync termWaitPollingTime except CancelledError: - trace "Shutdown: peer timeout was cancelled", nWorkers=dsc.buddies.len + trace "Shutdown: peer timeout was cancelled", + nCachedWorkers=dsc.buddies.len while dsc.daemonRunning or dsc.tickerRunning: @@ -194,7 +195,8 @@ proc terminate[S,W](dsc: RunnerSyncRef[S,W]) {.async: (raises: []).} = try: await sleepAsync termWaitPollingTime except CancelledError: - trace "Shutdown: daemon timeout was cancelled", nWorkers=dsc.buddies.len + trace "Shutdown: daemon timeout was cancelled", + nCachedWorkers=dsc.buddies.len # Final shutdown dsc.ctx.runRelease() @@ -237,7 +239,8 @@ proc daemonLoop[S,W](dsc: RunnerSyncRef[S,W]) {.async: (raises: []).} = # Stop on error (must not end up in busy-loop). If the activation flag # `dsc.ctx.daemon` remains `true`, the deamon will be re-started from # the worker loop in due time. - trace "Deamon loop sleep was cancelled", nWorkers=dsc.buddies.len + trace "Deamon loop sleep was cancelled", + nCachedWorkers=dsc.buddies.len break # End while @@ -364,7 +367,8 @@ proc workerLoop[S,W](buddy: RunnerBuddyRef[S,W]) {.async: (raises: []).} = try: await sleepAsync suspend except CancelledError: - trace "Peer loop sleep was cancelled", peer, nWorkers=dsc.buddies.len + trace "Peer loop sleep was cancelled", peer, + nCachedWorkers=dsc.buddies.len break # stop on error (must not end up in busy-loop) # End while @@ -382,7 +386,7 @@ proc onPeerConnected[S,W](dsc: RunnerSyncRef[S,W]; peer: Peer) = # Check for known entry (which should not exist.) let - maxWorkers {.used.} = dsc.buddiesMax + maxCachedWorkers {.used.} = dsc.buddiesMax nPeers {.used.} = dsc.pool.len zombie = dsc.buddies.eq peer.key if zombie.isOk: @@ -391,12 +395,13 @@ proc onPeerConnected[S,W](dsc: RunnerSyncRef[S,W]; peer: Peer) = ttz = zombie.value.zombified + zombieTimeToLinger if ttz < Moment.now(): if dsc.ctx.noisyLog: trace "Reconnecting zombie peer ignored", peer, - nPeers, nWorkers=dsc.buddies.len, maxWorkers, canRequeue=(now-ttz) + nPeers, nCachedWorkers=dsc.buddies.len, maxCachedWorkers, + canRequeue=(now-ttz) return # Zombie can be removed from the database dsc.buddies.del peer.key if dsc.ctx.noisyLog: trace "Zombie peer timeout, ready for requeing", peer, - nPeers, nWorkers=dsc.buddies.len, maxWorkers + nPeers, nCachedWorkers=dsc.buddies.len, maxCachedWorkers # Initialise worker for this peer let buddy = RunnerBuddyRef[S,W]( @@ -408,7 +413,7 @@ proc onPeerConnected[S,W](dsc: RunnerSyncRef[S,W]; peer: Peer) = peerID: peer.key.hash)) if not buddy.worker.runStart(): if dsc.ctx.noisyLog: trace "Ignoring useless peer", peer, nPeers, - nWorkers=dsc.buddies.len, maxWorkers + nCachedWorkers=dsc.buddies.len, maxCachedWorkers buddy.worker.ctrl.zombie = true return @@ -426,14 +431,15 @@ proc onPeerConnected[S,W](dsc: RunnerSyncRef[S,W]; peer: Peer) = if dsc.ctx.noisyLog: trace "Dequeuing zombie peer", # Fake `Peer` pretty print for `oldest` oldest=("Node[" & $leastVal.key.address & "]"), - since=leastVal.data.zombified, nPeers, nWorkers=dsc.buddies.len, - maxWorkers + since=leastVal.data.zombified, nPeers, nCachedWorkers=dsc.buddies.len, + maxCachedWorkers discard else: # This could happen if there are idle entries in the table, i.e. # somehow hanging runners. if dsc.ctx.noisyLog: trace "Peer table full! 
Dequeuing least used entry", - oldestPeer=oldest.peer, oldestOnly=oldest.only, nPeers=nPeers, nWorkers=dsc.buddies.len, maxWorkers + oldestPeer=oldest.peer, oldestOnly=oldest.only, nPeers=nPeers, + nCachedWorkers=dsc.buddies.len, maxCachedWorkers # Setting to `zombie` will trigger the worker to terminate (if any.) oldest.ctrl.zombie = true @@ -446,16 +452,16 @@ proc onPeerConnected[S,W](dsc: RunnerSyncRef[S,W]; peer: Peer) = proc onPeerDisconnected[S,W](dsc: RunnerSyncRef[S,W], peer: Peer) = let nPeers = dsc.pool.len - maxWorkers = dsc.buddiesMax - nWorkers = dsc.buddies.len + maxCachedWorkers = dsc.buddiesMax + nCachedWorkers = dsc.buddies.len rc = dsc.buddies.eq peer.key if rc.isErr: if dsc.ctx.noisyLog: debug "Disconnected, unregistered peer", peer, - nPeers, nWorkers, maxWorkers + nPeers, nCachedWorkers, maxCachedWorkers elif rc.value.worker.isNil: # Re-visiting zombie if dsc.ctx.noisyLog: trace "Ignore zombie", peer, - nPeers, nWorkers, maxWorkers + nPeers, nCachedWorkers, maxCachedWorkers elif rc.value.worker.ctrl.zombie: # Don't disconnect, leave them fall out of the LRU cache. The effect is, # that reconnecting might be blocked, for a while. For few peers cases, @@ -465,7 +471,7 @@ proc onPeerDisconnected[S,W](dsc: RunnerSyncRef[S,W], peer: Peer) = rc.value.dsc = nil rc.value.zombified = Moment.now() if dsc.ctx.noisyLog: trace "Disconnected, zombie", peer, - nPeers, nWorkers, maxWorkers + nPeers, nCachedWorkers, maxCachedWorkers else: rc.value.worker.ctrl.stopped = true # in case it is hanging somewhere dsc.buddies.del peer.key
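The `nCachedWorkers` renames above touch the scheduler's shutdown path, which sets a stop flag and then polls in `termWaitPollingTime` steps until the daemon and ticker loops confirm termination. A minimal chronos sketch of that polling pattern (assumed constant value; `Runner` is an illustrative stand-in for `RunnerSyncRef`):

import pkg/chronos

const termWaitPollingTime = 10.milliseconds   # assumption, mirrors sync_sched

type Runner = ref object
  daemon: bool          # activation flag, cleared to request shutdown
  daemonRunning: bool   # set while the loop body is active

proc daemonLoop(dsc: Runner) {.async: (raises: []).} =
  dsc.daemonRunning = true
  while dsc.daemon:
    try: await sleepAsync(5.milliseconds)     # stand-in for real work
    except CancelledError: break
  dsc.daemonRunning = false

proc terminate(dsc: Runner) {.async: (raises: []).} =
  dsc.daemon = false
  while dsc.daemonRunning:
    try: await sleepAsync(termWaitPollingTime)
    except CancelledError: discard            # keep polling, as in sync_sched

when isMainModule:
  let dsc = Runner(daemon: true)
  asyncSpawn dsc.daemonLoop()
  waitFor dsc.terminate()
  doAssert not dsc.daemonRunning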