Skip to content

Commit 8ba02b6

Browse files
committed
ct/l0/gc: Each iteration should page through list results
Limited by the semaphore in list_delete_worker. Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
1 parent 93e2dec commit 8ba02b6

3 files changed

Lines changed: 64 additions & 11 deletions

File tree

src/v/cloud_topics/level_zero/gc/level_zero_gc.cc

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -604,9 +604,27 @@ seastar::future<> level_zero_gc::worker() {
604604

605605
seastar::future<std::expected<size_t, level_zero_gc::collection_error>>
606606
level_zero_gc::try_to_collect() {
607-
if (!delete_worker_->has_capacity()) {
608-
co_return 0;
607+
// Ultra-temporary cache to avoid repeatedly querying for max gc-able epoch.
608+
// Since the result will always be valid clusterwide, compute exactly once
609+
// per collection loop.
610+
std::optional<cluster_epoch> max_gc_epoch;
611+
size_t total_eligible{0};
612+
while (delete_worker_->has_capacity()) {
613+
auto res = co_await do_try_to_collect(std::ref(max_gc_epoch));
614+
if (!res.has_value()) {
615+
co_return res;
616+
}
617+
if (res.value() == 0) {
618+
break;
619+
}
620+
total_eligible += res.value();
609621
}
622+
623+
co_return total_eligible;
624+
}
625+
626+
seastar::future<std::expected<size_t, level_zero_gc::collection_error>>
627+
level_zero_gc::do_try_to_collect(std::optional<cluster_epoch>& max_gc_epoch) {
610628
auto candidate_objects = co_await delete_worker_->next_page();
611629
if (!candidate_objects.has_value()) {
612630
vlog(
@@ -616,17 +634,19 @@ level_zero_gc::try_to_collect() {
616634
co_return std::unexpected(collection_error::service_error);
617635
}
618636

619-
const auto maybe_max_gc_epoch
620-
= co_await epoch_source_->max_gc_eligible_epoch(&asrc_);
621-
if (!maybe_max_gc_epoch.has_value()) {
622-
vlog(
623-
cd_log.debug,
624-
"Received error retrieving GC eligible epoch: {}",
625-
maybe_max_gc_epoch.error());
626-
co_return std::unexpected(collection_error::service_error);
637+
if (!max_gc_epoch.has_value()) {
638+
const auto maybe_max_gc_epoch
639+
= co_await epoch_source_->max_gc_eligible_epoch(&asrc_);
640+
if (!maybe_max_gc_epoch.has_value()) {
641+
vlog(
642+
cd_log.debug,
643+
"Received error retrieving GC eligible epoch: {}",
644+
maybe_max_gc_epoch.error());
645+
co_return std::unexpected(collection_error::service_error);
646+
}
647+
max_gc_epoch = maybe_max_gc_epoch.value();
627648
}
628649

629-
const auto max_gc_epoch = maybe_max_gc_epoch.value();
630650
if (!max_gc_epoch.has_value()) {
631651
vlog(cd_log.info, "No GC eligible epoch currently exists");
632652
co_return std::unexpected(collection_error::no_collectible_epoch);

src/v/cloud_topics/level_zero/gc/level_zero_gc.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,8 @@ class level_zero_gc {
275275
seastar::future<> worker();
276276
enum class collection_error : int8_t;
277277
seastar::future<std::expected<size_t, collection_error>> try_to_collect();
278+
seastar::future<std::expected<size_t, collection_error>>
279+
do_try_to_collect(std::optional<cluster_epoch>&);
278280

279281
level_zero_gc_probe probe_;
280282

src/v/cloud_topics/level_zero/gc/tests/level_zero_gc_tests.cc

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -390,3 +390,34 @@ TEST_F(LevelZeroGCScaleOutTest, CleanShutdown) {
390390
// then immediately shutdown gc
391391
gc.stop().get();
392392
}
393+
394+
TEST_F(LevelZeroGCScaleOutTest, ConcurrentDeletes) {
395+
static constexpr size_t list_page_size{20};
396+
int n = list_page_size * 20;
397+
for (int i = 0; i < n; i++) {
398+
add_listed(i, 24h);
399+
}
400+
this->max_epoch = n;
401+
this->cfg.list_page_size = list_page_size;
402+
this->cfg.delete_cost = 100ms;
403+
gc.start();
404+
EXPECT_TRUE(Eventually(
405+
[this, expected = (size_t)n] { return deleted.size() == expected; }));
406+
}
407+
408+
// make sure we make progress when the total size of eligible keys exceeds the
409+
// in-flight limit
410+
// i.e. 50 * 500 * len(key)=72 ~= 1.5MiB > 1MiB
411+
TEST_F(LevelZeroGCScaleOutTest, ConcurrentDeletesPipelineSaturation) {
412+
static constexpr size_t list_page_size{500};
413+
int n = list_page_size * 50;
414+
for (int i = 0; i < n; i++) {
415+
add_listed(i, 24h);
416+
}
417+
this->max_epoch = n;
418+
this->cfg.list_page_size = list_page_size;
419+
this->cfg.delete_cost = 50ms;
420+
gc.start();
421+
EXPECT_TRUE(Eventually(
422+
[this, expected = (size_t)n] { return deleted.size() == expected; }));
423+
}

0 commit comments

Comments
 (0)