Add metric for flush queue length #113

Merged
merged 6 commits on Nov 7, 2016
51 changes: 37 additions & 14 deletions ingester/ingester.go
@@ -19,9 +19,7 @@ import (
 )
 
 const (
-	ingesterSubsystem = "ingester"
-	maxConcurrentFlushSeries = 100
-
+	ingesterSubsystem = "ingester"
 	discardReasonLabel = "reason"
 
 	// Reasons to discard samples.
@@ -40,6 +38,11 @@ var (
 		"The current number of users in memory.",
 		nil, nil,
 	)
+	flushQueueLengthDesc = prometheus.NewDesc(
+		"cortex_ingester_flush_queue_length",
+		"The total number of series pending in the flush queue.",
+		nil, nil,
+	)
 
 	// ErrOutOfOrderSample is returned if a sample has a timestamp before the latest
 	// timestamp in the series it is appended to.
@@ -64,7 +67,9 @@ type Ingester struct {
 	userStateLock sync.Mutex
 	userState map[string]*userState
 
-	flushQueue *priorityQueue
+	// One queue per flush thread. Fingerprint is used to
+	// pick a queue.
+	flushQueues []*priorityQueue
 
 	ingestedSamples prometheus.Counter
 	discardedSamples *prometheus.CounterVec
@@ -136,9 +141,8 @@ func New(cfg Config, chunkStore cortex.Store) (*Ingester, error) {
 		chunkStore: chunkStore,
 		quit: make(chan struct{}),
 
-		userState: map[string]*userState{},
-
-		flushQueue: newPriorityQueue(),
+		userState: map[string]*userState{},
+		flushQueues: make([]*priorityQueue, cfg.ConcurrentFlushes, cfg.ConcurrentFlushes),
 
 		ingestedSamples: prometheus.NewCounter(prometheus.CounterOpts{
 			Name: "cortex_ingester_ingested_samples_total",
@@ -186,7 +190,8 @@ func New(cfg Config, chunkStore cortex.Store) (*Ingester, error) {
 
 	i.done.Add(cfg.ConcurrentFlushes)
 	for j := 0; j < cfg.ConcurrentFlushes; j++ {
-		go i.flushLoop()
+		i.flushQueues[j] = newPriorityQueue()
+		go i.flushLoop(j)
 	}
 
 	i.done.Add(1)
@@ -439,7 +444,9 @@ func (i *Ingester) loop() {
 
 	// We close flush queue here to ensure the flushLoops pick
 	// up all the flushes triggered by the last run
-	i.flushQueue.Close()
+	for _, flushQueue := range i.flushQueues {
+		flushQueue.Close()
+	}
 
 	log.Infof("Ingester.loop() exited gracefully")
 	i.done.Done()
@@ -503,7 +510,7 @@ func (i *Ingester) flushSeries(u *userState, fp model.Fingerprint, series *memor
 
 	// Decide what chunks to flush
 	firstTime := series.head().FirstTime()
-	if immediate || model.Now().Sub(series.head().FirstTime()) > i.cfg.MaxChunkAge {
+	if immediate || model.Now().Sub(firstTime) > i.cfg.MaxChunkAge {
 		series.headChunkClosed = true
 		series.head().MaybePopulateLastTime()
 	}
@@ -517,17 +524,18 @@
 		return
 	}
 
-	i.flushQueue.Enqueue(&flushOp{firstTime, u.userID, fp, immediate})
+	flushQueueIndex := int(uint64(fp) % uint64(i.cfg.ConcurrentFlushes))
+	i.flushQueues[flushQueueIndex].Enqueue(&flushOp{firstTime, u.userID, fp, immediate})
 }
 
-func (i *Ingester) flushLoop() {
+func (i *Ingester) flushLoop(j int) {
 	defer func() {
 		log.Info("Ingester.flushLoop() exited")
 		i.done.Done()
 	}()
 
 	for {
-		o := i.flushQueue.Dequeue()
+		o := i.flushQueues[j].Dequeue()
 		if o == nil {
 			return
 		}
@@ -551,10 +559,14 @@ func (i *Ingester) flushLoop() {
 		userState.fpLocker.Lock(op.fp)
 		chunks := series.chunkDescs
 		if !series.headChunkClosed {
-			chunks = chunks[1:]
+			chunks = chunks[:len(chunks)-1]
 		}
 		userState.fpLocker.Unlock(op.fp)
 
+		if len(chunks) == 0 {
+			continue
+		}
+
 		// flush the chunks without locking the series
 		if err := i.flushChunks(ctx, op.fp, series.metric, chunks); err != nil {
 			log.Errorf("Failed to flush chunks: %v", err)
@@ -610,6 +622,7 @@ func (i *Ingester) updateRates() {
 func (i *Ingester) Describe(ch chan<- *prometheus.Desc) {
 	ch <- memorySeriesDesc
 	ch <- memoryUsersDesc
+	ch <- flushQueueLengthDesc
 	ch <- i.ingestedSamples.Desc()
 	i.discardedSamples.Describe(ch)
 	ch <- i.chunkUtilization.Desc()
@@ -641,6 +654,16 @@ func (i *Ingester) Collect(ch chan<- prometheus.Metric) {
 		prometheus.GaugeValue,
 		float64(numUsers),
 	)
+
+	flushQueueLength := 0
+	for _, flushQueue := range i.flushQueues {
+		flushQueueLength += flushQueue.Length()
+	}

Contributor: Why not sum(q.Length() for q in i.flushQueues)? :trollface:

+	ch <- prometheus.MustNewConstMetric(
+		flushQueueLengthDesc,
+		prometheus.GaugeValue,
+		float64(flushQueueLength),
+	)
 	ch <- i.ingestedSamples
 	i.discardedSamples.Collect(ch)
 	ch <- i.chunkUtilization
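
The change above shards flush work by fingerprint: each flush goroutine owns one queue, the series fingerprint modulo cfg.ConcurrentFlushes picks the queue, and the new cortex_ingester_flush_queue_length gauge is simply the sum of the per-queue lengths computed in Collect. A standalone sketch of that mapping follows; the constant, helper name, and fingerprint values are illustrative, not code from this PR.

package main

import "fmt"

// concurrentFlushes stands in for cfg.ConcurrentFlushes above; the value is illustrative.
const concurrentFlushes = 4

// queueIndex mirrors the expression used in flushSeries above: the series
// fingerprint modulo the number of flush goroutines picks the queue, so a
// given fingerprint always maps to the same queue.
func queueIndex(fp uint64) int {
	return int(fp % uint64(concurrentFlushes))
}

func main() {
	lengths := make([]int, concurrentFlushes)
	for _, fp := range []uint64{7, 11, 42, 256} { // toy fingerprints
		lengths[queueIndex(fp)]++
	}

	// The reported gauge is the sum of the per-queue lengths, as in Collect above.
	total := 0
	for _, l := range lengths {
		total += l
	}
	fmt.Println("per-queue lengths:", lengths, "cortex_ingester_flush_queue_length:", total)
}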
13 changes: 11 additions & 2 deletions ingester/priority_queue.go
@@ -15,13 +15,13 @@ type priorityQueue struct {
 
 type op interface {
 	Key() string
-	Priority() int64
+	Priority() int64 // The larger the number the higher the priority.
 }
 
 type queue []op
 
 func (q queue) Len() int { return len(q) }
-func (q queue) Less(i, j int) bool { return q[i].Priority() < q[j].Priority() }
+func (q queue) Less(i, j int) bool { return q[i].Priority() > q[j].Priority() }

Contributor: I get this sort of thing wrong all the time. I think it might help if the interface had a doc comment that said "The larger the number the higher the priority."

Contributor (author): +1

 func (q queue) Swap(i, j int) { q[i], q[j] = q[j], q[i] }
 
 // Push and Pop use pointer receivers because they modify the slice's length,
@@ -47,6 +47,12 @@ func newPriorityQueue() *priorityQueue {
 	return pq
 }
 
+func (pq *priorityQueue) Length() int {
+	pq.lock.Lock()
+	defer pq.lock.Unlock()
+	return len(pq.queue)
+}
+
 func (pq *priorityQueue) Close() {
 	pq.lock.Lock()
 	defer pq.lock.Unlock()
@@ -67,10 +73,13 @@ func (pq *priorityQueue) Enqueue(op op) {
 		return
 	}
 
+	pq.hit[op.Key()] = struct{}{}
 	heap.Push(&pq.queue, op)
 	pq.cond.Broadcast()
 }
 
+// Dequeue will return the op with the highest priority; block if queue is
+// empty; returns nil if queue is closed.
 func (pq *priorityQueue) Dequeue() op {
 	pq.lock.Lock()
 	defer pq.lock.Unlock()
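
As the review thread above notes, container/heap always pops the element that Less reports as smallest, so flipping the comparison to ">" turns the queue into a max-heap: the op with the largest Priority() is dequeued first. A small standalone illustration of that behaviour, using a toy type rather than the PR's queue:

package main

import (
	"container/heap"
	"fmt"
)

// intHeap is a toy stand-in for the PR's queue type: Less says a larger
// value is "less", so container/heap behaves as a max-heap.
type intHeap []int64

func (h intHeap) Len() int           { return len(h) }
func (h intHeap) Less(i, j int) bool { return h[i] > h[j] } // larger number = higher priority
func (h intHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }

func (h *intHeap) Push(x interface{}) { *h = append(*h, x.(int64)) }
func (h *intHeap) Pop() interface{} {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

func main() {
	h := &intHeap{}
	for _, p := range []int64{1, 3, 2} {
		heap.Push(h, p)
	}
	for h.Len() > 0 {
		fmt.Println(heap.Pop(h)) // prints 3, 2, 1: highest priority first
	}
}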
89 changes: 89 additions & 0 deletions ingester/priority_queue_test.go
@@ -0,0 +1,89 @@
package ingester

import (
	"runtime"
	"strconv"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
)

type item int64

func (i item) Priority() int64 {
	return int64(i)
}

func (i item) Key() string {
	return strconv.FormatInt(int64(i), 10)
}

func TestPriorityQueueBasic(t *testing.T) {
	queue := newPriorityQueue()
	assert.Equal(t, 0, queue.Length(), "Expected length = 0")

	queue.Enqueue(item(1))
	assert.Equal(t, 1, queue.Length(), "Expected length = 1")

	i, ok := queue.Dequeue().(item)
	assert.True(t, ok, "Expected cast to succeed")
	assert.Equal(t, item(1), i, "Expected to dequeue item(1)")

	queue.Close()
	assert.Nil(t, queue.Dequeue(), "Expect nil dequeue")
}

func TestPriorityQueuePriorities(t *testing.T) {
	queue := newPriorityQueue()
	queue.Enqueue(item(1))
	queue.Enqueue(item(2))

Contributor: I'd suggest adding another test which is almost the same as this one, except that it enqueues the items in the opposite order.

Contributor (author): +1

	assert.Equal(t, item(2), queue.Dequeue().(item), "Expected to dequeue item(2)")
	assert.Equal(t, item(1), queue.Dequeue().(item), "Expected to dequeue item(1)")

	queue.Close()
	assert.Nil(t, queue.Dequeue(), "Expect nil dequeue")
}

func TestPriorityQueuePriorities2(t *testing.T) {
	queue := newPriorityQueue()
	queue.Enqueue(item(2))
	queue.Enqueue(item(1))

	assert.Equal(t, item(2), queue.Dequeue().(item), "Expected to dequeue item(2)")
	assert.Equal(t, item(1), queue.Dequeue().(item), "Expected to dequeue item(1)")

	queue.Close()
	assert.Nil(t, queue.Dequeue(), "Expect nil dequeue")
}

func TestPriorityQueueDedupe(t *testing.T) {
	queue := newPriorityQueue()
	queue.Enqueue(item(1))
	queue.Enqueue(item(1))

	assert.Equal(t, 1, queue.Length(), "Expected length = 1")
	assert.Equal(t, item(1), queue.Dequeue().(item), "Expected to dequeue item(1)")

	queue.Close()
	assert.Nil(t, queue.Dequeue(), "Expect nil dequeue")
}

func TestPriorityQueueWait(t *testing.T) {
	queue := newPriorityQueue()

	done := make(chan struct{})
	go func() {
		assert.Nil(t, queue.Dequeue(), "Expect nil dequeue")
		close(done)
	}()

	runtime.Gosched()
	queue.Close()

Contributor: This test has a race condition, I think.

Sometimes it will test:

	queue := newPriorityQueue()
	queue.Close()
	assert.Nil(t, queue.Dequeue(), "Expect nil dequeue on closed queue.")

And other times it will test:

	queue := newPriorityQueue()
	assert.Nil(t, queue.Dequeue(), "Expect nil dequeue")
	// above blocks until the following happens:
	queue.Close()

I had a bit of a think and couldn't come up with a way of guaranteeing the second ordering, only of making it more likely by adding a delay.

Initially, I wasn't sure which path you wanted to test. I think it would be more clear if you added another test that explicitly dealt with the first case.

Contributor (author): queue.Dequeue() blocks, and will only unblock on .Close(). That's the test!

Contributor: Right, that's the second example. However, it won't test that all the time. In fact, it might never be testing it because Close() could always be getting called before Dequeue.

I think adding a delay before calling Close is the only even vaguely appropriate "fix", and even then it's only a probabilistic one.

Contributor (author): Yeah, in general this stuff is hard to test in Go. Could stick a https://golang.org/pkg/runtime/#Gosched there, but again, it's only probabilistic.

	select {
	case <-done:
	case <-time.After(100 * time.Millisecond):
		t.Fatal("Close didn't unblock Dequeue.")
	}
}
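
As a possible follow-up to the review thread above, the already-closed path could get its own deterministic test. A sketch, assuming it is added to priority_queue_test.go (which already imports testify's assert); the test name is hypothetical and not part of this PR:

// TestPriorityQueueClosed (hypothetical) covers only the "queue already
// closed" case, so it needs no extra goroutine and no timing assumptions.
func TestPriorityQueueClosed(t *testing.T) {
	queue := newPriorityQueue()
	queue.Close()
	assert.Nil(t, queue.Dequeue(), "Expect nil dequeue on closed queue")
}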