Skip to content

Commit 1d03d82

Browse files
committed
Review feedback
1 parent 15fbf27 commit 1d03d82

File tree

4 files changed

+20
-17
lines changed

4 files changed

+20
-17
lines changed

cmd/cortex/main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,6 @@ func main() {
9393
flag.IntVar(&cfg.numTokens, "ingester.num-tokens", 128, "Number of tokens for each ingester.")
9494
flag.IntVar(&cfg.distributorConfig.ReplicationFactor, "distributor.replication-factor", 3, "The number of ingesters to write to and read from.")
9595
flag.IntVar(&cfg.distributorConfig.MinReadSuccesses, "distributor.min-read-successes", 2, "The minimum number of ingesters from which a read must succeed.")
96-
flag.IntVar(&cfg.distributorConfig.MinWriteSuccesses, "distributor.min-write-successes", 2, "The minimum number of ingesters to which a write must succeed.")
9796
flag.DurationVar(&cfg.distributorConfig.HeartbeatTimeout, "distributor.heartbeat-timeout", time.Minute, "The heartbeat timeout after which ingesters are skipped for reads/writes.")
9897
flag.BoolVar(&cfg.logSuccess, "log.success", false, "Log successful requests")
9998
flag.Parse()
@@ -125,7 +124,6 @@ func main() {
125124
// network errors.
126125
log.Fatalf("Could not register ingester: %v", err)
127126
}
128-
prometheus.MustRegister(registration)
129127
ing := setupIngester(chunkStore, cfg.ingesterConfig, cfg.logSuccess)
130128

131129
// Deferring a func to make ordering obvious
@@ -134,6 +132,8 @@ func main() {
134132
ing.Stop()
135133
registration.Unregister()
136134
}()
135+
136+
prometheus.MustRegister(registration)
137137
default:
138138
log.Fatalf("Mode %s not supported!", cfg.mode)
139139
}

distributor.go

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@ type DistributorConfig struct {
6363

6464
ReplicationFactor int
6565
MinReadSuccesses int
66-
MinWriteSuccesses int
6766
HeartbeatTimeout time.Duration
6867
}
6968

@@ -72,9 +71,6 @@ func NewDistributor(cfg DistributorConfig) (*Distributor, error) {
7271
if 0 > cfg.ReplicationFactor {
7372
return nil, fmt.Errorf("ReplicationFactor must be greater than zero: %d", cfg.ReplicationFactor)
7473
}
75-
if cfg.MinWriteSuccesses > cfg.ReplicationFactor {
76-
return nil, fmt.Errorf("MinWriteSuccesses > ReplicationFactor: %d > %d", cfg.MinWriteSuccesses, cfg.ReplicationFactor)
77-
}
7874
if cfg.MinReadSuccesses > cfg.ReplicationFactor {
7975
return nil, fmt.Errorf("MinReadSuccesses > ReplicationFactor: %d > %d", cfg.MinReadSuccesses, cfg.ReplicationFactor)
8076
}
@@ -171,10 +167,7 @@ func (d *Distributor) Append(ctx context.Context, samples []*model.Sample) error
171167

172168
d.receivedSamples.Add(float64(len(samples)))
173169

174-
sampleTrackers := make([]sampleTracker, len(samples), len(samples))
175-
samplesByIngester := map[string][]*sampleTracker{}
176170
keys := make([]uint32, len(samples), len(samples))
177-
178171
for i, sample := range samples {
179172
keys[i] = tokenForMetric(userID, sample.Metric)
180173
}
@@ -184,22 +177,28 @@ func (d *Distributor) Append(ctx context.Context, samples []*model.Sample) error
184177
return err
185178
}
186179

180+
sampleTrackers := make([]sampleTracker, len(samples), len(samples))
181+
samplesByIngester := map[string][]*sampleTracker{}
187182
for i := range samples {
188183
sampleTrackers[i] = sampleTracker{
189-
sample: samples[i],
184+
sample: samples[i],
185+
// We need a response from a quorum of ingesters, which is n/2 + 1.
190186
minSuccess: (len(ingesters[i]) / 2) + 1,
191187
succeeded: 0,
192188
}
193189

194190
// Skip those that have not heartbeated in a while. NB these are still
195191
// included in the calculation of minSuccess, so if too many failed ingesters
196192
// will cause the whole write to fail.
197-
liveIngesters := make([]string, len(ingesters[i]))
193+
liveIngesters := make([]string, 0, len(ingesters[i]))
198194
for _, ingester := range ingesters[i] {
199195
if time.Now().Sub(ingester.Timestamp) <= d.cfg.HeartbeatTimeout {
200196
liveIngesters = append(liveIngesters, ingester.Hostname)
201197
}
202198
}
199+
200+
// This is just a shortcut - if there are not minSuccess available ingesters,
201+
// after filtering out dead ones, don't even both trying.
203202
if len(liveIngesters) < sampleTrackers[i].minSuccess {
204203
return fmt.Errorf("wanted at least %d live ingesters to process write, had %d",
205204
sampleTrackers[i].minSuccess, len(liveIngesters))

ring/ingester_lifecycle.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ type IngesterRegistration struct {
3232
wait sync.WaitGroup
3333

3434
// We need to remember the token state just in case consul goes away and comes
35-
// back empty. Channel is users to tell the actor to update consul on state changes.
35+
// back empty. Channel is used to tell the actor to update consul on state changes.
3636
state TokenState
3737
stateChange chan TokenState
3838

@@ -134,7 +134,7 @@ func (r *IngesterRegistration) pickTokens() []uint32 {
134134
}
135135

136136
func (r *IngesterRegistration) heartbeat(tokens []uint32) {
137-
heartbeat := func(in interface{}) (out interface{}, retry bool, err error) {
137+
updateConsul := func(in interface{}) (out interface{}, retry bool, err error) {
138138
var ringDesc *Desc
139139
if in == nil {
140140
ringDesc = newDesc()
@@ -163,12 +163,12 @@ func (r *IngesterRegistration) heartbeat(tokens []uint32) {
163163
for {
164164
select {
165165
case r.state = <-r.stateChange:
166-
if err := r.consul.CAS(consulKey, descFactory, heartbeat); err != nil {
166+
if err := r.consul.CAS(consulKey, descFactory, updateConsul); err != nil {
167167
log.Errorf("Failed to write to consul, sleeping: %v", err)
168168
}
169169
case <-ticker.C:
170170
r.consulHeartbeats.Inc()
171-
if err := r.consul.CAS(consulKey, descFactory, heartbeat); err != nil {
171+
if err := r.consul.CAS(consulKey, descFactory, updateConsul); err != nil {
172172
log.Errorf("Failed to write to consul, sleeping: %v", err)
173173
}
174174
case <-r.quit:

ring/ring.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,8 +152,12 @@ func (r *Ring) getInternal(key uint32, n int, op Operation) ([]IngesterDesc, err
152152
}
153153
distinctHosts[token.Ingester] = struct{}{}
154154

155-
// If we encounter a Leaving token, for reads we should bump n,
156-
// for writes we bump n and skip the token.
155+
// Ingesters that are Leaving do not count to the replication limit. We do
156+
// not want to Write to them because they are about to go away, but we do
157+
// want to write the extra replica somewhere. So we increase the size of the
158+
// set of replicas for the key. This means we have to also increase the
159+
// size of the replica set for read, but we can read from Leaving ingesters,
160+
// so don't skip it in this case.
157161
if token.State == Leaving {
158162
n++
159163
if op == Write {

0 commit comments

Comments
 (0)