Add compactor plan #4316

Closed

wants to merge 8 commits into from
5 changes: 5 additions & 0 deletions docs/blocks-storage/compactor.md
@@ -166,6 +166,11 @@ compactor:
# CLI flag: -compactor.disabled-tenants
[disabled_tenants: <string> | default = ""]

# Enable the planner filter, which filters groups of blocks within the Cortex
# compactor instead of using Thanos to group blocks.
# CLI flag: -compactor.planner-filter-enabled
[planner_filter_enabled: <boolean> | default = false]

# Shard tenants across multiple compactor instances. Sharding is required if
# you run multiple compactor instances, in order to coordinate compactions and
# avoid race conditions leading to the same tenant blocks simultaneously
50 changes: 42 additions & 8 deletions pkg/compactor/compactor.go
@@ -120,6 +120,9 @@ type Config struct {
// Allow downstream projects to customise the blocks compactor.
BlocksGrouperFactory BlocksGrouperFactory `yaml:"-"`
BlocksCompactorFactory BlocksCompactorFactory `yaml:"-"`

// PlannerFilterEnabled enables the planner filter, which plans compactions within the Cortex compactor instead of the Thanos planner.
PlannerFilterEnabled bool `yaml:"planner_filter_enabled"`
}

// RegisterFlags registers the Compactor flags.
@@ -146,6 +149,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
"If 0, blocks will be deleted straight away. Note that deleting blocks immediately can cause query failures.")
f.DurationVar(&cfg.TenantCleanupDelay, "compactor.tenant-cleanup-delay", 6*time.Hour, "For tenants marked for deletion, this is time between deleting of last block, and doing final cleanup (marker files, debug files) of the tenant.")
f.BoolVar(&cfg.BlockDeletionMarksMigrationEnabled, "compactor.block-deletion-marks-migration-enabled", true, "When enabled, at compactor startup the bucket will be scanned and all found deletion marks inside the block location will be copied to the markers global location too. This option can (and should) be safely disabled as soon as the compactor has successfully run at least once.")
f.BoolVar(&cfg.PlannerFilterEnabled, "compactor.planner-filter-enabled", false, "Filter and plan blocks within PlannerFilter instead of through the Thanos planner and grouper.")

f.Var(&cfg.EnabledTenants, "compactor.enabled-tenants", "Comma separated list of tenants that can be compacted. If specified, only these tenants will be compacted by compactor, otherwise all tenants can be compacted. Subject to sharding.")
f.Var(&cfg.DisabledTenants, "compactor.disabled-tenants", "Comma separated list of tenants that cannot be compacted by this compactor. If specified, and compactor would normally pick given tenant for compaction (via -compactor.enabled-tenants or sharding), it will be ignored instead.")
@@ -606,21 +610,51 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error {
time.Duration(c.compactorCfg.DeletionDelay.Seconds()/2)*time.Second,
c.compactorCfg.MetaSyncConcurrency)

fetcherFilters := []block.MetadataFilter{
// Remove the ingester ID because we don't shard blocks anymore, while still
// honoring the shard ID if sharding was done in the past.
NewLabelRemoverFilter([]string{cortex_tsdb.IngesterIDExternalLabel}),
block.NewConsistencyDelayMetaFilter(ulogger, c.compactorCfg.ConsistencyDelay, reg),
ignoreDeletionMarkFilter,
deduplicateBlocksFilter,
}

// If the planner filter is enabled, generate plans and append the filter to fetcherFilters.
if c.compactorCfg.PlannerFilterEnabled {
level.Info(c.logger).Log("msg", "Compactor using planner filter")

// Create a new planner filter
f, err := NewPlannerFilter(
ctx,
userID,
ulogger,
bucket,
fetcherFilters,
c.compactorCfg,
c.metaSyncDirForUser(userID),
)
if err != nil {
return err
}

// Generate all parallelizable plans
err = f.fetchBlocksAndGeneratePlans(ctx)
if err != nil {
return err
}

// Add the planner filter to the fetcher's filters
fetcherFilters = append(fetcherFilters, f)
}

fetcher, err := block.NewMetaFetcher(
ulogger,
c.compactorCfg.MetaSyncConcurrency,
bucket,
c.metaSyncDirForUser(userID),
reg,
// List of filters to apply (order matters).
[]block.MetadataFilter{
// Remove the ingester ID because we don't shard blocks anymore, while still
// honoring the shard ID if sharding was done in the past.
NewLabelRemoverFilter([]string{cortex_tsdb.IngesterIDExternalLabel}),
block.NewConsistencyDelayMetaFilter(ulogger, c.compactorCfg.ConsistencyDelay, reg),
ignoreDeletionMarkFilter,
deduplicateBlocksFilter,
},
fetcherFilters,
nil,
)
if err != nil {
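For readers skimming the diff, here is a minimal, self-contained sketch of the idea the new flag enables: pre-compute groups ("plans") of blocks that can be compacted in parallel, then filter the fetched block metadata down to only the blocks that belong to some plan. Everything below is a hypothetical illustration, not the PR's code: `blockMeta`, `generatePlans`, `plannerFilter`, `newPlannerFilter`, the time-range bucketing, and the two-block grouping rule are stand-ins. The real `PlannerFilter` implements the Thanos `block.MetadataFilter` interface over `metadata.Meta` entries and builds its plans in `fetchBlocksAndGeneratePlans`.

```go
package main

import (
	"context"
	"fmt"
	"sort"
)

// blockMeta is a hypothetical stand-in for Thanos block metadata; the real
// PlannerFilter operates on metadata.Meta entries keyed by block ULID.
type blockMeta struct {
	ID      string
	MinTime int64 // inclusive, Unix milliseconds
	MaxTime int64 // exclusive, Unix milliseconds
}

// generatePlans is a toy analogue of fetchBlocksAndGeneratePlans: it buckets
// blocks into fixed time ranges so each bucket ("plan") can be compacted
// independently of, and in parallel with, the others.
func generatePlans(blocks []*blockMeta, rangeMillis int64) map[int64][]*blockMeta {
	plans := map[int64][]*blockMeta{}
	for _, b := range blocks {
		bucket := b.MinTime / rangeMillis
		plans[bucket] = append(plans[bucket], b)
	}
	return plans
}

// plannerFilter keeps only blocks that belong to a plan with at least two
// blocks, i.e. blocks that actually have compaction work to do.
type plannerFilter struct {
	planned map[string]struct{}
}

func newPlannerFilter(plans map[int64][]*blockMeta) *plannerFilter {
	planned := map[string]struct{}{}
	for _, group := range plans {
		if len(group) < 2 {
			continue // a single-block group has nothing to merge
		}
		for _, b := range group {
			planned[b.ID] = struct{}{}
		}
	}
	return &plannerFilter{planned: planned}
}

// Filter mutates the metas map in place, mirroring the shape of a metadata
// fetcher filter: blocks outside every plan are dropped before syncing.
func (f *plannerFilter) Filter(_ context.Context, metas map[string]*blockMeta) error {
	for id := range metas {
		if _, ok := f.planned[id]; !ok {
			delete(metas, id)
		}
	}
	return nil
}

func main() {
	const twoHours = int64(2 * 60 * 60 * 1000)

	blocks := []*blockMeta{
		{ID: "block-a", MinTime: 0, MaxTime: twoHours},
		{ID: "block-b", MinTime: 0, MaxTime: twoHours},
		{ID: "block-c", MinTime: twoHours, MaxTime: 2 * twoHours},
	}

	plans := generatePlans(blocks, twoHours)
	f := newPlannerFilter(plans)

	metas := map[string]*blockMeta{}
	for _, b := range blocks {
		metas[b.ID] = b
	}
	_ = f.Filter(context.Background(), metas)

	kept := make([]string, 0, len(metas))
	for id := range metas {
		kept = append(kept, id)
	}
	sort.Strings(kept)
	fmt.Println("kept:", kept) // [block-a block-b]; block-c has no peer to compact with
}
```

In the actual change, the pre-computed plans let the filter run as just another entry in the fetcher's `[]block.MetadataFilter` chain, so the downstream Thanos machinery only ever sees blocks the Cortex-side planner already selected.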