Skip to content

Commit 2d35012

Browse files
committed
Read and write from/to multiple ingesters
1 parent 094681c commit 2d35012

File tree

3 files changed

+144
-41
lines changed

3 files changed

+144
-41
lines changed

cmd/prism/main.go

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,8 @@ type cfg struct {
8282
flushPeriod time.Duration
8383
maxChunkAge time.Duration
8484
numTokens int
85+
86+
distributorConfig prism.DistributorConfig
8587
}
8688

8789
func main() {
@@ -101,6 +103,11 @@ func main() {
101103
flag.DurationVar(&cfg.flushPeriod, "ingester.flush-period", 1*time.Minute, "Period with which to attempt to flush chunks.")
102104
flag.DurationVar(&cfg.maxChunkAge, "ingester.max-chunk-age", 10*time.Minute, "Maximum chunk age before flushing.")
103105
flag.IntVar(&cfg.numTokens, "ingester.num-tokens", 128, "Number of tokens for each ingester.")
106+
flag.IntVar(&cfg.distributorConfig.ReadReplicas, "distributor.read-replicas", 3, "The number of available ingesters to read from.")
107+
flag.IntVar(&cfg.distributorConfig.WriteReplicas, "distributor.write-replicas", 3, "The number of available ingesters to write to.")
108+
flag.IntVar(&cfg.distributorConfig.MinReadSuccesses, "distributor.min-read-successes", 2, "The minimum number of ingesters from which a read must succeed.")
109+
flag.IntVar(&cfg.distributorConfig.MinWriteSuccesses, "distributor.min-write-successes", 2, "The minimum number of ingesters to which a write must succeed.")
110+
flag.DurationVar(&cfg.distributorConfig.HeartbeatTimeout, "distributor.heartbeat-timeout", time.Minute, "The heartbeat timeout after which ingesters are skipped for reads/writes.")
104111
flag.Parse()
105112

106113
chunkStore, err := setupChunkStore(cfg)
@@ -117,8 +124,12 @@ func main() {
117124
switch cfg.mode {
118125
case modeDistributor:
119126
ring := ring.New(consul)
127+
cfg.distributorConfig.Ring = ring
128+
cfg.distributorConfig.ClientFactory = func(address string) (*prism.IngesterClient, error) {
129+
return prism.NewIngesterClient(address, cfg.remoteTimeout)
130+
}
120131
defer ring.Stop()
121-
setupDistributor(cfg, ring, chunkStore)
132+
setupDistributor(cfg.distributorConfig, chunkStore)
122133
if err != nil {
123134
log.Fatalf("Error initializing distributor: %v", err)
124135
}
@@ -180,17 +191,10 @@ func setupChunkStore(cfg cfg) (chunk.Store, error) {
180191
}
181192

182193
func setupDistributor(
183-
cfg cfg,
184-
ring *ring.Ring,
194+
cfg prism.DistributorConfig,
185195
chunkStore chunk.Store,
186196
) {
187-
clientFactory := func(address string) (*prism.IngesterClient, error) {
188-
return prism.NewIngesterClient(address, cfg.remoteTimeout)
189-
}
190-
distributor := prism.NewDistributor(prism.DistributorConfig{
191-
Ring: ring,
192-
ClientFactory: clientFactory,
193-
})
197+
distributor := prism.NewDistributor(cfg)
194198
prometheus.MustRegister(distributor)
195199

196200
prefix := "/api/prom"

distributor.go

Lines changed: 94 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"fmt"
1818
"hash/fnv"
1919
"sync"
20+
"time"
2021

2122
"github.com/prometheus/client_golang/prometheus"
2223
"github.com/prometheus/common/model"
@@ -39,10 +40,9 @@ var (
3940
// Distributor is a storage.SampleAppender and a prism.Querier which
4041
// forwards appends and queries to individual ingesters.
4142
type Distributor struct {
42-
ring ReadRing
43-
clientFactory IngesterClientFactory
44-
clientsMtx sync.RWMutex
45-
clients map[string]*IngesterClient
43+
cfg DistributorConfig
44+
clientsMtx sync.RWMutex
45+
clients map[string]*IngesterClient
4646

4747
queryDuration *prometheus.HistogramVec
4848
receivedSamples prometheus.Counter
@@ -53,8 +53,8 @@ type Distributor struct {
5353
type ReadRing interface {
5454
prometheus.Collector
5555

56-
Get(key uint32) (ring.IngesterDesc, error)
57-
GetAll() []ring.IngesterDesc
56+
Get(key uint32, n int, heartbeatTimeout time.Duration) ([]ring.IngesterDesc, error)
57+
GetAll(heartbeatTimeout time.Duration) []ring.IngesterDesc
5858
}
5959

6060
// IngesterClientFactory creates ingester clients.
@@ -65,14 +65,19 @@ type IngesterClientFactory func(string) (*IngesterClient, error)
6565
type DistributorConfig struct {
6666
Ring ReadRing
6767
ClientFactory IngesterClientFactory
68+
69+
ReadReplicas int
70+
WriteReplicas int
71+
MinReadSuccesses int
72+
MinWriteSuccesses int
73+
HeartbeatTimeout time.Duration
6874
}
6975

7076
// NewDistributor constructs a new Distributor
7177
func NewDistributor(cfg DistributorConfig) *Distributor {
7278
return &Distributor{
73-
ring: cfg.Ring,
74-
clientFactory: cfg.ClientFactory,
75-
clients: map[string]*IngesterClient{},
79+
cfg: cfg,
80+
clients: map[string]*IngesterClient{},
7681
queryDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
7782
Namespace: "prometheus",
7883
Name: "distributor_query_duration_seconds",
@@ -108,7 +113,7 @@ func (d *Distributor) getClientFor(hostname string) (*IngesterClient, error) {
108113
return client, nil
109114
}
110115

111-
client, err := d.clientFactory(hostname)
116+
client, err := d.cfg.ClientFactory(hostname)
112117
if err != nil {
113118
return nil, err
114119
}
@@ -140,12 +145,14 @@ func (d *Distributor) Append(ctx context.Context, samples []*model.Sample) error
140145
samplesByIngester := map[string][]*model.Sample{}
141146
for _, sample := range samples {
142147
key := tokenForMetric(userID, sample.Metric)
143-
collector, err := d.ring.Get(key)
148+
ingesters, err := d.cfg.Ring.Get(key, d.cfg.WriteReplicas, d.cfg.HeartbeatTimeout)
144149
if err != nil {
145150
return err
146151
}
147-
otherSamples := samplesByIngester[collector.Hostname]
148-
samplesByIngester[collector.Hostname] = append(otherSamples, sample)
152+
for _, ingester := range ingesters {
153+
otherSamples := samplesByIngester[ingester.Hostname]
154+
samplesByIngester[ingester.Hostname] = append(otherSamples, sample)
155+
}
149156
}
150157

151158
errs := make(chan error)
@@ -155,12 +162,19 @@ func (d *Distributor) Append(ctx context.Context, samples []*model.Sample) error
155162
}(hostname, samples)
156163
}
157164
var lastErr error
165+
successes := 0
158166
for i := 0; i < len(samplesByIngester); i++ {
159167
if err := <-errs; err != nil {
160168
lastErr = err
169+
continue
161170
}
171+
successes++
172+
}
173+
174+
if successes < d.cfg.MinWriteSuccesses {
175+
return fmt.Errorf("too few successful writes, last error was: %v", lastErr)
162176
}
163-
return lastErr
177+
return nil
164178
}
165179

166180
func (d *Distributor) sendSamples(ctx context.Context, hostname string, samples []*model.Sample) error {
@@ -189,6 +203,8 @@ func metricNameFromLabelMatchers(matchers ...*metric.LabelMatcher) (model.LabelV
189203
func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers ...*metric.LabelMatcher) (model.Matrix, error) {
190204
var result model.Matrix
191205
err := instrument.TimeRequestHistogram("duration", d.queryDuration, func() error {
206+
fpToSampleStream := map[model.Fingerprint]*model.SampleStream{}
207+
192208
metricName, err := metricNameFromLabelMatchers(matchers...)
193209
if err != nil {
194210
return err
@@ -199,29 +215,83 @@ func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers .
199215
return err
200216
}
201217

202-
collector, err := d.ring.Get(tokenFor(userID, metricName))
218+
ingesters, err := d.cfg.Ring.Get(tokenFor(userID, metricName), d.cfg.ReadReplicas, d.cfg.HeartbeatTimeout)
203219
if err != nil {
204220
return err
205221
}
206222

207-
client, err := d.getClientFor(collector.Hostname)
208-
if err != nil {
209-
return err
223+
// Fetch samples from multiple ingesters and group them by fingerprint (unsorted
224+
// and with overlap).
225+
successes := 0
226+
var lastErr error
227+
for _, ing := range ingesters {
228+
client, err := d.getClientFor(ing.Hostname)
229+
if err != nil {
230+
return err
231+
}
232+
matrix, err := client.Query(ctx, from, to, matchers...)
233+
if err != nil {
234+
lastErr = err
235+
continue
236+
}
237+
successes++
238+
239+
for _, ss := range matrix {
240+
fp := ss.Metric.Fingerprint()
241+
if mss, ok := fpToSampleStream[fp]; !ok {
242+
fpToSampleStream[fp] = &model.SampleStream{
243+
Metric: ss.Metric,
244+
Values: ss.Values,
245+
}
246+
} else {
247+
mss.Values = mergeSamples(fpToSampleStream[fp].Values, ss.Values)
248+
}
249+
}
210250
}
211251

212-
result, err = client.Query(ctx, from, to, matchers...)
213-
if err != nil {
214-
return err
252+
if successes < d.cfg.MinReadSuccesses {
253+
return fmt.Errorf("too few successful reads, last error was: %v", lastErr)
254+
}
255+
256+
result = make(model.Matrix, 0, len(fpToSampleStream))
257+
for _, ss := range fpToSampleStream {
258+
result = append(result, ss)
215259
}
260+
216261
return nil
217262
})
218263
return result, err
219264
}
220265

266+
func mergeSamples(a, b []model.SamplePair) []model.SamplePair {
267+
result := make([]model.SamplePair, 0, len(a)+len(b))
268+
i, j := 0, 0
269+
for i < len(a) && j < len(b) {
270+
if a[i].Timestamp < b[j].Timestamp {
271+
result = append(result, a[i])
272+
i++
273+
} else if a[i].Timestamp > b[j].Timestamp {
274+
result = append(result, b[j])
275+
j++
276+
} else {
277+
result = append(result, a[i])
278+
i++
279+
j++
280+
}
281+
}
282+
for ; i < len(a); i++ {
283+
result = append(result, a[i])
284+
}
285+
for ; j < len(b); j++ {
286+
result = append(result, b[j])
287+
}
288+
return result
289+
}
290+
221291
// LabelValuesForLabelName returns all of the label values that are associated with a given label name.
222292
func (d *Distributor) LabelValuesForLabelName(ctx context.Context, labelName model.LabelName) (model.LabelValues, error) {
223293
valueSet := map[model.LabelValue]struct{}{}
224-
for _, c := range d.ring.GetAll() {
294+
for _, c := range d.cfg.Ring.GetAll(d.cfg.HeartbeatTimeout) {
225295
client, err := d.getClientFor(c.Hostname)
226296
if err != nil {
227297
return nil, err
@@ -252,7 +322,7 @@ func (d *Distributor) Describe(ch chan<- *prometheus.Desc) {
252322
d.queryDuration.Describe(ch)
253323
ch <- d.receivedSamples.Desc()
254324
d.sendDuration.Describe(ch)
255-
d.ring.Describe(ch)
325+
d.cfg.Ring.Describe(ch)
256326
ch <- numClientsDesc
257327
}
258328

@@ -261,7 +331,7 @@ func (d *Distributor) Collect(ch chan<- prometheus.Metric) {
261331
d.queryDuration.Collect(ch)
262332
ch <- d.receivedSamples
263333
d.sendDuration.Collect(ch)
264-
d.ring.Collect(ch)
334+
d.cfg.Ring.Collect(ch)
265335

266336
d.clientsMtx.RLock()
267337
defer d.clientsMtx.RUnlock()

ring/ring.go

Lines changed: 36 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"math"
2121
"sort"
2222
"sync"
23+
"time"
2324

2425
"github.com/prometheus/client_golang/prometheus"
2526
"github.com/prometheus/common/log"
@@ -101,24 +102,52 @@ func (r *Ring) loop() {
101102
})
102103
}
103104

104-
// Get returns a collector close to the hash in the circle.
105-
func (r *Ring) Get(key uint32) (IngesterDesc, error) {
105+
// Get returns up to n ingesters close to the hash in the circle.
106+
func (r *Ring) Get(key uint32, n int, heartbeatTimeout time.Duration) ([]IngesterDesc, error) {
106107
r.mtx.RLock()
107108
defer r.mtx.RUnlock()
108109
if len(r.ringDesc.Tokens) == 0 {
109-
return IngesterDesc{}, ErrEmptyRing
110+
return nil, ErrEmptyRing
110111
}
111-
i := r.search(key)
112-
return r.ringDesc.Ingesters[r.ringDesc.Tokens[i].Ingester], nil
112+
113+
ingesters := make([]IngesterDesc, 0, n)
114+
distinctHosts := map[string]struct{}{}
115+
start := r.search(key)
116+
iterations := 0
117+
for i := start; len(distinctHosts) < n && iterations < len(r.ringDesc.Tokens); i++ {
118+
iterations++
119+
// Wrap i around in the ring.
120+
i %= len(r.ringDesc.Tokens)
121+
122+
// We want n *distinct* ingesters.
123+
host := r.ringDesc.Tokens[i].Ingester
124+
if _, ok := distinctHosts[host]; ok {
125+
continue
126+
}
127+
distinctHosts[host] = struct{}{}
128+
129+
ing := r.ringDesc.Ingesters[host]
130+
131+
// Out of the n distinct subsequent ingesters, skip those that have not heartbeated in a while.
132+
if time.Now().Sub(ing.Timestamp) > heartbeatTimeout {
133+
continue
134+
}
135+
136+
ingesters = append(ingesters, ing)
137+
}
138+
return ingesters, nil
113139
}
114140

115-
// GetAll returns all ingesters in the circle.
116-
func (r *Ring) GetAll() []IngesterDesc {
141+
// GetAll returns all available ingesters in the circle.
142+
func (r *Ring) GetAll(heartbeatTimeout time.Duration) []IngesterDesc {
117143
r.mtx.RLock()
118144
defer r.mtx.RUnlock()
119145

120146
ingesters := make([]IngesterDesc, 0, len(r.ringDesc.Ingesters))
121147
for _, ingester := range r.ringDesc.Ingesters {
148+
if time.Now().Sub(ingester.Timestamp) > heartbeatTimeout {
149+
continue
150+
}
122151
ingesters = append(ingesters, ingester)
123152
}
124153
return ingesters

0 commit comments

Comments
 (0)