Skip to content

Commit 9944ca1

Browse files
committed
rebase
Signed-off-by: Alan Protasio <[email protected]>
1 parent 9fe7552 commit 9944ca1

File tree

2 files changed

+44
-30
lines changed

2 files changed

+44
-30
lines changed

pkg/distributor/distributor_test.go

Lines changed: 39 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,7 @@ func TestDistributor_Push(t *testing.T) {
262262
limits.IngestionRate = 20
263263
limits.IngestionBurstSize = 20
264264

265-
ds, _, regs := prepare(t, prepConfig{
265+
ds, _, regs, _ := prepare(t, prepConfig{
266266
numIngesters: tc.numIngesters,
267267
happyIngesters: tc.happyIngesters,
268268
numDistributors: 1,
@@ -291,7 +291,7 @@ func TestDistributor_Push(t *testing.T) {
291291
}
292292

293293
func TestDistributor_MetricsCleanup(t *testing.T) {
294-
dists, _, regs := prepare(t, prepConfig{
294+
dists, _, regs, _ := prepare(t, prepConfig{
295295
numDistributors: 1,
296296
})
297297
d := dists[0]
@@ -468,7 +468,7 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
468468
limits.IngestionBurstSize = testData.ingestionBurstSize
469469

470470
// Start all expected distributors
471-
distributors, _, _ := prepare(t, prepConfig{
471+
distributors, _, _, _ := prepare(t, prepConfig{
472472
numIngesters: 3,
473473
happyIngesters: 3,
474474
numDistributors: testData.distributors,
@@ -499,7 +499,7 @@ func TestPush_QuorumError(t *testing.T) {
499499

500500
limits.IngestionRate = math.MaxFloat64
501501

502-
dists, ingesters, r, _ := prepare(t, prepConfig{
502+
dists, ingesters, _, r := prepare(t, prepConfig{
503503
numDistributors: 1,
504504
numIngesters: 3,
505505
happyIngesters: 0,
@@ -545,7 +545,7 @@ func TestPush_QuorumError(t *testing.T) {
545545
ingesters[1].failResp.Store(httpgrpc.Errorf(429, "Throttling"))
546546
ingesters[2].happy.Store(true)
547547

548-
for i := 0; i < 1000; i++ {
548+
for i := 0; i < 1; i++ {
549549
request := makeWriteRequest(0, 30, 20)
550550
_, err := d.Push(ctx, request)
551551
status, ok := status.FromError(err)
@@ -558,7 +558,7 @@ func TestPush_QuorumError(t *testing.T) {
558558
ingesters[1].happy.Store(true)
559559
ingesters[2].happy.Store(true)
560560

561-
for i := 0; i < 1000; i++ {
561+
for i := 0; i < 1; i++ {
562562
request := makeWriteRequest(0, 30, 20)
563563
_, err := d.Push(ctx, request)
564564
require.NoError(t, err)
@@ -569,7 +569,19 @@ func TestPush_QuorumError(t *testing.T) {
569569
ingesters[1].happy.Store(true)
570570
ingesters[2].happy.Store(true)
571571

572-
err := r.KVClient.CAS(context.Background(), ring.IngesterRingKey, func(in interface{}) (interface{}, bool, error) {
572+
// Wait group to check when the ring got updated
573+
wg := &sync.WaitGroup{}
574+
wg.Add(1)
575+
576+
go func() {
577+
r.KVClient.WatchKey(context.Background(), ingester.RingKey, func(i interface{}) bool {
578+
wg.Done()
579+
// False will terminate the watch
580+
return false
581+
})
582+
}()
583+
584+
err := r.KVClient.CAS(context.Background(), ingester.RingKey, func(in interface{}) (interface{}, bool, error) {
573585
r := in.(*ring.Desc)
574586
ingester2 := r.Ingesters["2"]
575587
ingester2.State = ring.LEFT
@@ -581,7 +593,7 @@ func TestPush_QuorumError(t *testing.T) {
581593
require.NoError(t, err)
582594

583595
// Give time to the ring get updated with the KV value
584-
time.Sleep(time.Second)
596+
wg.Wait()
585597

586598
for i := 0; i < 1000; i++ {
587599
request := makeWriteRequest(0, 30, 20)
@@ -707,7 +719,7 @@ func TestDistributor_PushInstanceLimits(t *testing.T) {
707719
flagext.DefaultValues(limits)
708720

709721
// Start all expected distributors
710-
distributors, _, regs := prepare(t, prepConfig{
722+
distributors, _, regs, _ := prepare(t, prepConfig{
711723
numIngesters: 3,
712724
happyIngesters: 3,
713725
numDistributors: 1,
@@ -799,7 +811,7 @@ func TestDistributor_PushHAInstances(t *testing.T) {
799811
limits.AcceptHASamples = true
800812
limits.MaxLabelValueLength = 15
801813

802-
ds, _, _ := prepare(t, prepConfig{
814+
ds, _, _, _ := prepare(t, prepConfig{
803815
numIngesters: 3,
804816
happyIngesters: 3,
805817
numDistributors: 1,
@@ -963,7 +975,7 @@ func TestDistributor_PushQuery(t *testing.T) {
963975

964976
for _, tc := range testcases {
965977
t.Run(tc.name, func(t *testing.T) {
966-
ds, ingesters, _ := prepare(t, prepConfig{
978+
ds, ingesters, _, _ := prepare(t, prepConfig{
967979
numIngesters: tc.numIngesters,
968980
happyIngesters: tc.happyIngesters,
969981
numDistributors: 1,
@@ -1014,7 +1026,7 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxChunksPerQueryLimitIsReac
10141026
limits.MaxChunksPerQuery = maxChunksLimit
10151027

10161028
// Prepare distributors.
1017-
ds, _, _ := prepare(t, prepConfig{
1029+
ds, _, _, _ := prepare(t, prepConfig{
10181030
numIngesters: 3,
10191031
happyIngesters: 3,
10201032
numDistributors: 1,
@@ -1070,7 +1082,7 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxSeriesPerQueryLimitIsReac
10701082
ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(maxSeriesLimit, 0, 0))
10711083

10721084
// Prepare distributors.
1073-
ds, _, _ := prepare(t, prepConfig{
1085+
ds, _, _, _ := prepare(t, prepConfig{
10741086
numIngesters: 3,
10751087
happyIngesters: 3,
10761088
numDistributors: 1,
@@ -1123,7 +1135,7 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxChunkBytesPerQueryLimitIs
11231135
// Prepare distributors.
11241136
// Use replication factor of 2 to always read all the chunks from both ingesters,
11251137
// this guarantees us to always read the same chunks and have a stable test.
1126-
ds, _, _ := prepare(t, prepConfig{
1138+
ds, _, _, _ := prepare(t, prepConfig{
11271139
numIngesters: 2,
11281140
happyIngesters: 2,
11291141
numDistributors: 1,
@@ -1245,7 +1257,7 @@ func TestDistributor_Push_LabelRemoval(t *testing.T) {
12451257
limits.DropLabels = tc.removeLabels
12461258
limits.AcceptHASamples = tc.removeReplica
12471259

1248-
ds, ingesters, _ := prepare(t, prepConfig{
1260+
ds, ingesters, _, _ := prepare(t, prepConfig{
12491261
numIngesters: 2,
12501262
happyIngesters: 2,
12511263
numDistributors: 1,
@@ -1296,7 +1308,7 @@ func TestDistributor_Push_LabelRemoval_RemovingNameLabelWillError(t *testing.T)
12961308
limits.DropLabels = tc.removeLabels
12971309
limits.AcceptHASamples = tc.removeReplica
12981310

1299-
ds, _, _ := prepare(t, prepConfig{
1311+
ds, _, _, _ := prepare(t, prepConfig{
13001312
numIngesters: 2,
13011313
happyIngesters: 2,
13021314
numDistributors: 1,
@@ -1390,7 +1402,7 @@ func TestDistributor_Push_ShouldGuaranteeShardingTokenConsistencyOverTheTime(t *
13901402

13911403
for testName, testData := range tests {
13921404
t.Run(testName, func(t *testing.T) {
1393-
ds, ingesters, _ := prepare(t, prepConfig{
1405+
ds, ingesters, _, _ := prepare(t, prepConfig{
13941406
numIngesters: 2,
13951407
happyIngesters: 2,
13961408
numDistributors: 1,
@@ -1450,7 +1462,7 @@ func TestDistributor_Push_LabelNameValidation(t *testing.T) {
14501462

14511463
for testName, tc := range tests {
14521464
t.Run(testName, func(t *testing.T) {
1453-
ds, _, _ := prepare(t, prepConfig{
1465+
ds, _, _, _ := prepare(t, prepConfig{
14541466
numIngesters: 2,
14551467
happyIngesters: 2,
14561468
numDistributors: 1,
@@ -1512,7 +1524,7 @@ func TestDistributor_Push_ExemplarValidation(t *testing.T) {
15121524

15131525
for testName, tc := range tests {
15141526
t.Run(testName, func(t *testing.T) {
1515-
ds, _, _ := prepare(t, prepConfig{
1527+
ds, _, _, _ := prepare(t, prepConfig{
15161528
numIngesters: 2,
15171529
happyIngesters: 2,
15181530
numDistributors: 1,
@@ -1813,7 +1825,7 @@ func TestSlowQueries(t *testing.T) {
18131825
expectedErr = errFail
18141826
}
18151827

1816-
ds, _, _ := prepare(t, prepConfig{
1828+
ds, _, _, _ := prepare(t, prepConfig{
18171829
numIngesters: nIngesters,
18181830
happyIngesters: happy,
18191831
numDistributors: 1,
@@ -1922,7 +1934,7 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
19221934
now := model.Now()
19231935

19241936
// Create distributor
1925-
ds, ingesters, _ := prepare(t, prepConfig{
1937+
ds, ingesters, _, _ := prepare(t, prepConfig{
19261938
numIngesters: numIngesters,
19271939
happyIngesters: numIngesters,
19281940
numDistributors: 1,
@@ -1980,7 +1992,7 @@ func TestDistributor_MetricsMetadata(t *testing.T) {
19801992
for testName, testData := range tests {
19811993
t.Run(testName, func(t *testing.T) {
19821994
// Create distributor
1983-
ds, ingesters, _ := prepare(t, prepConfig{
1995+
ds, ingesters, _, _ := prepare(t, prepConfig{
19841996
numIngesters: numIngesters,
19851997
happyIngesters: numIngesters,
19861998
numDistributors: 1,
@@ -2048,7 +2060,7 @@ type prepConfig struct {
20482060
errFail error
20492061
}
20502062

2051-
func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, []*prometheus.Registry) {
2063+
func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, []*prometheus.Registry, *ring.Ring) {
20522064
ingesters := []mockIngester{}
20532065
for i := 0; i < cfg.happyIngesters; i++ {
20542066
ingesters = append(ingesters, mockIngester{
@@ -2186,7 +2198,7 @@ func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, []*p
21862198

21872199
t.Cleanup(func() { stopAll(distributors, ingestersRing) })
21882200

2189-
return distributors, ingesters, registries
2201+
return distributors, ingesters, registries, ingestersRing
21902202
}
21912203

21922204
func stopAll(ds []*Distributor, r *ring.Ring) {
@@ -2684,7 +2696,7 @@ func TestDistributorValidation(t *testing.T) {
26842696
limits.RejectOldSamplesMaxAge = model.Duration(24 * time.Hour)
26852697
limits.MaxLabelNamesPerSeries = 2
26862698

2687-
ds, _, _ := prepare(t, prepConfig{
2699+
ds, _, _, _ := prepare(t, prepConfig{
26882700
numIngesters: 3,
26892701
happyIngesters: 3,
26902702
numDistributors: 1,
@@ -2865,7 +2877,7 @@ func TestDistributor_Push_Relabel(t *testing.T) {
28652877
flagext.DefaultValues(&limits)
28662878
limits.MetricRelabelConfigs = tc.metricRelabelConfigs
28672879

2868-
ds, ingesters, _ := prepare(t, prepConfig{
2880+
ds, ingesters, _, _ := prepare(t, prepConfig{
28692881
numIngesters: 2,
28702882
happyIngesters: 2,
28712883
numDistributors: 1,
@@ -2916,7 +2928,7 @@ func TestDistributor_Push_RelabelDropWillExportMetricOfDroppedSamples(t *testing
29162928
flagext.DefaultValues(&limits)
29172929
limits.MetricRelabelConfigs = metricRelabelConfigs
29182930

2919-
ds, ingesters, regs := prepare(t, prepConfig{
2931+
ds, ingesters, regs, _ := prepare(t, prepConfig{
29202932
numIngesters: 2,
29212933
happyIngesters: 2,
29222934
numDistributors: 1,

pkg/ring/batch.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -144,23 +144,25 @@ func (b *batchTracker) record(sampleTrackers []*itemTracker, err error) {
144144
// We should return an error if we reach the maxFailure (quorum) on a give error family OR
145145
// we dont have any remaining ingesters to try
146146
// Ex: 2xx, 4xx, 5xx -> return 5xx
147-
// Ex: 4xx, 4xx, 2xx -> return 4xx
147+
// Ex: 4xx, 4xx, _ -> return 4xx
148+
// Ex: 5xx, _, 5xx -> return 5xx
148149
if errCount > int32(sampleTrackers[i].maxFailures) || sampleTrackers[i].remaining.Dec() == 0 {
149150
if b.rpcsFailed.Inc() == 1 {
150151
b.err <- err
151152
}
152153
}
153154
} else {
154-
// We should return success if we succeeded calling `minSuccess` ingesters
155+
// We should return success if we succeeded calling `minSuccess` ingesters.
155156
if sampleTrackers[i].succeeded.Inc() >= int32(sampleTrackers[i].minSuccess) {
156157
if b.rpcsPending.Dec() == 0 {
157158
b.done <- struct{}{}
158159
}
159160
continue
160161
}
161162

162-
// If we suceeded to call this particular ingester but we dont have any remaining ingesters to try
163+
// If we succeeded to call this particular ingester but we dont have any remaining ingesters to try
163164
// and we did not succeeded calling `minSuccess` ingesters we need to return the last error
165+
// Ex: 4xx, 5xx, 2xx
164166
if sampleTrackers[i].remaining.Dec() == 0 {
165167
if b.rpcsFailed.Inc() == 1 {
166168
b.err <- sampleTrackers[i].err.Load()

0 commit comments

Comments
 (0)