diff --git a/cmd/prism/main.go b/cmd/prism/main.go
index a56df5dd103..bc2fedef52c 100644
--- a/cmd/prism/main.go
+++ b/cmd/prism/main.go
@@ -82,6 +82,8 @@ type cfg struct {
 	flushPeriod time.Duration
 	maxChunkAge time.Duration
 	numTokens   int
+
+	distributorConfig prism.DistributorConfig
 }
 
 func main() {
@@ -101,6 +103,10 @@ func main() {
 	flag.DurationVar(&cfg.flushPeriod, "ingester.flush-period", 1*time.Minute, "Period with which to attempt to flush chunks.")
 	flag.DurationVar(&cfg.maxChunkAge, "ingester.max-chunk-age", 10*time.Minute, "Maximum chunk age before flushing.")
 	flag.IntVar(&cfg.numTokens, "ingester.num-tokens", 128, "Number of tokens for each ingester.")
+	flag.IntVar(&cfg.distributorConfig.ReplicationFactor, "distributor.replication-factor", 3, "The number of ingesters to write to and read from.")
+	flag.IntVar(&cfg.distributorConfig.MinReadSuccesses, "distributor.min-read-successes", 2, "The minimum number of ingesters from which a read must succeed.")
+	flag.IntVar(&cfg.distributorConfig.MinWriteSuccesses, "distributor.min-write-successes", 2, "The minimum number of ingesters to which a write must succeed.")
+	flag.DurationVar(&cfg.distributorConfig.HeartbeatTimeout, "distributor.heartbeat-timeout", time.Minute, "The heartbeat timeout after which ingesters are skipped for reads/writes.")
 	flag.Parse()
 
 	chunkStore, err := setupChunkStore(cfg)
@@ -117,8 +123,12 @@ func main() {
 	switch cfg.mode {
 	case modeDistributor:
 		ring := ring.New(consul)
+		cfg.distributorConfig.Ring = ring
+		cfg.distributorConfig.ClientFactory = func(address string) (*prism.IngesterClient, error) {
+			return prism.NewIngesterClient(address, cfg.remoteTimeout)
+		}
 		defer ring.Stop()
-		setupDistributor(cfg, ring, chunkStore)
+		setupDistributor(cfg.distributorConfig, chunkStore)
 		if err != nil {
 			log.Fatalf("Error initializing distributor: %v", err)
 		}
@@ -180,17 +190,10 @@ func setupChunkStore(cfg cfg) (chunk.Store, error) {
 }
 
 func setupDistributor(
-	cfg cfg,
-	ring *ring.Ring,
+	cfg prism.DistributorConfig,
 	chunkStore chunk.Store,
 ) {
-	clientFactory := func(address string) (*prism.IngesterClient, error) {
-		return prism.NewIngesterClient(address, cfg.remoteTimeout)
-	}
-	distributor := prism.NewDistributor(prism.DistributorConfig{
-		Ring:          ring,
-		ClientFactory: clientFactory,
-	})
+	distributor := prism.NewDistributor(cfg)
 	prometheus.MustRegister(distributor)
 
 	prefix := "/api/prom"
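Note on the flags above: the quorum sizes and the replication factor are independent knobs, so nothing stops a configuration that can never be satisfied, e.g. -distributor.min-write-successes=4 with -distributor.replication-factor=3. A startup check along these lines would reject that early. This is a hypothetical sketch, not part of the diff; it assumes only the three integer settings introduced above and an "fmt" import in the enclosing package.

// validateQuorum is a hypothetical startup check: quorum sizes larger
// than the replication factor can never be met, because at most
// replicationFactor ingesters are consulted per key.
func validateQuorum(replicationFactor, minReadSuccesses, minWriteSuccesses int) error {
	if minWriteSuccesses > replicationFactor {
		return fmt.Errorf("min-write-successes (%d) exceeds replication-factor (%d)", minWriteSuccesses, replicationFactor)
	}
	if minReadSuccesses > replicationFactor {
		return fmt.Errorf("min-read-successes (%d) exceeds replication-factor (%d)", minReadSuccesses, replicationFactor)
	}
	return nil
}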
diff --git a/distributor.go b/distributor.go
index e01a7b80cfa..387099437f2 100644
--- a/distributor.go
+++ b/distributor.go
@@ -17,6 +17,7 @@ import (
 	"fmt"
 	"hash/fnv"
 	"sync"
+	"time"
 
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/common/model"
@@ -39,22 +40,25 @@ var (
 // Distributor is a storage.SampleAppender and a prism.Querier which
 // forwards appends and queries to individual ingesters.
 type Distributor struct {
-	ring          ReadRing
-	clientFactory IngesterClientFactory
-	clientsMtx    sync.RWMutex
-	clients       map[string]*IngesterClient
-
-	queryDuration   *prometheus.HistogramVec
-	receivedSamples prometheus.Counter
-	sendDuration    *prometheus.HistogramVec
+	cfg        DistributorConfig
+	clientsMtx sync.RWMutex
+	clients    map[string]*IngesterClient
+
+	queryDuration          *prometheus.HistogramVec
+	receivedSamples        prometheus.Counter
+	sendDuration           *prometheus.HistogramVec
+	ingesterAppends        *prometheus.CounterVec
+	ingesterAppendFailures *prometheus.CounterVec
+	ingesterQueries        *prometheus.CounterVec
+	ingesterQueryFailures  *prometheus.CounterVec
 }
 
 // ReadRing represents the read interface to the ring.
 type ReadRing interface {
 	prometheus.Collector
 
-	Get(key uint32) (ring.IngesterDesc, error)
-	GetAll() []ring.IngesterDesc
+	Get(key uint32, n int, heartbeatTimeout time.Duration) ([]ring.IngesterDesc, error)
+	GetAll(heartbeatTimeout time.Duration) []ring.IngesterDesc
 }
 
 // IngesterClientFactory creates ingester clients.
@@ -65,14 +69,18 @@ type IngesterClientFactory func(string) (*IngesterClient, error)
 type DistributorConfig struct {
 	Ring          ReadRing
 	ClientFactory IngesterClientFactory
+
+	ReplicationFactor int
+	MinReadSuccesses  int
+	MinWriteSuccesses int
+	HeartbeatTimeout  time.Duration
 }
 
 // NewDistributor constructs a new Distributor
 func NewDistributor(cfg DistributorConfig) *Distributor {
 	return &Distributor{
-		ring:          cfg.Ring,
-		clientFactory: cfg.ClientFactory,
-		clients:       map[string]*IngesterClient{},
+		cfg:     cfg,
+		clients: map[string]*IngesterClient{},
 		queryDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
 			Namespace: "prometheus",
 			Name:      "distributor_query_duration_seconds",
@@ -87,9 +95,29 @@ func NewDistributor(cfg DistributorConfig) *Distributor {
 		sendDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
 			Namespace: "prometheus",
 			Name:      "distributor_send_duration_seconds",
-			Help:      "Time spent sending sample batches to ingesters.",
+			Help:      "Time spent sending a sample batch to multiple replicated ingesters.",
 			Buckets:   []float64{.001, .0025, .005, .01, .025, .05, .1, .25, .5, 1},
 		}, []string{"method", "status_code"}),
+		ingesterAppends: prometheus.NewCounterVec(prometheus.CounterOpts{
+			Namespace: "prometheus",
+			Name:      "distributor_ingester_appends_total",
+			Help:      "The total number of batch appends sent to ingesters.",
+		}, []string{"ingester"}),
+		ingesterAppendFailures: prometheus.NewCounterVec(prometheus.CounterOpts{
+			Namespace: "prometheus",
+			Name:      "distributor_ingester_append_failures_total",
+			Help:      "The total number of failed batch appends sent to ingesters.",
+		}, []string{"ingester"}),
+		ingesterQueries: prometheus.NewCounterVec(prometheus.CounterOpts{
+			Namespace: "prometheus",
+			Name:      "distributor_ingester_queries_total",
+			Help:      "The total number of queries sent to ingesters.",
+		}, []string{"ingester"}),
+		ingesterQueryFailures: prometheus.NewCounterVec(prometheus.CounterOpts{
+			Namespace: "prometheus",
+			Name:      "distributor_ingester_query_failures_total",
+			Help:      "The total number of failed queries sent to ingesters.",
+		}, []string{"ingester"}),
 	}
 }
 
@@ -108,7 +136,7 @@ func (d *Distributor) getClientFor(hostname string) (*IngesterClient, error) {
 		return client, nil
 	}
 
-	client, err := d.clientFactory(hostname)
+	client, err := d.cfg.ClientFactory(hostname)
 	if err != nil {
 		return nil, err
 	}
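Since Get and GetAll now take extra arguments, anything that fakes the ring for tests has to widen with them. A minimal test double satisfying the new ReadRing interface might look like the following; mockRing and its fixed ingester list are hypothetical, and it assumes it sits in the prism package next to distributor.go so ring.IngesterDesc, prometheus.Collector and time.Duration are in scope. The embedded Collector is left nil purely to satisfy the interface, so Describe/Collect must not be called on it.

// mockRing is a hypothetical test double for the widened ReadRing
// interface. It ignores the key and the heartbeat timeout and hands
// back up to n entries of its fixed ingester list.
type mockRing struct {
	prometheus.Collector // nil; satisfies the interface, unusable for scraping
	ingesters            []ring.IngesterDesc
}

func (r mockRing) Get(key uint32, n int, heartbeatTimeout time.Duration) ([]ring.IngesterDesc, error) {
	if n > len(r.ingesters) {
		n = len(r.ingesters)
	}
	return r.ingesters[:n], nil
}

func (r mockRing) GetAll(heartbeatTimeout time.Duration) []ring.IngesterDesc {
	return r.ingesters
}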
@@ -140,12 +168,14 @@ func (d *Distributor) Append(ctx context.Context, samples []*model.Sample) error
 	samplesByIngester := map[string][]*model.Sample{}
 	for _, sample := range samples {
 		key := tokenForMetric(userID, sample.Metric)
-		collector, err := d.ring.Get(key)
+		ingesters, err := d.cfg.Ring.Get(key, d.cfg.ReplicationFactor, d.cfg.HeartbeatTimeout)
 		if err != nil {
 			return err
 		}
-		otherSamples := samplesByIngester[collector.Hostname]
-		samplesByIngester[collector.Hostname] = append(otherSamples, sample)
+		for _, ingester := range ingesters {
+			otherSamples := samplesByIngester[ingester.Hostname]
+			samplesByIngester[ingester.Hostname] = append(otherSamples, sample)
+		}
 	}
 
 	errs := make(chan error)
@@ -155,12 +185,19 @@
 		}(hostname, samples)
 	}
 	var lastErr error
+	successes := 0
 	for i := 0; i < len(samplesByIngester); i++ {
 		if err := <-errs; err != nil {
 			lastErr = err
+			continue
 		}
+		successes++
+	}
+
+	if successes < d.cfg.MinWriteSuccesses {
+		return fmt.Errorf("too few successful writes, last error was: %v", lastErr)
 	}
-	return lastErr
+	return nil
 }
 
 func (d *Distributor) sendSamples(ctx context.Context, hostname string, samples []*model.Sample) error {
@@ -168,9 +205,14 @@
 	if err != nil {
 		return err
 	}
-	return instrument.TimeRequestHistogram("send", d.sendDuration, func() error {
+	err = instrument.TimeRequestHistogram("send", d.sendDuration, func() error {
 		return client.Append(ctx, samples)
 	})
+	if err != nil {
+		d.ingesterAppendFailures.WithLabelValues(hostname).Inc()
+	}
+	d.ingesterAppends.WithLabelValues(hostname).Inc()
+	return err
 }
 
 func metricNameFromLabelMatchers(matchers ...*metric.LabelMatcher) (model.LabelValue, error) {
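The write path above fans each batch out to every ingester it maps to and counts whole-batch successes against MinWriteSuccesses. Note that the quorum is counted over the entries of samplesByIngester, not over the replicas of any single sample, so with mixed batches an Append can report success even though some individual sample reached fewer than MinWriteSuccesses ingesters. The pattern in isolation, as a sketch with hypothetical names (fanOut is not part of this diff; it assumes an "fmt" import):

// fanOut sketches the scatter-and-count pattern Append uses: run all
// calls concurrently, tally successes, and fail only if the tally
// misses the quorum, reporting the last error seen.
func fanOut(calls []func() error, minSuccesses int) error {
	errs := make(chan error, len(calls))
	for _, call := range calls {
		go func(call func() error) { errs <- call() }(call)
	}
	successes := 0
	var lastErr error
	for range calls {
		if err := <-errs; err != nil {
			lastErr = err
			continue
		}
		successes++
	}
	if successes < minSuccesses {
		return fmt.Errorf("too few successes (%d < %d), last error was: %v", successes, minSuccesses, lastErr)
	}
	return nil
}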
@@ -189,6 +231,8 @@
 func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers ...*metric.LabelMatcher) (model.Matrix, error) {
 	var result model.Matrix
 	err := instrument.TimeRequestHistogram("duration", d.queryDuration, func() error {
+		fpToSampleStream := map[model.Fingerprint]*model.SampleStream{}
+
 		metricName, err := metricNameFromLabelMatchers(matchers...)
 		if err != nil {
 			return err
@@ -199,29 +243,85 @@ func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers .
 			return err
 		}
 
-		collector, err := d.ring.Get(tokenFor(userID, metricName))
+		ingesters, err := d.cfg.Ring.Get(tokenFor(userID, metricName), d.cfg.ReplicationFactor, d.cfg.HeartbeatTimeout)
 		if err != nil {
 			return err
 		}
 
-		client, err := d.getClientFor(collector.Hostname)
-		if err != nil {
-			return err
+		// Fetch samples from multiple ingesters and group them by fingerprint (unsorted
+		// and with overlap).
+		successes := 0
+		var lastErr error
+		for _, ing := range ingesters {
+			client, err := d.getClientFor(ing.Hostname)
+			if err != nil {
+				return err
+			}
+			matrix, err := client.Query(ctx, from, to, matchers...)
+			d.ingesterQueries.WithLabelValues(ing.Hostname).Inc()
+			if err != nil {
+				lastErr = err
+				d.ingesterQueryFailures.WithLabelValues(ing.Hostname).Inc()
+				continue
+			}
+			successes++
+
+			for _, ss := range matrix {
+				fp := ss.Metric.Fingerprint()
+				if mss, ok := fpToSampleStream[fp]; !ok {
+					fpToSampleStream[fp] = &model.SampleStream{
+						Metric: ss.Metric,
+						Values: ss.Values,
+					}
+				} else {
+					mss.Values = mergeSamples(fpToSampleStream[fp].Values, ss.Values)
+				}
+			}
 		}
 
-		result, err = client.Query(ctx, from, to, matchers...)
-		if err != nil {
-			return err
+		if successes < d.cfg.MinReadSuccesses {
+			return fmt.Errorf("too few successful reads, last error was: %v", lastErr)
 		}
+
+		result = make(model.Matrix, 0, len(fpToSampleStream))
+		for _, ss := range fpToSampleStream {
+			result = append(result, ss)
+		}
+
 		return nil
 	})
 	return result, err
 }
 
+func mergeSamples(a, b []model.SamplePair) []model.SamplePair {
+	result := make([]model.SamplePair, 0, len(a)+len(b))
+	i, j := 0, 0
+	for i < len(a) && j < len(b) {
+		if a[i].Timestamp < b[j].Timestamp {
+			result = append(result, a[i])
+			i++
+		} else if a[i].Timestamp > b[j].Timestamp {
+			result = append(result, b[j])
+			j++
+		} else {
+			result = append(result, a[i])
+			i++
+			j++
+		}
+	}
+	for ; i < len(a); i++ {
+		result = append(result, a[i])
+	}
+	for ; j < len(b); j++ {
+		result = append(result, b[j])
+	}
+	return result
+}
+
 // LabelValuesForLabelName returns all of the label values that are associated with a given label name.
 func (d *Distributor) LabelValuesForLabelName(ctx context.Context, labelName model.LabelName) (model.LabelValues, error) {
 	valueSet := map[model.LabelValue]struct{}{}
-	for _, c := range d.ring.GetAll() {
+	for _, c := range d.cfg.Ring.GetAll(d.cfg.HeartbeatTimeout) {
 		client, err := d.getClientFor(c.Hostname)
 		if err != nil {
 			return nil, err
@@ -252,7 +352,7 @@
 func (d *Distributor) Describe(ch chan<- *prometheus.Desc) {
 	d.queryDuration.Describe(ch)
 	ch <- d.receivedSamples.Desc()
 	d.sendDuration.Describe(ch)
-	d.ring.Describe(ch)
+	d.cfg.Ring.Describe(ch)
 	ch <- numClientsDesc
 }
@@ -261,7 +361,7 @@
 func (d *Distributor) Collect(ch chan<- prometheus.Metric) {
 	d.queryDuration.Collect(ch)
 	ch <- d.receivedSamples
 	d.sendDuration.Collect(ch)
-	d.ring.Collect(ch)
+	d.cfg.Ring.Collect(ch)
 
 	d.clientsMtx.RLock()
 	defer d.clientsMtx.RUnlock()
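The new mergeSamples helper assumes both inputs are sorted by timestamp, which should hold for the matrices ingesters return, and it resolves equal timestamps by keeping the sample from its first argument, i.e. the values already accumulated in fpToSampleStream. A small demonstration, assuming it sits in the prism package (mergeSamplesDemo and its values are made up):

// mergeSamplesDemo illustrates the merge semantics with made-up data.
func mergeSamplesDemo() {
	a := []model.SamplePair{{Timestamp: 1, Value: 10}, {Timestamp: 3, Value: 30}}
	b := []model.SamplePair{{Timestamp: 1, Value: 11}, {Timestamp: 2, Value: 20}}
	merged := mergeSamples(a, b)
	// merged holds timestamps 1, 2 and 3 exactly once; the colliding
	// timestamp 1 keeps a's value (10), because equal timestamps take
	// the sample from the first argument.
	fmt.Println(len(merged)) // 3
}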
diff --git a/ring/ring.go b/ring/ring.go
index 971c007327b..e357e9f53b8 100644
--- a/ring/ring.go
+++ b/ring/ring.go
@@ -20,6 +20,7 @@ import (
 	"math"
 	"sort"
 	"sync"
+	"time"
 
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/common/log"
@@ -101,24 +102,52 @@ func (r *Ring) loop() {
 	})
 }
 
-// Get returns a collector close to the hash in the circle.
-func (r *Ring) Get(key uint32) (IngesterDesc, error) {
+// Get returns up to n ingesters close to the hash in the circle.
+func (r *Ring) Get(key uint32, n int, heartbeatTimeout time.Duration) ([]IngesterDesc, error) {
 	r.mtx.RLock()
 	defer r.mtx.RUnlock()
 	if len(r.ringDesc.Tokens) == 0 {
-		return IngesterDesc{}, ErrEmptyRing
+		return nil, ErrEmptyRing
 	}
-	i := r.search(key)
-	return r.ringDesc.Ingesters[r.ringDesc.Tokens[i].Ingester], nil
+
+	ingesters := make([]IngesterDesc, 0, n)
+	distinctHosts := map[string]struct{}{}
+	start := r.search(key)
+	iterations := 0
+	for i := start; len(distinctHosts) < n && iterations < len(r.ringDesc.Tokens); i++ {
+		iterations++
+		// Wrap i around in the ring.
+		i %= len(r.ringDesc.Tokens)
+
+		// We want n *distinct* ingesters.
+		host := r.ringDesc.Tokens[i].Ingester
+		if _, ok := distinctHosts[host]; ok {
+			continue
+		}
+		distinctHosts[host] = struct{}{}
+
+		ing := r.ringDesc.Ingesters[host]
+
+		// Out of the n distinct subsequent ingesters, skip those that have not heartbeated in a while.
+		if time.Now().Sub(ing.Timestamp) > heartbeatTimeout {
+			continue
+		}
+
+		ingesters = append(ingesters, ing)
+	}
+	return ingesters, nil
 }
 
-// GetAll returns all ingesters in the circle.
-func (r *Ring) GetAll() []IngesterDesc {
+// GetAll returns all available ingesters in the circle.
+func (r *Ring) GetAll(heartbeatTimeout time.Duration) []IngesterDesc {
 	r.mtx.RLock()
 	defer r.mtx.RUnlock()
 
 	ingesters := make([]IngesterDesc, 0, len(r.ringDesc.Ingesters))
 	for _, ingester := range r.ringDesc.Ingesters {
+		if time.Now().Sub(ingester.Timestamp) > heartbeatTimeout {
+			continue
+		}
 		ingesters = append(ingesters, ingester)
 	}
 	return ingesters
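Get walks the token ring clockwise from the hashed key, keeps the first n distinct hosts, and then drops any whose last heartbeat is older than the timeout, so callers can get back fewer than n ingesters and no error (an empty slice if everything is stale); it is the distributor's MinReadSuccesses/MinWriteSuccesses checks that turn such a shortfall into a failure. A self-contained sketch of just the distinct-host walk, with made-up token owners and the heartbeat filter left out:

package main

import "fmt"

// walk mimics the loop in Ring.Get: advance clockwise from start,
// wrapping around, and keep the first n distinct token owners.
func walk(owners []string, start, n int) []string {
	distinct := map[string]struct{}{}
	picked := []string{}
	for i, iterations := start, 0; len(distinct) < n && iterations < len(owners); i++ {
		iterations++
		i %= len(owners) // wrap i around in the ring
		host := owners[i]
		if _, ok := distinct[host]; ok {
			continue
		}
		distinct[host] = struct{}{}
		picked = append(picked, host)
	}
	return picked
}

func main() {
	// Five tokens owned by three ingesters; the key hashes to index 1.
	// The duplicate "A" and "B" tokens are skipped on the second visit.
	fmt.Println(walk([]string{"A", "B", "A", "C", "B"}, 1, 3)) // [B A C]
}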