Skip to content

Commit 4090dcc

Browse files
authored
swarm: remove unnecessary reqno for pending request tracking (libp2p#2460)
* swarm: remove unnecessary reqno for pending request tracking * use struct{} instead of bool for addrs map * use struct{} for pendingRequests map
1 parent 9edef5a commit 4090dcc

File tree

1 file changed

+25
-49
lines changed

1 file changed

+25
-49
lines changed

p2p/net/swarm/dial_worker.go

Lines changed: 25 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,6 @@ type addrDial struct {
5656
conn *Conn
5757
// err is the err on dialing the address
5858
err error
59-
// requests is the list of pendRequests interested in this dial
60-
// the value in the slice is the request number assigned to this request by the dialWorker
61-
requests []int
6259
// dialed indicates whether we have triggered the dial to the address
6360
dialed bool
6461
// createdAt is the time this struct was created
@@ -74,13 +71,9 @@ type dialWorker struct {
7471
peer peer.ID
7572
// reqch is used to send dial requests to the worker. close reqch to end the worker loop
7673
reqch <-chan dialRequest
77-
// reqno is the request number used to track different dialRequests for a peer.
78-
// Each incoming request is assigned a reqno. This reqno is used in pendingRequests and in
79-
// addrDial objects in trackedDials to track this request
80-
reqno int
81-
// pendingRequests maps reqno to the pendRequest object for a dialRequest
82-
pendingRequests map[int]*pendRequest
83-
// trackedDials tracks dials to the peers addresses. An entry here is used to ensure that
74+
// pendingRequests is the set of pendingRequests
75+
pendingRequests map[*pendRequest]struct{}
76+
// trackedDials tracks dials to the peer's addresses. An entry here is used to ensure that
8477
// we dial an address at most once
8578
trackedDials map[string]*addrDial
8679
// resch is used to receive response for dials to the peers addresses.
@@ -101,7 +94,7 @@ func newDialWorker(s *Swarm, p peer.ID, reqch <-chan dialRequest, cl Clock) *dia
10194
s: s,
10295
peer: p,
10396
reqch: reqch,
104-
pendingRequests: make(map[int]*pendRequest),
97+
pendingRequests: make(map[*pendRequest]struct{}),
10598
trackedDials: make(map[string]*addrDial),
10699
resch: make(chan dialResult),
107100
cl: cl,
@@ -232,10 +225,8 @@ loop:
232225
continue loop
233226
}
234227

235-
// The request has some pending or new dials. We assign this request a request number.
236-
// This value of w.reqno is used to track this request in all the structures
237-
w.reqno++
238-
w.pendingRequests[w.reqno] = pr
228+
// The request has some pending or new dials
229+
w.pendingRequests[pr] = struct{}{}
239230

240231
for _, ad := range tojoin {
241232
if !ad.dialed {
@@ -253,7 +244,6 @@ loop:
253244
}
254245
}
255246
// add the request to the addrDial
256-
ad.requests = append(ad.requests, w.reqno)
257247
}
258248

259249
if len(todial) > 0 {
@@ -263,7 +253,6 @@ loop:
263253
w.trackedDials[string(a.Bytes())] = &addrDial{
264254
addr: a,
265255
ctx: req.ctx,
266-
requests: []int{w.reqno},
267256
createdAt: now,
268257
}
269258
dq.Add(network.AddrDelay{Addr: a, Delay: addrDelay[string(a.Bytes())]})
@@ -328,20 +317,14 @@ loop:
328317
continue loop
329318
}
330319

331-
// request succeeded, respond to all pending requests
332-
for _, reqno := range ad.requests {
333-
pr, ok := w.pendingRequests[reqno]
334-
if !ok {
335-
// some other dial for this request succeeded before this one
336-
continue
320+
for pr := range w.pendingRequests {
321+
if _, ok := pr.addrs[string(ad.addr.Bytes())]; ok {
322+
pr.req.resch <- dialResponse{conn: conn}
323+
delete(w.pendingRequests, pr)
337324
}
338-
pr.req.resch <- dialResponse{conn: conn}
339-
delete(w.pendingRequests, reqno)
340325
}
341326

342327
ad.conn = conn
343-
ad.requests = nil
344-
345328
if !w.connected {
346329
w.connected = true
347330
if w.s.metricsTracer != nil {
@@ -375,33 +358,26 @@ loop:
375358
// dispatches an error to a specific addr dial
376359
func (w *dialWorker) dispatchError(ad *addrDial, err error) {
377360
ad.err = err
378-
for _, reqno := range ad.requests {
379-
pr, ok := w.pendingRequests[reqno]
380-
if !ok {
381-
// some other dial for this request succeeded before this one
382-
continue
383-
}
384-
361+
for pr := range w.pendingRequests {
385362
// accumulate the error
386-
pr.err.recordErr(ad.addr, err)
387-
388-
delete(pr.addrs, string(ad.addr.Bytes()))
389-
if len(pr.addrs) == 0 {
390-
// all addrs have erred, dispatch dial error
391-
// but first do a last one check in case an acceptable connection has landed from
392-
// a simultaneous dial that started later and added new acceptable addrs
393-
c, _ := w.s.bestAcceptableConnToPeer(pr.req.ctx, w.peer)
394-
if c != nil {
395-
pr.req.resch <- dialResponse{conn: c}
396-
} else {
397-
pr.req.resch <- dialResponse{err: pr.err}
363+
if _, ok := pr.addrs[string(ad.addr.Bytes())]; ok {
364+
pr.err.recordErr(ad.addr, err)
365+
delete(pr.addrs, string(ad.addr.Bytes()))
366+
if len(pr.addrs) == 0 {
367+
// all addrs have erred, dispatch dial error
368+
// but first do a last one check in case an acceptable connection has landed from
369+
// a simultaneous dial that started later and added new acceptable addrs
370+
c, _ := w.s.bestAcceptableConnToPeer(pr.req.ctx, w.peer)
371+
if c != nil {
372+
pr.req.resch <- dialResponse{conn: c}
373+
} else {
374+
pr.req.resch <- dialResponse{err: pr.err}
375+
}
376+
delete(w.pendingRequests, pr)
398377
}
399-
delete(w.pendingRequests, reqno)
400378
}
401379
}
402380

403-
ad.requests = nil
404-
405381
// if it was a backoff, clear the address dial so that it doesn't inhibit new dial requests.
406382
// this is necessary to support active listen scenarios, where a new dial comes in while
407383
// another dial is in progress, and needs to do a direct connection without inhibitions from

0 commit comments

Comments
 (0)