diff --git a/CHANGELOG.md b/CHANGELOG.md index 98a292a883..e46dd9069a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,9 +3,11 @@ ## master / unreleased * [FEATURE] Compactor: Added `-compactor.block-files-concurrency` allowing to configure number of go routines for download/upload block files during compaction. #4784 * [ENHANCEMENT] Querier/Ruler: Retry store-gateway in case of unexpected failure, instead of failing the query. #4532 +* [ENHANCEMENT] Ring: DoBatch prioritize 4xx errors when failing. #4783 * [FEATURE] Compactor: Added -compactor.blocks-fetch-concurrency` allowing to configure number of go routines for blocks during compaction. #4787 ## 1.13.0 2022-07-14 + * [CHANGE] Changed default for `-ingester.min-ready-duration` from 1 minute to 15 seconds. #4539 * [CHANGE] query-frontend: Do not print anything in the logs of `query-frontend` if a in-progress query has been canceled (context canceled) to avoid spam. #4562 * [CHANGE] Compactor block deletion mark migration, needed when upgrading from v1.7, is now disabled by default. #4597 diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 4fc49fa635..907e14ecf0 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -554,7 +554,7 @@ func TestPush_QuorumError(t *testing.T) { _, err := d.Push(ctx, request) status, ok := status.FromError(err) require.True(t, ok) - require.True(t, status.Code() == 429 || status.Code() == 500) + require.Equal(t, codes.Code(429), status.Code()) } // Simulating 1 error -> Should return 2xx diff --git a/pkg/ring/batch.go b/pkg/ring/batch.go index 26d4cb203a..db3aa4800f 100644 --- a/pkg/ring/batch.go +++ b/pkg/ring/batch.go @@ -30,19 +30,29 @@ type itemTracker struct { failed4xx atomic.Int32 failed5xx atomic.Int32 remaining atomic.Int32 - err atomic.Error + err4xx atomic.Error + err5xx atomic.Error } func (i *itemTracker) recordError(err error) int32 { - i.err.Store(err) if status, ok := status.FromError(err); ok && status.Code()/100 == 4 { + i.err4xx.Store(err) return i.failed4xx.Inc() } + i.err5xx.Store(err) return i.failed5xx.Inc() } +func (i *itemTracker) getError() error { + if i.failed5xx.Load() > i.failed4xx.Load() { + return i.err5xx.Load() + } + + return i.err4xx.Load() +} + // DoBatch request against a set of keys in the ring, handling replication and // failures. For example if we want to write N items where they may all // hit different instances, and we want them all replicated R ways with @@ -142,12 +152,13 @@ func (b *batchTracker) record(sampleTrackers []*itemTracker, err error) { errCount := sampleTrackers[i].recordError(err) // We should return an error if we reach the maxFailure (quorum) on a given error family OR // we dont have any remaining ingesters to try - // Ex: 2xx, 4xx, 5xx -> return 5xx + // Ex: 2xx, 4xx, 5xx -> return 4xx + // Ex: 2xx, 5xx, 4xx -> return 4xx // Ex: 4xx, 4xx, _ -> return 4xx // Ex: 5xx, _, 5xx -> return 5xx if errCount > int32(sampleTrackers[i].maxFailures) || sampleTrackers[i].remaining.Dec() == 0 { if b.rpcsFailed.Inc() == 1 { - b.err <- err + b.err <- sampleTrackers[i].getError() } } } else { @@ -165,7 +176,7 @@ func (b *batchTracker) record(sampleTrackers []*itemTracker, err error) { // Ex: 4xx, 5xx, 2xx if sampleTrackers[i].remaining.Dec() == 0 { if b.rpcsFailed.Inc() == 1 { - b.err <- sampleTrackers[i].err.Load() + b.err <- sampleTrackers[i].getError() } } }