@@ -18,6 +18,7 @@ package les
18
18
19
19
import (
20
20
"context"
21
+ "sort"
21
22
"time"
22
23
23
24
"github.com/ethereum/go-ethereum/common/mclock"
@@ -31,14 +32,16 @@ type LesOdr struct {
31
32
db ethdb.Database
32
33
indexerConfig * light.IndexerConfig
33
34
chtIndexer , bloomTrieIndexer , bloomIndexer * core.ChainIndexer
35
+ peers * serverPeerSet
34
36
retriever * retrieveManager
35
37
stop chan struct {}
36
38
}
37
39
38
- func NewLesOdr (db ethdb.Database , config * light.IndexerConfig , retriever * retrieveManager ) * LesOdr {
40
+ func NewLesOdr (db ethdb.Database , config * light.IndexerConfig , peers * serverPeerSet , retriever * retrieveManager ) * LesOdr {
39
41
return & LesOdr {
40
42
db : db ,
41
43
indexerConfig : config ,
44
+ peers : peers ,
42
45
retriever : retriever ,
43
46
stop : make (chan struct {}),
44
47
}
@@ -98,7 +101,101 @@ type Msg struct {
98
101
Obj interface {}
99
102
}
100
103
101
- // Retrieve tries to fetch an object from the LES network.
104
+ // peerByTxHistory is a heap.Interface implementation which can sort
105
+ // the peerset by transaction history.
106
+ type peerByTxHistory []* serverPeer
107
+
108
+ func (h peerByTxHistory ) Len () int { return len (h ) }
109
+ func (h peerByTxHistory ) Less (i , j int ) bool {
110
+ if h [i ].txHistory == txIndexUnlimited {
111
+ return false
112
+ }
113
+ if h [j ].txHistory == txIndexUnlimited {
114
+ return true
115
+ }
116
+ return h [i ].txHistory < h [j ].txHistory
117
+ }
118
+ func (h peerByTxHistory ) Swap (i , j int ) { h [i ], h [j ] = h [j ], h [i ] }
119
+
120
+ const (
121
+ maxTxStatusRetry = 3 // The maximum retrys will be made for tx status request.
122
+ maxTxStatusCandidates = 5 // The maximum les servers the tx status requests will be sent to.
123
+ )
124
+
125
+ // RetrieveTxStatus retrieves the transaction status from the LES network.
126
+ // There is no guarantee in the LES protocol that the mined transaction will
127
+ // be retrieved back for sure because of different reasons(the transaction
128
+ // is unindexed, the malicous server doesn't reply it deliberately, etc).
129
+ // Therefore, unretrieved transactions(UNKNOWN) will receive a certain number
130
+ // of retrys, thus giving a weak guarantee.
131
+ func (odr * LesOdr ) RetrieveTxStatus (ctx context.Context , req * light.TxStatusRequest ) error {
132
+ // Sort according to the transaction history supported by the peer and
133
+ // select the peers with longest history.
134
+ var (
135
+ retrys int
136
+ peers []* serverPeer
137
+ missing = len (req .Hashes )
138
+ result = make ([]light.TxStatus , len (req .Hashes ))
139
+ canSend = make (map [string ]bool )
140
+ )
141
+ for _ , peer := range odr .peers .allPeers () {
142
+ if peer .txHistory == txIndexDisabled {
143
+ continue
144
+ }
145
+ peers = append (peers , peer )
146
+ }
147
+ sort .Sort (sort .Reverse (peerByTxHistory (peers )))
148
+ for i := 0 ; i < maxTxStatusCandidates && i < len (peers ); i ++ {
149
+ canSend [peers [i ].id ] = true
150
+ }
151
+ // Send out the request and assemble the result.
152
+ for {
153
+ if retrys >= maxTxStatusRetry {
154
+ break
155
+ }
156
+ var (
157
+ // Deep copy the request, so that the partial result won't be mixed.
158
+ req = & TxStatusRequest {Hashes : req .Hashes }
159
+ id = genReqID ()
160
+ distreq = & distReq {
161
+ getCost : func (dp distPeer ) uint64 { return req .GetCost (dp .(* serverPeer )) },
162
+ canSend : func (dp distPeer ) bool { return canSend [dp .(* serverPeer ).id ] },
163
+ request : func (dp distPeer ) func () {
164
+ p := dp .(* serverPeer )
165
+ p .fcServer .QueuedRequest (id , req .GetCost (p ))
166
+ delete (canSend , p .id )
167
+ return func () { req .Request (id , p ) }
168
+ },
169
+ }
170
+ )
171
+ if err := odr .retriever .retrieve (ctx , id , distreq , func (p distPeer , msg * Msg ) error { return req .Validate (odr .db , msg ) }, odr .stop ); err != nil {
172
+ return err
173
+ }
174
+ // Collect the response and assemble them to the final result.
175
+ // All the response is not verifiable, so always pick the first
176
+ // one we get.
177
+ for index , status := range req .Status {
178
+ if result [index ].Status != core .TxStatusUnknown {
179
+ continue
180
+ }
181
+ if status .Status == core .TxStatusUnknown {
182
+ continue
183
+ }
184
+ result [index ], missing = status , missing - 1
185
+ }
186
+ // Abort the procedure if all the status are retrieved
187
+ if missing == 0 {
188
+ break
189
+ }
190
+ retrys += 1
191
+ }
192
+ req .Status = result
193
+ return nil
194
+ }
195
+
196
+ // Retrieve tries to fetch an object from the LES network. It's a common API
197
+ // for most of the LES requests except for the TxStatusRequest which needs
198
+ // the addtional retry mechanism.
102
199
// If the network retrieval was successful, it stores the object in local db.
103
200
func (odr * LesOdr ) Retrieve (ctx context.Context , req light.OdrRequest ) (err error ) {
104
201
lreq := LesRequest (req )
0 commit comments