@@ -2,6 +2,8 @@ package ring
2
2
3
3
import (
4
4
"context"
5
+ "encoding/json"
6
+ "errors"
5
7
"flag"
6
8
"fmt"
7
9
"io/ioutil"
@@ -254,17 +256,17 @@ func (i *Lifecycler) ChangeState(ctx context.Context, state IngesterState) error
254
256
return <- err
255
257
}
256
258
257
- func (i * Lifecycler ) getTokens () [] uint32 {
259
+ func (i * Lifecycler ) getTokens () Tokens {
258
260
i .stateMtx .Lock ()
259
261
defer i .stateMtx .Unlock ()
260
262
if i .tokens == nil {
261
263
return nil
262
264
}
263
- return i .tokens . Tokens ()
265
+ return i .tokens
264
266
}
265
267
266
268
func (i * Lifecycler ) flushTokensToFile () {
267
- if i .cfg .TokensFileDir == "" || i .tokens == nil || i .tokens . Len ( ) == 0 {
269
+ if i .cfg .TokensFileDir == "" || i .tokens == nil || len ( i .tokens ) == 0 {
268
270
return
269
271
}
270
272
tokenFilePath := path .Join (i .cfg .TokensFileDir , tokensFileName )
@@ -274,7 +276,11 @@ func (i *Lifecycler) flushTokensToFile() {
274
276
return
275
277
}
276
278
277
- b := i .tokens .Marshal (nil )
279
+ b , err := MarshalTokens (i .tokens )
280
+ if err != nil {
281
+ level .Error (util .Logger ).Log ("msg" , "error in marshalling tokens" , "err" , err )
282
+ return
283
+ }
278
284
if _ , err = f .Write (b ); err != nil {
279
285
level .Error (util .Logger ).Log ("msg" , "error in writing token file" , "err" , err )
280
286
return
@@ -295,7 +301,7 @@ func (i *Lifecycler) getTokensFromFile() (Tokens, error) {
295
301
}
296
302
297
303
func (i * Lifecycler ) setTokens (tokens Tokens ) {
298
- tokensOwned .WithLabelValues (i .RingName ).Set (float64 (tokens . Len ( )))
304
+ tokensOwned .WithLabelValues (i .RingName ).Set (float64 (len ( tokens )))
299
305
300
306
i .stateMtx .Lock ()
301
307
i .tokens = tokens
@@ -504,12 +510,12 @@ func (i *Lifecycler) initRing(ctx context.Context) error {
504
510
tokens , err := i .getTokensFromFile ()
505
511
if err != nil {
506
512
level .Error (util .Logger ).Log ("msg" , "error in getting tokens from file" , "err" , err )
507
- } else if tokens != nil && tokens . Len ( ) > 0 {
508
- level .Info (util .Logger ).Log ("msg" , "adding tokens from file" , "num_tokens" , tokens . Len ( ))
509
- if tokens . Len ( ) == i .cfg .NumTokens {
513
+ } else if tokens != nil && len ( tokens ) > 0 {
514
+ level .Info (util .Logger ).Log ("msg" , "adding tokens from file" , "num_tokens" , len ( tokens ))
515
+ if len ( tokens ) == i .cfg .NumTokens {
510
516
i .setState (ACTIVE )
511
517
}
512
- ringDesc .AddIngester (i .ID , i .Addr , tokens . Tokens () , i .GetState (), i .cfg .NormaliseTokens )
518
+ ringDesc .AddIngester (i .ID , i .Addr , tokens , i .GetState (), i .cfg .NormaliseTokens )
513
519
i .setTokens (tokens )
514
520
return ringDesc , true , nil
515
521
}
@@ -525,7 +531,7 @@ func (i *Lifecycler) initRing(ctx context.Context) error {
525
531
tokens , _ := ringDesc .TokensFor (i .ID )
526
532
i .setTokens (tokens )
527
533
528
- level .Info (util .Logger ).Log ("msg" , "existing entry found in ring" , "state" , i .GetState (), "tokens" , tokens . Len ( ))
534
+ level .Info (util .Logger ).Log ("msg" , "existing entry found in ring" , "state" , i .GetState (), "tokens" , len ( tokens ))
529
535
// we haven't modified the ring, don't try to store it.
530
536
return nil , true , nil
531
537
})
@@ -557,15 +563,15 @@ func (i *Lifecycler) verifyTokens(ctx context.Context) bool {
557
563
558
564
if ! i .compareTokens (ringTokens ) {
559
565
// uh, oh... our tokens are not our anymore. Let's try new ones.
560
- needTokens := i .cfg .NumTokens - ringTokens . Len ( )
566
+ needTokens := i .cfg .NumTokens - len ( ringTokens )
561
567
562
568
level .Info (util .Logger ).Log ("msg" , "generating new tokens" , "count" , needTokens )
563
- newTokens := GenerateTokens (needTokens , takenTokens . Tokens () )
569
+ newTokens := GenerateTokens (needTokens , takenTokens )
564
570
565
- ringTokens . Add ( newTokens ... )
566
- sort .Sort (sortableUint32 (ringTokens . Tokens () ))
571
+ ringTokens = append ( ringTokens , newTokens ... )
572
+ sort .Sort (sortableUint32 (ringTokens ))
567
573
568
- ringDesc .AddIngester (i .ID , i .Addr , ringTokens . Tokens () , i .GetState (), i .cfg .NormaliseTokens )
574
+ ringDesc .AddIngester (i .ID , i .Addr , ringTokens , i .GetState (), i .cfg .NormaliseTokens )
569
575
570
576
i .setTokens (ringTokens )
571
577
@@ -586,18 +592,17 @@ func (i *Lifecycler) verifyTokens(ctx context.Context) bool {
586
592
}
587
593
588
594
func (i * Lifecycler ) compareTokens (fromRing Tokens ) bool {
589
- sort .Sort (sortableUint32 (fromRing . Tokens () ))
595
+ sort .Sort (sortableUint32 (fromRing ))
590
596
591
597
tokens := i .getTokens ()
592
598
sort .Sort (sortableUint32 (tokens ))
593
599
594
- if len (tokens ) != fromRing . Len ( ) {
600
+ if len (tokens ) != len ( fromRing ) {
595
601
return false
596
602
}
597
603
598
- tokensFromRing := fromRing .Tokens ()
599
604
for i := 0 ; i < len (tokens ); i ++ {
600
- if tokens [i ] != tokensFromRing [i ] {
605
+ if tokens [i ] != fromRing [i ] {
601
606
return false
602
607
}
603
608
}
@@ -617,17 +622,17 @@ func (i *Lifecycler) autoJoin(ctx context.Context, targetState IngesterState) er
617
622
618
623
// At this point, we should not have any tokens, and we should be in PENDING state.
619
624
myTokens , takenTokens := ringDesc .TokensFor (i .ID )
620
- if myTokens . Len ( ) > 0 {
621
- level .Error (util .Logger ).Log ("msg" , "tokens already exist for this ingester - wasn't expecting any!" , "num_tokens" , myTokens . Len ( ))
625
+ if len ( myTokens ) > 0 {
626
+ level .Error (util .Logger ).Log ("msg" , "tokens already exist for this ingester - wasn't expecting any!" , "num_tokens" , len ( myTokens ))
622
627
}
623
628
624
- newTokens := GenerateTokens (i .cfg .NumTokens - myTokens . Len ( ), takenTokens . Tokens () )
629
+ newTokens := GenerateTokens (i .cfg .NumTokens - len ( myTokens ), takenTokens )
625
630
i .setState (targetState )
626
631
627
632
ringDesc .AddIngester (i .ID , i .Addr , newTokens , i .GetState (), i .cfg .NormaliseTokens )
628
633
629
- myTokens . Add ( newTokens ... )
630
- sort .Sort (sortableUint32 (myTokens . Tokens () ))
634
+ myTokens = append ( myTokens , newTokens ... )
635
+ sort .Sort (sortableUint32 (myTokens ))
631
636
i .setTokens (myTokens )
632
637
633
638
return ringDesc , true , nil
@@ -747,3 +752,38 @@ func (i *Lifecycler) unregister(ctx context.Context) error {
747
752
return ringDesc , true , nil
748
753
})
749
754
}
755
+
756
+ // TokenVersion1 is the version is a simple list of tokens.
757
+ const TokenVersion1 = 1
758
+
759
+ // Tokens is a simple list of tokens.
760
+ type Tokens []uint32
761
+
762
+ // MarshalTokens encodes given tokens into JSON.
763
+ func MarshalTokens (tokens Tokens ) ([]byte , error ) {
764
+ data := tokensJSON {
765
+ Version : TokenVersion1 ,
766
+ Tokens : tokens ,
767
+ }
768
+ return json .Marshal (data )
769
+ }
770
+
771
+ // UnmarshalTokens converts the JSON byte stream into Tokens.
772
+ func UnmarshalTokens (b []byte ) (Tokens , error ) {
773
+ tj := tokensJSON {}
774
+ if err := json .Unmarshal (b , & tj ); err != nil {
775
+ return nil , err
776
+ }
777
+ switch tj .Version {
778
+ case TokenVersion1 :
779
+ tokens := Tokens (tj .Tokens )
780
+ return tokens , nil
781
+ default :
782
+ return nil , errors .New ("invalid token type" )
783
+ }
784
+ }
785
+
786
+ type tokensJSON struct {
787
+ Version int `json:"version"`
788
+ Tokens []uint32 `json:"tokens"`
789
+ }
0 commit comments