
Commit 1937b48

1 parent 9944ca1 commit 1937b48
File tree

2 files changed: +26 additions, -28 deletions

pkg/distributor/distributor_test.go (15 additions, 17 deletions)

@@ -494,6 +494,7 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
 }
 
 func TestPush_QuorumError(t *testing.T) {
+
 	var limits validation.Limits
 	flagext.DefaultValues(&limits)
 
@@ -513,13 +514,16 @@ func TestPush_QuorumError(t *testing.T) {
 
 	d := dists[0]
 
+	// we should run several write requests to make sure we don't have any race condition on the batchTracker#record code
+	numberOfWrites := 10000
+
 	// Using 429 just to make sure we are not hitting the &limits
 	// Simulating 2 4xx and 1 5xx -> Should return 4xx
 	ingesters[0].failResp.Store(httpgrpc.Errorf(429, "Throttling"))
 	ingesters[1].failResp.Store(httpgrpc.Errorf(500, "InternalServerError"))
 	ingesters[2].failResp.Store(httpgrpc.Errorf(429, "Throttling"))
 
-	for i := 0; i < 1000; i++ {
+	for i := 0; i < numberOfWrites; i++ {
 		request := makeWriteRequest(0, 30, 20)
 		_, err := d.Push(ctx, request)
 		status, ok := status.FromError(err)
@@ -532,7 +536,7 @@ func TestPush_QuorumError(t *testing.T) {
 	ingesters[1].failResp.Store(httpgrpc.Errorf(429, "Throttling"))
 	ingesters[2].failResp.Store(httpgrpc.Errorf(500, "InternalServerError"))
 
-	for i := 0; i < 10000; i++ {
+	for i := 0; i < numberOfWrites; i++ {
 		request := makeWriteRequest(0, 300, 200)
 		_, err := d.Push(ctx, request)
 		status, ok := status.FromError(err)
@@ -545,7 +549,7 @@ func TestPush_QuorumError(t *testing.T) {
 	ingesters[1].failResp.Store(httpgrpc.Errorf(429, "Throttling"))
 	ingesters[2].happy.Store(true)
 
-	for i := 0; i < 1; i++ {
+	for i := 0; i < numberOfWrites; i++ {
 		request := makeWriteRequest(0, 30, 20)
 		_, err := d.Push(ctx, request)
 		status, ok := status.FromError(err)
@@ -569,18 +573,6 @@ func TestPush_QuorumError(t *testing.T) {
 	ingesters[1].happy.Store(true)
 	ingesters[2].happy.Store(true)
 
-	// Wait group to check when the ring got updated
-	wg := &sync.WaitGroup{}
-	wg.Add(1)
-
-	go func() {
-		r.KVClient.WatchKey(context.Background(), ingester.RingKey, func(i interface{}) bool {
-			wg.Done()
-			// False will terminate the watch
-			return false
-		})
-	}()
-
 	err := r.KVClient.CAS(context.Background(), ingester.RingKey, func(in interface{}) (interface{}, bool, error) {
 		r := in.(*ring.Desc)
 		ingester2 := r.Ingesters["2"]
@@ -593,9 +585,15 @@ func TestPush_QuorumError(t *testing.T) {
 	require.NoError(t, err)
 
 	// Give time to the ring get updated with the KV value
-	wg.Wait()
+	for {
+		replicationSet, _ := r.GetAllHealthy(ring.Read)
+		if len(replicationSet.Instances) == 2 {
+			break
+		}
+		time.Sleep(100 * time.Millisecond)
+	}
 
-	for i := 0; i < 1000; i++ {
+	for i := 0; i < numberOfWrites; i++ {
 		request := makeWriteRequest(0, 30, 20)
 		_, err := d.Push(ctx, request)
 		require.Error(t, err)
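The interesting change in this file is the synchronization strategy: instead of a WaitGroup released from a WatchKey callback (which signals that the KV store published an update, not necessarily that the ring client has applied it), the test now polls GetAllHealthy until the ring actually reports the expected two healthy instances. The committed loop has no upper bound and relies on the test runner's timeout. Below is a minimal sketch of the same poll-until-condition pattern with an explicit deadline added, using only the standard library; the waitForCondition helper and its parameters are illustrative assumptions, not part of this commit.

	package main

	import (
		"fmt"
		"time"
	)

	// waitForCondition polls check every interval until it returns true or
	// the timeout elapses, in which case an error is returned.
	func waitForCondition(check func() bool, interval, timeout time.Duration) error {
		deadline := time.Now().Add(timeout)
		for {
			if check() {
				return nil
			}
			if time.Now().After(deadline) {
				return fmt.Errorf("condition not met within %s", timeout)
			}
			time.Sleep(interval)
		}
	}

	func main() {
		start := time.Now()
		// Stand-in for "the ring reports two healthy instances": here the
		// condition simply becomes true after 300ms.
		err := waitForCondition(func() bool {
			return time.Since(start) > 300*time.Millisecond
		}, 100*time.Millisecond, 5*time.Second)
		fmt.Println("condition met, err =", err)
	}

In the test itself the bare loop is arguably fine, since go test enforces a global timeout, but a deadline keeps the failure mode a clear error rather than a hung test.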

pkg/ring/batch.go (11 additions, 11 deletions)

@@ -128,20 +128,19 @@ func DoBatch(ctx context.Context, op Operation, r ReadRing, keys []uint32, callb
 }
 
 func (b *batchTracker) record(sampleTrackers []*itemTracker, err error) {
-	// If we succeed, decrement each sample's pending count by one. If we reach
-	// the required number of successful puts on this sample, then decrement the
-	// number of pending samples by one. If we successfully push all samples to
-	// min success instances, wake up the waiting rpc so it can return early.
-	// Similarly, track the number of errors, and if it exceeds maxFailures
-	// shortcut the waiting rpc.
+	// If we reach the required number of successful puts on this sample, then decrement the
+	// number of pending samples by one.
 	//
-	// The use of atomic increments here guarantees only a single sendSamples
-	// goroutine will write to either channel.
+	// The use of atomic increments here is needed as:
+	// * rpcsPending and rpcsFailed guarantee that only a single sendSamples goroutine will write to either channel
+	// * succeeded, failed4xx, failed5xx and remaining guarantee that the "return decision" is made atomically,
+	//   avoiding race conditions
 	for i := range sampleTrackers {
 		if err != nil {
-			// We count the error by error family (4xx and 5xx)
+			// Track the number of errors by error family, and if it exceeds maxFailures
+			// shortcut the waiting rpc.
 			errCount := sampleTrackers[i].recordError(err)
-			// We should return an error if we reach the maxFailure (quorum) on a give error family OR
+			// We should return an error if we reach the maxFailure (quorum) on a given error family OR
 			// we dont have any remaining ingesters to try
 			// Ex: 2xx, 4xx, 5xx -> return 5xx
 			// Ex: 4xx, 4xx, _ -> return 4xx
@@ -152,7 +151,8 @@ func (b *batchTracker) record(sampleTrackers []*itemTracker, err error) {
 				}
 			}
 		} else {
-			// We should return success if we succeeded calling `minSuccess` ingesters.
+			// If we successfully push all samples to min success instances,
+			// wake up the waiting rpc so it can return early.
 			if sampleTrackers[i].succeeded.Inc() >= int32(sampleTrackers[i].minSuccess) {
 				if b.rpcsPending.Dec() == 0 {
 					b.done <- struct{}{}
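The rewritten comment describes the pattern that makes record safe under concurrency: every counter is bumped with an atomic increment, and because each caller observes a unique post-increment value, at most one goroutine crosses a given threshold and writes to the done or err channel. Here is a self-contained sketch of that idea, assuming Go 1.19's sync/atomic types; quorumTracker and its fields are a hypothetical simplification of batchTracker/itemTracker, not the actual Cortex types.

	package main

	import (
		"fmt"
		"sync"
		"sync/atomic"
	)

	// quorumTracker tracks one item replicated to several instances
	// (hypothetical simplification of itemTracker/batchTracker).
	type quorumTracker struct {
		succeeded   atomic.Int32
		failed      atomic.Int32
		minSuccess  int32
		maxFailures int32
		done        chan struct{}
		errs        chan error
	}

	// record is called concurrently, once per instance response. Add returns
	// the post-increment value, which is unique per caller, so at most one
	// goroutine observes the exact threshold and writes to each channel.
	func (q *quorumTracker) record(err error) {
		if err != nil {
			// Signal failure only once the error count exceeds maxFailures.
			if q.failed.Add(1) == q.maxFailures+1 {
				q.errs <- err
			}
			return
		}
		// Signal success exactly once, when minSuccess is reached.
		if q.succeeded.Add(1) == q.minSuccess {
			q.done <- struct{}{}
		}
	}

	func main() {
		q := &quorumTracker{
			minSuccess:  2,
			maxFailures: 1,
			done:        make(chan struct{}, 1), // buffered: record never blocks
			errs:        make(chan error, 1),
		}

		// Simulate 3 replicas responding concurrently: 2 successes, 1 failure.
		var wg sync.WaitGroup
		for _, resp := range []error{nil, nil, fmt.Errorf("throttled")} {
			wg.Add(1)
			go func(err error) {
				defer wg.Done()
				q.record(err)
			}(resp)
		}
		wg.Wait()

		select {
		case <-q.done:
			fmt.Println("quorum of successes reached")
		case err := <-q.errs:
			fmt.Println("quorum of failures reached:", err)
		}
	}

Note one simplification: the sketch signals on an exact == threshold per item, whereas the real code pairs a per-item >= check with the batch-level b.rpcsPending.Dec() == 0, and it is that final decrement reaching zero exactly once that guarantees the single write to b.done.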
