diff --git a/src/v/config/configuration.cc b/src/v/config/configuration.cc index 67cb0832b9314..192751a42fcd9 100644 --- a/src/v/config/configuration.cc +++ b/src/v/config/configuration.cc @@ -1030,6 +1030,13 @@ configuration::configuration() "How often to trigger background compaction.", {.needs_restart = needs_restart::no, .visibility = visibility::user}, 10s) + , log_compaction_max_priority_wait_ms( + *this, + "log_compaction_max_priority_wait_ms", + "Maximum time a priority partition (for example, __consumer_offsets) can " + "wait for compaction before preempting regular compaction.", + {.needs_restart = needs_restart::no, .visibility = visibility::tunable}, + 30min) , tombstone_retention_ms( *this, "tombstone_retention_ms", diff --git a/src/v/config/configuration.h b/src/v/config/configuration.h index 096c63efd90f0..b3038c3a51063 100644 --- a/src/v/config/configuration.h +++ b/src/v/config/configuration.h @@ -261,6 +261,7 @@ struct configuration final : public config_store { // same as log.retention.ms in kafka retention_duration_property log_retention_ms; property log_compaction_interval_ms; + property log_compaction_max_priority_wait_ms; // same as delete.retention.ms in kafka property> tombstone_retention_ms; bounded_property, numeric_bounds> diff --git a/src/v/storage/BUILD b/src/v/storage/BUILD index eef7755dd2942..119ec7f2ea112 100644 --- a/src/v/storage/BUILD +++ b/src/v/storage/BUILD @@ -318,6 +318,7 @@ redpanda_cc_library( ], implementation_deps = [ ":log_manager_probe", + "//src/v/ssx:abort_source", "//src/v/syschecks", ], visibility = ["//visibility:public"], diff --git a/src/v/storage/log_manager.cc b/src/v/storage/log_manager.cc index a6ea04ed91cb2..d8367d1e9f309 100644 --- a/src/v/storage/log_manager.cc +++ b/src/v/storage/log_manager.cc @@ -18,6 +18,7 @@ #include "model/metadata.h" #include "model/timestamp.h" #include "resource_mgmt/memory_groups.h" +#include "ssx/abort_source.h" #include "ssx/async-clear.h" #include "ssx/future-util.h" #include "ssx/mutex.h" @@ -53,6 +54,7 @@ #include #include #include +#include #include #include #include @@ -69,9 +71,20 @@ #include #include #include +#include using namespace std::chrono_literals; +namespace { + +class priority_compaction_exception final : public std::runtime_error { +public: + explicit priority_compaction_exception(const std::string& msg) + : std::runtime_error(msg) {} +}; + +} // namespace + namespace storage { using logs_type = absl::flat_hash_map; @@ -85,7 +98,6 @@ log_config::log_config( , segment_size_jitter(0) // For deterministic behavior in unit tests. , compacted_segment_size(config::mock_binding(256_MiB)) , max_compacted_segment_size(config::mock_binding(5_GiB)) - , retention_bytes(config::mock_binding>(std::nullopt)) , compaction_interval( config::mock_binding(std::chrono::minutes(10))) @@ -357,6 +369,11 @@ log_manager::housekeeping_scan(model::timestamp collection_threshold) { co_return; } + // Before each regular compaction, compact any priority logs that need + // it. This ensures priority partitions (e.g. __consumer_offsets) are + // not starved by long-running compactions. + co_await compact_priority_logs(collection_threshold); + auto& current_log = _logs_list.front(); _logs_list.shift_forward(); @@ -382,11 +399,9 @@ log_manager::housekeeping_scan(model::timestamp collection_threshold) { continue; } - current_log.flags |= bflags::compacted; - current_log.last_compaction = ss::lowres_clock::now(); - - auto ntp_sanitizer_cfg = _config.maybe_get_ntp_sanitizer_config( - current_log.handle->config().ntp()); + if (!current_log.link.is_linked()) { + continue; + } // Obtain housekeeping lock to prevent concurrency of // log->housekeeping() with gc fibre. @@ -397,87 +412,44 @@ log_manager::housekeeping_scan(model::timestamp collection_threshold) { continue; } - // Until we better implement bailing out of compaction, the best thing - // we can do for observability is add a watchdog here. - auto ntp = current_log.handle->config().ntp(); - ssx::watchdog wd5m(5min, [ntp] { - vlog( - gclog.warn, "{}: Housekeeping process exceeding 5 minutes", ntp); - }); - - // NOTE: housekeeping holds _compaction_housekeeping_gate, that prevents - // the removal of the parent object. this makes awaiting housekeeping - // safe against removal of segments from _logs_list - auto& log = current_log.handle; - auto pinned_kafka_offset - = current_log.handle->stm_manager()->lowest_pinned_data_offset(); - std::optional max_unpinned_offset; - if (pinned_kafka_offset) { - auto local_log_start = log->offsets().start_offset; - auto kafka_local_start = model::offset_cast( - log->from_log_offset(local_log_start)); - if (*pinned_kafka_offset >= kafka_local_start) { - // Translate the pinned Kafka offset. - max_unpinned_offset = model::prev_offset( - log->to_log_offset(kafka::offset_cast(*pinned_kafka_offset))); - } else { - // The pin falls below the log start, in which case the entire - // local log is pinned. - max_unpinned_offset = model::prev_offset( - log->offsets().start_offset); - } - } - model::offset max_compactible_offset - = current_log.handle->stm_manager()->max_removable_local_log_offset(); - model::offset max_tombstone_remove_offset - = current_log.handle->stm_manager()->max_tombstone_remove_offset(); - model::offset max_tx_end_remove_offset - = current_log.handle->stm_manager()->max_tx_end_remove_offset(); - model::offset tx_snapshot_offset - = current_log.handle->stm_manager()->tx_snapshot_offset(); - // We clamp the offset up to which we can remove transactional control - // batches to the last snapshot taken by the transactional stm. This - // ensures that we do not remove control batches that may be needed to - // reconstruct the state machine during recovery. - model::offset max_tx_remove_offset = std::min( - max_tx_end_remove_offset, tx_snapshot_offset); + // Set up timer-based preemption: if compaction exceeds the configured + // timeout and a priority partition needs compaction, abort so priority + // partitions can be serviced. If no priority partition needs + // compaction, rearm the timer to check on a shorter interval. + ss::abort_source preempt_as; + const auto& ntp = current_log.handle->config().ntp(); + auto timeout + = config::shard_local_cfg().log_compaction_max_priority_wait_ms(); + ss::timer preempt_timer; + preempt_timer.set_callback( + [this, &preempt_as, &ntp, &preempt_timer, timeout] { + bool priority_needs_compaction = std::ranges::any_of( + _priority_logs_list, + [](const auto& m) { return m.handle->needs_compaction(); }); + if (priority_needs_compaction) { + vlog( + gclog.info, + "{}: compaction exceeded {}ms, preempting for priority " + "partitions", + ntp, + timeout.count()); + preempt_as.request_abort_ex(priority_compaction_exception( + "Compaction pre-empted for compaction of priority " + "partition")); + } else { + // Rearm on a shorter time interval. + preempt_timer.arm(5min); + } + }); + preempt_timer.arm(timeout); - vlog( - gclog.trace, - "{}: max tombstone remove offset: {}, max tx remove offset: {}, max " - "tx end remove snapshot: {}, tx_snapshot_offset: {}", - ntp, - max_tombstone_remove_offset, - max_tx_remove_offset, - max_tx_end_remove_offset, - tx_snapshot_offset); - if ( - max_unpinned_offset - && *max_unpinned_offset < max_compactible_offset) { - vlog( - gclog.debug, - "{}: Compaction is pinned by offset: pinned Kafka offset: {}, " - "log offsets max unpinned {} < max removable {}", - ntp, - *pinned_kafka_offset, - *max_unpinned_offset, - max_compactible_offset); - max_compactible_offset = *max_unpinned_offset; + try { + co_await do_housekeeping( + current_log, collection_threshold, preempt_as); + } catch (const priority_compaction_exception& e) { + // Preempted for priority partition compaction. + vlog(gclog.info, "{} - {}", ntp, e); } - co_await current_log.handle->housekeeping( - housekeeping_config::make_config( - collection_threshold, - _config.retention_bytes(), - max_compactible_offset, - max_tombstone_remove_offset, - max_tx_remove_offset, - current_log.handle->config().delete_retention_ms(), - current_log.handle->config().delete_retention_ms(), - current_log.handle->config().min_compaction_lag_ms(), - _abort_source, - std::move(ntp_sanitizer_cfg), - _compaction_hash_key_map.get())); - _probe->housekeeping_log_processed(); } } @@ -685,6 +657,147 @@ ss::future<> log_manager::gc_loop() { } } +ss::future<> +log_manager::compact_priority_logs(model::timestamp collection_threshold) { + for (auto& log_meta : _priority_logs_list) { + if (_abort_source.abort_requested()) { + co_return; + } + + if (!log_meta.handle->needs_compaction()) { + vlog( + gclog.trace, + "{}: dirty ratio ({}) < min.cleanable.dirty.ratio ({}) and " + "time since earliest dirty timestamp does not exceed " + "max.compaction.lag.ms ({}), skipping compaction.", + log_meta.handle->config().ntp(), + log_meta.handle->dirty_ratio(), + log_meta.handle->config().min_cleanable_dirty_ratio(), + log_meta.handle->config().max_compaction_lag_ms()); + continue; + } + + auto gate = log_meta.housekeeping_gate.hold(); + + auto housekeeping_lock_holder + = co_await log_meta.housekeeping_lock.get_units(); + + if (!log_meta.link.is_linked()) { + continue; + } + + co_await do_housekeeping(log_meta, collection_threshold); + } +} + +bool log_manager::is_priority_ntp(const model::ntp& ntp) const { + return model::is_consumer_offsets_topic(ntp); +} + +ss::future<> log_manager::do_housekeeping( + log_housekeeping_meta& meta, + model::timestamp collection_threshold, + model::opt_abort_source_t preempt_source) { + if (!meta.link.is_linked()) { + co_return; + } + + auto ntp = meta.handle->config().ntp(); + auto ntp_sanitizer_cfg = _config.maybe_get_ntp_sanitizer_config(ntp); + + // Until we better implement bailing out of compaction, the best thing + // we can do for observability is add a watchdog here. + ssx::watchdog wd5m(5min, [ntp] { + vlog(gclog.warn, "{}: Housekeeping process exceeding 5 minutes", ntp); + }); + + // Create a composite abort source that triggers on shutdown or + // preemption. If no preempt_source is provided, just use _abort_source + // directly. + std::optional composite_as; + auto& as = [this, &preempt_source, &composite_as]() -> ss::abort_source& { + if (preempt_source.has_value()) { + composite_as.emplace(_abort_source, preempt_source->get()); + return composite_as->as(); + } + return _abort_source; + }(); + + // NOTE: housekeeping holds _compaction_housekeeping_gate, that prevents + // the removal of the parent object. this makes awaiting housekeeping + // safe against removal of segments from _logs_list + auto& log = meta.handle; + auto pinned_kafka_offset = log->stm_manager()->lowest_pinned_data_offset(); + std::optional max_unpinned_offset; + if (pinned_kafka_offset) { + auto local_log_start = log->offsets().start_offset; + auto kafka_local_start = model::offset_cast( + log->from_log_offset(local_log_start)); + if (*pinned_kafka_offset >= kafka_local_start) { + max_unpinned_offset = model::prev_offset( + log->to_log_offset(kafka::offset_cast(*pinned_kafka_offset))); + } else { + max_unpinned_offset = model::prev_offset( + log->offsets().start_offset); + } + } + + model::offset max_compactible_offset + = log->stm_manager()->max_removable_local_log_offset(); + model::offset max_tombstone_remove_offset + = log->stm_manager()->max_tombstone_remove_offset(); + model::offset max_tx_end_remove_offset + = log->stm_manager()->max_tx_end_remove_offset(); + model::offset tx_snapshot_offset = log->stm_manager()->tx_snapshot_offset(); + // We clamp the offset up to which we can remove transactional control + // batches to the last snapshot taken by the transactional stm. This + // ensures that we do not remove control batches that may be needed to + // reconstruct the state machine during recovery. + model::offset max_tx_remove_offset = std::min( + max_tx_end_remove_offset, tx_snapshot_offset); + + vlog( + gclog.trace, + "{}: max tombstone remove offset: {}, max tx remove offset: {}, max " + "tx end remove snapshot: {}, tx_snapshot_offset: {}", + ntp, + max_tombstone_remove_offset, + max_tx_remove_offset, + max_tx_end_remove_offset, + tx_snapshot_offset); + + if (max_unpinned_offset && *max_unpinned_offset < max_compactible_offset) { + vlog( + gclog.debug, + "{}: Compaction is pinned by offset: pinned Kafka offset: {}, " + "log offsets max unpinned {} < max removable {}", + ntp, + *pinned_kafka_offset, + *max_unpinned_offset, + max_compactible_offset); + max_compactible_offset = *max_unpinned_offset; + } + + co_await log->housekeeping( + housekeeping_config::make_config( + collection_threshold, + _config.retention_bytes(), + max_compactible_offset, + max_tombstone_remove_offset, + max_tx_remove_offset, + log->config().delete_retention_ms(), + log->config().delete_retention_ms(), + log->config().min_compaction_lag_ms(), + as, + std::move(ntp_sanitizer_cfg), + _compaction_hash_key_map.get())); + + meta.flags |= log_housekeeping_meta::bitflags::compacted; + meta.last_compaction = ss::lowres_clock::now(); + + _probe->housekeeping_log_processed(); +} + /** * * @param read_buf_size size of underlying ss::input_stream's buffer @@ -802,7 +915,18 @@ ss::future> log_manager::do_manage( std::move(translator_batch_types)); auto [it, success] = _logs.emplace( l->config().ntp(), std::make_unique(l)); - _logs_list.push_back(*it->second); + + // Add to appropriate list based on whether this is a priority NTP. + if (is_priority_ntp(l->config().ntp())) { + _priority_logs_list.push_back(*it->second); + vlog( + stlog.debug, + "Tracking {} as priority partition for compaction", + l->config().ntp()); + } else { + _logs_list.push_back(*it->second); + } + update_log_count(); vassert(success, "Could not keep track of:{} - concurrency issue", l); co_return l; diff --git a/src/v/storage/log_manager.h b/src/v/storage/log_manager.h index 27e5ccde03028..239d97d306817 100644 --- a/src/v/storage/log_manager.h +++ b/src/v/storage/log_manager.h @@ -22,6 +22,7 @@ #include "features/feature_table.h" #include "model/fundamental.h" #include "model/metadata.h" +#include "model/namespace.h" #include "random/simple_time_jitter.h" #include "ssx/mutex.h" #include "storage/batch_cache.h" @@ -294,6 +295,21 @@ class log_manager { bool _gc_triggered{false}; ssx::semaphore _gc_sem{0, "log_manager::gc"}; + // Check if an NTP is a priority partition for compaction. + bool is_priority_ntp(const model::ntp& ntp) const; + + // Run housekeeping (gc + compaction) on a single partition. + // The optional abort source is combined with _abort_source to create + // a composite that triggers on either (used for timer-based preemption). + ss::future<> do_housekeeping( + log_housekeeping_meta& meta, + model::timestamp collection_threshold, + model::opt_abort_source_t preempt_source = std::nullopt); + + // Compact any priority partitions that need it. Called before each + // regular compaction iteration to ensure priority logs aren't starved. + ss::future<> compact_priority_logs(model::timestamp collection_threshold); + disk_space_alert _disk_space_alert{disk_space_alert::ok}; ss::future<> dispatch_topic_dir_deletion(ss::sstring dir); @@ -310,6 +326,8 @@ class log_manager { simple_time_jitter _trigger_gc_jitter; logs_type _logs; compaction_list_type _logs_list; + // List of priority partitions managed by housekeeping_loop(). + compaction_list_type _priority_logs_list; batch_cache _batch_cache; // Hash key-map to use across multiple compactions to reuse reserved memory