Skip to content

Commit b76cde7

Browse files
committed
Ingester: Added -blocks-storage.tsdb.head-chunks-write-queue-size allowing to configure the size of the in-memory queue used before flushing chunks to the disk . #5000
Signed-off-by: tanghengjian <[email protected]>
1 parent d595cba commit b76cde7

File tree

5 files changed

+36
-19
lines changed

5 files changed

+36
-19
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
* [FEATURE] Querier/Query Frontend: support Prometheus /api/v1/status/buildinfo API. #4978
66

77
* [FEATURE] Ingester: Add active series to all_user_stats page. #4972
8+
* [FEATURE] Ingester: Added `-blocks-storage.tsdb.head-chunks-write-queue-size` allowing to configure the size of the in-memory queue used before flushing chunks to the disk . #5000
89

910
## 1.14.0 in progress
1011

docs/configuration/config-file-reference.md

+5
Original file line numberDiff line numberDiff line change
@@ -3714,6 +3714,11 @@ tsdb:
37143714
# be stored. 0 or less means disabled.
37153715
# CLI flag: -blocks-storage.tsdb.max-exemplars
37163716
[max_exemplars: <int> | default = 0]
3717+
3718+
# The size of the in-memory queue used before flushing chunks to the disk.
3719+
# CLI flag: -blocks-storage.tsdb.head-chunks-write-queue-size
3720+
[head_chunks_write_queue_size: <int> | default = 0]
3721+
37173722
```
37183723

37193724
### `compactor_config`

pkg/ingester/ingester.go

+1
Original file line numberDiff line numberDiff line change
@@ -1846,6 +1846,7 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
18461846
EnableExemplarStorage: enableExemplars,
18471847
IsolationDisabled: true,
18481848
MaxExemplars: int64(i.cfg.BlocksStorageConfig.TSDB.MaxExemplars),
1849+
HeadChunksWriteQueueSize: i.cfg.BlocksStorageConfig.TSDB.HeadChunksWriteQueueSize,
18491850
}, nil)
18501851
if err != nil {
18511852
return nil, errors.Wrapf(err, "failed to open TSDB: %s", udir)

pkg/ingester/metrics.go

+26-19
Original file line numberDiff line numberDiff line change
@@ -336,25 +336,26 @@ type tsdbMetrics struct {
336336
uploadFailures *prometheus.Desc // sum(thanos_shipper_upload_failures_total)
337337

338338
// Metrics aggregated from TSDB.
339-
tsdbCompactionsTotal *prometheus.Desc
340-
tsdbCompactionDuration *prometheus.Desc
341-
tsdbFsyncDuration *prometheus.Desc
342-
tsdbPageFlushes *prometheus.Desc
343-
tsdbPageCompletions *prometheus.Desc
344-
tsdbWALTruncateFail *prometheus.Desc
345-
tsdbWALTruncateTotal *prometheus.Desc
346-
tsdbWALTruncateDuration *prometheus.Desc
347-
tsdbWALCorruptionsTotal *prometheus.Desc
348-
tsdbWALWritesFailed *prometheus.Desc
349-
tsdbHeadTruncateFail *prometheus.Desc
350-
tsdbHeadTruncateTotal *prometheus.Desc
351-
tsdbHeadGcDuration *prometheus.Desc
352-
tsdbActiveAppenders *prometheus.Desc
353-
tsdbSeriesNotFound *prometheus.Desc
354-
tsdbChunks *prometheus.Desc
355-
tsdbChunksCreatedTotal *prometheus.Desc
356-
tsdbChunksRemovedTotal *prometheus.Desc
357-
tsdbMmapChunkCorruptionTotal *prometheus.Desc
339+
tsdbCompactionsTotal *prometheus.Desc
340+
tsdbCompactionDuration *prometheus.Desc
341+
tsdbFsyncDuration *prometheus.Desc
342+
tsdbPageFlushes *prometheus.Desc
343+
tsdbPageCompletions *prometheus.Desc
344+
tsdbWALTruncateFail *prometheus.Desc
345+
tsdbWALTruncateTotal *prometheus.Desc
346+
tsdbWALTruncateDuration *prometheus.Desc
347+
tsdbWALCorruptionsTotal *prometheus.Desc
348+
tsdbWALWritesFailed *prometheus.Desc
349+
tsdbHeadTruncateFail *prometheus.Desc
350+
tsdbHeadTruncateTotal *prometheus.Desc
351+
tsdbHeadGcDuration *prometheus.Desc
352+
tsdbActiveAppenders *prometheus.Desc
353+
tsdbSeriesNotFound *prometheus.Desc
354+
tsdbChunks *prometheus.Desc
355+
tsdbChunksCreatedTotal *prometheus.Desc
356+
tsdbChunksRemovedTotal *prometheus.Desc
357+
tsdbMmapChunkCorruptionTotal *prometheus.Desc
358+
tsdbChunkwriteQueueOperationsTotal *prometheus.Desc
358359

359360
tsdbExemplarsTotal *prometheus.Desc
360361
tsdbExemplarsInStorage *prometheus.Desc
@@ -478,6 +479,10 @@ func newTSDBMetrics(r prometheus.Registerer) *tsdbMetrics {
478479
"cortex_ingester_tsdb_mmap_chunk_corruptions_total",
479480
"Total number of memory-mapped TSDB chunk corruptions.",
480481
nil, nil),
482+
tsdbChunkwriteQueueOperationsTotal: prometheus.NewDesc(
483+
"cortex_ingester_tsdb_chunk_write_queue_operations_total",
484+
"Number of currently tsdb chunk write queues.",
485+
[]string{"user", "operation"}, nil),
481486
tsdbLoadedBlocks: prometheus.NewDesc(
482487
"cortex_ingester_tsdb_blocks_loaded",
483488
"Number of currently loaded data blocks",
@@ -579,6 +584,7 @@ func (sm *tsdbMetrics) Describe(out chan<- *prometheus.Desc) {
579584
out <- sm.tsdbChunksCreatedTotal
580585
out <- sm.tsdbChunksRemovedTotal
581586
out <- sm.tsdbMmapChunkCorruptionTotal
587+
out <- sm.tsdbChunkwriteQueueOperationsTotal
582588
out <- sm.tsdbLoadedBlocks
583589
out <- sm.tsdbSymbolTableSize
584590
out <- sm.tsdbReloads
@@ -628,6 +634,7 @@ func (sm *tsdbMetrics) Collect(out chan<- prometheus.Metric) {
628634
data.SendSumOfCountersPerUser(out, sm.tsdbChunksCreatedTotal, "prometheus_tsdb_head_chunks_created_total")
629635
data.SendSumOfCountersPerUser(out, sm.tsdbChunksRemovedTotal, "prometheus_tsdb_head_chunks_removed_total")
630636
data.SendSumOfCounters(out, sm.tsdbMmapChunkCorruptionTotal, "prometheus_tsdb_mmap_chunk_corruptions_total")
637+
data.SendSumOfCountersPerUserWithLabels(out, sm.tsdbChunkwriteQueueOperationsTotal, "prometheus_tsdb_chunk_write_queue_operations_total", "operation")
631638
data.SendSumOfGauges(out, sm.tsdbLoadedBlocks, "prometheus_tsdb_blocks_loaded")
632639
data.SendSumOfGaugesPerUser(out, sm.tsdbSymbolTableSize, "prometheus_tsdb_symbol_table_size_bytes")
633640
data.SendSumOfCounters(out, sm.tsdbReloads, "prometheus_tsdb_reloads_total")

pkg/storage/tsdb/config.go

+3
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,8 @@ type TSDBConfig struct {
136136
WALSegmentSizeBytes int `yaml:"wal_segment_size_bytes"`
137137
FlushBlocksOnShutdown bool `yaml:"flush_blocks_on_shutdown"`
138138
CloseIdleTSDBTimeout time.Duration `yaml:"close_idle_tsdb_timeout"`
139+
//The size of the in-memory queue used before flushing chunks to the disk.
140+
HeadChunksWriteQueueSize int `yaml:"head_chunks_write_queue_size"`
139141

140142
// MaxTSDBOpeningConcurrencyOnStartup limits the number of concurrently opening TSDB's during startup.
141143
MaxTSDBOpeningConcurrencyOnStartup int `yaml:"max_tsdb_opening_concurrency_on_startup"`
@@ -173,6 +175,7 @@ func (cfg *TSDBConfig) RegisterFlags(f *flag.FlagSet) {
173175
f.BoolVar(&cfg.FlushBlocksOnShutdown, "blocks-storage.tsdb.flush-blocks-on-shutdown", false, "True to flush blocks to storage on shutdown. If false, incomplete blocks will be reused after restart.")
174176
f.DurationVar(&cfg.CloseIdleTSDBTimeout, "blocks-storage.tsdb.close-idle-tsdb-timeout", 0, "If TSDB has not received any data for this duration, and all blocks from TSDB have been shipped, TSDB is closed and deleted from local disk. If set to positive value, this value should be equal or higher than -querier.query-ingesters-within flag to make sure that TSDB is not closed prematurely, which could cause partial query results. 0 or negative value disables closing of idle TSDB.")
175177
f.IntVar(&cfg.MaxExemplars, "blocks-storage.tsdb.max-exemplars", 0, "Enables support for exemplars in TSDB and sets the maximum number that will be stored. 0 or less means disabled.")
178+
f.IntVar(&cfg.HeadChunksWriteQueueSize, "blocks-storage.tsdb.head-chunks-write-queue-size", chunks.DefaultWriteQueueSize, "The size of the in-memory queue used before flushing chunks to the disk.")
176179
}
177180

178181
// Validate the config.

0 commit comments

Comments
 (0)