Skip to content

Stop early-return after majority success in distributor writes #732

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 32 additions & 11 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ type Config struct {
ClientCleanupPeriod time.Duration
IngestionRateLimit float64
IngestionBurstSize int
WaitForAllIngesters bool

// for testing
ingesterClientFactory func(addr string, cfg ingester_client.Config) (client.IngesterClient, error)
Expand All @@ -92,6 +93,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
flag.DurationVar(&cfg.ClientCleanupPeriod, "distributor.client-cleanup-period", 15*time.Second, "How frequently to clean up clients for ingesters that have gone away.")
flag.Float64Var(&cfg.IngestionRateLimit, "distributor.ingestion-rate-limit", 25000, "Per-user ingestion rate limit in samples per second.")
flag.IntVar(&cfg.IngestionBurstSize, "distributor.ingestion-burst-size", 50000, "Per-user allowed ingestion burst size (in number of samples).")
flag.BoolVar(&cfg.WaitForAllIngesters, "distributor.wait-for-all-ingesters", false, "Turning this on will no longer short circuit ingester writes once a minimum quorum is reached.")
}

// New constructs a new Distributor
Expand Down Expand Up @@ -250,8 +252,10 @@ func tokenFor(userID string, name []byte) uint32 {
// sampleTracker carries one sample together with the per-sample counters
// used to decide when the write of that sample is complete.
type sampleTracker struct {
	// The sample being written and the label pairs that accompany it.
	labels []client.LabelPair
	sample client.Sample

	// Thresholds filled in by Push from the number of ingesters selected for
	// this sample: total is that number, minSuccess is the quorum
	// (n/2 + 1) and maxFailures is total - minSuccess.
	total, minSuccess, maxFailures int

	// Progress counters. They are only modified through atomic.AddInt32:
	// finished by waitForAll, succeeded by shortCircuit and failed by the
	// error path of sendSamples.
	finished, succeeded, failed int32
}
Expand Down Expand Up @@ -316,6 +320,7 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie
// We need a response from a quorum of ingesters, which is n/2 + 1.
minSuccess := (len(ingesters[i]) / 2) + 1
samples[i].minSuccess = minSuccess
samples[i].total = len(ingesters[i])
samples[i].maxFailures = len(ingesters[i]) - minSuccess

// Skip those that have not heartbeated in a while. NB these are still
Expand Down Expand Up @@ -388,19 +393,35 @@ func (d *Distributor) sendSamples(ctx context.Context, ingester *ring.IngesterDe
// goroutine will write to either channel.
for i := range sampleTrackers {
if err != nil {
if atomic.AddInt32(&sampleTrackers[i].failed, 1) <= int32(sampleTrackers[i].maxFailures) {
continue
}
if atomic.AddInt32(&pushTracker.samplesFailed, 1) == 1 {
pushTracker.err <- err
if atomic.AddInt32(&sampleTrackers[i].failed, 1) > int32(sampleTrackers[i].maxFailures) {
if atomic.AddInt32(&pushTracker.samplesFailed, 1) == 1 {
pushTracker.err <- err
}
}
}
if d.cfg.WaitForAllIngesters {
waitForAll(sampleTrackers[i], pushTracker)
} else {
if atomic.AddInt32(&sampleTrackers[i].succeeded, 1) != int32(sampleTrackers[i].minSuccess) {
continue
}
if atomic.AddInt32(&pushTracker.samplesPending, -1) == 0 {
pushTracker.done <- struct{}{}
}
shortCircuit(err, sampleTrackers[i], pushTracker)
}
}
}

// shortCircuit accounts for one ingester response in quorum mode.
//
// A non-nil err is ignored here. For a nil err the sample's succeeded counter
// is atomically incremented; the call that brings it to exactly minSuccess
// atomically decrements pushTracker.samplesPending, and the call that brings
// samplesPending to zero sends on pushTracker.done.
func shortCircuit(err error, sampleTracker *sampleTracker, pushTracker *pushTracker) {
	// Only successful writes count towards the quorum.
	if err != nil {
		return
	}

	// Comparing with != (rather than <) means exactly one caller per sample
	// proceeds past this point, even if more successes arrive later.
	if atomic.AddInt32(&sampleTracker.succeeded, 1) != int32(sampleTracker.minSuccess) {
		return
	}

	// This sample has reached quorum; signal completion once no samples
	// remain pending.
	if atomic.AddInt32(&pushTracker.samplesPending, -1) != 0 {
		return
	}
	pushTracker.done <- struct{}{}
}

// waitForAll accounts for one ingester response when every response must be
// awaited.
//
// Each call atomically increments the sample's finished counter. The call
// that brings it to sampleTracker.total atomically decrements
// pushTracker.samplesPending, and the call that brings samplesPending to zero
// sends on pushTracker.done.
//
// NOTE(review): completion requires exactly `total` calls per sample. If
// fewer requests are issued than `total` (for example because some ingesters
// are skipped before sending), done would never be signalled from here —
// confirm against the caller.
func waitForAll(sampleTracker *sampleTracker, pushTracker *pushTracker) {
	// Not the last outstanding response for this sample yet.
	if atomic.AddInt32(&sampleTracker.finished, 1) != int32(sampleTracker.total) {
		return
	}

	// This sample is fully accounted for; other samples may still be pending.
	if atomic.AddInt32(&pushTracker.samplesPending, -1) != 0 {
		return
	}
	pushTracker.done <- struct{}{}
}
Expand Down