Skip to content

Commit 7f36a2f

Browse files
authored
Schedule updateBase to asynchronous worker. (#3455)
* Schedule `updateBase` to asynchronous worker. `updateBase` become synchronous and the scheduler will interleave `updateBase` with `importBlock` and `forkChoice`. The scheduler will move the base at fixed size `PersistBatchSize`. * Remove persistBatchQueue and keep persistBatchSize * fix tests * queueUpdateBase tuning * Fix updateBase scheduler * Optimize a bit updateBase and queueUpdateBase
1 parent 0dc7cf3 commit 7f36a2f

File tree

6 files changed

+116
-62
lines changed

6 files changed

+116
-62
lines changed

execution_chain/config.nim

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -342,7 +342,7 @@ type
342342

343343
persistBatchSize* {.
344344
hidden
345-
defaultValue: 32'u64
345+
defaultValue: 4'u64
346346
name: "debug-persist-batch-size" .}: uint64
347347

348348
beaconSyncTargetFile* {.

execution_chain/core/chain/forked_chain.nim

Lines changed: 100 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,8 @@ export
4141

4242
const
4343
BaseDistance = 128'u64
44-
PersistBatchSize = 32'u64
45-
MaxQueueSize = 12
44+
PersistBatchSize = 4'u64
45+
MaxQueueSize = 128
4646

4747
# ------------------------------------------------------------------------------
4848
# Private functions
@@ -292,8 +292,7 @@ proc updateFinalized(c: ForkedChainRef, finalized: BlockRef, fcuHead: BlockRef)
292292
doAssert(candidate.isNil.not)
293293
c.latest = candidate
294294

295-
proc updateBase(c: ForkedChainRef, base: BlockRef):
296-
Future[void] {.async: (raises: [CancelledError]), gcsafe.} =
295+
proc updateBase(c: ForkedChainRef, base: BlockRef): uint =
297296
##
298297
## A1 - A2 - A3 D5 - D6
299298
## / /
@@ -312,53 +311,110 @@ proc updateBase(c: ForkedChainRef, base: BlockRef):
312311
# No update, return
313312
return
314313

315-
# Persist the new base block - this replaces the base tx in coredb!
316-
for x in base.everyNthBlock(4):
317-
const
318-
# We cap waiting for an idle slot in case there's a lot of network traffic
319-
# taking up all CPU - we don't want to _completely_ stop processing blocks
320-
# in this case - doing so also allows us to benefit from more batching /
321-
# larger network reads when under load.
322-
idleTimeout = 10.milliseconds
323-
324-
discard await idleAsync().withTimeout(idleTimeout)
325-
c.com.db.persist(x.txFrame, Opt.some(x.stateRoot))
314+
c.com.db.persist(base.txFrame, Opt.some(base.stateRoot))
326315

327-
# Update baseTxFrame when we about to yield to the event loop
328-
# and prevent other modules accessing expired baseTxFrame.
329-
c.baseTxFrame = x.txFrame
316+
# Update baseTxFrame when we about to yield to the event loop
317+
# and prevent other modules accessing expired baseTxFrame.
318+
c.baseTxFrame = base.txFrame
330319

331320
# Cleanup in-memory blocks starting from base backward
332321
# e.g. B2 backward.
333-
var count = 0
334-
loopIt(base.parent):
322+
var
323+
count = 0'u
324+
it = base.parent
325+
326+
while it.isOk:
335327
c.removeBlockFromCache(it)
336328
inc count
329+
let b = it
330+
it = it.parent
331+
b.parent = nil
337332

338333
# Update base branch
339334
c.base = base
340335
c.base.parent = nil
341336

342-
# Log only if more than one block persisted
343-
# This is to avoid log spamming, during normal operation
344-
# of the client following the chain
345-
# When multiple blocks are persisted together, it's mainly
346-
# during `beacon sync` or `nrpc sync`
347-
if count > 1:
348-
notice "Finalized blocks persisted",
349-
nBlocks = count,
350-
base = c.base.number,
351-
baseHash = c.base.hash.short,
352-
pendingFCU = c.pendingFCU.short,
353-
resolvedFin= c.latestFinalizedBlockNumber
337+
count
338+
339+
proc processUpdateBase(c: ForkedChainRef) {.async: (raises: [CancelledError]).} =
340+
if c.baseQueue.len > 0:
341+
let base = c.baseQueue.popFirst()
342+
c.persistedCount += c.updateBase(base)
343+
344+
const
345+
minLogInterval = 5
346+
347+
if c.baseQueue.len == 0:
348+
let time = EthTime.now()
349+
if time - c.lastBaseLogTime > minLogInterval:
350+
# Log only if more than one block persisted
351+
# This is to avoid log spamming, during normal operation
352+
# of the client following the chain
353+
# When multiple blocks are persisted together, it's mainly
354+
# during `beacon sync` or `nrpc sync`
355+
if c.persistedCount > 1:
356+
notice "Finalized blocks persisted",
357+
nBlocks = c.persistedCount,
358+
base = c.base.number,
359+
baseHash = c.base.hash.short,
360+
pendingFCU = c.pendingFCU.short,
361+
resolvedFin= c.latestFinalizedBlockNumber
362+
else:
363+
debug "Finalized blocks persisted",
364+
nBlocks = c.persistedCount,
365+
target = c.base.hash.short,
366+
base = c.base.number,
367+
baseHash = c.base.hash.short,
368+
pendingFCU = c.pendingFCU.short,
369+
resolvedFin= c.latestFinalizedBlockNumber
370+
c.lastBaseLogTime = time
371+
c.persistedCount = 0
372+
return
373+
374+
if c.queue.isNil:
375+
# This recursive mode only used in test env with small set of blocks
376+
await c.processUpdateBase()
354377
else:
355-
debug "Finalized blocks persisted",
356-
nBlocks = count,
357-
target = base.hash.short,
358-
base = c.base.number,
359-
baseHash = c.base.hash.short,
360-
pendingFCU = c.pendingFCU.short,
361-
resolvedFin= c.latestFinalizedBlockNumber
378+
proc asyncHandler(): Future[Result[void, string]] {.async: (raises: [CancelledError]).} =
379+
await c.processUpdateBase()
380+
ok()
381+
await c.queue.addLast(QueueItem(handler: asyncHandler))
382+
383+
proc queueUpdateBase(c: ForkedChainRef, base: BlockRef)
384+
{.async: (raises: [CancelledError]).} =
385+
let
386+
prevQueuedBase = if c.baseQueue.len > 0:
387+
c.baseQueue.peekLast()
388+
else:
389+
c.base
390+
391+
if prevQueuedBase.number == base.number:
392+
return
393+
394+
var
395+
number = base.number - min(base.number, PersistBatchSize)
396+
steps = newSeqOfCap[BlockRef]((base.number-c.base.number) div PersistBatchSize + 1)
397+
it = prevQueuedBase
398+
399+
steps.add base
400+
401+
while it.number > prevQueuedBase.number:
402+
if it.number == number:
403+
steps.add it
404+
number -= min(number, PersistBatchSize)
405+
it = it.parent
406+
407+
for i in countdown(steps.len-1, 0):
408+
c.baseQueue.addLast(steps[i])
409+
410+
if c.queue.isNil:
411+
# This recursive mode only used in test env with small set of blocks
412+
await c.processUpdateBase()
413+
else:
414+
proc asyncHandler(): Future[Result[void, string]] {.async: (raises: [CancelledError]).} =
415+
await c.processUpdateBase()
416+
ok()
417+
await c.queue.addLast(QueueItem(handler: asyncHandler))
362418

363419
proc validateBlock(c: ForkedChainRef,
364420
parent: BlockRef,
@@ -426,7 +482,7 @@ proc validateBlock(c: ForkedChainRef,
426482
prevBase = c.base.number
427483

428484
c.updateFinalized(base, base)
429-
await c.updateBase(base)
485+
await c.queueUpdateBase(base)
430486

431487
# If on disk head behind base, move it to base too.
432488
if c.base.number > prevBase:
@@ -437,7 +493,7 @@ proc validateBlock(c: ForkedChainRef,
437493

438494
template queueOrphan(c: ForkedChainRef, parent: BlockRef, finalized = false): auto =
439495
if c.queue.isNil:
440-
# This recursive mode only used in test env with finite set of blocks
496+
# This recursive mode only used in test env with small set of blocks
441497
c.processOrphan(parent, finalized)
442498
else:
443499
proc asyncHandler(): Future[Result[void, string]] {.async: (raises: [CancelledError]).} =
@@ -533,6 +589,8 @@ proc init*(
533589
quarantine: Quarantine.init(),
534590
fcuHead: fcuHead,
535591
fcuSafe: fcuSafe,
592+
baseQueue: initDeque[BlockRef](),
593+
lastBaseLogTime: EthTime.now(),
536594
)
537595
538596
# updateFinalized will stop ancestor lineage
@@ -630,7 +688,7 @@ proc forkChoice*(c: ForkedChainRef,
630688
# and possibly switched to other chain beside the one with head.
631689
doAssert(finalized.number <= head.number)
632690
doAssert(base.number <= finalized.number)
633-
await c.updateBase(base)
691+
await c.queueUpdateBase(base)
634692

635693
ok()
636694

execution_chain/core/chain/forked_chain/chain_branch.nim

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -68,18 +68,3 @@ template loopFinalized*(init: BlockRef, body: untyped) =
6868
while not it.finalized:
6969
body
7070
it = it.parent
71-
72-
iterator everyNthBlock*(base: BlockRef, step: uint64): BlockRef =
73-
var
74-
number = base.number - min(base.number, step)
75-
steps = newSeqOfCap[BlockRef](128)
76-
77-
steps.add base
78-
79-
loopIt(base):
80-
if it.number == number:
81-
steps.add it
82-
number -= min(number, step)
83-
84-
for i in countdown(steps.len-1, 0):
85-
yield steps[i]

execution_chain/core/chain/forked_chain/chain_desc.nim

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
{.push raises: [].}
1212

1313
import
14-
std/tables,
14+
std/[tables, deques],
1515
chronos,
1616
../../../common,
1717
../../../db/[core_db, fcu_db],
@@ -35,6 +35,17 @@ type
3535
# The base block, the last block stored in database.
3636
# Any blocks newer than base is kept in memory.
3737

38+
baseQueue* : Deque[BlockRef]
39+
# Queue of blocks that will become base.
40+
# This queue will be filled by `importBlock` or `forkChoice`.
41+
# Then consumed by the `processQueue` async worker.
42+
43+
lastBaseLogTime*: EthTime
44+
45+
persistedCount*: uint
46+
# Count how many blocks persisted when `baseQueue`
47+
# consumed.
48+
3849
latest* : BlockRef
3950
# Every time a new block added,
4051
# that block automatically become the latest block.

execution_chain/nimbus_execution_client.nim

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ proc basicServices(nimbus: NimbusNode,
4444
# Setup the chain
4545
let fc = ForkedChainRef.init(com,
4646
eagerStateRoot = conf.eagerStateRootCheck,
47-
persistBatchSize=conf.persistBatchSize,
47+
persistBatchSize = conf.persistBatchSize,
4848
enableQueue = true)
4949
fc.deserialize().isOkOr:
5050
warn "Loading block DAG from database", msg=error

tests/test_forked_chain.nim

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -445,7 +445,7 @@ suite "ForkedChainRef tests":
445445
checkHeadHash chain, blk7.blockHash
446446
check chain.latestHash == blk7.blockHash
447447
check chain.heads.len == 1
448-
check chain.base.number == 0
448+
check chain.base.number == 4
449449
check chain.validate info & " (9)"
450450
451451
test "newBase on shorter canonical arc, discard arc with oldBase" &
@@ -726,7 +726,7 @@ suite "ForkedChainRef tests":
726726
checkForkChoice(chain, blk7, blk5)
727727
check chain.validate info & " (2)"
728728
checkHeadHash chain, blk7.blockHash
729-
check chain.baseNumber == 0'u64
729+
check chain.baseNumber == 4'u64
730730
check chain.latestHash == blk7.blockHash
731731
check chain.validate info & " (3)"
732732

0 commit comments

Comments
 (0)