Skip to content

Commit a1a8e04

Browse files
codesomegouthamve
authored andcommitted
Flush tokens to disk (#1750)
* Flush tokens to file Signed-off-by: Ganesh Vernekar <[email protected]> * Read tokens from file on startup Signed-off-by: Ganesh Vernekar <[email protected]> * Fix tests Signed-off-by: Ganesh Vernekar <[email protected]> * Don't flush on empty directory flag Signed-off-by: Ganesh Vernekar <[email protected]> * Unit test Signed-off-by: Ganesh Vernekar <[email protected]> * Fix some bugs Signed-off-by: Ganesh Vernekar <[email protected]> * Fix integration test Signed-off-by: Ganesh Vernekar <[email protected]> * Fix review comments Signed-off-by: Ganesh Vernekar <[email protected]> * Add interface for tokens with a SimpleListTokens type Signed-off-by: Ganesh Vernekar <[email protected]> * Remove Tokens interface Signed-off-by: Ganesh Vernekar <[email protected]> * Fix review comments Signed-off-by: Ganesh Vernekar <[email protected]> * Fix review comments Signed-off-by: Ganesh Vernekar <[email protected]> * Vendor fix Signed-off-by: Ganesh Vernekar <[email protected]> * Fix review comments Signed-off-by: Ganesh Vernekar <[email protected]> * Fix vendoring Signed-off-by: Ganesh Vernekar <[email protected]> * Fix Marco's comments Signed-off-by: Ganesh Vernekar <[email protected]> * Fix Peter's comments Signed-off-by: Ganesh Vernekar <[email protected]> * Fix Goutham's and Peter's comment Signed-off-by: Ganesh Vernekar <[email protected]>
1 parent 08ddf88 commit a1a8e04

File tree

7 files changed

+244
-31
lines changed

7 files changed

+244
-31
lines changed

pkg/ring/lifecycler.go

Lines changed: 45 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ type LifecyclerConfig struct {
5353
NormaliseTokens bool `yaml:"normalise_tokens,omitempty"`
5454
InfNames []string `yaml:"interface_names"`
5555
FinalSleep time.Duration `yaml:"final_sleep"`
56+
TokensFilePath string `yaml:"tokens_file_path,omitempty"`
5657

5758
// For testing, you can override the address and ID of this ingester
5859
Addr string `yaml:"address"`
@@ -84,6 +85,7 @@ func (cfg *LifecyclerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.Flag
8485
flagext.DeprecatedFlag(f, prefix+"claim-on-rollout", "DEPRECATED. This feature is no longer optional.")
8586
f.BoolVar(&cfg.NormaliseTokens, prefix+"normalise-tokens", false, "Store tokens in a normalised fashion to reduce allocations.")
8687
f.DurationVar(&cfg.FinalSleep, prefix+"final-sleep", 30*time.Second, "Duration to sleep for before exiting, to ensure metrics are scraped.")
88+
f.StringVar(&cfg.TokensFilePath, prefix+"tokens-file-path", "", "File path where tokens are stored. If empty, tokens are not stored at shutdown and restored at startup.")
8789

8890
hostname, err := os.Hostname()
8991
if err != nil {
@@ -125,7 +127,7 @@ type Lifecycler struct {
125127
// back empty. And it changes during lifecycle of ingester.
126128
stateMtx sync.Mutex
127129
state IngesterState
128-
tokens []uint32
130+
tokens Tokens
129131

130132
// Controls the ready-reporting
131133
readyLock sync.Mutex
@@ -246,18 +248,24 @@ func (i *Lifecycler) ChangeState(ctx context.Context, state IngesterState) error
246248
return <-err
247249
}
248250

249-
func (i *Lifecycler) getTokens() []uint32 {
251+
func (i *Lifecycler) getTokens() Tokens {
250252
i.stateMtx.Lock()
251253
defer i.stateMtx.Unlock()
252254
return i.tokens
253255
}
254256

255-
func (i *Lifecycler) setTokens(tokens []uint32) {
257+
func (i *Lifecycler) setTokens(tokens Tokens) {
256258
tokensOwned.WithLabelValues(i.RingName).Set(float64(len(tokens)))
257259

258260
i.stateMtx.Lock()
259261
defer i.stateMtx.Unlock()
262+
260263
i.tokens = tokens
264+
if i.cfg.TokensFilePath != "" {
265+
if err := i.tokens.StoreToFile(i.cfg.TokensFilePath); err != nil {
266+
level.Error(util.Logger).Log("msg", "error storing tokens to disk", "path", i.cfg.TokensFilePath, "err", err)
267+
}
268+
}
261269
}
262270

263271
// ClaimTokensFor takes all the tokens for the supplied ingester and assigns them to this ingester.
@@ -270,7 +278,7 @@ func (i *Lifecycler) ClaimTokensFor(ctx context.Context, ingesterID string) erro
270278
err := make(chan error)
271279

272280
i.actorChan <- func() {
273-
var tokens []uint32
281+
var tokens Tokens
274282

275283
claimTokens := func(in interface{}) (out interface{}, retry bool, err error) {
276284
ringDesc, ok := in.(*Desc)
@@ -445,9 +453,22 @@ heartbeatLoop:
445453
// - add an ingester entry to the ring
446454
// - copies out our state and tokens if they exist
447455
func (i *Lifecycler) initRing(ctx context.Context) error {
448-
var ringDesc *Desc
456+
var (
457+
ringDesc *Desc
458+
tokensFromFile Tokens
459+
err error
460+
)
461+
462+
if i.cfg.TokensFilePath != "" {
463+
tokensFromFile, err = LoadTokensFromFile(i.cfg.TokensFilePath)
464+
if err != nil {
465+
level.Error(util.Logger).Log("msg", "error in getting tokens from file", "err", err)
466+
}
467+
} else {
468+
level.Info(util.Logger).Log("msg", "not loading tokens from file, tokens file path is empty")
469+
}
449470

450-
err := i.KVStore.CAS(ctx, ConsulKey, func(in interface{}) (out interface{}, retry bool, err error) {
471+
err = i.KVStore.CAS(ctx, ConsulKey, func(in interface{}) (out interface{}, retry bool, err error) {
451472
if in == nil {
452473
ringDesc = NewDesc()
453474
} else {
@@ -456,6 +477,17 @@ func (i *Lifecycler) initRing(ctx context.Context) error {
456477

457478
ingesterDesc, ok := ringDesc.Ingesters[i.ID]
458479
if !ok {
480+
// We use the tokens from the file only if it does not exist in the ring yet.
481+
if len(tokensFromFile) > 0 {
482+
level.Info(util.Logger).Log("msg", "adding tokens from file", "num_tokens", len(tokensFromFile))
483+
if len(tokensFromFile) >= i.cfg.NumTokens {
484+
i.setState(ACTIVE)
485+
}
486+
ringDesc.AddIngester(i.ID, i.Addr, tokensFromFile, i.GetState(), i.cfg.NormaliseTokens)
487+
i.setTokens(tokensFromFile)
488+
return ringDesc, true, nil
489+
}
490+
459491
// Either we are a new ingester, or consul must have restarted
460492
level.Info(util.Logger).Log("msg", "entry not found in ring, adding with no tokens")
461493
ringDesc.AddIngester(i.ID, i.Addr, []uint32{}, i.GetState(), i.cfg.NormaliseTokens)
@@ -505,7 +537,7 @@ func (i *Lifecycler) verifyTokens(ctx context.Context) bool {
505537
newTokens := GenerateTokens(needTokens, takenTokens)
506538

507539
ringTokens = append(ringTokens, newTokens...)
508-
sort.Sort(sortableUint32(ringTokens))
540+
sort.Sort(ringTokens)
509541

510542
ringDesc.AddIngester(i.ID, i.Addr, ringTokens, i.GetState(), i.cfg.NormaliseTokens)
511543

@@ -527,11 +559,11 @@ func (i *Lifecycler) verifyTokens(ctx context.Context) bool {
527559
return result
528560
}
529561

530-
func (i *Lifecycler) compareTokens(fromRing []uint32) bool {
531-
sort.Sort(sortableUint32(fromRing))
562+
func (i *Lifecycler) compareTokens(fromRing Tokens) bool {
563+
sort.Sort(fromRing)
532564

533565
tokens := i.getTokens()
534-
sort.Sort(sortableUint32(tokens))
566+
sort.Sort(tokens)
535567

536568
if len(tokens) != len(fromRing) {
537569
return false
@@ -566,9 +598,9 @@ func (i *Lifecycler) autoJoin(ctx context.Context, targetState IngesterState) er
566598
i.setState(targetState)
567599
ringDesc.AddIngester(i.ID, i.Addr, newTokens, i.GetState(), i.cfg.NormaliseTokens)
568600

569-
tokens := append(myTokens, newTokens...)
570-
sort.Sort(sortableUint32(tokens))
571-
i.setTokens(tokens)
601+
myTokens = append(myTokens, newTokens...)
602+
sort.Sort(myTokens)
603+
i.setTokens(myTokens)
572604

573605
return ringDesc, true, nil
574606
})

pkg/ring/lifecycler_test.go

Lines changed: 85 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@ package ring
22

33
import (
44
"context"
5+
"io/ioutil"
6+
"os"
7+
"sort"
58
"testing"
69
"time"
710

@@ -252,11 +255,92 @@ func TestCheckReady(t *testing.T) {
252255
cfg := testLifecyclerConfig(ringConfig, "ring1")
253256
cfg.MinReadyDuration = 1 * time.Nanosecond
254257
l1, err := NewLifecycler(cfg, &nopFlushTransferer{}, "ingester")
255-
l1.setTokens([]uint32{1})
258+
l1.setTokens(Tokens([]uint32{1}))
256259
l1.Start()
257260
require.NoError(t, err)
258261

259262
// Delete the ring key before checking ready
260263
err = l1.CheckReady(context.Background())
261264
require.Error(t, err)
262265
}
266+
267+
type noopFlushTransferer struct {
268+
lifecycler *Lifecycler
269+
}
270+
271+
func (f *noopFlushTransferer) StopIncomingRequests() {}
272+
func (f *noopFlushTransferer) Flush() {}
273+
func (f *noopFlushTransferer) TransferOut(ctx context.Context) error { return nil }
274+
275+
func TestTokensOnDisk(t *testing.T) {
276+
var ringConfig Config
277+
flagext.DefaultValues(&ringConfig)
278+
ringConfig.KVStore.Mock = consul.NewInMemoryClient(GetCodec())
279+
280+
r, err := New(ringConfig, "ingester")
281+
require.NoError(t, err)
282+
defer r.Stop()
283+
284+
tokenDir, err := ioutil.TempDir(os.TempDir(), "tokens_on_disk")
285+
require.NoError(t, err)
286+
defer func() {
287+
require.NoError(t, os.RemoveAll(tokenDir))
288+
}()
289+
290+
lifecyclerConfig := testLifecyclerConfig(ringConfig, "ing1")
291+
lifecyclerConfig.NumTokens = 512
292+
lifecyclerConfig.TokensFilePath = tokenDir + "/tokens"
293+
lifecyclerConfig.NormaliseTokens = true
294+
295+
// Start first ingester.
296+
l1, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester")
297+
require.NoError(t, err)
298+
l1.Start()
299+
// Check this ingester joined, is active, and has 512 token.
300+
var expTokens []uint32
301+
test.Poll(t, 1000*time.Millisecond, true, func() interface{} {
302+
d, err := r.KVClient.Get(context.Background(), ConsulKey)
303+
require.NoError(t, err)
304+
desc, ok := d.(*Desc)
305+
if ok {
306+
expTokens = desc.Ingesters["ing1"].Tokens
307+
}
308+
return ok &&
309+
len(desc.Ingesters) == 1 &&
310+
desc.Ingesters["ing1"].State == ACTIVE &&
311+
len(desc.Ingesters["ing1"].Tokens) == 512 &&
312+
len(desc.Tokens) == 0
313+
})
314+
315+
l1.Shutdown()
316+
317+
// Start new ingester at same token directory.
318+
lifecyclerConfig.ID = "ing2"
319+
l2, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester")
320+
require.NoError(t, err)
321+
l2.Start()
322+
defer l2.Shutdown()
323+
324+
// Check this ingester joined, is active, and has 512 token.
325+
var actTokens []uint32
326+
test.Poll(t, 1000*time.Millisecond, true, func() interface{} {
327+
d, err := r.KVClient.Get(context.Background(), ConsulKey)
328+
require.NoError(t, err)
329+
desc, ok := d.(*Desc)
330+
if ok {
331+
actTokens = desc.Ingesters["ing2"].Tokens
332+
}
333+
return ok &&
334+
len(desc.Ingesters) == 1 &&
335+
desc.Ingesters["ing2"].State == ACTIVE &&
336+
len(desc.Ingesters["ing2"].Tokens) == 512 &&
337+
len(desc.Tokens) == 0
338+
})
339+
340+
// Check for same tokens.
341+
sort.Slice(expTokens, func(i, j int) bool { return expTokens[i] < expTokens[j] })
342+
sort.Slice(actTokens, func(i, j int) bool { return actTokens[i] < actTokens[j] })
343+
for i := 0; i < 512; i++ {
344+
require.Equal(t, expTokens, actTokens)
345+
}
346+
}

pkg/ring/model.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,8 @@ func (d *Desc) RemoveIngester(id string) {
7979
// This method assumes that Ring is in the correct state, 'from' ingester has no tokens anywhere,
8080
// and 'to' ingester uses either normalised or non-normalised tokens, but not both. Tokens list must
8181
// be sorted properly. If all of this is true, everything will be fine.
82-
func (d *Desc) ClaimTokens(from, to string, normaliseTokens bool) []uint32 {
83-
var result []uint32
82+
func (d *Desc) ClaimTokens(from, to string, normaliseTokens bool) Tokens {
83+
var result Tokens
8484

8585
if normaliseTokens {
8686

@@ -169,8 +169,8 @@ func (d *Desc) Ready(now time.Time, heartbeatTimeout time.Duration) error {
169169
}
170170

171171
// TokensFor partitions the tokens into those for the given ID, and those for others.
172-
func (d *Desc) TokensFor(id string) (tokens, other []uint32) {
173-
var takenTokens, myTokens []uint32
172+
func (d *Desc) TokensFor(id string) (tokens, other Tokens) {
173+
takenTokens, myTokens := Tokens{}, Tokens{}
174174
for _, token := range migrateRing(d) {
175175
takenTokens = append(takenTokens, token.Token)
176176
if token.Ingester == id {
@@ -319,8 +319,8 @@ func buildNormalizedIngestersMap(inputRing *Desc) map[string]IngesterDesc {
319319
continue
320320
}
321321

322-
if !sort.IsSorted(sortableUint32(ing.Tokens)) {
323-
sort.Sort(sortableUint32(ing.Tokens))
322+
if !sort.IsSorted(Tokens(ing.Tokens)) {
323+
sort.Sort(Tokens(ing.Tokens))
324324
}
325325

326326
seen := make(map[uint32]bool)
@@ -407,7 +407,7 @@ func resolveConflicts(normalizedIngesters map[string]IngesterDesc) {
407407
}
408408
}
409409

410-
sort.Sort(sortableUint32(tokens))
410+
sort.Sort(Tokens(tokens))
411411

412412
// let's store the resolved result back
413413
newTokenLists := map[string][]uint32{}

pkg/ring/model_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -104,23 +104,23 @@ func TestClaimTokensFromNormalizedToNormalized(t *testing.T) {
104104
r := normalizedSource()
105105
result := r.ClaimTokens("first", "second", true)
106106

107-
assert.Equal(t, []uint32{100, 200, 300}, result)
107+
assert.Equal(t, Tokens{100, 200, 300}, result)
108108
assert.Equal(t, normalizedOutput(), r)
109109
}
110110

111111
func TestClaimTokensFromNormalizedToUnnormalized(t *testing.T) {
112112
r := normalizedSource()
113113
result := r.ClaimTokens("first", "second", false)
114114

115-
assert.Equal(t, []uint32{100, 200, 300}, result)
115+
assert.Equal(t, Tokens{100, 200, 300}, result)
116116
assert.Equal(t, unnormalizedOutput(), r)
117117
}
118118

119119
func TestClaimTokensFromUnnormalizedToUnnormalized(t *testing.T) {
120120
r := unnormalizedSource()
121121
result := r.ClaimTokens("first", "second", false)
122122

123-
assert.Equal(t, []uint32{100, 200, 300}, result)
123+
assert.Equal(t, Tokens{100, 200, 300}, result)
124124
assert.Equal(t, unnormalizedOutput(), r)
125125
}
126126

@@ -129,7 +129,7 @@ func TestClaimTokensFromUnnormalizedToNormalized(t *testing.T) {
129129

130130
result := r.ClaimTokens("first", "second", true)
131131

132-
assert.Equal(t, []uint32{100, 200, 300}, result)
132+
assert.Equal(t, Tokens{100, 200, 300}, result)
133133
assert.Equal(t, normalizedOutput(), r)
134134
}
135135

0 commit comments

Comments
 (0)