Skip to content

Commit 797eb9f

Browse files
cloud_topics/l1: delete an imported segment's tiered-storage objects
When an imported L1 object is removed, delete its backing tiered-storage objects (the segment, its .tx range manifest, and its .index -- the same set the archiver would have deleted) so nothing is orphaned. - delete_objects (abstract_io/file_io/fake_io) takes object_location instead of object_id, so it routes by the imported location: present -> delete from the tiered-storage bucket; absent -> delete from the L1 bucket as before. - Both garbage collectors (metastore and lsm) build the removal list from each object's imported_ts_location. Native L1 deletion is unchanged. Testing: an imported-segment GC test (import via add_objects on a migrating partition, then trim + GC) asserts the backing TS segment is removed, routed by ts_path; fake_io grows has_ts_segment to observe it. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
1 parent 0d074e3 commit 797eb9f

8 files changed

Lines changed: 224 additions & 33 deletions

File tree

src/v/cloud_topics/level_one/common/abstract_io.h

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,11 @@
2525

2626
namespace cloud_topics::l1 {
2727

28+
// Forward declaration — full definition in object_handle.h.
29+
// Do not #include object_handle.h here: object_handle.h already includes
30+
// abstract_io.h for io::errc, so including it here creates a cycle.
31+
class object_handle;
32+
2833
// An abstraction for a local file that is used for staging uploads to object
2934
// storage.
3035
class staging_file {
@@ -51,11 +56,6 @@ class staging_file {
5156
virtual ss::future<ss::input_stream<char>> input_stream() = 0;
5257
};
5358

54-
// Forward declaration -- full definition in object_handle.h.
55-
// Do not #include object_handle.h here: object_handle.h already includes this
56-
// header (for io::errc), so the dependency must run one way.
57-
class object_handle;
58-
5959
// An abstraction for IO in level one.
6060
class io {
6161
public:
@@ -98,14 +98,21 @@ class io {
9898
virtual ss::future<std::expected<iobuf, errc>> read_object_as_iobuf(
9999
object_extent, ss::abort_source*, cloud_io::group_id g);
100100

101-
// Open an object for reading: returns a handle that exposes the object's
102-
// index (seek by offset/timestamp) and opens readers at a seek point.
101+
// Open an object for indexed read access, returning a handle that exposes
102+
// both an index (for seeking) and readers (for streaming batches).
103+
//
104+
// For native L1 objects (extent.imported == nullopt), the handle reads the
105+
// L1 footer and implements the native seek path. For imported TS extents
106+
// (extent.imported.has_value()), the handle loads the TS segment index and
107+
// translates log offsets to Kafka offsets while reading.
103108
virtual ss::future<std::expected<std::unique_ptr<object_handle>, errc>>
104109
open_object(object_extent, ss::abort_source*, cloud_io::group_id g) = 0;
105110

106-
// Delete the specified objects from object storage.
111+
// Delete the specified objects from object storage. An entry with a ts_path
112+
// is addressed by that tiered-storage segment path; otherwise the native L1
113+
// object path is used. Both live in the one configured object bucket.
107114
virtual ss::future<std::expected<void, errc>>
108-
delete_objects(chunked_vector<object_id>, ss::abort_source*) = 0;
115+
delete_objects(chunked_vector<object_location>, ss::abort_source*) = 0;
109116

110117
// Create a multipart upload for streaming data directly to object storage.
111118
virtual ss::future<

src/v/cloud_topics/level_one/common/fake_io.cc

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -195,14 +195,24 @@ fake_io::open_object(
195195
extent.id, std::get<l1::footer>(std::move(footer_result)), this, g);
196196
}
197197

198-
ss::future<std::expected<void, io::errc>>
199-
fake_io::delete_objects(chunked_vector<object_id> oids, ss::abort_source*) {
200-
for (const auto& oid : oids) {
201-
remove_object(oid);
198+
ss::future<std::expected<void, io::errc>> fake_io::delete_objects(
199+
chunked_vector<object_location> objects, ss::abort_source*) {
200+
for (const auto& obj : objects) {
201+
if (obj.ts_path.has_value()) {
202+
// Imported object: its backing segment is addressed by ts_path
203+
// (mirrors file_io's path routing).
204+
_ts_storage.erase(*obj.ts_path);
205+
} else {
206+
remove_object(obj.id);
207+
}
202208
}
203209
co_return std::expected<void, io::errc>{};
204210
}
205211

212+
bool fake_io::has_ts_segment(const ts_segment_path& ts_path) const {
213+
return _ts_storage.contains(ts_path);
214+
}
215+
206216
std::optional<iobuf> fake_io::get_object(object_id id) {
207217
auto it = _storage.find(id);
208218
if (it == _storage.end()) {

src/v/cloud_topics/level_one/common/fake_io.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ class fake_io : public io {
3636
object_extent, ss::abort_source*, cloud_io::group_id g) override;
3737

3838
ss::future<std::expected<void, errc>>
39-
delete_objects(chunked_vector<object_id>, ss::abort_source*) override;
39+
delete_objects(chunked_vector<object_location>, ss::abort_source*) override;
4040

4141
ss::future<std::expected<cloud_storage_clients::multipart_upload_ref, errc>>
4242
create_multipart_upload(
@@ -54,6 +54,10 @@ class fake_io : public io {
5454
// Return a list of the object IDs that haven't been removed.
5555
chunked_vector<object_id> list_objects() const;
5656

57+
// Whether an injected TS segment (see put_ts_segment) is still present.
58+
// For tests that exercise imported-object deletion.
59+
bool has_ts_segment(const ts_segment_path& ts_path) const;
60+
5761
/// Inject a raw TS-format segment for use with open_object on imported
5862
/// extents whose ts_path matches. open_object always seeks through the real
5963
/// ts_segment_index: with index_bytes (a serialized offset_index, as

src/v/cloud_topics/level_one/common/file_io.cc

Lines changed: 63 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -441,19 +441,14 @@ file_io::open_object(
441441
extent.id, std::get<footer>(std::move(footer_result)), this, g);
442442
}
443443

444-
ss::future<std::expected<void, io::errc>>
445-
file_io::delete_objects(chunked_vector<object_id> ids, ss::abort_source* as) {
446-
static constexpr auto timeout = 10s;
447-
static constexpr auto backoff = 100ms;
448-
retry_chain_node root(*as, ss::lowres_clock::now() + timeout, backoff);
449-
chunked_vector<cloud_storage_clients::object_key> keys;
450-
for (const auto& id : ids) {
451-
keys.push_back(object_path_factory::level_one_path(id));
452-
}
444+
ss::future<std::expected<void, io::errc>> file_io::delete_keys(
445+
const cloud_storage_clients::bucket_name& bucket,
446+
chunked_vector<cloud_storage_clients::object_key> keys,
447+
retry_chain_node& root) {
453448
auto result_fut
454449
= co_await ss::coroutine::as_future<cloud_io::upload_result>(
455450
_remote->delete_objects(
456-
_bucket, std::move(keys), root, [](size_t retry_count) {
451+
bucket, std::move(keys), root, [](size_t retry_count) {
457452
std::ignore = retry_count;
458453
}));
459454
if (result_fut.failed()) {
@@ -473,6 +468,64 @@ file_io::delete_objects(chunked_vector<object_id> ids, ss::abort_source* as) {
473468
std::unreachable();
474469
}
475470

471+
ss::future<std::expected<void, io::errc>> file_io::delete_objects(
472+
chunked_vector<object_location> objects, ss::abort_source* as) {
473+
static constexpr auto timeout = 10s;
474+
static constexpr auto backoff = 100ms;
475+
retry_chain_node root(*as, ss::lowres_clock::now() + timeout, backoff);
476+
477+
chunked_vector<cloud_storage_clients::object_key> native_keys;
478+
chunked_vector<cloud_storage_clients::object_key> ts_keys;
479+
size_t ts_count = 0;
480+
for (const auto& obj : objects) {
481+
if (obj.ts_path.has_value()) {
482+
++ts_count;
483+
// An imported segment owns three cloud objects (the segment, its
484+
// .tx range manifest, and its .index), the same set the archiver
485+
// would have deleted. Remove all three so nothing is orphaned.
486+
cloud_storage::remote_segment_path seg_path{
487+
std::filesystem::path{(*obj.ts_path)()}};
488+
ts_keys.push_back(
489+
cloud_storage_clients::object_key{(*obj.ts_path)()});
490+
ts_keys.push_back(
491+
cloud_storage_clients::object_key{
492+
cloud_storage::generate_remote_tx_path(seg_path)().native()});
493+
ts_keys.push_back(
494+
cloud_storage_clients::object_key{
495+
cloud_storage::generate_index_path(seg_path).native()});
496+
} else {
497+
native_keys.push_back(object_path_factory::level_one_path(obj.id));
498+
}
499+
}
500+
501+
if (!native_keys.empty()) {
502+
auto l1_object_count = native_keys.size();
503+
auto res = co_await delete_keys(_bucket, std::move(native_keys), root);
504+
if (!res.has_value()) {
505+
vlog(
506+
cd_log.warn,
507+
"Failed to delete {} L1 objects: {}",
508+
l1_object_count,
509+
res.error());
510+
co_return res;
511+
}
512+
}
513+
if (!ts_keys.empty()) {
514+
auto ts_key_count = ts_keys.size();
515+
auto res = co_await delete_keys(_bucket, std::move(ts_keys), root);
516+
if (!res.has_value()) {
517+
vlog(
518+
cd_log.warn,
519+
"Failed to delete {} keys for {} imported TS segments: {}",
520+
ts_key_count,
521+
ts_count,
522+
res.error());
523+
co_return res;
524+
}
525+
}
526+
co_return std::expected<void, io::errc>{};
527+
}
528+
476529
ss::future<std::expected<cloud_storage_clients::multipart_upload_ref, io::errc>>
477530
file_io::create_multipart_upload(
478531
object_id oid, size_t part_size, ss::abort_source* as) {

src/v/cloud_topics/level_one/common/file_io.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,19 @@ class file_io : public io {
4848
object_extent, ss::abort_source*, cloud_io::group_id g) override;
4949

5050
ss::future<std::expected<void, errc>>
51-
delete_objects(chunked_vector<object_id>, ss::abort_source*) override;
51+
delete_objects(chunked_vector<object_location>, ss::abort_source*) override;
5252

5353
ss::future<std::expected<cloud_storage_clients::multipart_upload_ref, errc>>
5454
create_multipart_upload(
5555
object_id, size_t part_size, ss::abort_source*) override;
5656

5757
private:
58+
// Delete a batch of keys from the given bucket.
59+
ss::future<std::expected<void, errc>> delete_keys(
60+
const cloud_storage_clients::bucket_name& bucket,
61+
chunked_vector<cloud_storage_clients::object_key> keys,
62+
retry_chain_node& parent);
63+
5864
cloud_io::remote* _remote;
5965
// Holds both native L1 objects and imported tiered-storage segments: a
6066
// single cluster always stores both in the one configured object bucket.

src/v/cloud_topics/level_one/metastore/garbage_collector.cc

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ garbage_collector::remove_unreferenced_objects(ss::abort_source* as) {
3131
// remove on the state that has been persisted to cloud.
3232
const auto& s = stm_->state();
3333

34-
chunked_vector<object_id> to_remove;
34+
chunked_vector<object_location> to_remove;
3535
for (const auto& [oid, obj_entry] : s.objects) {
3636
if (obj_entry.is_preregistration) {
3737
continue;
@@ -42,7 +42,13 @@ garbage_collector::remove_unreferenced_objects(ss::abort_source* as) {
4242

4343
// TODO: split these into multiple updates in case we've got a lot of
4444
// objects to remove.
45-
to_remove.emplace_back(oid);
45+
to_remove.push_back(
46+
object_location{
47+
.id = oid,
48+
.ts_path = obj_entry.imported_ts_location.transform(
49+
[](const imported_ts_object_location& loc) {
50+
return loc.ts_path;
51+
})});
4652
vlog(cd_log.debug, "Deleting L1 object: {}", oid);
4753
}
4854
if (to_remove.empty()) {
@@ -52,7 +58,12 @@ garbage_collector::remove_unreferenced_objects(ss::abort_source* as) {
5258
if (!del_res.has_value()) {
5359
co_return std::unexpected(error{"io error"});
5460
}
55-
auto update_res = remove_objects_update::build(s, std::move(to_remove));
61+
chunked_vector<object_id> remove_ids;
62+
remove_ids.reserve(to_remove.size());
63+
for (const auto& ext : to_remove) {
64+
remove_ids.emplace_back(ext.id);
65+
}
66+
auto update_res = remove_objects_update::build(s, std::move(remove_ids));
5667
if (!update_res.has_value()) {
5768
co_return std::unexpected(error{"logic error"});
5869
}

src/v/cloud_topics/level_one/metastore/lsm/garbage_collector.cc

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ db_garbage_collector::remove_unreferenced_batch(
117117
std::move(object_range_res.error()), "Error getting object range"));
118118
}
119119
auto object_gen = object_range_res.value().get_rows();
120-
chunked_vector<object_id> to_remove;
120+
chunked_vector<object_location> to_remove;
121121
size_t batch_expire_count = 0;
122122
std::optional<object_id> next_batch_start{std::nullopt};
123123
while (auto obj_ref_opt = co_await object_gen()) {
@@ -178,7 +178,13 @@ db_garbage_collector::remove_unreferenced_batch(
178178
obj_entry.removed_data_size == obj_entry.total_data_size
179179
&& obj_entry.last_updated <= deletion_delay_cutoff) {
180180
vlog(cd_log.debug, "Deleting L1 object: {}", oid);
181-
to_remove.emplace_back(oid);
181+
to_remove.push_back(
182+
object_location{
183+
.id = oid,
184+
.ts_path = obj_entry.imported_ts_location.transform(
185+
[](const imported_ts_object_location& loc) {
186+
return loc.ts_path;
187+
})});
182188
if (to_remove.size() + batch_expire_count == batch_size) {
183189
// Set an explicit next starting object if we had more than one
184190
// batch of objects so we can make incremental progress.
@@ -206,7 +212,12 @@ db_garbage_collector::remove_unreferenced_batch(
206212
error(errc::io_error, "Error deleting objects: {}", del_res.error()));
207213
}
208214
probe_->gc_objects_deleted(num_to_remove);
209-
remove_objects_db_update update{std::move(to_remove)};
215+
chunked_vector<object_id> remove_ids;
216+
remove_ids.reserve(to_remove.size());
217+
for (const auto& ext : to_remove) {
218+
remove_ids.emplace_back(ext.id);
219+
}
220+
remove_objects_db_update update{std::move(remove_ids)};
210221
chunked_vector<write_batch_row> rows;
211222
auto build_res = co_await update.build_rows(rows);
212223
if (!build_res.has_value()) {

src/v/cloud_topics/level_one/metastore/tests/garbage_collector_test.cc

Lines changed: 91 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,71 @@ class GarbageCollectorTest : public raft::stm_raft_fixture<simple_stm> {
129129
ASSERT_TRUE_CORO(repl_res.has_value());
130130
}
131131

132+
// Registers a single-extent imported object covering [base, last] whose
133+
// backing segment is the TS-bucket object at ts_path (injected here).
134+
// Unlike add_objects, an imported object has no L1 blob and is not
135+
// preregistered.
136+
ss::future<> add_imported_object(
137+
simple_stm* stm,
138+
model::topic_id_partition tp,
139+
const ts_segment_path& ts_path,
140+
kafka::offset base,
141+
kafka::offset last) {
142+
_io.put_ts_segment(ts_path, iobuf{});
143+
144+
new_object obj;
145+
obj.oid = create_object_id();
146+
obj.footer_pos = 0;
147+
obj.object_size = 500;
148+
obj.imported_ts_location = imported_ts_object_location{
149+
.ts_path = ts_path};
150+
obj.extent_metas[tp.topic_id][tp.partition] = new_object::metadata{
151+
.base_offset = base,
152+
.last_offset = last,
153+
.max_timestamp = ts{10000},
154+
.filepos = 0,
155+
.len = 500,
156+
.imported_ts_info = imported_ts_segment_info{},
157+
};
158+
chunked_vector<new_object> new_objects;
159+
new_objects.push_back(std::move(obj));
160+
term_state_update_t terms;
161+
terms[tp].emplace_back(
162+
term_start{.term_id = model::term_id{0}, .start_offset = base});
163+
164+
// Mark the partition migrating so the metastore adopts its log at the
165+
// imported extents' (non-zero) base, then import via the normal
166+
// add_objects path (imported objects skip pre-registration).
167+
{
168+
auto sync_res = co_await stm->sync(10s);
169+
ASSERT_TRUE_CORO(sync_res.has_value());
170+
set_migrating_update mig{.tp = tp, .migrating = true};
171+
storage::record_batch_builder b(
172+
model::record_batch_type::l1_stm, model::offset{0});
173+
b.add_raw_kv(
174+
serde::to_iobuf(set_migrating_update::key),
175+
serde::to_iobuf(std::move(mig)));
176+
auto r = co_await stm->replicate_and_wait(
177+
sync_res.value(), std::move(b).build(), never_abort);
178+
ASSERT_TRUE_CORO(r.has_value());
179+
}
180+
181+
auto sync_res = co_await stm->sync(10s);
182+
ASSERT_TRUE_CORO(sync_res.has_value());
183+
add_objects_update update{
184+
.new_objects = std::move(new_objects),
185+
.new_terms = std::move(terms),
186+
};
187+
storage::record_batch_builder builder(
188+
model::record_batch_type::l1_stm, model::offset{0});
189+
builder.add_raw_kv(
190+
serde::to_iobuf(add_objects_update::key),
191+
serde::to_iobuf(std::move(update)));
192+
auto repl_res = co_await stm->replicate_and_wait(
193+
sync_res.value(), std::move(builder).build(), never_abort);
194+
ASSERT_TRUE_CORO(repl_res.has_value());
195+
}
196+
132197
size_t count_objects() { return _io.list_objects().size(); }
133198

134199
fake_io _io;
@@ -189,6 +254,30 @@ TEST_F(GarbageCollectorTest, TestGarbageCollectPartiallyRemovedObjects) {
189254
ASSERT_EQ(5, count_objects());
190255
}
191256

257+
TEST_F(GarbageCollectorTest, TestGarbageCollectImportedObject) {
258+
initialize_state_machines(1).get();
259+
wait_for_leader(5s).get();
260+
auto stm = get_stm<0>(*nodes().begin()->second);
261+
262+
auto tp = make_tp(0);
263+
const ts_segment_path ts_path{"imported/seg-100-104.log"};
264+
add_imported_object(stm.get(), tp, ts_path, o{100}, o{104}).get();
265+
EXPECT_EQ(1, stm->state().objects.size());
266+
EXPECT_TRUE(_io.has_ts_segment(ts_path));
267+
268+
// Trim past the imported extent so the object becomes unreferenced.
269+
set_start_offset(stm.get(), tp, o{105}).get();
270+
271+
garbage_collector gc(stm.get(), &_io);
272+
auto gc_res = gc.remove_unreferenced_objects(&never_abort).get();
273+
ASSERT_TRUE(gc_res.has_value());
274+
275+
// The object row is gone, and its backing TS-bucket segment is deleted
276+
// (routed by ts_path, not the object id).
277+
EXPECT_EQ(0, stm->state().objects.size());
278+
EXPECT_FALSE(_io.has_ts_segment(ts_path));
279+
}
280+
192281
// A fake_io wrapper that fails on delete_objects
193282
class failing_io : public io {
194283
public:
@@ -217,8 +306,8 @@ class failing_io : public io {
217306
std::unexpected(errc::cloud_op_error));
218307
}
219308

220-
ss::future<std::expected<void, errc>>
221-
delete_objects(chunked_vector<object_id>, ss::abort_source*) override {
309+
ss::future<std::expected<void, errc>> delete_objects(
310+
chunked_vector<object_location>, ss::abort_source*) override {
222311
return ss::make_ready_future<std::expected<void, errc>>(
223312
std::unexpected(errc::cloud_op_error));
224313
}

0 commit comments

Comments
 (0)