-
Notifications
You must be signed in to change notification settings - Fork 749
storage: manage priority logs in housekeeping_loop
#29382
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,6 +18,7 @@ | |
| #include "model/metadata.h" | ||
| #include "model/timestamp.h" | ||
| #include "resource_mgmt/memory_groups.h" | ||
| #include "ssx/abort_source.h" | ||
| #include "ssx/async-clear.h" | ||
| #include "ssx/future-util.h" | ||
| #include "ssx/mutex.h" | ||
|
|
@@ -53,6 +54,7 @@ | |
| #include <seastar/core/semaphore.hh> | ||
| #include <seastar/core/shared_ptr.hh> | ||
| #include <seastar/core/thread.hh> | ||
| #include <seastar/core/timer.hh> | ||
| #include <seastar/core/with_scheduling_group.hh> | ||
| #include <seastar/coroutine/maybe_yield.hh> | ||
| #include <seastar/coroutine/parallel_for_each.hh> | ||
|
|
@@ -69,9 +71,20 @@ | |
| #include <filesystem> | ||
| #include <functional> | ||
| #include <optional> | ||
| #include <ranges> | ||
|
|
||
| using namespace std::chrono_literals; | ||
|
|
||
| namespace { | ||
|
|
||
| class priority_compaction_exception final : public std::runtime_error { | ||
| public: | ||
| explicit priority_compaction_exception(const std::string& msg) | ||
| : std::runtime_error(msg) {} | ||
| }; | ||
|
|
||
| } // namespace | ||
|
|
||
| namespace storage { | ||
| using logs_type = absl::flat_hash_map<model::ntp, log_housekeeping_meta>; | ||
|
|
||
|
|
@@ -85,7 +98,6 @@ log_config::log_config( | |
| , segment_size_jitter(0) // For deterministic behavior in unit tests. | ||
| , compacted_segment_size(config::mock_binding<size_t>(256_MiB)) | ||
| , max_compacted_segment_size(config::mock_binding<size_t>(5_GiB)) | ||
|
|
||
| , retention_bytes(config::mock_binding<std::optional<size_t>>(std::nullopt)) | ||
| , compaction_interval( | ||
| config::mock_binding<std::chrono::milliseconds>(std::chrono::minutes(10))) | ||
|
|
@@ -357,6 +369,11 @@ log_manager::housekeeping_scan(model::timestamp collection_threshold) { | |
| co_return; | ||
| } | ||
|
|
||
| // Before each regular compaction, compact any priority logs that need | ||
| // it. This ensures priority partitions (e.g. __consumer_offsets) are | ||
| // not starved by long-running compactions. | ||
| co_await compact_priority_logs(collection_threshold); | ||
|
|
||
| auto& current_log = _logs_list.front(); | ||
|
|
||
| _logs_list.shift_forward(); | ||
|
|
@@ -382,11 +399,9 @@ log_manager::housekeeping_scan(model::timestamp collection_threshold) { | |
| continue; | ||
| } | ||
|
|
||
|
WillemKauf marked this conversation as resolved.
|
||
| current_log.flags |= bflags::compacted; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: would one benefit of setting this flag before compaction ran is that if a problem occurred while compacting that housekeeping wouldn't reconsider it for a while?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This flag is actually only considered when initializing the key-offset map starting on L299, it's not used for any scheduling decisions.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ahh ok cool |
||
| current_log.last_compaction = ss::lowres_clock::now(); | ||
|
|
||
| auto ntp_sanitizer_cfg = _config.maybe_get_ntp_sanitizer_config( | ||
| current_log.handle->config().ntp()); | ||
| if (!current_log.link.is_linked()) { | ||
| continue; | ||
| } | ||
|
|
||
| // Obtain housekeeping lock to prevent concurrency of | ||
| // log->housekeeping() with gc fibre. | ||
|
|
@@ -397,87 +412,44 @@ log_manager::housekeeping_scan(model::timestamp collection_threshold) { | |
| continue; | ||
| } | ||
|
|
||
| // Until we better implement bailing out of compaction, the best thing | ||
| // we can do for observability is add a watchdog here. | ||
| auto ntp = current_log.handle->config().ntp(); | ||
| ssx::watchdog wd5m(5min, [ntp] { | ||
| vlog( | ||
| gclog.warn, "{}: Housekeeping process exceeding 5 minutes", ntp); | ||
| }); | ||
|
|
||
| // NOTE: housekeeping holds _compaction_housekeeping_gate, that prevents | ||
| // the removal of the parent object. this makes awaiting housekeeping | ||
| // safe against removal of segments from _logs_list | ||
| auto& log = current_log.handle; | ||
| auto pinned_kafka_offset | ||
| = current_log.handle->stm_manager()->lowest_pinned_data_offset(); | ||
| std::optional<model::offset> max_unpinned_offset; | ||
| if (pinned_kafka_offset) { | ||
| auto local_log_start = log->offsets().start_offset; | ||
| auto kafka_local_start = model::offset_cast( | ||
| log->from_log_offset(local_log_start)); | ||
| if (*pinned_kafka_offset >= kafka_local_start) { | ||
| // Translate the pinned Kafka offset. | ||
| max_unpinned_offset = model::prev_offset( | ||
| log->to_log_offset(kafka::offset_cast(*pinned_kafka_offset))); | ||
| } else { | ||
| // The pin falls below the log start, in which case the entire | ||
| // local log is pinned. | ||
| max_unpinned_offset = model::prev_offset( | ||
| log->offsets().start_offset); | ||
| } | ||
| } | ||
| model::offset max_compactible_offset | ||
| = current_log.handle->stm_manager()->max_removable_local_log_offset(); | ||
| model::offset max_tombstone_remove_offset | ||
| = current_log.handle->stm_manager()->max_tombstone_remove_offset(); | ||
| model::offset max_tx_end_remove_offset | ||
| = current_log.handle->stm_manager()->max_tx_end_remove_offset(); | ||
| model::offset tx_snapshot_offset | ||
| = current_log.handle->stm_manager()->tx_snapshot_offset(); | ||
| // We clamp the offset up to which we can remove transactional control | ||
| // batches to the last snapshot taken by the transactional stm. This | ||
| // ensures that we do not remove control batches that may be needed to | ||
| // reconstruct the state machine during recovery. | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Argh. Why did claude remove this comment when moving the function? I'll have to force push to fix this again- going to wait for other review comments to come in before doing so. |
||
| model::offset max_tx_remove_offset = std::min( | ||
| max_tx_end_remove_offset, tx_snapshot_offset); | ||
| // Set up timer-based preemption: if compaction exceeds the configured | ||
| // timeout and a priority partition needs compaction, abort so priority | ||
| // partitions can be serviced. If no priority partition needs | ||
| // compaction, rearm the timer to check on a shorter interval. | ||
|
Comment on lines
+417
to
+418
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I can see that it uses a shorter interval by looking at the code, but the comment doesn't help explain why a shorter interval is used.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I can add more color inline here if you'd like, but the point is that once the timer is fired for the first time, we have already exceeded
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. thanks, that explanation makes sense.
up to you. if your intention was to highlight arming the timer at a smaller timeout then the comment needs to be updated. otherwise you could leave it or remove it. |
||
| ss::abort_source preempt_as; | ||
| const auto& ntp = current_log.handle->config().ntp(); | ||
| auto timeout | ||
| = config::shard_local_cfg().log_compaction_max_priority_wait_ms(); | ||
| ss::timer<ss::lowres_clock> preempt_timer; | ||
| preempt_timer.set_callback( | ||
| [this, &preempt_as, &ntp, &preempt_timer, timeout] { | ||
| bool priority_needs_compaction = std::ranges::any_of( | ||
| _priority_logs_list, | ||
| [](const auto& m) { return m.handle->needs_compaction(); }); | ||
| if (priority_needs_compaction) { | ||
| vlog( | ||
| gclog.info, | ||
| "{}: compaction exceeded {}ms, preempting for priority " | ||
| "partitions", | ||
| ntp, | ||
| timeout.count()); | ||
| preempt_as.request_abort_ex(priority_compaction_exception( | ||
| "Compaction pre-empted for compaction of priority " | ||
| "partition")); | ||
| } else { | ||
| // Rearm on a shorter time interval. | ||
| preempt_timer.arm(5min); | ||
| } | ||
| }); | ||
| preempt_timer.arm(timeout); | ||
|
|
||
| vlog( | ||
| gclog.trace, | ||
| "{}: max tombstone remove offset: {}, max tx remove offset: {}, max " | ||
| "tx end remove snapshot: {}, tx_snapshot_offset: {}", | ||
| ntp, | ||
| max_tombstone_remove_offset, | ||
| max_tx_remove_offset, | ||
| max_tx_end_remove_offset, | ||
| tx_snapshot_offset); | ||
| if ( | ||
| max_unpinned_offset | ||
| && *max_unpinned_offset < max_compactible_offset) { | ||
| vlog( | ||
| gclog.debug, | ||
| "{}: Compaction is pinned by offset: pinned Kafka offset: {}, " | ||
| "log offsets max unpinned {} < max removable {}", | ||
| ntp, | ||
| *pinned_kafka_offset, | ||
| *max_unpinned_offset, | ||
| max_compactible_offset); | ||
| max_compactible_offset = *max_unpinned_offset; | ||
| try { | ||
| co_await do_housekeeping( | ||
| current_log, collection_threshold, preempt_as); | ||
| } catch (const priority_compaction_exception& e) { | ||
| // Preempted for priority partition compaction. | ||
| vlog(gclog.info, "{} - {}", ntp, e); | ||
| } | ||
| co_await current_log.handle->housekeeping( | ||
| housekeeping_config::make_config( | ||
| collection_threshold, | ||
| _config.retention_bytes(), | ||
| max_compactible_offset, | ||
| max_tombstone_remove_offset, | ||
| max_tx_remove_offset, | ||
| current_log.handle->config().delete_retention_ms(), | ||
| current_log.handle->config().delete_retention_ms(), | ||
| current_log.handle->config().min_compaction_lag_ms(), | ||
| _abort_source, | ||
| std::move(ntp_sanitizer_cfg), | ||
| _compaction_hash_key_map.get())); | ||
| _probe->housekeeping_log_processed(); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -685,6 +657,147 @@ ss::future<> log_manager::gc_loop() { | |
| } | ||
| } | ||
|
|
||
| ss::future<> | ||
| log_manager::compact_priority_logs(model::timestamp collection_threshold) { | ||
| for (auto& log_meta : _priority_logs_list) { | ||
| if (_abort_source.abort_requested()) { | ||
| co_return; | ||
| } | ||
|
|
||
| if (!log_meta.handle->needs_compaction()) { | ||
| vlog( | ||
| gclog.trace, | ||
| "{}: dirty ratio ({}) < min.cleanable.dirty.ratio ({}) and " | ||
| "time since earliest dirty timestamp does not exceed " | ||
| "max.compaction.lag.ms ({}), skipping compaction.", | ||
| log_meta.handle->config().ntp(), | ||
| log_meta.handle->dirty_ratio(), | ||
| log_meta.handle->config().min_cleanable_dirty_ratio(), | ||
| log_meta.handle->config().max_compaction_lag_ms()); | ||
| continue; | ||
| } | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why doesn't the priority compaction handling here also do something equivalent to which looks like it is applied to non-priority partitions in the main loop. i don't actually know if it should be done or not, but this is an example of why factoring out the "what should i work on next" is great, because then we can have unified handling of that choice and this question likely doesn't even arise.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah potentially a good point. Once again, because
Agreed, refactoring the entirety of housekeeping out into something more intelligent than a loop over the intrusive list would be great. |
||
|
|
||
| auto gate = log_meta.housekeeping_gate.hold(); | ||
|
|
||
| auto housekeeping_lock_holder | ||
| = co_await log_meta.housekeeping_lock.get_units(); | ||
|
|
||
| if (!log_meta.link.is_linked()) { | ||
| continue; | ||
| } | ||
|
|
||
| co_await do_housekeeping(log_meta, collection_threshold); | ||
| } | ||
| } | ||
|
|
||
| bool log_manager::is_priority_ntp(const model::ntp& ntp) const { | ||
| return model::is_consumer_offsets_topic(ntp); | ||
| } | ||
|
|
||
| ss::future<> log_manager::do_housekeeping( | ||
| log_housekeeping_meta& meta, | ||
| model::timestamp collection_threshold, | ||
| model::opt_abort_source_t preempt_source) { | ||
| if (!meta.link.is_linked()) { | ||
| co_return; | ||
| } | ||
|
|
||
| auto ntp = meta.handle->config().ntp(); | ||
| auto ntp_sanitizer_cfg = _config.maybe_get_ntp_sanitizer_config(ntp); | ||
|
|
||
| // Until we better implement bailing out of compaction, the best thing | ||
| // we can do for observability is add a watchdog here. | ||
| ssx::watchdog wd5m(5min, [ntp] { | ||
| vlog(gclog.warn, "{}: Housekeeping process exceeding 5 minutes", ntp); | ||
| }); | ||
|
|
||
| // Create a composite abort source that triggers on shutdown or | ||
| // preemption. If no preempt_source is provided, just use _abort_source | ||
| // directly. | ||
| std::optional<ssx::composite_abort_source> composite_as; | ||
| auto& as = [this, &preempt_source, &composite_as]() -> ss::abort_source& { | ||
| if (preempt_source.has_value()) { | ||
| composite_as.emplace(_abort_source, preempt_source->get()); | ||
| return composite_as->as(); | ||
| } | ||
| return _abort_source; | ||
| }(); | ||
|
Comment on lines
+714
to
+724
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. afaict this commit is intended to be "factor out code for reuse", so this is net-new functionality and should probably be in the next commit? |
||
|
|
||
| // NOTE: housekeeping holds _compaction_housekeeping_gate, that prevents | ||
| // the removal of the parent object. this makes awaiting housekeeping | ||
| // safe against removal of segments from _logs_list | ||
| auto& log = meta.handle; | ||
| auto pinned_kafka_offset = log->stm_manager()->lowest_pinned_data_offset(); | ||
| std::optional<model::offset> max_unpinned_offset; | ||
| if (pinned_kafka_offset) { | ||
| auto local_log_start = log->offsets().start_offset; | ||
| auto kafka_local_start = model::offset_cast( | ||
| log->from_log_offset(local_log_start)); | ||
| if (*pinned_kafka_offset >= kafka_local_start) { | ||
| max_unpinned_offset = model::prev_offset( | ||
| log->to_log_offset(kafka::offset_cast(*pinned_kafka_offset))); | ||
| } else { | ||
| max_unpinned_offset = model::prev_offset( | ||
| log->offsets().start_offset); | ||
| } | ||
| } | ||
|
|
||
| model::offset max_compactible_offset | ||
| = log->stm_manager()->max_removable_local_log_offset(); | ||
| model::offset max_tombstone_remove_offset | ||
| = log->stm_manager()->max_tombstone_remove_offset(); | ||
| model::offset max_tx_end_remove_offset | ||
| = log->stm_manager()->max_tx_end_remove_offset(); | ||
| model::offset tx_snapshot_offset = log->stm_manager()->tx_snapshot_offset(); | ||
| // We clamp the offset up to which we can remove transactional control | ||
| // batches to the last snapshot taken by the transactional stm. This | ||
| // ensures that we do not remove control batches that may be needed to | ||
| // reconstruct the state machine during recovery. | ||
| model::offset max_tx_remove_offset = std::min( | ||
| max_tx_end_remove_offset, tx_snapshot_offset); | ||
|
|
||
| vlog( | ||
| gclog.trace, | ||
| "{}: max tombstone remove offset: {}, max tx remove offset: {}, max " | ||
| "tx end remove snapshot: {}, tx_snapshot_offset: {}", | ||
| ntp, | ||
| max_tombstone_remove_offset, | ||
| max_tx_remove_offset, | ||
| max_tx_end_remove_offset, | ||
| tx_snapshot_offset); | ||
|
|
||
| if (max_unpinned_offset && *max_unpinned_offset < max_compactible_offset) { | ||
| vlog( | ||
| gclog.debug, | ||
| "{}: Compaction is pinned by offset: pinned Kafka offset: {}, " | ||
| "log offsets max unpinned {} < max removable {}", | ||
| ntp, | ||
| *pinned_kafka_offset, | ||
| *max_unpinned_offset, | ||
| max_compactible_offset); | ||
| max_compactible_offset = *max_unpinned_offset; | ||
| } | ||
|
|
||
| co_await log->housekeeping( | ||
| housekeeping_config::make_config( | ||
| collection_threshold, | ||
| _config.retention_bytes(), | ||
| max_compactible_offset, | ||
| max_tombstone_remove_offset, | ||
| max_tx_remove_offset, | ||
| log->config().delete_retention_ms(), | ||
| log->config().delete_retention_ms(), | ||
| log->config().min_compaction_lag_ms(), | ||
| as, | ||
| std::move(ntp_sanitizer_cfg), | ||
| _compaction_hash_key_map.get())); | ||
|
|
||
| meta.flags |= log_housekeeping_meta::bitflags::compacted; | ||
| meta.last_compaction = ss::lowres_clock::now(); | ||
|
|
||
| _probe->housekeeping_log_processed(); | ||
| } | ||
|
|
||
| /** | ||
| * | ||
| * @param read_buf_size size of underlying ss::input_stream's buffer | ||
|
|
@@ -802,7 +915,18 @@ ss::future<ss::shared_ptr<log>> log_manager::do_manage( | |
| std::move(translator_batch_types)); | ||
| auto [it, success] = _logs.emplace( | ||
| l->config().ntp(), std::make_unique<log_housekeeping_meta>(l)); | ||
| _logs_list.push_back(*it->second); | ||
|
|
||
| // Add to appropriate list based on whether this is a priority NTP. | ||
| if (is_priority_ntp(l->config().ntp())) { | ||
| _priority_logs_list.push_back(*it->second); | ||
| vlog( | ||
| stlog.debug, | ||
| "Tracking {} as priority partition for compaction", | ||
| l->config().ntp()); | ||
| } else { | ||
| _logs_list.push_back(*it->second); | ||
| } | ||
|
|
||
| update_log_count(); | ||
| vassert(success, "Could not keep track of:{} - concurrency issue", l); | ||
| co_return l; | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.