diff --git a/CHANGELOG.md b/CHANGELOG.md index 9aff092718..0a45219499 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -41,7 +41,7 @@ * [BUGFIX] Ruler: fixed counting of PromQL evaluation errors as user-errors when updating `cortex_ruler_queries_failed_total`. #4335 * [BUGFIX] Ingester: When using block storage, prevent any reads or writes while the ingester is stopping. This will prevent accessing TSDB blocks once they have been already closed. #4304 * [BUGFIX] Ingester: fixed ingester stuck on start up (LEAVING ring state) when `-ingester.heartbeat-period=0` and `-ingester.unregister-on-shutdown=false`. #4366 - +* [BUGFIX] Querier: fixed issue where, after a query-frontend restart, the querier may run with lower than configured concurrency. #4417 ## 1.10.0 / 2021-08-03 diff --git a/pkg/querier/worker/worker.go b/pkg/querier/worker/worker.go index 6811c26f34..c22856d3f7 100644 --- a/pkg/querier/worker/worker.go +++ b/pkg/querier/worker/worker.go @@ -208,6 +208,8 @@ func (w *querierWorker) AddressRemoved(address string) { w.mu.Lock() p := w.managers[address] delete(w.managers, address) + // Called with lock. 
+ w.resetConcurrency() w.mu.Unlock() if p != nil { diff --git a/pkg/querier/worker/worker_test.go b/pkg/querier/worker/worker_test.go index d36c6c7565..7f3d0b4ae9 100644 --- a/pkg/querier/worker/worker_test.go +++ b/pkg/querier/worker/worker_test.go @@ -3,6 +3,7 @@ package worker import ( "context" "fmt" + "math/rand" "testing" "time" @@ -17,46 +18,52 @@ import ( func TestResetConcurrency(t *testing.T) { tests := []struct { - name string - parallelism int - maxConcurrent int - numTargets int - expectedConcurrency int + name string + parallelism int + maxConcurrent int + numTargets int + expectedConcurrency int + expectedConcurrencyAfterTargetRemoval int }{ { - name: "Test create at least one processor per target", - parallelism: 0, - maxConcurrent: 0, - numTargets: 2, - expectedConcurrency: 2, + name: "Test create at least one processor per target", + parallelism: 0, + maxConcurrent: 0, + numTargets: 2, + expectedConcurrency: 2, + expectedConcurrencyAfterTargetRemoval: 1, }, { - name: "Test parallelism per target", - parallelism: 4, - maxConcurrent: 0, - numTargets: 2, - expectedConcurrency: 8, + name: "Test parallelism per target", + parallelism: 4, + maxConcurrent: 0, + numTargets: 2, + expectedConcurrency: 8, + expectedConcurrencyAfterTargetRemoval: 4, }, { - name: "Test Total Parallelism with a remainder", - parallelism: 1, - maxConcurrent: 7, - numTargets: 4, - expectedConcurrency: 7, + name: "Test Total Parallelism with a remainder", + parallelism: 1, + maxConcurrent: 7, + numTargets: 4, + expectedConcurrency: 7, + expectedConcurrencyAfterTargetRemoval: 7, }, { - name: "Test Total Parallelism dividing evenly", - parallelism: 1, - maxConcurrent: 6, - numTargets: 2, - expectedConcurrency: 6, + name: "Test Total Parallelism dividing evenly", + parallelism: 1, + maxConcurrent: 6, + numTargets: 2, + expectedConcurrency: 6, + expectedConcurrencyAfterTargetRemoval: 6, }, { - name: "Test Total Parallelism at least one worker per target", - parallelism: 1, - 
maxConcurrent: 3, - numTargets: 6, - expectedConcurrency: 6, + name: "Test Total Parallelism at least one worker per target", + parallelism: 1, + maxConcurrent: 3, + numTargets: 6, + expectedConcurrency: 6, + expectedConcurrencyAfterTargetRemoval: 5, }, } @@ -82,6 +89,12 @@ func TestResetConcurrency(t *testing.T) { return getConcurrentProcessors(w) }) + // now we remove an address and ensure we still have the expected concurrency + w.AddressRemoved(fmt.Sprintf("127.0.0.1:%d", rand.Intn(tt.numTargets))) + test.Poll(t, 250*time.Millisecond, tt.expectedConcurrencyAfterTargetRemoval, func() interface{} { + return getConcurrentProcessors(w) + }) + require.NoError(t, services.StopAndAwaitTerminated(context.Background(), w)) assert.Equal(t, 0, getConcurrentProcessors(w)) })