[WIP] Read and write from/to multiple ingesters #49

Merged
merged 3 commits on Oct 14, 2016
23 changes: 13 additions & 10 deletions cmd/prism/main.go
@@ -82,6 +82,8 @@ type cfg struct {
flushPeriod time.Duration
maxChunkAge time.Duration
numTokens int

distributorConfig prism.DistributorConfig
}

func main() {
@@ -101,6 +103,10 @@ func main() {
flag.DurationVar(&cfg.flushPeriod, "ingester.flush-period", 1*time.Minute, "Period with which to attempt to flush chunks.")
flag.DurationVar(&cfg.maxChunkAge, "ingester.max-chunk-age", 10*time.Minute, "Maximum chunk age before flushing.")
flag.IntVar(&cfg.numTokens, "ingester.num-tokens", 128, "Number of tokens for each ingester.")
flag.IntVar(&cfg.distributorConfig.ReplicationFactor, "distributor.replication-factor", 3, "The number of ingesters to write to and read from.")
flag.IntVar(&cfg.distributorConfig.MinReadSuccesses, "distributor.min-read-successes", 2, "The minimum number of ingesters from which a read must succeed.")
flag.IntVar(&cfg.distributorConfig.MinWriteSuccesses, "distributor.min-write-successes", 2, "The minimum number of ingesters to which a write must succeed.")
flag.DurationVar(&cfg.distributorConfig.HeartbeatTimeout, "distributor.heartbeat-timeout", time.Minute, "The heartbeat timeout after which ingesters are skipped for reads/writes.")
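// Note (illustrative, not part of this diff): with the defaults above, each sample is
// sent to replication-factor = 3 ingesters, a write needs min-write-successes = 2 of
// those sends to succeed, and a read needs min-read-successes = 2 ingester responses.
// Because 2 + 2 > 3, every read quorum overlaps every write quorum, so a single
// unreachable ingester does not hide acknowledged writes from the read path.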
flag.Parse()

chunkStore, err := setupChunkStore(cfg)
@@ -117,8 +123,12 @@ func main() {
switch cfg.mode {
case modeDistributor:
ring := ring.New(consul)
cfg.distributorConfig.Ring = ring
cfg.distributorConfig.ClientFactory = func(address string) (*prism.IngesterClient, error) {
return prism.NewIngesterClient(address, cfg.remoteTimeout)
}
defer ring.Stop()
setupDistributor(cfg, ring, chunkStore)
setupDistributor(cfg.distributorConfig, chunkStore)
if err != nil {
log.Fatalf("Error initializing distributor: %v", err)
}
@@ -180,17 +190,10 @@ func setupChunkStore(cfg cfg) (chunk.Store, error) {
}

func setupDistributor(
cfg cfg,
ring *ring.Ring,
cfg prism.DistributorConfig,
chunkStore chunk.Store,
) {
clientFactory := func(address string) (*prism.IngesterClient, error) {
return prism.NewIngesterClient(address, cfg.remoteTimeout)
}
distributor := prism.NewDistributor(prism.DistributorConfig{
Ring: ring,
ClientFactory: clientFactory,
})
distributor := prism.NewDistributor(cfg)
prometheus.MustRegister(distributor)

prefix := "/api/prom"
160 changes: 130 additions & 30 deletions distributor.go
@@ -17,6 +17,7 @@ import (
"fmt"
"hash/fnv"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
@@ -39,22 +40,25 @@ var (
// Distributor is a storage.SampleAppender and a prism.Querier which
// forwards appends and queries to individual ingesters.
type Distributor struct {
ring ReadRing
clientFactory IngesterClientFactory
clientsMtx sync.RWMutex
clients map[string]*IngesterClient

queryDuration *prometheus.HistogramVec
receivedSamples prometheus.Counter
sendDuration *prometheus.HistogramVec
cfg DistributorConfig
clientsMtx sync.RWMutex
clients map[string]*IngesterClient

queryDuration *prometheus.HistogramVec
receivedSamples prometheus.Counter
sendDuration *prometheus.HistogramVec
ingesterAppends *prometheus.CounterVec
ingesterAppendFailures *prometheus.CounterVec
ingesterQueries *prometheus.CounterVec
ingesterQueryFailures *prometheus.CounterVec
}

// ReadRing represents the read interface to the ring.
type ReadRing interface {
prometheus.Collector

Get(key uint32) (ring.IngesterDesc, error)
GetAll() []ring.IngesterDesc
Get(key uint32, n int, heartbeatTimeout time.Duration) ([]ring.IngesterDesc, error)
GetAll(heartbeatTimeout time.Duration) []ring.IngesterDesc
Contributor:

Another approach might be to exclude the heartbeatTimeout from the parameters and instead implement it as a decorator. Not sure it's a better approach, mind you.

Contributor Author:

From discussion: consider making the heartbeat timeout a property of the ring instead of passing it in on each call. Then the ring could also report in its metrics Collect() method how many total vs. timed out ingesters there are.

}
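
The decorator alternative mentioned in the thread above could look roughly like this. It is a sketch only, not part of this PR: the timeoutRing type and its narrowed method signatures are hypothetical.

// timeoutRing wraps a ReadRing and fixes the heartbeat timeout at construction,
// so callers no longer pass it on every lookup.
type timeoutRing struct {
	inner            ReadRing
	heartbeatTimeout time.Duration
}

// Get delegates to the wrapped ring, supplying the fixed timeout.
func (r *timeoutRing) Get(key uint32, n int) ([]ring.IngesterDesc, error) {
	return r.inner.Get(key, n, r.heartbeatTimeout)
}

// GetAll delegates to the wrapped ring, supplying the fixed timeout.
func (r *timeoutRing) GetAll() []ring.IngesterDesc {
	return r.inner.GetAll(r.heartbeatTimeout)
}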

// IngesterClientFactory creates ingester clients.
@@ -65,14 +69,18 @@ type IngesterClientFactory func(string) (*IngesterClient, error)
type DistributorConfig struct {
Ring ReadRing
ClientFactory IngesterClientFactory

ReplicationFactor int
MinReadSuccesses int
MinWriteSuccesses int
HeartbeatTimeout time.Duration
}

// NewDistributor constructs a new Distributor
func NewDistributor(cfg DistributorConfig) *Distributor {
return &Distributor{
ring: cfg.Ring,
clientFactory: cfg.ClientFactory,
clients: map[string]*IngesterClient{},
cfg: cfg,
clients: map[string]*IngesterClient{},
queryDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "prometheus",
Name: "distributor_query_duration_seconds",
@@ -87,9 +95,29 @@ func NewDistributor(cfg DistributorConfig) *Distributor {
sendDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "prometheus",
Name: "distributor_send_duration_seconds",
Help: "Time spent sending sample batches to ingesters.",
Help: "Time spent sending a sample batch to multiple replicated ingesters.",
Buckets: []float64{.001, .0025, .005, .01, .025, .05, .1, .25, .5, 1},
}, []string{"method", "status_code"}),
ingesterAppends: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "prometheus",
Name: "distributor_ingester_appends_total",
Help: "The total number of batch appends sent to ingesters.",
}, []string{"ingester"}),
ingesterAppendFailures: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "prometheus",
Name: "distributor_ingester_appends_total",
Help: "The total number of failed batch appends sent to ingesters.",
}, []string{"ingester"}),
ingesterQueries: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "prometheus",
Name: "distributor_ingester_queries_total",
Help: "The total number of queries sent to ingesters.",
}, []string{"ingester"}),
ingesterQueryFailures: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "prometheus",
Name: "distributor_ingester_appends_total",
Help: "The total number of failed queries sent to ingesters.",
}, []string{"ingester"}),
}
}

@@ -108,7 +136,7 @@ func (d *Distributor) getClientFor(hostname string) (*IngesterClient, error) {
return client, nil
}

client, err := d.clientFactory(hostname)
client, err := d.cfg.ClientFactory(hostname)
if err != nil {
return nil, err
}
@@ -140,12 +168,14 @@ func (d *Distributor) Append(ctx context.Context, samples []*model.Sample) error
samplesByIngester := map[string][]*model.Sample{}
for _, sample := range samples {
key := tokenForMetric(userID, sample.Metric)
collector, err := d.ring.Get(key)
ingesters, err := d.cfg.Ring.Get(key, d.cfg.ReplicationFactor, d.cfg.HeartbeatTimeout)
if err != nil {
return err
}
otherSamples := samplesByIngester[collector.Hostname]
samplesByIngester[collector.Hostname] = append(otherSamples, sample)
for _, ingester := range ingesters {
otherSamples := samplesByIngester[ingester.Hostname]
samplesByIngester[ingester.Hostname] = append(otherSamples, sample)
}
}

errs := make(chan error)
@@ -155,22 +185,34 @@ func (d *Distributor) Append(ctx context.Context, samples []*model.Sample) error
}(hostname, samples)
}
var lastErr error
successes := 0
for i := 0; i < len(samplesByIngester); i++ {
if err := <-errs; err != nil {
lastErr = err
continue
}
successes++
}

if successes < d.cfg.MinWriteSuccesses {
return fmt.Errorf("too few successful writes, last error was: %v", lastErr)
}
return lastErr
return nil
}

func (d *Distributor) sendSamples(ctx context.Context, hostname string, samples []*model.Sample) error {
client, err := d.getClientFor(hostname)
if err != nil {
return err
}
return instrument.TimeRequestHistogram("send", d.sendDuration, func() error {
err = instrument.TimeRequestHistogram("send", d.sendDuration, func() error {
return client.Append(ctx, samples)
})
if err != nil {
d.ingesterAppendFailures.WithLabelValues(hostname).Inc()
}
d.ingesterAppends.WithLabelValues(hostname).Inc()
return err
}

func metricNameFromLabelMatchers(matchers ...*metric.LabelMatcher) (model.LabelValue, error) {
@@ -189,6 +231,8 @@ func metricNameFromLabelMatchers(matchers ...*metric.LabelMatcher) (model.LabelV
func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers ...*metric.LabelMatcher) (model.Matrix, error) {
var result model.Matrix
err := instrument.TimeRequestHistogram("duration", d.queryDuration, func() error {
fpToSampleStream := map[model.Fingerprint]*model.SampleStream{}

metricName, err := metricNameFromLabelMatchers(matchers...)
if err != nil {
return err
@@ -199,29 +243,85 @@ func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers .
return err
}

collector, err := d.ring.Get(tokenFor(userID, metricName))
ingesters, err := d.cfg.Ring.Get(tokenFor(userID, metricName), d.cfg.ReplicationFactor, d.cfg.HeartbeatTimeout)
if err != nil {
return err
}

client, err := d.getClientFor(collector.Hostname)
if err != nil {
return err
// Fetch samples from multiple ingesters and group them by fingerprint (unsorted
// and with overlap).
successes := 0
var lastErr error
for _, ing := range ingesters {
client, err := d.getClientFor(ing.Hostname)
if err != nil {
return err
}
matrix, err := client.Query(ctx, from, to, matchers...)
d.ingesterQueries.WithLabelValues(ing.Hostname).Inc()
if err != nil {
lastErr = err
d.ingesterQueryFailures.WithLabelValues(ing.Hostname).Inc()
continue
}
successes++

for _, ss := range matrix {
fp := ss.Metric.Fingerprint()
if mss, ok := fpToSampleStream[fp]; !ok {
fpToSampleStream[fp] = &model.SampleStream{
Metric: ss.Metric,
Values: ss.Values,
}
} else {
mss.Values = mergeSamples(fpToSampleStream[fp].Values, ss.Values)
}
}
}

result, err = client.Query(ctx, from, to, matchers...)
if err != nil {
return err
if successes < d.cfg.MinReadSuccesses {
return fmt.Errorf("too few successful reads, last error was: %v", lastErr)
}

result = make(model.Matrix, 0, len(fpToSampleStream))
for _, ss := range fpToSampleStream {
result = append(result, ss)
}

return nil
})
return result, err
}

func mergeSamples(a, b []model.SamplePair) []model.SamplePair {
result := make([]model.SamplePair, 0, len(a)+len(b))
i, j := 0, 0
for i < len(a) && j < len(b) {
if a[i].Timestamp < b[j].Timestamp {
result = append(result, a[i])
i++
} else if a[i].Timestamp > b[j].Timestamp {
result = append(result, b[j])
j++
} else {
result = append(result, a[i])

Contributor:

Is it possible for the value at b[i] to be different from a[i]?

Contributor Author:

Nope, that's prevented in the ingesters, like in Prometheus itself. If two ingesters do end up having different values for the same timestamp for some broken reason, it's still a good thing to just filter that out here.

i++
j++
}
}
for ; i < len(a); i++ {
result = append(result, a[i])
}
for ; j < len(b); j++ {
result = append(result, b[j])
}
return result
}
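
To make the behaviour discussed in the thread above concrete, here is a hypothetical usage, not part of this PR (the helper name, timestamps, and values are invented), showing that a pair present in both inputs is emitted exactly once:

// exampleMergeSamples demonstrates deduplication of a timestamp present in both inputs.
func exampleMergeSamples() []model.SamplePair {
	a := []model.SamplePair{{Timestamp: 1, Value: 10}, {Timestamp: 3, Value: 30}}
	b := []model.SamplePair{{Timestamp: 1, Value: 10}, {Timestamp: 2, Value: 20}}
	// Returns pairs at timestamps 1, 2, and 3; the shared pair at timestamp 1 appears once.
	return mergeSamples(a, b)
}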

// LabelValuesForLabelName returns all of the label values that are associated with a given label name.
func (d *Distributor) LabelValuesForLabelName(ctx context.Context, labelName model.LabelName) (model.LabelValues, error) {
valueSet := map[model.LabelValue]struct{}{}
for _, c := range d.ring.GetAll() {
for _, c := range d.cfg.Ring.GetAll(d.cfg.HeartbeatTimeout) {
client, err := d.getClientFor(c.Hostname)
if err != nil {
return nil, err
@@ -252,7 +352,7 @@ func (d *Distributor) Describe(ch chan<- *prometheus.Desc) {
d.queryDuration.Describe(ch)
ch <- d.receivedSamples.Desc()
d.sendDuration.Describe(ch)
d.ring.Describe(ch)
d.cfg.Ring.Describe(ch)
ch <- numClientsDesc
}

@@ -261,7 +361,7 @@ func (d *Distributor) Collect(ch chan<- prometheus.Metric) {
d.queryDuration.Collect(ch)
ch <- d.receivedSamples
d.sendDuration.Collect(ch)
d.ring.Collect(ch)
d.cfg.Ring.Collect(ch)

d.clientsMtx.RLock()
defer d.clientsMtx.RUnlock()