diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2639a810d5..3da2e917b3 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,7 @@
 * [ENHANCEMENT] Querier: limit series query to only ingesters if `start` param is not specified. #4976
 * [FEATURE] Querier/Query Frontend: support Prometheus /api/v1/status/buildinfo API. #4978
 * [FEATURE] Ingester: Add active series to all_user_stats page. #4972
+* [FEATURE] Ingester: Added `-blocks-storage.tsdb.head-chunks-write-queue-size` flag to configure the size of the in-memory queue used before flushing chunks to the disk. #5000
 * [FEATURE] Query Frontend: Log query params in query frontend even if error happens. #5005

 ## 1.14.0 in progress
diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md
index f199fe8815..f6f9bd2d46 100644
--- a/docs/blocks-storage/querier.md
+++ b/docs/blocks-storage/querier.md
@@ -870,6 +870,10 @@ blocks_storage:
     # CLI flag: -blocks-storage.tsdb.close-idle-tsdb-timeout
     [close_idle_tsdb_timeout: <duration> | default = 0s]

+    # The size of the in-memory queue used before flushing chunks to the disk.
+    # CLI flag: -blocks-storage.tsdb.head-chunks-write-queue-size
+    [head_chunks_write_queue_size: <int> | default = 0]
+
     # limit the number of concurrently opening TSDB's on startup
     # CLI flag: -blocks-storage.tsdb.max-tsdb-opening-concurrency-on-startup
     [max_tsdb_opening_concurrency_on_startup: <int> | default = 10]
diff --git a/docs/blocks-storage/store-gateway.md b/docs/blocks-storage/store-gateway.md
index b7d8c24f41..2c6a66d26e 100644
--- a/docs/blocks-storage/store-gateway.md
+++ b/docs/blocks-storage/store-gateway.md
@@ -934,6 +934,10 @@ blocks_storage:
     # CLI flag: -blocks-storage.tsdb.close-idle-tsdb-timeout
     [close_idle_tsdb_timeout: <duration> | default = 0s]

+    # The size of the in-memory queue used before flushing chunks to the disk.
+    # CLI flag: -blocks-storage.tsdb.head-chunks-write-queue-size
+    [head_chunks_write_queue_size: <int> | default = 0]
+
     # limit the number of concurrently opening TSDB's on startup
     # CLI flag: -blocks-storage.tsdb.max-tsdb-opening-concurrency-on-startup
     [max_tsdb_opening_concurrency_on_startup: <int> | default = 10]
diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md
index e717e6ec7a..c252b1abb9 100644
--- a/docs/configuration/config-file-reference.md
+++ b/docs/configuration/config-file-reference.md
@@ -3706,6 +3706,10 @@ tsdb:
   # CLI flag: -blocks-storage.tsdb.close-idle-tsdb-timeout
   [close_idle_tsdb_timeout: <duration> | default = 0s]

+  # The size of the in-memory queue used before flushing chunks to the disk.
+  # CLI flag: -blocks-storage.tsdb.head-chunks-write-queue-size
+  [head_chunks_write_queue_size: <int> | default = 0]
+
   # limit the number of concurrently opening TSDB's on startup
   # CLI flag: -blocks-storage.tsdb.max-tsdb-opening-concurrency-on-startup
   [max_tsdb_opening_concurrency_on_startup: <int> | default = 10]
diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go
index 8273c5117c..8593c89952 100644
--- a/pkg/ingester/ingester.go
+++ b/pkg/ingester/ingester.go
@@ -1846,6 +1846,7 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
         EnableExemplarStorage: enableExemplars,
         IsolationDisabled:     true,
         MaxExemplars:          int64(i.cfg.BlocksStorageConfig.TSDB.MaxExemplars),
+        HeadChunksWriteQueueSize: i.cfg.BlocksStorageConfig.TSDB.HeadChunksWriteQueueSize,
     }, nil)
     if err != nil {
         return nil, errors.Wrapf(err, "failed to open TSDB: %s", udir)
diff --git a/pkg/ingester/metrics.go b/pkg/ingester/metrics.go
index 585cc45df9..42cf86b97f 100644
--- a/pkg/ingester/metrics.go
+++ b/pkg/ingester/metrics.go
@@ -336,25 +336,26 @@ type tsdbMetrics struct {
     uploadFailures *prometheus.Desc // sum(thanos_shipper_upload_failures_total)

     // Metrics aggregated from TSDB.
-    tsdbCompactionsTotal         *prometheus.Desc
-    tsdbCompactionDuration       *prometheus.Desc
-    tsdbFsyncDuration            *prometheus.Desc
-    tsdbPageFlushes              *prometheus.Desc
-    tsdbPageCompletions          *prometheus.Desc
-    tsdbWALTruncateFail          *prometheus.Desc
-    tsdbWALTruncateTotal         *prometheus.Desc
-    tsdbWALTruncateDuration      *prometheus.Desc
-    tsdbWALCorruptionsTotal      *prometheus.Desc
-    tsdbWALWritesFailed          *prometheus.Desc
-    tsdbHeadTruncateFail         *prometheus.Desc
-    tsdbHeadTruncateTotal        *prometheus.Desc
-    tsdbHeadGcDuration           *prometheus.Desc
-    tsdbActiveAppenders          *prometheus.Desc
-    tsdbSeriesNotFound           *prometheus.Desc
-    tsdbChunks                   *prometheus.Desc
-    tsdbChunksCreatedTotal       *prometheus.Desc
-    tsdbChunksRemovedTotal       *prometheus.Desc
-    tsdbMmapChunkCorruptionTotal *prometheus.Desc
+    tsdbCompactionsTotal               *prometheus.Desc
+    tsdbCompactionDuration             *prometheus.Desc
+    tsdbFsyncDuration                  *prometheus.Desc
+    tsdbPageFlushes                    *prometheus.Desc
+    tsdbPageCompletions                *prometheus.Desc
+    tsdbWALTruncateFail                *prometheus.Desc
+    tsdbWALTruncateTotal               *prometheus.Desc
+    tsdbWALTruncateDuration            *prometheus.Desc
+    tsdbWALCorruptionsTotal            *prometheus.Desc
+    tsdbWALWritesFailed                *prometheus.Desc
+    tsdbHeadTruncateFail               *prometheus.Desc
+    tsdbHeadTruncateTotal              *prometheus.Desc
+    tsdbHeadGcDuration                 *prometheus.Desc
+    tsdbActiveAppenders                *prometheus.Desc
+    tsdbSeriesNotFound                 *prometheus.Desc
+    tsdbChunks                         *prometheus.Desc
+    tsdbChunksCreatedTotal             *prometheus.Desc
+    tsdbChunksRemovedTotal             *prometheus.Desc
+    tsdbMmapChunkCorruptionTotal       *prometheus.Desc
+    tsdbChunkWriteQueueOperationsTotal *prometheus.Desc

     tsdbExemplarsTotal     *prometheus.Desc
     tsdbExemplarsInStorage *prometheus.Desc
@@ -478,6 +479,10 @@ func newTSDBMetrics(r prometheus.Registerer) *tsdbMetrics {
             "cortex_ingester_tsdb_mmap_chunk_corruptions_total",
             "Total number of memory-mapped TSDB chunk corruptions.",
             nil, nil),
+        tsdbChunkWriteQueueOperationsTotal: prometheus.NewDesc(
+            "cortex_ingester_tsdb_chunk_write_queue_operations_total",
+            "Total number of TSDB chunk write queue operations.",
+            []string{"user", "operation"}, nil),
         tsdbLoadedBlocks: prometheus.NewDesc(
             "cortex_ingester_tsdb_blocks_loaded",
             "Number of currently loaded data blocks",
@@ -579,6 +584,7 @@ func (sm *tsdbMetrics) Describe(out chan<- *prometheus.Desc) {
     out <- sm.tsdbChunksCreatedTotal
     out <- sm.tsdbChunksRemovedTotal
     out <- sm.tsdbMmapChunkCorruptionTotal
+    out <- sm.tsdbChunkWriteQueueOperationsTotal
     out <- sm.tsdbLoadedBlocks
     out <- sm.tsdbSymbolTableSize
     out <- sm.tsdbReloads
@@ -628,6 +634,7 @@ func (sm *tsdbMetrics) Collect(out chan<- prometheus.Metric) {
     data.SendSumOfCountersPerUser(out, sm.tsdbChunksCreatedTotal, "prometheus_tsdb_head_chunks_created_total")
     data.SendSumOfCountersPerUser(out, sm.tsdbChunksRemovedTotal, "prometheus_tsdb_head_chunks_removed_total")
     data.SendSumOfCounters(out, sm.tsdbMmapChunkCorruptionTotal, "prometheus_tsdb_mmap_chunk_corruptions_total")
+    data.SendSumOfCountersPerUserWithLabels(out, sm.tsdbChunkWriteQueueOperationsTotal, "prometheus_tsdb_chunk_write_queue_operations_total", "operation")
     data.SendSumOfGauges(out, sm.tsdbLoadedBlocks, "prometheus_tsdb_blocks_loaded")
     data.SendSumOfGaugesPerUser(out, sm.tsdbSymbolTableSize, "prometheus_tsdb_symbol_table_size_bytes")
     data.SendSumOfCounters(out, sm.tsdbReloads, "prometheus_tsdb_reloads_total")
diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go
index 2fcc0cded1..ce72921038 100644
--- a/pkg/storage/tsdb/config.go
+++ b/pkg/storage/tsdb/config.go
@@ -136,6 +136,8 @@ type TSDBConfig struct {
     WALSegmentSizeBytes    int           `yaml:"wal_segment_size_bytes"`
     FlushBlocksOnShutdown  bool          `yaml:"flush_blocks_on_shutdown"`
     CloseIdleTSDBTimeout   time.Duration `yaml:"close_idle_tsdb_timeout"`
+    // The size of the in-memory queue used before flushing chunks to the disk.
+    HeadChunksWriteQueueSize int `yaml:"head_chunks_write_queue_size"`

     // MaxTSDBOpeningConcurrencyOnStartup limits the number of concurrently opening TSDB's during startup.
     MaxTSDBOpeningConcurrencyOnStartup int `yaml:"max_tsdb_opening_concurrency_on_startup"`
@@ -173,6 +175,7 @@ func (cfg *TSDBConfig) RegisterFlags(f *flag.FlagSet) {
     f.BoolVar(&cfg.FlushBlocksOnShutdown, "blocks-storage.tsdb.flush-blocks-on-shutdown", false, "True to flush blocks to storage on shutdown. If false, incomplete blocks will be reused after restart.")
     f.DurationVar(&cfg.CloseIdleTSDBTimeout, "blocks-storage.tsdb.close-idle-tsdb-timeout", 0, "If TSDB has not received any data for this duration, and all blocks from TSDB have been shipped, TSDB is closed and deleted from local disk. If set to positive value, this value should be equal or higher than -querier.query-ingesters-within flag to make sure that TSDB is not closed prematurely, which could cause partial query results. 0 or negative value disables closing of idle TSDB.")
     f.IntVar(&cfg.MaxExemplars, "blocks-storage.tsdb.max-exemplars", 0, "Enables support for exemplars in TSDB and sets the maximum number that will be stored. 0 or less means disabled.")
+    f.IntVar(&cfg.HeadChunksWriteQueueSize, "blocks-storage.tsdb.head-chunks-write-queue-size", chunks.DefaultWriteQueueSize, "The size of the in-memory queue used before flushing chunks to the disk.")
 }

 // Validate the config.
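
Usage sketch for the new option (values are illustrative only, not a recommendation; per the upstream Prometheus TSDB flag semantics, a queue size of 0 keeps the chunk write queue disabled):

    # Cortex configuration file, blocks storage section.
    blocks_storage:
      tsdb:
        # Size of the in-memory chunk write queue used before chunks are flushed to disk.
        head_chunks_write_queue_size: 100000

    # Equivalent CLI flag:
    #   -blocks-storage.tsdb.head-chunks-write-queue-size=100000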