Skip to content

Commit 8647233

Browse files
authored
les: fix balance expiration (#22343)
* les/lespay/server: fix balance expiration and add test * les: move client balances to a new db * les: rename lespayDb to lesDb
1 parent c027507 commit 8647233

File tree

6 files changed

+92
-9
lines changed

6 files changed

+92
-9
lines changed

les/client.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*LightEthereum, error) {
8181
if err != nil {
8282
return nil, err
8383
}
84-
lespayDb, err := stack.OpenDatabase("lespay", 0, 0, "eth/db/lespay")
84+
lesDb, err := stack.OpenDatabase("les.client", 0, 0, "eth/db/les.client")
8585
if err != nil {
8686
return nil, err
8787
}
@@ -99,6 +99,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*LightEthereum, error) {
9999
chainConfig: chainConfig,
100100
iConfig: light.DefaultClientIndexerConfig,
101101
chainDb: chainDb,
102+
lesDb: lesDb,
102103
closeCh: make(chan struct{}),
103104
},
104105
peers: peers,
@@ -108,13 +109,13 @@ func New(stack *node.Node, config *ethconfig.Config) (*LightEthereum, error) {
108109
engine: ethconfig.CreateConsensusEngine(stack, chainConfig, &config.Ethash, nil, false, chainDb),
109110
bloomRequests: make(chan chan *bloombits.Retrieval),
110111
bloomIndexer: core.NewBloomIndexer(chainDb, params.BloomBitsBlocksClient, params.HelperTrieConfirmations),
111-
valueTracker: vfc.NewValueTracker(lespayDb, &mclock.System{}, requestList, time.Minute, 1/float64(time.Hour), 1/float64(time.Hour*100), 1/float64(time.Hour*1000)),
112+
valueTracker: vfc.NewValueTracker(lesDb, &mclock.System{}, requestList, time.Minute, 1/float64(time.Hour), 1/float64(time.Hour*100), 1/float64(time.Hour*1000)),
112113
p2pServer: stack.Server(),
113114
p2pConfig: &stack.Config().P2P,
114115
}
115116
peers.subscribe((*vtSubscription)(leth.valueTracker))
116117

117-
leth.serverPool = newServerPool(lespayDb, []byte("serverpool:"), leth.valueTracker, time.Second, nil, &mclock.System{}, config.UltraLightServers)
118+
leth.serverPool = newServerPool(lesDb, []byte("serverpool:"), leth.valueTracker, time.Second, nil, &mclock.System{}, config.UltraLightServers)
118119
peers.subscribe(leth.serverPool)
119120
leth.dialCandidates = leth.serverPool.dialIterator
120121

@@ -331,6 +332,7 @@ func (s *LightEthereum) Stop() error {
331332
s.eventMux.Stop()
332333
rawdb.PopUncleanShutdownMarker(s.chainDb)
333334
s.chainDb.Close()
335+
s.lesDb.Close()
334336
s.wg.Wait()
335337
log.Info("Light ethereum stopped")
336338
return nil

les/clientpool.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ type clientInfo struct {
105105
}
106106

107107
// newClientPool creates a new client pool
108-
func newClientPool(ns *nodestate.NodeStateMachine, lespayDb ethdb.Database, minCap uint64, connectedBias time.Duration, clock mclock.Clock, removePeer func(enode.ID)) *clientPool {
108+
func newClientPool(ns *nodestate.NodeStateMachine, lesDb ethdb.Database, minCap uint64, connectedBias time.Duration, clock mclock.Clock, removePeer func(enode.ID)) *clientPool {
109109
pool := &clientPool{
110110
ns: ns,
111111
BalanceTrackerSetup: balanceTrackerSetup,
@@ -115,7 +115,7 @@ func newClientPool(ns *nodestate.NodeStateMachine, lespayDb ethdb.Database, minC
115115
connectedBias: connectedBias,
116116
removePeer: removePeer,
117117
}
118-
pool.bt = vfs.NewBalanceTracker(ns, balanceTrackerSetup, lespayDb, clock, &utils.Expirer{}, &utils.Expirer{})
118+
pool.bt = vfs.NewBalanceTracker(ns, balanceTrackerSetup, lesDb, clock, &utils.Expirer{}, &utils.Expirer{})
119119
pool.pp = vfs.NewPriorityPool(ns, priorityPoolSetup, clock, minCap, connectedBias, 4)
120120

121121
// set default expiration constants used by tests

les/commons.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ type lesCommons struct {
5151
config *ethconfig.Config
5252
chainConfig *params.ChainConfig
5353
iConfig *light.IndexerConfig
54-
chainDb ethdb.Database
54+
chainDb, lesDb ethdb.Database
5555
chainReader chainReader
5656
chtIndexer, bloomTrieIndexer *core.ChainIndexer
5757
oracle *checkpointoracle.CheckpointOracle

les/server.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,10 @@ type LesServer struct {
8585
}
8686

8787
func NewLesServer(node *node.Node, e ethBackend, config *ethconfig.Config) (*LesServer, error) {
88+
lesDb, err := node.OpenDatabase("les.server", 0, 0, "eth/db/les.server")
89+
if err != nil {
90+
return nil, err
91+
}
8892
ns := nodestate.NewNodeStateMachine(nil, nil, mclock.System{}, serverSetup)
8993
// Calculate the number of threads used to service the light client
9094
// requests based on the user-specified value.
@@ -99,6 +103,7 @@ func NewLesServer(node *node.Node, e ethBackend, config *ethconfig.Config) (*Les
99103
chainConfig: e.BlockChain().Config(),
100104
iConfig: light.DefaultServerIndexerConfig,
101105
chainDb: e.ChainDb(),
106+
lesDb: lesDb,
102107
chainReader: e.BlockChain(),
103108
chtIndexer: light.NewChtIndexer(e.ChainDb(), nil, params.CHTFrequency, params.HelperTrieProcessConfirmations, true),
104109
bloomTrieIndexer: light.NewBloomTrieIndexer(e.ChainDb(), nil, params.BloomBitsBlocks, params.BloomTrieFrequency, true),
@@ -136,7 +141,7 @@ func NewLesServer(node *node.Node, e ethBackend, config *ethconfig.Config) (*Les
136141
srv.maxCapacity = totalRecharge
137142
}
138143
srv.fcManager.SetCapacityLimits(srv.minCapacity, srv.maxCapacity, srv.minCapacity*2)
139-
srv.clientPool = newClientPool(ns, srv.chainDb, srv.minCapacity, defaultConnectedBias, mclock.System{}, srv.dropClient)
144+
srv.clientPool = newClientPool(ns, lesDb, srv.minCapacity, defaultConnectedBias, mclock.System{}, srv.dropClient)
140145
srv.clientPool.setDefaultFactors(vfs.PriceFactors{TimeFactor: 0, CapacityFactor: 1, RequestFactor: 1}, vfs.PriceFactors{TimeFactor: 0, CapacityFactor: 1, RequestFactor: 1})
141146

142147
checkpoint := srv.latestLocalCheckpoint()
@@ -222,6 +227,7 @@ func (s *LesServer) Stop() error {
222227

223228
// Note, bloom trie indexer is closed by parent bloombits indexer.
224229
s.chtIndexer.Close()
230+
s.lesDb.Close()
225231
s.wg.Wait()
226232
log.Info("Les server stopped")
227233

les/vflux/server/balance_test.go

Lines changed: 72 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package server
1818

1919
import (
20+
"math"
2021
"math/rand"
2122
"reflect"
2223
"testing"
@@ -69,7 +70,9 @@ func (b *balanceTestSetup) newNode(capacity uint64) *NodeBalance {
6970
node := enode.SignNull(&enr.Record{}, enode.ID{})
7071
b.ns.SetState(node, testFlag, nodestate.Flags{}, 0)
7172
b.ns.SetField(node, btTestSetup.connAddressField, "")
72-
b.ns.SetField(node, ppTestSetup.CapacityField, capacity)
73+
if capacity != 0 {
74+
b.ns.SetField(node, ppTestSetup.CapacityField, capacity)
75+
}
7376
n, _ := b.ns.GetField(node, btTestSetup.BalanceField).(*NodeBalance)
7477
return n
7578
}
@@ -398,3 +401,71 @@ func TestCallback(t *testing.T) {
398401
case <-time.NewTimer(time.Millisecond * 100).C:
399402
}
400403
}
404+
405+
func TestBalancePersistence(t *testing.T) {
406+
clock := &mclock.Simulated{}
407+
ns := nodestate.NewNodeStateMachine(nil, nil, clock, testSetup)
408+
db := memorydb.New()
409+
posExp := &utils.Expirer{}
410+
negExp := &utils.Expirer{}
411+
posExp.SetRate(clock.Now(), math.Log(2)/float64(time.Hour*2)) // halves every two hours
412+
negExp.SetRate(clock.Now(), math.Log(2)/float64(time.Hour)) // halves every hour
413+
bt := NewBalanceTracker(ns, btTestSetup, db, clock, posExp, negExp)
414+
ns.Start()
415+
bts := &balanceTestSetup{
416+
clock: clock,
417+
ns: ns,
418+
bt: bt,
419+
}
420+
var nb *NodeBalance
421+
exp := func(expPos, expNeg uint64) {
422+
pos, neg := nb.GetBalance()
423+
if pos != expPos {
424+
t.Fatalf("Positive balance incorrect, want %v, got %v", expPos, pos)
425+
}
426+
if neg != expNeg {
427+
t.Fatalf("Positive balance incorrect, want %v, got %v", expPos, pos)
428+
}
429+
}
430+
expTotal := func(expTotal uint64) {
431+
total := bt.TotalTokenAmount()
432+
if total != expTotal {
433+
t.Fatalf("Total token amount incorrect, want %v, got %v", expTotal, total)
434+
}
435+
}
436+
437+
expTotal(0)
438+
nb = bts.newNode(0)
439+
expTotal(0)
440+
nb.SetBalance(16000000000, 16000000000)
441+
exp(16000000000, 16000000000)
442+
expTotal(16000000000)
443+
clock.Run(time.Hour * 2)
444+
exp(8000000000, 4000000000)
445+
expTotal(8000000000)
446+
bt.Stop()
447+
ns.Stop()
448+
449+
clock = &mclock.Simulated{}
450+
ns = nodestate.NewNodeStateMachine(nil, nil, clock, testSetup)
451+
posExp = &utils.Expirer{}
452+
negExp = &utils.Expirer{}
453+
posExp.SetRate(clock.Now(), math.Log(2)/float64(time.Hour*2)) // halves every two hours
454+
negExp.SetRate(clock.Now(), math.Log(2)/float64(time.Hour)) // halves every hour
455+
bt = NewBalanceTracker(ns, btTestSetup, db, clock, posExp, negExp)
456+
ns.Start()
457+
bts = &balanceTestSetup{
458+
clock: clock,
459+
ns: ns,
460+
bt: bt,
461+
}
462+
expTotal(8000000000)
463+
nb = bts.newNode(0)
464+
exp(8000000000, 4000000000)
465+
expTotal(8000000000)
466+
clock.Run(time.Hour * 2)
467+
exp(4000000000, 1000000000)
468+
expTotal(4000000000)
469+
bt.Stop()
470+
ns.Stop()
471+
}

les/vflux/server/balance_tracker.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,10 @@ func NewBalanceTracker(ns *nodestate.NodeStateMachine, setup BalanceTrackerSetup
9999
balanceTimer: utils.NewUpdateTimer(clock, time.Second*10),
100100
quit: make(chan struct{}),
101101
}
102+
posOffset, negOffset := bt.ndb.getExpiration()
103+
posExp.SetLogOffset(clock.Now(), posOffset)
104+
negExp.SetLogOffset(clock.Now(), negOffset)
105+
102106
bt.ndb.forEachBalance(false, func(id enode.ID, balance utils.ExpiredValue) bool {
103107
bt.inactive.AddExp(balance)
104108
return true
@@ -177,7 +181,7 @@ func (bt *BalanceTracker) TotalTokenAmount() uint64 {
177181
bt.balanceTimer.Update(func(_ time.Duration) bool {
178182
bt.active = utils.ExpiredValue{}
179183
bt.ns.ForEach(nodestate.Flags{}, nodestate.Flags{}, func(node *enode.Node, state nodestate.Flags) {
180-
if n, ok := bt.ns.GetField(node, bt.BalanceField).(*NodeBalance); ok {
184+
if n, ok := bt.ns.GetField(node, bt.BalanceField).(*NodeBalance); ok && n.active {
181185
pos, _ := n.GetRawBalance()
182186
bt.active.AddExp(pos)
183187
}

0 commit comments

Comments
 (0)