Skip to content

Commit 98052c7

Browse files
authored
Make querier_sharding_test less brittle (#4970)
* Make querier_sharding_test less brittle (Signed-off-by: Alvin Lin <[email protected]>)
* Rate-limit queries per second to avoid 429 responses (Signed-off-by: Alvin Lin <[email protected]>)
* Use a better variable name (Signed-off-by: Alvin Lin <[email protected]>)

Signed-off-by: Alvin Lin <[email protected]>
1 parent 0a1c112 commit 98052c7

File tree

1 file changed

+37
-16
lines changed

1 file changed

+37
-16
lines changed

integration/querier_sharding_test.go

Lines changed: 37 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@ func TestQuerierNoShardingWithQueryScheduler(t *testing.T) {
5555

5656
func runQuerierShardingTest(t *testing.T, cfg querierShardingTestConfig) {
5757
// Going too high starts hitting the file descriptor limit, since we run all queriers concurrently.
58-
const numQueries = 100
58+
const batchSize = 100
59+
const numQueries = 500
5960

6061
s, err := e2e.NewScenario(networkName)
6162
require.NoError(t, err)
@@ -141,26 +142,30 @@ func runQuerierShardingTest(t *testing.T, cfg querierShardingTestConfig) {
141142
require.NoError(t, queryFrontend.WaitSumMetrics(e2e.Equals(2), "cortex_query_frontend_connected_clients"))
142143
}
143144

145+
batches := generateBatches(batchSize, numQueries)
146+
144147
wg := sync.WaitGroup{}
145148

146149
// Run all queries concurrently to get better distribution of requests between queriers.
147-
for i := 0; i < numQueries; i++ {
148-
wg.Add(1)
149-
150-
go func() {
151-
defer wg.Done()
152-
c, err := e2ecortex.NewClient("", queryFrontend.HTTPEndpoint(), "", "", userID)
153-
require.NoError(t, err)
154-
155-
result, err := c.Query("series_1", now)
156-
require.NoError(t, err)
157-
require.Equal(t, model.ValVector, result.Type())
158-
assert.Equal(t, expectedVector, result.(model.Vector))
159-
}()
150+
for _, concurrentQueries := range batches {
151+
for i := 0; i < concurrentQueries; i++ {
152+
wg.Add(1)
153+
154+
go func() {
155+
defer wg.Done()
156+
c, err := e2ecortex.NewClient("", queryFrontend.HTTPEndpoint(), "", "", userID)
157+
require.NoError(t, err)
158+
159+
result, err := c.Query("series_1", now)
160+
require.NoError(t, err)
161+
require.Equal(t, model.ValVector, result.Type())
162+
assert.Equal(t, expectedVector, result.(model.Vector))
163+
}()
164+
}
165+
166+
wg.Wait()
160167
}
161168

162-
wg.Wait()
163-
164169
require.NoError(t, queryFrontend.WaitSumMetrics(e2e.Equals(numQueries), "cortex_query_frontend_queries_total"))
165170

166171
// Verify that only single querier handled all the queries when sharding is enabled, otherwise queries have been fairly distributed across queriers.
@@ -194,3 +199,19 @@ func runQuerierShardingTest(t *testing.T, cfg querierShardingTestConfig) {
194199
assertServiceMetricsPrefixes(t, QueryFrontend, queryFrontend)
195200
assertServiceMetricsPrefixes(t, QueryScheduler, queryScheduler)
196201
}
202+
203+
// generateBatches splits total into successive batch sizes, each at most
// batchSize. All batches are batchSize except possibly the final one, which
// holds the remainder (e.g. batchSize=3, total=7 -> [3 3 1]).
func generateBatches(batchSize, total int) []int {
	batches := []int{}
	for left := total; left != 0; {
		// Take a full batch, or whatever remains if that is smaller.
		n := left
		if n > batchSize {
			n = batchSize
		}
		batches = append(batches, n)
		left -= n
	}
	return batches
}

0 commit comments

Comments
 (0)