diff --git a/pkg/ingester/flush.go b/pkg/ingester/flush.go index 135cbb438f..3c4060a09b 100644 --- a/pkg/ingester/flush.go +++ b/pkg/ingester/flush.go @@ -67,6 +67,10 @@ var ( Name: "cortex_ingester_dropped_chunks_total", Help: "Total number of chunks dropped from flushing because they have too few samples.", }) + oldestUnflushedChunkTimestamp = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "cortex_oldest_unflushed_chunk_timestamp_seconds", + Help: "Unix timestamp of the oldest unflushed chunk in the memory", + }) ) // Flush triggers a flush of all the chunks and closes the flush queues. @@ -110,14 +114,23 @@ func (i *Ingester) sweepUsers(immediate bool) { return } + oldest := model.Time(0) + for id, state := range i.userStates.cp() { for pair := range state.fpToSeries.iter() { state.fpLocker.Lock(pair.fp) i.sweepSeries(id, pair.fp, pair.series, immediate) i.removeFlushedChunks(state, pair.fp, pair.series) + first := pair.series.firstUnflushedChunkTime() state.fpLocker.Unlock(pair.fp) + + if first > 0 && (oldest == 0 || first < oldest) { + oldest = first + } } } + + oldestUnflushedChunkTimestamp.Set(float64(oldest.Unix())) } type flushReason int diff --git a/pkg/ingester/series.go b/pkg/ingester/series.go index f39c3e2fcb..ddbc470fb0 100644 --- a/pkg/ingester/series.go +++ b/pkg/ingester/series.go @@ -151,6 +151,19 @@ func (s *memorySeries) firstTime() model.Time { return s.chunkDescs[0].FirstTime } +// Returns time of oldest chunk in the series, that isn't flushed. If there are +// no chunks, or all chunks are flushed, returns 0. +// The caller must have locked the fingerprint of the memorySeries. +func (s *memorySeries) firstUnflushedChunkTime() model.Time { + for _, c := range s.chunkDescs { + if !c.flushed { + return c.FirstTime + } + } + + return 0 +} + // head returns a pointer to the head chunk descriptor. The caller must have // locked the fingerprint of the memorySeries. This method will panic if this // series has no chunk descriptors.