Skip to content

Commit a50412e

Browse files
authored
Merge pull request #1258 from grafana/20190219_distributed_ruler
Horizontally Scalable Ruler
2 parents 11f7aea + 12aec3d commit a50412e

19 files changed

+665
-412
lines changed

pkg/cortex/cortex.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,6 @@ type Cortex struct {
125125
tableManager *chunk.TableManager
126126

127127
ruler *ruler.Ruler
128-
rulerServer *ruler.Server
129128
configAPI *api.API
130129
configDB db.DB
131130
alertmanager *alertmanager.MultitenantAlertmanager

pkg/cortex/modules.go

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ func (t *Cortex) stopServer() (err error) {
143143
}
144144

145145
func (t *Cortex) initRing(cfg *Config) (err error) {
146-
t.ring, err = ring.New(cfg.Ingester.LifecyclerConfig.RingConfig)
146+
t.ring, err = ring.New(cfg.Ingester.LifecyclerConfig.RingConfig, "ingester")
147147
if err != nil {
148148
return
149149
}
@@ -314,20 +314,17 @@ func (t *Cortex) stopTableManager() error {
314314
func (t *Cortex) initRuler(cfg *Config) (err error) {
315315
cfg.Querier.MaxConcurrent = cfg.Ruler.NumWorkers
316316
cfg.Querier.Timeout = cfg.Ruler.GroupTimeout
317+
cfg.Ruler.LifecyclerConfig.ListenPort = &cfg.Server.GRPCListenPort
317318
queryable, engine := querier.New(cfg.Querier, t.distributor, t.store)
318-
t.ruler, err = ruler.NewRuler(cfg.Ruler, engine, queryable, t.distributor)
319-
if err != nil {
320-
return
321-
}
322319

323320
rulesAPI, err := config_client.New(cfg.ConfigStore)
324321
if err != nil {
325322
return err
326323
}
327324

328-
t.rulerServer, err = ruler.NewServer(cfg.Ruler, t.ruler, rulesAPI)
325+
t.ruler, err = ruler.NewRuler(cfg.Ruler, engine, queryable, t.distributor, rulesAPI)
329326
if err != nil {
330-
return err
327+
return
331328
}
332329

333330
// Only serve the API for setting & getting rules configs if we're not
@@ -340,11 +337,12 @@ func (t *Cortex) initRuler(cfg *Config) (err error) {
340337
}
341338
a.RegisterRoutes(t.server.HTTP)
342339
}
340+
341+
t.server.HTTP.Handle("/ruler_ring", t.ruler)
343342
return
344343
}
345344

346345
func (t *Cortex) stopRuler() error {
347-
t.rulerServer.Stop()
348346
t.ruler.Stop()
349347
return nil
350348
}

pkg/ingester/ingester.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c
164164
}
165165

166166
var err error
167-
i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i)
167+
i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester")
168168
if err != nil {
169169
return nil, err
170170
}

pkg/ingester/lifecycle_test.go

Lines changed: 3 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"testing"
77
"time"
88

9-
"github.com/go-kit/kit/log/level"
109
"github.com/stretchr/testify/assert"
1110
"github.com/stretchr/testify/require"
1211
"golang.org/x/net/context"
@@ -19,7 +18,7 @@ import (
1918
"github.com/cortexproject/cortex/pkg/chunk"
2019
"github.com/cortexproject/cortex/pkg/ingester/client"
2120
"github.com/cortexproject/cortex/pkg/ring"
22-
"github.com/cortexproject/cortex/pkg/util"
21+
"github.com/cortexproject/cortex/pkg/ring/testutils"
2322
"github.com/cortexproject/cortex/pkg/util/flagext"
2423
"github.com/cortexproject/cortex/pkg/util/test"
2524
"github.com/cortexproject/cortex/pkg/util/validation"
@@ -70,7 +69,7 @@ func TestIngesterRestart(t *testing.T) {
7069
}
7170

7271
test.Poll(t, 100*time.Millisecond, 1, func() interface{} {
73-
return numTokens(config.LifecyclerConfig.RingConfig.Mock, "localhost")
72+
return testutils.NumTokens(config.LifecyclerConfig.RingConfig.Mock, "localhost")
7473
})
7574

7675
{
@@ -82,7 +81,7 @@ func TestIngesterRestart(t *testing.T) {
8281
time.Sleep(200 * time.Millisecond)
8382

8483
test.Poll(t, 100*time.Millisecond, 1, func() interface{} {
85-
return numTokens(config.LifecyclerConfig.RingConfig.Mock, "localhost")
84+
return testutils.NumTokens(config.LifecyclerConfig.RingConfig.Mock, "localhost")
8685
})
8786
}
8887

@@ -195,21 +194,6 @@ func TestIngesterBadTransfer(t *testing.T) {
195194
require.Equal(t, ring.PENDING, ing.lifecycler.GetState())
196195
}
197196

198-
func numTokens(c ring.KVClient, name string) int {
199-
ringDesc, err := c.Get(context.Background(), ring.ConsulKey)
200-
if err != nil {
201-
level.Error(util.Logger).Log("msg", "error reading consul", "err", err)
202-
return 0
203-
}
204-
count := 0
205-
for _, token := range ringDesc.(*ring.Desc).Tokens {
206-
if token.Ingester == name {
207-
count++
208-
}
209-
}
210-
return count
211-
}
212-
213197
type ingesterTransferChunkStreamMock struct {
214198
ctx context.Context
215199
reqs chan *client.TimeSeriesChunk

pkg/ring/consul_client.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,13 @@ type ConsulConfig struct {
2828
ConsistentReads bool
2929
}
3030

31-
// RegisterFlags adds the flags required to config this to the given FlagSet
32-
func (cfg *ConsulConfig) RegisterFlags(f *flag.FlagSet) {
33-
f.StringVar(&cfg.Host, "consul.hostname", "localhost:8500", "Hostname and port of Consul.")
34-
f.StringVar(&cfg.Prefix, "consul.prefix", "collectors/", "Prefix for keys in Consul.")
35-
f.StringVar(&cfg.ACLToken, "consul.acltoken", "", "ACL Token used to interact with Consul.")
36-
f.DurationVar(&cfg.HTTPClientTimeout, "consul.client-timeout", 2*longPollDuration, "HTTP timeout when talking to consul")
37-
f.BoolVar(&cfg.ConsistentReads, "consul.consistent-reads", true, "Enable consistent reads to consul.")
31+
// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet
32+
func (cfg *ConsulConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
33+
f.StringVar(&cfg.Host, prefix+"consul.hostname", "localhost:8500", "Hostname and port of Consul.")
34+
f.StringVar(&cfg.Prefix, prefix+"consul.prefix", "collectors/", "Prefix for keys in Consul.")
35+
f.StringVar(&cfg.ACLToken, prefix+"consul.acltoken", "", "ACL Token used to interact with Consul.")
36+
f.DurationVar(&cfg.HTTPClientTimeout, prefix+"consul.client-timeout", 2*longPollDuration, "HTTP timeout when talking to consul")
37+
f.BoolVar(&cfg.ConsistentReads, prefix+"consul.consistent-reads", true, "Enable consistent reads to consul.")
3838
}
3939

4040
type kv interface {

pkg/ring/lifecycler.go

Lines changed: 46 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -18,23 +18,23 @@ import (
1818
)
1919

2020
var (
21-
consulHeartbeats = promauto.NewCounter(prometheus.CounterOpts{
22-
Name: "cortex_ingester_consul_heartbeats_total",
21+
consulHeartbeats = promauto.NewCounterVec(prometheus.CounterOpts{
22+
Name: "cortex_member_consul_heartbeats_total",
2323
Help: "The total number of heartbeats sent to consul.",
24-
})
25-
tokensOwned = promauto.NewGauge(prometheus.GaugeOpts{
26-
Name: "cortex_ingester_ring_tokens_owned",
24+
}, []string{"name"})
25+
tokensOwned = promauto.NewGaugeVec(prometheus.GaugeOpts{
26+
Name: "cortex_member_ring_tokens_owned",
2727
Help: "The number of tokens owned in the ring.",
28-
})
29-
tokensToOwn = promauto.NewGauge(prometheus.GaugeOpts{
30-
Name: "cortex_ingester_ring_tokens_to_own",
28+
}, []string{"name"})
29+
tokensToOwn = promauto.NewGaugeVec(prometheus.GaugeOpts{
30+
Name: "cortex_member_ring_tokens_to_own",
3131
Help: "The number of tokens to own in the ring.",
32-
})
32+
}, []string{"name"})
3333
shutdownDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
3434
Name: "cortex_shutdown_duration_seconds",
3535
Help: "Duration (in seconds) of cortex shutdown procedure (ie transfer or flush).",
3636
Buckets: prometheus.ExponentialBuckets(10, 2, 8), // Biggest bucket is 10*2^(9-1) = 2560, or 42 mins.
37-
}, []string{"op", "status"})
37+
}, []string{"op", "status", "name"})
3838
)
3939

4040
// LifecyclerConfig is the config to build a Lifecycler.
@@ -61,15 +61,20 @@ type LifecyclerConfig struct {
6161

6262
// RegisterFlags adds the flags required to config this to the given FlagSet
6363
func (cfg *LifecyclerConfig) RegisterFlags(f *flag.FlagSet) {
64-
cfg.RingConfig.RegisterFlags(f)
64+
cfg.RegisterFlagsWithPrefix("", f)
65+
}
66+
67+
// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet
68+
func (cfg *LifecyclerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
69+
cfg.RingConfig.RegisterFlagsWithPrefix(prefix, f)
6570

66-
f.IntVar(&cfg.NumTokens, "ingester.num-tokens", 128, "Number of tokens for each ingester.")
67-
f.DurationVar(&cfg.HeartbeatPeriod, "ingester.heartbeat-period", 5*time.Second, "Period at which to heartbeat to consul.")
68-
f.DurationVar(&cfg.JoinAfter, "ingester.join-after", 0*time.Second, "Period to wait for a claim from another ingester; will join automatically after this.")
69-
f.DurationVar(&cfg.MinReadyDuration, "ingester.min-ready-duration", 1*time.Minute, "Minimum duration to wait before becoming ready. This is to work around race conditions with ingesters exiting and updating the ring.")
70-
f.BoolVar(&cfg.ClaimOnRollout, "ingester.claim-on-rollout", false, "Send chunks to PENDING ingesters on exit.")
71-
f.BoolVar(&cfg.NormaliseTokens, "ingester.normalise-tokens", false, "Store tokens in a normalised fashion to reduce allocations.")
72-
f.DurationVar(&cfg.FinalSleep, "ingester.final-sleep", 30*time.Second, "Duration to sleep for before exiting, to ensure metrics are scraped.")
71+
f.IntVar(&cfg.NumTokens, prefix+"num-tokens", 128, "Number of tokens for each ingester.")
72+
f.DurationVar(&cfg.HeartbeatPeriod, prefix+"heartbeat-period", 5*time.Second, "Period at which to heartbeat to consul.")
73+
f.DurationVar(&cfg.JoinAfter, prefix+"join-after", 0*time.Second, "Period to wait for a claim from another member; will join automatically after this.")
74+
f.DurationVar(&cfg.MinReadyDuration, prefix+"min-ready-duration", 1*time.Minute, "Minimum duration to wait before becoming ready. This is to work around race conditions with ingesters exiting and updating the ring.")
75+
f.BoolVar(&cfg.ClaimOnRollout, prefix+"claim-on-rollout", false, "Send chunks to PENDING ingesters on exit.")
76+
f.BoolVar(&cfg.NormaliseTokens, prefix+"normalise-tokens", false, "Store tokens in a normalised fashion to reduce allocations.")
77+
f.DurationVar(&cfg.FinalSleep, prefix+"final-sleep", 30*time.Second, "Duration to sleep for before exiting, to ensure metrics are scraped.")
7378

7479
hostname, err := os.Hostname()
7580
if err != nil {
@@ -78,10 +83,10 @@ func (cfg *LifecyclerConfig) RegisterFlags(f *flag.FlagSet) {
7883
}
7984

8085
cfg.InfNames = []string{"eth0", "en0"}
81-
f.Var((*flagext.Strings)(&cfg.InfNames), "ingester.interface", "Name of network interface to read address from.")
82-
f.StringVar(&cfg.Addr, "ingester.addr", "", "IP address to advertise in consul.")
83-
f.IntVar(&cfg.Port, "ingester.port", 0, "port to advertise in consul (defaults to server.grpc-listen-port).")
84-
f.StringVar(&cfg.ID, "ingester.ID", hostname, "ID to register into consul.")
86+
f.Var((*flagext.Strings)(&cfg.InfNames), prefix+"lifecycler.interface", "Name of network interface to read address from.")
87+
f.StringVar(&cfg.Addr, prefix+"lifecycler.addr", "", "IP address to advertise in consul.")
88+
f.IntVar(&cfg.Port, prefix+"lifecycler.port", 0, "port to advertise in consul (defaults to server.grpc-listen-port).")
89+
f.StringVar(&cfg.ID, prefix+"lifecycler.ID", hostname, "ID to register into consul.")
8590
}
8691

8792
// FlushTransferer controls the shutdown of an ingester.
@@ -103,8 +108,9 @@ type Lifecycler struct {
103108
actorChan chan func()
104109

105110
// These values are initialised at startup, and never change
106-
ID string
107-
addr string
111+
ID string
112+
Addr string
113+
RingName string
108114

109115
// We need to remember the ingester state just in case consul goes away and comes
110116
// back empty. And it changes during lifecycle of ingester.
@@ -119,7 +125,7 @@ type Lifecycler struct {
119125
}
120126

121127
// NewLifecycler makes and starts a new Lifecycler.
122-
func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer) (*Lifecycler, error) {
128+
func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer, name string) (*Lifecycler, error) {
123129
addr := cfg.Addr
124130
if addr == "" {
125131
var err error
@@ -143,7 +149,7 @@ func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer) (*Life
143149
flushTransferer: flushTransferer,
144150
KVStore: store,
145151

146-
addr: fmt.Sprintf("%s:%d", addr, port),
152+
Addr: fmt.Sprintf("%s:%d", addr, port),
147153
ID: cfg.ID,
148154

149155
quit: make(chan struct{}),
@@ -153,7 +159,7 @@ func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer) (*Life
153159
startTime: time.Now(),
154160
}
155161

156-
tokensToOwn.Set(float64(cfg.NumTokens))
162+
tokensToOwn.WithLabelValues(l.RingName).Set(float64(cfg.NumTokens))
157163

158164
l.done.Add(1)
159165
go l.loop()
@@ -223,7 +229,7 @@ func (i *Lifecycler) getTokens() []uint32 {
223229
}
224230

225231
func (i *Lifecycler) setTokens(tokens []uint32) {
226-
tokensOwned.Set(float64(len(tokens)))
232+
tokensOwned.WithLabelValues(i.RingName).Set(float64(len(tokens)))
227233

228234
i.stateMtx.Lock()
229235
defer i.stateMtx.Unlock()
@@ -275,7 +281,7 @@ func (i *Lifecycler) Shutdown() {
275281

276282
func (i *Lifecycler) loop() {
277283
defer func() {
278-
level.Info(util.Logger).Log("msg", "Ingester.loop() exited gracefully")
284+
level.Info(util.Logger).Log("msg", "member.loop() exited gracefully")
279285
i.done.Done()
280286
}()
281287

@@ -308,7 +314,7 @@ loop:
308314
}
309315

310316
case <-heartbeatTicker.C:
311-
consulHeartbeats.Inc()
317+
consulHeartbeats.WithLabelValues(i.RingName).Inc()
312318
if err := i.updateConsul(context.Background()); err != nil {
313319
level.Error(util.Logger).Log("msg", "failed to write to consul, sleeping", "err", err)
314320
}
@@ -336,7 +342,7 @@ heartbeatLoop:
336342
for {
337343
select {
338344
case <-heartbeatTicker.C:
339-
consulHeartbeats.Inc()
345+
consulHeartbeats.WithLabelValues(i.RingName).Inc()
340346
if err := i.updateConsul(context.Background()); err != nil {
341347
level.Error(util.Logger).Log("msg", "failed to write to consul, sleeping", "err", err)
342348
}
@@ -371,7 +377,7 @@ func (i *Lifecycler) initRing(ctx context.Context) error {
371377
if !ok {
372378
// Either we are a new ingester, or consul must have restarted
373379
level.Info(util.Logger).Log("msg", "entry not found in ring, adding with no tokens")
374-
ringDesc.AddIngester(i.ID, i.addr, []uint32{}, i.GetState(), i.cfg.NormaliseTokens)
380+
ringDesc.AddIngester(i.ID, i.Addr, []uint32{}, i.GetState(), i.cfg.NormaliseTokens)
375381
return ringDesc, true, nil
376382
}
377383

@@ -403,7 +409,7 @@ func (i *Lifecycler) autoJoin(ctx context.Context) error {
403409

404410
newTokens := GenerateTokens(i.cfg.NumTokens-len(myTokens), takenTokens)
405411
i.setState(ACTIVE)
406-
ringDesc.AddIngester(i.ID, i.addr, newTokens, i.GetState(), i.cfg.NormaliseTokens)
412+
ringDesc.AddIngester(i.ID, i.Addr, newTokens, i.GetState(), i.cfg.NormaliseTokens)
407413

408414
tokens := append(myTokens, newTokens...)
409415
sort.Sort(sortableUint32(tokens))
@@ -428,11 +434,11 @@ func (i *Lifecycler) updateConsul(ctx context.Context) error {
428434
if !ok {
429435
// consul must have restarted
430436
level.Info(util.Logger).Log("msg", "found empty ring, inserting tokens")
431-
ringDesc.AddIngester(i.ID, i.addr, i.getTokens(), i.GetState(), i.cfg.NormaliseTokens)
437+
ringDesc.AddIngester(i.ID, i.Addr, i.getTokens(), i.GetState(), i.cfg.NormaliseTokens)
432438
} else {
433439
ingesterDesc.Timestamp = time.Now().Unix()
434440
ingesterDesc.State = i.GetState()
435-
ingesterDesc.Addr = i.addr
441+
ingesterDesc.Addr = i.Addr
436442
ringDesc.Ingesters[i.ID] = ingesterDesc
437443
}
438444

@@ -464,17 +470,17 @@ func (i *Lifecycler) processShutdown(ctx context.Context) {
464470
transferStart := time.Now()
465471
if err := i.flushTransferer.TransferOut(ctx); err != nil {
466472
level.Error(util.Logger).Log("msg", "Failed to transfer chunks to another ingester", "err", err)
467-
shutdownDuration.WithLabelValues("transfer", "fail").Observe(time.Since(transferStart).Seconds())
473+
shutdownDuration.WithLabelValues("transfer", "fail", i.RingName).Observe(time.Since(transferStart).Seconds())
468474
} else {
469475
flushRequired = false
470-
shutdownDuration.WithLabelValues("transfer", "success").Observe(time.Since(transferStart).Seconds())
476+
shutdownDuration.WithLabelValues("transfer", "success", i.RingName).Observe(time.Since(transferStart).Seconds())
471477
}
472478
}
473479

474480
if flushRequired {
475481
flushStart := time.Now()
476482
i.flushTransferer.Flush()
477-
shutdownDuration.WithLabelValues("flush", "success").Observe(time.Since(flushStart).Seconds())
483+
shutdownDuration.WithLabelValues("flush", "success", i.RingName).Observe(time.Since(flushStart).Seconds())
478484
}
479485

480486
// Sleep so the shutdownDuration metric can be collected.
@@ -483,6 +489,8 @@ func (i *Lifecycler) processShutdown(ctx context.Context) {
483489

484490
// unregister removes our entry from consul.
485491
func (i *Lifecycler) unregister(ctx context.Context) error {
492+
level.Debug(util.Logger).Log("msg", "unregistering member from ring")
493+
486494
return i.KVStore.CAS(ctx, ConsulKey, func(in interface{}) (out interface{}, retry bool, err error) {
487495
if in == nil {
488496
return nil, false, fmt.Errorf("found empty ring when trying to unregister")

pkg/ring/lifecycler_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ func TestRingNormaliseMigration(t *testing.T) {
2929
flagext.DefaultValues(&ringConfig)
3030
ringConfig.Mock = NewInMemoryKVClient()
3131

32-
r, err := New(ringConfig)
32+
r, err := New(ringConfig, "ingester")
3333
require.NoError(t, err)
3434
defer r.Stop()
3535

@@ -45,7 +45,7 @@ func TestRingNormaliseMigration(t *testing.T) {
4545
lifecyclerConfig1.FinalSleep = 0
4646

4747
ft := &flushTransferer{}
48-
l1, err := NewLifecycler(lifecyclerConfig1, ft)
48+
l1, err := NewLifecycler(lifecyclerConfig1, ft, "ingester")
4949
require.NoError(t, err)
5050

5151
// Check this ingester joined, is active, and has one token.
@@ -70,7 +70,7 @@ func TestRingNormaliseMigration(t *testing.T) {
7070
lifecyclerConfig2.ID = "ing2"
7171
lifecyclerConfig1.FinalSleep = 0
7272

73-
l2, err := NewLifecycler(lifecyclerConfig2, &flushTransferer{})
73+
l2, err := NewLifecycler(lifecyclerConfig2, &flushTransferer{}, "ingester")
7474
require.NoError(t, err)
7575

7676
// This will block until l1 has successfully left the ring.

pkg/ring/replication_strategy_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ func TestReplicationStrategy(t *testing.T) {
8787
Mock: NewInMemoryKVClient(),
8888
HeartbeatTimeout: 100 * time.Second,
8989
ReplicationFactor: tc.RF,
90-
})
90+
}, "ingester")
9191
require.NoError(t, err)
9292

9393
t.Run(fmt.Sprintf("[%d]", i), func(t *testing.T) {

0 commit comments

Comments
 (0)