
Commit e8c9fdd

gzliudan, holiman and karalabe authored
eth/downloader: refactor downloader + queue ethereum#21263 (XinFinOrg#1041)
* eth/downloader: refactor downloader + queue

  downloader, fetcher: throttle-metrics, fetcher filter improvements, standalone resultcache
  downloader: more accurate deliverytime calculation, less mem overhead in state requests
  downloader/queue: increase underlying buffer of results, new throttle mechanism
  eth/downloader: updates to tests
  eth/downloader: fix up some review concerns
  eth/downloader/queue: minor fixes
  eth/downloader: minor fixes after review call
  eth/downloader: testcases for queue.go
  eth/downloader: minor change, don't set progress unless progress...
  eth/downloader: fix flaw which prevented useless peers from being dropped
  eth/downloader: try to fix tests
  eth/downloader: verify non-deliveries against advertised remote head
  eth/downloader: fix flaw with checking closed-status causing hang
  eth/downloader: hashing avoidance
  eth/downloader: review concerns + simplify resultcache and queue
  eth/downloader: add back some locks, address review concerns
  downloader/queue: fix remaining lock flaw

* eth/downloader: nitpick fixes

* eth/downloader: remove the *2*3/4 throttling threshold dance

* eth/downloader: print correct throttle threshold in stats

Co-authored-by: Martin Holst Swende <[email protected]>
Co-authored-by: Péter Szilágyi <[email protected]>
1 parent e13265a commit e8c9fdd
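Note on the change: the refactor moves throttling out of fetchParts and into the queue's reservation step. Reserve callbacks now return a throttle flag instead of an error, the old ShouldThrottleBlocks/ShouldThrottleReceipts callbacks disappear, and request sizing is driven by a standalone result cache (the queue is now created with blockCacheItems slots). The queue/resultcache files are not part of this excerpt, so the following is only a hypothetical, much-simplified sketch of that contract: the resultCache type, the reserveBodies helper and the slot accounting are invented for illustration, and only the (*fetchRequest, bool, bool) shape comes from the diff below.

package sketch

// header and fetchRequest are stand-ins for the downloader's real types.
type header struct{ Number uint64 }

type fetchRequest struct{ Headers []*header }

// resultCache is a hypothetical stand-in for the commit's standalone result
// cache: a fixed number of slots that completed blocks are assembled into.
type resultCache struct {
	slots    int // total result slots (the real queue is sized by blockCacheItems)
	reserved int // slots already promised to in-flight requests
}

// hasFree reports whether at least n more results still fit in the cache.
func (rc *resultCache) hasFree(n int) bool {
	return rc.reserved+n <= rc.slots
}

// reserveBodies hands out up to count tasks to one peer. The third return value
// asks the dispatch loop to stop handing out work for now, mirroring the
// (*fetchRequest, bool, bool) reserve signature introduced by this diff.
func (rc *resultCache) reserveBodies(tasks []*header, count int) (req *fetchRequest, progressed, throttle bool) {
	if len(tasks) == 0 || !rc.hasFree(1) {
		return nil, false, !rc.hasFree(1) // nothing to do, or cache full: throttle
	}
	n := count
	if n > len(tasks) {
		n = len(tasks)
	}
	for !rc.hasFree(n) { // shrink the request until it fits the remaining space
		n--
	}
	rc.reserved += n
	return &fetchRequest{Headers: tasks[:n]}, false, !rc.hasFree(1)
}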


11 files changed: +1109 −363 lines changed


core/types/block.go

Lines changed: 11 additions & 0 deletions
@@ -153,6 +153,17 @@ func (h *Header) Size() common.StorageSize {
 	return common.StorageSize(unsafe.Sizeof(*h)) + common.StorageSize(len(h.Extra)+(h.Difficulty.BitLen()+h.Number.BitLen()+h.Time.BitLen())/8)
 }

+// EmptyBody returns true if there is no additional 'body' to complete the header
+// that is: no transactions and no uncles.
+func (h *Header) EmptyBody() bool {
+	return h.TxHash == EmptyRootHash && h.UncleHash == EmptyUncleHash
+}
+
+// EmptyReceipts returns true if there are no receipts for this header/block.
+func (h *Header) EmptyReceipts() bool {
+	return h.ReceiptHash == EmptyRootHash
+}
+
 // Body is a simple (mutable, non-safe) data container for storing and moving
 // a block's data contents (transactions and uncles) together.
 type Body struct {
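These two helpers let callers decide, from a header alone, whether there is anything beyond the header left to download: a block whose TxHash and UncleHash are the well-known empty roots has no body, and one whose ReceiptHash is the empty root has no receipts. A hedged usage sketch follows; the neededFetches function and its fastSync flag are hypothetical (the real caller is the downloader queue, which is not in this excerpt), only EmptyBody/EmptyReceipts come from the diff above, and the function is assumed to sit in the same package as Header so no import path needs to be named.

// Hypothetical caller: decide which network fetches a block at header h still needs.
func neededFetches(h *Header, fastSync bool) (needBody, needReceipts bool) {
	// No transactions and no uncles: the header alone fully describes the block.
	needBody = !h.EmptyBody()
	// Receipts are only downloaded during fast sync, and only if the receipt trie is non-empty.
	needReceipts = fastSync && !h.EmptyReceipts()
	return needBody, needReceipts
}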

eth/downloader/downloader.go

Lines changed: 52 additions & 33 deletions
@@ -213,7 +213,7 @@ func New(stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchai
 	dl := &Downloader{
 		stateDB:       stateDb,
 		mux:           mux,
-		queue:         newQueue(),
+		queue:         newQueue(blockCacheItems),
 		peers:         newPeerSet(),
 		rttEstimate:   uint64(rttMaxEstimate),
 		rttConfidence: uint64(1000000),
@@ -359,7 +359,7 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
 		log.Info("Block synchronisation started")
 	}
 	// Reset the queue, peer set and wake channels to clean any internal leftover state
-	d.queue.Reset()
+	d.queue.Reset(blockCacheItems)
 	d.peers.Reset()

 	for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
@@ -575,7 +575,7 @@ func (d *Downloader) fetchHeight(p *peerConnection, hash common.Hash) (*types.He
 	// Make sure the peer actually gave something valid
 	headers := packet.(*headerPack).headers
 	if len(headers) != 1 {
-		p.log.Debug("Multiple headers for single request", "headers", len(headers))
+		p.log.Warn("Multiple headers for single request", "headers", len(headers))
 		return nil, fmt.Errorf("%w: multiple headers (%d) for single request", errBadPeer, len(headers))
 	}
 	head := headers[0]
@@ -784,7 +784,7 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header)
 			// Make sure the peer actually gave something valid
 			headers := packer.(*headerPack).headers
 			if len(headers) != 1 {
-				p.log.Debug("Multiple headers for single request", "headers", len(headers))
+				p.log.Warn("Multiple headers for single request", "headers", len(headers))
 				return 0, fmt.Errorf("%w: multiple headers (%d) for single request", errBadPeer, len(headers))
 			}
 			arrived = true
@@ -808,7 +808,7 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header)
 				}
 				header := d.lightchain.GetHeaderByHash(h) // Independent of sync mode, header surely exists
 				if header.Number.Uint64() != check {
-					p.log.Debug("Received non requested header", "number", header.Number, "hash", header.Hash(), "request", check)
+					p.log.Warn("Received non requested header", "number", header.Number, "hash", header.Hash(), "request", check)
 					return 0, fmt.Errorf("%w: non-requested header (%d)", errBadPeer, header.Number)
 				}
 				start = check
@@ -1017,17 +1017,18 @@ func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) (
 			pack := packet.(*headerPack)
 			return d.queue.DeliverHeaders(pack.peerId, pack.headers, d.headerProcCh)
 		}
-		expire   = func() map[string]int { return d.queue.ExpireHeaders(d.requestTTL()) }
-		throttle = func() bool { return false }
-		reserve  = func(p *peerConnection, count int) (*fetchRequest, bool, error) {
-			return d.queue.ReserveHeaders(p, count), false, nil
+		expire  = func() map[string]int { return d.queue.ExpireHeaders(d.requestTTL()) }
+		reserve = func(p *peerConnection, count int) (*fetchRequest, bool, bool) {
+			return d.queue.ReserveHeaders(p, count), false, false
 		}
 		fetch    = func(p *peerConnection, req *fetchRequest) error { return p.FetchHeaders(req.From, MaxHeaderFetch) }
 		capacity = func(p *peerConnection) int { return p.HeaderCapacity(d.requestRTT()) }
-		setIdle  = func(p *peerConnection, accepted int) { p.SetHeadersIdle(accepted) }
+		setIdle  = func(p *peerConnection, accepted int, deliveryTime time.Time) {
+			p.SetHeadersIdle(accepted, deliveryTime)
+		}
 	)
 	err := d.fetchParts(d.headerCh, deliver, d.queue.headerContCh, expire,
-		d.queue.PendingHeaders, d.queue.InFlightHeaders, throttle, reserve,
+		d.queue.PendingHeaders, d.queue.InFlightHeaders, reserve,
 		nil, fetch, d.queue.CancelHeaders, capacity, d.peers.HeaderIdlePeers, setIdle, "headers")

 	log.Debug("Skeleton fill terminated", "err", err)
@@ -1050,10 +1051,10 @@ func (d *Downloader) fetchBodies(from uint64) error {
 		expire   = func() map[string]int { return d.queue.ExpireBodies(d.requestTTL()) }
 		fetch    = func(p *peerConnection, req *fetchRequest) error { return p.FetchBodies(req) }
 		capacity = func(p *peerConnection) int { return p.BlockCapacity(d.requestRTT()) }
-		setIdle  = func(p *peerConnection, accepted int) { p.SetBodiesIdle(accepted) }
+		setIdle  = func(p *peerConnection, accepted int, deliveryTime time.Time) { p.SetBodiesIdle(accepted, deliveryTime) }
 	)
 	err := d.fetchParts(d.bodyCh, deliver, d.bodyWakeCh, expire,
-		d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ShouldThrottleBlocks, d.queue.ReserveBodies,
+		d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ReserveBodies,
 		d.bodyFetchHook, fetch, d.queue.CancelBodies, capacity, d.peers.BodyIdlePeers, setIdle, "bodies")

 	log.Debug("Block body download terminated", "err", err)
@@ -1074,10 +1075,12 @@ func (d *Downloader) fetchReceipts(from uint64) error {
 		expire   = func() map[string]int { return d.queue.ExpireReceipts(d.requestTTL()) }
 		fetch    = func(p *peerConnection, req *fetchRequest) error { return p.FetchReceipts(req) }
 		capacity = func(p *peerConnection) int { return p.ReceiptCapacity(d.requestRTT()) }
-		setIdle  = func(p *peerConnection, accepted int) { p.SetReceiptsIdle(accepted) }
+		setIdle  = func(p *peerConnection, accepted int, deliveryTime time.Time) {
+			p.SetReceiptsIdle(accepted, deliveryTime)
+		}
 	)
 	err := d.fetchParts(d.receiptCh, deliver, d.receiptWakeCh, expire,
-		d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ShouldThrottleReceipts, d.queue.ReserveReceipts,
+		d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ReserveReceipts,
 		d.receiptFetchHook, fetch, d.queue.CancelReceipts, capacity, d.peers.ReceiptIdlePeers, setIdle, "receipts")

 	log.Debug("Transaction receipt download terminated", "err", err)
@@ -1110,9 +1113,9 @@ func (d *Downloader) fetchReceipts(from uint64) error {
 // - setIdle: network callback to set a peer back to idle and update its estimated capacity (traffic shaping)
 // - kind: textual label of the type being downloaded to display in log mesages
 func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool,
-	expire func() map[string]int, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peerConnection, int) (*fetchRequest, bool, error),
+	expire func() map[string]int, pending func() int, inFlight func() bool, reserve func(*peerConnection, int) (*fetchRequest, bool, bool),
 	fetchHook func([]*types.Header), fetch func(*peerConnection, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peerConnection) int,
-	idle func() ([]*peerConnection, int), setIdle func(*peerConnection, int), kind string) error {
+	idle func() ([]*peerConnection, int), setIdle func(*peerConnection, int, time.Time), kind string) error {

 	// Create a ticker to detect expired retrieval tasks
 	ticker := time.NewTicker(100 * time.Millisecond)
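The doc comments and the one-line signature above are dense; collected into named types, the callback set fetchParts now expects looks roughly like this. The type names are invented for this summary, the snippet assumes it sits inside the eth/downloader package (so peerConnection and fetchRequest are visible), and only the function shapes come from the signature in the diff.

package downloader

import "time"

// Invented names; only the function shapes come from the fetchParts signature above.
type (
	expireFn   func() map[string]int                                  // peer id -> number of expired items
	pendingFn  func() int                                             // tasks still waiting to be assigned
	inFlightFn func() bool                                            // are any requests currently out?
	reserveFn  func(*peerConnection, int) (*fetchRequest, bool, bool) // request, progressed, throttle
	fetchFn    func(*peerConnection, *fetchRequest) error             // send the actual network request
	cancelFn   func(*fetchRequest)                                    // hand the reserved tasks back
	capacityFn func(*peerConnection) int                              // how many items to ask this peer for
	idleFn     func() ([]*peerConnection, int)                        // idle peers plus total peer count
	setIdleFn  func(*peerConnection, int, time.Time)                  // accepted count and delivery time
)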
@@ -1128,6 +1131,7 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack)
 			return errCanceled

 		case packet := <-deliveryCh:
+			deliveryTime := time.Now()
 			// If the peer was previously banned and failed to deliver its pack
 			// in a reasonable time frame, ignore its message.
 			if peer := d.peers.Peer(packet.PeerId()); peer != nil {
@@ -1140,7 +1144,7 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack)
 				// caused by a timed out request which came through in the end), set it to
 				// idle. If the delivery's stale, the peer should have already been idled.
 				if !errors.Is(err, errStaleDelivery) {
-					setIdle(peer, accepted)
+					setIdle(peer, accepted, deliveryTime)
 				}
 				// Issue a log to the user to see what's going on
 				switch {
@@ -1193,7 +1197,7 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack)
 					// how response times reacts, to it always requests one more than the minimum (i.e. min 2).
 					if fails > 2 {
 						peer.log.Trace("Data delivery timed out", "type", kind)
-						setIdle(peer, 0)
+						setIdle(peer, 0, time.Now())
 					} else {
 						peer.log.Debug("Stalling delivery, dropping", "type", kind)

@@ -1228,27 +1232,27 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack)
 			// Send a download request to all idle peers, until throttled
 			progressed, throttled, running := false, false, inFlight()
 			idles, total := idle()
-
+			pendCount := pending()
 			for _, peer := range idles {
 				// Short circuit if throttling activated
-				if throttle() {
-					throttled = true
+				if throttled {
 					break
 				}
 				// Short circuit if there is no more available task.
-				if pending() == 0 {
+				if pendCount = pending(); pendCount == 0 {
 					break
 				}
 				// Reserve a chunk of fetches for a peer. A nil can mean either that
 				// no more headers are available, or that the peer is known not to
 				// have them.
-				request, progress, err := reserve(peer, capacity(peer))
-				if err != nil {
-					return err
-				}
+				request, progress, throttle := reserve(peer, capacity(peer))
 				if progress {
 					progressed = true
 				}
+				if throttle {
+					throttled = true
+					throttleCounter.Inc(1)
+				}
 				if request == nil {
 					continue
 				}
@@ -1273,7 +1277,7 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack)
 			}
 			// Make sure that we have peers available for fetching. If all peers have been tried
 			// and all failed throw an error
-			if !progressed && !throttled && !running && len(idles) == total && pending() > 0 {
+			if !progressed && !throttled && !running && len(idles) == total && pendCount > 0 {
 				return errPeersUnavailable
 			}
 		}
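throttleCounter in the loop above is not declared anywhere in this excerpt; it is presumably a metrics counter added by one of the other files in this commit (the commit message mentions throttle-metrics). As a hedged sketch, such a counter is normally declared with the go-ethereum metrics package like the snippet below; the metric name and the upstream import path are assumptions, and this fork's import path may differ.

package downloader

import "github.com/ethereum/go-ethereum/metrics"

var (
	// Hypothetical declaration; the real one lives outside this excerpt,
	// most likely in eth/downloader/metrics.go. Incremented once per
	// reservation that reports back pressure (see throttleCounter.Inc(1) above).
	throttleCounter = metrics.NewRegisteredCounter("eth/downloader/throttle", nil)
)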
@@ -1285,8 +1289,11 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack)
 // queue until the stream ends or a failure occurs.
 func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) error {
 	// Keep a count of uncertain headers to roll back
-	rollback := []*types.Header{}
-	mode := d.getMode()
+	var (
+		rollback    []*types.Header
+		rollbackErr error
+		mode        = d.getMode()
+	)
 	defer func() {
 		if len(rollback) > 0 {
 			// Flatten the headers and roll them back
@@ -1308,7 +1315,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
 			log.Warn("Rolled back headers", "count", len(hashes),
 				"header", fmt.Sprintf("%d->%d", lastHeader, d.lightchain.CurrentHeader().Number),
 				"fast", fmt.Sprintf("%d->%d", lastFastBlock, curFastBlock),
-				"block", fmt.Sprintf("%d->%d", lastBlock, curBlock))
+				"block", fmt.Sprintf("%d->%d", lastBlock, curBlock), "reason", rollbackErr)
 		}
 	}()

@@ -1318,6 +1325,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
 	for {
 		select {
 		case <-d.cancelCh:
+			rollbackErr = errCanceled
 			return errCanceled

 		case headers := <-d.headerProcCh:
@@ -1371,6 +1379,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
 				// Terminate if something failed in between processing chunks
 				select {
 				case <-d.cancelCh:
+					rollbackErr = errCanceled
 					return errCanceled
 				default:
 				}
@@ -1395,11 +1404,12 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
 						frequency = 1
 					}
 					if n, err := d.lightchain.InsertHeaderChain(chunk, frequency); err != nil {
+						rollbackErr = err
 						// If some headers were inserted, add them too to the rollback list
 						if n > 0 {
 							rollback = append(rollback, chunk[:n]...)
 						}
-						log.Debug("Invalid header encountered", "number", chunk[n].Number, "hash", chunk[n].Hash(), "err", err)
+						log.Debug("Invalid header encountered", "number", chunk[n].Number, "hash", chunk[n].Hash(), "parent", chunk[n].ParentHash, "err", err)
 						return fmt.Errorf("%w: %v", errInvalidChain, err)
 					}
 					// All verifications passed, store newly found uncertain headers
@@ -1414,14 +1424,15 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
 				for d.queue.PendingBlocks() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {
 					select {
 					case <-d.cancelCh:
+						rollbackErr = errCanceled
 						return errCanceled
 					case <-time.After(time.Second):
 					}
 				}
 				// Otherwise insert the headers for content retrieval
 				inserts := d.queue.Schedule(chunk, origin)
 				if len(inserts) != len(chunk) {
-					log.Debug("Stale headers")
+					rollbackErr = fmt.Errorf("stale headers: len inserts %v len(chunk) %v", len(inserts), len(chunk))
 					return fmt.Errorf("%w: stale headers", errBadPeer)
 				}
 			}
@@ -1638,6 +1649,14 @@ func (d *Downloader) processFastSyncContent(latest *types.Header) error {
 }

 func splitAroundPivot(pivot uint64, results []*fetchResult) (p *fetchResult, before, after []*fetchResult) {
+	if len(results) == 0 {
+		return nil, nil, nil
+	}
+	if lastNum := results[len(results)-1].Header.Number.Uint64(); lastNum < pivot {
+		// the pivot is somewhere in the future
+		return nil, results, nil
+	}
+	// This can also be optimized, but only happens very seldom
 	for _, result := range results {
 		num := result.Header.Number.Uint64()
 		switch {
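The two new early exits above cover the cases where there is nothing to split or where every fetched result still sits below the fast-sync pivot. A hedged, test-style sketch of that behaviour follows; it assumes it lives in the eth/downloader package (so fetchResult and splitAroundPivot are visible), the newResultAt helper and test name are invented, and the types import follows the upstream go-ethereum path, which may differ in this fork.

package downloader

import (
	"math/big"
	"testing"

	"github.com/ethereum/go-ethereum/core/types" // upstream path; this fork's import path may differ
)

// newResultAt is a hypothetical test helper: a fetchResult carrying only a
// header with the given block number, which is all splitAroundPivot looks at.
func newResultAt(n uint64) *fetchResult {
	return &fetchResult{Header: &types.Header{Number: new(big.Int).SetUint64(n)}}
}

func TestSplitAroundPivotEarlyExits(t *testing.T) {
	// Empty input: nothing to split, everything nil.
	if p, before, after := splitAroundPivot(100, nil); p != nil || before != nil || after != nil {
		t.Fatalf("expected all-nil split for empty results")
	}
	// Every result is below the pivot: the pivot is "somewhere in the future",
	// so the whole slice comes back as 'before' and no pivot result is found.
	results := []*fetchResult{newResultAt(97), newResultAt(98), newResultAt(99)}
	p, before, after := splitAroundPivot(100, results)
	if p != nil || len(before) != 3 || len(after) != 0 {
		t.Fatalf("unexpected split: p=%v before=%d after=%d", p, len(before), len(after))
	}
}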
