diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 0ef18b988a..b8656efbaf 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -75,6 +75,7 @@ type Config struct { // Config for chunk flushing FlushCheckPeriod time.Duration MaxChunkIdle time.Duration + FlushOpTimeout time.Duration ConcurrentFlushes int ChunkEncoding string @@ -104,6 +105,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.FlushCheckPeriod, "ingester.flush-period", 1*time.Minute, "Period with which to attempt to flush chunks.") f.DurationVar(&cfg.MaxChunkIdle, "ingester.max-chunk-idle", promql.StalenessDelta, "Maximum chunk idle time before flushing.") + f.DurationVar(&cfg.FlushOpTimeout, "ingester.flush-op-timeout", 1*time.Minute, "Timeout for individual flush operations.") f.IntVar(&cfg.ConcurrentFlushes, "ingester.concurrent-flushes", DefaultConcurrentFlush, "Number of concurrent goroutines flushing to dynamodb.") f.StringVar(&cfg.ChunkEncoding, "ingester.chunk-encoding", "1", "Encoding version to use for chunks.") diff --git a/pkg/ingester/ingester_flush.go b/pkg/ingester/ingester_flush.go index e4acaa407f..a135a1fb9d 100644 --- a/pkg/ingester/ingester_flush.go +++ b/pkg/ingester/ingester_flush.go @@ -167,6 +167,9 @@ func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediat // flush the chunks without locking the series, as we don't want to hold the series lock for the duration of the dynamo/s3 rpcs. ctx := user.InjectOrgID(context.Background(), userID) + ctx, cancel := context.WithTimeout(ctx, i.cfg.FlushOpTimeout) + defer cancel() // releases resources if slowOperation completes before timeout elapses + err := i.flushChunks(ctx, fp, series.metric, chunks) if err != nil { return err