diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go
index 9e2efae532..3deebc36d1 100644
--- a/pkg/distributor/distributor.go
+++ b/pkg/distributor/distributor.go
@@ -72,7 +72,6 @@ type Config struct {
 	BillingConfig        billing.Config
 	IngesterClientConfig ingester_client.Config
 
-	ReplicationFactor   int
 	RemoteTimeout       time.Duration
 	ClientCleanupPeriod time.Duration
 	IngestionRateLimit  float64
@@ -88,7 +87,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	flag.BoolVar(&cfg.EnableBilling, "distributor.enable-billing", false, "Report number of ingested samples to billing system.")
 	cfg.BillingConfig.RegisterFlags(f)
 	cfg.IngesterClientConfig.RegisterFlags(f)
-	flag.IntVar(&cfg.ReplicationFactor, "distributor.replication-factor", 3, "The number of ingesters to write to and read from.")
 	flag.DurationVar(&cfg.RemoteTimeout, "distributor.remote-timeout", 2*time.Second, "Timeout for downstream ingesters.")
 	flag.DurationVar(&cfg.ClientCleanupPeriod, "distributor.client-cleanup-period", 15*time.Second, "How frequently to clean up clients for ingesters that have gone away.")
 	flag.Float64Var(&cfg.IngestionRateLimit, "distributor.ingestion-rate-limit", 25000, "Per-user ingestion rate limit in samples per second.")
@@ -98,9 +96,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 
 // New constructs a new Distributor
 func New(cfg Config, ring ring.ReadRing) (*Distributor, error) {
-	if 0 > cfg.ReplicationFactor {
-		return nil, fmt.Errorf("ReplicationFactor must be greater than zero: %d", cfg.ReplicationFactor)
-	}
 	if cfg.ingesterClientFactory == nil {
 		cfg.ingesterClientFactory = ingester_client.MakeIngesterClient
 	}
@@ -189,7 +184,13 @@ func (d *Distributor) Stop() {
 
 func (d *Distributor) removeStaleIngesterClients() {
 	ingesters := map[string]struct{}{}
-	for _, ing := range d.ring.GetAll() {
+	replicationSet, err := d.ring.GetAll()
+	if err != nil {
+		level.Error(util.Logger).Log("msg", "error removing stale ingester clients", "err", err)
+		return
+	}
+
+	for _, ing := range replicationSet.Ingesters {
 		ingesters[ing.Addr] = struct{}{}
 	}
 
@@ -270,45 +271,17 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie
 		return nil, errIngestionRateLimitExceeded
 	}
 
-	var ingesters [][]*ring.IngesterDesc
-	if err := instrument.TimeRequestHistogram(ctx, "Distributor.Push[ring-lookup]", d.sendDuration, func(context.Context) error {
-		var err error
-		ingesters, err = d.ring.BatchGet(keys, d.cfg.ReplicationFactor, ring.Write)
-		if err != nil {
-			return err
-		}
-		return nil
-	}); err != nil {
+	replicationSets, err := d.ring.BatchGet(keys, ring.Write)
+	if err != nil {
 		return nil, err
 	}
 
 	samplesByIngester := map[*ring.IngesterDesc][]*sampleTracker{}
-	for i := range samples {
-		// We need a response from a quorum of ingesters, which is n/2 + 1.
-		minSuccess := (len(ingesters[i]) / 2) + 1
-		samples[i].minSuccess = minSuccess
-		samples[i].maxFailures = len(ingesters[i]) - minSuccess
-
-		// Skip those that have not heartbeated in a while. NB these are still
-		// included in the calculation of minSuccess, so if too many failed ingesters
-		// will cause the whole write to fail.
-		liveIngesters := make([]*ring.IngesterDesc, 0, len(ingesters[i]))
-		for _, ingester := range ingesters[i] {
-			if d.ring.IsHealthy(ingester) {
-				liveIngesters = append(liveIngesters, ingester)
-			}
-		}
-
-		// This is just a shortcut - if there are not minSuccess available ingesters,
-		// after filtering out dead ones, don't even bother trying.
-		if len(liveIngesters) < minSuccess {
-			return nil, fmt.Errorf("wanted at least %d live ingesters to process write, had %d",
-				minSuccess, len(liveIngesters))
-		}
-
-		for _, liveIngester := range liveIngesters {
-			sampleForIngester := samplesByIngester[liveIngester]
-			samplesByIngester[liveIngester] = append(sampleForIngester, &samples[i])
+	for i, replicationSet := range replicationSets {
+		samples[i].minSuccess = len(replicationSet.Ingesters) - replicationSet.MaxErrors
+		samples[i].maxFailures = replicationSet.MaxErrors
+		for _, ingester := range replicationSet.Ingesters {
+			samplesByIngester[ingester] = append(samplesByIngester[ingester], &samples[i])
 		}
 	}
@@ -425,58 +398,53 @@ func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers .
 		}
 
 		// Get ingesters by metricName if one exists, otherwise get all ingesters
-		var ingesters []*ring.IngesterDesc
+		var replicationSet ring.ReplicationSet
 		if ok && metricNameMatcher.Type == labels.MatchEqual {
-			ingesters, err = d.ring.Get(tokenFor(userID, []byte(metricNameMatcher.Value)), d.cfg.ReplicationFactor, ring.Read)
-			if err != nil {
-				return promql.ErrStorage(err)
-			}
+			replicationSet, err = d.ring.Get(tokenFor(userID, []byte(metricNameMatcher.Value)), ring.Read)
 		} else {
-			ingesters = d.ring.GetAll()
+			replicationSet, err = d.ring.GetAll()
+		}
+		if err != nil {
+			return promql.ErrStorage(err)
 		}
 
-		matrix, err = d.queryIngesters(ctx, ingesters, d.cfg.ReplicationFactor, req)
+		matrix, err = d.queryIngesters(ctx, replicationSet, req)
 		return promql.ErrStorage(err)
 	})
 	return matrix, err
 }
 
 // Query implements Querier.
-func (d *Distributor) queryIngesters(ctx context.Context, ingesters []*ring.IngesterDesc, replicationFactor int, req *client.QueryRequest) (model.Matrix, error) {
-	// We need a response from a quorum of ingesters, where maxErrs is n/2, where n is the replicationFactor
-	maxErrs := replicationFactor / 2
-	minSuccess := len(ingesters) - maxErrs
-	if len(ingesters) < minSuccess {
-		return nil, fmt.Errorf("could only find %d ingesters for query. Need at least %d", len(ingesters), minSuccess)
-	}
-
+func (d *Distributor) queryIngesters(ctx context.Context, replicationSet ring.ReplicationSet, req *client.QueryRequest) (model.Matrix, error) {
 	// Fetch samples from multiple ingesters
-	var numErrs int32
-	errReceived := make(chan error)
-	results := make(chan model.Matrix, len(ingesters))
-
-	for _, ing := range ingesters {
+	errs := make(chan error, len(replicationSet.Ingesters))
+	results := make(chan model.Matrix, len(replicationSet.Ingesters))
+	for _, ing := range replicationSet.Ingesters {
 		go func(ing *ring.IngesterDesc) {
 			result, err := d.queryIngester(ctx, ing, req)
 			if err != nil {
-				if atomic.AddInt32(&numErrs, 1) == int32(maxErrs+1) {
-					errReceived <- err
-				}
+				errs <- err
 			} else {
 				results <- result
 			}
 		}(ing)
 	}
 
-	// Only wait for minSuccess ingesters (or an error), and accumulate the samples
+	// Only wait for minSuccess successful responses (or more than maxErrors failures),
+	// and accumulate the samples
 	// by fingerprint, merging them into any existing samples.
 	fpToSampleStream := map[model.Fingerprint]*model.SampleStream{}
-	for i := 0; i < minSuccess; i++ {
+	minSuccess := len(replicationSet.Ingesters) - replicationSet.MaxErrors
+	var numErrs, numSuccess int
+	for numSuccess < minSuccess {
 		select {
-		case err := <-errReceived:
-			return nil, err
+		case err := <-errs:
+			numErrs++
+			if numErrs > replicationSet.MaxErrors {
+				return nil, err
+			}
 		case result := <-results:
+			numSuccess++
 			for _, ss := range result {
 				fp := ss.Metric.Fingerprint()
 				mss, ok := fpToSampleStream[fp]
@@ -516,9 +484,13 @@ func (d *Distributor) queryIngester(ctx context.Context, ing *ring.IngesterDesc,
 
 // forAllIngesters runs f, in parallel, for all ingesters
 func (d *Distributor) forAllIngesters(f func(client.IngesterClient) (interface{}, error)) ([]interface{}, error) {
+	replicationSet, err := d.ring.GetAll()
+	if err != nil {
+		return nil, err
+	}
+
 	resps, errs := make(chan interface{}), make(chan error)
-	ingesters := d.ring.GetAll()
-	for _, ingester := range ingesters {
+	for _, ingester := range replicationSet.Ingesters {
 		go func(ingester *ring.IngesterDesc) {
 			client, err := d.ingesterPool.GetClientFor(ingester.Addr)
 			if err != nil {
@@ -537,7 +509,7 @@ func (d *Distributor) forAllIngesters(f func(client.IngesterClient) (interface{}
 	var lastErr error
 	result, numErrs := []interface{}{}, 0
-	for range ingesters {
+	for range replicationSet.Ingesters {
 		select {
 		case resp := <-resps:
 			result = append(result, resp)
@@ -545,9 +517,11 @@ func (d *Distributor) forAllIngesters(f func(client.IngesterClient) (interface{}
 		case lastErr = <-errs:
 			numErrs++
 		}
 	}
-	if numErrs > d.cfg.ReplicationFactor/2 {
+
+	if numErrs > replicationSet.MaxErrors {
 		return nil, lastErr
 	}
+
 	return result, nil
 }
@@ -624,8 +598,8 @@ func (d *Distributor) UserStats(ctx context.Context) (*UserStats, error) {
 		totalStats.NumSeries += resp.(*client.UserStatsResponse).NumSeries
 	}
 
-	totalStats.IngestionRate /= float64(d.cfg.ReplicationFactor)
-	totalStats.NumSeries /= uint64(d.cfg.ReplicationFactor)
+	totalStats.IngestionRate /= float64(d.ring.ReplicationFactor())
+	totalStats.NumSeries /= uint64(d.ring.ReplicationFactor())
 
 	return totalStats, nil
 }
@@ -645,8 +619,11 @@ func (d *Distributor) AllUserStats(ctx context.Context) ([]UserIDStats, error) {
 	req := &client.UserStatsRequest{}
 	ctx = user.InjectOrgID(ctx, "1") // fake: ingester insists on having an org ID
 	// Not using d.forAllIngesters(), so we can fail after first error.
-	ingesters := d.ring.GetAll()
-	for _, ingester := range ingesters {
+	replicationSet, err := d.ring.GetAll()
+	if err != nil {
+		return nil, err
+	}
+	for _, ingester := range replicationSet.Ingesters {
 		client, err := d.ingesterPool.GetClientFor(ingester.Addr)
 		if err != nil {
 			return nil, err
diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go
index bb46ea75e1..69d9d270f2 100644
--- a/pkg/distributor/distributor_test.go
+++ b/pkg/distributor/distributor_test.go
@@ -20,28 +20,36 @@ import (
 // mockRing doesn't do any consistent hashing, just returns same ingesters for every query.
 type mockRing struct {
 	prometheus.Counter
-	ingesters        []*ring.IngesterDesc
-	heartbeatTimeout time.Duration
+	ingesters []*ring.IngesterDesc
 }
 
-func (r mockRing) Get(key uint32, n int, op ring.Operation) ([]*ring.IngesterDesc, error) {
-	return r.ingesters[:n], nil
+func (r mockRing) Get(key uint32, op ring.Operation) (ring.ReplicationSet, error) {
+	return ring.ReplicationSet{
+		Ingesters: r.ingesters[:3],
+		MaxErrors: 1,
+	}, nil
 }
 
-func (r mockRing) BatchGet(keys []uint32, n int, op ring.Operation) ([][]*ring.IngesterDesc, error) {
-	result := [][]*ring.IngesterDesc{}
+func (r mockRing) BatchGet(keys []uint32, op ring.Operation) ([]ring.ReplicationSet, error) {
+	result := []ring.ReplicationSet{}
 	for i := 0; i < len(keys); i++ {
-		result = append(result, r.ingesters[:n])
+		result = append(result, ring.ReplicationSet{
+			Ingesters: r.ingesters[:3],
+			MaxErrors: 1,
+		})
 	}
 	return result, nil
 }
 
-func (r mockRing) GetAll() []*ring.IngesterDesc {
-	return r.ingesters
+func (r mockRing) GetAll() (ring.ReplicationSet, error) {
+	return ring.ReplicationSet{
+		Ingesters: r.ingesters,
+		MaxErrors: 1,
+	}, nil
 }
 
-func (r mockRing) IsHealthy(ingester *ring.IngesterDesc) bool {
-	return time.Now().Sub(time.Unix(ingester.Timestamp, 0)) <= r.heartbeatTimeout
+func (r mockRing) ReplicationFactor() int {
+	return 3
 }
 
 type mockIngester struct {
@@ -159,12 +167,10 @@ func TestDistributorPush(t *testing.T) {
 			Counter: prometheus.NewCounter(prometheus.CounterOpts{
 				Name: "foo",
 			}),
-			ingesters:        ingesterDescs,
-			heartbeatTimeout: 1 * time.Minute,
+			ingesters: ingesterDescs,
 		}
 
 		d, err := New(Config{
-			ReplicationFactor:   3,
 			RemoteTimeout:       1 * time.Minute,
 			ClientCleanupPeriod: 1 * time.Minute,
 			IngestionRateLimit:  10000,
@@ -299,12 +305,10 @@ func TestDistributorQuery(t *testing.T) {
 			Counter: prometheus.NewCounter(prometheus.CounterOpts{
 				Name: "foo",
 			}),
-			ingesters:        ingesterDescs,
-			heartbeatTimeout: 1 * time.Minute,
+			ingesters: ingesterDescs,
 		}
 
 		d, err := New(Config{
-			ReplicationFactor:   3,
 			RemoteTimeout:       1 * time.Minute,
 			ClientCleanupPeriod: 1 * time.Minute,
 			IngestionRateLimit:  10000,
diff --git a/pkg/ring/http.go b/pkg/ring/http.go
index e3b067a098..c0746d6ad2 100644
--- a/pkg/ring/http.go
+++ b/pkg/ring/http.go
@@ -102,7 +102,7 @@ func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) {
 		ing := r.ringDesc.Ingesters[id]
 		timestamp := time.Unix(ing.Timestamp, 0)
 		state := ing.State.String()
-		if !r.IsHealthy(ing) {
+		if !r.IsHealthy(ing, Reporting) {
 			state = unhealthy
 		}
diff --git a/pkg/ring/replication_strategy.go b/pkg/ring/replication_strategy.go
new file mode 100644
index 0000000000..2e0c8d5f4c
--- /dev/null
+++ b/pkg/ring/replication_strategy.go
@@ -0,0 +1,62 @@
+package ring
+
+import (
+	"fmt"
+	"time"
+)
+
+// replicationStrategy decides, given the set of ingesters eligible for a key,
+// which ingesters to try to write to and how many failures to tolerate.
+// - Filters out dead ingesters, so we don't even try to write to them.
+// - Checks that there are enough ingesters for the operation to succeed.
+func (r *Ring) replicationStrategy(ingesters []*IngesterDesc, op Operation) (
+	liveIngesters []*IngesterDesc, maxFailure int, err error,
+) {
+	// We need a response from a quorum of ingesters, which is n/2 + 1. In the
+	// case of a node joining/leaving, the actual replica set might be bigger
+	// than the replication factor, so use the bigger of the two.
+	replicationFactor := r.cfg.ReplicationFactor
+	if len(ingesters) > replicationFactor {
+		replicationFactor = len(ingesters)
+	}
+	minSuccess := (replicationFactor / 2) + 1
+	maxFailure = replicationFactor - minSuccess
+
+	// Skip those that have not heartbeated in a while. NB these are still
+	// included in the calculation of minSuccess, so too many failed ingesters
+	// will cause the whole write to fail.
+	liveIngesters = make([]*IngesterDesc, 0, len(ingesters))
+	for _, ingester := range ingesters {
+		if r.IsHealthy(ingester, op) {
+			liveIngesters = append(liveIngesters, ingester)
+		} else {
+			maxFailure--
+		}
+	}
+
+	// This is just a shortcut - if there are not minSuccess available ingesters
+	// after filtering out dead ones, don't even bother trying.
+	if maxFailure < 0 || len(liveIngesters) < minSuccess {
+		err = fmt.Errorf("at least %d live ingesters required, could only find %d",
+			minSuccess, len(liveIngesters))
+		return
+	}
+
+	return
+}
+
+// IsHealthy checks whether an ingester appears to be alive and heartbeating.
+func (r *Ring) IsHealthy(ingester *IngesterDesc, op Operation) bool {
+	if op == Write && ingester.State != ACTIVE {
+		return false
+	} else if op == Read && ingester.State == JOINING {
+		return false
+	}
+	return time.Now().Sub(time.Unix(ingester.Timestamp, 0)) <= r.cfg.HeartbeatTimeout
+}
+
+// ReplicationFactor of the ring.
+func (r *Ring) ReplicationFactor() int {
+	return r.cfg.ReplicationFactor
+}
diff --git a/pkg/ring/replication_strategy_test.go b/pkg/ring/replication_strategy_test.go
new file mode 100644
index 0000000000..f81f10583e
--- /dev/null
+++ b/pkg/ring/replication_strategy_test.go
@@ -0,0 +1,104 @@
+package ring
+
+import (
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestReplicationStrategy(t *testing.T) {
+	for i, tc := range []struct {
+		RF, LiveIngesters, DeadIngesters int
+		op                               Operation // Will default to Read
+		ExpectedMaxFailure               int
+		ExpectedError                    string
+	}{
+		// Ensure it works for a single ingester, for local testing.
+		{
+			RF:                 1,
+			LiveIngesters:      1,
+			ExpectedMaxFailure: 0,
+		},
+
+		{
+			RF:            1,
+			DeadIngesters: 1,
+			ExpectedError: "at least 1 live ingesters required, could only find 0",
+		},
+
+		// Ensure it works for the default production config.
+		{
+			RF:                 3,
+			LiveIngesters:      3,
+			ExpectedMaxFailure: 1,
+		},
+
+		{
+			RF:                 3,
+			LiveIngesters:      2,
+			DeadIngesters:      1,
+			ExpectedMaxFailure: 0,
+		},
+
+		{
+			RF:            3,
+			LiveIngesters: 1,
+			DeadIngesters: 2,
+			ExpectedError: "at least 2 live ingesters required, could only find 1",
+		},
+
+		// Ensure it works when adding / removing nodes.
+
+		// A node is joining or leaving; the replica set expands.
+		{
+			RF:                 3,
+			LiveIngesters:      4,
+			ExpectedMaxFailure: 1,
+		},
+
+		{
+			RF:                 3,
+			LiveIngesters:      3,
+			DeadIngesters:      1,
+			ExpectedMaxFailure: 0,
+		},
+
+		{
+			RF:            3,
+			LiveIngesters: 2,
+			DeadIngesters: 2,
+			ExpectedError: "at least 3 live ingesters required, could only find 2",
+		},
+	} {
+		ingesters := []*IngesterDesc{}
+		for i := 0; i < tc.LiveIngesters; i++ {
+			ingesters = append(ingesters, &IngesterDesc{
+				Timestamp: time.Now().Unix(),
+			})
+		}
+		for i := 0; i < tc.DeadIngesters; i++ {
+			ingesters = append(ingesters, &IngesterDesc{})
+		}
+
+		r, err := New(Config{
+			Mock:              NewInMemoryKVClient(),
+			HeartbeatTimeout:  100 * time.Second,
+			ReplicationFactor: tc.RF,
+		})
+		require.NoError(t, err)
+
+		t.Run(fmt.Sprintf("[%d]", i), func(t *testing.T) {
+			liveIngesters, maxFailure, err := r.replicationStrategy(ingesters, tc.op)
+			if tc.ExpectedError == "" {
+				assert.NoError(t, err)
+				assert.Equal(t, tc.LiveIngesters, len(liveIngesters))
+				assert.Equal(t, tc.ExpectedMaxFailure, maxFailure)
+			} else {
+				assert.EqualError(t, err, tc.ExpectedError)
+			}
+		})
+	}
+}
diff --git a/pkg/ring/ring.go b/pkg/ring/ring.go
index 41f189dabc..bd9c08f22b 100644
--- a/pkg/ring/ring.go
+++ b/pkg/ring/ring.go
@@ -6,6 +6,7 @@ import (
 	"context"
 	"errors"
 	"flag"
+	"fmt"
 	"math"
 	"sort"
 	"sync"
@@ -27,10 +28,17 @@ const (
 type ReadRing interface {
 	prometheus.Collector
 
-	Get(key uint32, n int, op Operation) ([]*IngesterDesc, error)
-	BatchGet(keys []uint32, n int, op Operation) ([][]*IngesterDesc, error)
-	GetAll() []*IngesterDesc
-	IsHealthy(*IngesterDesc) bool
+	Get(key uint32, op Operation) (ReplicationSet, error)
+	BatchGet(keys []uint32, op Operation) ([]ReplicationSet, error)
+	GetAll() (ReplicationSet, error)
+	ReplicationFactor() int
+}
+
+// ReplicationSet describes the ingesters to talk to for a given key, and how
+// many errors to tolerate.
+type ReplicationSet struct {
+	Ingesters []*IngesterDesc
+	MaxErrors int
 }
 
 // Operation can be Read or Write
@@ -40,6 +48,7 @@ type Operation int
 const (
 	Read Operation = iota
 	Write
+	Reporting // Special value for inquiring about health
 )
 
 type uint32s []uint32
@@ -49,14 +58,15 @@ type uint32s []uint32
 func (x uint32s) Less(i, j int) bool { return x[i] < x[j] }
 func (x uint32s) Swap(i, j int)      { x[i], x[j] = x[j], x[i] }
 
 // ErrEmptyRing is the error returned when trying to get an element when nothing has been added to hash.
-var ErrEmptyRing = errors.New("empty circle")
+var ErrEmptyRing = errors.New("empty ring")
 
 // Config for a Ring
 type Config struct {
 	ConsulConfig
-	store            string
-	HeartbeatTimeout time.Duration
-	Mock             KVClient
+	store             string
+	HeartbeatTimeout  time.Duration
+	ReplicationFactor int
+	Mock              KVClient
 }
 
 // RegisterFlags adds the flags required to config this to the given FlagSet
@@ -65,14 +75,15 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 
 	f.StringVar(&cfg.store, "ring.store", "consul", "Backend storage to use for the ring (consul, inmemory).")
 	f.DurationVar(&cfg.HeartbeatTimeout, "ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which ingesters are skipped for reads/writes.")
+	f.IntVar(&cfg.ReplicationFactor, "distributor.replication-factor", 3, "The number of ingesters to write to and read from.")
 }
 
-// Ring holds the information about the members of the consistent hash circle.
+// Ring holds the information about the members of the consistent hash ring.
 type Ring struct {
-	KVClient         KVClient
-	done             chan struct{}
-	quit             context.CancelFunc
-	heartbeatTimeout time.Duration
+	cfg      Config
+	KVClient KVClient
+	done     chan struct{}
+	quit     context.CancelFunc
 
 	mtx      sync.RWMutex
 	ringDesc *Desc
@@ -84,6 +95,10 @@ type Ring struct {
 
 // New creates a new Ring
 func New(cfg Config) (*Ring, error) {
+	if cfg.ReplicationFactor <= 0 {
+		return nil, fmt.Errorf("ReplicationFactor must be greater than zero: %d", cfg.ReplicationFactor)
+	}
+
 	store := cfg.Mock
 	if store == nil {
 		var err error
@@ -101,10 +116,10 @@ func New(cfg Config) (*Ring, error) {
 	}
 
 	r := &Ring{
-		KVClient:         store,
-		heartbeatTimeout: cfg.HeartbeatTimeout,
-		done:             make(chan struct{}),
-		ringDesc:         &Desc{},
+		cfg:      cfg,
+		KVClient: store,
+		done:     make(chan struct{}),
+		ringDesc: &Desc{},
 		ingesterOwnershipDesc: prometheus.NewDesc(
 			"cortex_ring_ingester_ownership_percent",
 			"The percent ownership of the ring by ingester",
@@ -150,38 +165,41 @@ func (r *Ring) loop(ctx context.Context) {
 }
 
 // Get returns n (or more) ingesters which form the replicas for the given key.
-func (r *Ring) Get(key uint32, n int, op Operation) ([]*IngesterDesc, error) {
+func (r *Ring) Get(key uint32, op Operation) (ReplicationSet, error) {
 	r.mtx.RLock()
 	defer r.mtx.RUnlock()
-	return r.getInternal(key, n, op)
+	return r.getInternal(key, op)
 }
 
-// BatchGet returns n (or more) ingesters which form the replicas for the given key.
-// The order of the result matches the order of the input.
-func (r *Ring) BatchGet(keys []uint32, n int, op Operation) ([][]*IngesterDesc, error) {
+// BatchGet returns ReplicationFactor (or more) ingesters which form the replicas
+// for the given keys. The order of the result matches the order of the input.
+func (r *Ring) BatchGet(keys []uint32, op Operation) ([]ReplicationSet, error) {
 	r.mtx.RLock()
 	defer r.mtx.RUnlock()
 
-	result := make([][]*IngesterDesc, len(keys), len(keys))
+	result := make([]ReplicationSet, len(keys), len(keys))
 	for i, key := range keys {
-		ingesters, err := r.getInternal(key, n, op)
+		rs, err := r.getInternal(key, op)
 		if err != nil {
 			return nil, err
 		}
-		result[i] = ingesters
+		result[i] = rs
 	}
 	return result, nil
 }
 
-func (r *Ring) getInternal(key uint32, n int, op Operation) ([]*IngesterDesc, error) {
+func (r *Ring) getInternal(key uint32, op Operation) (ReplicationSet, error) {
 	if r.ringDesc == nil || len(r.ringDesc.Tokens) == 0 {
-		return nil, ErrEmptyRing
+		return ReplicationSet{}, ErrEmptyRing
 	}
 
-	ingesters := make([]*IngesterDesc, 0, n)
-	distinctHosts := map[string]struct{}{}
-	start := r.search(key)
-	iterations := 0
+	var (
+		n             = r.cfg.ReplicationFactor
+		ingesters     = make([]*IngesterDesc, 0, n)
+		distinctHosts = map[string]struct{}{}
+		start         = r.search(key)
+		iterations    = 0
+	)
 	for i := start; len(distinctHosts) < n && iterations < len(r.ringDesc.Tokens); i++ {
 		iterations++
 		// Wrap i around in the ring.
@@ -195,47 +213,60 @@ func (r *Ring) getInternal(key uint32, n int, op Operation) ([]*IngesterDesc, er
 		distinctHosts[token.Ingester] = struct{}{}
 		ingester := r.ringDesc.Ingesters[token.Ingester]
 
-		// Ingesters that are not ACTIVE do not count to the replication limit. We do
-		// not want to Write to them because they are about to go away, but we do
-		// want to write the extra replica somewhere. So we increase the size of the
-		// set of replicas for the key. This means we have to also increase the
+		// We do not want to Write to Ingesters that are not ACTIVE, but we do want
+		// to write the extra replica somewhere. So we increase the size of the set
+		// of replicas for the key. This means we have to also increase the
 		// size of the replica set for read, but we can read from Leaving ingesters,
 		// so don't skip it in this case.
+		// NB dead ingesters will be filtered out later (by replication_strategy.go).
 		if op == Write && ingester.State != ACTIVE {
 			n++
-			continue
 		} else if op == Read && (ingester.State != ACTIVE && ingester.State != LEAVING) {
 			n++
-			continue
 		}
 
 		ingesters = append(ingesters, ingester)
 	}
-	return ingesters, nil
-}
 
-// IsHealthy checks whether an ingester appears to be alive and heartbeating
-func (r *Ring) IsHealthy(ingester *IngesterDesc) bool {
-	return time.Now().Sub(time.Unix(ingester.Timestamp, 0)) <= r.heartbeatTimeout
+	liveIngesters, maxFailure, err := r.replicationStrategy(ingesters, op)
+	if err != nil {
+		return ReplicationSet{}, err
+	}
+
+	return ReplicationSet{
+		Ingesters: liveIngesters,
+		MaxErrors: maxFailure,
+	}, nil
 }
 
-// GetAll returns all available ingesters in the circle.
-func (r *Ring) GetAll() []*IngesterDesc {
+// GetAll returns all available ingesters in the ring.
+func (r *Ring) GetAll() (ReplicationSet, error) {
 	r.mtx.RLock()
 	defer r.mtx.RUnlock()
 
-	if r.ringDesc == nil {
-		return nil
+	if r.ringDesc == nil || len(r.ringDesc.Tokens) == 0 {
+		return ReplicationSet{}, ErrEmptyRing
 	}
 
 	ingesters := make([]*IngesterDesc, 0, len(r.ringDesc.Ingesters))
+	maxErrors := r.cfg.ReplicationFactor / 2
+
 	for _, ingester := range r.ringDesc.Ingesters {
-		if !r.IsHealthy(ingester) {
+		if !r.IsHealthy(ingester, Read) {
+			maxErrors--
 			continue
 		}
 		ingesters = append(ingesters, ingester)
 	}
-	return ingesters
+
+	if maxErrors < 0 {
+		return ReplicationSet{}, fmt.Errorf("too many failed ingesters")
+	}
+
+	return ReplicationSet{
+		Ingesters: ingesters,
+		MaxErrors: maxErrors,
+	}, nil
 }
 
 func (r *Ring) search(key uint32) int {
@@ -295,7 +326,7 @@ func (r *Ring) Collect(ch chan<- prometheus.Metric) {
 		JOINING.String(): 0,
 	}
 	for _, ingester := range r.ringDesc.Ingesters {
-		if !r.IsHealthy(ingester) {
+		if !r.IsHealthy(ingester, Reporting) {
 			byState[unhealthy]++
 		} else {
 			byState[ingester.State.String()]++
diff --git a/pkg/ring/ring_test.go b/pkg/ring/ring_test.go
index b9a32a49de..e90f3db720 100644
--- a/pkg/ring/ring_test.go
+++ b/pkg/ring/ring_test.go
@@ -28,7 +28,8 @@ func BenchmarkRing(b *testing.B) {
 	consul.PutBytes(ConsulKey, ringBytes)
 
 	r, err := New(Config{
-		Mock: consul,
+		Mock:              consul,
+		ReplicationFactor: 3,
 	})
 	if err != nil {
 		b.Fatal(err)
@@ -38,6 +39,6 @@ func BenchmarkRing(b *testing.B) {
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
 		keys := GenerateTokens(100, nil)
-		r.BatchGet(keys, 3, Write)
+		r.BatchGet(keys, Write)
 	}
}
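
The heart of this change is the quorum arithmetic that `replicationStrategy` now centralizes: the quorum size is computed over the larger of the configured replication factor and the actual replica set (which grows while a node is joining or leaving), and every dead ingester spends one unit of the error budget. A minimal standalone sketch of that arithmetic, matching the cases in replication_strategy_test.go; the `quorum`, `live`, and `dead` names are illustrative and not part of the ring package:

```go
package main

import "fmt"

// quorum mirrors replicationStrategy: n is the larger of the replication
// factor and the actual replica set size, success needs n/2 + 1 acks, and
// each dead ingester consumes one tolerated failure.
func quorum(replicationFactor, live, dead int) (minSuccess, maxFailure int, err error) {
	n := replicationFactor
	if live+dead > n {
		n = live + dead
	}
	minSuccess = n/2 + 1
	maxFailure = n - minSuccess - dead
	if maxFailure < 0 || live < minSuccess {
		return 0, 0, fmt.Errorf("at least %d live ingesters required, could only find %d", minSuccess, live)
	}
	return minSuccess, maxFailure, nil
}

func main() {
	fmt.Println(quorum(3, 3, 0)) // 2 1 <nil>: healthy RF=3 needs 2 acks, tolerates 1 failure
	fmt.Println(quorum(3, 2, 1)) // 2 0 <nil>: one dead replica leaves no error budget
	fmt.Println(quorum(3, 4, 0)) // 3 1 <nil>: set expanded to 4 during a join/leave, so 3 acks
	fmt.Println(quorum(3, 1, 2)) // error: at least 2 live ingesters required, could only find 1
}
```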
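On the consumer side, callers no longer do their own quorum bookkeeping: they receive a `ReplicationSet` and wait for `len(Ingesters) - MaxErrors` successes, giving up once more than `MaxErrors` calls have failed, exactly as the rewritten `queryIngesters` does. A sketch of that consumption pattern; the `queryOne` callback, `waitForQuorum` name, and import path are assumptions for illustration, not the PR's API:

```go
package example

import "github.com/weaveworks/cortex/pkg/ring"

// waitForQuorum fans queryOne out to every ingester in the set and returns
// as soon as enough replies arrive; queryOne stands in for the real
// per-ingester gRPC call.
func waitForQuorum(rs ring.ReplicationSet, queryOne func(*ring.IngesterDesc) error) error {
	errs := make(chan error, len(rs.Ingesters))
	oks := make(chan struct{}, len(rs.Ingesters))
	for _, ing := range rs.Ingesters {
		go func(ing *ring.IngesterDesc) {
			if err := queryOne(ing); err != nil {
				errs <- err
			} else {
				oks <- struct{}{}
			}
		}(ing)
	}

	// Success needs len(Ingesters)-MaxErrors replies; the first error past
	// the MaxErrors budget fails the whole call.
	minSuccess := len(rs.Ingesters) - rs.MaxErrors
	var numErrs, numSuccess int
	for numSuccess < minSuccess {
		select {
		case err := <-errs:
			numErrs++
			if numErrs > rs.MaxErrors {
				return err
			}
		case <-oks:
			numSuccess++
		}
	}
	return nil
}
```

Because the channels are buffered to the full replica set size, the straggler goroutines that lose the race can still send and exit; nothing blocks after the function returns.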