Schedule updateBase to asynchronous worker. #3455

Merged
6 commits merged on Jul 8, 2025
2 changes: 1 addition & 1 deletion execution_chain/config.nim
@@ -342,7 +342,7 @@ type

persistBatchSize* {.
hidden
defaultValue: 32'u64
defaultValue: 4'u64
name: "debug-persist-batch-size" .}: uint64

beaconSyncTargetFile* {.
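
With the lower default, base updates are now persisted in steps of at most 4 blocks. The hidden debug flag declared above can still override the batch size at startup; a minimal sketch of such an invocation, where only the flag name is taken from the pragma above and the binary path is an assumption:

./build/nimbus_execution_client --debug-persist-batch-size=8
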
142 changes: 100 additions & 42 deletions execution_chain/core/chain/forked_chain.nim
@@ -41,8 +41,8 @@ export

const
BaseDistance = 128'u64
PersistBatchSize = 32'u64
MaxQueueSize = 12
PersistBatchSize = 4'u64
MaxQueueSize = 128

# ------------------------------------------------------------------------------
# Private functions
@@ -292,8 +292,7 @@ proc updateFinalized(c: ForkedChainRef, finalized: BlockRef, fcuHead: BlockRef)
doAssert(candidate.isNil.not)
c.latest = candidate

proc updateBase(c: ForkedChainRef, base: BlockRef):
Future[void] {.async: (raises: [CancelledError]), gcsafe.} =
proc updateBase(c: ForkedChainRef, base: BlockRef): uint =
##
## A1 - A2 - A3 D5 - D6
## / /
@@ -312,53 +311,110 @@ proc updateBase(c: ForkedChainRef, base: BlockRef):
# No update, return
return

# Persist the new base block - this replaces the base tx in coredb!
for x in base.everyNthBlock(4):
const
# We cap waiting for an idle slot in case there's a lot of network traffic
# taking up all CPU - we don't want to _completely_ stop processing blocks
# in this case - doing so also allows us to benefit from more batching /
# larger network reads when under load.
idleTimeout = 10.milliseconds

discard await idleAsync().withTimeout(idleTimeout)
c.com.db.persist(x.txFrame, Opt.some(x.stateRoot))
c.com.db.persist(base.txFrame, Opt.some(base.stateRoot))

# Update baseTxFrame when we are about to yield to the event loop,
# and prevent other modules from accessing an expired baseTxFrame.
c.baseTxFrame = x.txFrame
# Update baseTxFrame when we are about to yield to the event loop,
# and prevent other modules from accessing an expired baseTxFrame.
c.baseTxFrame = base.txFrame

# Cleanup in-memory blocks starting from base backward
# e.g. B2 backward.
var count = 0
loopIt(base.parent):
var
count = 0'u
it = base.parent

while it.isOk:
c.removeBlockFromCache(it)
inc count
let b = it
it = it.parent
b.parent = nil

# Update base branch
c.base = base
c.base.parent = nil

# Log only if more than one block persisted
# This is to avoid log spamming, during normal operation
# of the client following the chain
# When multiple blocks are persisted together, it's mainly
# during `beacon sync` or `nrpc sync`
if count > 1:
notice "Finalized blocks persisted",
nBlocks = count,
base = c.base.number,
baseHash = c.base.hash.short,
pendingFCU = c.pendingFCU.short,
resolvedFin= c.latestFinalizedBlockNumber
count

proc processUpdateBase(c: ForkedChainRef) {.async: (raises: [CancelledError]).} =
if c.baseQueue.len > 0:
let base = c.baseQueue.popFirst()
c.persistedCount += c.updateBase(base)

const
minLogInterval = 5

if c.baseQueue.len == 0:
let time = EthTime.now()
if time - c.lastBaseLogTime > minLogInterval:
# Log only if more than one block was persisted.
# This avoids log spamming during normal operation,
# when the client is simply following the chain.
# Multiple blocks are typically persisted together only
# during `beacon sync` or `nrpc sync`.
if c.persistedCount > 1:
notice "Finalized blocks persisted",
nBlocks = c.persistedCount,
base = c.base.number,
baseHash = c.base.hash.short,
pendingFCU = c.pendingFCU.short,
resolvedFin = c.latestFinalizedBlockNumber
else:
debug "Finalized blocks persisted",
nBlocks = c.persistedCount,
target = c.base.hash.short,
base = c.base.number,
baseHash = c.base.hash.short,
pendingFCU = c.pendingFCU.short,
resolvedFin = c.latestFinalizedBlockNumber
c.lastBaseLogTime = time
c.persistedCount = 0
return

if c.queue.isNil:
# This recursive mode is only used in the test env with a small set of blocks
await c.processUpdateBase()
else:
debug "Finalized blocks persisted",
nBlocks = count,
target = base.hash.short,
base = c.base.number,
baseHash = c.base.hash.short,
pendingFCU = c.pendingFCU.short,
resolvedFin= c.latestFinalizedBlockNumber
proc asyncHandler(): Future[Result[void, string]] {.async: (raises: [CancelledError]).} =
await c.processUpdateBase()
ok()
await c.queue.addLast(QueueItem(handler: asyncHandler))

proc queueUpdateBase(c: ForkedChainRef, base: BlockRef)
{.async: (raises: [CancelledError]).} =
let
prevQueuedBase = if c.baseQueue.len > 0:
c.baseQueue.peekLast()
else:
c.base

if prevQueuedBase.number == base.number:
return

var
number = base.number - min(base.number, PersistBatchSize)
steps = newSeqOfCap[BlockRef]((base.number-c.base.number) div PersistBatchSize + 1)
it = base

steps.add base

while it.number > prevQueuedBase.number:
if it.number == number:
steps.add it
number -= min(number, PersistBatchSize)
it = it.parent

for i in countdown(steps.len-1, 0):
c.baseQueue.addLast(steps[i])

if c.queue.isNil:
# This recursive mode is only used in the test env with a small set of blocks
await c.processUpdateBase()
else:
proc asyncHandler(): Future[Result[void, string]] {.async: (raises: [CancelledError]).} =
await c.processUpdateBase()
ok()
await c.queue.addLast(QueueItem(handler: asyncHandler))

proc validateBlock(c: ForkedChainRef,
parent: BlockRef,
Expand Down Expand Up @@ -426,7 +482,7 @@ proc validateBlock(c: ForkedChainRef,
prevBase = c.base.number

c.updateFinalized(base, base)
await c.updateBase(base)
await c.queueUpdateBase(base)

# If the on-disk head is behind base, move it to base too.
if c.base.number > prevBase:
@@ -437,7 +493,7 @@

template queueOrphan(c: ForkedChainRef, parent: BlockRef, finalized = false): auto =
if c.queue.isNil:
# This recursive mode only used in test env with finite set of blocks
# This recursive mode is only used in the test env with a small set of blocks
c.processOrphan(parent, finalized)
else:
proc asyncHandler(): Future[Result[void, string]] {.async: (raises: [CancelledError]).} =
@@ -533,6 +589,8 @@ proc init*(
quarantine: Quarantine.init(),
fcuHead: fcuHead,
fcuSafe: fcuSafe,
baseQueue: initDeque[BlockRef](),
lastBaseLogTime: EthTime.now(),
)

# updateFinalized will stop ancestor lineage
@@ -630,7 +688,7 @@ proc forkChoice*(c: ForkedChainRef,
# and possibly switched to other chain beside the one with head.
doAssert(finalized.number <= head.number)
doAssert(base.number <= finalized.number)
await c.updateBase(base)
await c.queueUpdateBase(base)

ok()

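
Taken together, the new procs form a small producer/consumer pipeline: queueUpdateBase slices the jump from the previously queued base to the new base into PersistBatchSize-sized steps and appends them to baseQueue, while processUpdateBase, scheduled on the async worker queue, persists one step per run. Below is a self-contained sketch of just the slicing logic, using plain block numbers instead of BlockRef; apart from PersistBatchSize and prevQueuedBase, the names are illustrative:

import std/deques

const PersistBatchSize = 4'u64

proc queueBaseSteps(baseQueue: var Deque[uint64];
                    prevQueuedBase, newBase: uint64) =
  ## Enqueue intermediate base heights between prevQueuedBase and newBase,
  ## oldest first, so each drained item advances the base by at most
  ## PersistBatchSize blocks before yielding back to the event loop.
  if prevQueuedBase == newBase:
    return
  var
    steps = @[newBase]
    number = newBase - min(newBase, PersistBatchSize)
  while number > prevQueuedBase:
    steps.add number
    number -= min(number, PersistBatchSize)
  for i in countdown(steps.len - 1, 0):
    baseQueue.addLast steps[i]

var q = initDeque[uint64]()
queueBaseSteps(q, prevQueuedBase = 0, newBase = 10)
while q.len > 0:
  echo "advance base to block ", q.popFirst()  # prints 2, 6, 10
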
15 changes: 0 additions & 15 deletions execution_chain/core/chain/forked_chain/chain_branch.nim
@@ -68,18 +68,3 @@ template loopFinalized*(init: BlockRef, body: untyped) =
while not it.finalized:
body
it = it.parent

iterator everyNthBlock*(base: BlockRef, step: uint64): BlockRef =
var
number = base.number - min(base.number, step)
steps = newSeqOfCap[BlockRef](128)

steps.add base

loopIt(base):
if it.number == number:
steps.add it
number -= min(number, step)

for i in countdown(steps.len-1, 0):
yield steps[i]
13 changes: 12 additions & 1 deletion execution_chain/core/chain/forked_chain/chain_desc.nim
@@ -11,7 +11,7 @@
{.push raises: [].}

import
std/tables,
std/[tables, deques],
chronos,
../../../common,
../../../db/[core_db, fcu_db],
@@ -35,6 +35,17 @@ type
# The base block, the last block stored in the database.
# Any block newer than base is kept in memory.

baseQueue* : Deque[BlockRef]
# Queue of blocks that will become the new base.
# It is filled by `importBlock` or `forkChoice`
# and consumed by the `processQueue` async worker.

lastBaseLogTime*: EthTime

persistedCount*: uint
# Number of blocks persisted while draining `baseQueue`.

latest* : BlockRef
# Every time a new block added,
# that block automatically become the latest block.
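
These fields are the shared state between the producer side (importBlock and forkChoice filling baseQueue) and the consumer side (the async worker draining it). A rough standalone model of the consumption pattern, assuming c.queue is a chronos AsyncQueue of handler closures as the addLast calls above suggest; the type and proc names here are illustrative, not the real QueueItem/processQueue machinery:

import chronos

type
  WorkItem = object
    handler: proc(): Future[void]

proc worker(queue: AsyncQueue[WorkItem]) {.async.} =
  # Handlers run strictly one after another, so queued base updates
  # are applied in order and never overlap.
  while true:
    let item = await queue.popFirst()
    await item.handler()

proc demo() {.async.} =
  let queue = newAsyncQueue[WorkItem](128)  # matches MaxQueueSize above
  asyncSpawn worker(queue)

  proc updateOnce() {.async.} =
    echo "draining one queued base update"

  await queue.addLast(WorkItem(handler: updateOnce))
  await sleepAsync(10.milliseconds)

waitFor demo()
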
2 changes: 1 addition & 1 deletion execution_chain/nimbus_execution_client.nim
@@ -44,7 +44,7 @@ proc basicServices(nimbus: NimbusNode,
# Setup the chain
let fc = ForkedChainRef.init(com,
eagerStateRoot = conf.eagerStateRootCheck,
persistBatchSize=conf.persistBatchSize,
persistBatchSize = conf.persistBatchSize,
enableQueue = true)
fc.deserialize().isOkOr:
warn "Loading block DAG from database", msg=error
4 changes: 2 additions & 2 deletions tests/test_forked_chain.nim
@@ -445,7 +445,7 @@ suite "ForkedChainRef tests":
checkHeadHash chain, blk7.blockHash
check chain.latestHash == blk7.blockHash
check chain.heads.len == 1
check chain.base.number == 0
check chain.base.number == 4
check chain.validate info & " (9)"

test "newBase on shorter canonical arc, discard arc with oldBase" &
@@ -726,7 +726,7 @@ suite "ForkedChainRef tests":
checkForkChoice(chain, blk7, blk5)
check chain.validate info & " (2)"
checkHeadHash chain, blk7.blockHash
check chain.baseNumber == 0'u64
check chain.baseNumber == 4'u64
check chain.latestHash == blk7.blockHash
check chain.validate info & " (3)"
