Skip to content

Commit 9fe7552

Browse files
committed
refactor
Signed-off-by: Alan Protasio <[email protected]>
1 parent 68b534b commit 9fe7552

File tree

2 files changed

+29
-28
lines changed

2 files changed

+29
-28
lines changed

pkg/distributor/distributor_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -513,23 +513,23 @@ func TestPush_QuorumError(t *testing.T) {
513513

514514
d := dists[0]
515515

516-
// Using 489 just to make sure we are not hitting the &limits
516+
// Using 429 just to make sure we are not hitting the &limits
517517
// Simulating 2 4xx and 1 5xx -> Should return 4xx
518-
ingesters[0].failResp.Store(httpgrpc.Errorf(489, "Throttling"))
518+
ingesters[0].failResp.Store(httpgrpc.Errorf(429, "Throttling"))
519519
ingesters[1].failResp.Store(httpgrpc.Errorf(500, "InternalServerError"))
520-
ingesters[2].failResp.Store(httpgrpc.Errorf(489, "Throttling"))
520+
ingesters[2].failResp.Store(httpgrpc.Errorf(429, "Throttling"))
521521

522522
for i := 0; i < 1000; i++ {
523523
request := makeWriteRequest(0, 30, 20)
524524
_, err := d.Push(ctx, request)
525525
status, ok := status.FromError(err)
526526
require.True(t, ok)
527-
require.Equal(t, codes.Code(489), status.Code())
527+
require.Equal(t, codes.Code(429), status.Code())
528528
}
529529

530530
// Simulating 2 5xx and 1 4xx -> Should return 5xx
531531
ingesters[0].failResp.Store(httpgrpc.Errorf(500, "InternalServerError"))
532-
ingesters[1].failResp.Store(httpgrpc.Errorf(489, "Throttling"))
532+
ingesters[1].failResp.Store(httpgrpc.Errorf(429, "Throttling"))
533533
ingesters[2].failResp.Store(httpgrpc.Errorf(500, "InternalServerError"))
534534

535535
for i := 0; i < 10000; i++ {
@@ -542,15 +542,15 @@ func TestPush_QuorumError(t *testing.T) {
542542

543543
// Simulating 2 different errors and 1 success -> This case we may return any of the errors
544544
ingesters[0].failResp.Store(httpgrpc.Errorf(500, "InternalServerError"))
545-
ingesters[1].failResp.Store(httpgrpc.Errorf(489, "Throttling"))
545+
ingesters[1].failResp.Store(httpgrpc.Errorf(429, "Throttling"))
546546
ingesters[2].happy.Store(true)
547547

548548
for i := 0; i < 1000; i++ {
549549
request := makeWriteRequest(0, 30, 20)
550550
_, err := d.Push(ctx, request)
551551
status, ok := status.FromError(err)
552552
require.True(t, ok)
553-
require.True(t, status.Code() == 489 || status.Code() == 500)
553+
require.True(t, status.Code() == 429 || status.Code() == 500)
554554
}
555555

556556
// Simulating 1 error -> Should return 2xx
@@ -581,7 +581,7 @@ func TestPush_QuorumError(t *testing.T) {
581581
require.NoError(t, err)
582582

583583
// Give time to the ring get updated with the KV value
584-
time.Sleep(5 * time.Second)
584+
time.Sleep(time.Second)
585585

586586
for i := 0; i < 1000; i++ {
587587
request := makeWriteRequest(0, 30, 20)

pkg/ring/batch.go

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -138,32 +138,33 @@ func (b *batchTracker) record(sampleTrackers []*itemTracker, err error) {
138138
// The use of atomic increments here guarantees only a single sendSamples
139139
// goroutine will write to either channel.
140140
for i := range sampleTrackers {
141-
verifyIfShouldReturn := func() {
142-
if sampleTrackers[i].remaining.Dec() == 0 {
143-
if b.rpcsFailed.Inc() == 1 {
144-
b.err <- sampleTrackers[i].err.Load()
145-
}
146-
}
147-
}
148-
149141
if err != nil {
142+
// We count the error by error family (4xx and 5xx)
150143
errCount := sampleTrackers[i].recordError(err)
151-
152-
if errCount <= int32(sampleTrackers[i].maxFailures) {
153-
verifyIfShouldReturn()
154-
continue
155-
}
156-
157-
if b.rpcsFailed.Inc() == 1 {
158-
b.err <- err
144+
// We should return an error if we reach the maxFailure (quorum) on a give error family OR
145+
// we dont have any remaining ingesters to try
146+
// Ex: 2xx, 4xx, 5xx -> return 5xx
147+
// Ex: 4xx, 4xx, 2xx -> return 4xx
148+
if errCount > int32(sampleTrackers[i].maxFailures) || sampleTrackers[i].remaining.Dec() == 0 {
149+
if b.rpcsFailed.Inc() == 1 {
150+
b.err <- err
151+
}
159152
}
160153
} else {
161-
if sampleTrackers[i].succeeded.Inc() != int32(sampleTrackers[i].minSuccess) {
162-
verifyIfShouldReturn()
154+
// We should return success if we succeeded calling `minSuccess` ingesters
155+
if sampleTrackers[i].succeeded.Inc() >= int32(sampleTrackers[i].minSuccess) {
156+
if b.rpcsPending.Dec() == 0 {
157+
b.done <- struct{}{}
158+
}
163159
continue
164160
}
165-
if b.rpcsPending.Dec() == 0 {
166-
b.done <- struct{}{}
161+
162+
// If we suceeded to call this particular ingester but we dont have any remaining ingesters to try
163+
// and we did not succeeded calling `minSuccess` ingesters we need to return the last error
164+
if sampleTrackers[i].remaining.Dec() == 0 {
165+
if b.rpcsFailed.Inc() == 1 {
166+
b.err <- sampleTrackers[i].err.Load()
167+
}
167168
}
168169
}
169170
}

0 commit comments

Comments
 (0)