Skip to content

Commit 1f16b06

Browse files
pstibranygouthamve
authored andcommitted
Export Unix timestamp of oldest unflushed chunk in the memory. (#1776)
* Export Unix timestamp of oldest unflushed chunk in the memory. Signed-off-by: Peter Štibraný <[email protected]> * Use 'cortex_oldest_unflushed_chunk_timestamp_seconds' as metric name. Signed-off-by: Peter Štibraný <[email protected]> * Removed unnecessary check for chunks slice length. Signed-off-by: Peter Štibraný <[email protected]>
1 parent 7e17b89 commit 1f16b06

File tree

2 files changed

+26
-0
lines changed

2 files changed

+26
-0
lines changed

pkg/ingester/flush.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,10 @@ var (
6767
Name: "cortex_ingester_dropped_chunks_total",
6868
Help: "Total number of chunks dropped from flushing because they have too few samples.",
6969
})
70+
oldestUnflushedChunkTimestamp = promauto.NewGauge(prometheus.GaugeOpts{
71+
Name: "cortex_oldest_unflushed_chunk_timestamp_seconds",
72+
Help: "Unix timestamp of the oldest unflushed chunk in the memory",
73+
})
7074
)
7175

7276
// Flush triggers a flush of all the chunks and closes the flush queues.
@@ -110,14 +114,23 @@ func (i *Ingester) sweepUsers(immediate bool) {
110114
return
111115
}
112116

117+
oldest := model.Time(0)
118+
113119
for id, state := range i.userStates.cp() {
114120
for pair := range state.fpToSeries.iter() {
115121
state.fpLocker.Lock(pair.fp)
116122
i.sweepSeries(id, pair.fp, pair.series, immediate)
117123
i.removeFlushedChunks(state, pair.fp, pair.series)
124+
first := pair.series.firstUnflushedChunkTime()
118125
state.fpLocker.Unlock(pair.fp)
126+
127+
if first > 0 && (oldest == 0 || first < oldest) {
128+
oldest = first
129+
}
119130
}
120131
}
132+
133+
oldestUnflushedChunkTimestamp.Set(float64(oldest.Unix()))
121134
}
122135

123136
type flushReason int

pkg/ingester/series.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,19 @@ func (s *memorySeries) firstTime() model.Time {
151151
return s.chunkDescs[0].FirstTime
152152
}
153153

154+
// Returns time of oldest chunk in the series, that isn't flushed. If there are
155+
// no chunks, or all chunks are flushed, returns 0.
156+
// The caller must have locked the fingerprint of the memorySeries.
157+
func (s *memorySeries) firstUnflushedChunkTime() model.Time {
158+
for _, c := range s.chunkDescs {
159+
if !c.flushed {
160+
return c.FirstTime
161+
}
162+
}
163+
164+
return 0
165+
}
166+
154167
// head returns a pointer to the head chunk descriptor. The caller must have
155168
// locked the fingerprint of the memorySeries. This method will panic if this
156169
// series has no chunk descriptors.

0 commit comments

Comments
 (0)