Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/v/cloud_topics/level_one/compaction/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ redpanda_cc_library(
"//src/v/compaction:key_offset_map",
"//src/v/compaction:reducer",
"//src/v/compaction:utils",
"//src/v/config",
"//src/v/container:chunked_vector",
"//src/v/model",
"//src/v/model:batch_compression",
Expand Down Expand Up @@ -107,6 +108,7 @@ redpanda_cc_library(
"//src/v/config",
"//src/v/container:chunked_hash_map",
"//src/v/model",
"//src/v/resource_mgmt:memory_groups",
"//src/v/ssx:future_util",
"//src/v/ssx:work_queue",
"@seastar",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,8 @@ log_info_collector::get_logs_to_collect(
}

if (log.info_and_ts.has_value()) {
// TODO: maybe configure this some other way.
auto sample_interval
= config::shard_local_cfg().log_compaction_interval_ms();
= config::shard_local_cfg().cloud_topics_compaction_interval_ms();
auto delta = to_time_point(collection_timestamp)
- to_time_point(log.info_and_ts->collected_at);
if (delta <= sample_interval) {
Expand Down
4 changes: 2 additions & 2 deletions src/v/cloud_topics/level_one/compaction/scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ compaction_scheduler::compaction_scheduler(
state.metadata_cache,
_probe)
, _compaction_interval(
config::shard_local_cfg().log_compaction_interval_ms.bind())
config::shard_local_cfg().cloud_topics_compaction_interval_ms.bind())
, _compaction_queue(_scheduling_policy->get_comparator()) {
_compaction_interval.watch([this]() { _sem.signal(); });
}
Expand All @@ -64,7 +64,7 @@ compaction_scheduler::compaction_scheduler(log_info_collector info_collector)
, _worker_manager(
_compaction_queue, nullptr, nullptr, &_committer, nullptr, _probe)
, _compaction_interval(
config::shard_local_cfg().log_compaction_interval_ms.bind())
config::shard_local_cfg().cloud_topics_compaction_interval_ms.bind())
, _compaction_queue(_scheduling_policy->get_comparator()) {
_compaction_interval.watch([this]() { _sem.signal(); });
}
Expand Down
4 changes: 2 additions & 2 deletions src/v/cloud_topics/level_one/compaction/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ class compaction_scheduler {
config::binding<std::chrono::milliseconds> _compaction_interval;

// This semaphore is used as a way to signal a change to
// `log_compaction_interval_ms` during the `wait()` operation in the main
// scheduling loop.
// `cloud_topics_compaction_interval_ms` during the `wait()` operation in
// the main scheduling loop.
ssx::semaphore _sem{0, "cloud_topics::compaction::scheduling_loop"};

ss::abort_source _as;
Expand Down
6 changes: 4 additions & 2 deletions src/v/cloud_topics/level_one/compaction/sink.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,10 @@ compaction_sink::compaction_sink(
kafka::offset start_offset,
io* io,
compaction_committer* committer,
config::binding<size_t> max_object_size,
object_builder::options opts)
: _tp(tp)
: _max_object_size(std::move(max_object_size))
, _tp(tp)
, _dirty_range_intervals(dirty_range_intervals)
, _removable_tombstone_ranges(removable_tombstone_ranges)
, _expected_compaction_epoch(expected_compaction_epoch)
Expand Down Expand Up @@ -210,7 +212,7 @@ compaction_sink::operator()(model::record_batch b, model::compression c) {

if (
_inflight_object
&& _inflight_object->builder->file_size() >= max_object_size) {
&& _inflight_object->builder->file_size() >= _max_object_size()) {
co_await flush(prev_offset);
}

Expand Down
4 changes: 3 additions & 1 deletion src/v/cloud_topics/level_one/compaction/sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "cloud_topics/level_one/compaction/committer.h"
#include "cloud_topics/level_one/compaction/meta.h"
#include "compaction/reducer.h"
#include "config/property.h"
#include "container/chunked_vector.h"
#include "model/fundamental.h"
#include "model/timestamp.h"
Expand All @@ -31,6 +32,7 @@ class compaction_sink : public compaction::sliding_window_reducer::sink {
kafka::offset,
l1::io*,
compaction_committer*,
config::binding<size_t> max_object_size,
object_builder::options = {});

ss::future<bool>
Expand Down Expand Up @@ -59,7 +61,7 @@ class compaction_sink : public compaction::sliding_window_reducer::sink {
// The target maximum L1 object size that will be built. After this
// threshold is breached, `needs_roll()` should return `true` and a new L1
// object will be started.
static constexpr size_t max_object_size = 128_MiB;
config::binding<size_t> _max_object_size;

// Initializes the `_inflight_object`. It is guaranteed to have a value (!=
// nullptr) after this function is called, if no exception is thrown.
Expand Down
1 change: 1 addition & 0 deletions src/v/cloud_topics/level_one/compaction/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ redpanda_cc_gtest(
"//src/v/compaction:key_offset_map",
"//src/v/compaction:reducer",
"//src/v/compaction/tests:simple_reducer",
"//src/v/config",
"//src/v/container:chunked_circular_buffer",
"//src/v/container:chunked_vector",
"//src/v/kafka/server/tests:kafka_test_utils",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "compaction/key_offset_map.h"
#include "compaction/reducer.h"
#include "compaction/tests/simple_reducer.h"
#include "config/property.h"
#include "container/chunked_circular_buffer.h"
#include "container/chunked_vector.h"
#include "kafka/server/tests/produce_consume_utils.h"
Expand Down Expand Up @@ -127,7 +128,8 @@ ss::future<> do_compact(
expected_compaction_epoch,
start_offset,
io,
&committer);
&committer,
config::mock_binding<size_t>(128_MiB));
auto reducer = compaction::sliding_window_reducer(
std::move(src), std::move(sink));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ TEST_F(SchedulerTestFixture, TestSchedulerMultithread) {
#endif

scoped_config cfg;
cfg.get("log_compaction_interval_ms").set_value(100ms);
cfg.get("cloud_topics_compaction_interval_ms").set_value(100ms);
ss::abort_source as;
chunked_hash_set<ss::shard_id> paused_workers;
chunked_hash_map<model::ntp, model::topic_id_partition> managed_ntps;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ TEST_F(SchedulerTestFixture, TestScheduler) {
#endif

scoped_config cfg;
cfg.get("log_compaction_interval_ms").set_value(100ms);
cfg.get("cloud_topics_compaction_interval_ms").set_value(100ms);
ss::abort_source as;
chunked_hash_set<ss::shard_id> paused_workers;
chunked_hash_map<model::ntp, model::topic_id_partition> managed_ntps;
Expand Down
26 changes: 17 additions & 9 deletions src/v/cloud_topics/level_one/compaction/worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "config/configuration.h"
#include "model/fundamental.h"
#include "model/metadata.h"
#include "resource_mgmt/memory_groups.h"
#include "ssx/future-util.h"

#include <seastar/coroutine/as_future.hh>
Expand All @@ -38,11 +39,15 @@ compaction_worker::compaction_worker(
"Unexpected compaction worker update queue error: {}",
ex);
})
, _poll_interval(
config::shard_local_cfg().cloud_topics_compaction_interval_ms.bind())
, _worker_manager(worker_manager)
, _io(io)
, _metastore(metastore)
, _committer(committer)
, _metadata_cache(metadata_cache) {}
, _metadata_cache(metadata_cache) {
_poll_interval.watch([this]() { _worker_cv.signal(); });
}

ss::future<> compaction_worker::start() {
_probe.setup_metrics();
Expand Down Expand Up @@ -79,15 +84,19 @@ void compaction_worker::start_work_loop() {
}

ss::future<> compaction_worker::work_loop() {
constexpr std::chrono::seconds poll_frequency(60);

while (is_active()) {
auto poll_interval = _poll_interval();
Comment thread
WillemKauf marked this conversation as resolved.
try {
co_await _worker_cv.wait(poll_frequency);
} catch (const ss::semaphore_timed_out&) {
co_await _worker_cv.wait(_poll_interval());
} catch (const ss::condition_variable_timed_out&) {
// Fall through
}

if (poll_interval != _poll_interval()) {
// Cluster config was changed while waiting.
continue;
}

while (is_active()) {
auto maybe_work = co_await try_acquire_work_from_manager();

Expand Down Expand Up @@ -219,7 +228,8 @@ ss::future<> compaction_worker::compact_log(log_compaction_meta* log) {
expected_compaction_epoch,
start_offset,
_io,
_committer);
_committer,
config::shard_local_cfg().cloud_topics_compaction_max_object_size.bind());
auto reducer = compaction::sliding_window_reducer(
std::move(src), std::move(sink));

Expand Down Expand Up @@ -356,10 +366,8 @@ ss::future<> compaction_worker::initialize_map() {
co_return;
}

// TODO: use memory group reservation.
// auto compaction_mem_bytes = memory_groups().compaction_reserved_memory();
auto compaction_mem_bytes
= config::shard_local_cfg().storage_compaction_key_map_memory();
= memory_groups().cloud_topics_compaction_reserved_memory();
auto compaction_map = std::make_unique<compaction::hash_key_offset_map>();
co_await compaction_map->initialize(compaction_mem_bytes);
_map = std::move(compaction_map);
Expand Down
7 changes: 6 additions & 1 deletion src/v/cloud_topics/level_one/compaction/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "cloud_topics/level_one/metastore/metastore.h"
#include "cluster/metadata_cache.h"
#include "compaction/key_offset_map.h"
#include "config/property.h"
#include "ssx/work_queue.h"

class WorkerManagerTestFixture;
Expand Down Expand Up @@ -174,9 +175,13 @@ class compaction_worker {

ss::abort_source _as;

// Used to alert worker that a job has become available.
// Used to alert worker that a job has become available, or when
// `cloud_topics_compaction_interval_ms` config changes.
ss::condition_variable _worker_cv;

// The interval on which the worker polls for new work.
config::binding<std::chrono::milliseconds> _poll_interval;

// Owned by `scheduler`.
worker_manager* _worker_manager;

Expand Down
23 changes: 23 additions & 0 deletions src/v/config/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4600,6 +4600,29 @@ configuration::configuration()
"target object size of 64 MiB.",
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
80_MiB)
, cloud_topics_compaction_max_object_size(
*this,
"cloud_topics_compaction_max_object_size",
"Maximum size in bytes for L1 objects produced by cloud topics "
"compaction.",
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
128_MiB)
, cloud_topics_compaction_interval_ms(
*this,
"cloud_topics_compaction_interval_ms",
"How often to trigger background compaction for cloud topics.",
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
30s)
, cloud_topics_compaction_key_map_memory(
*this,
"cloud_topics_compaction_key_map_memory",
"Maximum number of bytes that may be used on each shard by cloud topics "
"compaction key-offset maps.",
{.needs_restart = needs_restart::yes,
.example = "134217728",
.visibility = visibility::tunable},
128_MiB,
{.min = 16_MiB, .max = 100_GiB})
, cloud_topics_long_term_garbage_collection_interval(
*this,
"cloud_topics_long_term_garbage_collection_interval",
Expand Down
3 changes: 3 additions & 0 deletions src/v/config/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -834,6 +834,9 @@ struct configuration final : public config_store {
property<double> cloud_topics_reconciliation_speedup_blend;
property<double> cloud_topics_reconciliation_slowdown_blend;
property<size_t> cloud_topics_reconciliation_max_object_size;
property<size_t> cloud_topics_compaction_max_object_size;
property<std::chrono::milliseconds> cloud_topics_compaction_interval_ms;
bounded_property<uint64_t> cloud_topics_compaction_key_map_memory;
property<std::chrono::milliseconds>
cloud_topics_long_term_garbage_collection_interval;
property<std::chrono::milliseconds>
Expand Down
11 changes: 9 additions & 2 deletions src/v/resource_mgmt/memory_groups.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,17 +80,20 @@ partitions_memory_reservation::reserved_bytes(size_t total_memory) const {
system_memory_groups::system_memory_groups(
size_t total_available_memory,
compaction_memory_reservation compaction,
cloud_topics_compaction_memory_reservation cloud_topics_compaction,
bool wasm_enabled,
bool datalake_enabled,
bool cloud_topics_enabled,
partitions_memory_reservation partitions)
: _compaction_reserved_memory(
compaction.reserved_bytes(total_available_memory))
, _cloud_topics_compaction_reserved_memory(
cloud_topics_compaction.reserved_bytes())

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

curious why reserved_bytes here isn't a function of the total available memory like the other reservations?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't add an option to configure the reserved memory as a percentage a la storage_compaction_key_map_memory_limit_percent, so we will always just reserve a flat cloud_topics_compaction_key_map_memory.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ahh, that's what I had figured. thanks

, _partitions_reserved_memory(
partitions.reserved_bytes(total_available_memory))
, _total_system_memory(
total_available_memory - _compaction_reserved_memory
- _partitions_reserved_memory)
- _cloud_topics_compaction_reserved_memory - _partitions_reserved_memory)
Comment thread
WillemKauf marked this conversation as resolved.
, _wasm_enabled(wasm_enabled)
, _datalake_enabled(datalake_enabled)
, _cloud_topics_enabled(cloud_topics_enabled) {}
Expand Down Expand Up @@ -173,7 +176,7 @@ void system_memory_groups::log_memory_group_allocations(seastar::logger& log) {
"total memory minus pre-share reservations: {}, chunk cache: {}, kafka: "
"{}, rpc: {}, recovery: {}, "
"tiered storage: {}, admin: {}, data transforms: {}, compaction: {}, "
"datalake: {}, partitions: {}",
"cloud topics compaction: {}, datalake: {}, partitions: {}",
human::bytes(ss::memory::stats().total_memory()),
human::bytes(total_memory()),
human::bytes(chunk_cache_max_memory()),
Expand All @@ -184,6 +187,7 @@ void system_memory_groups::log_memory_group_allocations(seastar::logger& log) {
human::bytes(admin_max_memory()),
human::bytes(data_transforms_max_memory()),
human::bytes(compaction_reserved_memory()),
human::bytes(cloud_topics_compaction_reserved_memory()),
human::bytes(datalake_max_memory()),
human::bytes(partitions_max_memory()));
}
Expand Down Expand Up @@ -212,11 +216,14 @@ system_memory_groups& memory_groups() {
compaction.max_limit_pct
= cfg.storage_compaction_key_map_memory_limit_percent.value();
}
cloud_topics_compaction_memory_reservation cloud_topics_compaction{
.max_bytes = cfg.cloud_topics_compaction_key_map_memory.value()};
partitions_memory_reservation partitions{
.max_limit_pct = cfg.topic_partitions_memory_allocation_percent()};
groups.emplace(
total,
compaction,
cloud_topics_compaction,
wasm,
datalake_enabled(),
cloud_topics_enabled(),
Expand Down
16 changes: 16 additions & 0 deletions src/v/resource_mgmt/memory_groups.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ struct partitions_memory_reservation {
size_t reserved_bytes(size_t total_memory) const;
};

/**
 * Memory reservation settings for cloud topics compaction.
 *
 * The reservation is a flat byte count: `reserved_bytes()` takes no
 * total-memory argument and simply reports the configured value.
 */
struct cloud_topics_compaction_memory_reservation {
    /// Upper bound, in bytes, on the memory set aside for cloud topics
    /// compaction. Defaults to zero.
    size_t max_bytes = 0;

    /// Number of bytes to reserve; this is `max_bytes` unchanged.
    size_t reserved_bytes() const {
        return max_bytes;
    }
};

namespace testing {
class system_memory_groups_accessor;
}
Expand All @@ -55,6 +65,7 @@ class system_memory_groups {
system_memory_groups(
size_t total_available_memory,
compaction_memory_reservation compaction,
cloud_topics_compaction_memory_reservation cloud_topics_compaction,
bool wasm_enabled,
bool datalake_enabled,
bool cloud_topics_enabled,
Expand Down Expand Up @@ -94,6 +105,10 @@ class system_memory_groups {
return _compaction_reserved_memory;
}

// Returns the number of bytes reserved for cloud topics compaction, as
// stored in `_cloud_topics_compaction_reserved_memory`.
size_t cloud_topics_compaction_reserved_memory() const {
return _cloud_topics_compaction_reserved_memory;
}

size_t datalake_max_memory() const;

size_t cloud_topics_memory() const;
Expand All @@ -120,6 +135,7 @@ class system_memory_groups {
size_t subsystem_memory() const;

size_t _compaction_reserved_memory;
size_t _cloud_topics_compaction_reserved_memory;
size_t _partitions_reserved_memory;
size_t _total_system_memory;
bool _wasm_enabled;
Expand Down
Loading