Skip to content

Commit 49632fa

Browse files
authored
Merge pull request #113 from weaveworks/flush-queue-length-metric
Add metric for flush queue length
2 parents 21b1b4c + ccfd65d commit 49632fa

File tree

3 files changed

+137
-16
lines changed

3 files changed

+137
-16
lines changed

ingester/ingester.go

Lines changed: 37 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,7 @@ import (
1919
)
2020

2121
const (
22-
ingesterSubsystem = "ingester"
23-
maxConcurrentFlushSeries = 100
24-
22+
ingesterSubsystem = "ingester"
2523
discardReasonLabel = "reason"
2624

2725
// Reasons to discard samples.
@@ -40,6 +38,11 @@ var (
4038
"The current number of users in memory.",
4139
nil, nil,
4240
)
41+
flushQueueLengthDesc = prometheus.NewDesc(
42+
"cortex_ingester_flush_queue_length",
43+
"The total number of series pending in the flush queue.",
44+
nil, nil,
45+
)
4346

4447
// ErrOutOfOrderSample is returned if a sample has a timestamp before the latest
4548
// timestamp in the series it is appended to.
@@ -64,7 +67,9 @@ type Ingester struct {
6467
userStateLock sync.Mutex
6568
userState map[string]*userState
6669

67-
flushQueue *priorityQueue
70+
// One queue per flush thread. Fingerprint is used to
71+
// pick a queue.
72+
flushQueues []*priorityQueue
6873

6974
ingestedSamples prometheus.Counter
7075
discardedSamples *prometheus.CounterVec
@@ -136,9 +141,8 @@ func New(cfg Config, chunkStore cortex.Store) (*Ingester, error) {
136141
chunkStore: chunkStore,
137142
quit: make(chan struct{}),
138143

139-
userState: map[string]*userState{},
140-
141-
flushQueue: newPriorityQueue(),
144+
userState: map[string]*userState{},
145+
flushQueues: make([]*priorityQueue, cfg.ConcurrentFlushes, cfg.ConcurrentFlushes),
142146

143147
ingestedSamples: prometheus.NewCounter(prometheus.CounterOpts{
144148
Name: "cortex_ingester_ingested_samples_total",
@@ -186,7 +190,8 @@ func New(cfg Config, chunkStore cortex.Store) (*Ingester, error) {
186190

187191
i.done.Add(cfg.ConcurrentFlushes)
188192
for j := 0; j < cfg.ConcurrentFlushes; j++ {
189-
go i.flushLoop()
193+
i.flushQueues[j] = newPriorityQueue()
194+
go i.flushLoop(j)
190195
}
191196

192197
i.done.Add(1)
@@ -439,7 +444,9 @@ func (i *Ingester) loop() {
439444

440445
// We close flush queue here to ensure the flushLoops pick
441446
// up all the flushes triggered by the last run
442-
i.flushQueue.Close()
447+
for _, flushQueue := range i.flushQueues {
448+
flushQueue.Close()
449+
}
443450

444451
log.Infof("Ingester.loop() exited gracefully")
445452
i.done.Done()
@@ -503,7 +510,7 @@ func (i *Ingester) flushSeries(u *userState, fp model.Fingerprint, series *memor
503510

504511
// Decide what chunks to flush
505512
firstTime := series.head().FirstTime()
506-
if immediate || model.Now().Sub(series.head().FirstTime()) > i.cfg.MaxChunkAge {
513+
if immediate || model.Now().Sub(firstTime) > i.cfg.MaxChunkAge {
507514
series.headChunkClosed = true
508515
series.head().MaybePopulateLastTime()
509516
}
@@ -517,17 +524,18 @@ func (i *Ingester) flushSeries(u *userState, fp model.Fingerprint, series *memor
517524
return
518525
}
519526

520-
i.flushQueue.Enqueue(&flushOp{firstTime, u.userID, fp, immediate})
527+
flushQueueIndex := int(uint64(fp) % uint64(i.cfg.ConcurrentFlushes))
528+
i.flushQueues[flushQueueIndex].Enqueue(&flushOp{firstTime, u.userID, fp, immediate})
521529
}
522530

523-
func (i *Ingester) flushLoop() {
531+
func (i *Ingester) flushLoop(j int) {
524532
defer func() {
525533
log.Info("Ingester.flushLoop() exited")
526534
i.done.Done()
527535
}()
528536

529537
for {
530-
o := i.flushQueue.Dequeue()
538+
o := i.flushQueues[j].Dequeue()
531539
if o == nil {
532540
return
533541
}
@@ -551,10 +559,14 @@ func (i *Ingester) flushLoop() {
551559
userState.fpLocker.Lock(op.fp)
552560
chunks := series.chunkDescs
553561
if !series.headChunkClosed {
554-
chunks = chunks[1:]
562+
chunks = chunks[:len(chunks)-1]
555563
}
556564
userState.fpLocker.Unlock(op.fp)
557565

566+
if len(chunks) == 0 {
567+
continue
568+
}
569+
558570
// flush the chunks without locking the series
559571
if err := i.flushChunks(ctx, op.fp, series.metric, chunks); err != nil {
560572
log.Errorf("Failed to flush chunks: %v", err)
@@ -610,6 +622,7 @@ func (i *Ingester) updateRates() {
610622
func (i *Ingester) Describe(ch chan<- *prometheus.Desc) {
611623
ch <- memorySeriesDesc
612624
ch <- memoryUsersDesc
625+
ch <- flushQueueLengthDesc
613626
ch <- i.ingestedSamples.Desc()
614627
i.discardedSamples.Describe(ch)
615628
ch <- i.chunkUtilization.Desc()
@@ -641,6 +654,16 @@ func (i *Ingester) Collect(ch chan<- prometheus.Metric) {
641654
prometheus.GaugeValue,
642655
float64(numUsers),
643656
)
657+
658+
flushQueueLength := 0
659+
for _, flushQueue := range i.flushQueues {
660+
flushQueueLength += flushQueue.Length()
661+
}
662+
ch <- prometheus.MustNewConstMetric(
663+
flushQueueLengthDesc,
664+
prometheus.GaugeValue,
665+
float64(flushQueueLength),
666+
)
644667
ch <- i.ingestedSamples
645668
i.discardedSamples.Collect(ch)
646669
ch <- i.chunkUtilization

ingester/priority_queue.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,13 @@ type priorityQueue struct {
1515

1616
type op interface {
1717
Key() string
18-
Priority() int64
18+
Priority() int64 // The larger the number the higher the priority.
1919
}
2020

2121
type queue []op
2222

2323
func (q queue) Len() int { return len(q) }
24-
func (q queue) Less(i, j int) bool { return q[i].Priority() < q[j].Priority() }
24+
func (q queue) Less(i, j int) bool { return q[i].Priority() > q[j].Priority() }
2525
func (q queue) Swap(i, j int) { q[i], q[j] = q[j], q[i] }
2626

2727
// Push and Pop use pointer receivers because they modify the slice's length,
@@ -47,6 +47,12 @@ func newPriorityQueue() *priorityQueue {
4747
return pq
4848
}
4949

50+
func (pq *priorityQueue) Length() int {
51+
pq.lock.Lock()
52+
defer pq.lock.Unlock()
53+
return len(pq.queue)
54+
}
55+
5056
func (pq *priorityQueue) Close() {
5157
pq.lock.Lock()
5258
defer pq.lock.Unlock()
@@ -67,10 +73,13 @@ func (pq *priorityQueue) Enqueue(op op) {
6773
return
6874
}
6975

76+
pq.hit[op.Key()] = struct{}{}
7077
heap.Push(&pq.queue, op)
7178
pq.cond.Broadcast()
7279
}
7380

81+
// Dequeue will return the op with the highest priority; block if queue is
82+
// empty; returns nil if queue is closed.
7483
func (pq *priorityQueue) Dequeue() op {
7584
pq.lock.Lock()
7685
defer pq.lock.Unlock()

ingester/priority_queue_test.go

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
package ingester
2+
3+
import (
4+
"runtime"
5+
"strconv"
6+
"testing"
7+
"time"
8+
9+
"github.com/stretchr/testify/assert"
10+
)
11+
12+
type item int64
13+
14+
func (i item) Priority() int64 {
15+
return int64(i)
16+
}
17+
18+
func (i item) Key() string {
19+
return strconv.FormatInt(int64(i), 10)
20+
}
21+
22+
func TestPriorityQueueBasic(t *testing.T) {
23+
queue := newPriorityQueue()
24+
assert.Equal(t, 0, queue.Length(), "Expected length = 0")
25+
26+
queue.Enqueue(item(1))
27+
assert.Equal(t, 1, queue.Length(), "Expected length = 1")
28+
29+
i, ok := queue.Dequeue().(item)
30+
assert.True(t, ok, "Expected cast to succeed")
31+
assert.Equal(t, item(1), i, "Expected to dequeue item(1)")
32+
33+
queue.Close()
34+
assert.Nil(t, queue.Dequeue(), "Expect nil dequeue")
35+
}
36+
37+
func TestPriorityQueuePriorities(t *testing.T) {
38+
queue := newPriorityQueue()
39+
queue.Enqueue(item(1))
40+
queue.Enqueue(item(2))
41+
42+
assert.Equal(t, item(2), queue.Dequeue().(item), "Expected to dequeue item(2)")
43+
assert.Equal(t, item(1), queue.Dequeue().(item), "Expected to dequeue item(1)")
44+
45+
queue.Close()
46+
assert.Nil(t, queue.Dequeue(), "Expect nil dequeue")
47+
}
48+
49+
func TestPriorityQueuePriorities2(t *testing.T) {
50+
queue := newPriorityQueue()
51+
queue.Enqueue(item(2))
52+
queue.Enqueue(item(1))
53+
54+
assert.Equal(t, item(2), queue.Dequeue().(item), "Expected to dequeue item(2)")
55+
assert.Equal(t, item(1), queue.Dequeue().(item), "Expected to dequeue item(1)")
56+
57+
queue.Close()
58+
assert.Nil(t, queue.Dequeue(), "Expect nil dequeue")
59+
}
60+
61+
func TestPriorityQueueDedupe(t *testing.T) {
62+
queue := newPriorityQueue()
63+
queue.Enqueue(item(1))
64+
queue.Enqueue(item(1))
65+
66+
assert.Equal(t, 1, queue.Length(), "Expected length = 1")
67+
assert.Equal(t, item(1), queue.Dequeue().(item), "Expected to dequeue item(1)")
68+
69+
queue.Close()
70+
assert.Nil(t, queue.Dequeue(), "Expect nil dequeue")
71+
}
72+
73+
func TestPriorityQueueWait(t *testing.T) {
74+
queue := newPriorityQueue()
75+
76+
done := make(chan struct{})
77+
go func() {
78+
assert.Nil(t, queue.Dequeue(), "Expect nil dequeue")
79+
close(done)
80+
}()
81+
82+
runtime.Gosched()
83+
queue.Close()
84+
select {
85+
case <-done:
86+
case <-time.After(100 * time.Millisecond):
87+
t.Fatal("Close didn't unblock Dequeue.")
88+
}
89+
}

0 commit comments

Comments
 (0)