Skip to content

Commit 3dd2ced

Browse files
authored
Merge pull request #90 from weaveworks/87-ingester-lifecycle
Add per-token state, and use it to not write to Leaving ingesters
2 parents 2aba2a4 + 473d0b5 commit 3dd2ced

File tree

6 files changed

+164
-44
lines changed

6 files changed

+164
-44
lines changed

cmd/cortex/main.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,6 @@ func main() {
9393
flag.IntVar(&cfg.numTokens, "ingester.num-tokens", 128, "Number of tokens for each ingester.")
9494
flag.IntVar(&cfg.distributorConfig.ReplicationFactor, "distributor.replication-factor", 3, "The number of ingesters to write to and read from.")
9595
flag.IntVar(&cfg.distributorConfig.MinReadSuccesses, "distributor.min-read-successes", 2, "The minimum number of ingesters from which a read must succeed.")
96-
flag.IntVar(&cfg.distributorConfig.MinWriteSuccesses, "distributor.min-write-successes", 2, "The minimum number of ingesters to which a write must succeed.")
9796
flag.DurationVar(&cfg.distributorConfig.HeartbeatTimeout, "distributor.heartbeat-timeout", time.Minute, "The heartbeat timeout after which ingesters are skipped for reads/writes.")
9897
flag.BoolVar(&cfg.logSuccess, "log.success", false, "Log successful requests")
9998
flag.Parse()
@@ -125,10 +124,16 @@ func main() {
125124
// network errors.
126125
log.Fatalf("Could not register ingester: %v", err)
127126
}
128-
defer registration.Unregister()
129-
prometheus.MustRegister(registration)
130127
ing := setupIngester(chunkStore, cfg.ingesterConfig, cfg.logSuccess)
131-
defer ing.Stop()
128+
129+
// Deferring a func to make ordering obvious
130+
defer func() {
131+
registration.ChangeState(ring.Leaving)
132+
ing.Stop()
133+
registration.Unregister()
134+
}()
135+
136+
prometheus.MustRegister(registration)
132137
default:
133138
log.Fatalf("Mode %s not supported!", cfg.mode)
134139
}

cortex-build/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
FROM golang:1.6.3
1+
FROM golang:1.7.3
22
RUN apt-get update && apt-get install -y python-requests python-yaml file jq && \
33
rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
44
RUN go clean -i net && \

distributor.go

Lines changed: 63 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"fmt"
55
"hash/fnv"
66
"sync"
7+
"sync/atomic"
78
"time"
89

910
"github.com/prometheus/client_golang/prometheus"
@@ -46,7 +47,8 @@ type Distributor struct {
4647
type ReadRing interface {
4748
prometheus.Collector
4849

49-
Get(key uint32, n int) ([]ring.IngesterDesc, error)
50+
Get(key uint32, n int, op ring.Operation) ([]ring.IngesterDesc, error)
51+
BatchGet(keys []uint32, n int, op ring.Operation) ([][]ring.IngesterDesc, error)
5052
GetAll() []ring.IngesterDesc
5153
}
5254

@@ -61,7 +63,6 @@ type DistributorConfig struct {
6163

6264
ReplicationFactor int
6365
MinReadSuccesses int
64-
MinWriteSuccesses int
6566
HeartbeatTimeout time.Duration
6667
}
6768

@@ -70,9 +71,6 @@ func NewDistributor(cfg DistributorConfig) (*Distributor, error) {
7071
if 0 > cfg.ReplicationFactor {
7172
return nil, fmt.Errorf("ReplicationFactor must be greater than zero: %d", cfg.ReplicationFactor)
7273
}
73-
if cfg.MinWriteSuccesses > cfg.ReplicationFactor {
74-
return nil, fmt.Errorf("MinWriteSuccesses > ReplicationFactor: %d > %d", cfg.MinWriteSuccesses, cfg.ReplicationFactor)
75-
}
7674
if cfg.MinReadSuccesses > cfg.ReplicationFactor {
7775
return nil, fmt.Errorf("MinReadSuccesses > ReplicationFactor: %d > %d", cfg.MinReadSuccesses, cfg.ReplicationFactor)
7876
}
@@ -154,6 +152,12 @@ func tokenFor(userID string, name model.LabelValue) uint32 {
154152
return h.Sum32()
155153
}
156154

155+
type sampleTracker struct {
156+
sample *model.Sample
157+
minSuccess int
158+
succeeded int32
159+
}
160+
157161
// Append implements SampleAppender.
158162
func (d *Distributor) Append(ctx context.Context, samples []*model.Sample) error {
159163
userID, err := user.GetID(ctx)
@@ -163,53 +167,90 @@ func (d *Distributor) Append(ctx context.Context, samples []*model.Sample) error
163167

164168
d.receivedSamples.Add(float64(len(samples)))
165169

166-
samplesByIngester := map[string][]*model.Sample{}
167-
for _, sample := range samples {
168-
key := tokenForMetric(userID, sample.Metric)
169-
ingesters, err := d.cfg.Ring.Get(key, d.cfg.ReplicationFactor)
170-
if err != nil {
171-
return err
170+
keys := make([]uint32, len(samples), len(samples))
171+
for i, sample := range samples {
172+
keys[i] = tokenForMetric(userID, sample.Metric)
173+
}
174+
175+
ingesters, err := d.cfg.Ring.BatchGet(keys, d.cfg.ReplicationFactor, ring.Write)
176+
if err != nil {
177+
return err
178+
}
179+
180+
sampleTrackers := make([]sampleTracker, len(samples), len(samples))
181+
samplesByIngester := map[string][]*sampleTracker{}
182+
for i := range samples {
183+
sampleTrackers[i] = sampleTracker{
184+
sample: samples[i],
185+
// We need a response from a quorum of ingesters, which is n/2 + 1.
186+
minSuccess: (len(ingesters[i]) / 2) + 1,
187+
succeeded: 0,
172188
}
173-
for _, ingester := range ingesters {
174-
otherSamples := samplesByIngester[ingester.Hostname]
175-
samplesByIngester[ingester.Hostname] = append(otherSamples, sample)
189+
190+
// Skip those that have not heartbeated in a while. NB these are still
191+
// included in the calculation of minSuccess, so too many failed ingesters
192+
// will cause the whole write to fail.
193+
liveIngesters := make([]string, 0, len(ingesters[i]))
194+
for _, ingester := range ingesters[i] {
195+
if time.Now().Sub(ingester.Timestamp) <= d.cfg.HeartbeatTimeout {
196+
liveIngesters = append(liveIngesters, ingester.Hostname)
197+
}
198+
}
199+
200+
// This is just a shortcut - if there are not minSuccess available ingesters,
201+
// after filtering out dead ones, don't even bother trying.
202+
if len(liveIngesters) < sampleTrackers[i].minSuccess {
203+
return fmt.Errorf("wanted at least %d live ingesters to process write, had %d",
204+
sampleTrackers[i].minSuccess, len(liveIngesters))
205+
}
206+
207+
for _, liveIngester := range liveIngesters {
208+
sampleForIngester := samplesByIngester[liveIngester]
209+
samplesByIngester[liveIngester] = append(sampleForIngester, &sampleTrackers[i])
176210
}
177211
}
178212

179213
errs := make(chan error)
180214
for hostname, samples := range samplesByIngester {
181-
go func(hostname string, samples []*model.Sample) {
215+
go func(hostname string, samples []*sampleTracker) {
182216
errs <- d.sendSamples(ctx, hostname, samples)
183217
}(hostname, samples)
184218
}
185219
var lastErr error
186-
successes := 0
187220
for i := 0; i < len(samplesByIngester); i++ {
188221
if err := <-errs; err != nil {
189222
lastErr = err
190223
continue
191224
}
192-
successes++
193225
}
194-
195-
if successes < d.cfg.MinWriteSuccesses {
196-
return fmt.Errorf("too few successful writes, last error was: %v", lastErr)
226+
for i := range sampleTrackers {
227+
if sampleTrackers[i].succeeded < int32(sampleTrackers[i].minSuccess) {
228+
return fmt.Errorf("need %d successful writes, only got %d, last error was: %v",
229+
sampleTrackers[i].minSuccess, sampleTrackers[i].succeeded, lastErr)
230+
}
197231
}
198232
return nil
199233
}
200234

201-
func (d *Distributor) sendSamples(ctx context.Context, hostname string, samples []*model.Sample) error {
235+
func (d *Distributor) sendSamples(ctx context.Context, hostname string, sampleTrackers []*sampleTracker) error {
202236
client, err := d.getClientFor(hostname)
203237
if err != nil {
204238
return err
205239
}
240+
samples := make([]*model.Sample, len(sampleTrackers), len(sampleTrackers))
241+
for i := range sampleTrackers {
242+
samples[i] = sampleTrackers[i].sample
243+
}
206244
err = instrument.TimeRequestHistogram("send", d.sendDuration, func() error {
207245
return client.Append(ctx, samples)
208246
})
209247
if err != nil {
210248
d.ingesterAppendFailures.WithLabelValues(hostname).Inc()
211249
}
212250
d.ingesterAppends.WithLabelValues(hostname).Inc()
251+
for i := range sampleTrackers {
252+
atomic.AddInt32(&sampleTrackers[i].succeeded, 1)
253+
}
213254
return err
214255
}
215256

@@ -241,7 +282,7 @@ func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers .
241282
return err
242283
}
243284

244-
ingesters, err := d.cfg.Ring.Get(tokenFor(userID, metricName), d.cfg.ReplicationFactor)
285+
ingesters, err := d.cfg.Ring.Get(tokenFor(userID, metricName), d.cfg.ReplicationFactor, ring.Read)
245286
if err != nil {
246287
return err
247288
}

ring/ingester_lifecycle.go

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@ type IngesterRegistration struct {
3131
quit chan struct{}
3232
wait sync.WaitGroup
3333

34+
// We need to remember the token state just in case consul goes away and comes
35+
// back empty. Channel is used to tell the actor to update consul on state changes.
36+
state TokenState
37+
stateChange chan TokenState
38+
3439
consulHeartbeats prometheus.Counter
3540
}
3641

@@ -56,6 +61,10 @@ func RegisterIngester(consulClient ConsulClient, listenPort, numTokens int) (*In
5661
hostname: fmt.Sprintf("%s:%d", addr, listenPort),
5762
quit: make(chan struct{}),
5863

64+
// Only read/written on actor goroutine.
65+
state: Active,
66+
stateChange: make(chan TokenState),
67+
5968
consulHeartbeats: prometheus.NewCounter(prometheus.CounterOpts{
6069
Name: "cortex_ingester_consul_heartbeats_total",
6170
Help: "The total number of heartbeats sent to consul.",
@@ -67,6 +76,13 @@ func RegisterIngester(consulClient ConsulClient, listenPort, numTokens int) (*In
6776
return r, nil
6877
}
6978

79+
// ChangeState changes the state of all tokens owned by this
80+
// ingester in the ring.
81+
func (r *IngesterRegistration) ChangeState(state TokenState) {
82+
log.Info("Changing token state to: %v", state)
83+
r.stateChange <- state
84+
}
85+
7086
// Unregister removes ingester config from Consul; will block
7187
// until we've successfully unregistered.
7288
func (r *IngesterRegistration) Unregister() {
@@ -108,7 +124,7 @@ func (r *IngesterRegistration) pickTokens() []uint32 {
108124
tokens = append(tokens, newTokens...)
109125
}
110126

111-
ringDesc.addIngester(r.id, r.hostname, tokens)
127+
ringDesc.addIngester(r.id, r.hostname, tokens, r.state)
112128
return ringDesc, true, nil
113129
}
114130
if err := r.consul.CAS(consulKey, descFactory, pickTokens); err != nil {
@@ -118,7 +134,7 @@ func (r *IngesterRegistration) pickTokens() []uint32 {
118134
}
119135

120136
func (r *IngesterRegistration) heartbeat(tokens []uint32) {
121-
heartbeat := func(in interface{}) (out interface{}, retry bool, err error) {
137+
updateConsul := func(in interface{}) (out interface{}, retry bool, err error) {
122138
var ringDesc *Desc
123139
if in == nil {
124140
ringDesc = newDesc()
@@ -130,21 +146,29 @@ func (r *IngesterRegistration) heartbeat(tokens []uint32) {
130146
if !ok {
131147
// consul must have restarted
132148
log.Infof("Found empty ring, inserting tokens!")
133-
ringDesc.addIngester(r.id, r.hostname, tokens)
149+
ringDesc.addIngester(r.id, r.hostname, tokens, r.state)
134150
} else {
135151
ingesterDesc.Timestamp = time.Now()
136152
ringDesc.Ingesters[r.id] = ingesterDesc
153+
for i := range ringDesc.Tokens {
154+
ringDesc.Tokens[i].State = r.state
155+
}
137156
}
138157

139158
return ringDesc, true, nil
140159
}
160+
141161
ticker := time.NewTicker(heartbeatInterval)
142162
defer ticker.Stop()
143163
for {
144164
select {
165+
case r.state = <-r.stateChange:
166+
if err := r.consul.CAS(consulKey, descFactory, updateConsul); err != nil {
167+
log.Errorf("Failed to write to consul, sleeping: %v", err)
168+
}
145169
case <-ticker.C:
146170
r.consulHeartbeats.Inc()
147-
if err := r.consul.CAS(consulKey, descFactory, heartbeat); err != nil {
171+
if err := r.consul.CAS(consulKey, descFactory, updateConsul); err != nil {
148172
log.Errorf("Failed to write to consul, sleeping: %v", err)
149173
}
150174
case <-r.quit:

ring/model.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,15 @@ import (
77
"github.com/prometheus/common/log"
88
)
99

10+
// TokenState describes the state of a token
11+
type TokenState int
12+
13+
// Values for TokenState
14+
const (
15+
Active TokenState = iota
16+
Leaving
17+
)
18+
1019
// Desc is the serialised state in Consul representing
1120
// all ingesters (ie, the ring).
1221
type Desc struct {
@@ -29,8 +38,9 @@ func (ts TokenDescs) Less(i, j int) bool { return ts[i].Token < ts[j].Token }
2938

3039
// TokenDesc describes an individual token in the ring.
3140
type TokenDesc struct {
32-
Token uint32 `json:"tokens"`
33-
Ingester string `json:"ingester"`
41+
Token uint32 `json:"tokens"`
42+
Ingester string `json:"ingester"`
43+
State TokenState `json:"state"`
3444
}
3545

3646
func descFactory() interface{} {
@@ -43,7 +53,7 @@ func newDesc() *Desc {
4353
}
4454
}
4555

46-
func (d *Desc) addIngester(id, hostname string, tokens []uint32) {
56+
func (d *Desc) addIngester(id, hostname string, tokens []uint32, state TokenState) {
4757
d.Ingesters[id] = IngesterDesc{
4858
Hostname: hostname,
4959
Timestamp: time.Now(),
@@ -53,6 +63,7 @@ func (d *Desc) addIngester(id, hostname string, tokens []uint32) {
5363
d.Tokens = append(d.Tokens, TokenDesc{
5464
Token: token,
5565
Ingester: id,
66+
State: state,
5667
})
5768
}
5869

0 commit comments

Comments
 (0)