diff --git a/ingester/ingester.go b/ingester/ingester.go
index 085867c649..f3e6ff688d 100644
--- a/ingester/ingester.go
+++ b/ingester/ingester.go
@@ -54,17 +54,18 @@ var (
 // Ingester deals with "in flight" chunks.
 // Its like MemorySeriesStorage, but simpler.
 type Ingester struct {
-	cfg                Config
-	chunkStore         cortex.Store
-	stopLock           sync.RWMutex
-	stopped            bool
-	quit               chan struct{}
-	done               chan struct{}
-	flushSeriesLimiter cortex.Semaphore
+	cfg        Config
+	chunkStore cortex.Store
+	stopLock   sync.RWMutex
+	stopped    bool
+	quit       chan struct{}
+	done       sync.WaitGroup
 
 	userStateLock sync.Mutex
 	userState     map[string]*userState
 
+	flushQueue *priorityQueue
+
 	ingestedSamples  prometheus.Counter
 	discardedSamples *prometheus.CounterVec
 	chunkUtilization prometheus.Histogram
@@ -78,10 +79,11 @@ type Ingester struct {
 
 // Config configures an Ingester.
 type Config struct {
-	FlushCheckPeriod time.Duration
-	MaxChunkAge      time.Duration
-	RateUpdatePeriod time.Duration
-	Ring             *ring.Ring
+	FlushCheckPeriod  time.Duration
+	MaxChunkAge       time.Duration
+	RateUpdatePeriod  time.Duration
+	Ring              *ring.Ring
+	ConcurrentFlushes int
 }
 
 // UserStats models ingestion statistics for one user.
@@ -99,6 +101,21 @@ type userState struct {
 	ingestedSamples *ewmaRate
 }
 
+type flushOp struct {
+	from      model.Time
+	userID    string
+	fp        model.Fingerprint
+	immediate bool
+}
+
+func (o *flushOp) Key() string {
+	return fmt.Sprintf("%s-%d-%v", o.userID, o.fp, o.immediate)
+}
+
+func (o *flushOp) Priority() int64 {
+	return int64(o.from)
+}
+
 // New constructs a new Ingester.
 func New(cfg Config, chunkStore cortex.Store) (*Ingester, error) {
 	if cfg.FlushCheckPeriod == 0 {
@@ -110,16 +127,19 @@ func New(cfg Config, chunkStore cortex.Store) (*Ingester, error) {
 	if cfg.RateUpdatePeriod == 0 {
 		cfg.RateUpdatePeriod = 15 * time.Second
 	}
+	if cfg.ConcurrentFlushes <= 0 {
+		cfg.ConcurrentFlushes = 25
+	}
 
 	i := &Ingester{
-		cfg:                cfg,
-		chunkStore:         chunkStore,
-		quit:               make(chan struct{}),
-		done:               make(chan struct{}),
-		flushSeriesLimiter: cortex.NewSemaphore(maxConcurrentFlushSeries),
+		cfg:        cfg,
+		chunkStore: chunkStore,
+		quit:       make(chan struct{}),
 
 		userState: map[string]*userState{},
 
+		flushQueue: newPriorityQueue(),
+
 		ingestedSamples: prometheus.NewCounter(prometheus.CounterOpts{
 			Name: "cortex_ingester_ingested_samples_total",
 			Help: "The total number of samples ingested.",
@@ -164,6 +184,12 @@ func New(cfg Config, chunkStore cortex.Store) (*Ingester, error) {
 		}),
 	}
 
+	i.done.Add(cfg.ConcurrentFlushes)
+	for j := 0; j < cfg.ConcurrentFlushes; j++ {
+		go i.flushLoop()
+	}
+
+	i.done.Add(1)
 	go i.loop()
 	return i, nil
 }
@@ -400,15 +426,23 @@ func (i *Ingester) Stop() {
 	i.stopped = true
 	i.stopLock.Unlock()
 
+	// Closing i.quit triggers i.loop() to exit; i.loop() exiting
+	// will trigger i.flushLoop()s to exit.
 	close(i.quit)
-	<-i.done
+
+	i.done.Wait()
 }
 
 func (i *Ingester) loop() {
 	defer func() {
 		i.flushAllUsers(true)
-		close(i.done)
-		log.Infof("Ingester exited gracefully")
+
+		// We close flush queue here to ensure the flushLoops pick
+		// up all the flushes triggered by the last run
+		i.flushQueue.Close()
+
+		log.Infof("Ingester.loop() exited gracefully")
+		i.done.Done()
 	}()
 
 	flushTick := time.Tick(i.cfg.FlushCheckPeriod)
@@ -426,9 +460,6 @@
 	}
 }
 
 func (i *Ingester) flushAllUsers(immediate bool) {
(exiting: %v)", immediate) - defer log.Infof("Done flushing chunks.") - if i.chunkStore == nil { return } @@ -440,21 +471,12 @@ func (i *Ingester) flushAllUsers(immediate bool) { } i.userStateLock.Unlock() - var wg sync.WaitGroup for _, userID := range userIDs { - wg.Add(1) - go func(userID string) { - i.flushUser(userID, immediate) - wg.Done() - }(userID) + i.flushUser(userID, immediate) } - wg.Wait() } func (i *Ingester) flushUser(userID string, immediate bool) { - log.Infof("Flushing user %s...", userID) - defer log.Infof("Done flushing user %s.", userID) - i.userStateLock.Lock() userState, ok := i.userState[userID] i.userStateLock.Unlock() @@ -464,8 +486,9 @@ func (i *Ingester) flushUser(userID string, immediate bool) { return } - ctx := user.WithID(context.Background(), userID) - i.flushAllSeries(ctx, userState, immediate) + for pair := range userState.fpToSeries.iter() { + i.flushSeries(userState, pair.fp, pair.series, immediate) + } // TODO: this is probably slow, and could be done in a better way. i.userStateLock.Lock() @@ -475,55 +498,80 @@ func (i *Ingester) flushUser(userID string, immediate bool) { i.userStateLock.Unlock() } -func (i *Ingester) flushAllSeries(ctx context.Context, state *userState, immediate bool) { - var wg sync.WaitGroup - for pair := range state.fpToSeries.iter() { - wg.Add(1) - i.flushSeriesLimiter.Acquire() - go func(pair fingerprintSeriesPair) { - if err := i.flushSeries(ctx, state, pair.fp, pair.series, immediate); err != nil { - log.Errorf("Failed to flush chunks for series: %v", err) - } - i.flushSeriesLimiter.Release() - wg.Done() - }(pair) - } - wg.Wait() -} - -func (i *Ingester) flushSeries(ctx context.Context, u *userState, fp model.Fingerprint, series *memorySeries, immediate bool) error { +func (i *Ingester) flushSeries(u *userState, fp model.Fingerprint, series *memorySeries, immediate bool) { u.fpLocker.Lock(fp) // Decide what chunks to flush - if immediate || time.Now().Sub(series.head().FirstTime().Time()) > i.cfg.MaxChunkAge { + firstTime := series.head().FirstTime() + if immediate || model.Now().Sub(series.head().FirstTime()) > i.cfg.MaxChunkAge { series.headChunkClosed = true series.head().MaybePopulateLastTime() } - chunks := series.chunkDescs + chunks := len(series.chunkDescs) if !series.headChunkClosed { - chunks = chunks[:len(chunks)-1] + chunks-- } u.fpLocker.Unlock(fp) - if len(chunks) == 0 { - return nil - } - // flush the chunks without locking the series - if err := i.flushChunks(ctx, fp, series.metric, chunks); err != nil { - i.chunkStoreFailures.Add(float64(len(chunks))) - return err + if chunks == 0 { + return } - // now remove the chunks - u.fpLocker.Lock(fp) - series.chunkDescs = series.chunkDescs[len(chunks):] - i.memoryChunks.Sub(float64(len(chunks))) - if len(series.chunkDescs) == 0 { - u.fpToSeries.del(fp) - u.index.delete(series.metric, fp) + i.flushQueue.Enqueue(&flushOp{firstTime, u.userID, fp, immediate}) +} + +func (i *Ingester) flushLoop() { + defer func() { + log.Info("Ingester.flushLoop() exited") + i.done.Done() + }() + + for { + o := i.flushQueue.Dequeue() + if o == nil { + return + } + op := o.(*flushOp) + + // get the user + i.userStateLock.Lock() + userState, ok := i.userState[op.userID] + i.userStateLock.Unlock() + if !ok { + continue + } + ctx := user.WithID(context.Background(), op.userID) + + // Decide what chunks to flush + series, ok := userState.fpToSeries.get(op.fp) + if !ok { + continue + } + + userState.fpLocker.Lock(op.fp) + chunks := series.chunkDescs + if !series.headChunkClosed { + chunks = 
+			chunks = chunks[:len(chunks)-1]
+		}
+		userState.fpLocker.Unlock(op.fp)
+
+		// flush the chunks without locking the series
+		if err := i.flushChunks(ctx, op.fp, series.metric, chunks); err != nil {
+			log.Errorf("Failed to flush chunks: %v", err)
+			i.chunkStoreFailures.Add(float64(len(chunks)))
+			continue
+		}
+
+		// now remove the chunks
+		userState.fpLocker.Lock(op.fp)
+		series.chunkDescs = series.chunkDescs[len(chunks):]
+		i.memoryChunks.Sub(float64(len(chunks)))
+		if len(series.chunkDescs) == 0 {
+			userState.fpToSeries.del(op.fp)
+			userState.index.delete(series.metric, op.fp)
+		}
+		userState.fpLocker.Unlock(op.fp)
 	}
-	u.fpLocker.Unlock(fp)
-	return nil
 }
 
 func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, metric model.Metric, chunks []*prom_chunk.Desc) error {
diff --git a/ingester/priority_queue.go b/ingester/priority_queue.go
new file mode 100644
index 0000000000..28f2329aad
--- /dev/null
+++ b/ingester/priority_queue.go
@@ -0,0 +1,89 @@
+package ingester
+
+import (
+	"container/heap"
+	"sync"
+)
+
+type priorityQueue struct {
+	lock   sync.Mutex
+	cond   *sync.Cond
+	closed bool
+	hit    map[string]struct{}
+	queue  queue
+}
+
+type op interface {
+	Key() string
+	Priority() int64
+}
+
+type queue []op
+
+func (q queue) Len() int           { return len(q) }
+func (q queue) Less(i, j int) bool { return q[i].Priority() < q[j].Priority() }
+func (q queue) Swap(i, j int)      { q[i], q[j] = q[j], q[i] }
+
+// Push and Pop use pointer receivers because they modify the slice's length,
+// not just its contents.
+func (q *queue) Push(x interface{}) {
+	*q = append(*q, x.(op))
+}
+
+func (q *queue) Pop() interface{} {
+	old := *q
+	n := len(old)
+	x := old[n-1]
+	*q = old[0 : n-1]
+	return x
+}
+
+func newPriorityQueue() *priorityQueue {
+	pq := &priorityQueue{
+		hit: map[string]struct{}{},
+	}
+	pq.cond = sync.NewCond(&pq.lock)
+	heap.Init(&pq.queue)
+	return pq
+}
+
+func (pq *priorityQueue) Close() {
+	pq.lock.Lock()
+	defer pq.lock.Unlock()
+	pq.closed = true
+	pq.cond.Broadcast()
+}
+
+func (pq *priorityQueue) Enqueue(op op) {
+	pq.lock.Lock()
+	defer pq.lock.Unlock()
+
+	if pq.closed {
+		panic("enqueue on closed queue")
+	}
+
+	_, enqueued := pq.hit[op.Key()]
+	if enqueued {
+		return
+	}
+	pq.hit[op.Key()] = struct{}{}
+	heap.Push(&pq.queue, op)
+	pq.cond.Broadcast()
+}
+
+func (pq *priorityQueue) Dequeue() op {
+	pq.lock.Lock()
+	defer pq.lock.Unlock()
+
+	for len(pq.queue) == 0 && !pq.closed {
+		pq.cond.Wait()
+	}
+
+	if len(pq.queue) == 0 && pq.closed {
+		return nil
+	}
+
+	op := heap.Pop(&pq.queue).(op)
+	delete(pq.hit, op.Key())
+	return op
+}
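Two illustrative sketches follow; they are not part of the patch. The first shows the semantics the new queue gives the flusher: Enqueue drops an op whose Key() is already waiting in the queue, and Dequeue hands back the op with the lowest Priority() first, which for a flushOp means the series whose head chunk started earliest. It is written as if it sat in a _test.go file next to the ingester package so it can reach the unexported types; the mockOp type and the demo function are hypothetical and exist only for this example.

```go
package ingester

import "fmt"

// mockOp is a hypothetical op implementation used only to exercise the queue.
type mockOp struct {
	key      string
	priority int64
}

func (m mockOp) Key() string     { return m.key }
func (m mockOp) Priority() int64 { return m.priority }

func demoQueueOrdering() {
	pq := newPriorityQueue()

	// Enqueue out of priority order; the second "user1-42-false" op is
	// dropped because an op with the same key is already queued.
	pq.Enqueue(mockOp{key: "user1-42-false", priority: 30})
	pq.Enqueue(mockOp{key: "user2-7-false", priority: 10})
	pq.Enqueue(mockOp{key: "user1-42-false", priority: 5})

	// Close so that Dequeue returns nil once the queue is drained.
	pq.Close()

	// Prints "user2-7-false 10" then "user1-42-false 30":
	// the lowest Priority() value comes out first.
	for o := pq.Dequeue(); o != nil; o = pq.Dequeue() {
		fmt.Println(o.Key(), o.Priority())
	}
}
```

Because a flushOp's key combines the user, the fingerprint and the immediate flag, a series that is already waiting to be flushed is not enqueued again by a later flush cycle; the key is released when a flushLoop dequeues the op.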
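The second sketch isolates the shutdown sequencing the patch relies on: Stop() closes quit, loop() performs a final flushAllUsers(true), closes the flush queue, and the flushLoop workers drain whatever is still queued before the WaitGroup is released. It reuses the illustrative mockOp type from the previous sketch and again assumes it sits alongside the ingester package; the worker count and op names are arbitrary.

```go
package ingester

import (
	"fmt"
	"sync"
)

// demoQueueShutdown mirrors the New()/Stop() sequencing from the patch:
// a fixed pool of workers blocks in Dequeue, the producer enqueues work,
// and Close() lets the workers drain the queue and then exit.
func demoQueueShutdown(workers int) {
	pq := newPriorityQueue()

	var done sync.WaitGroup
	done.Add(workers)
	for j := 0; j < workers; j++ {
		go func() {
			defer done.Done()
			for {
				// Blocks until an op is ready, or returns nil once the
				// queue has been closed and fully drained.
				o := pq.Dequeue()
				if o == nil {
					return
				}
				fmt.Println("flushing", o.Key())
			}
		}()
	}

	for p := int64(0); p < 10; p++ {
		pq.Enqueue(mockOp{key: fmt.Sprintf("series-%d", p), priority: p})
	}

	// Close wakes any blocked workers; they keep dequeueing until the
	// queue is empty and only then observe the closed state and return.
	pq.Close()
	done.Wait()
}
```

Dequeue only reports the closed state once the queue is empty, so no flush that was accepted before Close() is lost during shutdown.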