diff --git a/ingester/ingester.go b/ingester/ingester.go index c597667a72..09dc7edb1b 100644 --- a/ingester/ingester.go +++ b/ingester/ingester.go @@ -19,9 +19,7 @@ import ( ) const ( - ingesterSubsystem = "ingester" - maxConcurrentFlushSeries = 100 - + ingesterSubsystem = "ingester" discardReasonLabel = "reason" // Reasons to discard samples. @@ -40,6 +38,11 @@ var ( "The current number of users in memory.", nil, nil, ) + flushQueueLengthDesc = prometheus.NewDesc( + "cortex_ingester_flush_queue_length", + "The total number of series pending in the flush queue.", + nil, nil, + ) // ErrOutOfOrderSample is returned if a sample has a timestamp before the latest // timestamp in the series it is appended to. @@ -64,7 +67,9 @@ type Ingester struct { userStateLock sync.Mutex userState map[string]*userState - flushQueue *priorityQueue + // One queue per flush thread. Fingerprint is used to + // pick a queue. + flushQueues []*priorityQueue ingestedSamples prometheus.Counter discardedSamples *prometheus.CounterVec @@ -136,9 +141,8 @@ func New(cfg Config, chunkStore cortex.Store) (*Ingester, error) { chunkStore: chunkStore, quit: make(chan struct{}), - userState: map[string]*userState{}, - - flushQueue: newPriorityQueue(), + userState: map[string]*userState{}, + flushQueues: make([]*priorityQueue, cfg.ConcurrentFlushes, cfg.ConcurrentFlushes), ingestedSamples: prometheus.NewCounter(prometheus.CounterOpts{ Name: "cortex_ingester_ingested_samples_total", @@ -186,7 +190,8 @@ func New(cfg Config, chunkStore cortex.Store) (*Ingester, error) { i.done.Add(cfg.ConcurrentFlushes) for j := 0; j < cfg.ConcurrentFlushes; j++ { - go i.flushLoop() + i.flushQueues[j] = newPriorityQueue() + go i.flushLoop(j) } i.done.Add(1) @@ -439,7 +444,9 @@ func (i *Ingester) loop() { // We close flush queue here to ensure the flushLoops pick // up all the flushes triggered by the last run - i.flushQueue.Close() + for _, flushQueue := range i.flushQueues { + flushQueue.Close() + } 
log.Infof("Ingester.loop() exited gracefully") i.done.Done() @@ -503,7 +510,7 @@ func (i *Ingester) flushSeries(u *userState, fp model.Fingerprint, series *memor // Decide what chunks to flush firstTime := series.head().FirstTime() - if immediate || model.Now().Sub(series.head().FirstTime()) > i.cfg.MaxChunkAge { + if immediate || model.Now().Sub(firstTime) > i.cfg.MaxChunkAge { series.headChunkClosed = true series.head().MaybePopulateLastTime() } @@ -517,17 +524,18 @@ func (i *Ingester) flushSeries(u *userState, fp model.Fingerprint, series *memor return } - i.flushQueue.Enqueue(&flushOp{firstTime, u.userID, fp, immediate}) + flushQueueIndex := int(uint64(fp) % uint64(i.cfg.ConcurrentFlushes)) + i.flushQueues[flushQueueIndex].Enqueue(&flushOp{firstTime, u.userID, fp, immediate}) } -func (i *Ingester) flushLoop() { +func (i *Ingester) flushLoop(j int) { defer func() { log.Info("Ingester.flushLoop() exited") i.done.Done() }() for { - o := i.flushQueue.Dequeue() + o := i.flushQueues[j].Dequeue() if o == nil { return } @@ -551,10 +559,14 @@ func (i *Ingester) flushLoop() { userState.fpLocker.Lock(op.fp) chunks := series.chunkDescs if !series.headChunkClosed { - chunks = chunks[1:] + chunks = chunks[:len(chunks)-1] } userState.fpLocker.Unlock(op.fp) + if len(chunks) == 0 { + continue + } + // flush the chunks without locking the series if err := i.flushChunks(ctx, op.fp, series.metric, chunks); err != nil { log.Errorf("Failed to flush chunks: %v", err) @@ -610,6 +622,7 @@ func (i *Ingester) updateRates() { func (i *Ingester) Describe(ch chan<- *prometheus.Desc) { ch <- memorySeriesDesc ch <- memoryUsersDesc + ch <- flushQueueLengthDesc ch <- i.ingestedSamples.Desc() i.discardedSamples.Describe(ch) ch <- i.chunkUtilization.Desc() @@ -641,6 +654,16 @@ func (i *Ingester) Collect(ch chan<- prometheus.Metric) { prometheus.GaugeValue, float64(numUsers), ) + + flushQueueLength := 0 + for _, flushQueue := range i.flushQueues { + flushQueueLength += flushQueue.Length() + } + ch 
<- prometheus.MustNewConstMetric( + flushQueueLengthDesc, + prometheus.GaugeValue, + float64(flushQueueLength), + ) ch <- i.ingestedSamples i.discardedSamples.Collect(ch) ch <- i.chunkUtilization diff --git a/ingester/priority_queue.go b/ingester/priority_queue.go index 28f2329aad..6044b26921 100644 --- a/ingester/priority_queue.go +++ b/ingester/priority_queue.go @@ -15,13 +15,13 @@ type priorityQueue struct { type op interface { Key() string - Priority() int64 + Priority() int64 // The larger the number the higher the priority. } type queue []op func (q queue) Len() int { return len(q) } -func (q queue) Less(i, j int) bool { return q[i].Priority() < q[j].Priority() } +func (q queue) Less(i, j int) bool { return q[i].Priority() > q[j].Priority() } func (q queue) Swap(i, j int) { q[i], q[j] = q[j], q[i] } // Push and Pop use pointer receivers because they modify the slice's length, @@ -47,6 +47,12 @@ func newPriorityQueue() *priorityQueue { return pq } +func (pq *priorityQueue) Length() int { + pq.lock.Lock() + defer pq.lock.Unlock() + return len(pq.queue) +} + func (pq *priorityQueue) Close() { pq.lock.Lock() defer pq.lock.Unlock() @@ -67,10 +73,13 @@ func (pq *priorityQueue) Enqueue(op op) { return } + pq.hit[op.Key()] = struct{}{} heap.Push(&pq.queue, op) pq.cond.Broadcast() } +// Dequeue returns the op with the highest priority, blocking while the queue +// is empty; it returns nil if the queue is closed. 
func (pq *priorityQueue) Dequeue() op { pq.lock.Lock() defer pq.lock.Unlock() diff --git a/ingester/priority_queue_test.go b/ingester/priority_queue_test.go new file mode 100644 index 0000000000..105547c484 --- /dev/null +++ b/ingester/priority_queue_test.go @@ -0,0 +1,89 @@ +package ingester + +import ( + "runtime" + "strconv" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +type item int64 + +func (i item) Priority() int64 { + return int64(i) +} + +func (i item) Key() string { + return strconv.FormatInt(int64(i), 10) +} + +func TestPriorityQueueBasic(t *testing.T) { + queue := newPriorityQueue() + assert.Equal(t, 0, queue.Length(), "Expected length = 0") + + queue.Enqueue(item(1)) + assert.Equal(t, 1, queue.Length(), "Expected length = 1") + + i, ok := queue.Dequeue().(item) + assert.True(t, ok, "Expected cast to succeed") + assert.Equal(t, item(1), i, "Expected to dequeue item(1)") + + queue.Close() + assert.Nil(t, queue.Dequeue(), "Expect nil dequeue") +} + +func TestPriorityQueuePriorities(t *testing.T) { + queue := newPriorityQueue() + queue.Enqueue(item(1)) + queue.Enqueue(item(2)) + + assert.Equal(t, item(2), queue.Dequeue().(item), "Expected to dequeue item(2)") + assert.Equal(t, item(1), queue.Dequeue().(item), "Expected to dequeue item(1)") + + queue.Close() + assert.Nil(t, queue.Dequeue(), "Expect nil dequeue") +} + +func TestPriorityQueuePriorities2(t *testing.T) { + queue := newPriorityQueue() + queue.Enqueue(item(2)) + queue.Enqueue(item(1)) + + assert.Equal(t, item(2), queue.Dequeue().(item), "Expected to dequeue item(2)") + assert.Equal(t, item(1), queue.Dequeue().(item), "Expected to dequeue item(1)") + + queue.Close() + assert.Nil(t, queue.Dequeue(), "Expect nil dequeue") +} + +func TestPriorityQueueDedupe(t *testing.T) { + queue := newPriorityQueue() + queue.Enqueue(item(1)) + queue.Enqueue(item(1)) + + assert.Equal(t, 1, queue.Length(), "Expected length = 1") + assert.Equal(t, item(1), queue.Dequeue().(item), "Expected to 
dequeue item(1)") + + queue.Close() + assert.Nil(t, queue.Dequeue(), "Expect nil dequeue") +} + +func TestPriorityQueueWait(t *testing.T) { + queue := newPriorityQueue() + + done := make(chan struct{}) + go func() { + assert.Nil(t, queue.Dequeue(), "Expect nil dequeue") + close(done) + }() + + runtime.Gosched() + queue.Close() + select { + case <-done: + case <-time.After(100 * time.Millisecond): + t.Fatal("Close didn't unblock Dequeue.") + } +}