diff --git a/cmd/cortex/main.go b/cmd/cortex/main.go index 357b7c1c6d..199dbd9e8c 100644 --- a/cmd/cortex/main.go +++ b/cmd/cortex/main.go @@ -93,7 +93,6 @@ func main() { flag.IntVar(&cfg.numTokens, "ingester.num-tokens", 128, "Number of tokens for each ingester.") flag.IntVar(&cfg.distributorConfig.ReplicationFactor, "distributor.replication-factor", 3, "The number of ingesters to write to and read from.") flag.IntVar(&cfg.distributorConfig.MinReadSuccesses, "distributor.min-read-successes", 2, "The minimum number of ingesters from which a read must succeed.") - flag.IntVar(&cfg.distributorConfig.MinWriteSuccesses, "distributor.min-write-successes", 2, "The minimum number of ingesters to which a write must succeed.") flag.DurationVar(&cfg.distributorConfig.HeartbeatTimeout, "distributor.heartbeat-timeout", time.Minute, "The heartbeat timeout after which ingesters are skipped for reads/writes.") flag.BoolVar(&cfg.logSuccess, "log.success", false, "Log successful requests") flag.Parse() @@ -125,10 +124,16 @@ func main() { // network errors. 
log.Fatalf("Could not register ingester: %v", err) } - defer registration.Unregister() - prometheus.MustRegister(registration) ing := setupIngester(chunkStore, cfg.ingesterConfig, cfg.logSuccess) - defer ing.Stop() + + // Deferring a func to make ordering obvious + defer func() { + registration.ChangeState(ring.Leaving) + ing.Stop() + registration.Unregister() + }() + + prometheus.MustRegister(registration) default: log.Fatalf("Mode %s not supported!", cfg.mode) } diff --git a/cortex-build/Dockerfile b/cortex-build/Dockerfile index d12b1e0a8b..b9456d9608 100644 --- a/cortex-build/Dockerfile +++ b/cortex-build/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.6.3 +FROM golang:1.7.3 RUN apt-get update && apt-get install -y python-requests python-yaml file jq && \ rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* RUN go clean -i net && \ diff --git a/distributor.go b/distributor.go index 386ffe2346..c68bbad69d 100644 --- a/distributor.go +++ b/distributor.go @@ -4,6 +4,7 @@ import ( "fmt" "hash/fnv" "sync" + "sync/atomic" "time" "github.com/prometheus/client_golang/prometheus" @@ -46,7 +47,8 @@ type Distributor struct { type ReadRing interface { prometheus.Collector - Get(key uint32, n int) ([]ring.IngesterDesc, error) + Get(key uint32, n int, op ring.Operation) ([]ring.IngesterDesc, error) + BatchGet(keys []uint32, n int, op ring.Operation) ([][]ring.IngesterDesc, error) GetAll() []ring.IngesterDesc } @@ -61,7 +63,6 @@ type DistributorConfig struct { ReplicationFactor int MinReadSuccesses int - MinWriteSuccesses int HeartbeatTimeout time.Duration } @@ -70,9 +71,6 @@ func NewDistributor(cfg DistributorConfig) (*Distributor, error) { if 0 > cfg.ReplicationFactor { return nil, fmt.Errorf("ReplicationFactor must be greater than zero: %d", cfg.ReplicationFactor) } - if cfg.MinWriteSuccesses > cfg.ReplicationFactor { - return nil, fmt.Errorf("MinWriteSuccesses > ReplicationFactor: %d > %d", cfg.MinWriteSuccesses, cfg.ReplicationFactor) - } if cfg.MinReadSuccesses > 
cfg.ReplicationFactor { return nil, fmt.Errorf("MinReadSuccesses > ReplicationFactor: %d > %d", cfg.MinReadSuccesses, cfg.ReplicationFactor) } @@ -154,6 +152,12 @@ func tokenFor(userID string, name model.LabelValue) uint32 { return h.Sum32() } +type sampleTracker struct { + sample *model.Sample + minSuccess int + succeeded int32 +} + // Append implements SampleAppender. func (d *Distributor) Append(ctx context.Context, samples []*model.Sample) error { userID, err := user.GetID(ctx) @@ -163,46 +167,80 @@ func (d *Distributor) Append(ctx context.Context, samples []*model.Sample) error d.receivedSamples.Add(float64(len(samples))) - samplesByIngester := map[string][]*model.Sample{} - for _, sample := range samples { - key := tokenForMetric(userID, sample.Metric) - ingesters, err := d.cfg.Ring.Get(key, d.cfg.ReplicationFactor) - if err != nil { - return err + keys := make([]uint32, len(samples)) + for i, sample := range samples { + keys[i] = tokenForMetric(userID, sample.Metric) + } + + ingesters, err := d.cfg.Ring.BatchGet(keys, d.cfg.ReplicationFactor, ring.Write) + if err != nil { + return err + } + + sampleTrackers := make([]sampleTracker, len(samples)) + samplesByIngester := map[string][]*sampleTracker{} + for i := range samples { + sampleTrackers[i] = sampleTracker{ + sample: samples[i], + // We need a response from a quorum of ingesters, which is n/2 + 1. + minSuccess: (len(ingesters[i]) / 2) + 1, + succeeded: 0, } - for _, ingester := range ingesters { - otherSamples := samplesByIngester[ingester.Hostname] - samplesByIngester[ingester.Hostname] = append(otherSamples, sample) + + // Skip those that have not heartbeated in a while. NB these are still + // included in the calculation of minSuccess, so too many failed ingesters + // will cause the whole write to fail. 
+ liveIngesters := make([]string, 0, len(ingesters[i])) + for _, ingester := range ingesters[i] { + if time.Since(ingester.Timestamp) <= d.cfg.HeartbeatTimeout { + liveIngesters = append(liveIngesters, ingester.Hostname) + } + } + + // This is just a shortcut - if there are not minSuccess available ingesters, + // after filtering out dead ones, don't even bother trying. + if len(liveIngesters) < sampleTrackers[i].minSuccess { + return fmt.Errorf("wanted at least %d live ingesters to process write, had %d", + sampleTrackers[i].minSuccess, len(liveIngesters)) + } + + for _, liveIngester := range liveIngesters { + sampleForIngester := samplesByIngester[liveIngester] + samplesByIngester[liveIngester] = append(sampleForIngester, &sampleTrackers[i]) } } errs := make(chan error) for hostname, samples := range samplesByIngester { - go func(hostname string, samples []*model.Sample) { + go func(hostname string, samples []*sampleTracker) { errs <- d.sendSamples(ctx, hostname, samples) }(hostname, samples) } var lastErr error - successes := 0 for i := 0; i < len(samplesByIngester); i++ { if err := <-errs; err != nil { lastErr = err continue } - successes++ } - - if successes < d.cfg.MinWriteSuccesses { - return fmt.Errorf("too few successful writes, last error was: %v", lastErr) + for i := range sampleTrackers { + if sampleTrackers[i].succeeded < int32(sampleTrackers[i].minSuccess) { + return fmt.Errorf("need %d successful writes, only got %d, last error was: %v", + sampleTrackers[i].minSuccess, sampleTrackers[i].succeeded, lastErr) + } } return nil } -func (d *Distributor) sendSamples(ctx context.Context, hostname string, samples []*model.Sample) error { +func (d *Distributor) sendSamples(ctx context.Context, hostname string, sampleTrackers []*sampleTracker) error { client, err := d.getClientFor(hostname) if err != nil { return err } + samples := make([]*model.Sample, len(sampleTrackers)) + for i := range sampleTrackers { + samples[i] = 
sampleTrackers[i].sample + } err = instrument.TimeRequestHistogram("send", d.sendDuration, func() error { return client.Append(ctx, samples) }) @@ -210,6 +248,12 @@ func (d *Distributor) sendSamples(ctx context.Context, hostname string, samples d.ingesterAppendFailures.WithLabelValues(hostname).Inc() } d.ingesterAppends.WithLabelValues(hostname).Inc() + // Only credit the samples when this ingester actually accepted the write. + if err == nil { + for i := range sampleTrackers { + atomic.AddInt32(&sampleTrackers[i].succeeded, 1) + } + } return err } @@ -241,7 +285,7 @@ func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers . return err } - ingesters, err := d.cfg.Ring.Get(tokenFor(userID, metricName), d.cfg.ReplicationFactor) + ingesters, err := d.cfg.Ring.Get(tokenFor(userID, metricName), d.cfg.ReplicationFactor, ring.Read) if err != nil { return err } diff --git a/ring/ingester_lifecycle.go b/ring/ingester_lifecycle.go index 50f8c7a39f..c55aad04ea 100644 --- a/ring/ingester_lifecycle.go +++ b/ring/ingester_lifecycle.go @@ -31,6 +31,11 @@ type IngesterRegistration struct { quit chan struct{} wait sync.WaitGroup + // We need to remember the token state just in case consul goes away and comes + // back empty. Channel is used to tell the actor to update consul on state changes. + state TokenState + stateChange chan TokenState + consulHeartbeats prometheus.Counter } @@ -56,6 +61,10 @@ func RegisterIngester(consulClient ConsulClient, listenPort, numTokens int) (*In hostname: fmt.Sprintf("%s:%d", addr, listenPort), quit: make(chan struct{}), + // Only read/written on actor goroutine. + state: Active, + stateChange: make(chan TokenState), + consulHeartbeats: prometheus.NewCounter(prometheus.CounterOpts{ Name: "cortex_ingester_consul_heartbeats_total", Help: "The total number of heartbeats sent to consul.", @@ -67,6 +76,13 @@ func RegisterIngester(consulClient ConsulClient, listenPort, numTokens int) (*In return r, nil } +// ChangeState changes the state of all tokens owned by this +// ingester in the ring. 
+func (r *IngesterRegistration) ChangeState(state TokenState) { + log.Infof("Changing token state to: %v", state) + r.stateChange <- state +} + // Unregister removes ingester config from Consul; will block // until we'll successfully unregistered. func (r *IngesterRegistration) Unregister() { @@ -108,7 +124,7 @@ func (r *IngesterRegistration) pickTokens() []uint32 { tokens = append(tokens, newTokens...) } - ringDesc.addIngester(r.id, r.hostname, tokens) + ringDesc.addIngester(r.id, r.hostname, tokens, r.state) return ringDesc, true, nil } if err := r.consul.CAS(consulKey, descFactory, pickTokens); err != nil { @@ -118,7 +134,7 @@ func (r *IngesterRegistration) pickTokens() []uint32 { } func (r *IngesterRegistration) heartbeat(tokens []uint32) { - heartbeat := func(in interface{}) (out interface{}, retry bool, err error) { + updateConsul := func(in interface{}) (out interface{}, retry bool, err error) { var ringDesc *Desc if in == nil { ringDesc = newDesc() @@ -130,21 +146,32 @@ func (r *IngesterRegistration) heartbeat(tokens []uint32) { if !ok { // consul must have restarted log.Infof("Found empty ring, inserting tokens!") - ringDesc.addIngester(r.id, r.hostname, tokens) + ringDesc.addIngester(r.id, r.hostname, tokens, r.state) } else { ingesterDesc.Timestamp = time.Now() ringDesc.Ingesters[r.id] = ingesterDesc + // Only update tokens owned by this ingester; the ring also holds other ingesters' tokens. + for i := range ringDesc.Tokens { + if ringDesc.Tokens[i].Ingester == r.id { + ringDesc.Tokens[i].State = r.state + } + } } return ringDesc, true, nil } + ticker := time.NewTicker(heartbeatInterval) defer ticker.Stop() for { select { + case r.state = <-r.stateChange: + if err := r.consul.CAS(consulKey, descFactory, updateConsul); err != nil { + log.Errorf("Failed to write to consul, sleeping: %v", err) + } case <-ticker.C: r.consulHeartbeats.Inc() - if err := r.consul.CAS(consulKey, descFactory, heartbeat); err != nil { + if err := r.consul.CAS(consulKey, descFactory, updateConsul); err != nil { log.Errorf("Failed to write to consul, sleeping: %v", err) } case <-r.quit: diff --git a/ring/model.go 
b/ring/model.go index 73de428a59..df1d13ab77 100644 --- a/ring/model.go +++ b/ring/model.go @@ -7,6 +7,15 @@ import ( "github.com/prometheus/common/log" ) +// TokenState describes the state of a token +type TokenState int + +// Values for TokenState +const ( + Active TokenState = iota + Leaving +) + // Desc is the serialised state in Consul representing // all ingesters (ie, the ring). type Desc struct { @@ -29,8 +38,9 @@ func (ts TokenDescs) Less(i, j int) bool { return ts[i].Token < ts[j].Token } // TokenDesc describes an individual token in the ring. type TokenDesc struct { - Token uint32 `json:"tokens"` - Ingester string `json:"ingester"` + Token uint32 `json:"tokens"` + Ingester string `json:"ingester"` + State TokenState `json:"state"` } func descFactory() interface{} { @@ -43,7 +53,7 @@ func newDesc() *Desc { } } -func (d *Desc) addIngester(id, hostname string, tokens []uint32) { +func (d *Desc) addIngester(id, hostname string, tokens []uint32, state TokenState) { d.Ingesters[id] = IngesterDesc{ Hostname: hostname, Timestamp: time.Now(), @@ -53,6 +63,7 @@ func (d *Desc) addIngester(id, hostname string, tokens []uint32) { d.Tokens = append(d.Tokens, TokenDesc{ Token: token, Ingester: id, + State: state, }) } diff --git a/ring/ring.go b/ring/ring.go index e87a2fa19a..aece2eae56 100644 --- a/ring/ring.go +++ b/ring/ring.go @@ -18,6 +18,15 @@ const ( unhealthyLabel = "unhealthy" ) +// Operation can be Read or Write +type Operation int + +// Values for Operation +const ( + Read Operation = iota + Write +) + type uint32s []uint32 func (x uint32s) Len() int { return len(x) } @@ -96,8 +105,31 @@ func (r *Ring) loop() { }) } -// Get returns up to n ingesters close to the hash in the circle. -func (r *Ring) Get(key uint32, n int) ([]IngesterDesc, error) { +// Get returns n (or more) ingesters which form the replicas for the given key. 
+func (r *Ring) Get(key uint32, n int, op Operation) ([]IngesterDesc, error) { + r.mtx.RLock() + defer r.mtx.RUnlock() + return r.getInternal(key, n, op) +} + +// BatchGet returns n (or more) ingesters which form the replicas for each of the given keys. +// The order of the result matches the order of the input. +func (r *Ring) BatchGet(keys []uint32, n int, op Operation) ([][]IngesterDesc, error) { + r.mtx.RLock() + defer r.mtx.RUnlock() + + result := make([][]IngesterDesc, len(keys)) + for i, key := range keys { + ingesters, err := r.getInternal(key, n, op) + if err != nil { + return nil, err + } + result[i] = ingesters + } + return result, nil +} + +// getInternal does the lookup for Get and BatchGet. The caller must hold r.mtx; it is not +// re-acquired here because sync.RWMutex prohibits recursive read locking (risk of deadlock). +func (r *Ring) getInternal(key uint32, n int, op Operation) ([]IngesterDesc, error) { - r.mtx.RLock() - defer r.mtx.RUnlock() if len(r.ringDesc.Tokens) == 0 { @@ -114,19 +146,26 @@ func (r *Ring) Get(key uint32, n int) ([]IngesterDesc, error) { i %= len(r.ringDesc.Tokens) // We want n *distinct* ingesters. - host := r.ringDesc.Tokens[i].Ingester - if _, ok := distinctHosts[host]; ok { + token := r.ringDesc.Tokens[i] + if _, ok := distinctHosts[token.Ingester]; ok { continue } - distinctHosts[host] = struct{}{} + distinctHosts[token.Ingester] = struct{}{} - ing := r.ringDesc.Ingesters[host] - - // Out of the n distinct subsequent ingesters, skip those that have not heartbeated in a while. - if time.Now().Sub(ing.Timestamp) > r.heartbeatTimeout { - continue + // Ingesters that are Leaving do not count to the replication limit. We do + // not want to Write to them because they are about to go away, but we do + // want to write the extra replica somewhere. So we increase the size of the + // set of replicas for the key. This means we have to also increase the + // size of the replica set for read, but we can read from Leaving ingesters, + // so don't skip it in this case. 
+ if token.State == Leaving { + n++ + if op == Write { + continue + } } + ing := r.ringDesc.Ingesters[token.Ingester] ingesters = append(ingesters, ing) } return ingesters, nil