From bf5a9f11ce761d43026104cdff97063962c08e23 Mon Sep 17 00:00:00 2001 From: Alvin Lin Date: Tue, 10 Aug 2021 16:40:46 -0700 Subject: [PATCH 1/5] Fix bug where querier may not be able to achieve max-concurrent Signed-off-by: Alvin Lin --- pkg/querier/worker/worker.go | 2 + pkg/querier/worker/worker_test.go | 73 ++++++++++++++++++------------- 2 files changed, 45 insertions(+), 30 deletions(-) diff --git a/pkg/querier/worker/worker.go b/pkg/querier/worker/worker.go index 6811c26f34..c22856d3f7 100644 --- a/pkg/querier/worker/worker.go +++ b/pkg/querier/worker/worker.go @@ -208,6 +208,8 @@ func (w *querierWorker) AddressRemoved(address string) { w.mu.Lock() p := w.managers[address] delete(w.managers, address) + // Called with lock. + w.resetConcurrency() w.mu.Unlock() if p != nil { diff --git a/pkg/querier/worker/worker_test.go b/pkg/querier/worker/worker_test.go index d36c6c7565..7ad4599ca2 100644 --- a/pkg/querier/worker/worker_test.go +++ b/pkg/querier/worker/worker_test.go @@ -3,6 +3,7 @@ package worker import ( "context" "fmt" + "math/rand" "testing" "time" @@ -17,46 +18,52 @@ import ( func TestResetConcurrency(t *testing.T) { tests := []struct { - name string - parallelism int - maxConcurrent int - numTargets int - expectedConcurrency int + name string + parallelism int + maxConcurrent int + numTargets int + expectedConcurrency int + exepctedConcurrencyAfterTargetRemoval int }{ { - name: "Test create at least one processor per target", - parallelism: 0, - maxConcurrent: 0, - numTargets: 2, - expectedConcurrency: 2, + name: "Test create at least one processor per target", + parallelism: 0, + maxConcurrent: 0, + numTargets: 2, + expectedConcurrency: 2, + exepctedConcurrencyAfterTargetRemoval: 1, }, { - name: "Test parallelism per target", - parallelism: 4, - maxConcurrent: 0, - numTargets: 2, - expectedConcurrency: 8, + name: "Test parallelism per target", + parallelism: 4, + maxConcurrent: 0, + numTargets: 2, + expectedConcurrency: 8, + exepctedConcurrencyAfterTargetRemoval: 4, }, { - name: "Test Total Parallelism with a remainder", - parallelism: 1, - maxConcurrent: 7, - numTargets: 4, - expectedConcurrency: 7, + name: "Test Total Parallelism with a remainder", + parallelism: 1, + maxConcurrent: 7, + numTargets: 4, + expectedConcurrency: 7, + exepctedConcurrencyAfterTargetRemoval: 7, }, { - name: "Test Total Parallelism dividing evenly", - parallelism: 1, - maxConcurrent: 6, - numTargets: 2, - expectedConcurrency: 6, + name: "Test Total Parallelism dividing evenly", + parallelism: 1, + maxConcurrent: 6, + numTargets: 2, + expectedConcurrency: 6, + exepctedConcurrencyAfterTargetRemoval: 6, }, { - name: "Test Total Parallelism at least one worker per target", - parallelism: 1, - maxConcurrent: 3, - numTargets: 6, - expectedConcurrency: 6, + name: "Test Total Parallelism at least one worker per target", + parallelism: 1, + maxConcurrent: 3, + numTargets: 6, + expectedConcurrency: 6, + exepctedConcurrencyAfterTargetRemoval: 5, }, } @@ -82,6 +89,12 @@ func TestResetConcurrency(t *testing.T) { return getConcurrentProcessors(w) }) + // now we remove an address and ensure we still have the expected concurrency + w.AddressRemoved(fmt.Sprintf("127.0.0.1:%d", rand.Intn(tt.numTargets))) + test.Poll(t, 250*time.Millisecond, tt.exepctedConcurrencyAfterTargetRemoval, func() interface{} { + return getConcurrentProcessors(w) + }) + require.NoError(t, services.StopAndAwaitTerminated(context.Background(), w)) assert.Equal(t, 0, getConcurrentProcessors(w)) }) From 052efec4d2e1acf712fac3335192344e03db88db Mon Sep 17 00:00:00 2001 From: Alvin Lin Date: Tue, 10 Aug 2021 16:49:32 -0700 Subject: [PATCH 2/5] Update change log Signed-off-by: Alvin Lin --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9aff092718..c0ec8a423d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -41,7 +41,7 @@ * [BUGFIX] Ruler: fixed counting of PromQL evaluation errors as user-errors when updating `cortex_ruler_queries_failed_total`. #4335 * [BUGFIX] Ingester: When using block storage, prevent any reads or writes while the ingester is stopping. This will prevent accessing TSDB blocks once they have been already closed. #4304 * [BUGFIX] Ingester: fixed ingester stuck on start up (LEAVING ring state) when `-ingester.heartbeat-period=0` and `-ingester.unregister-on-shutdown=false`. #4366 - +* [BUGFIX] Querier: Fix bug where querier may not be able to achieve max-concurrent. #4417 ## 1.10.0 / 2021-08-03 From 4e07179fd668e4e77f90e8e1f1bcb5aeb612f3a0 Mon Sep 17 00:00:00 2001 From: Alvin Lin Date: Wed, 11 Aug 2021 10:12:58 -0700 Subject: [PATCH 3/5] Update CHANGELOG.md Co-authored-by: Bryan Boreham Signed-off-by: Alvin Lin --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c0ec8a423d..8a9a26b951 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -41,7 +41,7 @@ * [BUGFIX] Ruler: fixed counting of PromQL evaluation errors as user-errors when updating `cortex_ruler_queries_failed_total`. #4335 * [BUGFIX] Ingester: When using block storage, prevent any reads or writes while the ingester is stopping. This will prevent accessing TSDB blocks once they have been already closed. #4304 * [BUGFIX] Ingester: fixed ingester stuck on start up (LEAVING ring state) when `-ingester.heartbeat-period=0` and `-ingester.unregister-on-shutdown=false`. #4366 -* [BUGFIX] Querier: Fix bug where querier may not be able to achieve max-concurrent. #4417 +* [BUGFIX] Querier can have lower concurrency after query-frontend restart. #4417 ## 1.10.0 / 2021-08-03 From c805e33fc7771645b78885456d6561d42e20d56a Mon Sep 17 00:00:00 2001 From: Alvin Lin Date: Wed, 11 Aug 2021 10:13:59 -0700 Subject: [PATCH 4/5] Update pkg/querier/worker/worker_test.go Co-authored-by: Arve Knudsen Signed-off-by: Alvin Lin --- pkg/querier/worker/worker_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/querier/worker/worker_test.go b/pkg/querier/worker/worker_test.go index 7ad4599ca2..41c598ed4b 100644 --- a/pkg/querier/worker/worker_test.go +++ b/pkg/querier/worker/worker_test.go @@ -23,7 +23,7 @@ func TestResetConcurrency(t *testing.T) { maxConcurrent int numTargets int expectedConcurrency int - exepctedConcurrencyAfterTargetRemoval int + expectedConcurrencyAfterTargetRemoval int }{ { name: "Test create at least one processor per target", From 2fbbe42a95bbc517d9fc2b0ab466b8ebbbdb3014 Mon Sep 17 00:00:00 2001 From: Alvin Lin Date: Wed, 11 Aug 2021 10:23:39 -0700 Subject: [PATCH 5/5] Address PR comments Signed-off-by: Alvin Lin --- CHANGELOG.md | 2 +- pkg/querier/worker/worker_test.go | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8a9a26b951..0a45219499 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -41,7 +41,7 @@ * [BUGFIX] Ruler: fixed counting of PromQL evaluation errors as user-errors when updating `cortex_ruler_queries_failed_total`. #4335 * [BUGFIX] Ingester: When using block storage, prevent any reads or writes while the ingester is stopping. This will prevent accessing TSDB blocks once they have been already closed. #4304 * [BUGFIX] Ingester: fixed ingester stuck on start up (LEAVING ring state) when `-ingester.heartbeat-period=0` and `-ingester.unregister-on-shutdown=false`. #4366 -* [BUGFIX] Querier can have lower concurrency after query-frontend restart. #4417 +* [BUGFIX] Querier: After query-frontend restart, querier may have lower than configured concurrency. #4417 ## 1.10.0 / 2021-08-03 diff --git a/pkg/querier/worker/worker_test.go b/pkg/querier/worker/worker_test.go index 41c598ed4b..7f3d0b4ae9 100644 --- a/pkg/querier/worker/worker_test.go +++ b/pkg/querier/worker/worker_test.go @@ -31,7 +31,7 @@ func TestResetConcurrency(t *testing.T) { maxConcurrent: 0, numTargets: 2, expectedConcurrency: 2, - exepctedConcurrencyAfterTargetRemoval: 1, + expectedConcurrencyAfterTargetRemoval: 1, }, { name: "Test parallelism per target", @@ -39,7 +39,7 @@ func TestResetConcurrency(t *testing.T) { maxConcurrent: 0, numTargets: 2, expectedConcurrency: 8, - exepctedConcurrencyAfterTargetRemoval: 4, + expectedConcurrencyAfterTargetRemoval: 4, }, { name: "Test Total Parallelism with a remainder", @@ -47,7 +47,7 @@ func TestResetConcurrency(t *testing.T) { maxConcurrent: 7, numTargets: 4, expectedConcurrency: 7, - exepctedConcurrencyAfterTargetRemoval: 7, + expectedConcurrencyAfterTargetRemoval: 7, }, { name: "Test Total Parallelism dividing evenly", @@ -55,7 +55,7 @@ func TestResetConcurrency(t *testing.T) { maxConcurrent: 6, numTargets: 2, expectedConcurrency: 6, - exepctedConcurrencyAfterTargetRemoval: 6, + expectedConcurrencyAfterTargetRemoval: 6, }, { name: "Test Total Parallelism at least one worker per target", @@ -63,7 +63,7 @@ func TestResetConcurrency(t *testing.T) { maxConcurrent: 3, numTargets: 6, expectedConcurrency: 6, - exepctedConcurrencyAfterTargetRemoval: 5, + expectedConcurrencyAfterTargetRemoval: 5, }, } @@ -91,7 +91,7 @@ func TestResetConcurrency(t *testing.T) { // now we remove an address and ensure we still have the expected concurrency w.AddressRemoved(fmt.Sprintf("127.0.0.1:%d", rand.Intn(tt.numTargets))) - test.Poll(t, 250*time.Millisecond, tt.exepctedConcurrencyAfterTargetRemoval, func() interface{} { + test.Poll(t, 250*time.Millisecond, tt.expectedConcurrencyAfterTargetRemoval, func() interface{} { return getConcurrentProcessors(w) })