Skip to content

Commit dc109cc

Browse files
authored
les: move server pool to les/vflux/client (#22377)
* les: move serverPool to les/vflux/client * les: add metrics * les: moved ValueTracker inside ServerPool * les: protect against node registration before server pool is started * les/vflux/client: fixed tests * les: make peer registration safe
1 parent de9465f commit dc109cc

File tree

9 files changed

+206
-198
lines changed

9 files changed

+206
-198
lines changed

internal/web3ext/web3ext.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ var Modules = map[string]string{
3333
"swarmfs": SwarmfsJs,
3434
"txpool": TxpoolJs,
3535
"les": LESJs,
36-
"lespay": LESPayJs,
36+
"vflux": VfluxJs,
3737
}
3838

3939
const ChequebookJs = `
@@ -877,32 +877,32 @@ web3._extend({
877877
});
878878
`
879879

880-
const LESPayJs = `
880+
const VfluxJs = `
881881
web3._extend({
882-
property: 'lespay',
882+
property: 'vflux',
883883
methods:
884884
[
885885
new web3._extend.Method({
886886
name: 'distribution',
887-
call: 'lespay_distribution',
887+
call: 'vflux_distribution',
888888
params: 2
889889
}),
890890
new web3._extend.Method({
891891
name: 'timeout',
892-
call: 'lespay_timeout',
892+
call: 'vflux_timeout',
893893
params: 2
894894
}),
895895
new web3._extend.Method({
896896
name: 'value',
897-
call: 'lespay_value',
897+
call: 'vflux_value',
898898
params: 2
899899
}),
900900
],
901901
properties:
902902
[
903903
new web3._extend.Property({
904904
name: 'requestStats',
905-
getter: 'lespay_requestStats'
905+
getter: 'vflux_requestStats'
906906
}),
907907
]
908908
});

les/client.go

Lines changed: 8 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,7 @@ type LightEthereum struct {
5757
handler *clientHandler
5858
txPool *light.TxPool
5959
blockchain *light.LightChain
60-
serverPool *serverPool
61-
valueTracker *vfc.ValueTracker
60+
serverPool *vfc.ServerPool
6261
dialCandidates enode.Iterator
6362
pruner *pruner
6463

@@ -109,17 +108,14 @@ func New(stack *node.Node, config *ethconfig.Config) (*LightEthereum, error) {
109108
engine: ethconfig.CreateConsensusEngine(stack, chainConfig, &config.Ethash, nil, false, chainDb),
110109
bloomRequests: make(chan chan *bloombits.Retrieval),
111110
bloomIndexer: core.NewBloomIndexer(chainDb, params.BloomBitsBlocksClient, params.HelperTrieConfirmations),
112-
valueTracker: vfc.NewValueTracker(lesDb, &mclock.System{}, requestList, time.Minute, 1/float64(time.Hour), 1/float64(time.Hour*100), 1/float64(time.Hour*1000)),
113111
p2pServer: stack.Server(),
114112
p2pConfig: &stack.Config().P2P,
115113
}
116-
peers.subscribe((*vtSubscription)(leth.valueTracker))
117114

118-
leth.serverPool = newServerPool(lesDb, []byte("serverpool:"), leth.valueTracker, time.Second, nil, &mclock.System{}, config.UltraLightServers)
119-
peers.subscribe(leth.serverPool)
120-
leth.dialCandidates = leth.serverPool.dialIterator
115+
leth.serverPool, leth.dialCandidates = vfc.NewServerPool(lesDb, []byte("serverpool:"), time.Second, nil, &mclock.System{}, config.UltraLightServers, requestList)
116+
leth.serverPool.AddMetrics(suggestedTimeoutGauge, totalValueGauge, serverSelectableGauge, serverConnectedGauge, sessionValueMeter, serverDialedMeter)
121117

122-
leth.retriever = newRetrieveManager(peers, leth.reqDist, leth.serverPool.getTimeout)
118+
leth.retriever = newRetrieveManager(peers, leth.reqDist, leth.serverPool.GetTimeout)
123119
leth.relay = newLesTxRelay(peers, leth.retriever)
124120

125121
leth.odr = NewLesOdr(chainDb, light.DefaultClientIndexerConfig, leth.peers, leth.retriever)
@@ -193,23 +189,6 @@ func New(stack *node.Node, config *ethconfig.Config) (*LightEthereum, error) {
193189
return leth, nil
194190
}
195191

196-
// vtSubscription implements serverPeerSubscriber
197-
type vtSubscription vfc.ValueTracker
198-
199-
// registerPeer implements serverPeerSubscriber
200-
func (v *vtSubscription) registerPeer(p *serverPeer) {
201-
vt := (*vfc.ValueTracker)(v)
202-
p.setValueTracker(vt, vt.Register(p.ID()))
203-
p.updateVtParams()
204-
}
205-
206-
// unregisterPeer implements serverPeerSubscriber
207-
func (v *vtSubscription) unregisterPeer(p *serverPeer) {
208-
vt := (*vfc.ValueTracker)(v)
209-
vt.Unregister(p.ID())
210-
p.setValueTracker(nil, nil)
211-
}
212-
213192
type LightDummyAPI struct{}
214193

215194
// Etherbase is the address that mining rewards will be send to
@@ -266,7 +245,7 @@ func (s *LightEthereum) APIs() []rpc.API {
266245
}, {
267246
Namespace: "vflux",
268247
Version: "1.0",
269-
Service: vfc.NewPrivateClientAPI(s.valueTracker),
248+
Service: s.serverPool.API(),
270249
Public: false,
271250
},
272251
}...)
@@ -302,8 +281,8 @@ func (s *LightEthereum) Start() error {
302281
if err != nil {
303282
return err
304283
}
305-
s.serverPool.addSource(discovery)
306-
s.serverPool.start()
284+
s.serverPool.AddSource(discovery)
285+
s.serverPool.Start()
307286
// Start bloom request workers.
308287
s.wg.Add(bloomServiceThreads)
309288
s.startBloomHandlers(params.BloomBitsBlocksClient)
@@ -316,8 +295,7 @@ func (s *LightEthereum) Start() error {
316295
// Ethereum protocol.
317296
func (s *LightEthereum) Stop() error {
318297
close(s.closeCh)
319-
s.serverPool.stop()
320-
s.valueTracker.Stop()
298+
s.serverPool.Stop()
321299
s.peers.close()
322300
s.reqDist.close()
323301
s.odr.Stop()

les/client_handler.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,11 +114,25 @@ func (h *clientHandler) handle(p *serverPeer) error {
114114
p.Log().Debug("Light Ethereum handshake failed", "err", err)
115115
return err
116116
}
117+
// Register peer with the server pool
118+
if h.backend.serverPool != nil {
119+
if nvt, err := h.backend.serverPool.RegisterNode(p.Node()); err == nil {
120+
p.setValueTracker(nvt)
121+
p.updateVtParams()
122+
defer func() {
123+
p.setValueTracker(nil)
124+
h.backend.serverPool.UnregisterNode(p.Node())
125+
}()
126+
} else {
127+
return err
128+
}
129+
}
117130
// Register the peer locally
118131
if err := h.backend.peers.register(p); err != nil {
119132
p.Log().Error("Light Ethereum peer registration failed", "err", err)
120133
return err
121134
}
135+
122136
serverConnectionGauge.Update(int64(h.backend.peers.len()))
123137

124138
connectedAt := mclock.Now()

les/peer.go

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -349,7 +349,6 @@ type serverPeer struct {
349349

350350
fcServer *flowcontrol.ServerNode // Client side mirror token bucket.
351351
vtLock sync.Mutex
352-
valueTracker *vfc.ValueTracker
353352
nodeValueTracker *vfc.NodeValueTracker
354353
sentReqs map[uint64]sentReqEntry
355354

@@ -676,9 +675,8 @@ func (p *serverPeer) Handshake(genesis common.Hash, forkid forkid.ID, forkFilter
676675

677676
// setValueTracker sets the value tracker references for connected servers. Note that the
678677
// references should be removed upon disconnection by setValueTracker(nil, nil).
679-
func (p *serverPeer) setValueTracker(vt *vfc.ValueTracker, nvt *vfc.NodeValueTracker) {
678+
func (p *serverPeer) setValueTracker(nvt *vfc.NodeValueTracker) {
680679
p.vtLock.Lock()
681-
p.valueTracker = vt
682680
p.nodeValueTracker = nvt
683681
if nvt != nil {
684682
p.sentReqs = make(map[uint64]sentReqEntry)
@@ -705,7 +703,7 @@ func (p *serverPeer) updateVtParams() {
705703
}
706704
}
707705
}
708-
p.valueTracker.UpdateCosts(p.nodeValueTracker, reqCosts)
706+
p.nodeValueTracker.UpdateCosts(reqCosts)
709707
}
710708

711709
// sentReqEntry remembers sent requests and their sending times
@@ -732,7 +730,6 @@ func (p *serverPeer) answeredRequest(id uint64) {
732730
}
733731
e, ok := p.sentReqs[id]
734732
delete(p.sentReqs, id)
735-
vt := p.valueTracker
736733
nvt := p.nodeValueTracker
737734
p.vtLock.Unlock()
738735
if !ok {
@@ -752,7 +749,7 @@ func (p *serverPeer) answeredRequest(id uint64) {
752749
vtReqs[1] = vfc.ServedRequest{ReqType: uint32(m.rest), Amount: e.amount - 1}
753750
}
754751
dt := time.Duration(mclock.Now() - e.at)
755-
vt.Served(nvt, vtReqs[:reqCount], dt)
752+
nvt.Served(vtReqs[:reqCount], dt)
756753
}
757754

758755
// clientPeer represents each node to which the les server is connected.

les/vflux/client/queueiterator_test.go

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,17 +26,6 @@ import (
2626
"github.com/ethereum/go-ethereum/p2p/nodestate"
2727
)
2828

29-
func testNodeID(i int) enode.ID {
30-
return enode.ID{42, byte(i % 256), byte(i / 256)}
31-
}
32-
33-
func testNodeIndex(id enode.ID) int {
34-
if id[0] != 42 {
35-
return -1
36-
}
37-
return int(id[1]) + int(id[2])*256
38-
}
39-
4029
func testNode(i int) *enode.Node {
4130
return enode.SignNull(new(enr.Record), testNodeID(i))
4231
}

0 commit comments

Comments
 (0)