From 8bc46120254552bad1437ac340e7cceda83915be Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Fri, 4 Nov 2016 14:51:22 +0000 Subject: [PATCH 1/5] Add per-token state, and use it to not write to Leaving ingesters --- cmd/cortex/main.go | 8 +++- distributor.go | 78 +++++++++++++++++++++++++++++--------- ring/ingester_lifecycle.go | 25 +++++++++++- ring/model.go | 15 ++++++-- ring/ring.go | 53 ++++++++++++++++++++------ 5 files changed, 143 insertions(+), 36 deletions(-) diff --git a/cmd/cortex/main.go b/cmd/cortex/main.go index 357b7c1c6d..99dd09be94 100644 --- a/cmd/cortex/main.go +++ b/cmd/cortex/main.go @@ -125,10 +125,14 @@ func main() { // network errors. log.Fatalf("Could not register ingester: %v", err) } - defer registration.Unregister() prometheus.MustRegister(registration) ing := setupIngester(chunkStore, cfg.ingesterConfig, cfg.logSuccess) - defer ing.Stop() + + defer func() { + registration.ChangeState(ring.Leaving) + ing.Stop() + registration.Unregister() + }() default: log.Fatalf("Mode %s not supported!", cfg.mode) } diff --git a/distributor.go b/distributor.go index 386ffe2346..5e847aac75 100644 --- a/distributor.go +++ b/distributor.go @@ -4,6 +4,7 @@ import ( "fmt" "hash/fnv" "sync" + "sync/atomic" "time" "github.com/prometheus/client_golang/prometheus" @@ -46,7 +47,8 @@ type Distributor struct { type ReadRing interface { prometheus.Collector - Get(key uint32, n int) ([]ring.IngesterDesc, error) + Get(key uint32, n int, op ring.Operation) ([]ring.IngesterDesc, error) + BatchGet(keys []uint32, n int, op ring.Operation) ([][]ring.IngesterDesc, error) GetAll() []ring.IngesterDesc } @@ -154,6 +156,12 @@ func tokenFor(userID string, name model.LabelValue) uint32 { return h.Sum32() } +type sampleTracker struct { + sample *model.Sample + minSuccess int + succeeded int32 +} + // Append implements SampleAppender. func (d *Distributor) Append(ctx context.Context, samples []*model.Sample) error { userID, err := user.GetID(ctx) @@ -163,46 +171,77 @@ func (d *Distributor) Append(ctx context.Context, samples []*model.Sample) error d.receivedSamples.Add(float64(len(samples))) - samplesByIngester := map[string][]*model.Sample{} - for _, sample := range samples { - key := tokenForMetric(userID, sample.Metric) - ingesters, err := d.cfg.Ring.Get(key, d.cfg.ReplicationFactor) - if err != nil { - return err + sampleTrackers := make([]sampleTracker, len(samples), len(samples)) + samplesByIngester := map[string][]*sampleTracker{} + keys := make([]uint32, len(samples), len(samples)) + + for i, sample := range samples { + keys[i] = tokenForMetric(userID, sample.Metric) + } + + ingesters, err := d.cfg.Ring.BatchGet(keys, d.cfg.ReplicationFactor, ring.Write) + if err != nil { + return err + } + + for i := range samples { + sampleTrackers[i] = sampleTracker{ + sample: samples[i], + minSuccess: (len(ingesters[i]) / 2) + 1, + succeeded: 0, + } + + // Skip those that have not heartbeated in a while. NB these are still + // included in the calculation of minSuccess, so too many failed ingesters + // will cause the whole write to fail. + liveIngesters := make([]string, len(ingesters[i])) + for _, ingester := range ingesters[i] { + if time.Now().Sub(ingester.Timestamp) <= d.cfg.HeartbeatTimeout { + liveIngesters = append(liveIngesters, ingester.Hostname) + } + } + if len(liveIngesters) < sampleTrackers[i].minSuccess { + return fmt.Errorf("Wanted at least %d live ingesters to process write, had %d.", + sampleTrackers[i].minSuccess, len(liveIngesters)) } - for _, ingester := range ingesters { - otherSamples := samplesByIngester[ingester.Hostname] - samplesByIngester[ingester.Hostname] = append(otherSamples, sample) + + for _, liveIngester := range liveIngesters { + sampleForIngester := samplesByIngester[liveIngester] + samplesByIngester[liveIngester] = append(sampleForIngester, &sampleTrackers[i]) } } errs := make(chan error) for hostname, samples := range samplesByIngester { - go func(hostname string, samples []*model.Sample) { + go func(hostname string, samples []*sampleTracker) { errs <- d.sendSamples(ctx, hostname, samples) }(hostname, samples) } var lastErr error - successes := 0 for i := 0; i < len(samplesByIngester); i++ { if err := <-errs; err != nil { lastErr = err continue } - successes++ } - - if successes < d.cfg.MinWriteSuccesses { - return fmt.Errorf("too few successful writes, last error was: %v", lastErr) + for i := range sampleTrackers { + if sampleTrackers[i].succeeded < int32(sampleTrackers[i].minSuccess) { + return fmt.Errorf("need %d successful writes, only got %d, last error was: %v", + sampleTrackers[i].minSuccess, sampleTrackers[i].succeeded, lastErr) + } } return nil } -func (d *Distributor) sendSamples(ctx context.Context, hostname string, samples []*model.Sample) error { +func (d *Distributor) sendSamples(ctx context.Context, hostname string, sampleTrackers []*sampleTracker) error { client, err := d.getClientFor(hostname) if err != nil { return err } + samples := make([]*model.Sample, len(sampleTrackers), len(sampleTrackers)) + for i := range sampleTrackers { + samples[i] = sampleTrackers[i].sample + } err = instrument.TimeRequestHistogram("send", d.sendDuration, func() error { return client.Append(ctx, samples) }) @@ -210,6 +249,9 @@ func (d *Distributor) sendSamples(ctx context.Context, hostname string, samples d.ingesterAppendFailures.WithLabelValues(hostname).Inc() } d.ingesterAppends.WithLabelValues(hostname).Inc() + for i := range sampleTrackers { + atomic.AddInt32(&sampleTrackers[i].succeeded, 1) + } return err } @@ -241,7 +283,7 @@ func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers . return err } - ingesters, err := d.cfg.Ring.Get(tokenFor(userID, metricName), d.cfg.ReplicationFactor) + ingesters, err := d.cfg.Ring.Get(tokenFor(userID, metricName), d.cfg.ReplicationFactor, ring.Read) if err != nil { return err } diff --git a/ring/ingester_lifecycle.go b/ring/ingester_lifecycle.go index 50f8c7a39f..d1f14094c3 100644 --- a/ring/ingester_lifecycle.go +++ b/ring/ingester_lifecycle.go @@ -31,6 +31,11 @@ type IngesterRegistration struct { quit chan struct{} wait sync.WaitGroup + // We need to remember the token state just in case consul goes away and comes + // back empty. Channel is users to tell the actor to update consul on state changes. + state TokenState + stateChange chan TokenState + consulHeartbeats prometheus.Counter } @@ -56,6 +61,9 @@ func RegisterIngester(consulClient ConsulClient, listenPort, numTokens int) (*In hostname: fmt.Sprintf("%s:%d", addr, listenPort), quit: make(chan struct{}), + state: Active, + stateChange: make(chan TokenState), + consulHeartbeats: prometheus.NewCounter(prometheus.CounterOpts{ Name: "cortex_ingester_consul_heartbeats_total", Help: "The total number of heartbeats sent to consul.", @@ -67,6 +75,11 @@ func RegisterIngester(consulClient ConsulClient, listenPort, numTokens int) (*In return r, nil } +func (r *IngesterRegistration) ChangeState(state TokenState) { + log.Info("Leaving the ring") + r.stateChange <- state +} + // Unregister removes ingester config from Consul; will block // until we'll successfully unregistered. func (r *IngesterRegistration) Unregister() { @@ -108,7 +121,7 @@ func (r *IngesterRegistration) pickTokens() []uint32 { tokens = append(tokens, newTokens...) } - ringDesc.addIngester(r.id, r.hostname, tokens) + ringDesc.addIngester(r.id, r.hostname, tokens, r.state) return ringDesc, true, nil } if err := r.consul.CAS(consulKey, descFactory, pickTokens); err != nil { @@ -130,18 +143,26 @@ func (r *IngesterRegistration) heartbeat(tokens []uint32) { if !ok { // consul must have restarted log.Infof("Found empty ring, inserting tokens!") - ringDesc.addIngester(r.id, r.hostname, tokens) + ringDesc.addIngester(r.id, r.hostname, tokens, r.state) } else { ingesterDesc.Timestamp = time.Now() ringDesc.Ingesters[r.id] = ingesterDesc + for i := range ringDesc.Tokens { + ringDesc.Tokens[i].State = r.state + } } return ringDesc, true, nil } + ticker := time.NewTicker(heartbeatInterval) defer ticker.Stop() for { select { + case r.state = <-r.stateChange: + if err := r.consul.CAS(consulKey, descFactory, heartbeat); err != nil { + log.Errorf("Failed to write to consul, sleeping: %v", err) + } case <-ticker.C: r.consulHeartbeats.Inc() if err := r.consul.CAS(consulKey, descFactory, heartbeat); err != nil { diff --git a/ring/model.go b/ring/model.go index 73de428a59..8d2101f591 100644 --- a/ring/model.go +++ b/ring/model.go @@ -7,6 +7,13 @@ import ( "github.com/prometheus/common/log" ) +type TokenState int + +const ( + Active TokenState = iota + Leaving +) + // Desc is the serialised state in Consul representing // all ingesters (ie, the ring). type Desc struct { @@ -29,8 +36,9 @@ func (ts TokenDescs) Less(i, j int) bool { return ts[i].Token < ts[j].Token } // TokenDesc describes an individual token in the ring. type TokenDesc struct { - Token uint32 `json:"tokens"` - Ingester string `json:"ingester"` + Token uint32 `json:"tokens"` + Ingester string `json:"ingester"` + State TokenState `json:"state"` } func descFactory() interface{} { @@ -43,7 +51,7 @@ func newDesc() *Desc { } } -func (d *Desc) addIngester(id, hostname string, tokens []uint32) { +func (d *Desc) addIngester(id, hostname string, tokens []uint32, state TokenState) { d.Ingesters[id] = IngesterDesc{ Hostname: hostname, Timestamp: time.Now(), @@ -53,6 +61,7 @@ func (d *Desc) addIngester(id, hostname string, tokens []uint32) { d.Tokens = append(d.Tokens, TokenDesc{ Token: token, Ingester: id, + State: state, }) } diff --git a/ring/ring.go b/ring/ring.go index e87a2fa19a..e50bc0f0b1 100644 --- a/ring/ring.go +++ b/ring/ring.go @@ -96,8 +96,36 @@ func (r *Ring) loop() { }) } -// Get returns up to n ingesters close to the hash in the circle. -func (r *Ring) Get(key uint32, n int) ([]IngesterDesc, error) { +type Operation int + +const ( + Read Operation = iota + Write +) + +// Get returns n (or more) ingesters which form the replicas for the given key. +func (r *Ring) Get(key uint32, n int, op Operation) ([]IngesterDesc, error) { + r.mtx.RLock() + defer r.mtx.RUnlock() + return r.getInternal(key, n, op) +} + +func (r *Ring) BatchGet(keys []uint32, n int, op Operation) ([][]IngesterDesc, error) { + r.mtx.RLock() + defer r.mtx.RUnlock() + + result := make([][]IngesterDesc, len(keys), len(keys)) + for i, key := range keys { + ingesters, err := r.getInternal(key, n, op) + if err != nil { + return nil, err + } + result[i] = ingesters + } + return result, nil +} + +func (r *Ring) getInternal(key uint32, n int, op Operation) ([]IngesterDesc, error) { r.mtx.RLock() defer r.mtx.RUnlock() if len(r.ringDesc.Tokens) == 0 { @@ -114,19 +142,22 @@ func (r *Ring) Get(key uint32, n int) ([]IngesterDesc, error) { i %= len(r.ringDesc.Tokens) // We want n *distinct* ingesters. - host := r.ringDesc.Tokens[i].Ingester - if _, ok := distinctHosts[host]; ok { + token := r.ringDesc.Tokens[i] + if _, ok := distinctHosts[token.Ingester]; ok { continue } - distinctHosts[host] = struct{}{} - - ing := r.ringDesc.Ingesters[host] - - // Out of the n distinct subsequent ingesters, skip those that have not heartbeated in a while. - if time.Now().Sub(ing.Timestamp) > r.heartbeatTimeout { - continue + distinctHosts[token.Ingester] = struct{}{} + + // If we encounter a Leaving token, for reads we should bump n, + // for writes we bump n and skip the token. + if token.State == Leaving { + n++ + if op == Write { + continue + } } + ing := r.ringDesc.Ingesters[token.Ingester] ingesters = append(ingesters, ing) } return ingesters, nil From 2a58c2a4dc36fd56610e72627c58af5f8d21de4e Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Fri, 4 Nov 2016 15:03:23 +0000 Subject: [PATCH 2/5] Nits --- cmd/cortex/main.go | 1 + distributor.go | 4 ++-- ring/ingester_lifecycle.go | 3 ++- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/cmd/cortex/main.go b/cmd/cortex/main.go index 99dd09be94..748528778a 100644 --- a/cmd/cortex/main.go +++ b/cmd/cortex/main.go @@ -128,6 +128,7 @@ func main() { prometheus.MustRegister(registration) ing := setupIngester(chunkStore, cfg.ingesterConfig, cfg.logSuccess) + // Deferring a func to make ordering obvious defer func() { registration.ChangeState(ring.Leaving) ing.Stop() diff --git a/distributor.go b/distributor.go index 5e847aac75..9599f2f55f 100644 --- a/distributor.go +++ b/distributor.go @@ -192,7 +192,7 @@ func (d *Distributor) Append(ctx context.Context, samples []*model.Sample) error } // Skip those that have not heartbeated in a while. NB these are still - // included in the calculation of minSuccess, so too many failed ingesters + // included in the calculation of minSuccess, so if too many failed ingesters // will cause the whole write to fail. liveIngesters := make([]string, len(ingesters[i])) for _, ingester := range ingesters[i] { @@ -201,7 +201,7 @@ func (d *Distributor) Append(ctx context.Context, samples []*model.Sample) error } } if len(liveIngesters) < sampleTrackers[i].minSuccess { - return fmt.Errorf("Wanted at least %d live ingesters to process write, had %d.", + return fmt.Errorf("wanted at least %d live ingesters to process write, had %d.", sampleTrackers[i].minSuccess, len(liveIngesters)) } diff --git a/ring/ingester_lifecycle.go b/ring/ingester_lifecycle.go index d1f14094c3..3243ee9810 100644 --- a/ring/ingester_lifecycle.go +++ b/ring/ingester_lifecycle.go @@ -61,6 +61,7 @@ func RegisterIngester(consulClient ConsulClient, listenPort, numTokens int) (*In hostname: fmt.Sprintf("%s:%d", addr, listenPort), quit: make(chan struct{}), + // Only read/written on actor goroutine. state: Active, stateChange: make(chan TokenState), @@ -76,7 +77,7 @@ func RegisterIngester(consulClient ConsulClient, listenPort, numTokens int) (*In } func (r *IngesterRegistration) ChangeState(state TokenState) { - log.Info("Leaving the ring") + log.Info("Changing token state to: %v", state) r.stateChange <- state } From 15fbf270a67f41e49ad142278e1e52d39e7eb71a Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Fri, 4 Nov 2016 15:16:03 +0000 Subject: [PATCH 3/5] Fix lint --- distributor.go | 2 +- ring/ingester_lifecycle.go | 2 ++ ring/model.go | 2 ++ ring/ring.go | 18 +++++++++++------- 4 files changed, 16 insertions(+), 8 deletions(-) diff --git a/distributor.go b/distributor.go index 9599f2f55f..f2ad727af2 100644 --- a/distributor.go +++ b/distributor.go @@ -201,7 +201,7 @@ func (d *Distributor) Append(ctx context.Context, samples []*model.Sample) error } } if len(liveIngesters) < sampleTrackers[i].minSuccess { - return fmt.Errorf("wanted at least %d live ingesters to process write, had %d.", + return fmt.Errorf("wanted at least %d live ingesters to process write, had %d", sampleTrackers[i].minSuccess, len(liveIngesters)) } diff --git a/ring/ingester_lifecycle.go b/ring/ingester_lifecycle.go index 3243ee9810..beb6d4dcae 100644 --- a/ring/ingester_lifecycle.go +++ b/ring/ingester_lifecycle.go @@ -76,6 +76,8 @@ func RegisterIngester(consulClient ConsulClient, listenPort, numTokens int) (*In return r, nil } +// ChangeState changes the state of all tokens owned by this +// ingester in the ring. func (r *IngesterRegistration) ChangeState(state TokenState) { log.Info("Changing token state to: %v", state) r.stateChange <- state diff --git a/ring/model.go b/ring/model.go index 8d2101f591..df1d13ab77 100644 --- a/ring/model.go +++ b/ring/model.go @@ -7,8 +7,10 @@ import ( "github.com/prometheus/common/log" ) +// TokenState describes the state of a token type TokenState int +// Values for TokenState const ( Active TokenState = iota Leaving diff --git a/ring/ring.go b/ring/ring.go index e50bc0f0b1..525d70c4ef 100644 --- a/ring/ring.go +++ b/ring/ring.go @@ -18,6 +18,15 @@ const ( unhealthyLabel = "unhealthy" ) +// Operation can be Read or Write +type Operation int + +// Values for Operation +const ( + Read Operation = iota + Write +) + type uint32s []uint32 func (x uint32s) Len() int { return len(x) } @@ -96,13 +105,6 @@ func (r *Ring) loop() { }) } -type Operation int - -const ( - Read Operation = iota - Write -) - // Get returns n (or more) ingesters which form the replicas for the given key. func (r *Ring) Get(key uint32, n int, op Operation) ([]IngesterDesc, error) { r.mtx.RLock() @@ -110,6 +112,8 @@ func (r *Ring) Get(key uint32, n int, op Operation) ([]IngesterDesc, error) { return r.getInternal(key, n, op) } +// BatchGet returns n (or more) ingesters which form the replicas for the given key. +// The order of the result matches the order of the input. func (r *Ring) BatchGet(keys []uint32, n int, op Operation) ([][]IngesterDesc, error) { r.mtx.RLock() defer r.mtx.RUnlock() From 1d03d820439fd5d5b999b8672b5e445edeceed8a Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Fri, 4 Nov 2016 16:34:52 +0000 Subject: [PATCH 4/5] Review feedback --- cmd/cortex/main.go | 4 ++-- distributor.go | 17 ++++++++--------- ring/ingester_lifecycle.go | 8 ++++---- ring/ring.go | 8 ++++++-- 4 files changed, 20 insertions(+), 17 deletions(-) diff --git a/cmd/cortex/main.go b/cmd/cortex/main.go index 748528778a..199dbd9e8c 100644 --- a/cmd/cortex/main.go +++ b/cmd/cortex/main.go @@ -93,7 +93,6 @@ func main() { flag.IntVar(&cfg.numTokens, "ingester.num-tokens", 128, "Number of tokens for each ingester.") flag.IntVar(&cfg.distributorConfig.ReplicationFactor, "distributor.replication-factor", 3, "The number of ingesters to write to and read from.") flag.IntVar(&cfg.distributorConfig.MinReadSuccesses, "distributor.min-read-successes", 2, "The minimum number of ingesters from which a read must succeed.") - flag.IntVar(&cfg.distributorConfig.MinWriteSuccesses, "distributor.min-write-successes", 2, "The minimum number of ingesters to which a write must succeed.") flag.DurationVar(&cfg.distributorConfig.HeartbeatTimeout, "distributor.heartbeat-timeout", time.Minute, "The heartbeat timeout after which ingesters are skipped for reads/writes.") flag.BoolVar(&cfg.logSuccess, "log.success", false, "Log successful requests") flag.Parse() @@ -125,7 +124,6 @@ func main() { // network errors. log.Fatalf("Could not register ingester: %v", err) } - prometheus.MustRegister(registration) ing := setupIngester(chunkStore, cfg.ingesterConfig, cfg.logSuccess) // Deferring a func to make ordering obvious @@ -134,6 +132,8 @@ func main() { ing.Stop() registration.Unregister() }() + + prometheus.MustRegister(registration) default: log.Fatalf("Mode %s not supported!", cfg.mode) } diff --git a/distributor.go b/distributor.go index f2ad727af2..c68bbad69d 100644 --- a/distributor.go +++ b/distributor.go @@ -63,7 +63,6 @@ type DistributorConfig struct { ReplicationFactor int MinReadSuccesses int - MinWriteSuccesses int HeartbeatTimeout time.Duration } @@ -72,9 +71,6 @@ func NewDistributor(cfg DistributorConfig) (*Distributor, error) { if 0 > cfg.ReplicationFactor { return nil, fmt.Errorf("ReplicationFactor must be greater than zero: %d", cfg.ReplicationFactor) } - if cfg.MinWriteSuccesses > cfg.ReplicationFactor { - return nil, fmt.Errorf("MinWriteSuccesses > ReplicationFactor: %d > %d", cfg.MinWriteSuccesses, cfg.ReplicationFactor) - } if cfg.MinReadSuccesses > cfg.ReplicationFactor { return nil, fmt.Errorf("MinReadSuccesses > ReplicationFactor: %d > %d", cfg.MinReadSuccesses, cfg.ReplicationFactor) } @@ -171,10 +167,7 @@ func (d *Distributor) Append(ctx context.Context, samples []*model.Sample) error d.receivedSamples.Add(float64(len(samples))) - sampleTrackers := make([]sampleTracker, len(samples), len(samples)) - samplesByIngester := map[string][]*sampleTracker{} keys := make([]uint32, len(samples), len(samples)) - for i, sample := range samples { keys[i] = tokenForMetric(userID, sample.Metric) } @@ -184,9 +177,12 @@ func (d *Distributor) Append(ctx context.Context, samples []*model.Sample) error return err } + sampleTrackers := make([]sampleTracker, len(samples), len(samples)) + samplesByIngester := map[string][]*sampleTracker{} for i := range samples { sampleTrackers[i] = sampleTracker{ - sample: samples[i], + sample: samples[i], + // We need a response from a quorum of ingesters, which is n/2 + 1. minSuccess: (len(ingesters[i]) / 2) + 1, succeeded: 0, } @@ -194,12 +190,15 @@ func (d *Distributor) Append(ctx context.Context, samples []*model.Sample) error // Skip those that have not heartbeated in a while. NB these are still // included in the calculation of minSuccess, so if too many failed ingesters // will cause the whole write to fail. - liveIngesters := make([]string, len(ingesters[i])) + liveIngesters := make([]string, 0, len(ingesters[i])) for _, ingester := range ingesters[i] { if time.Now().Sub(ingester.Timestamp) <= d.cfg.HeartbeatTimeout { liveIngesters = append(liveIngesters, ingester.Hostname) } } + + // This is just a shortcut - if there are not minSuccess available ingesters, + // after filtering out dead ones, don't even both trying. if len(liveIngesters) < sampleTrackers[i].minSuccess { return fmt.Errorf("wanted at least %d live ingesters to process write, had %d", sampleTrackers[i].minSuccess, len(liveIngesters)) diff --git a/ring/ingester_lifecycle.go b/ring/ingester_lifecycle.go index beb6d4dcae..c55aad04ea 100644 --- a/ring/ingester_lifecycle.go +++ b/ring/ingester_lifecycle.go @@ -32,7 +32,7 @@ type IngesterRegistration struct { wait sync.WaitGroup // We need to remember the token state just in case consul goes away and comes - // back empty. Channel is users to tell the actor to update consul on state changes. + // back empty. Channel is used to tell the actor to update consul on state changes. state TokenState stateChange chan TokenState @@ -134,7 +134,7 @@ func (r *IngesterRegistration) pickTokens() []uint32 { } func (r *IngesterRegistration) heartbeat(tokens []uint32) { - heartbeat := func(in interface{}) (out interface{}, retry bool, err error) { + updateConsul := func(in interface{}) (out interface{}, retry bool, err error) { var ringDesc *Desc if in == nil { ringDesc = newDesc() @@ -163,12 +163,12 @@ func (r *IngesterRegistration) heartbeat(tokens []uint32) { for { select { case r.state = <-r.stateChange: - if err := r.consul.CAS(consulKey, descFactory, heartbeat); err != nil { + if err := r.consul.CAS(consulKey, descFactory, updateConsul); err != nil { log.Errorf("Failed to write to consul, sleeping: %v", err) } case <-ticker.C: r.consulHeartbeats.Inc() - if err := r.consul.CAS(consulKey, descFactory, heartbeat); err != nil { + if err := r.consul.CAS(consulKey, descFactory, updateConsul); err != nil { log.Errorf("Failed to write to consul, sleeping: %v", err) } case <-r.quit: diff --git a/ring/ring.go b/ring/ring.go index 525d70c4ef..aece2eae56 100644 --- a/ring/ring.go +++ b/ring/ring.go @@ -152,8 +152,12 @@ func (r *Ring) getInternal(key uint32, n int, op Operation) ([]IngesterDesc, err } distinctHosts[token.Ingester] = struct{}{} - // If we encounter a Leaving token, for reads we should bump n, - // for writes we bump n and skip the token. + // Ingesters that are Leaving do not count to the replication limit. We do + // not want to Write to them because they are about to go away, but we do + // want to write the extra replica somewhere. So we increase the size of the + // set of replicas for the key. This means we have to also increase the + // size of the replica set for read, but we can read from Leaving ingesters, + // so don't skip it in this case. if token.State == Leaving { n++ if op == Write { From 473d0b5e4c812f9afa705a4bf6ddb5765f6bdd9f Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Fri, 4 Nov 2016 17:49:11 +0000 Subject: [PATCH 5/5] go1.7.3 --- cortex-build/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cortex-build/Dockerfile b/cortex-build/Dockerfile index d12b1e0a8b..b9456d9608 100644 --- a/cortex-build/Dockerfile +++ b/cortex-build/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.6.3 +FROM golang:1.7.3 RUN apt-get update && apt-get install -y python-requests python-yaml file jq && \ rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* RUN go clean -i net && \