diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 0998d62efac..98e60ce5905 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -77,6 +77,7 @@ type Config struct { ClientCleanupPeriod time.Duration IngestionRateLimit float64 IngestionBurstSize int + WaitForAllIngesters bool // for testing ingesterClientFactory func(addr string, cfg ingester_client.Config) (client.IngesterClient, error) @@ -92,6 +93,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { flag.DurationVar(&cfg.ClientCleanupPeriod, "distributor.client-cleanup-period", 15*time.Second, "How frequently to clean up clients for ingesters that have gone away.") flag.Float64Var(&cfg.IngestionRateLimit, "distributor.ingestion-rate-limit", 25000, "Per-user ingestion rate limit in samples per second.") flag.IntVar(&cfg.IngestionBurstSize, "distributor.ingestion-burst-size", 50000, "Per-user allowed ingestion burst size (in number of samples).") + flag.BoolVar(&cfg.WaitForAllIngesters, "distributor.wait-for-all-ingesters", false, "Turning this on will no longer short circuit ingester writes once a minimum quorum is reached.") } // New constructs a new Distributor @@ -250,8 +252,10 @@ func tokenFor(userID string, name []byte) uint32 { type sampleTracker struct { labels []client.LabelPair sample client.Sample + total int minSuccess int maxFailures int + finished int32 succeeded int32 failed int32 } @@ -316,6 +320,7 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie // We need a response from a quorum of ingesters, which is n/2 + 1. minSuccess := (len(ingesters[i]) / 2) + 1 samples[i].minSuccess = minSuccess + samples[i].total = len(ingesters[i]) samples[i].maxFailures = len(ingesters[i]) - minSuccess // Skip those that have not heartbeated in a while. NB these are still @@ -388,19 +393,35 @@ func (d *Distributor) sendSamples(ctx context.Context, ingester *ring.IngesterDe // goroutine will write to either channel. for i := range sampleTrackers { if err != nil { - if atomic.AddInt32(&sampleTrackers[i].failed, 1) <= int32(sampleTrackers[i].maxFailures) { - continue - } - if atomic.AddInt32(&pushTracker.samplesFailed, 1) == 1 { - pushTracker.err <- err + if atomic.AddInt32(&sampleTrackers[i].failed, 1) > int32(sampleTrackers[i].maxFailures) { + if atomic.AddInt32(&pushTracker.samplesFailed, 1) == 1 { + pushTracker.err <- err + } } + } + if d.cfg.WaitForAllIngesters { + waitForAll(sampleTrackers[i], pushTracker) } else { - if atomic.AddInt32(&sampleTrackers[i].succeeded, 1) != int32(sampleTrackers[i].minSuccess) { - continue - } - if atomic.AddInt32(&pushTracker.samplesPending, -1) == 0 { - pushTracker.done <- struct{}{} - } + shortCircuit(err, sampleTrackers[i], pushTracker) + } + } +} + +func shortCircuit(err error, sampleTracker *sampleTracker, pushTracker *pushTracker) { + if err == nil { + if atomic.AddInt32(&sampleTracker.succeeded, 1) != int32(sampleTracker.minSuccess) { + return + } + if atomic.AddInt32(&pushTracker.samplesPending, -1) == 0 { + pushTracker.done <- struct{}{} + } + } +} + +func waitForAll(sampleTracker *sampleTracker, pushTracker *pushTracker) { + if atomic.AddInt32(&sampleTracker.finished, 1) == int32(sampleTracker.total) { + if atomic.AddInt32(&pushTracker.samplesPending, -1) == 0 { + pushTracker.done <- struct{}{} } } }