Skip to content

Commit b434d76

Browse files
committed
les: handle conn/disc/reg logic in the eventloop
1 parent 8c4a7fa commit b434d76

File tree

1 file changed

+135
-58
lines changed

1 file changed

+135
-58
lines changed

les/serverpool.go

Lines changed: 135 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,26 @@ const (
8787
initStatsWeight = 1
8888
)
8989

90+
// connReq represents a request for peer connection.
91+
type connReq struct {
92+
p *peer
93+
ip net.IP
94+
port uint16
95+
cont chan *poolEntry
96+
}
97+
98+
// discReq represents a request for peer disconnection.
99+
type discReq struct {
100+
entry *poolEntry
101+
cont chan struct{}
102+
}
103+
104+
// registerReq represents a request for peer registration.
105+
type registerReq struct {
106+
entry *poolEntry
107+
cont chan struct{}
108+
}
109+
90110
// serverPool implements a pool for storing and selecting newly discovered and already
91111
// known light server nodes. It received discovered nodes, stores statistics about
92112
// known nodes and takes care of always having enough good quality servers connected.
@@ -109,6 +129,10 @@ type serverPool struct {
109129
timeout, enableRetry chan *poolEntry
110130
adjustStats chan poolStatAdjust
111131

132+
connCh chan *connReq
133+
discCh chan *discReq
134+
registerCh chan *registerReq
135+
112136
knownQueue, newQueue poolEntryQueue
113137
knownSelect, newSelect *weightedRandomSelect
114138
knownSelected, newSelected int
@@ -125,6 +149,9 @@ func newServerPool(db ethdb.Database, quit chan struct{}, wg *sync.WaitGroup) *s
125149
timeout: make(chan *poolEntry, 1),
126150
adjustStats: make(chan poolStatAdjust, 100),
127151
enableRetry: make(chan *poolEntry, 1),
152+
connCh: make(chan *connReq),
153+
discCh: make(chan *discReq),
154+
registerCh: make(chan *registerReq),
128155
knownSelect: newWeightedRandomSelect(),
129156
newSelect: newWeightedRandomSelect(),
130157
fastDiscover: true,
@@ -158,83 +185,73 @@ func (pool *serverPool) start(server *p2p.Server, topic discv5.Topic) {
158185
// Note that whenever a connection has been accepted and a pool entry has been returned,
159186
// disconnect should also always be called.
160187
func (pool *serverPool) connect(p *peer, ip net.IP, port uint16) *poolEntry {
161-
pool.lock.Lock()
162-
defer pool.lock.Unlock()
163-
entry := pool.entries[p.ID()]
164-
if entry == nil {
165-
entry = pool.findOrNewNode(p.ID(), ip, port)
166-
}
167-
p.Log().Debug("Connecting to new peer", "state", entry.state)
168-
if entry.state == psConnected || entry.state == psRegistered {
188+
log.Debug("Connect new entry", "enode", p.id)
189+
req := &connReq{p: p, ip: ip, port: port, cont: make(chan *poolEntry, 1)}
190+
select {
191+
case pool.connCh <- req:
192+
case <-pool.quit:
169193
return nil
170194
}
171-
pool.connWg.Add(1)
172-
entry.peer = p
173-
entry.state = psConnected
174-
addr := &poolEntryAddress{
175-
ip: ip,
176-
port: port,
177-
lastSeen: mclock.Now(),
178-
}
179-
entry.lastConnected = addr
180-
entry.addr = make(map[string]*poolEntryAddress)
181-
entry.addr[addr.strKey()] = addr
182-
entry.addrSelect = *newWeightedRandomSelect()
183-
entry.addrSelect.update(addr)
184-
return entry
195+
return <-req.cont
185196
}
186197

187198
// registered should be called after a successful handshake
188199
func (pool *serverPool) registered(entry *poolEntry) {
189200
log.Debug("Registered new entry", "enode", entry.id)
190-
pool.lock.Lock()
191-
defer pool.lock.Unlock()
192-
193-
entry.state = psRegistered
194-
entry.regTime = mclock.Now()
195-
if !entry.known {
196-
pool.newQueue.remove(entry)
197-
entry.known = true
201+
req := &registerReq{entry: entry, cont: make(chan struct{})}
202+
select {
203+
case pool.registerCh <- req:
204+
case <-pool.quit:
205+
return
198206
}
199-
pool.knownQueue.setLatest(entry)
200-
entry.shortRetry = shortRetryCnt
207+
<-req.cont
208+
return
201209
}
202210

203211
// disconnect should be called when ending a connection. Service quality statistics
204212
// can be updated optionally (not updated if no registration happened, in this case
205213
// only connection statistics are updated, just like in case of timeout)
206214
func (pool *serverPool) disconnect(entry *poolEntry) {
207215
log.Debug("Disconnected old entry", "enode", entry.id)
208-
pool.lock.Lock()
209-
defer pool.lock.Unlock()
210-
211-
if entry.state == psRegistered {
212-
connTime := mclock.Now() - entry.regTime
213-
connAdjust := float64(connTime) / float64(targetConnTime)
214-
if connAdjust > 1 {
215-
connAdjust = 1
216-
}
217-
stopped := false
218-
select {
219-
case <-pool.quit:
220-
stopped = true
221-
default:
222-
}
223-
if stopped {
216+
stopped := false
217+
select {
218+
case <-pool.quit:
219+
stopped = true
220+
default:
221+
}
222+
223+
if stopped {
224+
// Request is emitted by ourselves, handle the logic here since eventloop doesn't
225+
// serve requests anymore.
226+
pool.lock.Lock()
227+
defer pool.lock.Unlock()
228+
229+
if entry.state == psRegistered {
230+
connTime := mclock.Now() - entry.regTime
231+
connAdjust := float64(connTime) / float64(targetConnTime)
232+
if connAdjust > 1 {
233+
connAdjust = 1
234+
}
224235
entry.connectStats.add(1, connAdjust)
236+
}
237+
entry.state = psNotConnected
238+
if entry.knownSelected {
239+
pool.knownSelected--
225240
} else {
226-
entry.connectStats.add(connAdjust, 1)
241+
pool.newSelected--
227242
}
228-
}
229-
230-
entry.state = psNotConnected
231-
if entry.knownSelected {
232-
pool.knownSelected--
243+
pool.setRetryDial(entry)
244+
pool.connWg.Done()
233245
} else {
234-
pool.newSelected--
246+
// Request is emitted by the server side
247+
req := &discReq{entry: entry, cont: make(chan struct{})}
248+
select {
249+
case pool.discCh <- req:
250+
case <-pool.quit:
251+
return
252+
}
253+
<-req.cont
235254
}
236-
pool.setRetryDial(entry)
237-
pool.connWg.Done()
238255
}
239256

240257
const (
@@ -327,6 +344,67 @@ func (pool *serverPool) eventLoop() {
327344
}
328345
}
329346

347+
case req := <-pool.connCh:
348+
// Handle peer connection requests.
349+
entry := pool.entries[req.p.ID()]
350+
if entry == nil {
351+
entry = pool.findOrNewNode(req.p.ID(), req.ip, req.port)
352+
}
353+
req.p.Log().Debug("Connecting to new peer", "state", entry.state)
354+
if entry.state == psConnected || entry.state == psRegistered {
355+
req.cont <- nil
356+
continue
357+
}
358+
pool.connWg.Add(1)
359+
entry.peer = req.p
360+
entry.state = psConnected
361+
addr := &poolEntryAddress{
362+
ip: req.ip,
363+
port: req.port,
364+
lastSeen: mclock.Now(),
365+
}
366+
entry.lastConnected = addr
367+
entry.addr = make(map[string]*poolEntryAddress)
368+
entry.addr[addr.strKey()] = addr
369+
entry.addrSelect = *newWeightedRandomSelect()
370+
entry.addrSelect.update(addr)
371+
req.cont <- entry
372+
373+
case req := <-pool.registerCh:
374+
// Handle peer registration requests.
375+
entry := req.entry
376+
entry.state = psRegistered
377+
entry.regTime = mclock.Now()
378+
if !entry.known {
379+
pool.newQueue.remove(entry)
380+
entry.known = true
381+
}
382+
pool.knownQueue.setLatest(entry)
383+
entry.shortRetry = shortRetryCnt
384+
close(req.cont)
385+
386+
case req := <-pool.discCh:
387+
// Handle peer disconnection requests.
388+
entry := req.entry
389+
if entry.state == psRegistered {
390+
connTime := mclock.Now() - entry.regTime
391+
connAdjust := float64(connTime) / float64(targetConnTime)
392+
if connAdjust > 1 {
393+
connAdjust = 1
394+
}
395+
entry.connectStats.add(connAdjust, 1)
396+
}
397+
398+
entry.state = psNotConnected
399+
if entry.knownSelected {
400+
pool.knownSelected--
401+
} else {
402+
pool.newSelected--
403+
}
404+
pool.setRetryDial(entry)
405+
pool.connWg.Done()
406+
close(req.cont)
407+
330408
case <-pool.quit:
331409
if pool.discSetPeriod != nil {
332410
close(pool.discSetPeriod)
@@ -335,7 +413,6 @@ func (pool *serverPool) eventLoop() {
335413
pool.saveNodes()
336414
pool.wg.Done()
337415
return
338-
339416
}
340417
}
341418
}

0 commit comments

Comments
 (0)