
Commit 24dbd8d

Merge pull request #49 from weaveworks/replication
[WIP] Read and write from/to multiple ingesters
2 parents 094681c + 502cbc5 commit 24dbd8d

3 files changed: +179 −47 lines

cmd/prism/main.go (13 additions, 10 deletions)

@@ -82,6 +82,8 @@ type cfg struct {
     flushPeriod time.Duration
     maxChunkAge time.Duration
     numTokens   int
+
+    distributorConfig prism.DistributorConfig
 }
 
 func main() {
@@ -101,6 +103,10 @@ func main() {
     flag.DurationVar(&cfg.flushPeriod, "ingester.flush-period", 1*time.Minute, "Period with which to attempt to flush chunks.")
     flag.DurationVar(&cfg.maxChunkAge, "ingester.max-chunk-age", 10*time.Minute, "Maximum chunk age before flushing.")
     flag.IntVar(&cfg.numTokens, "ingester.num-tokens", 128, "Number of tokens for each ingester.")
+    flag.IntVar(&cfg.distributorConfig.ReplicationFactor, "distributor.replication-factor", 3, "The number of ingesters to write to and read from.")
+    flag.IntVar(&cfg.distributorConfig.MinReadSuccesses, "distributor.min-read-successes", 2, "The minimum number of ingesters from which a read must succeed.")
+    flag.IntVar(&cfg.distributorConfig.MinWriteSuccesses, "distributor.min-write-successes", 2, "The minimum number of ingesters to which a write must succeed.")
+    flag.DurationVar(&cfg.distributorConfig.HeartbeatTimeout, "distributor.heartbeat-timeout", time.Minute, "The heartbeat timeout after which ingesters are skipped for reads/writes.")
     flag.Parse()
 
     chunkStore, err := setupChunkStore(cfg)
@@ -117,8 +123,12 @@ func main() {
     switch cfg.mode {
     case modeDistributor:
         ring := ring.New(consul)
+        cfg.distributorConfig.Ring = ring
+        cfg.distributorConfig.ClientFactory = func(address string) (*prism.IngesterClient, error) {
+            return prism.NewIngesterClient(address, cfg.remoteTimeout)
+        }
         defer ring.Stop()
-        setupDistributor(cfg, ring, chunkStore)
+        setupDistributor(cfg.distributorConfig, chunkStore)
         if err != nil {
             log.Fatalf("Error initializing distributor: %v", err)
         }
@@ -180,17 +190,10 @@ func setupChunkStore(cfg cfg) (chunk.Store, error) {
 }
 
 func setupDistributor(
-    cfg cfg,
-    ring *ring.Ring,
+    cfg prism.DistributorConfig,
     chunkStore chunk.Store,
 ) {
-    clientFactory := func(address string) (*prism.IngesterClient, error) {
-        return prism.NewIngesterClient(address, cfg.remoteTimeout)
-    }
-    distributor := prism.NewDistributor(prism.DistributorConfig{
-        Ring:          ring,
-        ClientFactory: clientFactory,
-    })
+    distributor := prism.NewDistributor(cfg)
     prometheus.MustRegister(distributor)
 
     prefix := "/api/prom"
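
For reference, a usage sketch that is not part of the commit: with the defaults above, the new distributor flags could be passed like this (the prism binary name is inferred from cmd/prism, and any mode-selection flag is omitted).

    ./prism \
      -distributor.replication-factor=3 \
      -distributor.min-read-successes=2 \
      -distributor.min-write-successes=2 \
      -distributor.heartbeat-timeout=1m

With these defaults, min-read-successes plus min-write-successes (2 + 2) exceeds the replication factor (3), so any successful read overlaps any successful write on at least one ingester, and each path tolerates one failed ingester.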

distributor.go (130 additions, 30 deletions)

@@ -17,6 +17,7 @@ import (
     "fmt"
     "hash/fnv"
     "sync"
+    "time"
 
     "github.com/prometheus/client_golang/prometheus"
     "github.com/prometheus/common/model"
@@ -39,22 +40,25 @@ var (
 // Distributor is a storage.SampleAppender and a prism.Querier which
 // forwards appends and queries to individual ingesters.
 type Distributor struct {
-    ring          ReadRing
-    clientFactory IngesterClientFactory
-    clientsMtx    sync.RWMutex
-    clients       map[string]*IngesterClient
-
-    queryDuration   *prometheus.HistogramVec
-    receivedSamples prometheus.Counter
-    sendDuration    *prometheus.HistogramVec
+    cfg        DistributorConfig
+    clientsMtx sync.RWMutex
+    clients    map[string]*IngesterClient
+
+    queryDuration          *prometheus.HistogramVec
+    receivedSamples        prometheus.Counter
+    sendDuration           *prometheus.HistogramVec
+    ingesterAppends        *prometheus.CounterVec
+    ingesterAppendFailures *prometheus.CounterVec
+    ingesterQueries        *prometheus.CounterVec
+    ingesterQueryFailures  *prometheus.CounterVec
 }
 
 // ReadRing represents the read interface to the ring.
 type ReadRing interface {
     prometheus.Collector
 
-    Get(key uint32) (ring.IngesterDesc, error)
-    GetAll() []ring.IngesterDesc
+    Get(key uint32, n int, heartbeatTimeout time.Duration) ([]ring.IngesterDesc, error)
+    GetAll(heartbeatTimeout time.Duration) []ring.IngesterDesc
 }
 
 // IngesterClientFactory creates ingester clients.
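
The ReadRing change above makes Get return up to n ingester descriptors and adds a heartbeat timeout to both methods. As an illustration only, not part of this commit, a minimal in-memory stub satisfying the updated interface (useful in tests) might look like the sketch below; the ring import path and the placement in the same package as distributor.go are assumptions.

    package prism

    import (
        "time"

        "github.com/prometheus/client_golang/prometheus"
        "github.com/weaveworks/prism/ring" // assumed import path for the ring package
    )

    // staticRing is an illustrative stub: it hands back the first n ingesters
    // regardless of the token and ignores the heartbeat timeout.
    type staticRing struct {
        ingesters []ring.IngesterDesc
    }

    func (r staticRing) Get(key uint32, n int, heartbeatTimeout time.Duration) ([]ring.IngesterDesc, error) {
        if n > len(r.ingesters) {
            n = len(r.ingesters)
        }
        return r.ingesters[:n], nil
    }

    func (r staticRing) GetAll(heartbeatTimeout time.Duration) []ring.IngesterDesc {
        return r.ingesters
    }

    // No-op methods so staticRing also satisfies the embedded prometheus.Collector.
    func (staticRing) Describe(ch chan<- *prometheus.Desc) {}
    func (staticRing) Collect(ch chan<- prometheus.Metric) {}

The real ring presumably filters ingesters by heartbeat recency and walks the token ring; the stub only mirrors the shape of the interface.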
@@ -65,14 +69,18 @@ type IngesterClientFactory func(string) (*IngesterClient, error)
 type DistributorConfig struct {
     Ring          ReadRing
     ClientFactory IngesterClientFactory
+
+    ReplicationFactor int
+    MinReadSuccesses  int
+    MinWriteSuccesses int
+    HeartbeatTimeout  time.Duration
 }
 
 // NewDistributor constructs a new Distributor
 func NewDistributor(cfg DistributorConfig) *Distributor {
     return &Distributor{
-        ring:          cfg.Ring,
-        clientFactory: cfg.ClientFactory,
-        clients:       map[string]*IngesterClient{},
+        cfg:     cfg,
+        clients: map[string]*IngesterClient{},
         queryDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
             Namespace: "prometheus",
             Name:      "distributor_query_duration_seconds",
@@ -87,9 +95,29 @@ func NewDistributor(cfg DistributorConfig) *Distributor {
         sendDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
             Namespace: "prometheus",
             Name:      "distributor_send_duration_seconds",
-            Help:      "Time spent sending sample batches to ingesters.",
+            Help:      "Time spent sending a sample batch to multiple replicated ingesters.",
             Buckets:   []float64{.001, .0025, .005, .01, .025, .05, .1, .25, .5, 1},
         }, []string{"method", "status_code"}),
+        ingesterAppends: prometheus.NewCounterVec(prometheus.CounterOpts{
+            Namespace: "prometheus",
+            Name:      "distributor_ingester_appends_total",
+            Help:      "The total number of batch appends sent to ingesters.",
+        }, []string{"ingester"}),
+        ingesterAppendFailures: prometheus.NewCounterVec(prometheus.CounterOpts{
+            Namespace: "prometheus",
+            Name:      "distributor_ingester_appends_total",
+            Help:      "The total number of failed batch appends sent to ingesters.",
+        }, []string{"ingester"}),
+        ingesterQueries: prometheus.NewCounterVec(prometheus.CounterOpts{
+            Namespace: "prometheus",
+            Name:      "distributor_ingester_queries_total",
+            Help:      "The total number of queries sent to ingesters.",
+        }, []string{"ingester"}),
+        ingesterQueryFailures: prometheus.NewCounterVec(prometheus.CounterOpts{
+            Namespace: "prometheus",
+            Name:      "distributor_ingester_appends_total",
+            Help:      "The total number of failed queries sent to ingesters.",
+        }, []string{"ingester"}),
     }
 }
 
@@ -108,7 +136,7 @@ func (d *Distributor) getClientFor(hostname string) (*IngesterClient, error) {
         return client, nil
     }
 
-    client, err := d.clientFactory(hostname)
+    client, err := d.cfg.ClientFactory(hostname)
     if err != nil {
         return nil, err
     }
@@ -140,12 +168,14 @@ func (d *Distributor) Append(ctx context.Context, samples []*model.Sample) error
     samplesByIngester := map[string][]*model.Sample{}
     for _, sample := range samples {
         key := tokenForMetric(userID, sample.Metric)
-        collector, err := d.ring.Get(key)
+        ingesters, err := d.cfg.Ring.Get(key, d.cfg.ReplicationFactor, d.cfg.HeartbeatTimeout)
         if err != nil {
             return err
         }
-        otherSamples := samplesByIngester[collector.Hostname]
-        samplesByIngester[collector.Hostname] = append(otherSamples, sample)
+        for _, ingester := range ingesters {
+            otherSamples := samplesByIngester[ingester.Hostname]
+            samplesByIngester[ingester.Hostname] = append(otherSamples, sample)
+        }
     }
 
     errs := make(chan error)
@@ -155,22 +185,34 @@ func (d *Distributor) Append(ctx context.Context, samples []*model.Sample) error
        }(hostname, samples)
     }
     var lastErr error
+    successes := 0
     for i := 0; i < len(samplesByIngester); i++ {
         if err := <-errs; err != nil {
             lastErr = err
+            continue
         }
+        successes++
+    }
+
+    if successes < d.cfg.MinWriteSuccesses {
+        return fmt.Errorf("too few successful writes, last error was: %v", lastErr)
     }
-    return lastErr
+    return nil
 }
 
 func (d *Distributor) sendSamples(ctx context.Context, hostname string, samples []*model.Sample) error {
     client, err := d.getClientFor(hostname)
     if err != nil {
         return err
     }
-    return instrument.TimeRequestHistogram("send", d.sendDuration, func() error {
+    err = instrument.TimeRequestHistogram("send", d.sendDuration, func() error {
         return client.Append(ctx, samples)
     })
+    if err != nil {
+        d.ingesterAppendFailures.WithLabelValues(hostname).Inc()
+    }
+    d.ingesterAppends.WithLabelValues(hostname).Inc()
+    return err
 }
 
 func metricNameFromLabelMatchers(matchers ...*metric.LabelMatcher) (model.LabelValue, error) {
@@ -189,6 +231,8 @@ func metricNameFromLabelMatchers(matchers ...*metric.LabelMatcher) (model.LabelV
 func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers ...*metric.LabelMatcher) (model.Matrix, error) {
     var result model.Matrix
     err := instrument.TimeRequestHistogram("duration", d.queryDuration, func() error {
+        fpToSampleStream := map[model.Fingerprint]*model.SampleStream{}
+
         metricName, err := metricNameFromLabelMatchers(matchers...)
         if err != nil {
             return err
@@ -199,29 +243,85 @@ func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers .
             return err
         }
 
-        collector, err := d.ring.Get(tokenFor(userID, metricName))
+        ingesters, err := d.cfg.Ring.Get(tokenFor(userID, metricName), d.cfg.ReplicationFactor, d.cfg.HeartbeatTimeout)
         if err != nil {
             return err
         }
 
-        client, err := d.getClientFor(collector.Hostname)
-        if err != nil {
-            return err
+        // Fetch samples from multiple ingesters and group them by fingerprint (unsorted
+        // and with overlap).
+        successes := 0
+        var lastErr error
+        for _, ing := range ingesters {
+            client, err := d.getClientFor(ing.Hostname)
+            if err != nil {
+                return err
+            }
+            matrix, err := client.Query(ctx, from, to, matchers...)
+            d.ingesterQueries.WithLabelValues(ing.Hostname).Inc()
+            if err != nil {
+                lastErr = err
+                d.ingesterQueryFailures.WithLabelValues(ing.Hostname).Inc()
+                continue
+            }
+            successes++
+
+            for _, ss := range matrix {
+                fp := ss.Metric.Fingerprint()
+                if mss, ok := fpToSampleStream[fp]; !ok {
+                    fpToSampleStream[fp] = &model.SampleStream{
+                        Metric: ss.Metric,
+                        Values: ss.Values,
+                    }
+                } else {
+                    mss.Values = mergeSamples(fpToSampleStream[fp].Values, ss.Values)
+                }
+            }
         }
 
-        result, err = client.Query(ctx, from, to, matchers...)
-        if err != nil {
-            return err
+        if successes < d.cfg.MinReadSuccesses {
+            return fmt.Errorf("too few successful reads, last error was: %v", lastErr)
         }
+
+        result = make(model.Matrix, 0, len(fpToSampleStream))
+        for _, ss := range fpToSampleStream {
+            result = append(result, ss)
+        }
+
         return nil
     })
     return result, err
 }
 
+func mergeSamples(a, b []model.SamplePair) []model.SamplePair {
+    result := make([]model.SamplePair, 0, len(a)+len(b))
+    i, j := 0, 0
+    for i < len(a) && j < len(b) {
+        if a[i].Timestamp < b[j].Timestamp {
+            result = append(result, a[i])
+            i++
+        } else if a[i].Timestamp > b[j].Timestamp {
+            result = append(result, b[j])
+            j++
+        } else {
+            result = append(result, a[i])
+            i++
+            j++
+        }
+    }
+    for ; i < len(a); i++ {
+        result = append(result, a[i])
+    }
+    for ; j < len(b); j++ {
+        result = append(result, b[j])
+    }
+    return result
+}
+
 // LabelValuesForLabelName returns all of the label values that are associated with a given label name.
 func (d *Distributor) LabelValuesForLabelName(ctx context.Context, labelName model.LabelName) (model.LabelValues, error) {
     valueSet := map[model.LabelValue]struct{}{}
-    for _, c := range d.ring.GetAll() {
+    for _, c := range d.cfg.Ring.GetAll(d.cfg.HeartbeatTimeout) {
         client, err := d.getClientFor(c.Hostname)
         if err != nil {
             return nil, err
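
A small usage sketch, not part of the commit, assumed to sit in the same package as the unexported mergeSamples helper added above: it merges two timestamp-sorted series, collapsing the duplicate sample at t=1 and interleaving the rest in order. This is what keeps overlapping results from replicated ingesters from producing duplicate samples for the same fingerprint.

    package prism

    import (
        "fmt"

        "github.com/prometheus/common/model"
    )

    // mergeSamplesExample is illustrative only; the name is not from the commit.
    func mergeSamplesExample() {
        a := []model.SamplePair{{Timestamp: 1, Value: 10}, {Timestamp: 3, Value: 30}}
        b := []model.SamplePair{{Timestamp: 1, Value: 10}, {Timestamp: 2, Value: 20}}
        for _, sp := range mergeSamples(a, b) {
            fmt.Println(sp.Timestamp, sp.Value) // the merged series has one sample each at t=1, t=2, t=3
        }
    }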
@@ -252,7 +352,7 @@ func (d *Distributor) Describe(ch chan<- *prometheus.Desc) {
     d.queryDuration.Describe(ch)
     ch <- d.receivedSamples.Desc()
     d.sendDuration.Describe(ch)
-    d.ring.Describe(ch)
+    d.cfg.Ring.Describe(ch)
     ch <- numClientsDesc
 }
 
@@ -261,7 +361,7 @@ func (d *Distributor) Collect(ch chan<- prometheus.Metric) {
     d.queryDuration.Collect(ch)
     ch <- d.receivedSamples
     d.sendDuration.Collect(ch)
-    d.ring.Collect(ch)
+    d.cfg.Ring.Collect(ch)
 
     d.clientsMtx.RLock()
     defer d.clientsMtx.RUnlock()
