Skip to content

Commit 15d0fcd

Browse files
committed
ct/rr: plumb LSM configs into read replica database
Also does some test plumbing to supply a real cache to snapshot_manager_test, since the pre-warming will require a cache.
1 parent 13da425 commit 15d0fcd

2 files changed

Lines changed: 42 additions & 27 deletions

File tree

src/v/cloud_topics/read_replica/snapshot_manager.cc

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,10 @@ ss::future<> database_refresher::open_or_refresh() {
148148
lsm::options{
149149
.database_epoch = lsm::internal::database_epoch::max(),
150150
.readonly = true,
151-
// TODO: tuning.
151+
.max_pre_open_fibers = config::shard_local_cfg()
152+
.cloud_topics_metastore_max_pre_open_fibers(),
153+
.block_cache_size
154+
= config::shard_local_cfg().cloud_topics_metastore_block_cache_size(),
152155
},
153156
std::move(io));
154157
vlog(logger_.debug, "Opened with seqno {}", db.max_applied_seqno());

src/v/cloud_topics/read_replica/tests/snapshot_manager_test.cc

Lines changed: 38 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,32 @@ const auto test_tidp = model::topic_id_partition{
3333
model::topic_id{uuid_t::create()}, model::partition_id{0}};
3434
using o = kafka::offset;
3535

36+
void start_cache(
37+
ss::sharded<cloud_io::cache>& cache, const std::filesystem::path& dir) {
38+
cloud_io::cache::initialize(dir).get();
39+
cache
40+
.start(
41+
dir,
42+
30_GiB,
43+
config::mock_binding<double>(0.0),
44+
config::mock_binding<uint64_t>(100_MiB),
45+
config::mock_binding<std::optional<double>>(std::nullopt),
46+
config::mock_binding<uint32_t>(100000),
47+
config::mock_binding<uint16_t>(3))
48+
.get();
49+
cache.invoke_on_all([](cloud_io::cache& c) { return c.start(); }).get();
50+
cache
51+
.invoke_on(
52+
ss::shard_id{0},
53+
[](cloud_io::cache& c) {
54+
c.notify_disk_status(
55+
100ULL * 1024 * 1024 * 1024,
56+
50ULL * 1024 * 1024 * 1024,
57+
storage::disk_space_alert::ok);
58+
})
59+
.get();
60+
}
61+
3662
} // namespace
3763
class SnapshotManagerTest
3864
: public ::testing::Test
@@ -45,35 +71,19 @@ class SnapshotManagerTest
4571
.cloud_storage_readreplica_manifest_sync_timeout_ms.set_value(500ms);
4672
staging_dir_ = std::make_unique<temporary_dir>("snapshot_manager_test");
4773

48-
auto cache_dir = staging_dir_->get_path() / "cache";
49-
cloud_io::cache::initialize(cache_dir).get();
50-
writer_cache_
51-
.start(
52-
cache_dir,
53-
30_GiB,
54-
config::mock_binding<double>(0.0),
55-
config::mock_binding<uint64_t>(100_MiB),
56-
config::mock_binding<std::optional<double>>(std::nullopt),
57-
config::mock_binding<uint32_t>(100000),
58-
config::mock_binding<uint16_t>(3))
59-
.get();
60-
writer_cache_
61-
.invoke_on_all([](cloud_io::cache& c) { return c.start(); })
62-
.get();
63-
writer_cache_
64-
.invoke_on(
65-
ss::shard_id{0},
66-
[](cloud_io::cache& c) {
67-
c.notify_disk_status(
68-
100ULL * 1024 * 1024 * 1024,
69-
50ULL * 1024 * 1024 * 1024,
70-
storage::disk_space_alert::ok);
71-
})
72-
.get();
74+
// The writer and reader run separate caches in one process, so disable
75+
// metrics to avoid double-registering the shared cache metric names.
76+
config::shard_local_cfg().disable_metrics.set_value(true);
77+
config::shard_local_cfg().disable_public_metrics.set_value(true);
78+
79+
// The writer and reader use separate caches so that the reader
80+
// hydrates from S3 rather than reading the writer's cached objects.
81+
start_cache(writer_cache_, staging_dir_->get_path() / "writer_cache");
82+
start_cache(reader_cache_, staging_dir_->get_path() / "reader_cache");
7383

7484
reader_staging_dir_ = staging_dir_->get_path() / "reader";
7585
snapshot_mgr_ = std::make_unique<read_replica::snapshot_manager>(
76-
reader_staging_dir_, &sr_->remote.local(), nullptr);
86+
reader_staging_dir_, &sr_->remote.local(), &reader_cache_.local());
7787
}
7888

7989
void TearDown() override {
@@ -85,6 +95,7 @@ class SnapshotManagerTest
8595
snapshot_mgr_->stop().get();
8696
snapshot_mgr_.reset();
8797
}
98+
reader_cache_.stop().get();
8899
writer_cache_.stop().get();
89100
sr_.reset();
90101
db_s3_imposter_fixture::stop().get();
@@ -140,6 +151,7 @@ class SnapshotManagerTest
140151
protected:
141152
std::unique_ptr<temporary_dir> staging_dir_;
142153
ss::sharded<cloud_io::cache> writer_cache_;
154+
ss::sharded<cloud_io::cache> reader_cache_;
143155
std::filesystem::path reader_staging_dir_;
144156
std::unique_ptr<cloud_io::scoped_remote> sr_;
145157
std::unique_ptr<read_replica::snapshot_manager> snapshot_mgr_;

0 commit comments

Comments
 (0)