cmd/prism/main.go (24 changes: 14 additions & 10 deletions)
@@ -82,6 +82,8 @@ type cfg struct {
flushPeriod time.Duration
maxChunkAge time.Duration
numTokens int

distributorConfig prism.DistributorConfig
}

func main() {
@@ -101,6 +103,11 @@ func main() {
flag.DurationVar(&cfg.flushPeriod, "ingester.flush-period", 1*time.Minute, "Period with which to attempt to flush chunks.")
flag.DurationVar(&cfg.maxChunkAge, "ingester.max-chunk-age", 10*time.Minute, "Maximum chunk age before flushing.")
flag.IntVar(&cfg.numTokens, "ingester.num-tokens", 128, "Number of tokens for each ingester.")
flag.IntVar(&cfg.distributorConfig.ReadReplicas, "distributor.read-replicas", 3, "The number of available ingesters to read from.")
flag.IntVar(&cfg.distributorConfig.WriteReplicas, "distributor.write-replicas", 3, "The number of available ingesters to write to.")
Reviewer: I think we just want number of replicas. I can't imagine a circumstance where we'd want these numbers to differ.

Author: Yeah, not 100% sure about this, but you're probably right and I can't see how differing numbers would make sense.
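If the two counts were collapsed into one as suggested, the flag wiring might look roughly like this. This is a sketch only; the ReplicationFactor field and the distributor.replication-factor flag are hypothetical names, not part of this change:

```go
package main

import (
    "flag"
    "time"
)

// Hypothetical single replication factor in place of the separate
// read/write replica counts above. Field and flag names are illustrative
// and not part of this PR.
type distributorConfig struct {
    ReplicationFactor int
    MinReadSuccesses  int
    MinWriteSuccesses int
    HeartbeatTimeout  time.Duration
}

func main() {
    var cfg distributorConfig
    flag.IntVar(&cfg.ReplicationFactor, "distributor.replication-factor", 3,
        "The number of ingesters each series is written to and read from.")
    flag.IntVar(&cfg.MinReadSuccesses, "distributor.min-read-successes", 2,
        "The minimum number of ingesters from which a read must succeed.")
    flag.IntVar(&cfg.MinWriteSuccesses, "distributor.min-write-successes", 2,
        "The minimum number of ingesters to which a write must succeed.")
    flag.DurationVar(&cfg.HeartbeatTimeout, "distributor.heartbeat-timeout", time.Minute,
        "The heartbeat timeout after which ingesters are skipped for reads/writes.")
    flag.Parse()
}
```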

flag.IntVar(&cfg.distributorConfig.MinReadSuccesses, "distributor.min-read-successes", 2, "The minimum number of ingesters from which a read must succeed.")
flag.IntVar(&cfg.distributorConfig.MinWriteSuccesses, "distributor.min-write-successes", 2, "The minimum number of ingesters to which a write must succeed.")
flag.DurationVar(&cfg.distributorConfig.HeartbeatTimeout, "distributor.heartbeat-timeout", time.Minute, "The heartbeat timeout after which ingesters are skipped for reads/writes.")
flag.Parse()

chunkStore, err := setupChunkStore(cfg)
@@ -117,8 +124,12 @@ func main() {
switch cfg.mode {
case modeDistributor:
ring := ring.New(consul)
cfg.distributorConfig.Ring = ring
cfg.distributorConfig.ClientFactory = func(address string) (*prism.IngesterClient, error) {
return prism.NewIngesterClient(address, cfg.remoteTimeout)
}
defer ring.Stop()
setupDistributor(cfg, ring, chunkStore)
setupDistributor(cfg.distributorConfig, chunkStore)
if err != nil {
log.Fatalf("Error initializing distributor: %v", err)
}
@@ -180,17 +191,10 @@ func setupChunkStore(cfg cfg) (chunk.Store, error) {
}

func setupDistributor(
cfg cfg,
ring *ring.Ring,
cfg prism.DistributorConfig,
chunkStore chunk.Store,
) {
clientFactory := func(address string) (*prism.IngesterClient, error) {
return prism.NewIngesterClient(address, cfg.remoteTimeout)
}
distributor := prism.NewDistributor(prism.DistributorConfig{
Ring: ring,
ClientFactory: clientFactory,
})
distributor := prism.NewDistributor(cfg)
prometheus.MustRegister(distributor)

prefix := "/api/prom"
distributor.go (118 changes: 94 additions & 24 deletions)
@@ -17,6 +17,7 @@ import (
"fmt"
"hash/fnv"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
@@ -39,10 +40,9 @@ var (
// Distributor is a storage.SampleAppender and a prism.Querier which
// forwards appends and queries to individual ingesters.
type Distributor struct {
ring ReadRing
clientFactory IngesterClientFactory
clientsMtx sync.RWMutex
clients map[string]*IngesterClient
cfg DistributorConfig
clientsMtx sync.RWMutex
clients map[string]*IngesterClient

queryDuration *prometheus.HistogramVec
receivedSamples prometheus.Counter
@@ -53,8 +53,8 @@ type Distributor struct {
type ReadRing interface {
prometheus.Collector

Get(key uint32) (ring.IngesterDesc, error)
GetAll() []ring.IngesterDesc
Get(key uint32, n int, heartbeatTimeout time.Duration) ([]ring.IngesterDesc, error)
GetAll(heartbeatTimeout time.Duration) []ring.IngesterDesc
Reviewer: Another approach might be to exclude the heartbeatTimeout from the parameters and instead implement it as a decorator. Not sure it's a better approach, mind you.

Author: From discussion: consider making the heartbeat timeout a property of the ring instead of passing it in on each call. Then the ring could also report in its metrics Collect() method how many total vs. timed out ingesters there are.

}
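To make that second suggestion concrete, here is a rough sketch of a ring that owns its heartbeat timeout and exports live vs. timed-out ingester counts from its collector methods. The types are simplified stand-ins for the real ring.Desc/IngesterDesc and the metric name is made up; none of this is part of the PR:

```go
package ring

import (
    "sync"
    "time"

    "github.com/prometheus/client_golang/prometheus"
)

// IngesterDesc is a simplified stand-in for the real ring.IngesterDesc.
type IngesterDesc struct {
    Hostname  string
    Timestamp time.Time
}

// Ring here is a hypothetical variant where the heartbeat timeout is a
// property of the ring rather than a parameter of Get/GetAll.
type Ring struct {
    mtx              sync.RWMutex
    ingesters        map[string]IngesterDesc
    heartbeatTimeout time.Duration
}

// healthy reports whether an ingester heartbeated within the timeout.
func (r *Ring) healthy(ing IngesterDesc) bool {
    return time.Since(ing.Timestamp) <= r.heartbeatTimeout
}

// ingestersDesc is a hypothetical metric describing ingesters by state.
var ingestersDesc = prometheus.NewDesc(
    "ring_ingesters",
    "Number of ingesters in the ring by state.",
    []string{"state"}, nil,
)

// Describe and Collect make the ring a prometheus.Collector that also
// reports how many ingesters are live vs. timed out.
func (r *Ring) Describe(ch chan<- *prometheus.Desc) {
    ch <- ingestersDesc
}

func (r *Ring) Collect(ch chan<- prometheus.Metric) {
    r.mtx.RLock()
    defer r.mtx.RUnlock()
    live := 0
    for _, ing := range r.ingesters {
        if r.healthy(ing) {
            live++
        }
    }
    ch <- prometheus.MustNewConstMetric(ingestersDesc, prometheus.GaugeValue, float64(live), "live")
    ch <- prometheus.MustNewConstMetric(ingestersDesc, prometheus.GaugeValue, float64(len(r.ingesters)-live), "timed_out")
}
```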

// IngesterClientFactory creates ingester clients.
@@ -65,14 +65,19 @@ type IngesterClientFactory func(string) (*IngesterClient, error)
type DistributorConfig struct {
Ring ReadRing
ClientFactory IngesterClientFactory

ReadReplicas int
WriteReplicas int
MinReadSuccesses int
MinWriteSuccesses int
HeartbeatTimeout time.Duration
}

// NewDistributor constructs a new Distributor
func NewDistributor(cfg DistributorConfig) *Distributor {
return &Distributor{
ring: cfg.Ring,
clientFactory: cfg.ClientFactory,
clients: map[string]*IngesterClient{},
cfg: cfg,
clients: map[string]*IngesterClient{},
queryDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "prometheus",
Name: "distributor_query_duration_seconds",
@@ -108,7 +113,7 @@ func (d *Distributor) getClientFor(hostname string) (*IngesterClient, error) {
return client, nil
}

client, err := d.clientFactory(hostname)
client, err := d.cfg.ClientFactory(hostname)
if err != nil {
return nil, err
}
@@ -140,12 +145,14 @@ func (d *Distributor) Append(ctx context.Context, samples []*model.Sample) error
samplesByIngester := map[string][]*model.Sample{}
for _, sample := range samples {
key := tokenForMetric(userID, sample.Metric)
collector, err := d.ring.Get(key)
ingesters, err := d.cfg.Ring.Get(key, d.cfg.WriteReplicas, d.cfg.HeartbeatTimeout)
if err != nil {
return err
}
otherSamples := samplesByIngester[collector.Hostname]
samplesByIngester[collector.Hostname] = append(otherSamples, sample)
for _, ingester := range ingesters {
otherSamples := samplesByIngester[ingester.Hostname]
samplesByIngester[ingester.Hostname] = append(otherSamples, sample)
}
}

errs := make(chan error)
@@ -155,12 +162,19 @@ func (d *Distributor) Append(ctx context.Context, samples []*model.Sample) error
}(hostname, samples)
}
var lastErr error
successes := 0
for i := 0; i < len(samplesByIngester); i++ {
if err := <-errs; err != nil {
lastErr = err
continue
}
successes++
}

if successes < d.cfg.MinWriteSuccesses {
return fmt.Errorf("too few successful writes, last error was: %v", lastErr)
}
return lastErr
return nil
}

func (d *Distributor) sendSamples(ctx context.Context, hostname string, samples []*model.Sample) error {
@@ -189,6 +203,8 @@ func metricNameFromLabelMatchers(matchers ...*metric.LabelMatcher) (model.LabelV
func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers ...*metric.LabelMatcher) (model.Matrix, error) {
var result model.Matrix
err := instrument.TimeRequestHistogram("duration", d.queryDuration, func() error {
fpToSampleStream := map[model.Fingerprint]*model.SampleStream{}

metricName, err := metricNameFromLabelMatchers(matchers...)
if err != nil {
return err
@@ -199,29 +215,83 @@ func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers .
return err
}

collector, err := d.ring.Get(tokenFor(userID, metricName))
ingesters, err := d.cfg.Ring.Get(tokenFor(userID, metricName), d.cfg.ReadReplicas, d.cfg.HeartbeatTimeout)
if err != nil {
return err
}

client, err := d.getClientFor(collector.Hostname)
if err != nil {
return err
// Fetch samples from multiple ingesters and group them by fingerprint (unsorted
// and with overlap).
successes := 0
var lastErr error
for _, ing := range ingesters {
client, err := d.getClientFor(ing.Hostname)
if err != nil {
return err
}
matrix, err := client.Query(ctx, from, to, matchers...)
if err != nil {
lastErr = err
continue
}
successes++

for _, ss := range matrix {
fp := ss.Metric.Fingerprint()
if mss, ok := fpToSampleStream[fp]; !ok {
fpToSampleStream[fp] = &model.SampleStream{
Metric: ss.Metric,
Values: ss.Values,
}
} else {
mss.Values = mergeSamples(fpToSampleStream[fp].Values, ss.Values)
}
}
}

result, err = client.Query(ctx, from, to, matchers...)
if err != nil {
return err
if successes < d.cfg.MinReadSuccesses {
return fmt.Errorf("too few successful reads, last error was: %v", lastErr)
}

result = make(model.Matrix, 0, len(fpToSampleStream))
for _, ss := range fpToSampleStream {
result = append(result, ss)
}

return nil
})
return result, err
}

func mergeSamples(a, b []model.SamplePair) []model.SamplePair {
result := make([]model.SamplePair, 0, len(a)+len(b))
i, j := 0, 0
for i < len(a) && j < len(b) {
if a[i].Timestamp < b[j].Timestamp {
result = append(result, a[i])
i++
} else if a[i].Timestamp > b[j].Timestamp {
result = append(result, b[j])
j++
} else {
result = append(result, a[i])
Reviewer: Is it possible for the value at b[i] to be different from a[i]?

Author: Nope, that's prevented in the ingesters, like in Prometheus itself. If two ingesters do end up having different values for the same timestamp for some broken reason, it's still a good thing to just filter that out here.

i++
j++
}
}
for ; i < len(a); i++ {
result = append(result, a[i])
}
for ; j < len(b); j++ {
result = append(result, b[j])
}
return result
}
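A small test could pin down the behaviour discussed in the thread above: overlapping timestamps collapse to a single sample, keeping the value from the first argument, and ordering is preserved. This is only a sketch and assumes it lives in the same package as mergeSamples:

```go
package prism

import (
    "reflect"
    "testing"

    "github.com/prometheus/common/model"
)

// TestMergeSamplesDedupes checks that duplicate timestamps are collapsed,
// keeping the value from the first slice, and that the result stays sorted.
func TestMergeSamplesDedupes(t *testing.T) {
    a := []model.SamplePair{{Timestamp: 1, Value: 10}, {Timestamp: 3, Value: 30}}
    b := []model.SamplePair{{Timestamp: 1, Value: 11}, {Timestamp: 2, Value: 20}, {Timestamp: 4, Value: 40}}

    got := mergeSamples(a, b)
    want := []model.SamplePair{
        {Timestamp: 1, Value: 10}, // duplicate timestamp: a's value wins
        {Timestamp: 2, Value: 20},
        {Timestamp: 3, Value: 30},
        {Timestamp: 4, Value: 40},
    }
    if !reflect.DeepEqual(got, want) {
        t.Fatalf("got %v, want %v", got, want)
    }
}
```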

// LabelValuesForLabelName returns all of the label values that are associated with a given label name.
func (d *Distributor) LabelValuesForLabelName(ctx context.Context, labelName model.LabelName) (model.LabelValues, error) {
valueSet := map[model.LabelValue]struct{}{}
for _, c := range d.ring.GetAll() {
for _, c := range d.cfg.Ring.GetAll(d.cfg.HeartbeatTimeout) {
client, err := d.getClientFor(c.Hostname)
if err != nil {
return nil, err
@@ -252,7 +322,7 @@ func (d *Distributor) Describe(ch chan<- *prometheus.Desc) {
d.queryDuration.Describe(ch)
ch <- d.receivedSamples.Desc()
d.sendDuration.Describe(ch)
d.ring.Describe(ch)
d.cfg.Ring.Describe(ch)
ch <- numClientsDesc
}

@@ -261,7 +331,7 @@ func (d *Distributor) Collect(ch chan<- prometheus.Metric) {
d.queryDuration.Collect(ch)
ch <- d.receivedSamples
d.sendDuration.Collect(ch)
d.ring.Collect(ch)
d.cfg.Ring.Collect(ch)

d.clientsMtx.RLock()
defer d.clientsMtx.RUnlock()
ring/ring.go (43 changes: 36 additions & 7 deletions)
@@ -20,6 +20,7 @@ import (
"math"
"sort"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"
@@ -101,24 +102,52 @@ func (r *Ring) loop() {
})
}

// Get returns a collector close to the hash in the circle.
func (r *Ring) Get(key uint32) (IngesterDesc, error) {
// Get returns up to n ingesters close to the hash in the circle.
Reviewer: I wonder if it's worth having a new type that implements Collector but has multiple ingesters as a backend?

Author: Like: Distributor -> Replicator -> Ingesters? Hmm, could make sense, but one more type. Let's defer that decision for later, I would propose?

Reviewer (@jml, Oct 14, 2016): Agree re deferring.

func (r *Ring) Get(key uint32, n int, heartbeatTimeout time.Duration) ([]IngesterDesc, error) {
r.mtx.RLock()
defer r.mtx.RUnlock()
if len(r.ringDesc.Tokens) == 0 {
return IngesterDesc{}, ErrEmptyRing
return nil, ErrEmptyRing
}
i := r.search(key)
return r.ringDesc.Ingesters[r.ringDesc.Tokens[i].Ingester], nil

ingesters := make([]IngesterDesc, 0, n)
distinctHosts := map[string]struct{}{}
start := r.search(key)
iterations := 0
for i := start; len(distinctHosts) < n && iterations < len(r.ringDesc.Tokens); i++ {
iterations++
// Wrap i around in the ring.
i %= len(r.ringDesc.Tokens)

// We want n *distinct* ingesters.
host := r.ringDesc.Tokens[i].Ingester
if _, ok := distinctHosts[host]; ok {
continue
}
distinctHosts[host] = struct{}{}

ing := r.ringDesc.Ingesters[host]

// Out of the n distinct subsequent ingesters, skip those that have not heartbeated in a while.
if time.Now().Sub(ing.Timestamp) > heartbeatTimeout {
continue
}

ingesters = append(ingesters, ing)
}
return ingesters, nil
}
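For the record, a rough shape of the deferred Replicator idea from the thread above. Everything here is hypothetical and not part of this PR: it assumes it would sit next to the ReadRing interface in the distributor's package, reuse that package's imports, and simply bundle the ring with the replication and heartbeat settings so callers only ask which ingesters handle a key.

```go
// Hypothetical Replicator from the review discussion (Distributor ->
// Replicator -> Ingesters). Not part of this change; it assumes the
// ReadRing interface and ring.IngesterDesc introduced above.
type Replicator struct {
    ring             ReadRing
    replicas         int
    heartbeatTimeout time.Duration
}

// ReplicasFor returns the available ingesters responsible for key.
func (r *Replicator) ReplicasFor(key uint32) ([]ring.IngesterDesc, error) {
    return r.ring.Get(key, r.replicas, r.heartbeatTimeout)
}

// All returns every available ingester, for fan-out operations.
func (r *Replicator) All() []ring.IngesterDesc {
    return r.ring.GetAll(r.heartbeatTimeout)
}
```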

// GetAll returns all ingesters in the circle.
func (r *Ring) GetAll() []IngesterDesc {
// GetAll returns all available ingesters in the circle.
func (r *Ring) GetAll(heartbeatTimeout time.Duration) []IngesterDesc {
r.mtx.RLock()
defer r.mtx.RUnlock()

ingesters := make([]IngesterDesc, 0, len(r.ringDesc.Ingesters))
for _, ingester := range r.ringDesc.Ingesters {
if time.Now().Sub(ingester.Timestamp) > heartbeatTimeout {
continue
}
ingesters = append(ingesters, ingester)
}
return ingesters