diff --git a/ingester/ingester.go b/ingester/ingester.go index 09dc7edb1b..ed2fdcd6f8 100644 --- a/ingester/ingester.go +++ b/ingester/ingester.go @@ -506,26 +506,18 @@ func (i *Ingester) flushUser(userID string, immediate bool) { } func (i *Ingester) flushSeries(u *userState, fp model.Fingerprint, series *memorySeries, immediate bool) { - u.fpLocker.Lock(fp) - // Decide what chunks to flush + // Enqueue this series flushing if the oldest chunk is older than the threshold + + u.fpLocker.Lock(fp) firstTime := series.head().FirstTime() - if immediate || model.Now().Sub(firstTime) > i.cfg.MaxChunkAge { - series.headChunkClosed = true - series.head().MaybePopulateLastTime() - } - chunks := len(series.chunkDescs) - if !series.headChunkClosed { - chunks-- - } + flush := immediate || model.Now().Sub(firstTime) > i.cfg.MaxChunkAge u.fpLocker.Unlock(fp) - if chunks == 0 { - return + if flush { + flushQueueIndex := int(uint64(fp) % uint64(i.cfg.ConcurrentFlushes)) + i.flushQueues[flushQueueIndex].Enqueue(&flushOp{firstTime, u.userID, fp, immediate}) } - - flushQueueIndex := int(uint64(fp) % uint64(i.cfg.ConcurrentFlushes)) - i.flushQueues[flushQueueIndex].Enqueue(&flushOp{firstTime, u.userID, fp, immediate}) } func (i *Ingester) flushLoop(j int) { @@ -557,8 +549,14 @@ func (i *Ingester) flushLoop(j int) { } userState.fpLocker.Lock(op.fp) + + // Assume we're going to flush everything chunks := series.chunkDescs - if !series.headChunkClosed { + + // If the head chunk is old enough, close it + if op.immediate || model.Now().Sub(series.head().FirstTime()) > i.cfg.MaxChunkAge { + series.closeHead() + } else { chunks = chunks[:len(chunks)-1] } userState.fpLocker.Unlock(op.fp) diff --git a/ingester/series.go b/ingester/series.go index 05503a22e6..9765b7e74d 100644 --- a/ingester/series.go +++ b/ingester/series.go @@ -147,6 +147,11 @@ func (s *memorySeries) add(v model.SamplePair) (int, error) { return len(chunks) - 1, nil } +func (s *memorySeries) closeHead() { + s.headChunkClosed = true + s.head().MaybePopulateLastTime() +} + // head returns a pointer to the head chunk descriptor. The caller must have // locked the fingerprint of the memorySeries. This method will panic if this // series has no chunk descriptors.