Skip to content

Commit f2d1405

Browse files
authored
Ability to flush chunks with stale markers early. (#1759)
* Ability to flush chunks with stale markers early. Signed-off-by: Goutham Veeramachaneni <[email protected]> * Address feedback! Signed-off-by: Goutham Veeramachaneni <[email protected]>
1 parent 00566f6 commit f2d1405

File tree

5 files changed

+31
-5
lines changed

5 files changed

+31
-5
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
* [FEATURE] Global limit on the max series per user and metric #1760
99
* `-ingester.max-global-series-per-user`
1010
* `-ingester.max-global-series-per-metric`
11+
* [FEATURE] Flush chunks with stale markers early with `ingester.max-stale-chunk-idle`. #1759
1112
* [ENHANCEMENT] Allocation improvements in adding samples to Chunk. #1706
1213
* [ENHANCEMENT] Consul client now follows recommended practices for blocking queries wrt returned Index value. #1708
1314
* [ENHANCEMENT] Consul client can optionally rate-limit itself during Watch (used e.g. by ring watchers) and WatchPrefix (used by HA feature) operations. Rate limiting is disabled by default. New flags added: `--consul.watch-rate-limit`, and `--consul.watch-burst-size`. #1708

docs/arguments.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,14 @@ It also talks to a KVStore and has it's own copies of the same flags used by the
175175

176176
The maximum duration of a timeseries chunk in memory. If a timeseries runs for longer than this the current chunk will be flushed to the store and a new chunk created. (default 12h)
177177

178+
- `-ingester.max-chunk-idle`
179+
180+
If a series doesn't receive a sample for this duration, it is flushed and removed from memory.
181+
182+
- `-ingester.max-stale-chunk-idle`
183+
184+
If a series receives a [staleness marker](https://www.robustperception.io/staleness-and-promql), then we wait for this duration to get another sample before we close and flush this series, removing it from memory. You want it to be at least 2x the scrape interval as you don't want a single failed scrape to cause a chunk flush.
185+
178186
- `-ingester.chunk-age-jitter`
179187

180188
To reduce load on the database exactly 12 hours after starting, the age limit is reduced by a varying amount up to this. (default 20m)

pkg/ingester/flush.go

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ const (
128128
reasonMultipleChunksInSeries
129129
reasonAged
130130
reasonIdle
131+
reasonStale
131132
)
132133

133134
func (f flushReason) String() string {
@@ -142,6 +143,8 @@ func (f flushReason) String() string {
142143
return "Aged"
143144
case reasonIdle:
144145
return "Idle"
146+
case reasonStale:
147+
return "Stale"
145148
default:
146149
panic("unrecognised flushReason")
147150
}
@@ -179,13 +182,13 @@ func (i *Ingester) shouldFlushSeries(series *memorySeries, fp model.Fingerprint,
179182
return reasonMultipleChunksInSeries
180183
} else if len(series.chunkDescs) > 0 {
181184
// Otherwise look in more detail at the first chunk
182-
return i.shouldFlushChunk(series.chunkDescs[0], fp)
185+
return i.shouldFlushChunk(series.chunkDescs[0], fp, series.isStale())
183186
}
184187

185188
return noFlush
186189
}
187190

188-
func (i *Ingester) shouldFlushChunk(c *desc, fp model.Fingerprint) flushReason {
191+
func (i *Ingester) shouldFlushChunk(c *desc, fp model.Fingerprint, lastValueIsStale bool) flushReason {
189192
if c.flushed { // don't flush chunks we've already flushed
190193
return noFlush
191194
}
@@ -200,11 +203,18 @@ func (i *Ingester) shouldFlushChunk(c *desc, fp model.Fingerprint) flushReason {
200203
return reasonAged
201204
}
202205

203-
// Chunk should be flushed if their last update is older then MaxChunkIdle
206+
// Chunk should be flushed if their last update is older then MaxChunkIdle.
204207
if model.Now().Sub(c.LastUpdate) > i.cfg.MaxChunkIdle {
205208
return reasonIdle
206209
}
207210

211+
// A chunk that has a stale marker can be flushed if possible.
212+
if i.cfg.MaxStaleChunkIdle > 0 &&
213+
lastValueIsStale &&
214+
model.Now().Sub(c.LastUpdate) > i.cfg.MaxStaleChunkIdle {
215+
return reasonStale
216+
}
217+
208218
return noFlush
209219
}
210220

@@ -259,13 +269,13 @@ func (i *Ingester) flushUserSeries(flushQueueIndex int, userID string, fp model.
259269

260270
// Assume we're going to flush everything, and maybe don't flush the head chunk if it doesn't need it.
261271
chunks := series.chunkDescs
262-
if immediate || (len(chunks) > 0 && i.shouldFlushChunk(series.head(), fp) != noFlush) {
272+
if immediate || (len(chunks) > 0 && i.shouldFlushChunk(series.head(), fp, series.isStale()) != noFlush) {
263273
series.closeHead()
264274
} else {
265275
chunks = chunks[:len(chunks)-1]
266276
}
267277

268-
if reason == reasonIdle && series.headChunkClosed {
278+
if (reason == reasonIdle || reason == reasonStale) && series.headChunkClosed {
269279
if minChunkLength := i.limits.MinChunkLength(userID); minChunkLength > 0 {
270280
chunkLength := 0
271281
for _, c := range chunks {

pkg/ingester/ingester.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ type Config struct {
109109
FlushCheckPeriod time.Duration
110110
RetainPeriod time.Duration
111111
MaxChunkIdle time.Duration
112+
MaxStaleChunkIdle time.Duration
112113
FlushOpTimeout time.Duration
113114
MaxChunkAge time.Duration
114115
ChunkAgeJitter time.Duration
@@ -138,6 +139,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
138139
f.DurationVar(&cfg.RetainPeriod, "ingester.retain-period", 5*time.Minute, "Period chunks will remain in memory after flushing.")
139140
f.DurationVar(&cfg.FlushOpTimeout, "ingester.flush-op-timeout", 1*time.Minute, "Timeout for individual flush operations.")
140141
f.DurationVar(&cfg.MaxChunkIdle, "ingester.max-chunk-idle", 5*time.Minute, "Maximum chunk idle time before flushing.")
142+
f.DurationVar(&cfg.MaxChunkIdle, "ingester.max-stale-chunk-idle", 0, "Maximum chunk idle time for chunks terminating in stale markers before flushing. 0 disables it and a stale series is not flushed until the max-chunk-idle timeout is reached.")
141143
f.DurationVar(&cfg.MaxChunkAge, "ingester.max-chunk-age", 12*time.Hour, "Maximum chunk age before flushing.")
142144
f.DurationVar(&cfg.ChunkAgeJitter, "ingester.chunk-age-jitter", 20*time.Minute, "Range of time to subtract from MaxChunkAge to spread out flushes")
143145
f.BoolVar(&cfg.SpreadFlushes, "ingester.spread-flushes", false, "If true, spread series flushes across the whole period of MaxChunkAge")

pkg/ingester/series.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/prometheus/client_golang/prometheus"
88
"github.com/prometheus/common/model"
99
"github.com/prometheus/prometheus/pkg/labels"
10+
"github.com/prometheus/prometheus/pkg/value"
1011

1112
"github.com/cortexproject/cortex/pkg/chunk/encoding"
1213
"github.com/cortexproject/cortex/pkg/prom1/storage/metric"
@@ -210,6 +211,10 @@ func (s *memorySeries) setChunks(descs []*desc) error {
210211
return nil
211212
}
212213

214+
func (s *memorySeries) isStale() bool {
215+
return s.lastSampleValueSet && value.IsStaleNaN(float64(s.lastSampleValue))
216+
}
217+
213218
type desc struct {
214219
C encoding.Chunk // nil if chunk is evicted.
215220
FirstTime model.Time // Timestamp of first sample. Populated at creation. Immutable.

0 commit comments

Comments
 (0)