2828#include < seastar/core/sleep.hh>
2929#include < seastar/coroutine/as_future.hh>
3030
31+ #include < memory>
32+
33+ namespace {
34+ constexpr ss::lowres_clock::duration control_timeout = 5s;
35+ }
36+
3137namespace cloud_topics {
3238
3339class level_zero_gc ::list_delete_worker {
40+ static constexpr auto handle_worker_exc = [](std::exception_ptr eptr) {
41+ vlog (cd_log.warn , " Exception from delete worker: {}" , eptr);
42+ };
43+
3444public:
3545 explicit list_delete_worker (
3646 std::unique_ptr<object_storage> storage,
@@ -39,9 +49,7 @@ class level_zero_gc::list_delete_worker {
3949 : storage_(std::move(storage))
4050 , node_info_(std::move(node_info))
4151 , probe_(&probe)
42- , worker_([](std::exception_ptr eptr) {
43- vlog (cd_log.warn , " Exception from delete worker: {}" , eptr);
44- }) {}
52+ , worker_(std::make_unique<ssx::work_queue>(handle_worker_exc)) {}
4553 void start () {
4654 vlog (cd_log.info , " Starting cloud topics list/delete worker" );
4755 if (as_.abort_requested ()) {
@@ -63,11 +71,40 @@ class level_zero_gc::list_delete_worker {
6371 as_.request_abort ();
6472 delete_sem_.broken ();
6573 page_sem_.broken ();
66- co_await worker_. shutdown ();
74+ co_await worker_-> shutdown ();
6775 co_await gate_.close ();
6876 vlog (cd_log.info , " Stopped cloud topics list/delete worker" );
6977 }
7078
79+ seastar::future<> reset () {
80+ if (gate_.is_closed ()) {
81+ co_return ;
82+ }
83+ vlog (cd_log.info , " Resetting cloud topics list/delete worker" );
84+
85+ // Abort in-flight list/delete operations
86+ as_.request_abort ();
87+
88+ // Drain pending delete tasks
89+ co_await worker_->shutdown ();
90+
91+ // Wait for spawned delete fibers to complete
92+ if (!gate_.is_closed ()) {
93+ co_await gate_.close ();
94+ }
95+
96+ continuation_token_.reset ();
97+ curr_prefix_.reset ();
98+ key_prefixes_.set_range (std::nullopt );
99+
100+ as_ = {};
101+ gate_ = {};
102+
103+ worker_ = std::make_unique<ssx::work_queue>(handle_worker_exc);
104+
105+ vlog (cd_log.info , " Reset cloud topics list/delete worker" );
106+ }
107+
71108 bool has_capacity () const { return page_sem_.available_units () > 0 ; }
72109
73110 seastar::future<std::expected<
@@ -110,9 +147,9 @@ class level_zero_gc::list_delete_worker {
110147 // unbounded.
111148 u.emplace (seastar::consume_units (page_sem_, keys_total_bytes));
112149 }
113- worker_. submit ([this ,
114- o = std::move (objects),
115- u = std::move (u).value ()]() mutable {
150+ worker_-> submit ([this ,
151+ o = std::move (objects),
152+ u = std::move (u).value ()]() mutable {
116153 return do_delete_objects (std::move (o), std::move (u));
117154 });
118155 }
@@ -222,7 +259,7 @@ class level_zero_gc::list_delete_worker {
222259 std::unique_ptr<object_storage> storage_;
223260 std::unique_ptr<node_info> node_info_;
224261 level_zero_gc_probe* probe_;
225- ssx::work_queue worker_;
262+ std::unique_ptr< ssx::work_queue> worker_;
226263 // TODO: configurable limits?
227264 // max number of in-flight delete ops
228265 ssx::semaphore delete_sem_{5 , " ct/gc/delete" };
@@ -621,14 +658,22 @@ level_zero_gc::level_zero_gc(
621658
622659level_zero_gc::~level_zero_gc () = default ;
623660
624- void level_zero_gc::start () {
661+ seastar::future<> level_zero_gc::start () {
662+ while (resetting_) {
663+ co_await reset_cv_.wait (
664+ control_timeout, [this ] { return !resetting_; });
665+ }
625666 vlog (cd_log.info , " Starting cloud topics L0 GC worker" );
626667 delete_worker_->start ();
627668 should_run_ = true ;
628669 worker_cv_.signal ();
629670}
630671
631- void level_zero_gc::pause () {
672+ seastar::future<> level_zero_gc::pause () {
673+ while (resetting_) {
674+ co_await reset_cv_.wait (
675+ control_timeout, [this ] { return !resetting_; });
676+ }
632677 vlog (cd_log.info , " Pausing cloud topics L0 GC worker" );
633678 should_run_ = false ;
634679 asrc_.request_abort ();
@@ -645,13 +690,44 @@ seastar::future<> level_zero_gc::stop() {
645690 vlog (cd_log.info , " Stopped cloud_topics L0 GC worker" );
646691}
647692
693+ seastar::future<> level_zero_gc::reset () {
694+ if (should_shutdown_ || resetting_) {
695+ co_return ;
696+ }
697+ vlog (cd_log.info , " Resetting cloud topics L0 GC worker state" );
698+
699+ resetting_ = true ;
700+ const bool was_running = should_run_;
701+
702+ auto done = ss::defer ([this ] {
703+ resetting_ = false ;
704+ reset_cv_.broadcast ();
705+ });
706+
707+ // Pause the outer worker loop so it blocks on the CV
708+ should_run_ = false ;
709+ asrc_.request_abort ();
710+
711+ co_await delete_worker_->reset ();
712+
713+ // Resume if was running, then clear the flag so that start()/pause()
714+ // waiting on reset_cv_ don't race with the resume.
715+ if (was_running && !should_shutdown_) {
716+ delete_worker_->start ();
717+ should_run_ = true ;
718+ worker_cv_.signal ();
719+ }
720+ }
721+
648722std::string_view to_string_view (level_zero_gc::state s) {
649723 switch (s) {
650724 using enum level_zero_gc::state;
651725 case paused:
652726 return " level_zero_gc::state::paused" ;
653727 case running:
654728 return " level_zero_gc::state::running" ;
729+ case resetting:
730+ return " level_zero_gc::state::resetting" ;
655731 case stopping:
656732 return " level_zero_gc::state::stopping" ;
657733 case stopped:
@@ -667,6 +743,9 @@ auto level_zero_gc::get_state() const -> state {
667743 if (should_shutdown_) {
668744 return worker_.available () ? state::stopped : state::stopping;
669745 }
746+ if (resetting_) {
747+ return state::resetting;
748+ }
670749 return should_run_ ? state::running : state::paused;
671750 }();
672751 vlog (cd_log.debug , " cloud_topics L0 GC worker state: {}" , st);
0 commit comments