storage: manage priority logs in housekeeping_loop #29382

Merged
WillemKauf merged 3 commits into redpanda-data:dev from WillemKauf:storage_priority_compaction
Jan 27, 2026

Conversation

@WillemKauf

@WillemKauf WillemKauf commented Jan 23, 2026

Contributor

The driving force behind this change is that we have seen partitions in __consumer_offsets placed on the same shard as heavy-weight compaction partitions that dominate run-time, leading to starvation and an intermittently large __consumer_offsets topic.

To prevent this, periodically evaluate whether priority partitions need compaction, on the interval log_compaction_max_priority_wait_ms. When any priority partition requires compaction, the long-running in-progress compaction is preempted so that the priority partitions can be compacted.
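For readers who want a feel for the behaviour described above, here is a minimal single-threaded simulation. It is only a sketch under stated assumptions: `sim_log`, `priority_needs_compaction`, and `run_housekeeping` are hypothetical names, time is modelled as integer ticks, and none of this is the actual `log_manager` code, which runs on Seastar with a timer and an abort source.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-in for a partition log; not a Redpanda type.
struct sim_log {
    std::string name;
    bool priority;   // true for e.g. a __consumer_offsets partition
    int dirty_from;  // tick at which a priority log starts needing compaction
    int work;        // ticks of compaction work still outstanding
};

// True if some priority log needs compaction at time `now`.
inline bool
priority_needs_compaction(const std::vector<sim_log>& logs, int now) {
    for (const auto& l : logs) {
        if (l.priority && l.work > 0 && l.dirty_from <= now) {
            return true;
        }
    }
    return false;
}

// Runs the simulated loop until no work remains and returns an event trace.
inline std::vector<std::string>
run_housekeeping(std::vector<sim_log> logs, int max_priority_wait) {
    assert(max_priority_wait >= 0);
    std::vector<std::string> trace;
    int now = 0;
    auto pending = [&logs] {
        for (const auto& l : logs) {
            if (l.work > 0) return true;
        }
        return false;
    };
    while (pending()) {
        bool progressed = false;
        // Priority logs go first and always run to completion.
        for (auto& l : logs) {
            if (l.priority && l.work > 0 && l.dirty_from <= now) {
                now += l.work;
                l.work = 0;
                trace.push_back("compacted " + l.name);
                progressed = true;
            }
        }
        // Then at most one regular log, which can be preempted once it has
        // run for max_priority_wait ticks and a priority log is waiting.
        for (auto& l : logs) {
            if (l.priority || l.work == 0) continue;
            const int started = now;
            bool preempted = false;
            while (l.work > 0) {
                if (now - started >= max_priority_wait
                    && priority_needs_compaction(logs, now)) {
                    preempted = true;
                    break;
                }
                --l.work;
                ++now;
            }
            trace.push_back((preempted ? "preempted " : "compacted ") + l.name);
            progressed = true;
            break; // go back and re-check priority logs
        }
        if (!progressed) ++now; // idle until a priority log becomes dirty
    }
    return trace;
}
```

With a 10-tick regular log named `heavy`, a priority log that becomes dirty at tick 2, and `max_priority_wait = 3`, the trace is: `heavy` is preempted, the priority log is compacted, then `heavy` resumes and finishes.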

Backports Required

  • none - not a bug fix
  • none - this is a backport
  • none - issue does not exist in previous branches
  • none - papercut/not impactful enough to backport
  • v25.3.x
  • v25.2.x
  • v25.1.x

Release Notes

Improvements

  • Better prioritize compaction of __consumer_offsets partitions when heavy-weight compactions are present on a broker.

@WillemKauf WillemKauf requested review from andrwng and dotnwat January 23, 2026 18:34
@WillemKauf WillemKauf requested a review from a team as a code owner January 23, 2026 18:34
Copilot AI review requested due to automatic review settings January 23, 2026 18:34
@WillemKauf

WillemKauf commented Jan 23, 2026

Contributor Author

Backporting to v25.3.x should be safe/clean.

Copilot AI left a comment

Contributor

Pull request overview

This PR introduces a priority housekeeping loop to prevent starvation of critical partitions like __consumer_offsets when they share a shard with heavy-weight compaction workloads. The solution implements a preemptive compaction mechanism with configurable wait times.

Changes:

  • Added a new priority_housekeeping_loop() that monitors and compacts priority partitions (e.g., __consumer_offsets) independently
  • Introduced a _compaction_mutex to serialize compaction operations and _compaction_preempt_as abort source to enable preemption of regular compaction
  • Added a new cluster configuration log_compaction_max_priority_wait_ms (default: 30 minutes) to control when priority compaction preempts regular compaction

Reviewed changes

Copilot reviewed 8 out of 8 changed files in this pull request and generated 4 comments.

| File | Description |
| --- | --- |
| src/v/storage/log_manager.h | Added priority housekeeping infrastructure including new member variables, methods, and the _priority_logs_list |
| src/v/storage/log_manager.cc | Implemented priority housekeeping loop, preemption logic, and refactored housekeeping code into do_housekeeping() shared method |
| src/v/storage/tests/storage_e2e_test.cc | Added test verifying priority compaction preempts regular compaction after configured wait time |
| src/v/storage/tests/common.h | Added test accessor methods for _compaction_mutex and _compaction_preempt_as |
| src/v/ssx/mutex.h | Added get_units(duration timeout) overload to support timed mutex acquisition |
| src/v/config/configuration.h | Declared new log_compaction_max_priority_wait_ms configuration property |
| src/v/config/configuration.cc | Defined log_compaction_max_priority_wait_ms with default value of 30 minutes |
| src/v/storage/BUILD | Added dependency on ssx:abort_source |

Comment thread src/v/storage/log_manager.cc
Comment thread src/v/storage/log_manager.cc
Comment thread src/v/storage/log_manager.cc Outdated
Comment thread src/v/storage/log_manager.cc Outdated
@WillemKauf WillemKauf force-pushed the storage_priority_compaction branch from 40d55fc to 7f32600 Compare January 23, 2026 18:54
@vbotbuildovich

vbotbuildovich commented Jan 23, 2026

Collaborator

Retry command for Build#79566

please wait until all jobs are finished before running the slash command

/ci-repeat 1
skip-redpanda-build
skip-units
skip-rebase
tests/rptest/transactions/compaction_e2e_test.py::CompactionE2EIdempotencyTest.test_basic_compaction@{"initial_cleanup_policy":"delete","workload":"IDEMPOTENCY"}
tests/rptest/transactions/compaction_e2e_test.py::CompactionE2EIdempotencyTest.test_basic_compaction@{"initial_cleanup_policy":"delete","workload":"TX_UNIQUE_KEYS"}
tests/rptest/transactions/compaction_e2e_test.py::CompactionE2EIdempotencyTest.test_basic_compaction@{"initial_cleanup_policy":"delete","workload":"TX"}
tests/rptest/transactions/compaction_e2e_test.py::CompactionE2EIdempotencyTest.test_basic_compaction@{"initial_cleanup_policy":"compact","workload":"TX_UNIQUE_KEYS"}
tests/rptest/transactions/compaction_e2e_test.py::CompactionE2EIdempotencyTest.test_basic_compaction@{"initial_cleanup_policy":"compact","workload":"IDEMPOTENCY"}

@vbotbuildovich

vbotbuildovich commented Jan 23, 2026

Collaborator

CI test results

test results on build#79566

| test_class | test_method | test_arguments | test_kind | job_url | test_status | passed | reason | test_history |
| --- | --- | --- | --- | --- | --- | --- | --- | --- |
| MountUnmountIcebergTest | test_simple_remount | {"cloud_storage_type": 1} | integration | https://buildkite.com/redpanda/redpanda/builds/79566#019bec49-07c1-4f49-8be4-626e1400e4d3 | FLAKY | 8/11 | Test PASSES after retries. No significant increase in flaky rate(baseline=0.1693, p0=0.5245, reject_threshold=0.0100. adj_baseline=0.4267, p1=0.1280, trust_threshold=0.5000) | https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=MountUnmountIcebergTest&test_method=test_simple_remount |
| CompactionE2EIdempotencyTest | test_basic_compaction | {"initial_cleanup_policy": "compact", "workload": "IDEMPOTENCY"} | integration | https://buildkite.com/redpanda/redpanda/builds/79566#019bec49-07ca-4681-8bad-74dd1e562218 | FLAKY | 5/11 | Test FAILS after retries. Significant increase in flaky rate(baseline=0.0000, p0=0.0000, reject_threshold=0.0100) | https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=CompactionE2EIdempotencyTest&test_method=test_basic_compaction |
| CompactionE2EIdempotencyTest | test_basic_compaction | {"initial_cleanup_policy": "compact", "workload": "TX_UNIQUE_KEYS"} | integration | https://buildkite.com/redpanda/redpanda/builds/79566#019bec49-07c1-4f49-8be4-626e1400e4d3 | FLAKY | 1/11 | Test FAILS after retries. Significant increase in flaky rate(baseline=0.0000, p0=0.0000, reject_threshold=0.0100) | https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=CompactionE2EIdempotencyTest&test_method=test_basic_compaction |
| CompactionE2EIdempotencyTest | test_basic_compaction | {"initial_cleanup_policy": "delete", "workload": "IDEMPOTENCY"} | integration | https://buildkite.com/redpanda/redpanda/builds/79566#019bec49-07c2-4a75-aa85-63185862dcea | FLAKY | 9/11 | Test FAILS after retries. Significant increase in flaky rate(baseline=0.0000, p0=0.0000, reject_threshold=0.0100) | https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=CompactionE2EIdempotencyTest&test_method=test_basic_compaction |
| CompactionE2EIdempotencyTest | test_basic_compaction | {"initial_cleanup_policy": "delete", "workload": "TX"} | integration | https://buildkite.com/redpanda/redpanda/builds/79566#019bec49-07c3-425c-9b04-16168fb2b46d | FLAKY | 5/11 | Test FAILS after retries. Significant increase in flaky rate(baseline=0.0000, p0=0.0000, reject_threshold=0.0100) | https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=CompactionE2EIdempotencyTest&test_method=test_basic_compaction |
| CompactionE2EIdempotencyTest | test_basic_compaction | {"initial_cleanup_policy": "delete", "workload": "TX_UNIQUE_KEYS"} | integration | https://buildkite.com/redpanda/redpanda/builds/79566#019bec49-07c5-4d13-a74a-80e63155c1d2 | FLAKY | 5/11 | Test FAILS after retries. Significant increase in flaky rate(baseline=0.0000, p0=0.0000, reject_threshold=0.0100) | https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=CompactionE2EIdempotencyTest&test_method=test_basic_compaction |

test results on build#79581

| test_class | test_method | test_arguments | test_kind | job_url | test_status | passed | reason | test_history |
| --- | --- | --- | --- | --- | --- | --- | --- | --- |
| ControllerLogLimitMirrorMakerTests | test_mirror_maker_with_limits | null | integration | https://buildkite.com/redpanda/redpanda/builds/79581#019becd8-8aff-4b69-a4e3-0d623491fcfa | FLAKY | 19/21 | Test PASSES after retries. No significant increase in flaky rate(baseline=0.0045, p0=0.0867, reject_threshold=0.0100. adj_baseline=0.1000, p1=0.3917, trust_threshold=0.5000) | https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=ControllerLogLimitMirrorMakerTests&test_method=test_mirror_maker_with_limits |
| RedpandaNodeOperationsSmokeTest | test_node_ops_smoke_test | {"cloud_storage_type": 1, "mixed_versions": false} | integration | https://buildkite.com/redpanda/redpanda/builds/79581#019beccf-748d-4b6d-bfb3-3dfc6c7c45b6 | FAIL | 0/1 | | https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=RedpandaNodeOperationsSmokeTest&test_method=test_node_ops_smoke_test |
| TestReadReplicaService | test_identical_lwms_after_delete_records | {"cloud_storage_type": 1, "partition_count": 5} | integration | https://buildkite.com/redpanda/redpanda/builds/79581#019beccf-748b-4b08-99c5-bde22db79953 | FLAKY | 7/11 | Test FAILS after retries. Significant increase in flaky rate(baseline=0.0000, p0=0.0000, reject_threshold=0.0100) | https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=TestReadReplicaService&test_method=test_identical_lwms_after_delete_records |
| ScalingUpTest | test_fast_node_addition | null | integration | https://buildkite.com/redpanda/redpanda/builds/79581#019becd8-8b23-4acb-9888-d410f3625318 | FLAKY | 19/21 | Test PASSES after retries. No significant increase in flaky rate(baseline=0.0192, p0=0.3208, reject_threshold=0.0100. adj_baseline=0.1000, p1=0.3917, trust_threshold=0.5000) | https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=ScalingUpTest&test_method=test_fast_node_addition |

Comment thread src/v/config/configuration.cc Outdated
@WillemKauf WillemKauf force-pushed the storage_priority_compaction branch 3 times, most recently from 8293935 to 98a579a Compare January 23, 2026 21:19
@vbotbuildovich

vbotbuildovich commented Jan 23, 2026

Collaborator

Retry command for Build#79581

please wait until all jobs are finished before running the slash command

/ci-repeat 1
skip-redpanda-build
skip-units
skip-rebase
tests/rptest/tests/read_replica_e2e_test.py::TestReadReplicaService.test_identical_lwms_after_delete_records@{"cloud_storage_type":1,"partition_count":5}
tests/rptest/tests/random_node_operations_smoke_test.py::RedpandaNodeOperationsSmokeTest.test_node_ops_smoke_test@{"cloud_storage_type":1,"mixed_versions":false}

@WillemKauf

Contributor Author

/ci-repeat 1
skip-redpanda-build
skip-units
skip-rebase
tests/rptest/tests/read_replica_e2e_test.py::TestReadReplicaService.test_identical_lwms_after_delete_records@{"cloud_storage_type":1,"partition_count":5}
tests/rptest/tests/random_node_operations_smoke_test.py::RedpandaNodeOperationsSmokeTest.test_node_ops_smoke_test@{"cloud_storage_type":1,"mixed_versions":false}

// We clamp the offset up to which we can remove transactional control
// batches to the last snapshot taken by the transactional stm. This
// ensures that we do not remove control batches that may be needed to
// reconstruct the state machine during recovery.

Contributor Author

Argh. Why did Claude remove this comment when moving the function? I'll have to force-push to fix this again; going to wait for other review comments to come in before doing so.

Comment thread src/v/storage/log_manager.cc Outdated
}
}

ss::future<> log_manager::priority_housekeeping_loop() {

Member

discussed offline a way to possibly simplify this without much code change to the existing loop

Contributor Author

Reworked. LMKWYT

This will be useful in a future commit for de-duplicating code.
@WillemKauf WillemKauf force-pushed the storage_priority_compaction branch from 98a579a to dc02da6 Compare January 26, 2026 22:20
@WillemKauf WillemKauf requested a review from dotnwat January 26, 2026 22:20
The driving force behind this change is that we have seen partitions in
`__consumer_offsets` placed on the same shard as heavy-weight compaction
partitions that dominate run-time, leading to starvation and an intermittently
large `__consumer_offsets` topic.

To prevent this, periodically evaluate whether priority partitions need
compaction, on the interval `log_compaction_max_priority_wait_ms`. When any
priority partition requires compaction, the long-running in-progress compaction
is preempted so that the priority partitions can be compacted.
@WillemKauf WillemKauf force-pushed the storage_priority_compaction branch from dc02da6 to 2beadb7 Compare January 26, 2026 22:21
@WillemKauf WillemKauf changed the title storage: add priority_housekeeping_loop storage: manage priority logs in 'housekeeping_loop` Jan 26, 2026
@WillemKauf WillemKauf changed the title storage: manage priority logs in 'housekeeping_loop` storage: manage priority logs in housekeeping_loop Jan 26, 2026

@dotnwat dotnwat left a comment

Member

this looks pretty good. i'll review in more detail tomorrow.

one question: should we preempt all compactions (even priority compactions) that have been running for a long time? that is, given more than one priority compaction, should we try to make progress across the priority compactions?

@piyushredpanda

Contributor

@rockwotj as an fyi in case he has an opinion

@WillemKauf

WillemKauf commented Jan 27, 2026

Contributor Author

> one question: should we preempt all compactions (even priority compactions) that have been running for a long time?

I think, for now, given that we know __consumer_offsets partitions currently are the only priority partitions and its compactability is quite high, we should just allow those compactions to finish without pre-emption.

> that is, given more than one priority compaction, should we try to make progress across the priority compactions?

Ideally yes, if we should ever extend our "priority" definition.

@dotnwat

dotnwat commented Jan 27, 2026

Member

> > one question: should we preempt all compactions (even priority compactions) that have been running for a long time?
>
> I think, for now, given that we know __consumer_offsets partitions currently are the only priority partitions and its compactability is quite high, we should just allow those compactions to finish without pre-emption.
>
> > that is, given more than one priority compaction, should we try to make progress across the priority compactions?
>
> Ideally yes, if we should ever extend our "priority" definition.

SGTM

@dotnwat dotnwat left a comment

Member

lgtm

Comment on lines +623 to +633
    // Create a composite abort source that triggers on shutdown or
    // preemption. If no preempt_source is provided, just use _abort_source
    // directly.
    std::optional<ssx::composite_abort_source> composite_as;
    auto& as = [this, &preempt_source, &composite_as]() -> ss::abort_source& {
        if (preempt_source.has_value()) {
            composite_as.emplace(_abort_source, preempt_source->get());
            return composite_as->as();
        }
        return _abort_source;
    }();

Member

afaict this commit is intended to be "factor out code for reuse", so this is net-new functionality and should probably be in the next commit?
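
As an aside for readers unfamiliar with the pattern in the snippet under review, the idea of "abort on shutdown or on preemption, whichever comes first" can be sketched with plain standard C++. This is an assumption-laden illustration: `abort_flag`, `composite_abort_flag`, and `select_abort_flag` are hypothetical names, the code is single-threaded, and it says nothing about how `ssx::composite_abort_source` or `ss::abort_source` are actually implemented.

```cpp
#include <cassert>
#include <functional>
#include <optional>
#include <utility>
#include <vector>

// Hypothetical single-threaded stand-in for an abort source.
class abort_flag {
public:
    void request_abort() {
        if (_aborted) return;
        _aborted = true;
        for (auto& cb : _subscribers) cb();
    }
    bool abort_requested() const { return _aborted; }
    // Runs `cb` when abort is requested (immediately if it already was).
    void subscribe(std::function<void()> cb) {
        if (_aborted) {
            cb();
            return;
        }
        _subscribers.push_back(std::move(cb));
    }

private:
    bool _aborted = false;
    std::vector<std::function<void()>> _subscribers;
};

// Aborts as soon as either parent aborts. This sketch never unsubscribes,
// so the composite must outlive any abort request made on its parents.
class composite_abort_flag {
public:
    composite_abort_flag(abort_flag& a, abort_flag& b) {
        a.subscribe([this] { _as.request_abort(); });
        b.subscribe([this] { _as.request_abort(); });
    }
    composite_abort_flag(const composite_abort_flag&) = delete;
    composite_abort_flag& operator=(const composite_abort_flag&) = delete;
    abort_flag& as() { return _as; }

private:
    abort_flag _as;
};

// Mirrors the shape of the reviewed lambda: build a composite only when a
// preempt source is supplied, otherwise hand back the shutdown flag itself.
inline abort_flag& select_abort_flag(
  abort_flag& shutdown,
  abort_flag* preempt,
  std::optional<composite_abort_flag>& storage) {
    if (preempt != nullptr) {
        storage.emplace(shutdown, *preempt);
        return storage->as();
    }
    return shutdown;
}
```

Keeping the composite in caller-owned `std::optional` storage, as the reviewed code does, lets the returned reference stay valid for the duration of the housekeeping pass without a heap allocation.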

continue;
}

current_log.flags |= bflags::compacted;

Member

nit: would one benefit of setting this flag before compaction runs be that, if a problem occurred while compacting, housekeeping wouldn't reconsider it for a while?

Contributor Author

This flag is actually only considered when initializing the key-offset map starting on L299, it's not used for any scheduling decisions.

Member

ahh ok cool

log_meta.handle->config().min_cleanable_dirty_ratio(),
log_meta.handle->config().max_compaction_lag_ms());
continue;
}

Member

why doesn't the priority compaction handling here also do something equivalent to

        if (is_not_set(current_log.flags, bflags::should_compact)) {
            // Still perform gc() here on a regular `log_compaction_interval_ms`
            // basis. Use `try_get_units()` to avoid concurrent garbage
            // collection with `gc_loop()`- if we fail to obtain units,
            // it is because urgent garbage collection is already underway for
            // this log.
            auto units = current_log.housekeeping_lock.try_get_units();
            if (units.has_value()) {
                co_await current_log.handle->gc(
                  gc_config(collection_threshold, _config.retention_bytes()));
            }
            continue;
        }

which looks like it is applied to non-priority partitions in the main loop. i don't actually know if it should be done or not, but this is an example of why factoring out the "what should i work on next" is great, because then we can have unified handling of that choice and this question likely doesn't even arise.

Contributor Author

Yeah potentially a good point. Once again, because __consumer_offsets is the only priority topic, gc() should have no effect whatsoever here, but if we were to extend the definition of "priority", this would certainly need to be called.

> but this is an example of why factoring out the "what should i work on next" is great, because then we can have unified handling of that choice and this question likely doesn't even arise.

Agreed, refactoring the entirety of housekeeping out into something more intelligent than a loop over the intrusive list would be great.

Comment on lines +417 to +418
// partitions can be serviced. If no priority partition needs
// compaction, rearm the timer to check on a shorter interval.

Member

> If no priority partition needs compaction, rearm the timer to check on a shorter interval.

I can see that it uses a shorter interval by looking at the code, but the comment doesn't help explain why a shorter interval is used.

Contributor Author

I can add more color inline here if you'd like, but the point is that once the timer is fired for the first time, we have already exceeded log_compaction_max_priority_wait_ms, and so at any point if a priority partition needs compaction, we need to bail (therefore we should rearm on a shorter time frame than log_compaction_max_priority_wait_ms)

Member

thanks, that explanation makes sense.

> I can add more color inline here

up to you. if your intention was to highlight arming the timer at a smaller timeout then the comment needs to be updated. otherwise you could leave it or remove it.
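
The rearm reasoning in this thread can be made concrete with a small sketch. It assumes a hypothetical `recheck_interval` (the thread does not state the actual shorter interval) and hypothetical names `on_priority_timer` and `preemption_time`; time is modelled with `std::chrono::milliseconds` rather than a real Seastar timer.

```cpp
#include <cassert>
#include <chrono>

using ms = std::chrono::milliseconds;

enum class timer_action { preempt, rearm };

struct timer_decision {
    timer_action action;
    ms rearm_after; // only meaningful when action == rearm
};

// Decision taken each time the timer fires. By the first firing the regular
// compaction has already run for max_priority_wait, so any priority work
// found from then on should preempt at once; otherwise check again soon.
inline timer_decision
on_priority_timer(bool priority_needs_compaction, ms recheck_interval) {
    if (priority_needs_compaction) {
        return {timer_action::preempt, ms{0}};
    }
    return {timer_action::rearm, recheck_interval};
}

// Time (measured from the start of the regular compaction) at which
// preemption happens if a priority log becomes dirty at `dirty_at`.
inline ms
preemption_time(ms max_priority_wait, ms recheck_interval, ms dirty_at) {
    assert(recheck_interval > ms{0});
    ms now = max_priority_wait; // first firing of the timer
    for (;;) {
        const auto d = on_priority_timer(dirty_at <= now, recheck_interval);
        if (d.action == timer_action::preempt) {
            return now;
        }
        now += d.rearm_after;
    }
}
```

With a 3000 ms wait and a priority log that becomes dirty at 3050 ms, rechecking every 100 ms preempts at 3100 ms, whereas rearming with the full 3000 ms would delay preemption until 6000 ms. That gap is the motivation for the shorter rearm interval.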

@WillemKauf WillemKauf requested a review from dotnwat January 27, 2026 16:21
@WillemKauf WillemKauf merged commit 76ccb1e into redpanda-data:dev Jan 27, 2026
22 checks passed
@vbotbuildovich

Collaborator

/backport v25.3.x

6 participants