Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 104 additions & 2 deletions src/v/storage/log_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
#include <seastar/core/semaphore.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/thread.hh>
#include <seastar/core/timer.hh>
#include <seastar/core/with_scheduling_group.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/coroutine/parallel_for_each.hh>
Expand All @@ -70,9 +71,20 @@
#include <filesystem>
#include <functional>
#include <optional>
#include <ranges>

using namespace std::chrono_literals;

namespace {

/// Exception type used to signal that housekeeping of a regular log was
/// pre-empted so that priority partitions can be compacted. It carries no
/// state beyond the message held by std::runtime_error; having a distinct
/// type lets the caller catch this case separately from other failures.
class priority_compaction_exception final : public std::runtime_error {
public:
    /// \param reason human-readable description, returned by what().
    explicit priority_compaction_exception(const std::string& reason)
      : std::runtime_error{reason} {}
};
Comment thread
WillemKauf marked this conversation as resolved.

} // namespace

namespace storage {
using logs_type = absl::flat_hash_map<model::ntp, log_housekeeping_meta>;

Expand Down Expand Up @@ -357,6 +369,11 @@ log_manager::housekeeping_scan(model::timestamp collection_threshold) {
co_return;
}

// Before each regular compaction, compact any priority logs that need
// it. This ensures priority partitions (e.g. __consumer_offsets) are
// not starved by long-running compactions.
co_await compact_priority_logs(collection_threshold);

auto& current_log = _logs_list.front();

_logs_list.shift_forward();
Expand Down Expand Up @@ -395,7 +412,44 @@ log_manager::housekeeping_scan(model::timestamp collection_threshold) {
continue;
}

co_await do_housekeeping(current_log, collection_threshold);
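// Set up timer-based preemption: if compaction exceeds the configured
// timeout and a priority partition needs compaction, abort so priority
// partitions can be serviced. If no priority partition needs compaction
// when the timer fires, the configured wait has already been exceeded,
// so rearm the timer on a shorter interval to keep checking and abort as
// soon as a priority partition does need compaction.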
Comment on lines +417 to +418

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If no priority partition needs compaction, rearm the timer to check on a shorter interval.

I can see that it uses a shorter interval by looking at the code, but the comment doesn't help explain why a shorter interval is used.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can add more color inline here if you'd like, but the point is that once the timer has fired for the first time, we have already exceeded log_compaction_max_priority_wait_ms, so from then on, as soon as a priority partition needs compaction, we need to bail. Therefore we should rearm on a shorter time frame than log_compaction_max_priority_wait_ms.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks, that explanation makes sense.

I can add more color inline here

Up to you. If your intention was to highlight arming the timer with a shorter timeout, then the comment needs to be updated; otherwise you could leave it or remove it.

ss::abort_source preempt_as;
const auto& ntp = current_log.handle->config().ntp();
auto timeout
= config::shard_local_cfg().log_compaction_max_priority_wait_ms();
ss::timer<ss::lowres_clock> preempt_timer;
preempt_timer.set_callback(
[this, &preempt_as, &ntp, &preempt_timer, timeout] {
bool priority_needs_compaction = std::ranges::any_of(
_priority_logs_list,
[](const auto& m) { return m.handle->needs_compaction(); });
if (priority_needs_compaction) {
vlog(
gclog.info,
"{}: compaction exceeded {}ms, preempting for priority "
"partitions",
ntp,
timeout.count());
preempt_as.request_abort_ex(priority_compaction_exception(
"Compaction pre-empted for compaction of priority "
"partition"));
} else {
// Rearm on a shorter time interval.
preempt_timer.arm(5min);
}
});
preempt_timer.arm(timeout);

try {
co_await do_housekeeping(
current_log, collection_threshold, preempt_as);
} catch (const priority_compaction_exception& e) {
// Preempted for priority partition compaction.
vlog(gclog.info, "{} - {}", ntp, e);
}
}
}

Expand Down Expand Up @@ -603,6 +657,43 @@ ss::future<> log_manager::gc_loop() {
}
}

// Runs housekeeping on every log in _priority_logs_list whose handle reports
// needs_compaction(). Returns early as soon as _abort_source has an abort
// requested.
//
// collection_threshold is forwarded unchanged to do_housekeeping().
//
// NOTE(review): the range-for below iterates _priority_logs_list across
// co_await suspension points. If an entry can be unlinked from the list while
// this coroutine is suspended, advancing the iterator afterwards may not be
// safe - confirm against the guarantees of compaction_list_type and the
// removal path.
ss::future<>
log_manager::compact_priority_logs(model::timestamp collection_threshold) {
for (auto& log_meta : _priority_logs_list) {
// Checked on every iteration so that an abort requested while a previous
// log was being processed stops the pass before the next log is touched.
if (_abort_source.abort_requested()) {
co_return;
}

// Logs that do not currently need compaction are skipped; the skip is only
// recorded at trace level. No gc() is issued for a skipped log here.
if (!log_meta.handle->needs_compaction()) {
vlog(
gclog.trace,
"{}: dirty ratio ({}) < min.cleanable.dirty.ratio ({}) and "
"time since earliest dirty timestamp does not exceed "
"max.compaction.lag.ms ({}), skipping compaction.",
log_meta.handle->config().ntp(),
log_meta.handle->dirty_ratio(),
log_meta.handle->config().min_cleanable_dirty_ratio(),
log_meta.handle->config().max_compaction_lag_ms());
continue;
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why doesn't the priority compaction handling here also do something equivalent to

        if (is_not_set(current_log.flags, bflags::should_compact)) {
            // Still perform gc() here on a regular `log_compaction_interval_ms`
            // basis. Use `try_get_units()` to avoid concurrent garbage
            // collection with `gc_loop()`- if we fail to obtain units,
            // it is because urgent garbage collection is already underway for
            // this log.
            auto units = current_log.housekeeping_lock.try_get_units();
            if (units.has_value()) {
                co_await current_log.handle->gc(
                  gc_config(collection_threshold, _config.retention_bytes()));
            }
            continue;
        }

which looks like it is applied to non-priority partitions in the main loop. i don't actually know if it should be done or not, but this is an example of why factoring out the "what should i work on next" is great, because then we can have unified handling of that choice and this question likely doesn't even arise.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah potentially a good point. Once again, because __consumer_offsets is the only priority topic, gc() should have no effect whatsoever here, but if we were to extend the definition of "priority", this would certainly need to be called.

but this is an example of why factoring out the "what should i work on next" is great, because then we can have unified handling of that choice and this question likely doesn't even arise.

Agreed, refactoring the entirety of housekeeping out into something more intelligent than a loop over the intrusive list would be great.


// Hold this log's housekeeping gate for the remainder of the iteration
// (presumably so the entry is not torn down mid-housekeeping - verify
// against the removal path).
auto gate = log_meta.housekeeping_gate.hold();

// Wait for this log's housekeeping lock; the units stay held until the end
// of the iteration. This wait is a suspension point.
auto housekeeping_lock_holder
= co_await log_meta.housekeeping_lock.get_units();

// The log may have been unlinked from its list while we were waiting for
// the lock; if so, do not run housekeeping on it.
// NOTE(review): `continue` advances the loop iterator from a node that is
// no longer linked - confirm this is well-defined for the list type.
if (!log_meta.link.is_linked()) {
continue;
}

// No preempt source is passed here, unlike the timer-guarded call made for
// regular logs in housekeeping_scan().
co_await do_housekeeping(log_meta, collection_threshold);
}
}

// Reports whether `ntp` is treated as a priority partition for compaction.
// The decision is delegated entirely to model::is_consumer_offsets_topic(),
// so at present only partitions of that topic qualify.
bool log_manager::is_priority_ntp(const model::ntp& ntp) const {
    const bool is_consumer_offsets = model::is_consumer_offsets_topic(ntp);
    return is_consumer_offsets;
}

ss::future<> log_manager::do_housekeeping(
log_housekeeping_meta& meta,
model::timestamp collection_threshold,
Expand Down Expand Up @@ -824,7 +915,18 @@ ss::future<ss::shared_ptr<log>> log_manager::do_manage(
std::move(translator_batch_types));
auto [it, success] = _logs.emplace(
l->config().ntp(), std::make_unique<log_housekeeping_meta>(l));
_logs_list.push_back(*it->second);

// Add to appropriate list based on whether this is a priority NTP.
if (is_priority_ntp(l->config().ntp())) {
_priority_logs_list.push_back(*it->second);
vlog(
stlog.debug,
"Tracking {} as priority partition for compaction",
l->config().ntp());
} else {
_logs_list.push_back(*it->second);
}

update_log_count();
vassert(success, "Could not keep track of:{} - concurrency issue", l);
co_return l;
Expand Down
9 changes: 9 additions & 0 deletions src/v/storage/log_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,9 @@ class log_manager {
bool _gc_triggered{false};
ssx::semaphore _gc_sem{0, "log_manager::gc"};

// Check if an NTP is a priority partition for compaction.
bool is_priority_ntp(const model::ntp& ntp) const;

// Run housekeeping (gc + compaction) on a single partition.
// The optional abort source is combined with _abort_source to create
// a composite that triggers on either (used for timer-based preemption).
Expand All @@ -303,6 +306,10 @@ class log_manager {
model::timestamp collection_threshold,
model::opt_abort_source_t preempt_source = std::nullopt);

// Compact any priority partitions that need it. Called before each
// regular compaction iteration to ensure priority logs aren't starved.
ss::future<> compact_priority_logs(model::timestamp collection_threshold);

disk_space_alert _disk_space_alert{disk_space_alert::ok};

ss::future<> dispatch_topic_dir_deletion(ss::sstring dir);
Expand All @@ -319,6 +326,8 @@ class log_manager {
simple_time_jitter<ss::lowres_clock> _trigger_gc_jitter;
logs_type _logs;
compaction_list_type _logs_list;
// List of priority partitions managed by housekeeping_loop().
compaction_list_type _priority_logs_list;
batch_cache _batch_cache;

// Hash key-map to use across multiple compactions to reuse reserved memory
Expand Down