Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/v/config/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1030,6 +1030,13 @@ configuration::configuration()
"How often to trigger background compaction.",
{.needs_restart = needs_restart::no, .visibility = visibility::user},
10s)
, log_compaction_max_priority_wait_ms(
*this,
"log_compaction_max_priority_wait_ms",
"Maximum time a priority partition (for example, __consumer_offsets) can "
"wait for compaction before preempting regular compaction.",
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
30min)
, tombstone_retention_ms(
*this,
"tombstone_retention_ms",
Expand Down
1 change: 1 addition & 0 deletions src/v/config/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ struct configuration final : public config_store {
// same as log.retention.ms in kafka
retention_duration_property log_retention_ms;
property<std::chrono::milliseconds> log_compaction_interval_ms;
property<std::chrono::milliseconds> log_compaction_max_priority_wait_ms;
// same as delete.retention.ms in kafka
property<std::optional<std::chrono::milliseconds>> tombstone_retention_ms;
bounded_property<std::optional<double>, numeric_bounds>
Expand Down
1 change: 1 addition & 0 deletions src/v/storage/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ redpanda_cc_library(
],
implementation_deps = [
":log_manager_probe",
"//src/v/ssx:abort_source",
"//src/v/syschecks",
],
visibility = ["//visibility:public"],
Expand Down
296 changes: 210 additions & 86 deletions src/v/storage/log_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "model/metadata.h"
#include "model/timestamp.h"
#include "resource_mgmt/memory_groups.h"
#include "ssx/abort_source.h"
#include "ssx/async-clear.h"
#include "ssx/future-util.h"
#include "ssx/mutex.h"
Expand Down Expand Up @@ -53,6 +54,7 @@
#include <seastar/core/semaphore.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/thread.hh>
#include <seastar/core/timer.hh>
#include <seastar/core/with_scheduling_group.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/coroutine/parallel_for_each.hh>
Expand All @@ -69,9 +71,20 @@
#include <filesystem>
#include <functional>
#include <optional>
#include <ranges>

using namespace std::chrono_literals;

namespace {

class priority_compaction_exception final : public std::runtime_error {
public:
explicit priority_compaction_exception(const std::string& msg)
: std::runtime_error(msg) {}
};
Comment thread
WillemKauf marked this conversation as resolved.

} // namespace

namespace storage {
using logs_type = absl::flat_hash_map<model::ntp, log_housekeeping_meta>;

Expand All @@ -85,7 +98,6 @@ log_config::log_config(
, segment_size_jitter(0) // For deterministic behavior in unit tests.
, compacted_segment_size(config::mock_binding<size_t>(256_MiB))
, max_compacted_segment_size(config::mock_binding<size_t>(5_GiB))

, retention_bytes(config::mock_binding<std::optional<size_t>>(std::nullopt))
, compaction_interval(
config::mock_binding<std::chrono::milliseconds>(std::chrono::minutes(10)))
Expand Down Expand Up @@ -357,6 +369,11 @@ log_manager::housekeeping_scan(model::timestamp collection_threshold) {
co_return;
}

// Before each regular compaction, compact any priority logs that need
// it. This ensures priority partitions (e.g. __consumer_offsets) are
// not starved by long-running compactions.
co_await compact_priority_logs(collection_threshold);

auto& current_log = _logs_list.front();

_logs_list.shift_forward();
Expand All @@ -382,11 +399,9 @@ log_manager::housekeeping_scan(model::timestamp collection_threshold) {
continue;
}

Comment thread
WillemKauf marked this conversation as resolved.
current_log.flags |= bflags::compacted;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: would one benefit of setting this flag before compaction ran is that if a problem occurred while compacting that housekeeping wouldn't reconsider it for a while?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This flag is actually only considered when initializing the key-offset map starting on L299, it's not used for any scheduling decisions.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ahh ok cool

current_log.last_compaction = ss::lowres_clock::now();

auto ntp_sanitizer_cfg = _config.maybe_get_ntp_sanitizer_config(
current_log.handle->config().ntp());
if (!current_log.link.is_linked()) {
continue;
}

// Obtain housekeeping lock to prevent concurrency of
// log->housekeeping() with gc fibre.
Expand All @@ -397,87 +412,44 @@ log_manager::housekeeping_scan(model::timestamp collection_threshold) {
continue;
}

// Until we better implement bailing out of compaction, the best thing
// we can do for observability is add a watchdog here.
auto ntp = current_log.handle->config().ntp();
ssx::watchdog wd5m(5min, [ntp] {
vlog(
gclog.warn, "{}: Housekeeping process exceeding 5 minutes", ntp);
});

// NOTE: housekeeping holds _compaction_housekeeping_gate, that prevents
// the removal of the parent object. this makes awaiting housekeeping
// safe against removal of segments from _logs_list
auto& log = current_log.handle;
auto pinned_kafka_offset
= current_log.handle->stm_manager()->lowest_pinned_data_offset();
std::optional<model::offset> max_unpinned_offset;
if (pinned_kafka_offset) {
auto local_log_start = log->offsets().start_offset;
auto kafka_local_start = model::offset_cast(
log->from_log_offset(local_log_start));
if (*pinned_kafka_offset >= kafka_local_start) {
// Translate the pinned Kafka offset.
max_unpinned_offset = model::prev_offset(
log->to_log_offset(kafka::offset_cast(*pinned_kafka_offset)));
} else {
// The pin falls below the log start, in which case the entire
// local log is pinned.
max_unpinned_offset = model::prev_offset(
log->offsets().start_offset);
}
}
model::offset max_compactible_offset
= current_log.handle->stm_manager()->max_removable_local_log_offset();
model::offset max_tombstone_remove_offset
= current_log.handle->stm_manager()->max_tombstone_remove_offset();
model::offset max_tx_end_remove_offset
= current_log.handle->stm_manager()->max_tx_end_remove_offset();
model::offset tx_snapshot_offset
= current_log.handle->stm_manager()->tx_snapshot_offset();
// We clamp the offset up to which we can remove transactional control
// batches to the last snapshot taken by the transactional stm. This
// ensures that we do not remove control batches that may be needed to
// reconstruct the state machine during recovery.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Argh. Why did claude remove this comment when moving the function? I'll have to force push to fix this again- going to wait for other review comments to come in before doing so.

model::offset max_tx_remove_offset = std::min(
max_tx_end_remove_offset, tx_snapshot_offset);
// Set up timer-based preemption: if compaction exceeds the configured
// timeout and a priority partition needs compaction, abort so priority
// partitions can be serviced. If no priority partition needs
// compaction, rearm the timer to check on a shorter interval.
Comment on lines +417 to +418

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If no priority partition needs compaction, rearm the timer to check on a shorter interval.

I can see that it uses a shorter interval by looking at the code, but the comment doesn't help explain why a shorter interval is used.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can add more color inline here if you'd like, but the point is that once the timer is fired for the first time, we have already exceeded log_compaction_max_priority_wait_ms, and so at any point if a priority partition needs compaction, we need to bail (therefore we should rearm on a shorter time frame than log_compaction_max_priority_wait_ms)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks, that explanation makes sense.

I can add more color inline here

up to you. if your intention was to highlight arming the timer at a smaller timeout then the comment needs to be updated. otherwise you could leave it or remove it.

ss::abort_source preempt_as;
const auto& ntp = current_log.handle->config().ntp();
auto timeout
= config::shard_local_cfg().log_compaction_max_priority_wait_ms();
ss::timer<ss::lowres_clock> preempt_timer;
preempt_timer.set_callback(
[this, &preempt_as, &ntp, &preempt_timer, timeout] {
bool priority_needs_compaction = std::ranges::any_of(
_priority_logs_list,
[](const auto& m) { return m.handle->needs_compaction(); });
if (priority_needs_compaction) {
vlog(
gclog.info,
"{}: compaction exceeded {}ms, preempting for priority "
"partitions",
ntp,
timeout.count());
preempt_as.request_abort_ex(priority_compaction_exception(
"Compaction pre-empted for compaction of priority "
"partition"));
} else {
// Rearm on a shorter time interval.
preempt_timer.arm(5min);
}
});
preempt_timer.arm(timeout);

vlog(
gclog.trace,
"{}: max tombstone remove offset: {}, max tx remove offset: {}, max "
"tx end remove snapshot: {}, tx_snapshot_offset: {}",
ntp,
max_tombstone_remove_offset,
max_tx_remove_offset,
max_tx_end_remove_offset,
tx_snapshot_offset);
if (
max_unpinned_offset
&& *max_unpinned_offset < max_compactible_offset) {
vlog(
gclog.debug,
"{}: Compaction is pinned by offset: pinned Kafka offset: {}, "
"log offsets max unpinned {} < max removable {}",
ntp,
*pinned_kafka_offset,
*max_unpinned_offset,
max_compactible_offset);
max_compactible_offset = *max_unpinned_offset;
try {
co_await do_housekeeping(
current_log, collection_threshold, preempt_as);
} catch (const priority_compaction_exception& e) {
// Preempted for priority partition compaction.
vlog(gclog.info, "{} - {}", ntp, e);
}
co_await current_log.handle->housekeeping(
housekeeping_config::make_config(
collection_threshold,
_config.retention_bytes(),
max_compactible_offset,
max_tombstone_remove_offset,
max_tx_remove_offset,
current_log.handle->config().delete_retention_ms(),
current_log.handle->config().delete_retention_ms(),
current_log.handle->config().min_compaction_lag_ms(),
_abort_source,
std::move(ntp_sanitizer_cfg),
_compaction_hash_key_map.get()));
_probe->housekeeping_log_processed();
}
}

Expand Down Expand Up @@ -685,6 +657,147 @@ ss::future<> log_manager::gc_loop() {
}
}

ss::future<>
log_manager::compact_priority_logs(model::timestamp collection_threshold) {
for (auto& log_meta : _priority_logs_list) {
if (_abort_source.abort_requested()) {
co_return;
}

if (!log_meta.handle->needs_compaction()) {
vlog(
gclog.trace,
"{}: dirty ratio ({}) < min.cleanable.dirty.ratio ({}) and "
"time since earliest dirty timestamp does not exceed "
"max.compaction.lag.ms ({}), skipping compaction.",
log_meta.handle->config().ntp(),
log_meta.handle->dirty_ratio(),
log_meta.handle->config().min_cleanable_dirty_ratio(),
log_meta.handle->config().max_compaction_lag_ms());
continue;
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why doesn't the priority compaction handling here also do something equivalent to

        if (is_not_set(current_log.flags, bflags::should_compact)) {
            // Still perform gc() here on a regular `log_compaction_interval_ms`
            // basis. Use `try_get_units()` to avoid concurrent garbage
            // collection with `gc_loop()`- if we fail to obtain units,
            // it is because urgent garbage collection is already underway for
            // this log.
            auto units = current_log.housekeeping_lock.try_get_units();
            if (units.has_value()) {
                co_await current_log.handle->gc(
                  gc_config(collection_threshold, _config.retention_bytes()));
            }
            continue;
        }

which looks like it is applied to non-priority partitions in the main loop. i don't actually know if it should be done or not, but this is an example of why factoring out the "what should i work on next" is great, because then we can have unified handling of that choice and this question likely doesn't even arise.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah potentially a good point. Once again, because __consumer_offsets is the only priority topic, gc() should have no effect whatsoever here, but if we were to extend the definition of "priority", this would certainly need to be called.

but this is an example of why factoring out the "what should i work on next" is great, because then we can have unified handling of that choice and this question likely doesn't even arise.

Agreed, refactoring the entirety of housekeeping out into something more intelligent than a loop over the intrusive list would be great.


auto gate = log_meta.housekeeping_gate.hold();

auto housekeeping_lock_holder
= co_await log_meta.housekeeping_lock.get_units();

if (!log_meta.link.is_linked()) {
continue;
}

co_await do_housekeeping(log_meta, collection_threshold);
}
}

bool log_manager::is_priority_ntp(const model::ntp& ntp) const {
return model::is_consumer_offsets_topic(ntp);
}

ss::future<> log_manager::do_housekeeping(
log_housekeeping_meta& meta,
model::timestamp collection_threshold,
model::opt_abort_source_t preempt_source) {
if (!meta.link.is_linked()) {
co_return;
}

auto ntp = meta.handle->config().ntp();
auto ntp_sanitizer_cfg = _config.maybe_get_ntp_sanitizer_config(ntp);

// Until we better implement bailing out of compaction, the best thing
// we can do for observability is add a watchdog here.
ssx::watchdog wd5m(5min, [ntp] {
vlog(gclog.warn, "{}: Housekeeping process exceeding 5 minutes", ntp);
});

// Create a composite abort source that triggers on shutdown or
// preemption. If no preempt_source is provided, just use _abort_source
// directly.
std::optional<ssx::composite_abort_source> composite_as;
auto& as = [this, &preempt_source, &composite_as]() -> ss::abort_source& {
if (preempt_source.has_value()) {
composite_as.emplace(_abort_source, preempt_source->get());
return composite_as->as();
}
return _abort_source;
}();
Comment on lines +714 to +724

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

afaict this commit is intended to be "factor out code for reuse", so this is net-new functionality and should probably be in the next commit?


// NOTE: housekeeping holds _compaction_housekeeping_gate, that prevents
// the removal of the parent object. this makes awaiting housekeeping
// safe against removal of segments from _logs_list
auto& log = meta.handle;
auto pinned_kafka_offset = log->stm_manager()->lowest_pinned_data_offset();
std::optional<model::offset> max_unpinned_offset;
if (pinned_kafka_offset) {
auto local_log_start = log->offsets().start_offset;
auto kafka_local_start = model::offset_cast(
log->from_log_offset(local_log_start));
if (*pinned_kafka_offset >= kafka_local_start) {
max_unpinned_offset = model::prev_offset(
log->to_log_offset(kafka::offset_cast(*pinned_kafka_offset)));
} else {
max_unpinned_offset = model::prev_offset(
log->offsets().start_offset);
}
}

model::offset max_compactible_offset
= log->stm_manager()->max_removable_local_log_offset();
model::offset max_tombstone_remove_offset
= log->stm_manager()->max_tombstone_remove_offset();
model::offset max_tx_end_remove_offset
= log->stm_manager()->max_tx_end_remove_offset();
model::offset tx_snapshot_offset = log->stm_manager()->tx_snapshot_offset();
// We clamp the offset up to which we can remove transactional control
// batches to the last snapshot taken by the transactional stm. This
// ensures that we do not remove control batches that may be needed to
// reconstruct the state machine during recovery.
model::offset max_tx_remove_offset = std::min(
max_tx_end_remove_offset, tx_snapshot_offset);

vlog(
gclog.trace,
"{}: max tombstone remove offset: {}, max tx remove offset: {}, max "
"tx end remove snapshot: {}, tx_snapshot_offset: {}",
ntp,
max_tombstone_remove_offset,
max_tx_remove_offset,
max_tx_end_remove_offset,
tx_snapshot_offset);

if (max_unpinned_offset && *max_unpinned_offset < max_compactible_offset) {
vlog(
gclog.debug,
"{}: Compaction is pinned by offset: pinned Kafka offset: {}, "
"log offsets max unpinned {} < max removable {}",
ntp,
*pinned_kafka_offset,
*max_unpinned_offset,
max_compactible_offset);
max_compactible_offset = *max_unpinned_offset;
}

co_await log->housekeeping(
housekeeping_config::make_config(
collection_threshold,
_config.retention_bytes(),
max_compactible_offset,
max_tombstone_remove_offset,
max_tx_remove_offset,
log->config().delete_retention_ms(),
log->config().delete_retention_ms(),
log->config().min_compaction_lag_ms(),
as,
std::move(ntp_sanitizer_cfg),
_compaction_hash_key_map.get()));

meta.flags |= log_housekeeping_meta::bitflags::compacted;
meta.last_compaction = ss::lowres_clock::now();

_probe->housekeeping_log_processed();
}

/**
*
* @param read_buf_size size of underlying ss::input_stream's buffer
Expand Down Expand Up @@ -802,7 +915,18 @@ ss::future<ss::shared_ptr<log>> log_manager::do_manage(
std::move(translator_batch_types));
auto [it, success] = _logs.emplace(
l->config().ntp(), std::make_unique<log_housekeeping_meta>(l));
_logs_list.push_back(*it->second);

// Add to appropriate list based on whether this is a priority NTP.
if (is_priority_ntp(l->config().ntp())) {
_priority_logs_list.push_back(*it->second);
vlog(
stlog.debug,
"Tracking {} as priority partition for compaction",
l->config().ntp());
} else {
_logs_list.push_back(*it->second);
}

update_log_count();
vassert(success, "Could not keep track of:{} - concurrency issue", l);
co_return l;
Expand Down
Loading