From bce1d7c1671675947909cbbcc455b869a0275df2 Mon Sep 17 00:00:00 2001 From: Goutham Veeramachaneni Date: Fri, 25 Oct 2019 11:24:54 +0530 Subject: [PATCH 1/2] Ability to flush chunks with stale markers early. Signed-off-by: Goutham Veeramachaneni --- CHANGELOG.md | 1 + docs/arguments.md | 8 ++++++++ pkg/ingester/flush.go | 21 ++++++++++++++++----- pkg/ingester/ingester.go | 2 ++ 4 files changed, 27 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 89ce9ff0b8f..623edccab4c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ * [FEATURE] Global limit on the max series per user and metric #1760 * `-ingester.max-global-series-per-user` * `-ingester.max-global-series-per-metric` +* [FEATURE] Flush chunks with stale markers early with `ingester.max-stale-chunk-idle`. #1759 * [ENHANCEMENT] Allocation improvements in adding samples to Chunk. #1706 * [ENHANCEMENT] Consul client now follows recommended practices for blocking queries wrt returned Index value. #1708 * [ENHANCEMENT] Consul client can optionally rate-limit itself during Watch (used e.g. by ring watchers) and WatchPrefix (used by HA feature) operations. Rate limiting is disabled by default. New flags added: `--consul.watch-rate-limit`, and `--consul.watch-burst-size`. #1708 diff --git a/docs/arguments.md b/docs/arguments.md index 9c876f6fd91..bf82d644108 100644 --- a/docs/arguments.md +++ b/docs/arguments.md @@ -175,6 +175,14 @@ It also talks to a KVStore and has it's own copies of the same flags used by the The maximum duration of a timeseries chunk in memory. If a timeseries runs for longer than this the current chunk will be flushed to the store and a new chunk created. (default 12h) +- `-ingester.max-chunk-idle` + + If a series doesn't receive a sample for this duration, it is flushed and removed from memory. 
+ +- `-ingester.max-stale-chunk-idle` + + If a series receives a [staleness marker](https://www.robustperception.io/staleness-and-promql), then we wait for this duration to get another sample before we close and flush this series, removing it from memory. You want it to be atleast 2x the scrape interval as you don't want a single failed scrape to cause a chunk flush. + - `-ingester.chunk-age-jitter` To reduce load on the database exactly 12 hours after starting, the age limit is reduced by a varying amount up to this. (default 20m) diff --git a/pkg/ingester/flush.go b/pkg/ingester/flush.go index 8e0b272f999..6974ed96151 100644 --- a/pkg/ingester/flush.go +++ b/pkg/ingester/flush.go @@ -13,6 +13,7 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/value" "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/util" @@ -128,6 +129,7 @@ const ( reasonMultipleChunksInSeries reasonAged reasonIdle + reasonStaleIdle ) func (f flushReason) String() string { @@ -142,6 +144,8 @@ func (f flushReason) String() string { return "Aged" case reasonIdle: return "Idle" + case reasonStaleIdle: + return "StaleIdle" default: panic("unrecognised flushReason") } @@ -179,13 +183,13 @@ func (i *Ingester) shouldFlushSeries(series *memorySeries, fp model.Fingerprint, return reasonMultipleChunksInSeries } else if len(series.chunkDescs) > 0 { // Otherwise look in more detail at the first chunk - return i.shouldFlushChunk(series.chunkDescs[0], fp) + return i.shouldFlushChunk(series.chunkDescs[0], fp, value.IsStaleNaN(float64(series.lastSampleValue))) } return noFlush } -func (i *Ingester) shouldFlushChunk(c *desc, fp model.Fingerprint) flushReason { +func (i *Ingester) shouldFlushChunk(c *desc, fp model.Fingerprint, 
lastValueIsStale bool) flushReason { if c.flushed { // don't flush chunks we've already flushed return noFlush } @@ -200,11 +204,18 @@ func (i *Ingester) shouldFlushChunk(c *desc, fp model.Fingerprint) flushReason { return reasonAged } - // Chunk should be flushed if their last update is older then MaxChunkIdle + // Chunk should be flushed if their last update is older then MaxChunkIdle. if model.Now().Sub(c.LastUpdate) > i.cfg.MaxChunkIdle { return reasonIdle } + // A chunk that has a stale marker can be flushed if possible. + if i.cfg.MaxStaleChunkIdle > 0 && + lastValueIsStale && + model.Now().Sub(c.LastUpdate) > i.cfg.MaxStaleChunkIdle { + return reasonStaleIdle + } + return noFlush } @@ -259,13 +270,13 @@ func (i *Ingester) flushUserSeries(flushQueueIndex int, userID string, fp model. // Assume we're going to flush everything, and maybe don't flush the head chunk if it doesn't need it. chunks := series.chunkDescs - if immediate || (len(chunks) > 0 && i.shouldFlushChunk(series.head(), fp) != noFlush) { + if immediate || (len(chunks) > 0 && i.shouldFlushChunk(series.head(), fp, value.IsStaleNaN(float64(series.lastSampleValue))) != noFlush) { series.closeHead() } else { chunks = chunks[:len(chunks)-1] } - if reason == reasonIdle && series.headChunkClosed { + if (reason == reasonIdle || reason == reasonStaleIdle) && series.headChunkClosed { if minChunkLength := i.limits.MinChunkLength(userID); minChunkLength > 0 { chunkLength := 0 for _, c := range chunks { diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index d04b6819f81..45b8d0108d7 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -109,6 +109,7 @@ type Config struct { FlushCheckPeriod time.Duration RetainPeriod time.Duration MaxChunkIdle time.Duration + MaxStaleChunkIdle time.Duration FlushOpTimeout time.Duration MaxChunkAge time.Duration ChunkAgeJitter time.Duration @@ -138,6 +139,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.RetainPeriod, 
"ingester.retain-period", 5*time.Minute, "Period chunks will remain in memory after flushing.") f.DurationVar(&cfg.FlushOpTimeout, "ingester.flush-op-timeout", 1*time.Minute, "Timeout for individual flush operations.") f.DurationVar(&cfg.MaxChunkIdle, "ingester.max-chunk-idle", 5*time.Minute, "Maximum chunk idle time before flushing.") + f.DurationVar(&cfg.MaxChunkIdle, "ingester.max-stale-chunk-idle", 0, "Maximum chunk idle time for chunks terminating in stale markers before flushing.") f.DurationVar(&cfg.MaxChunkAge, "ingester.max-chunk-age", 12*time.Hour, "Maximum chunk age before flushing.") f.DurationVar(&cfg.ChunkAgeJitter, "ingester.chunk-age-jitter", 20*time.Minute, "Range of time to subtract from MaxChunkAge to spread out flushes") f.BoolVar(&cfg.SpreadFlushes, "ingester.spread-flushes", false, "If true, spread series flushes across the whole period of MaxChunkAge") From 256885565b54e4e7480e441367331746b1461ed4 Mon Sep 17 00:00:00 2001 From: Goutham Veeramachaneni Date: Fri, 1 Nov 2019 00:08:39 +0530 Subject: [PATCH 2/2] Address feedback! Signed-off-by: Goutham Veeramachaneni --- docs/arguments.md | 2 +- pkg/ingester/flush.go | 15 +++++++-------- pkg/ingester/ingester.go | 2 +- pkg/ingester/series.go | 5 +++++ 4 files changed, 14 insertions(+), 10 deletions(-) diff --git a/docs/arguments.md b/docs/arguments.md index bf82d644108..019398e9e1c 100644 --- a/docs/arguments.md +++ b/docs/arguments.md @@ -181,7 +181,7 @@ It also talks to a KVStore and has it's own copies of the same flags used by the - `-ingester.max-stale-chunk-idle` - If a series receives a [staleness marker](https://www.robustperception.io/staleness-and-promql), then we wait for this duration to get another sample before we close and flush this series, removing it from memory. You want it to be atleast 2x the scrape interval as you don't want a single failed scrape to cause a chunk flush. 
+ If a series receives a [staleness marker](https://www.robustperception.io/staleness-and-promql), then we wait for this duration to get another sample before we close and flush this series, removing it from memory. You want it to be at least 2x the scrape interval as you don't want a single failed scrape to cause a chunk flush. - `-ingester.chunk-age-jitter` diff --git a/pkg/ingester/flush.go b/pkg/ingester/flush.go index 6974ed96151..135cbb438f2 100644 --- a/pkg/ingester/flush.go +++ b/pkg/ingester/flush.go @@ -13,7 +13,6 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" - "github.com/prometheus/prometheus/pkg/value" "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/util" @@ -129,7 +128,7 @@ const ( reasonMultipleChunksInSeries reasonAged reasonIdle - reasonStaleIdle + reasonStale ) func (f flushReason) String() string { @@ -144,8 +143,8 @@ func (f flushReason) String() string { return "Aged" case reasonIdle: return "Idle" - case reasonStaleIdle: - return "StaleIdle" + case reasonStale: + return "Stale" default: panic("unrecognised flushReason") } @@ -183,7 +182,7 @@ func (i *Ingester) shouldFlushSeries(series *memorySeries, fp model.Fingerprint, return reasonMultipleChunksInSeries } else if len(series.chunkDescs) > 0 { // Otherwise look in more detail at the first chunk - return i.shouldFlushChunk(series.chunkDescs[0], fp, value.IsStaleNaN(float64(series.lastSampleValue))) + return i.shouldFlushChunk(series.chunkDescs[0], fp, series.isStale()) } return noFlush @@ -213,7 +212,7 @@ func (i *Ingester) shouldFlushChunk(c *desc, fp model.Fingerprint, lastValueIsSt if i.cfg.MaxStaleChunkIdle > 0 && lastValueIsStale && model.Now().Sub(c.LastUpdate) > i.cfg.MaxStaleChunkIdle { - return reasonStaleIdle + return reasonStale } 
return noFlush @@ -270,13 +269,13 @@ func (i *Ingester) flushUserSeries(flushQueueIndex int, userID string, fp model. // Assume we're going to flush everything, and maybe don't flush the head chunk if it doesn't need it. chunks := series.chunkDescs - if immediate || (len(chunks) > 0 && i.shouldFlushChunk(series.head(), fp, value.IsStaleNaN(float64(series.lastSampleValue))) != noFlush) { + if immediate || (len(chunks) > 0 && i.shouldFlushChunk(series.head(), fp, series.isStale()) != noFlush) { series.closeHead() } else { chunks = chunks[:len(chunks)-1] } - if (reason == reasonIdle || reason == reasonStaleIdle) && series.headChunkClosed { + if (reason == reasonIdle || reason == reasonStale) && series.headChunkClosed { if minChunkLength := i.limits.MinChunkLength(userID); minChunkLength > 0 { chunkLength := 0 for _, c := range chunks { diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 45b8d0108d7..d22890013f1 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -139,7 +139,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.RetainPeriod, "ingester.retain-period", 5*time.Minute, "Period chunks will remain in memory after flushing.") f.DurationVar(&cfg.FlushOpTimeout, "ingester.flush-op-timeout", 1*time.Minute, "Timeout for individual flush operations.") f.DurationVar(&cfg.MaxChunkIdle, "ingester.max-chunk-idle", 5*time.Minute, "Maximum chunk idle time before flushing.") - f.DurationVar(&cfg.MaxChunkIdle, "ingester.max-stale-chunk-idle", 0, "Maximum chunk idle time for chunks terminating in stale markers before flushing.") + f.DurationVar(&cfg.MaxStaleChunkIdle, "ingester.max-stale-chunk-idle", 0, "Maximum chunk idle time for chunks terminating in stale markers before flushing. 
0 disables it and a stale series is not flushed until the max-chunk-idle timeout is reached.") f.DurationVar(&cfg.MaxChunkAge, "ingester.max-chunk-age", 12*time.Hour, "Maximum chunk age before flushing.") f.DurationVar(&cfg.ChunkAgeJitter, "ingester.chunk-age-jitter", 20*time.Minute, "Range of time to subtract from MaxChunkAge to spread out flushes") f.BoolVar(&cfg.SpreadFlushes, "ingester.spread-flushes", false, "If true, spread series flushes across the whole period of MaxChunkAge") diff --git a/pkg/ingester/series.go b/pkg/ingester/series.go index 8aafdada733..f39c3e2fcb3 100644 --- a/pkg/ingester/series.go +++ b/pkg/ingester/series.go @@ -7,6 +7,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/value" "github.com/cortexproject/cortex/pkg/chunk/encoding" "github.com/cortexproject/cortex/pkg/prom1/storage/metric" @@ -210,6 +211,10 @@ func (s *memorySeries) setChunks(descs []*desc) error { return nil } +func (s *memorySeries) isStale() bool { + return s.lastSampleValueSet && value.IsStaleNaN(float64(s.lastSampleValue)) +} + type desc struct { C encoding.Chunk // nil if chunk is evicted. FirstTime model.Time // Timestamp of first sample. Populated at creation. Immutable.