@@ -18,23 +18,23 @@ import (
18
18
)
19
19
20
20
var (
21
- consulHeartbeats = promauto .NewCounter (prometheus.CounterOpts {
22
- Name : "cortex_ingester_consul_heartbeats_total " ,
21
+ consulHeartbeats = promauto .NewCounterVec (prometheus.CounterOpts {
22
+ Name : "cortex_member_consul_heartbeats_total " ,
23
23
Help : "The total number of heartbeats sent to consul." ,
24
- })
25
- tokensOwned = promauto .NewGauge (prometheus.GaugeOpts {
26
- Name : "cortex_ingester_ring_tokens_owned " ,
24
+ }, [] string { "name" } )
25
+ tokensOwned = promauto .NewGaugeVec (prometheus.GaugeOpts {
26
+ Name : "cortex_member_ring_tokens_owned " ,
27
27
Help : "The number of tokens owned in the ring." ,
28
- })
29
- tokensToOwn = promauto .NewGauge (prometheus.GaugeOpts {
30
- Name : "cortex_ingester_ring_tokens_to_own " ,
28
+ }, [] string { "name" } )
29
+ tokensToOwn = promauto .NewGaugeVec (prometheus.GaugeOpts {
30
+ Name : "cortex_member_ring_tokens_to_own " ,
31
31
Help : "The number of tokens to own in the ring." ,
32
- })
32
+ }, [] string { "name" } )
33
33
shutdownDuration = promauto .NewHistogramVec (prometheus.HistogramOpts {
34
34
Name : "cortex_shutdown_duration_seconds" ,
35
35
Help : "Duration (in seconds) of cortex shutdown procedure (ie transfer or flush)." ,
36
36
Buckets : prometheus .ExponentialBuckets (10 , 2 , 8 ), // Biggest bucket is 10*2^(8-1) = 1280, or ~21 mins.
37
- }, []string {"op" , "status" })
37
+ }, []string {"op" , "status" , "name" })
38
38
)
39
39
40
40
// LifecyclerConfig is the config to build a Lifecycler.
@@ -61,15 +61,20 @@ type LifecyclerConfig struct {
61
61
62
62
// RegisterFlags adds the flags required to config this to the given FlagSet
63
63
func (cfg * LifecyclerConfig ) RegisterFlags (f * flag.FlagSet ) {
64
- cfg .RingConfig .RegisterFlags (f )
64
+ cfg .RegisterFlagsWithPrefix ("" , f )
65
+ }
66
+
67
+ // RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet, prepending prefix to each flag name.
68
+ func (cfg * LifecyclerConfig ) RegisterFlagsWithPrefix (prefix string , f * flag.FlagSet ) {
69
+ cfg .RingConfig .RegisterFlagsWithPrefix (prefix , f )
65
70
66
- f .IntVar (& cfg .NumTokens , "ingester. num-tokens" , 128 , "Number of tokens for each ingester." )
67
- f .DurationVar (& cfg .HeartbeatPeriod , "ingester. heartbeat-period" , 5 * time .Second , "Period at which to heartbeat to consul." )
68
- f .DurationVar (& cfg .JoinAfter , "ingester. join-after" , 0 * time .Second , "Period to wait for a claim from another ingester ; will join automatically after this." )
69
- f .DurationVar (& cfg .MinReadyDuration , "ingester. min-ready-duration" , 1 * time .Minute , "Minimum duration to wait before becoming ready. This is to work around race conditions with ingesters exiting and updating the ring." )
70
- f .BoolVar (& cfg .ClaimOnRollout , "ingester. claim-on-rollout" , false , "Send chunks to PENDING ingesters on exit." )
71
- f .BoolVar (& cfg .NormaliseTokens , "ingester. normalise-tokens" , false , "Store tokens in a normalised fashion to reduce allocations." )
72
- f .DurationVar (& cfg .FinalSleep , "ingester. final-sleep" , 30 * time .Second , "Duration to sleep for before exiting, to ensure metrics are scraped." )
71
+ f .IntVar (& cfg .NumTokens , prefix + " num-tokens" , 128 , "Number of tokens for each ingester." )
72
+ f .DurationVar (& cfg .HeartbeatPeriod , prefix + " heartbeat-period" , 5 * time .Second , "Period at which to heartbeat to consul." )
73
+ f .DurationVar (& cfg .JoinAfter , prefix + " join-after" , 0 * time .Second , "Period to wait for a claim from another member ; will join automatically after this." )
74
+ f .DurationVar (& cfg .MinReadyDuration , prefix + " min-ready-duration" , 1 * time .Minute , "Minimum duration to wait before becoming ready. This is to work around race conditions with ingesters exiting and updating the ring." )
75
+ f .BoolVar (& cfg .ClaimOnRollout , prefix + " claim-on-rollout" , false , "Send chunks to PENDING ingesters on exit." )
76
+ f .BoolVar (& cfg .NormaliseTokens , prefix + " normalise-tokens" , false , "Store tokens in a normalised fashion to reduce allocations." )
77
+ f .DurationVar (& cfg .FinalSleep , prefix + " final-sleep" , 30 * time .Second , "Duration to sleep for before exiting, to ensure metrics are scraped." )
73
78
74
79
hostname , err := os .Hostname ()
75
80
if err != nil {
@@ -78,10 +83,10 @@ func (cfg *LifecyclerConfig) RegisterFlags(f *flag.FlagSet) {
78
83
}
79
84
80
85
cfg .InfNames = []string {"eth0" , "en0" }
81
- f .Var ((* flagext .Strings )(& cfg .InfNames ), "ingester .interface" , "Name of network interface to read address from." )
82
- f .StringVar (& cfg .Addr , "ingester .addr" , "" , "IP address to advertise in consul." )
83
- f .IntVar (& cfg .Port , "ingester .port" , 0 , "port to advertise in consul (defaults to server.grpc-listen-port)." )
84
- f .StringVar (& cfg .ID , "ingester .ID" , hostname , "ID to register into consul." )
86
+ f .Var ((* flagext .Strings )(& cfg .InfNames ), prefix + "lifecycler .interface" , "Name of network interface to read address from." )
87
+ f .StringVar (& cfg .Addr , prefix + "lifecycler .addr" , "" , "IP address to advertise in consul." )
88
+ f .IntVar (& cfg .Port , prefix + "lifecycler .port" , 0 , "port to advertise in consul (defaults to server.grpc-listen-port)." )
89
+ f .StringVar (& cfg .ID , prefix + "lifecycler .ID" , hostname , "ID to register into consul." )
85
90
}
86
91
87
92
// FlushTransferer controls the shutdown of an ingester.
@@ -103,8 +108,9 @@ type Lifecycler struct {
103
108
actorChan chan func ()
104
109
105
110
// These values are initialised at startup, and never change
106
- ID string
107
- addr string
111
+ ID string
112
+ Addr string
113
+ RingName string
108
114
109
115
// We need to remember the ingester state just in case consul goes away and comes
110
116
// back empty. And it changes during lifecycle of ingester.
@@ -119,7 +125,7 @@ type Lifecycler struct {
119
125
}
120
126
121
127
// NewLifecycler makes and starts a new Lifecycler.
122
- func NewLifecycler (cfg LifecyclerConfig , flushTransferer FlushTransferer ) (* Lifecycler , error ) {
128
+ func NewLifecycler (cfg LifecyclerConfig , flushTransferer FlushTransferer , name string ) (* Lifecycler , error ) {
123
129
addr := cfg .Addr
124
130
if addr == "" {
125
131
var err error
@@ -143,7 +149,7 @@ func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer) (*Life
143
149
flushTransferer : flushTransferer ,
144
150
KVStore : store ,
145
151
146
- addr : fmt .Sprintf ("%s:%d" , addr , port ),
152
+ Addr : fmt .Sprintf ("%s:%d" , addr , port ),
147
153
ID : cfg .ID ,
148
154
149
155
quit : make (chan struct {}),
@@ -153,7 +159,7 @@ func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer) (*Life
153
159
startTime : time .Now (),
154
160
}
155
161
156
- tokensToOwn .Set (float64 (cfg .NumTokens ))
162
+ tokensToOwn .WithLabelValues ( l . RingName ). Set (float64 (cfg .NumTokens ))
157
163
158
164
l .done .Add (1 )
159
165
go l .loop ()
@@ -223,7 +229,7 @@ func (i *Lifecycler) getTokens() []uint32 {
223
229
}
224
230
225
231
func (i * Lifecycler ) setTokens (tokens []uint32 ) {
226
- tokensOwned .Set (float64 (len (tokens )))
232
+ tokensOwned .WithLabelValues ( i . RingName ). Set (float64 (len (tokens )))
227
233
228
234
i .stateMtx .Lock ()
229
235
defer i .stateMtx .Unlock ()
@@ -275,7 +281,7 @@ func (i *Lifecycler) Shutdown() {
275
281
276
282
func (i * Lifecycler ) loop () {
277
283
defer func () {
278
- level .Info (util .Logger ).Log ("msg" , "Ingester .loop() exited gracefully" )
284
+ level .Info (util .Logger ).Log ("msg" , "member .loop() exited gracefully" )
279
285
i .done .Done ()
280
286
}()
281
287
@@ -308,7 +314,7 @@ loop:
308
314
}
309
315
310
316
case <- heartbeatTicker .C :
311
- consulHeartbeats .Inc ()
317
+ consulHeartbeats .WithLabelValues ( i . RingName ). Inc ()
312
318
if err := i .updateConsul (context .Background ()); err != nil {
313
319
level .Error (util .Logger ).Log ("msg" , "failed to write to consul, sleeping" , "err" , err )
314
320
}
@@ -336,7 +342,7 @@ heartbeatLoop:
336
342
for {
337
343
select {
338
344
case <- heartbeatTicker .C :
339
- consulHeartbeats .Inc ()
345
+ consulHeartbeats .WithLabelValues ( i . RingName ). Inc ()
340
346
if err := i .updateConsul (context .Background ()); err != nil {
341
347
level .Error (util .Logger ).Log ("msg" , "failed to write to consul, sleeping" , "err" , err )
342
348
}
@@ -371,7 +377,7 @@ func (i *Lifecycler) initRing(ctx context.Context) error {
371
377
if ! ok {
372
378
// Either we are a new ingester, or consul must have restarted
373
379
level .Info (util .Logger ).Log ("msg" , "entry not found in ring, adding with no tokens" )
374
- ringDesc .AddIngester (i .ID , i .addr , []uint32 {}, i .GetState (), i .cfg .NormaliseTokens )
380
+ ringDesc .AddIngester (i .ID , i .Addr , []uint32 {}, i .GetState (), i .cfg .NormaliseTokens )
375
381
return ringDesc , true , nil
376
382
}
377
383
@@ -403,7 +409,7 @@ func (i *Lifecycler) autoJoin(ctx context.Context) error {
403
409
404
410
newTokens := GenerateTokens (i .cfg .NumTokens - len (myTokens ), takenTokens )
405
411
i .setState (ACTIVE )
406
- ringDesc .AddIngester (i .ID , i .addr , newTokens , i .GetState (), i .cfg .NormaliseTokens )
412
+ ringDesc .AddIngester (i .ID , i .Addr , newTokens , i .GetState (), i .cfg .NormaliseTokens )
407
413
408
414
tokens := append (myTokens , newTokens ... )
409
415
sort .Sort (sortableUint32 (tokens ))
@@ -428,11 +434,11 @@ func (i *Lifecycler) updateConsul(ctx context.Context) error {
428
434
if ! ok {
429
435
// consul must have restarted
430
436
level .Info (util .Logger ).Log ("msg" , "found empty ring, inserting tokens" )
431
- ringDesc .AddIngester (i .ID , i .addr , i .getTokens (), i .GetState (), i .cfg .NormaliseTokens )
437
+ ringDesc .AddIngester (i .ID , i .Addr , i .getTokens (), i .GetState (), i .cfg .NormaliseTokens )
432
438
} else {
433
439
ingesterDesc .Timestamp = time .Now ().Unix ()
434
440
ingesterDesc .State = i .GetState ()
435
- ingesterDesc .Addr = i .addr
441
+ ingesterDesc .Addr = i .Addr
436
442
ringDesc .Ingesters [i .ID ] = ingesterDesc
437
443
}
438
444
@@ -464,17 +470,17 @@ func (i *Lifecycler) processShutdown(ctx context.Context) {
464
470
transferStart := time .Now ()
465
471
if err := i .flushTransferer .TransferOut (ctx ); err != nil {
466
472
level .Error (util .Logger ).Log ("msg" , "Failed to transfer chunks to another ingester" , "err" , err )
467
- shutdownDuration .WithLabelValues ("transfer" , "fail" ).Observe (time .Since (transferStart ).Seconds ())
473
+ shutdownDuration .WithLabelValues ("transfer" , "fail" , i . RingName ).Observe (time .Since (transferStart ).Seconds ())
468
474
} else {
469
475
flushRequired = false
470
- shutdownDuration .WithLabelValues ("transfer" , "success" ).Observe (time .Since (transferStart ).Seconds ())
476
+ shutdownDuration .WithLabelValues ("transfer" , "success" , i . RingName ).Observe (time .Since (transferStart ).Seconds ())
471
477
}
472
478
}
473
479
474
480
if flushRequired {
475
481
flushStart := time .Now ()
476
482
i .flushTransferer .Flush ()
477
- shutdownDuration .WithLabelValues ("flush" , "success" ).Observe (time .Since (flushStart ).Seconds ())
483
+ shutdownDuration .WithLabelValues ("flush" , "success" , i . RingName ).Observe (time .Since (flushStart ).Seconds ())
478
484
}
479
485
480
486
// Sleep so the shutdownDuration metric can be collected.
@@ -483,6 +489,8 @@ func (i *Lifecycler) processShutdown(ctx context.Context) {
483
489
484
490
// unregister removes our entry from consul.
485
491
func (i * Lifecycler ) unregister (ctx context.Context ) error {
492
+ level .Debug (util .Logger ).Log ("msg" , "unregistering member from ring" )
493
+
486
494
return i .KVStore .CAS (ctx , ConsulKey , func (in interface {}) (out interface {}, retry bool , err error ) {
487
495
if in == nil {
488
496
return nil , false , fmt .Errorf ("found empty ring when trying to unregister" )
0 commit comments