47
47
MaxReceiptFetch = 256 // Amount of transaction receipts to allow fetching per request
48
48
MaxStateFetch = 384 // Amount of node state values to allow fetching per request
49
49
50
- rttMinEstimate = 2 * time .Second // Minimum round-trip time to target for download requests
51
- rttMaxEstimate = 20 * time .Second // Maximum round-trip time to target for download requests
52
- rttMinConfidence = 0.1 // Worse confidence factor in our estimated RTT value
53
- ttlScaling = 3 // Constant scaling factor for RTT -> TTL conversion
54
- ttlLimit = time .Minute // Maximum TTL allowance to prevent reaching crazy timeouts
55
-
56
- qosTuningPeers = 5 // Number of peers to tune based on (best peers)
57
- qosConfidenceCap = 10 // Number of peers above which not to modify RTT confidence
58
- qosTuningImpact = 0.25 // Impact that a new tuning target has on the previous value
59
-
60
50
maxQueuedHeaders = 32 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection)
61
51
maxHeadersProcess = 2048 // Number of header download results to import at once into the chain
62
52
maxResultsProcess = 2048 // Number of content download results to import at once into the chain
96
86
)
97
87
98
88
type Downloader struct {
99
- // WARNING: The `rttEstimate` and `rttConfidence` fields are accessed atomically.
100
- // On 32 bit platforms, only 64-bit aligned fields can be atomic. The struct is
101
- // guaranteed to be so aligned, so take advantage of that. For more information,
102
- // see https://golang.org/pkg/sync/atomic/#pkg-note-BUG.
103
- rttEstimate uint64 // Round trip time to target for download requests
104
- rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops)
105
-
106
89
mode uint32 // Synchronisation mode defining the strategy used (per sync cycle), use d.getMode() to get the SyncMode
107
90
mux * event.TypeMux // Event multiplexer to announce sync operation events
108
91
@@ -232,8 +215,6 @@ func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom,
232
215
checkpoint : checkpoint ,
233
216
queue : newQueue (blockCacheMaxItems , blockCacheInitialItems ),
234
217
peers : newPeerSet (),
235
- rttEstimate : uint64 (rttMaxEstimate ),
236
- rttConfidence : uint64 (1000000 ),
237
218
blockchain : chain ,
238
219
lightchain : lightchain ,
239
220
dropPeer : dropPeer ,
@@ -252,7 +233,6 @@ func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom,
252
233
},
253
234
trackStateReq : make (chan * stateReq ),
254
235
}
255
- go dl .qosTuner ()
256
236
go dl .stateFetcher ()
257
237
return dl
258
238
}
@@ -310,8 +290,6 @@ func (d *Downloader) RegisterPeer(id string, version uint, peer Peer) error {
310
290
logger .Error ("Failed to register sync peer" , "err" , err )
311
291
return err
312
292
}
313
- d .qosReduceConfidence ()
314
-
315
293
return nil
316
294
}
317
295
@@ -670,7 +648,7 @@ func (d *Downloader) fetchHead(p *peerConnection) (head *types.Header, pivot *ty
670
648
}
671
649
go p .peer .RequestHeadersByHash (latest , fetch , fsMinFullBlocks - 1 , true )
672
650
673
- ttl := d .requestTTL ()
651
+ ttl := d .peers . rates . TargetTimeout ()
674
652
timeout := time .After (ttl )
675
653
for {
676
654
select {
@@ -853,7 +831,7 @@ func (d *Downloader) findAncestorSpanSearch(p *peerConnection, mode SyncMode, re
853
831
// Wait for the remote response to the head fetch
854
832
number , hash := uint64 (0 ), common.Hash {}
855
833
856
- ttl := d .requestTTL ()
834
+ ttl := d .peers . rates . TargetTimeout ()
857
835
timeout := time .After (ttl )
858
836
859
837
for finished := false ; ! finished ; {
@@ -942,7 +920,7 @@ func (d *Downloader) findAncestorBinarySearch(p *peerConnection, mode SyncMode,
942
920
// Split our chain interval in two, and request the hash to cross check
943
921
check := (start + end ) / 2
944
922
945
- ttl := d .requestTTL ()
923
+ ttl := d .peers . rates . TargetTimeout ()
946
924
timeout := time .After (ttl )
947
925
948
926
go p .peer .RequestHeadersByNumber (check , 1 , 0 , false )
@@ -1035,7 +1013,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64) error {
1035
1013
getHeaders := func (from uint64 ) {
1036
1014
request = time .Now ()
1037
1015
1038
- ttl = d .requestTTL ()
1016
+ ttl = d .peers . rates . TargetTimeout ()
1039
1017
timeout .Reset (ttl )
1040
1018
1041
1019
if skeleton {
@@ -1050,7 +1028,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64) error {
1050
1028
pivoting = true
1051
1029
request = time .Now ()
1052
1030
1053
- ttl = d .requestTTL ()
1031
+ ttl = d .peers . rates . TargetTimeout ()
1054
1032
timeout .Reset (ttl )
1055
1033
1056
1034
d .pivotLock .RLock ()
@@ -1262,12 +1240,12 @@ func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) (
1262
1240
pack := packet .(* headerPack )
1263
1241
return d .queue .DeliverHeaders (pack .peerID , pack .headers , d .headerProcCh )
1264
1242
}
1265
- expire = func () map [string ]int { return d .queue .ExpireHeaders (d .requestTTL ()) }
1243
+ expire = func () map [string ]int { return d .queue .ExpireHeaders (d .peers . rates . TargetTimeout ()) }
1266
1244
reserve = func (p * peerConnection , count int ) (* fetchRequest , bool , bool ) {
1267
1245
return d .queue .ReserveHeaders (p , count ), false , false
1268
1246
}
1269
1247
fetch = func (p * peerConnection , req * fetchRequest ) error { return p .FetchHeaders (req .From , MaxHeaderFetch ) }
1270
- capacity = func (p * peerConnection ) int { return p .HeaderCapacity (d .requestRTT ()) }
1248
+ capacity = func (p * peerConnection ) int { return p .HeaderCapacity (d .peers . rates . TargetRoundTrip ()) }
1271
1249
setIdle = func (p * peerConnection , accepted int , deliveryTime time.Time ) {
1272
1250
p .SetHeadersIdle (accepted , deliveryTime )
1273
1251
}
@@ -1293,9 +1271,9 @@ func (d *Downloader) fetchBodies(from uint64) error {
1293
1271
pack := packet .(* bodyPack )
1294
1272
return d .queue .DeliverBodies (pack .peerID , pack .transactions , pack .uncles )
1295
1273
}
1296
- expire = func () map [string ]int { return d .queue .ExpireBodies (d .requestTTL ()) }
1274
+ expire = func () map [string ]int { return d .queue .ExpireBodies (d .peers . rates . TargetTimeout ()) }
1297
1275
fetch = func (p * peerConnection , req * fetchRequest ) error { return p .FetchBodies (req ) }
1298
- capacity = func (p * peerConnection ) int { return p .BlockCapacity (d .requestRTT ()) }
1276
+ capacity = func (p * peerConnection ) int { return p .BlockCapacity (d .peers . rates . TargetRoundTrip ()) }
1299
1277
setIdle = func (p * peerConnection , accepted int , deliveryTime time.Time ) { p .SetBodiesIdle (accepted , deliveryTime ) }
1300
1278
)
1301
1279
err := d .fetchParts (d .bodyCh , deliver , d .bodyWakeCh , expire ,
@@ -1317,9 +1295,9 @@ func (d *Downloader) fetchReceipts(from uint64) error {
1317
1295
pack := packet .(* receiptPack )
1318
1296
return d .queue .DeliverReceipts (pack .peerID , pack .receipts )
1319
1297
}
1320
- expire = func () map [string ]int { return d .queue .ExpireReceipts (d .requestTTL ()) }
1298
+ expire = func () map [string ]int { return d .queue .ExpireReceipts (d .peers . rates . TargetTimeout ()) }
1321
1299
fetch = func (p * peerConnection , req * fetchRequest ) error { return p .FetchReceipts (req ) }
1322
- capacity = func (p * peerConnection ) int { return p .ReceiptCapacity (d .requestRTT ()) }
1300
+ capacity = func (p * peerConnection ) int { return p .ReceiptCapacity (d .peers . rates . TargetRoundTrip ()) }
1323
1301
setIdle = func (p * peerConnection , accepted int , deliveryTime time.Time ) {
1324
1302
p .SetReceiptsIdle (accepted , deliveryTime )
1325
1303
}
@@ -2031,78 +2009,3 @@ func (d *Downloader) deliver(destCh chan dataPack, packet dataPack, inMeter, dro
2031
2009
return errNoSyncActive
2032
2010
}
2033
2011
}
2034
-
2035
- // qosTuner is the quality of service tuning loop that occasionally gathers the
2036
- // peer latency statistics and updates the estimated request round trip time.
2037
- func (d * Downloader ) qosTuner () {
2038
- for {
2039
- // Retrieve the current median RTT and integrate into the previoust target RTT
2040
- rtt := time .Duration ((1 - qosTuningImpact )* float64 (atomic .LoadUint64 (& d .rttEstimate )) + qosTuningImpact * float64 (d .peers .medianRTT ()))
2041
- atomic .StoreUint64 (& d .rttEstimate , uint64 (rtt ))
2042
-
2043
- // A new RTT cycle passed, increase our confidence in the estimated RTT
2044
- conf := atomic .LoadUint64 (& d .rttConfidence )
2045
- conf = conf + (1000000 - conf )/ 2
2046
- atomic .StoreUint64 (& d .rttConfidence , conf )
2047
-
2048
- // Log the new QoS values and sleep until the next RTT
2049
- log .Debug ("Recalculated downloader QoS values" , "rtt" , rtt , "confidence" , float64 (conf )/ 1000000.0 , "ttl" , d .requestTTL ())
2050
- select {
2051
- case <- d .quitCh :
2052
- return
2053
- case <- time .After (rtt ):
2054
- }
2055
- }
2056
- }
2057
-
2058
- // qosReduceConfidence is meant to be called when a new peer joins the downloader's
2059
- // peer set, needing to reduce the confidence we have in out QoS estimates.
2060
- func (d * Downloader ) qosReduceConfidence () {
2061
- // If we have a single peer, confidence is always 1
2062
- peers := uint64 (d .peers .Len ())
2063
- if peers == 0 {
2064
- // Ensure peer connectivity races don't catch us off guard
2065
- return
2066
- }
2067
- if peers == 1 {
2068
- atomic .StoreUint64 (& d .rttConfidence , 1000000 )
2069
- return
2070
- }
2071
- // If we have a ton of peers, don't drop confidence)
2072
- if peers >= uint64 (qosConfidenceCap ) {
2073
- return
2074
- }
2075
- // Otherwise drop the confidence factor
2076
- conf := atomic .LoadUint64 (& d .rttConfidence ) * (peers - 1 ) / peers
2077
- if float64 (conf )/ 1000000 < rttMinConfidence {
2078
- conf = uint64 (rttMinConfidence * 1000000 )
2079
- }
2080
- atomic .StoreUint64 (& d .rttConfidence , conf )
2081
-
2082
- rtt := time .Duration (atomic .LoadUint64 (& d .rttEstimate ))
2083
- log .Debug ("Relaxed downloader QoS values" , "rtt" , rtt , "confidence" , float64 (conf )/ 1000000.0 , "ttl" , d .requestTTL ())
2084
- }
2085
-
2086
- // requestRTT returns the current target round trip time for a download request
2087
- // to complete in.
2088
- //
2089
- // Note, the returned RTT is .9 of the actually estimated RTT. The reason is that
2090
- // the downloader tries to adapt queries to the RTT, so multiple RTT values can
2091
- // be adapted to, but smaller ones are preferred (stabler download stream).
2092
- func (d * Downloader ) requestRTT () time.Duration {
2093
- return time .Duration (atomic .LoadUint64 (& d .rttEstimate )) * 9 / 10
2094
- }
2095
-
2096
- // requestTTL returns the current timeout allowance for a single download request
2097
- // to finish under.
2098
- func (d * Downloader ) requestTTL () time.Duration {
2099
- var (
2100
- rtt = time .Duration (atomic .LoadUint64 (& d .rttEstimate ))
2101
- conf = float64 (atomic .LoadUint64 (& d .rttConfidence )) / 1000000.0
2102
- )
2103
- ttl := time .Duration (ttlScaling ) * time .Duration (float64 (rtt )/ conf )
2104
- if ttl > ttlLimit {
2105
- ttl = ttlLimit
2106
- }
2107
- return ttl
2108
- }
0 commit comments