Skip to content

Ability to flush chunks with stale markers early. #1759

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Oct 31, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
* [FEATURE] Global limit on the max series per user and metric #1760
* `-ingester.max-global-series-per-user`
* `-ingester.max-global-series-per-metric`
* [FEATURE] Flush chunks with stale markers early with `ingester.max-stale-chunk-idle`. #1759
* [ENHANCEMENT] Allocation improvements in adding samples to Chunk. #1706
* [ENHANCEMENT] Consul client now follows recommended practices for blocking queries wrt returned Index value. #1708
* [ENHANCEMENT] Consul client can optionally rate-limit itself during Watch (used e.g. by ring watchers) and WatchPrefix (used by HA feature) operations. Rate limiting is disabled by default. New flags added: `--consul.watch-rate-limit`, and `--consul.watch-burst-size`. #1708
Expand Down
8 changes: 8 additions & 0 deletions docs/arguments.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,14 @@ It also talks to a KVStore and has it's own copies of the same flags used by the

The maximum duration of a timeseries chunk in memory. If a timeseries runs for longer than this the current chunk will be flushed to the store and a new chunk created. (default 12h)

- `-ingester.max-chunk-idle`

If a series doesn't receive a sample for this duration, it is flushed and removed from memory.

- `-ingester.max-stale-chunk-idle`

If a series receives a [staleness marker](https://www.robustperception.io/staleness-and-promql), then we wait for this duration to get another sample before we close and flush this series, removing it from memory. You want it to be at least 2x the scrape interval as you don't want a single failed scrape to cause a chunk flush.

- `-ingester.chunk-age-jitter`

To reduce load on the database exactly 12 hours after starting, the age limit is reduced by a varying amount up to this. (default 20m)
Expand Down
20 changes: 15 additions & 5 deletions pkg/ingester/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ const (
reasonMultipleChunksInSeries
reasonAged
reasonIdle
reasonStale
)

func (f flushReason) String() string {
Expand All @@ -142,6 +143,8 @@ func (f flushReason) String() string {
return "Aged"
case reasonIdle:
return "Idle"
case reasonStale:
return "Stale"
default:
panic("unrecognised flushReason")
}
Expand Down Expand Up @@ -179,13 +182,13 @@ func (i *Ingester) shouldFlushSeries(series *memorySeries, fp model.Fingerprint,
return reasonMultipleChunksInSeries
} else if len(series.chunkDescs) > 0 {
// Otherwise look in more detail at the first chunk
return i.shouldFlushChunk(series.chunkDescs[0], fp)
return i.shouldFlushChunk(series.chunkDescs[0], fp, series.isStale())
}

return noFlush
}

func (i *Ingester) shouldFlushChunk(c *desc, fp model.Fingerprint) flushReason {
func (i *Ingester) shouldFlushChunk(c *desc, fp model.Fingerprint, lastValueIsStale bool) flushReason {
if c.flushed { // don't flush chunks we've already flushed
return noFlush
}
Expand All @@ -200,11 +203,18 @@ func (i *Ingester) shouldFlushChunk(c *desc, fp model.Fingerprint) flushReason {
return reasonAged
}

// Chunk should be flushed if their last update is older then MaxChunkIdle
// Chunk should be flushed if their last update is older then MaxChunkIdle.
if model.Now().Sub(c.LastUpdate) > i.cfg.MaxChunkIdle {
return reasonIdle
}

// A chunk that has a stale marker can be flushed if possible.
if i.cfg.MaxStaleChunkIdle > 0 &&
lastValueIsStale &&
model.Now().Sub(c.LastUpdate) > i.cfg.MaxStaleChunkIdle {
return reasonStale
}

return noFlush
}

Expand Down Expand Up @@ -259,13 +269,13 @@ func (i *Ingester) flushUserSeries(flushQueueIndex int, userID string, fp model.

// Assume we're going to flush everything, and maybe don't flush the head chunk if it doesn't need it.
chunks := series.chunkDescs
if immediate || (len(chunks) > 0 && i.shouldFlushChunk(series.head(), fp) != noFlush) {
if immediate || (len(chunks) > 0 && i.shouldFlushChunk(series.head(), fp, series.isStale()) != noFlush) {
series.closeHead()
} else {
chunks = chunks[:len(chunks)-1]
}

if reason == reasonIdle && series.headChunkClosed {
if (reason == reasonIdle || reason == reasonStale) && series.headChunkClosed {
if minChunkLength := i.limits.MinChunkLength(userID); minChunkLength > 0 {
chunkLength := 0
for _, c := range chunks {
Expand Down
2 changes: 2 additions & 0 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ type Config struct {
FlushCheckPeriod time.Duration
RetainPeriod time.Duration
MaxChunkIdle time.Duration
MaxStaleChunkIdle time.Duration
FlushOpTimeout time.Duration
MaxChunkAge time.Duration
ChunkAgeJitter time.Duration
Expand Down Expand Up @@ -138,6 +139,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.RetainPeriod, "ingester.retain-period", 5*time.Minute, "Period chunks will remain in memory after flushing.")
f.DurationVar(&cfg.FlushOpTimeout, "ingester.flush-op-timeout", 1*time.Minute, "Timeout for individual flush operations.")
f.DurationVar(&cfg.MaxChunkIdle, "ingester.max-chunk-idle", 5*time.Minute, "Maximum chunk idle time before flushing.")
f.DurationVar(&cfg.MaxChunkIdle, "ingester.max-stale-chunk-idle", 0, "Maximum chunk idle time for chunks terminating in stale markers before flushing. 0 disables it and a stale series is not flushed until the max-chunk-idle timeout is reached.")
Copy link
Contributor

@bboreham bboreham Nov 5, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oops - typo MaxChunkIdle #1788

f.DurationVar(&cfg.MaxChunkAge, "ingester.max-chunk-age", 12*time.Hour, "Maximum chunk age before flushing.")
f.DurationVar(&cfg.ChunkAgeJitter, "ingester.chunk-age-jitter", 20*time.Minute, "Range of time to subtract from MaxChunkAge to spread out flushes")
f.BoolVar(&cfg.SpreadFlushes, "ingester.spread-flushes", false, "If true, spread series flushes across the whole period of MaxChunkAge")
Expand Down
5 changes: 5 additions & 0 deletions pkg/ingester/series.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/value"

"github.com/cortexproject/cortex/pkg/chunk/encoding"
"github.com/cortexproject/cortex/pkg/prom1/storage/metric"
Expand Down Expand Up @@ -210,6 +211,10 @@ func (s *memorySeries) setChunks(descs []*desc) error {
return nil
}

func (s *memorySeries) isStale() bool {
return s.lastSampleValueSet && value.IsStaleNaN(float64(s.lastSampleValue))
}

type desc struct {
C encoding.Chunk // nil if chunk is evicted.
FirstTime model.Time // Timestamp of first sample. Populated at creation. Immutable.
Expand Down