@@ -2,13 +2,9 @@ package ring
2
2
3
3
import (
4
4
"context"
5
- "encoding/json"
6
- "errors"
7
5
"flag"
8
6
"fmt"
9
- "io/ioutil"
10
7
"os"
11
- "path"
12
8
"sort"
13
9
"sync"
14
10
"time"
42
38
}, []string {"op" , "status" , "name" })
43
39
)
44
40
45
- const (
46
- tokensFileName = "tokens"
47
- )
48
-
49
41
// LifecyclerConfig is the config to build a Lifecycler.
50
42
type LifecyclerConfig struct {
51
43
RingConfig Config `yaml:"ring,omitempty"`
@@ -61,7 +53,7 @@ type LifecyclerConfig struct {
61
53
NormaliseTokens bool `yaml:"normalise_tokens,omitempty"`
62
54
InfNames []string `yaml:"interface_names"`
63
55
FinalSleep time.Duration `yaml:"final_sleep"`
64
- TokensFileDir string `yaml:"token_file_dir ,omitempty"`
56
+ TokensFileDir string `yaml:"tokens_file_dir ,omitempty"`
65
57
66
58
// For testing, you can override the address and ID of this ingester
67
59
Addr string `yaml:"address"`
@@ -93,7 +85,7 @@ func (cfg *LifecyclerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.Flag
93
85
flagext .DeprecatedFlag (f , prefix + "claim-on-rollout" , "DEPRECATED. This feature is no longer optional." )
94
86
f .BoolVar (& cfg .NormaliseTokens , prefix + "normalise-tokens" , false , "Store tokens in a normalised fashion to reduce allocations." )
95
87
f .DurationVar (& cfg .FinalSleep , prefix + "final-sleep" , 30 * time .Second , "Duration to sleep for before exiting, to ensure metrics are scraped." )
96
- f .StringVar (& cfg .TokensFileDir , prefix + "token -file-dir" , "" , "Directory in which the token file is to be stored." )
88
+ f .StringVar (& cfg .TokensFileDir , prefix + "tokens -file-dir" , "" , "Directory in which the tokens file is stored. If empty, tokens are not stored at shutdown and restored at startup ." )
97
89
98
90
hostname , err := os .Hostname ()
99
91
if err != nil {
@@ -265,49 +257,16 @@ func (i *Lifecycler) getTokens() Tokens {
265
257
return i .tokens
266
258
}
267
259
268
- func (i * Lifecycler ) flushTokensToFile () {
269
- if i .cfg .TokensFileDir == "" || i .tokens == nil || len (i .tokens ) == 0 {
270
- return
271
- }
272
- tokenFilePath := path .Join (i .cfg .TokensFileDir , tokensFileName )
273
- f , err := os .OpenFile (tokenFilePath , os .O_WRONLY | os .O_CREATE , 0666 )
274
- if err != nil {
275
- level .Error (util .Logger ).Log ("msg" , "error in creating token file" , "err" , err )
276
- return
277
- }
278
-
279
- b , err := MarshalTokens (i .tokens )
280
- if err != nil {
281
- level .Error (util .Logger ).Log ("msg" , "error in marshalling tokens" , "err" , err )
282
- return
283
- }
284
- if _ , err = f .Write (b ); err != nil {
285
- level .Error (util .Logger ).Log ("msg" , "error in writing token file" , "err" , err )
286
- return
287
- }
288
- }
289
-
290
- func (i * Lifecycler ) getTokensFromFile () (Tokens , error ) {
291
- tokenFilePath := path .Join (i .cfg .TokensFileDir , tokensFileName )
292
- b , err := ioutil .ReadFile (tokenFilePath )
293
- if err != nil {
294
- if os .IsNotExist (err ) {
295
- return nil , nil
296
- }
297
- return nil , err
298
- }
299
-
300
- return UnmarshalTokens (b )
301
- }
302
-
303
260
func (i * Lifecycler ) setTokens (tokens Tokens ) {
304
261
tokensOwned .WithLabelValues (i .RingName ).Set (float64 (len (tokens )))
305
262
306
263
i .stateMtx .Lock ()
307
264
i .tokens = tokens
308
265
i .stateMtx .Unlock ()
309
266
310
- i .flushTokensToFile ()
267
+ if err := i .tokens .StoreToFile (i .cfg .TokensFileDir ); err != nil {
268
+ level .Error (util .Logger ).Log ("msg" , "error storing tokens to disk" , "err" , err )
269
+ }
311
270
}
312
271
313
272
// ClaimTokensFor takes all the tokens for the supplied ingester and assigns them to this ingester.
@@ -506,8 +465,9 @@ func (i *Lifecycler) initRing(ctx context.Context) error {
506
465
507
466
ingesterDesc , ok := ringDesc .Ingesters [i .ID ]
508
467
if ! ok {
468
+ var tokens Tokens
509
469
// We load the tokens from the file only if it does not exist in the ring yet.
510
- tokens , err := i . getTokensFromFile ( )
470
+ err := tokens . LoadFromFile ( i . cfg . TokensFileDir )
511
471
if err != nil {
512
472
level .Error (util .Logger ).Log ("msg" , "error in getting tokens from file" , "err" , err )
513
473
} else if tokens != nil && len (tokens ) > 0 {
@@ -752,38 +712,3 @@ func (i *Lifecycler) unregister(ctx context.Context) error {
752
712
return ringDesc , true , nil
753
713
})
754
714
}
755
-
756
- // TokenVersion1 is the version is a simple list of tokens.
757
- const TokenVersion1 = 1
758
-
759
- // Tokens is a simple list of tokens.
760
- type Tokens []uint32
761
-
762
- // MarshalTokens encodes given tokens into JSON.
763
- func MarshalTokens (tokens Tokens ) ([]byte , error ) {
764
- data := tokensJSON {
765
- Version : TokenVersion1 ,
766
- Tokens : tokens ,
767
- }
768
- return json .Marshal (data )
769
- }
770
-
771
- // UnmarshalTokens converts the JSON byte stream into Tokens.
772
- func UnmarshalTokens (b []byte ) (Tokens , error ) {
773
- tj := tokensJSON {}
774
- if err := json .Unmarshal (b , & tj ); err != nil {
775
- return nil , err
776
- }
777
- switch tj .Version {
778
- case TokenVersion1 :
779
- tokens := Tokens (tj .Tokens )
780
- return tokens , nil
781
- default :
782
- return nil , errors .New ("invalid token type" )
783
- }
784
- }
785
-
786
- type tokensJSON struct {
787
- Version int `json:"version"`
788
- Tokens []uint32 `json:"tokens"`
789
- }
0 commit comments