[CORE-15021] Cloud Topics: Level Zero GC delete pipeline#29050
Conversation
2b52b11 to
f24b2cf
Compare
There was a problem hiding this comment.
Pull request overview
This PR implements a pipelined delete mechanism for Level Zero garbage collection in Cloud Topics, enabling concurrent deletion operations and handling paginated object listings.
Key changes:
- Adds concurrent delete pipeline with semaphores to control in-flight operations and memory usage
- Implements pagination support for object listing with continuation tokens
- Introduces work queue for managing asynchronous delete operations
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| src/v/cloud_topics/level_zero/gc/level_zero_gc.h | Adds object_storage interface pagination support, semaphores for concurrency control, work queue, and continuation token state |
| src/v/cloud_topics/level_zero/gc/level_zero_gc.cc | Implements pipelined deletion with continuation token handling, page-based memory limiting, and asynchronous delete workers |
| src/v/cloud_topics/level_zero/gc/tests/level_zero_gc_tests.cc | Adds test infrastructure for pagination simulation and new test cases for multi-page deletion, concurrent operations, and clean shutdown |
| src/v/cloud_topics/level_zero/gc/tests/BUILD | Adds mutex dependency for test synchronization |
| src/v/cloud_topics/level_zero/gc/BUILD | Adds semaphore and work_queue dependencies |
7ee19e0 to
8a894e1
Compare
8a894e1 to
598065d
Compare
598065d to
9a76a9e
Compare
9a76a9e to
658edf0
Compare
658edf0 to
d5af28b
Compare
Retry command for Build#78427please wait until all jobs are finished before running the slash command |
d5af28b to
c1df2d6
Compare
|
CI Failures:
|
Retry command for Build#78431please wait until all jobs are finished before running the slash command |
| cd_log.debug, | ||
| "Received error listing objects during L0 GC: {}", | ||
| candidate_objects.error()); | ||
| co_return std::unexpected(collection_error::service_error); |
There was a problem hiding this comment.
based on the comment below
/ TODO: fairly naive approach to caching the token. if the request
// fails, we start over,
what does start over mean? does it mean that here in the error case ("!candidate_objects.has_value()") that we should reset the token?
There was a problem hiding this comment.
yeah, it looks like I misplaced the optional::reset while splitting out this commit from a later one.
| u.emplace( | ||
| seastar::consume_units(page_sem_, object_keys_total_bytes)); | ||
| } | ||
| delete_worker_.submit([this, |
There was a problem hiding this comment.
i wonder if ssx/actor.h would work here? in start create a new actor. in pause/stop you'd destroy the current actor. but similar simplification for work queue might exist?
There was a problem hiding this comment.
ssx/actor
first I'm hearing of it, but at a quick glance it looks pretty close.
| ssx::work_queue delete_worker_; | ||
| // TODO: configurable limits? | ||
| // max number of in-flight delete ops | ||
| ssx::semaphore delete_sem_{5, "ct/gc/delete"}; | ||
| // control (approximate) total memory held by gc-eligible list pages in | ||
| // flight (these may be queued depending on delete concurrency) | ||
| ssx::semaphore page_sem_{1_MiB, "ct/gc/page"}; | ||
| seastar::abort_source delete_as_{}; | ||
| seastar::gate gate_{}; | ||
|
|
||
| seastar::future<> worker(); | ||
| seastar::future<> delete_worker( | ||
| std::vector<cloud_storage_clients::client::list_bucket_item>, | ||
| ssx::semaphore_units) noexcept; | ||
| enum class collection_error : int8_t; | ||
| seastar::future<std::expected<size_t, collection_error>> try_to_collect(); | ||
| seastar::future<std::expected<size_t, collection_error>> | ||
| do_try_to_collect(std::optional<cluster_epoch>&); | ||
| std::optional<ss::sstring> continuation_token_; |
There was a problem hiding this comment.
instead of having new work queue, semaphores, gates, abort sources all within this existing class, should we instead factor out paging/deletion into separate "workers" that are self-contained with a simple interface so that level_zero_gc is managing them at a higher level of abstraction (e.g. start/pause/stop)?
There was a problem hiding this comment.
instead factor out paging/deletion
yeah maybe so.
separate "workers" that are self-contained with a simple interface
Not sure I follow "separate workers". I rather like the work queue itself as a primitive source of backpressure, so in my mind we could just factor this new code into a separate class basically as is and expose some single-method async API to the GC loop.
Did you have something more granular in mind?
There was a problem hiding this comment.
factor this new code
or maybe this is just ssx::actor
There was a problem hiding this comment.
Not sure I follow "separate workers". I rather like the work queue itself as a primitive source of backpressure, so in my mind we could just factor this new code into a separate class basically as is and expose some single-method async API to the GC loop.
Right, nothing wrong with keeping the work queue, I'm talking about hiding the other complexity (semaphores, abort sources, gates, etc...)
There was a problem hiding this comment.
done. if ssx::actor is the better choice it'll be a bit easier to swap that in.
c1df2d6 to
21b123c
Compare
|
force push rebase dev |
- config points - swap 'deleted' vector<object> for unordered_map<key> - mutual exclusion for list & delete handlers Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
cd09435 to
8ba02b6
Compare
Retry command for Build#78590please wait until all jobs are finished before running the slash command |
In general, we expect sequential list operations to dominate GC latency. With that in mind, this commit introduces list_delete_worker to manage and dispatch outstanding pages of GC-eligible object keys, the idea being that the _processing_ of a page of objects should not block grabbing the next one. This worker includes a "page semaphore" to limit the total volume of object keys held in memory at once. Precise accounting is not the goal here. We just want to avoid growing the backlog of delete tasks unbounded. Delete requests are dispatched sequentially but execute in the background. Another semaphore limits the number of background delete requests in flight at one time. Note that splitting a given remote::delete_objects payload into API-sized batches is handled internally to cloud_io::remote, so we may want to rejigger the concurrency in this layer a bit. Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Limited by the the semaphore in list_delete_worker. Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
8ba02b6 to
11f5d23
Compare
| worker_.submit([this, | ||
| o = std::move(objects), | ||
| u = std::move(u).value()]() mutable { | ||
| return do_delete_objects(std::move(o), std::move(u)); |
There was a problem hiding this comment.
it's unclear to me what value the worker provides if the worker turns around and backgrounds the work you gave it. can you clarify its role?
if it because you want to be able to block on the delete_sem, then you could acquire the delete_sem in the background. that is, have many inflight background tasks that each acquire the delete_sem.
that would mean that you'd want to probably limit the number of background tasks that are created (i.e. limit the producer/paging side), but presumably that already exists otherwise the worker's queue would fill up (maybe that is the !delete_worker_->has_capacity())?
There was a problem hiding this comment.
want to be able to block on the delete_sem
yes, that
what value the worker provides...[vs having] many inflight background tasks that each acquire the delete_sem
just erring on the side of fewer background fibers I guess. it feels subjectively better to stack pending work up on a queue we control vs having an arbitrary number of blocked tasks in the background, but I'm not sure there's an empirical reason to prefer one or the other.
I think we had an issue in tiered storage where we pushed some work into the background, wrongly assuming that some higher level concurrency control would supply enough back pressure. But maybe blocking on delete_sem is good enough for this use case.
| std::optional<cluster_epoch> max_gc_epoch; | ||
| size_t total_eligible{0}; | ||
| while (delete_worker_->has_capacity()) { | ||
| auto res = co_await do_try_to_collect(std::ref(max_gc_epoch)); |
There was a problem hiding this comment.
instead of introducing an input/output reference parameter, can we pull the max_gc_epoch query out into this function and pass it in as a value? from the looks of it do_try_collect won't recompute it, so a single query here reused across invocations would be equivalent?
There was a problem hiding this comment.
single query here reused across invocations would be equivalent?
fair enough. i assumed that putting the epoch after the LIST request was deliberate to avoid wasting an expensive computation, but I suppose reusing it across several pages is a good enough hedge against LIST failure.
There was a problem hiding this comment.
yeh makes sense. i'm fine with whatever, just always a fan of avoiding input/output parameters when possible.
This PR adds a work_queue for delete operation in level_zero_gc, with the general goal of overlapping list & delete. Napkin math suggests that sequential paging list operations will dominate GC latency at low to moderate throughput, so we want to hide any additional delete latency behind the usually more expensive list.
Request concurrency & memory usage of the pipeline are governed by a pair of semaphores - one to limit the memory consumption of outstanding GC-eligible object keys, the other to limit the number of in-flight delete operations going on in the background.
Since work_queue tasks are processed sequentially, if a particular task blocks waiting for the request semaphore (i.e. before being pushed to the background), subsequent tasks will stay in the queue rather than stacking up on the semaphore.
Similarly, each iteration of the GC loop will try to page through the entire level_zero/data prefix, queueing up pages of work as it goes. If the page semaphore is exhausted before we reach the last page of list results, the iteration ends and we reenter the loop after a configurable delay, giving the delete worker a chance to burn down some of the outstanding work.
Backports Required
Release Notes