Skip to content

Flush tokens to disk #1750

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 21 commits into from
Dec 5, 2019
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 43 additions & 10 deletions pkg/ring/lifecycler.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type LifecyclerConfig struct {
NormaliseTokens bool `yaml:"normalise_tokens,omitempty"`
InfNames []string `yaml:"interface_names"`
FinalSleep time.Duration `yaml:"final_sleep"`
TokensFilePath string `yaml:"tokens_file_path,omitempty"`

// For testing, you can override the address and ID of this ingester
Addr string `yaml:"address"`
Expand Down Expand Up @@ -84,6 +85,7 @@ func (cfg *LifecyclerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.Flag
flagext.DeprecatedFlag(f, prefix+"claim-on-rollout", "DEPRECATED. This feature is no longer optional.")
f.BoolVar(&cfg.NormaliseTokens, prefix+"normalise-tokens", false, "Store tokens in a normalised fashion to reduce allocations.")
f.DurationVar(&cfg.FinalSleep, prefix+"final-sleep", 30*time.Second, "Duration to sleep for before exiting, to ensure metrics are scraped.")
f.StringVar(&cfg.TokensFilePath, prefix+"tokens-file-path", "", "File path where tokens are stored. If empty, tokens are not stored at shutdown and restored at startup.")

hostname, err := os.Hostname()
if err != nil {
Expand Down Expand Up @@ -125,7 +127,7 @@ type Lifecycler struct {
// back empty. And it changes during lifecycle of ingester.
stateMtx sync.Mutex
state IngesterState
tokens []uint32
tokens Tokens

// Controls the ready-reporting
readyLock sync.Mutex
Expand Down Expand Up @@ -246,18 +248,24 @@ func (i *Lifecycler) ChangeState(ctx context.Context, state IngesterState) error
return <-err
}

func (i *Lifecycler) getTokens() []uint32 {
func (i *Lifecycler) getTokens() Tokens {
i.stateMtx.Lock()
defer i.stateMtx.Unlock()
return i.tokens
}

func (i *Lifecycler) setTokens(tokens []uint32) {
func (i *Lifecycler) setTokens(tokens Tokens) {
tokensOwned.WithLabelValues(i.RingName).Set(float64(len(tokens)))

i.stateMtx.Lock()
defer i.stateMtx.Unlock()

i.tokens = tokens
if i.cfg.TokensFilePath != "" {
if err := i.tokens.StoreToFile(i.cfg.TokensFilePath); err != nil {
level.Error(util.Logger).Log("msg", "error storing tokens to disk", "path", i.cfg.TokensFilePath, "err", err)
}
}
}

// ClaimTokensFor takes all the tokens for the supplied ingester and assigns them to this ingester.
Expand All @@ -270,7 +278,7 @@ func (i *Lifecycler) ClaimTokensFor(ctx context.Context, ingesterID string) erro
err := make(chan error)

i.actorChan <- func() {
var tokens []uint32
var tokens Tokens

claimTokens := func(in interface{}) (out interface{}, retry bool, err error) {
ringDesc, ok := in.(*Desc)
Expand Down Expand Up @@ -445,9 +453,22 @@ heartbeatLoop:
// - add an ingester entry to the ring
// - copies out our state and tokens if they exist
func (i *Lifecycler) initRing(ctx context.Context) error {
var ringDesc *Desc
var (
ringDesc *Desc
tokensFromFile Tokens
err error
)

if i.cfg.TokensFilePath != "" {
tokensFromFile, err = LoadTokensFromFile(i.cfg.TokensFilePath)
if err != nil {
level.Error(util.Logger).Log("msg", "error in getting tokens from file", "err", err)
}
} else {
level.Warn(util.Logger).Log("msg", "not loading tokens from file, tokens file path is empty")
}

err := i.KVStore.CAS(ctx, ConsulKey, func(in interface{}) (out interface{}, retry bool, err error) {
err = i.KVStore.CAS(ctx, ConsulKey, func(in interface{}) (out interface{}, retry bool, err error) {
if in == nil {
ringDesc = NewDesc()
} else {
Expand All @@ -456,6 +477,17 @@ func (i *Lifecycler) initRing(ctx context.Context) error {

ingesterDesc, ok := ringDesc.Ingesters[i.ID]
if !ok {
// We use the tokens from the file only if it does not exist in the ring yet.
if len(tokensFromFile) > 0 {
level.Info(util.Logger).Log("msg", "adding tokens from file", "num_tokens", len(tokensFromFile))
if len(tokensFromFile) == i.cfg.NumTokens {
i.setState(ACTIVE)
}
ringDesc.AddIngester(i.ID, i.Addr, tokensFromFile, i.GetState(), i.cfg.NormaliseTokens)
i.setTokens(tokensFromFile)
return ringDesc, true, nil
}

// Either we are a new ingester, or consul must have restarted
level.Info(util.Logger).Log("msg", "entry not found in ring, adding with no tokens")
ringDesc.AddIngester(i.ID, i.Addr, []uint32{}, i.GetState(), i.cfg.NormaliseTokens)
Expand Down Expand Up @@ -527,7 +559,7 @@ func (i *Lifecycler) verifyTokens(ctx context.Context) bool {
return result
}

func (i *Lifecycler) compareTokens(fromRing []uint32) bool {
func (i *Lifecycler) compareTokens(fromRing Tokens) bool {
sort.Sort(sortableUint32(fromRing))

tokens := i.getTokens()
Expand Down Expand Up @@ -564,11 +596,12 @@ func (i *Lifecycler) autoJoin(ctx context.Context, targetState IngesterState) er

newTokens := GenerateTokens(i.cfg.NumTokens-len(myTokens), takenTokens)
i.setState(targetState)

ringDesc.AddIngester(i.ID, i.Addr, newTokens, i.GetState(), i.cfg.NormaliseTokens)

tokens := append(myTokens, newTokens...)
sort.Sort(sortableUint32(tokens))
i.setTokens(tokens)
myTokens = append(myTokens, newTokens...)
sort.Sort(sortableUint32(myTokens))
i.setTokens(myTokens)

return ringDesc, true, nil
})
Expand Down
86 changes: 85 additions & 1 deletion pkg/ring/lifecycler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package ring

import (
"context"
"io/ioutil"
"os"
"sort"
"testing"
"time"

Expand Down Expand Up @@ -252,11 +255,92 @@ func TestCheckReady(t *testing.T) {
cfg := testLifecyclerConfig(ringConfig, "ring1")
cfg.MinReadyDuration = 1 * time.Nanosecond
l1, err := NewLifecycler(cfg, &nopFlushTransferer{}, "ingester")
l1.setTokens([]uint32{1})
l1.setTokens(Tokens([]uint32{1}))
l1.Start()
require.NoError(t, err)

// Delete the ring key before checking ready
err = l1.CheckReady(context.Background())
require.Error(t, err)
}

type noopFlushTransferer struct {
lifecycler *Lifecycler
}

func (f *noopFlushTransferer) StopIncomingRequests() {}
func (f *noopFlushTransferer) Flush() {}
func (f *noopFlushTransferer) TransferOut(ctx context.Context) error { return nil }

func TestTokensOnDisk(t *testing.T) {
var ringConfig Config
flagext.DefaultValues(&ringConfig)
ringConfig.KVStore.Mock = consul.NewInMemoryClient(GetCodec())

r, err := New(ringConfig, "ingester")
require.NoError(t, err)
defer r.Stop()

tokenDir, err := ioutil.TempDir(os.TempDir(), "tokens_on_disk")
require.NoError(t, err)
defer func() {
require.NoError(t, os.RemoveAll(tokenDir))
}()

lifecyclerConfig := testLifecyclerConfig(ringConfig, "ing1")
lifecyclerConfig.NumTokens = 512
lifecyclerConfig.TokensFilePath = tokenDir + "/tokens"
lifecyclerConfig.NormaliseTokens = true

// Start first ingester.
l1, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester")
require.NoError(t, err)
l1.Start()
// Check this ingester joined, is active, and has 512 token.
var expTokens []uint32
test.Poll(t, 1000*time.Millisecond, true, func() interface{} {
d, err := r.KVClient.Get(context.Background(), ConsulKey)
require.NoError(t, err)
desc, ok := d.(*Desc)
if ok {
expTokens = desc.Ingesters["ing1"].Tokens
}
return ok &&
len(desc.Ingesters) == 1 &&
desc.Ingesters["ing1"].State == ACTIVE &&
len(desc.Ingesters["ing1"].Tokens) == 512 &&
len(desc.Tokens) == 0
})

l1.Shutdown()

// Start new ingester at same token directory.
lifecyclerConfig.ID = "ing2"
l2, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester")
require.NoError(t, err)
l2.Start()
defer l2.Shutdown()

// Check this ingester joined, is active, and has 512 token.
var actTokens []uint32
test.Poll(t, 1000*time.Millisecond, true, func() interface{} {
d, err := r.KVClient.Get(context.Background(), ConsulKey)
require.NoError(t, err)
desc, ok := d.(*Desc)
if ok {
actTokens = desc.Ingesters["ing2"].Tokens
}
return ok &&
len(desc.Ingesters) == 1 &&
desc.Ingesters["ing2"].State == ACTIVE &&
len(desc.Ingesters["ing2"].Tokens) == 512 &&
len(desc.Tokens) == 0
})

// Check for same tokens.
sort.Slice(expTokens, func(i, j int) bool { return expTokens[i] < expTokens[j] })
sort.Slice(actTokens, func(i, j int) bool { return actTokens[i] < actTokens[j] })
for i := 0; i < 512; i++ {
require.Equal(t, expTokens, actTokens)
}
}
8 changes: 4 additions & 4 deletions pkg/ring/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ func (d *Desc) RemoveIngester(id string) {
// This method assumes that Ring is in the correct state, 'from' ingester has no tokens anywhere,
// and 'to' ingester uses either normalised or non-normalised tokens, but not both. Tokens list must
// be sorted properly. If all of this is true, everything will be fine.
func (d *Desc) ClaimTokens(from, to string, normaliseTokens bool) []uint32 {
var result []uint32
func (d *Desc) ClaimTokens(from, to string, normaliseTokens bool) Tokens {
var result Tokens

if normaliseTokens {

Expand Down Expand Up @@ -169,8 +169,8 @@ func (d *Desc) Ready(now time.Time, heartbeatTimeout time.Duration) error {
}

// TokensFor partitions the tokens into those for the given ID, and those for others.
func (d *Desc) TokensFor(id string) (tokens, other []uint32) {
var takenTokens, myTokens []uint32
func (d *Desc) TokensFor(id string) (tokens, other Tokens) {
takenTokens, myTokens := Tokens{}, Tokens{}
for _, token := range migrateRing(d) {
takenTokens = append(takenTokens, token.Token)
if token.Ingester == id {
Expand Down
8 changes: 4 additions & 4 deletions pkg/ring/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,23 +104,23 @@ func TestClaimTokensFromNormalizedToNormalized(t *testing.T) {
r := normalizedSource()
result := r.ClaimTokens("first", "second", true)

assert.Equal(t, []uint32{100, 200, 300}, result)
assert.Equal(t, Tokens{100, 200, 300}, result)
assert.Equal(t, normalizedOutput(), r)
}

func TestClaimTokensFromNormalizedToUnnormalized(t *testing.T) {
r := normalizedSource()
result := r.ClaimTokens("first", "second", false)

assert.Equal(t, []uint32{100, 200, 300}, result)
assert.Equal(t, Tokens{100, 200, 300}, result)
assert.Equal(t, unnormalizedOutput(), r)
}

func TestClaimTokensFromUnnormalizedToUnnormalized(t *testing.T) {
r := unnormalizedSource()
result := r.ClaimTokens("first", "second", false)

assert.Equal(t, []uint32{100, 200, 300}, result)
assert.Equal(t, Tokens{100, 200, 300}, result)
assert.Equal(t, unnormalizedOutput(), r)
}

Expand All @@ -129,7 +129,7 @@ func TestClaimTokensFromUnnormalizedToNormalized(t *testing.T) {

result := r.ClaimTokens("first", "second", true)

assert.Equal(t, []uint32{100, 200, 300}, result)
assert.Equal(t, Tokens{100, 200, 300}, result)
assert.Equal(t, normalizedOutput(), r)
}

Expand Down
80 changes: 80 additions & 0 deletions pkg/ring/tokens.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package ring

import (
"encoding/json"
"errors"
"io/ioutil"
"os"
)

// Tokens is a simple list of tokens.
type Tokens []uint32

// StoreToFile stores the tokens in the given directory.
func (t Tokens) StoreToFile(tokenFilePath string) error {
if tokenFilePath == "" {
return errors.New("path is empty")
}

// If any operations failed further in the function, we keep the temporary
// file hanging around for debugging.
f, err := ioutil.TempFile(os.TempDir(), "tokens")
if err != nil {
return err
}

defer func() {
// If the file was not closed, then there must already be an error, hence ignore
// the error (if any) from f.Close(). If the file was already closed, then
// we would ignore the error in that case too.
f.Close()
}()

b, err := t.Marshal()
if err != nil {
return err
}
if _, err = f.Write(b); err != nil {
return err
}

if err := f.Close(); err != nil {
return err
}

// Tokens successfully written, replace the temporary file with the actual file path.
return os.Rename(f.Name(), tokenFilePath)
}

// LoadTokensFromFile loads tokens from given file path.
func LoadTokensFromFile(tokenFilePath string) (Tokens, error) {
b, err := ioutil.ReadFile(tokenFilePath)
if err != nil {
if os.IsNotExist(err) {
return nil, nil
}
return nil, err
}
var t Tokens
err = t.Unmarshal(b)
return t, err
}

// Marshal encodes the tokens into JSON.
func (t Tokens) Marshal() ([]byte, error) {
return json.Marshal(tokensJSON{Tokens: t})
}

// Unmarshal reads the tokens from JSON byte stream.
func (t *Tokens) Unmarshal(b []byte) error {
tj := tokensJSON{}
if err := json.Unmarshal(b, &tj); err != nil {
return err
}
*t = Tokens(tj.Tokens)
return nil
}

type tokensJSON struct {
Tokens []uint32 `json:"tokens"`
}
22 changes: 22 additions & 0 deletions pkg/ring/tokens_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package ring

import (
"math/rand"
"testing"

"github.com/stretchr/testify/require"
)

func TestTokenSerialization(t *testing.T) {
tokens := make(Tokens, 512)
for i := 0; i < 512; i++ {
tokens = append(tokens, uint32(rand.Int31()))
}

b, err := tokens.Marshal()
require.NoError(t, err)

var unmarshaledTokens Tokens
require.NoError(t, unmarshaledTokens.Unmarshal(b))
require.Equal(t, tokens, unmarshaledTokens)
}