diff --git a/pkg/ring/lifecycler.go b/pkg/ring/lifecycler.go index 704e3dadc1e..c1e7172295f 100644 --- a/pkg/ring/lifecycler.go +++ b/pkg/ring/lifecycler.go @@ -53,6 +53,7 @@ type LifecyclerConfig struct { NormaliseTokens bool `yaml:"normalise_tokens,omitempty"` InfNames []string `yaml:"interface_names"` FinalSleep time.Duration `yaml:"final_sleep"` + TokensFilePath string `yaml:"tokens_file_path,omitempty"` // For testing, you can override the address and ID of this ingester Addr string `yaml:"address"` @@ -84,6 +85,7 @@ func (cfg *LifecyclerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.Flag flagext.DeprecatedFlag(f, prefix+"claim-on-rollout", "DEPRECATED. This feature is no longer optional.") f.BoolVar(&cfg.NormaliseTokens, prefix+"normalise-tokens", false, "Store tokens in a normalised fashion to reduce allocations.") f.DurationVar(&cfg.FinalSleep, prefix+"final-sleep", 30*time.Second, "Duration to sleep for before exiting, to ensure metrics are scraped.") + f.StringVar(&cfg.TokensFilePath, prefix+"tokens-file-path", "", "File path where tokens are stored. If empty, tokens are not stored at shutdown and restored at startup.") hostname, err := os.Hostname() if err != nil { @@ -125,7 +127,7 @@ type Lifecycler struct { // back empty. And it changes during lifecycle of ingester. stateMtx sync.Mutex state IngesterState - tokens []uint32 + tokens Tokens // Controls the ready-reporting readyLock sync.Mutex @@ -246,18 +248,24 @@ func (i *Lifecycler) ChangeState(ctx context.Context, state IngesterState) error return <-err } -func (i *Lifecycler) getTokens() []uint32 { +func (i *Lifecycler) getTokens() Tokens { i.stateMtx.Lock() defer i.stateMtx.Unlock() return i.tokens } -func (i *Lifecycler) setTokens(tokens []uint32) { +func (i *Lifecycler) setTokens(tokens Tokens) { tokensOwned.WithLabelValues(i.RingName).Set(float64(len(tokens))) i.stateMtx.Lock() defer i.stateMtx.Unlock() + i.tokens = tokens + if i.cfg.TokensFilePath != "" { + if err := i.tokens.StoreToFile(i.cfg.TokensFilePath); err != nil { + level.Error(util.Logger).Log("msg", "error storing tokens to disk", "path", i.cfg.TokensFilePath, "err", err) + } + } } // ClaimTokensFor takes all the tokens for the supplied ingester and assigns them to this ingester. @@ -270,7 +278,7 @@ func (i *Lifecycler) ClaimTokensFor(ctx context.Context, ingesterID string) erro err := make(chan error) i.actorChan <- func() { - var tokens []uint32 + var tokens Tokens claimTokens := func(in interface{}) (out interface{}, retry bool, err error) { ringDesc, ok := in.(*Desc) @@ -445,9 +453,22 @@ heartbeatLoop: // - add an ingester entry to the ring // - copies out our state and tokens if they exist func (i *Lifecycler) initRing(ctx context.Context) error { - var ringDesc *Desc + var ( + ringDesc *Desc + tokensFromFile Tokens + err error + ) + + if i.cfg.TokensFilePath != "" { + tokensFromFile, err = LoadTokensFromFile(i.cfg.TokensFilePath) + if err != nil { + level.Error(util.Logger).Log("msg", "error in getting tokens from file", "err", err) + } + } else { + level.Info(util.Logger).Log("msg", "not loading tokens from file, tokens file path is empty") + } - err := i.KVStore.CAS(ctx, ConsulKey, func(in interface{}) (out interface{}, retry bool, err error) { + err = i.KVStore.CAS(ctx, ConsulKey, func(in interface{}) (out interface{}, retry bool, err error) { if in == nil { ringDesc = NewDesc() } else { @@ -456,6 +477,17 @@ func (i *Lifecycler) initRing(ctx context.Context) error { ingesterDesc, ok := ringDesc.Ingesters[i.ID] if !ok { + // We use the tokens from the file only if it does not exist in the ring yet. + if len(tokensFromFile) > 0 { + level.Info(util.Logger).Log("msg", "adding tokens from file", "num_tokens", len(tokensFromFile)) + if len(tokensFromFile) >= i.cfg.NumTokens { + i.setState(ACTIVE) + } + ringDesc.AddIngester(i.ID, i.Addr, tokensFromFile, i.GetState(), i.cfg.NormaliseTokens) + i.setTokens(tokensFromFile) + return ringDesc, true, nil + } + // Either we are a new ingester, or consul must have restarted level.Info(util.Logger).Log("msg", "entry not found in ring, adding with no tokens") ringDesc.AddIngester(i.ID, i.Addr, []uint32{}, i.GetState(), i.cfg.NormaliseTokens) @@ -505,7 +537,7 @@ func (i *Lifecycler) verifyTokens(ctx context.Context) bool { newTokens := GenerateTokens(needTokens, takenTokens) ringTokens = append(ringTokens, newTokens...) - sort.Sort(sortableUint32(ringTokens)) + sort.Sort(ringTokens) ringDesc.AddIngester(i.ID, i.Addr, ringTokens, i.GetState(), i.cfg.NormaliseTokens) @@ -527,11 +559,11 @@ func (i *Lifecycler) verifyTokens(ctx context.Context) bool { return result } -func (i *Lifecycler) compareTokens(fromRing []uint32) bool { - sort.Sort(sortableUint32(fromRing)) +func (i *Lifecycler) compareTokens(fromRing Tokens) bool { + sort.Sort(fromRing) tokens := i.getTokens() - sort.Sort(sortableUint32(tokens)) + sort.Sort(tokens) if len(tokens) != len(fromRing) { return false @@ -566,9 +598,9 @@ func (i *Lifecycler) autoJoin(ctx context.Context, targetState IngesterState) er i.setState(targetState) ringDesc.AddIngester(i.ID, i.Addr, newTokens, i.GetState(), i.cfg.NormaliseTokens) - tokens := append(myTokens, newTokens...) - sort.Sort(sortableUint32(tokens)) - i.setTokens(tokens) + myTokens = append(myTokens, newTokens...) + sort.Sort(myTokens) + i.setTokens(myTokens) return ringDesc, true, nil }) diff --git a/pkg/ring/lifecycler_test.go b/pkg/ring/lifecycler_test.go index a7a756094a1..06854caee03 100644 --- a/pkg/ring/lifecycler_test.go +++ b/pkg/ring/lifecycler_test.go @@ -2,6 +2,9 @@ package ring import ( "context" + "io/ioutil" + "os" + "sort" "testing" "time" @@ -252,7 +255,7 @@ func TestCheckReady(t *testing.T) { cfg := testLifecyclerConfig(ringConfig, "ring1") cfg.MinReadyDuration = 1 * time.Nanosecond l1, err := NewLifecycler(cfg, &nopFlushTransferer{}, "ingester") - l1.setTokens([]uint32{1}) + l1.setTokens(Tokens([]uint32{1})) l1.Start() require.NoError(t, err) @@ -260,3 +263,84 @@ func TestCheckReady(t *testing.T) { err = l1.CheckReady(context.Background()) require.Error(t, err) } + +type noopFlushTransferer struct { + lifecycler *Lifecycler +} + +func (f *noopFlushTransferer) StopIncomingRequests() {} +func (f *noopFlushTransferer) Flush() {} +func (f *noopFlushTransferer) TransferOut(ctx context.Context) error { return nil } + +func TestTokensOnDisk(t *testing.T) { + var ringConfig Config + flagext.DefaultValues(&ringConfig) + ringConfig.KVStore.Mock = consul.NewInMemoryClient(GetCodec()) + + r, err := New(ringConfig, "ingester") + require.NoError(t, err) + defer r.Stop() + + tokenDir, err := ioutil.TempDir(os.TempDir(), "tokens_on_disk") + require.NoError(t, err) + defer func() { + require.NoError(t, os.RemoveAll(tokenDir)) + }() + + lifecyclerConfig := testLifecyclerConfig(ringConfig, "ing1") + lifecyclerConfig.NumTokens = 512 + lifecyclerConfig.TokensFilePath = tokenDir + "/tokens" + lifecyclerConfig.NormaliseTokens = true + + // Start first ingester. + l1, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester") + require.NoError(t, err) + l1.Start() + // Check this ingester joined, is active, and has 512 token. + var expTokens []uint32 + test.Poll(t, 1000*time.Millisecond, true, func() interface{} { + d, err := r.KVClient.Get(context.Background(), ConsulKey) + require.NoError(t, err) + desc, ok := d.(*Desc) + if ok { + expTokens = desc.Ingesters["ing1"].Tokens + } + return ok && + len(desc.Ingesters) == 1 && + desc.Ingesters["ing1"].State == ACTIVE && + len(desc.Ingesters["ing1"].Tokens) == 512 && + len(desc.Tokens) == 0 + }) + + l1.Shutdown() + + // Start new ingester at same token directory. + lifecyclerConfig.ID = "ing2" + l2, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester") + require.NoError(t, err) + l2.Start() + defer l2.Shutdown() + + // Check this ingester joined, is active, and has 512 token. + var actTokens []uint32 + test.Poll(t, 1000*time.Millisecond, true, func() interface{} { + d, err := r.KVClient.Get(context.Background(), ConsulKey) + require.NoError(t, err) + desc, ok := d.(*Desc) + if ok { + actTokens = desc.Ingesters["ing2"].Tokens + } + return ok && + len(desc.Ingesters) == 1 && + desc.Ingesters["ing2"].State == ACTIVE && + len(desc.Ingesters["ing2"].Tokens) == 512 && + len(desc.Tokens) == 0 + }) + + // Check for same tokens. + sort.Slice(expTokens, func(i, j int) bool { return expTokens[i] < expTokens[j] }) + sort.Slice(actTokens, func(i, j int) bool { return actTokens[i] < actTokens[j] }) + for i := 0; i < 512; i++ { + require.Equal(t, expTokens, actTokens) + } +} diff --git a/pkg/ring/model.go b/pkg/ring/model.go index 047b3f8acd3..43ae3140ff1 100644 --- a/pkg/ring/model.go +++ b/pkg/ring/model.go @@ -79,8 +79,8 @@ func (d *Desc) RemoveIngester(id string) { // This method assumes that Ring is in the correct state, 'from' ingester has no tokens anywhere, // and 'to' ingester uses either normalised or non-normalised tokens, but not both. Tokens list must // be sorted properly. If all of this is true, everything will be fine. -func (d *Desc) ClaimTokens(from, to string, normaliseTokens bool) []uint32 { - var result []uint32 +func (d *Desc) ClaimTokens(from, to string, normaliseTokens bool) Tokens { + var result Tokens if normaliseTokens { @@ -169,8 +169,8 @@ func (d *Desc) Ready(now time.Time, heartbeatTimeout time.Duration) error { } // TokensFor partitions the tokens into those for the given ID, and those for others. -func (d *Desc) TokensFor(id string) (tokens, other []uint32) { - var takenTokens, myTokens []uint32 +func (d *Desc) TokensFor(id string) (tokens, other Tokens) { + takenTokens, myTokens := Tokens{}, Tokens{} for _, token := range migrateRing(d) { takenTokens = append(takenTokens, token.Token) if token.Ingester == id { @@ -319,8 +319,8 @@ func buildNormalizedIngestersMap(inputRing *Desc) map[string]IngesterDesc { continue } - if !sort.IsSorted(sortableUint32(ing.Tokens)) { - sort.Sort(sortableUint32(ing.Tokens)) + if !sort.IsSorted(Tokens(ing.Tokens)) { + sort.Sort(Tokens(ing.Tokens)) } seen := make(map[uint32]bool) @@ -407,7 +407,7 @@ func resolveConflicts(normalizedIngesters map[string]IngesterDesc) { } } - sort.Sort(sortableUint32(tokens)) + sort.Sort(Tokens(tokens)) // let's store the resolved result back newTokenLists := map[string][]uint32{} diff --git a/pkg/ring/model_test.go b/pkg/ring/model_test.go index fb235f1e63c..97e954ad06c 100644 --- a/pkg/ring/model_test.go +++ b/pkg/ring/model_test.go @@ -104,7 +104,7 @@ func TestClaimTokensFromNormalizedToNormalized(t *testing.T) { r := normalizedSource() result := r.ClaimTokens("first", "second", true) - assert.Equal(t, []uint32{100, 200, 300}, result) + assert.Equal(t, Tokens{100, 200, 300}, result) assert.Equal(t, normalizedOutput(), r) } @@ -112,7 +112,7 @@ func TestClaimTokensFromNormalizedToUnnormalized(t *testing.T) { r := normalizedSource() result := r.ClaimTokens("first", "second", false) - assert.Equal(t, []uint32{100, 200, 300}, result) + assert.Equal(t, Tokens{100, 200, 300}, result) assert.Equal(t, unnormalizedOutput(), r) } @@ -120,7 +120,7 @@ func TestClaimTokensFromUnnormalizedToUnnormalized(t *testing.T) { r := unnormalizedSource() result := r.ClaimTokens("first", "second", false) - assert.Equal(t, []uint32{100, 200, 300}, result) + assert.Equal(t, Tokens{100, 200, 300}, result) assert.Equal(t, unnormalizedOutput(), r) } @@ -129,7 +129,7 @@ func TestClaimTokensFromUnnormalizedToNormalized(t *testing.T) { result := r.ClaimTokens("first", "second", true) - assert.Equal(t, []uint32{100, 200, 300}, result) + assert.Equal(t, Tokens{100, 200, 300}, result) assert.Equal(t, normalizedOutput(), r) } diff --git a/pkg/ring/tokens.go b/pkg/ring/tokens.go new file mode 100644 index 00000000000..b8ec5e323b9 --- /dev/null +++ b/pkg/ring/tokens.go @@ -0,0 +1,81 @@ +package ring + +import ( + "encoding/json" + "errors" + "io/ioutil" + "os" +) + +// Tokens is a simple list of tokens. +type Tokens []uint32 + +func (t Tokens) Len() int { return len(t) } +func (t Tokens) Swap(i, j int) { t[i], t[j] = t[j], t[i] } +func (t Tokens) Less(i, j int) bool { return t[i] < t[j] } + +// StoreToFile stores the tokens in the given directory. +func (t Tokens) StoreToFile(tokenFilePath string) error { + if tokenFilePath == "" { + return errors.New("path is empty") + } + + // If any operations failed further in the function, we keep the temporary + // file hanging around for debugging. + f, err := ioutil.TempFile(os.TempDir(), "tokens") + if err != nil { + return err + } + + defer func() { + // If the file was not closed, then there must already be an error, hence ignore + // the error (if any) from f.Close(). If the file was already closed, then + // we would ignore the error in that case too. + _ = f.Close() + }() + + b, err := t.Marshal() + if err != nil { + return err + } + if _, err = f.Write(b); err != nil { + return err + } + + if err := f.Close(); err != nil { + return err + } + + // Tokens successfully written, replace the temporary file with the actual file path. + return os.Rename(f.Name(), tokenFilePath) +} + +// LoadTokensFromFile loads tokens from given file path. +func LoadTokensFromFile(tokenFilePath string) (Tokens, error) { + b, err := ioutil.ReadFile(tokenFilePath) + if err != nil { + return nil, err + } + var t Tokens + err = t.Unmarshal(b) + return t, err +} + +// Marshal encodes the tokens into JSON. +func (t Tokens) Marshal() ([]byte, error) { + return json.Marshal(tokensJSON{Tokens: t}) +} + +// Unmarshal reads the tokens from JSON byte stream. +func (t *Tokens) Unmarshal(b []byte) error { + tj := tokensJSON{} + if err := json.Unmarshal(b, &tj); err != nil { + return err + } + *t = Tokens(tj.Tokens) + return nil +} + +type tokensJSON struct { + Tokens []uint32 `json:"tokens"` +} diff --git a/pkg/ring/tokens_test.go b/pkg/ring/tokens_test.go new file mode 100644 index 00000000000..702b600a9eb --- /dev/null +++ b/pkg/ring/tokens_test.go @@ -0,0 +1,22 @@ +package ring + +import ( + "math/rand" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestTokenSerialization(t *testing.T) { + tokens := make(Tokens, 512) + for i := 0; i < 512; i++ { + tokens = append(tokens, uint32(rand.Int31())) + } + + b, err := tokens.Marshal() + require.NoError(t, err) + + var unmarshaledTokens Tokens + require.NoError(t, unmarshaledTokens.Unmarshal(b)) + require.Equal(t, tokens, unmarshaledTokens) +} diff --git a/pkg/ring/util.go b/pkg/ring/util.go index d072e549322..e30166fa7bc 100644 --- a/pkg/ring/util.go +++ b/pkg/ring/util.go @@ -27,9 +27,3 @@ func GenerateTokens(numTokens int, takenTokens []uint32) []uint32 { } return tokens } - -type sortableUint32 []uint32 - -func (ts sortableUint32) Len() int { return len(ts) } -func (ts sortableUint32) Swap(i, j int) { ts[i], ts[j] = ts[j], ts[i] } -func (ts sortableUint32) Less(i, j int) bool { return ts[i] < ts[j] }