Commit 68b534b

Returning quorum error

Signed-off-by: Alan Protasio <[email protected]>

1 parent ca9b4a0 · commit 68b534b

2 files changed: +144, -12 lines

pkg/distributor/distributor_test.go

Lines changed: 113 additions & 10 deletions
@@ -14,6 +14,8 @@ import (
 	"testing"
 	"time"
 
+	"google.golang.org/grpc/codes"
+
 	"github.com/go-kit/log"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/testutil"
@@ -24,6 +26,7 @@ import (
 	"github.com/stretchr/testify/require"
 	"github.com/weaveworks/common/httpgrpc"
 	"github.com/weaveworks/common/user"
+	"go.uber.org/atomic"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/health/grpc_health_v1"
 	"google.golang.org/grpc/status"
@@ -490,6 +493,106 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
 	}
 }
 
+func TestPush_QuorumError(t *testing.T) {
+	var limits validation.Limits
+	flagext.DefaultValues(&limits)
+
+	limits.IngestionRate = math.MaxFloat64
+
+	dists, ingesters, r, _ := prepare(t, prepConfig{
+		numDistributors:     1,
+		numIngesters:        3,
+		happyIngesters:      0,
+		shuffleShardSize:    3,
+		shardByAllLabels:    true,
+		shuffleShardEnabled: true,
+		limits:              &limits,
+	})
+
+	ctx := user.InjectOrgID(context.Background(), "user")
+
+	d := dists[0]
+
+	// Using 489 just to make sure we are not hitting the limits
+	// Simulating 2 4xx and 1 5xx -> Should return 4xx
+	ingesters[0].failResp.Store(httpgrpc.Errorf(489, "Throttling"))
+	ingesters[1].failResp.Store(httpgrpc.Errorf(500, "InternalServerError"))
+	ingesters[2].failResp.Store(httpgrpc.Errorf(489, "Throttling"))
+
+	for i := 0; i < 1000; i++ {
+		request := makeWriteRequest(0, 30, 20)
+		_, err := d.Push(ctx, request)
+		status, ok := status.FromError(err)
+		require.True(t, ok)
+		require.Equal(t, codes.Code(489), status.Code())
+	}
+
+	// Simulating 2 5xx and 1 4xx -> Should return 5xx
+	ingesters[0].failResp.Store(httpgrpc.Errorf(500, "InternalServerError"))
+	ingesters[1].failResp.Store(httpgrpc.Errorf(489, "Throttling"))
+	ingesters[2].failResp.Store(httpgrpc.Errorf(500, "InternalServerError"))
+
+	for i := 0; i < 10000; i++ {
+		request := makeWriteRequest(0, 300, 200)
+		_, err := d.Push(ctx, request)
+		status, ok := status.FromError(err)
+		require.True(t, ok)
+		require.Equal(t, codes.Code(500), status.Code())
+	}
+
+	// Simulating 2 different errors and 1 success -> In this case we may return either of the errors
+	ingesters[0].failResp.Store(httpgrpc.Errorf(500, "InternalServerError"))
+	ingesters[1].failResp.Store(httpgrpc.Errorf(489, "Throttling"))
+	ingesters[2].happy.Store(true)
+
+	for i := 0; i < 1000; i++ {
+		request := makeWriteRequest(0, 30, 20)
+		_, err := d.Push(ctx, request)
+		status, ok := status.FromError(err)
+		require.True(t, ok)
+		require.True(t, status.Code() == 489 || status.Code() == 500)
+	}
+
+	// Simulating 1 error -> Should return 2xx
+	ingesters[0].failResp.Store(httpgrpc.Errorf(500, "InternalServerError"))
+	ingesters[1].happy.Store(true)
+	ingesters[2].happy.Store(true)
+
+	for i := 0; i < 1000; i++ {
+		request := makeWriteRequest(0, 30, 20)
+		_, err := d.Push(ctx, request)
+		require.NoError(t, err)
+	}
+
+	// Simulating an unhealthy ingester (ingester 2)
+	ingesters[0].failResp.Store(httpgrpc.Errorf(500, "InternalServerError"))
+	ingesters[1].happy.Store(true)
+	ingesters[2].happy.Store(true)
+
+	err := r.KVClient.CAS(context.Background(), ring.IngesterRingKey, func(in interface{}) (interface{}, bool, error) {
+		r := in.(*ring.Desc)
+		ingester2 := r.Ingesters["2"]
+		ingester2.State = ring.LEFT
+		ingester2.Timestamp = time.Now().Unix()
+		r.Ingesters["2"] = ingester2
+		return in, true, nil
+	})
+
+	require.NoError(t, err)
+
+	// Give the ring time to get updated with the KV value
+	time.Sleep(5 * time.Second)
+
+	for i := 0; i < 1000; i++ {
+		request := makeWriteRequest(0, 30, 20)
+		_, err := d.Push(ctx, request)
+		require.Error(t, err)
+		status, ok := status.FromError(err)
+		require.True(t, ok)
+		require.Equal(t, codes.Code(500), status.Code())
+	}
+}
+
 func TestDistributor_PushInstanceLimits(t *testing.T) {
 
 	type testPush struct {
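A note on the quorum rule this test pins down: with a replication factor of 3, each write tolerates maxFailures = 1, so whichever error class (4xx or 5xx) is reported by at least 2 of the 3 ingesters is the one the distributor surfaces. The sketch below is illustrative only, not part of the commit, and hard-codes those numbers; the mixed one-4xx/one-5xx case is resolved separately by the new remaining counter in pkg/ring/batch.go (see the notes under that file's diff).

package main

import "fmt"

// quorumError reports which error class wins for one item, given how many
// of the 3 replicas answered 4xx and 5xx (maxFailures = 1, minSuccess = 2).
func quorumError(failed4xx, failed5xx int) string {
	const maxFailures = 1
	switch {
	case failed4xx > maxFailures:
		return "4xx"
	case failed5xx > maxFailures:
		return "5xx"
	default:
		return "success (a quorum of writes succeeded)"
	}
}

func main() {
	fmt.Println(quorumError(2, 1)) // "4xx", the first scenario in the test
	fmt.Println(quorumError(1, 2)) // "5xx", the second scenario
	fmt.Println(quorumError(1, 0)) // success, the single-error scenario
}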
@@ -1949,7 +2052,7 @@ func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, []*p
 	ingesters := []mockIngester{}
 	for i := 0; i < cfg.happyIngesters; i++ {
 		ingesters = append(ingesters, mockIngester{
-			happy:      true,
+			happy:      *atomic.NewBool(true),
 			queryDelay: cfg.queryDelay,
 		})
 	}
@@ -1961,7 +2064,7 @@ func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, []*p
 
 		ingesters = append(ingesters, mockIngester{
 			queryDelay: cfg.queryDelay,
-			failResp:   miError,
+			failResp:   *atomic.NewError(miError),
 		})
 	}
 
@@ -2208,8 +2311,8 @@ type mockIngester struct {
 	sync.Mutex
 	client.IngesterClient
 	grpc_health_v1.HealthClient
-	happy      bool
-	failResp   error
+	happy      atomic.Bool
+	failResp   atomic.Error
 	stats      client.UsersStatsResponse
 	timeseries map[uint32]*cortexpb.PreallocTimeseries
 	metadata   map[uint32]map[cortexpb.MetricMetadata]struct{}
@@ -2247,8 +2350,8 @@ func (i *mockIngester) Push(ctx context.Context, req *cortexpb.WriteRequest, opt
 
 	i.trackCall("Push")
 
-	if !i.happy {
-		return nil, i.failResp
+	if !i.happy.Load() {
+		return nil, i.failResp.Load()
 	}
 
 	if i.timeseries == nil {
@@ -2305,7 +2408,7 @@ func (i *mockIngester) Query(ctx context.Context, req *client.QueryRequest, opts
 
 	i.trackCall("Query")
 
-	if !i.happy {
+	if !i.happy.Load() {
 		return nil, errFail
 	}
 
@@ -2331,7 +2434,7 @@ func (i *mockIngester) QueryStream(ctx context.Context, req *client.QueryRequest
 
 	i.trackCall("QueryStream")
 
-	if !i.happy {
+	if !i.happy.Load() {
 		return nil, errFail
 	}
 
@@ -2395,7 +2498,7 @@ func (i *mockIngester) MetricsForLabelMatchers(ctx context.Context, req *client.
 
 	i.trackCall("MetricsForLabelMatchers")
 
-	if !i.happy {
+	if !i.happy.Load() {
 		return nil, errFail
 	}
 
@@ -2421,7 +2524,7 @@ func (i *mockIngester) MetricsMetadata(ctx context.Context, req *client.MetricsM
 
 	i.trackCall("MetricsMetadata")
 
-	if !i.happy {
+	if !i.happy.Load() {
 		return nil, errFail
 	}
 
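On the mockIngester changes above: happy and failResp move to go.uber.org/atomic types because TestPush_QuorumError stores new values on a live mock while distributor goroutines are loading them, which plain bool/error fields would turn into a data race under go test -race. A minimal sketch of the pattern (fakeIngester is a hypothetical stand-in, not code from the commit):

package main

import (
	"errors"
	"fmt"

	"go.uber.org/atomic"
)

// fakeIngester mirrors the mockIngester fields above: one goroutine can
// Store a new state while others Load it, with no mutex and no data race.
type fakeIngester struct {
	happy    atomic.Bool
	failResp atomic.Error
}

func (f *fakeIngester) push() error {
	if !f.happy.Load() {
		return f.failResp.Load()
	}
	return nil
}

func main() {
	f := &fakeIngester{}
	f.failResp.Store(errors.New("Throttling"))
	fmt.Println(f.push()) // Throttling

	// Flip the mock to healthy mid-test, as TestPush_QuorumError does.
	f.happy.Store(true)
	fmt.Println(f.push()) // <nil>
}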
pkg/ring/batch.go

Lines changed: 31 additions & 2 deletions
@@ -5,6 +5,8 @@ import (
 	"fmt"
 	"sync"
 
+	"google.golang.org/grpc/status"
+
 	"go.uber.org/atomic"
 )
 
@@ -25,7 +27,20 @@ type itemTracker struct {
 	minSuccess  int
 	maxFailures int
 	succeeded   atomic.Int32
-	failed      atomic.Int32
+	failed4xx   atomic.Int32
+	failed5xx   atomic.Int32
+	remaining   atomic.Int32
+	err         atomic.Error
+}
+
+func (i *itemTracker) recordError(err error) int32 {
+	i.err.Store(err)
+
+	if status, ok := status.FromError(err); ok && status.Code()/100 == 4 {
+		return i.failed4xx.Inc()
+	}
+
+	return i.failed5xx.Inc()
 }
 
 // DoBatch request against a set of keys in the ring, handling replication and
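recordError stores the last error on the tracker and then buckets it: any error carrying a gRPC status code in the 400s increments failed4xx; everything else, including errors with no gRPC status at all, increments failed5xx. A small self-contained illustration of that test (it reuses the weaveworks/common/httpgrpc and grpc status packages the diff already imports; the classify helper itself is hypothetical):

package main

import (
	"fmt"

	"github.com/weaveworks/common/httpgrpc"
	"google.golang.org/grpc/status"
)

// classify applies recordError's check: a gRPC status code in the 400s is
// a 4xx failure; anything else (or no status at all) counts as 5xx.
func classify(err error) string {
	if s, ok := status.FromError(err); ok && s.Code()/100 == 4 {
		return "4xx"
	}
	return "5xx"
}

func main() {
	fmt.Println(classify(httpgrpc.Errorf(429, "Throttling")))          // 4xx
	fmt.Println(classify(httpgrpc.Errorf(500, "InternalServerError"))) // 5xx
}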
@@ -62,6 +77,7 @@ func DoBatch(ctx context.Context, op Operation, r ReadRing, keys []uint32, callb
 		}
 		itemTrackers[i].minSuccess = len(replicationSet.Instances) - replicationSet.MaxErrors
 		itemTrackers[i].maxFailures = replicationSet.MaxErrors
+		itemTrackers[i].remaining.Store(int32(len(replicationSet.Instances)))
 
 		for _, desc := range replicationSet.Instances {
 			curr, found := instances[desc.Addr]
@@ -122,15 +138,28 @@ func (b *batchTracker) record(sampleTrackers []*itemTracker, err error) {
 	// The use of atomic increments here guarantees only a single sendSamples
 	// goroutine will write to either channel.
 	for i := range sampleTrackers {
+		verifyIfShouldReturn := func() {
+			if sampleTrackers[i].remaining.Dec() == 0 {
+				if b.rpcsFailed.Inc() == 1 {
+					b.err <- sampleTrackers[i].err.Load()
+				}
+			}
+		}
+
 		if err != nil {
-			if sampleTrackers[i].failed.Inc() <= int32(sampleTrackers[i].maxFailures) {
+			errCount := sampleTrackers[i].recordError(err)
+
+			if errCount <= int32(sampleTrackers[i].maxFailures) {
+				verifyIfShouldReturn()
 				continue
 			}
+
 			if b.rpcsFailed.Inc() == 1 {
 				b.err <- err
 			}
 		} else {
 			if sampleTrackers[i].succeeded.Inc() != int32(sampleTrackers[i].minSuccess) {
+				verifyIfShouldReturn()
 				continue
 			}
 			if b.rpcsPending.Dec() == 0 {
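The remaining counter covers the case the split counters would otherwise miss: with replication factor 3 and maxFailures 1, one 4xx plus one 5xx plus one success means neither failure class ever exceeds maxFailures and succeeded never reaches minSuccess, so nothing would be sent on b.err and DoBatch would block until the context timed out. verifyIfShouldReturn lets the last replica to respond unblock the caller with the most recently stored error. A minimal simulation of that mixed outcome (hypothetical, with the numbers hard-coded):

package main

import (
	"errors"
	"fmt"

	"go.uber.org/atomic"
)

func main() {
	var (
		remaining = atomic.NewInt32(3) // one slot per replica, as DoBatch now stores
		lastErr   = atomic.NewError(nil)
	)

	// One item written to 3 replicas: a 4xx, a 5xx, and a success.
	responses := []error{
		errors.New("429 Throttling"),          // failed4xx would become 1, not > maxFailures
		errors.New("500 InternalServerError"), // failed5xx would become 1, not > maxFailures
		nil,                                   // succeeded would become 1, short of minSuccess (2)
	}

	for _, err := range responses {
		if err != nil {
			lastErr.Store(err)
		}
		// Mirrors verifyIfShouldReturn: only the last responder, seeing
		// remaining hit zero, surfaces the most recently recorded error.
		if remaining.Dec() == 0 {
			fmt.Println("returning:", lastErr.Load())
		}
	}
}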
