Skip to content

Commit ededdc2

Browse files
committed
Make reserved querier to match exact priority + change query length gauge vector in user request queue
Signed-off-by: Justin Jung <[email protected]>
1 parent ca3fb58 commit ededdc2

File tree

6 files changed

+160
-55
lines changed

6 files changed

+160
-55
lines changed

pkg/scheduler/queue/queue.go

Lines changed: 5 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -62,16 +62,14 @@ type RequestQueue struct {
6262
queues *queues
6363
stopped bool
6464

65-
queueLength *prometheus.GaugeVec // Per user, priority and type of the queue (fifo or priority).
6665
totalRequests *prometheus.CounterVec // Per user and priority.
6766
discardedRequests *prometheus.CounterVec // Per user and priority.
6867
}
6968

7069
func NewRequestQueue(maxOutstandingPerTenant int, forgetDelay time.Duration, queueLength *prometheus.GaugeVec, discardedRequests *prometheus.CounterVec, limits Limits, registerer prometheus.Registerer) *RequestQueue {
7170
q := &RequestQueue{
72-
queues: newUserQueues(maxOutstandingPerTenant, forgetDelay, limits),
71+
queues: newUserQueues(maxOutstandingPerTenant, forgetDelay, limits, queueLength),
7372
connectedQuerierWorkers: atomic.NewInt32(0),
74-
queueLength: queueLength,
7573
totalRequests: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
7674
Name: "cortex_request_queue_requests_total",
7775
Help: "Total number of query requests going to the request queue.",
@@ -108,10 +106,6 @@ func (q *RequestQueue) EnqueueRequest(userID string, req Request, maxQueriers fl
108106
return errors.New("no queue found")
109107
}
110108

111-
queueType := "fifo"
112-
if q.queues.limits.QueryPriority(userID).Enabled {
113-
queueType = "priority"
114-
}
115109
metricLabels := prometheus.Labels{
116110
"user": userID,
117111
"priority": priority,
@@ -124,11 +118,6 @@ func (q *RequestQueue) EnqueueRequest(userID string, req Request, maxQueriers fl
124118
}
125119

126120
queue.enqueueRequest(req)
127-
q.queueLength.With(prometheus.Labels{
128-
"user": userID,
129-
"priority": priority,
130-
"type": queueType,
131-
}).Inc()
132121
q.cond.Broadcast()
133122
// Call this function while holding a lock. This guarantees that no querier can fetch the request before function returns.
134123
if successFn != nil {
@@ -170,10 +159,10 @@ FindQueue:
170159

171160
// Pick next request from the queue.
172161
for {
173-
minPriority, checkMinPriority := q.getMinPriorityForQuerier(userID, querierID)
174-
request := queue.dequeueRequest(minPriority, checkMinPriority)
162+
priority, matchPriority := q.getPriorityForQuerier(userID, querierID)
163+
request := queue.dequeueRequest(priority, matchPriority)
175164
if request == nil {
176-
// the queue does not contain request with the min priority, break to wait for more requests
165+
// the queue does not contain request with the min priority, wait for more requests
177166
querierWait = true
178167
goto FindQueue
179168
}
@@ -182,17 +171,6 @@ FindQueue:
182171
q.queues.deleteQueue(userID)
183172
}
184173

185-
queueType := "fifo"
186-
if q.queues.limits.QueryPriority(userID).Enabled {
187-
queueType = "priority"
188-
}
189-
metricLabels := prometheus.Labels{
190-
"user": userID,
191-
"priority": strconv.FormatInt(request.Priority(), 10),
192-
"type": queueType,
193-
}
194-
q.queueLength.With(metricLabels).Dec()
195-
196174
// Tell close() we've processed a request.
197175
q.cond.Broadcast()
198176

@@ -206,7 +184,7 @@ FindQueue:
206184
goto FindQueue
207185
}
208186

209-
func (q *RequestQueue) getMinPriorityForQuerier(userID string, querierID string) (int64, bool) {
187+
func (q *RequestQueue) getPriorityForQuerier(userID string, querierID string) (int64, bool) {
210188
if priority, ok := q.queues.userQueues[userID].reservedQueriers[querierID]; ok {
211189
return priority, true
212190
}

pkg/scheduler/queue/queue_test.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -226,10 +226,10 @@ func TestReservedQueriersShouldOnlyGetHighPriorityQueries(t *testing.T) {
226226

227227
assert.NoError(t, queue.EnqueueRequest("userID", normalRequest, 1, func() {}))
228228
assert.NoError(t, queue.EnqueueRequest("userID", priority1Request, 1, func() {}))
229-
assert.NoError(t, queue.EnqueueRequest("userID", priority2Request, 1, func() {}))
229+
assert.NoError(t, queue.EnqueueRequest("userID", priority1Request, 1, func() {}))
230230

231231
nextRequest, _, _ := queue.GetNextRequestForQuerier(ctx, FirstUser(), "querier-1")
232-
assert.Equal(t, priority2Request, nextRequest)
232+
assert.Equal(t, priority1Request, nextRequest)
233233

234234
nextRequest, _, _ = queue.GetNextRequestForQuerier(ctx, FirstUser(), "querier-1")
235235
assert.Equal(t, priority1Request, nextRequest)
@@ -243,6 +243,18 @@ func TestReservedQueriersShouldOnlyGetHighPriorityQueries(t *testing.T) {
243243
nextRequest, _, _ = queue.GetNextRequestForQuerier(ctxTimeout, FirstUser(), "querier-1")
244244
assert.Nil(t, nextRequest)
245245
assert.Equal(t, 1, queue.queues.userQueues["userID"].queue.length())
246+
247+
assert.NoError(t, queue.EnqueueRequest("userID", priority2Request, 1, func() {}))
248+
249+
ctxTimeout, cancel = context.WithTimeout(ctx, 1*time.Second)
250+
defer cancel()
251+
252+
time.AfterFunc(2*time.Second, func() {
253+
queue.cond.Broadcast()
254+
})
255+
nextRequest, _, _ = queue.GetNextRequestForQuerier(ctxTimeout, FirstUser(), "querier-1")
256+
assert.Nil(t, nextRequest)
257+
assert.Equal(t, 2, queue.queues.userQueues["userID"].queue.length())
246258
}
247259

248260
func TestExitingRequestsShouldPersistEvenIfTheConfigHasChanged(t *testing.T) {

pkg/scheduler/queue/user_queues.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,10 @@ import (
66
"sort"
77
"time"
88

9-
"github.com/cortexproject/cortex/pkg/util/validation"
9+
"github.com/prometheus/client_golang/prometheus"
1010

1111
"github.com/cortexproject/cortex/pkg/util"
12+
"github.com/cortexproject/cortex/pkg/util/validation"
1213
)
1314

1415
// Limits needed for the Query Scheduler - interface used for decoupling.
@@ -57,6 +58,8 @@ type queues struct {
5758
sortedQueriers []string
5859

5960
limits Limits
61+
62+
queueLength *prometheus.GaugeVec // Per user, type and priority.
6063
}
6164

6265
type userQueue struct {
@@ -79,7 +82,7 @@ type userQueue struct {
7982
index int
8083
}
8184

82-
func newUserQueues(maxUserQueueSize int, forgetDelay time.Duration, limits Limits) *queues {
85+
func newUserQueues(maxUserQueueSize int, forgetDelay time.Duration, limits Limits, queueLength *prometheus.GaugeVec) *queues {
8386
return &queues{
8487
userQueues: map[string]*userQueue{},
8588
users: nil,
@@ -88,6 +91,7 @@ func newUserQueues(maxUserQueueSize int, forgetDelay time.Duration, limits Limit
8891
queriers: map[string]*querier{},
8992
sortedQueriers: nil,
9093
limits: limits,
94+
queueLength: queueLength,
9195
}
9296
}
9397

@@ -195,7 +199,7 @@ func (q *queues) getOrAddQueue(userID string, maxQueriers int) userRequestQueue
195199

196200
func (q *queues) createUserRequestQueue(userID string) userRequestQueue {
197201
if q.limits.QueryPriority(userID).Enabled {
198-
return NewPriorityRequestQueue(util.NewPriorityQueue(nil))
202+
return NewPriorityRequestQueue(util.NewPriorityQueue(nil), userID, q.queueLength)
199203
}
200204

201205
queueSize := q.limits.MaxOutstandingPerTenant(userID)
@@ -206,7 +210,7 @@ func (q *queues) createUserRequestQueue(userID string) userRequestQueue {
206210
queueSize = q.maxUserQueueSize
207211
}
208212

209-
return NewFIFORequestQueue(make(chan Request, queueSize))
213+
return NewFIFORequestQueue(make(chan Request, queueSize), userID, q.queueLength)
210214
}
211215

212216
// Finds next queue for the querier. To support fair scheduling between users, client is expected

pkg/scheduler/queue/user_queues_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import (
1515
)
1616

1717
func TestQueues(t *testing.T) {
18-
uq := newUserQueues(0, 0, MockLimits{})
18+
uq := newUserQueues(0, 0, MockLimits{}, nil)
1919
assert.NotNil(t, uq)
2020
assert.NoError(t, isConsistent(uq))
2121

@@ -70,7 +70,7 @@ func TestQueues(t *testing.T) {
7070
}
7171

7272
func TestQueuesWithQueriers(t *testing.T) {
73-
uq := newUserQueues(0, 0, MockLimits{})
73+
uq := newUserQueues(0, 0, MockLimits{}, nil)
7474
assert.NotNil(t, uq)
7575
assert.NoError(t, isConsistent(uq))
7676

@@ -147,7 +147,7 @@ func TestQueuesConsistency(t *testing.T) {
147147

148148
for testName, testData := range tests {
149149
t.Run(testName, func(t *testing.T) {
150-
uq := newUserQueues(0, testData.forgetDelay, MockLimits{})
150+
uq := newUserQueues(0, testData.forgetDelay, MockLimits{}, nil)
151151
assert.NotNil(t, uq)
152152
assert.NoError(t, isConsistent(uq))
153153

@@ -196,7 +196,7 @@ func TestQueues_ForgetDelay(t *testing.T) {
196196
)
197197

198198
now := time.Now()
199-
uq := newUserQueues(0, forgetDelay, MockLimits{})
199+
uq := newUserQueues(0, forgetDelay, MockLimits{}, nil)
200200
assert.NotNil(t, uq)
201201
assert.NoError(t, isConsistent(uq))
202202

@@ -288,7 +288,7 @@ func TestQueues_ForgetDelay_ShouldCorrectlyHandleQuerierReconnectingBeforeForget
288288
)
289289

290290
now := time.Now()
291-
uq := newUserQueues(0, forgetDelay, MockLimits{})
291+
uq := newUserQueues(0, forgetDelay, MockLimits{}, nil)
292292
assert.NotNil(t, uq)
293293
assert.NoError(t, isConsistent(uq))
294294

@@ -357,7 +357,7 @@ func TestGetOrAddQueueShouldUpdateProperties(t *testing.T) {
357357
limits := MockLimits{
358358
MaxOutstanding: 3,
359359
}
360-
q := newUserQueues(0, 0, limits)
360+
q := newUserQueues(0, 0, limits, nil)
361361
q.addQuerierConnection("q-1")
362362
q.addQuerierConnection("q-2")
363363
q.addQuerierConnection("q-3")

pkg/scheduler/queue/user_request_queue.go

Lines changed: 52 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,50 +1,90 @@
11
package queue
22

3-
import "github.com/cortexproject/cortex/pkg/util"
3+
import (
4+
"strconv"
5+
6+
"github.com/prometheus/client_golang/prometheus"
7+
8+
"github.com/cortexproject/cortex/pkg/util"
9+
)
410

511
type userRequestQueue interface {
612
enqueueRequest(Request)
7-
dequeueRequest(minPriority int64, checkMinPriority bool) Request
13+
dequeueRequest(int64, bool) Request
814
length() int
915
}
1016

1117
type FIFORequestQueue struct {
12-
queue chan Request
18+
queue chan Request
19+
userID string
20+
queueLength *prometheus.GaugeVec
1321
}
1422

15-
func NewFIFORequestQueue(queue chan Request) *FIFORequestQueue {
16-
return &FIFORequestQueue{queue: queue}
23+
func NewFIFORequestQueue(queue chan Request, userID string, queueLength *prometheus.GaugeVec) *FIFORequestQueue {
24+
return &FIFORequestQueue{queue: queue, userID: userID, queueLength: queueLength}
1725
}
1826

1927
func (f *FIFORequestQueue) enqueueRequest(r Request) {
2028
f.queue <- r
29+
if f.queueLength != nil {
30+
f.queueLength.With(prometheus.Labels{
31+
"user": f.userID,
32+
"priority": strconv.FormatInt(r.Priority(), 10),
33+
"type": "fifo",
34+
}).Inc()
35+
}
2136
}
2237

2338
func (f *FIFORequestQueue) dequeueRequest(_ int64, _ bool) Request {
24-
return <-f.queue
39+
r := <-f.queue
40+
if f.queueLength != nil {
41+
f.queueLength.With(prometheus.Labels{
42+
"user": f.userID,
43+
"priority": strconv.FormatInt(r.Priority(), 10),
44+
"type": "fifo",
45+
}).Dec()
46+
}
47+
return r
2548
}
2649

2750
func (f *FIFORequestQueue) length() int {
2851
return len(f.queue)
2952
}
3053

3154
type PriorityRequestQueue struct {
32-
queue *util.PriorityQueue
55+
queue *util.PriorityQueue
56+
userID string
57+
queueLength *prometheus.GaugeVec
3358
}
3459

35-
func NewPriorityRequestQueue(queue *util.PriorityQueue) *PriorityRequestQueue {
36-
return &PriorityRequestQueue{queue: queue}
60+
func NewPriorityRequestQueue(queue *util.PriorityQueue, userID string, queueLength *prometheus.GaugeVec) *PriorityRequestQueue {
61+
return &PriorityRequestQueue{queue: queue, userID: userID, queueLength: queueLength}
3762
}
3863

3964
func (f *PriorityRequestQueue) enqueueRequest(r Request) {
4065
f.queue.Enqueue(r)
66+
if f.queueLength != nil {
67+
f.queueLength.With(prometheus.Labels{
68+
"user": f.userID,
69+
"priority": strconv.FormatInt(r.Priority(), 10),
70+
"type": "priority",
71+
}).Inc()
72+
}
4173
}
4274

43-
func (f *PriorityRequestQueue) dequeueRequest(minPriority int64, checkMinPriority bool) Request {
44-
if checkMinPriority && f.queue.Peek().Priority() < minPriority {
75+
func (f *PriorityRequestQueue) dequeueRequest(priority int64, matchPriority bool) Request {
76+
if matchPriority && f.queue.Peek().Priority() != priority {
4577
return nil
4678
}
47-
return f.queue.Dequeue()
79+
r := f.queue.Dequeue()
80+
if f.queueLength != nil {
81+
f.queueLength.With(prometheus.Labels{
82+
"user": f.userID,
83+
"priority": strconv.FormatInt(r.Priority(), 10),
84+
"type": "priority",
85+
}).Dec()
86+
}
87+
return r
4888
}
4989

5090
func (f *PriorityRequestQueue) length() int {

0 commit comments

Comments
 (0)