Skip to content

Commit 7ab02ff

Browse files
committed
Add interface for tokens with a SimpleListTokens type
Signed-off-by: Ganesh Vernekar <[email protected]>
1 parent aa67711 commit 7ab02ff

File tree

4 files changed

+143
-54
lines changed

4 files changed

+143
-54
lines changed

pkg/ring/lifecycler.go

Lines changed: 26 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@ package ring
22

33
import (
44
"context"
5-
"encoding/binary"
6-
"errors"
75
"flag"
86
"fmt"
97
"io/ioutil"
@@ -133,7 +131,7 @@ type Lifecycler struct {
133131
// back empty. And it changes during lifecycle of ingester.
134132
stateMtx sync.Mutex
135133
state IngesterState
136-
tokens []uint32
134+
tokens Tokens
137135

138136
// Controls the ready-reporting
139137
readyLock sync.Mutex
@@ -207,7 +205,7 @@ func (i *Lifecycler) CheckReady(ctx context.Context) error {
207205
return fmt.Errorf("error talking to consul: %s", err)
208206
}
209207

210-
if len(i.getTokens()) == 0 {
208+
if i.getTokens().Len() == 0 {
211209
return fmt.Errorf("this ingester owns no tokens")
212210
}
213211

@@ -246,9 +244,14 @@ func (i *Lifecycler) ChangeState(ctx context.Context, state IngesterState) error
246244
return <-err
247245
}
248246

249-
func (i *Lifecycler) getTokens() []uint32 {
247+
var emptyTokens Tokens = &SimpleListTokens{}
248+
249+
func (i *Lifecycler) getTokens() Tokens {
250250
i.stateMtx.Lock()
251251
defer i.stateMtx.Unlock()
252+
if i.tokens == nil {
253+
return emptyTokens
254+
}
252255
return i.tokens
253256
}
254257

@@ -263,18 +266,14 @@ func (i *Lifecycler) flushTokensToFile() {
263266
return
264267
}
265268

266-
b := make([]byte, 4*len(i.tokens))
267-
for idx, token := range i.tokens {
268-
binary.BigEndian.PutUint32(b[idx*4:], token)
269-
}
270-
269+
b := i.tokens.Marshal(nil)
271270
if _, err = f.Write(b); err != nil {
272271
level.Error(util.Logger).Log("msg", "error in writing token file", "err", err)
273272
return
274273
}
275274
}
276275

277-
func (i *Lifecycler) getTokensFromFile() ([]uint32, error) {
276+
func (i *Lifecycler) getTokensFromFile() (Tokens, error) {
278277
tokenFilePath := path.Join(i.cfg.TokensFileDir, tokensFileName)
279278
b, err := ioutil.ReadFile(tokenFilePath)
280279
if err != nil {
@@ -283,23 +282,12 @@ func (i *Lifecycler) getTokensFromFile() ([]uint32, error) {
283282
}
284283
return nil, err
285284
}
286-
if len(b) == 0 {
287-
return nil, nil
288-
} else if len(b)%4 != 0 {
289-
return nil, errors.New("token data is not 4 byte aligned")
290-
}
291-
292-
numTokens := len(b) >> 2
293-
tokens := make([]uint32, 0, numTokens)
294-
for i := 0; i < numTokens; i++ {
295-
tokens = append(tokens, binary.BigEndian.Uint32(b[i<<2:]))
296-
}
297285

298-
return tokens, nil
286+
return UnmarshalTokens(b)
299287
}
300288

301-
func (i *Lifecycler) setTokens(tokens []uint32) {
302-
tokensOwned.WithLabelValues(i.RingName).Set(float64(len(tokens)))
289+
func (i *Lifecycler) setTokens(tokens Tokens) {
290+
tokensOwned.WithLabelValues(i.RingName).Set(float64(tokens.Len()))
303291

304292
i.stateMtx.Lock()
305293
i.tokens = tokens
@@ -313,7 +301,7 @@ func (i *Lifecycler) ClaimTokensFor(ctx context.Context, ingesterID string) erro
313301
err := make(chan error)
314302

315303
i.actorChan <- func() {
316-
var tokens []uint32
304+
var tokens Tokens
317305

318306
claimTokens := func(in interface{}) (out interface{}, retry bool, err error) {
319307
ringDesc, ok := in.(*Desc)
@@ -451,12 +439,12 @@ func (i *Lifecycler) initRing(ctx context.Context) error {
451439
tokens, err := i.getTokensFromFile()
452440
if err != nil {
453441
level.Error(util.Logger).Log("msg", "error in getting tokens from file", "err", err)
454-
} else if len(tokens) > 0 {
455-
level.Info(util.Logger).Log("msg", "adding tokens from file", "num_tokens", len(tokens))
456-
if len(tokens) == i.cfg.NumTokens {
442+
} else if tokens != nil && tokens.Len() > 0 {
443+
level.Info(util.Logger).Log("msg", "adding tokens from file", "num_tokens", tokens.Len())
444+
if tokens.Len() == i.cfg.NumTokens {
457445
i.setState(ACTIVE)
458446
}
459-
ringDesc.AddIngester(i.ID, i.Addr, tokens, i.GetState(), i.cfg.NormaliseTokens)
447+
ringDesc.AddIngester(i.ID, i.Addr, tokens.Tokens(), i.GetState(), i.cfg.NormaliseTokens)
460448
i.setTokens(tokens)
461449
return ringDesc, true, nil
462450
}
@@ -472,7 +460,7 @@ func (i *Lifecycler) initRing(ctx context.Context) error {
472460
tokens, _ := ringDesc.TokensFor(i.ID)
473461
i.setTokens(tokens)
474462

475-
level.Info(util.Logger).Log("msg", "existing entry found in ring", "state", i.GetState(), "tokens", len(tokens))
463+
level.Info(util.Logger).Log("msg", "existing entry found in ring", "state", i.GetState(), "tokens", tokens.Len())
476464
return ringDesc, true, nil
477465
})
478466
}
@@ -489,17 +477,17 @@ func (i *Lifecycler) autoJoin(ctx context.Context) error {
489477

490478
// At this point, we should not have any tokens, and we should be in PENDING state.
491479
myTokens, takenTokens := ringDesc.TokensFor(i.ID)
492-
if len(myTokens) > 0 {
493-
level.Error(util.Logger).Log("msg", "tokens already exist for this ingester - wasn't expecting any!", "num_tokens", len(myTokens))
480+
if myTokens.Len() > 0 {
481+
level.Error(util.Logger).Log("msg", "tokens already exist for this ingester - wasn't expecting any!", "num_tokens", myTokens.Len())
494482
}
495483

496-
newTokens := GenerateTokens(i.cfg.NumTokens-len(myTokens), takenTokens)
484+
newTokens := GenerateTokens(i.cfg.NumTokens-myTokens.Len(), takenTokens.Tokens())
497485
i.setState(ACTIVE)
498486
ringDesc.AddIngester(i.ID, i.Addr, newTokens, i.GetState(), i.cfg.NormaliseTokens)
499487

500-
tokens := append(myTokens, newTokens...)
501-
sort.Sort(sortableUint32(tokens))
502-
i.setTokens(tokens)
488+
myTokens.Add(newTokens...)
489+
sort.Sort(sortableUint32(myTokens.Tokens()))
490+
i.setTokens(myTokens)
503491

504492
return ringDesc, true, nil
505493
})
@@ -520,7 +508,7 @@ func (i *Lifecycler) updateConsul(ctx context.Context) error {
520508
if !ok {
521509
// consul must have restarted
522510
level.Info(util.Logger).Log("msg", "found empty ring, inserting tokens")
523-
ringDesc.AddIngester(i.ID, i.Addr, i.getTokens(), i.GetState(), i.cfg.NormaliseTokens)
511+
ringDesc.AddIngester(i.ID, i.Addr, i.getTokens().Tokens(), i.GetState(), i.cfg.NormaliseTokens)
524512
} else {
525513
ingesterDesc.Timestamp = time.Now().Unix()
526514
ingesterDesc.State = i.GetState()

pkg/ring/lifecycler_test.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ func TestRingNormaliseMigration(t *testing.T) {
8585
return checkDenormalised(d, "ing1")
8686
})
8787

88-
token := l1.tokens[0]
88+
token := l1.tokens.Tokens()[0]
8989

9090
// Add a second ingester with normalised tokens.
9191
var lifecyclerConfig2 = testLifecyclerConfig(t, ringConfig, "ing2")
@@ -139,7 +139,7 @@ func TestRingRestart(t *testing.T) {
139139
return checkNormalised(d, "ing1")
140140
})
141141

142-
token := l1.tokens[0]
142+
token := l1.tokens.Tokens()[0]
143143

144144
// Add a second ingester with the same settings, so it will think it has restarted
145145
l2, err := NewLifecycler(lifecyclerConfig1, &nopFlushTransferer{}, "ingester")
@@ -151,8 +151,8 @@ func TestRingRestart(t *testing.T) {
151151
require.NoError(t, err)
152152
l2Tokens := l2.getTokens()
153153
return checkNormalised(d, "ing1") &&
154-
len(l2Tokens) == 1 &&
155-
l2Tokens[0] == token
154+
l2Tokens.Len() == 1 &&
155+
l2Tokens.Tokens()[0] == token
156156
})
157157
}
158158

@@ -197,13 +197,14 @@ func TestCheckReady(t *testing.T) {
197197
flagext.DefaultValues(&ringConfig)
198198
ringConfig.KVStore.Mock = &MockClient{}
199199

200+
tokens := SimpleListTokens([]uint32{1})
200201
r, err := New(ringConfig, "ingester")
201202
require.NoError(t, err)
202203
defer r.Stop()
203204
cfg := testLifecyclerConfig(t, ringConfig, "ring1")
204205
cfg.MinReadyDuration = 1 * time.Nanosecond
205206
l1, err := NewLifecycler(cfg, &nopFlushTransferer{}, "ingester")
206-
l1.setTokens([]uint32{1})
207+
l1.setTokens(&tokens)
207208
require.NoError(t, err)
208209

209210
// Delete the ring key before checking ready

pkg/ring/model.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -75,14 +75,14 @@ func (d *Desc) RemoveIngester(id string) {
7575

7676
// ClaimTokens transfers all the tokens from one ingester to another,
7777
// returning the claimed token.
78-
func (d *Desc) ClaimTokens(from, to string, normaliseTokens bool) []uint32 {
79-
var result []uint32
78+
func (d *Desc) ClaimTokens(from, to string, normaliseTokens bool) Tokens {
79+
result := &SimpleListTokens{}
8080

8181
if normaliseTokens {
8282

8383
// If the ingester we are claiming from is normalising, get its tokens then erase them from the ring.
8484
if fromDesc, found := d.Ingesters[from]; found {
85-
result = fromDesc.Tokens
85+
result.Add(fromDesc.Tokens...)
8686
fromDesc.Tokens = nil
8787
d.Ingesters[from] = fromDesc
8888
}
@@ -93,24 +93,24 @@ func (d *Desc) ClaimTokens(from, to string, normaliseTokens bool) []uint32 {
9393
// When all ingesters are in normalised mode, d.Tokens is empty here
9494
for i := 0; i < len(d.Tokens); {
9595
if d.Tokens[i].Ingester == from {
96-
result = append(result, d.Tokens[i].Token)
96+
result.Add(d.Tokens[i].Token)
9797
d.Tokens = d.Tokens[:i+copy(d.Tokens[i:], d.Tokens[i+1:])]
9898
continue
9999
}
100100
i++
101101
}
102102

103-
sort.Sort(uint32s(result))
103+
sort.Sort(uint32s(result.Tokens()))
104104
ing := d.Ingesters[to]
105-
ing.Tokens = result
105+
ing.Tokens = result.Tokens()
106106
d.Ingesters[to] = ing
107107

108108
} else {
109109

110110
for i := 0; i < len(d.Tokens); i++ {
111111
if d.Tokens[i].Ingester == from {
112112
d.Tokens[i].Ingester = to
113-
result = append(result, d.Tokens[i].Token)
113+
result.Add(d.Tokens[i].Token)
114114
}
115115
}
116116
}
@@ -148,12 +148,12 @@ func (d *Desc) Ready(heartbeatTimeout time.Duration) error {
148148
}
149149

150150
// TokensFor partitions the tokens into those for the given ID, and those for others.
151-
func (d *Desc) TokensFor(id string) (tokens, other []uint32) {
152-
var takenTokens, myTokens []uint32
151+
func (d *Desc) TokensFor(id string) (tokens, other Tokens) {
152+
takenTokens, myTokens := &SimpleListTokens{}, &SimpleListTokens{}
153153
for _, token := range migrateRing(d) {
154-
takenTokens = append(takenTokens, token.Token)
154+
takenTokens.Add(token.Token)
155155
if token.Ingester == id {
156-
myTokens = append(myTokens, token.Token)
156+
myTokens.Add(token.Token)
157157
}
158158
}
159159
return myTokens, takenTokens

pkg/ring/tokens.go

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
package ring
2+
3+
import (
4+
"encoding/binary"
5+
"errors"
6+
)
7+
8+
// Tokens is the interface to store Token.
9+
type Tokens interface {
10+
// Encoding return the type of the token implementation.
11+
Encoding() TokenType
12+
// Add adds a new token.
13+
Add(...uint32)
14+
// Tokens returns a slice of uint32 tokens.
15+
Tokens() []uint32
16+
// Len returns the number of tokens.
17+
Len() int
18+
// Marshal marshalls all the tokens into byte stream.
19+
Marshal([]byte) []byte
20+
// Unmarshal reads the tokens from the byte stream.
21+
Unmarshal([]byte) error
22+
}
23+
24+
// TokenType is the type identifier for the token.
25+
type TokenType byte
26+
27+
const (
28+
// SimpleList is the type for Type1Tokens.
29+
SimpleList TokenType = 1
30+
)
31+
32+
// SimpleListTokens is a simple list of tokens.
33+
type SimpleListTokens []uint32
34+
35+
// Encoding implements the Tokens interface.
36+
func (slt SimpleListTokens) Encoding() TokenType {
37+
return SimpleList
38+
}
39+
40+
// Tokens implements the Tokens interface.
41+
func (slt SimpleListTokens) Tokens() []uint32 {
42+
return slt
43+
}
44+
45+
// Len implements the Tokens interface.
46+
func (slt SimpleListTokens) Len() int {
47+
return len(slt)
48+
}
49+
50+
// Add implements the Tokens interface.
51+
func (slt *SimpleListTokens) Add(tokens ...uint32) {
52+
*slt = append(*slt, tokens...)
53+
}
54+
55+
// Marshal implements the Tokens interface.
56+
func (slt SimpleListTokens) Marshal(b []byte) []byte {
57+
if cap(b) < 1+(4*len(slt)) {
58+
b = make([]byte, 1+(4*len(slt)))
59+
}
60+
b = b[:1+(4*len(slt))]
61+
b[0] = byte(SimpleList)
62+
for idx, token := range slt {
63+
binary.BigEndian.PutUint32(b[1+(idx*4):], uint32(token))
64+
}
65+
return b
66+
}
67+
68+
// Unmarshal implements the Tokens interface.
69+
func (slt *SimpleListTokens) Unmarshal(b []byte) error {
70+
*slt = (*slt)[:0]
71+
if len(b) == 0 {
72+
return nil
73+
} else if len(b)%4 != 0 {
74+
return errors.New("token data is not 4 byte aligned")
75+
}
76+
77+
numTokens := len(b) >> 2
78+
for i := 0; i < numTokens; i++ {
79+
*slt = append(*slt, binary.BigEndian.Uint32(b[i<<2:]))
80+
}
81+
82+
return nil
83+
}
84+
85+
// UnmarshalTokens converts the byte stream into Tokens.
86+
// The first byte should be the token type follwed by the
87+
// actual token data.
88+
func UnmarshalTokens(b []byte) (Tokens, error) {
89+
if len(b) == 0 {
90+
return nil, errors.New("empty bytes")
91+
}
92+
switch TokenType(b[0]) {
93+
case SimpleList:
94+
tokens := &SimpleListTokens{}
95+
err := tokens.Unmarshal(b[1:])
96+
return tokens, err
97+
default:
98+
return nil, errors.New("invalid token type")
99+
}
100+
}

0 commit comments

Comments
 (0)