RFC ts-ct-migration: metastore and L1 reader [PR 1] #30800
sjust-redpanda wants to merge 9 commits
I think this is ready for review/feedback, but it probably shouldn't be merged until the remaining pieces are at least available in draft form and have been looked at.
andrwng
left a comment
Not a complete review yet, but looked over the structure and nothing too crazy stands out yet; just leaving some initial thoughts. Thanks for structuring this to be very easily reviewable!
WillemKauf
left a comment
Looking pretty good so far
// Encode a v0 extent_row_value (no `imported` field) and verify that decoding
// it as the current v1 struct leaves `imported` as nullopt.
TEST(ImportedSerdeTest, ExtentRowValueV0CompatV1) {
This file feels like it is testing serde primitives/invariants that we depend on rather than anything to do with objects/the metastore/cloud_topics in general. I'm not really opposed to having it, but I'm not sure it's buying us anything more than a sanity check that should be elsewhere to begin with.
/// Mirrors metadata_row_value at version<1>, before the `migrating`
/// field was added in version<2>.
struct metadata_row_value_v1
}

std::expected<std::monostate, stm_update_error>
set_migrating_update::can_apply(const state& state, bool* is_no_op) {
absolute-micro-nit: ref makes more sense than pointer here imo, not sure we need a default is_no_op=nullptr argument
Reasonable, but I'd rather leave it as a pointer to match l1::set_start_offset_db_update::build_rows and l1::set_start_offset_update::build.
ss::future<std::optional<model::record_batch>>
tiered_storage_object_reader::fetch_next_translated() {
    static const auto translator_types = model::offset_translator_batch_types();
    for (;;) {
this function is kind of big, mostly because of the different case handling and the code comments to go with it. Can we move that logic and those comments to a different function that returns a bool keep_reading and simplifies this inner loop significantly in terms of LOC and readability?
I actually prefer it in this form, having the comments collected together makes the _running_delta handling easier to understand and quick to scan. I can adjust it though if you feel strongly.
operator==(const imported_ts_info&, const imported_ts_info&) = default;
auto serde_fields() {
    return std::tie(
      ts_path, segment_term, base_kafka_offset, last_kafka_offset);
I think this will not work. You need the size of the object (it's part of the object name). You need the delta offset in order to be able to translate offsets.
You can theoretically do this with Kafka offsets if there are no gaps.
bk = base_kafka_offset;
bo = segment.first_batch.log_base_offset
delta = bo - bk
<..now you can scan the segment and translate offsets..>
The problem is that some segments are compacted. The compacted segment may have base_kafka_offset = M but the first batch in the segment will have base_kafka_offset = N and M < N.
So, you need at least one delta_offset field, which will contain the total number of configuration records in the prefix of the log.
Yeah, I misunderstood how TS compaction works. There's another related problem: my current prototype disables compacted reupload. That could actually cause entries (and keys, depending on how re-upload is ordered) to resurrect, I think, once reads go to L1. Looking, thanks for the pointer.
Ah, yea, compacted segments are a problem I hadn't thought of. Sorry for missing that.
For size though, object size is a part of extent info in the metastore, FWIW.
Ok, we now store the base delta and the size is present in the normal metastore metadata.
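To make the delta discussion in this thread concrete, here is a minimal sketch of the translation. It assumes a simplified batch model in which Kafka offset = log offset minus delta, with delta counting the non-data (configuration) records that precede a batch in the log. The names `batch`, `translated`, and `translate_segment` are hypothetical and are not Redpanda types. The starting delta is passed in from stored metadata because, as noted above, a compacted segment's first surviving batch may begin past the segment's nominal base offset.

```cpp
#include <cstdint>
#include <vector>

// Hypothetical, simplified batch; not a Redpanda type.
struct batch {
    int64_t log_base_offset; // offset in the log, including config records
    int32_t record_count;
    bool is_config; // non-data batch, invisible to Kafka clients
};

struct translated {
    int64_t kafka_base_offset;
    int32_t record_count;
};

// Translate the data batches of one segment into Kafka offsets.
// `base_delta` is the number of non-data records in the log prefix before
// this segment. It has to come from metadata: after compaction the first
// surviving batch need not start at the segment's base offset, so the delta
// cannot be derived from that batch alone.
inline std::vector<translated>
translate_segment(const std::vector<batch>& seg, int64_t base_delta) {
    std::vector<translated> out;
    int64_t delta = base_delta;
    for (const auto& b : seg) {
        if (b.is_config) {
            delta += b.record_count; // each config record widens the gap
            continue;                // and is not emitted to the consumer
        }
        out.push_back({b.log_base_offset - delta, b.record_count});
    }
    return out;
}
```

With a stored base delta of 4, a data batch at log offset 105 maps to Kafka offset 101 regardless of whether earlier batches in the segment were compacted away.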
/// log-to-Kafka delta there is the first batch's log offset minus this);
/// last_kafka_offset bounds seeks in the imported segment index.
kafka::offset base_kafka_offset{0};
kafka::offset last_kafka_offset{0};
how about adding a flag that tells us that there is a tx-manifest present?
Added a tx_manifest_state to indicate whether the tx manifest is present.
deps = [
    ":abstract_io",
    ":object_handle",
    "//src/v/cloud_storage",
can we avoid adding this dependency by extracting what ts_reader/ts_object need out of the cloud_storage module?
I split out a library with the bit we actually need.
remote_segment_index.{h,cc} contained two independent things sharing a TU: the
read/write `offset_index` (used by readers) and `remote_segment_index_builder`, a
storage::batch_consumer that pulls in raft (raft::offset_translator_batch_types).
offset_index itself has none of that coupling -- only model, bytes, utils/delta_for
and serde.
Move `offset_index` into cloud_storage/offset_index.{h,cc} behind a new
//src/v/cloud_storage:offset_index cc_library.
remote_segment_index.{h,cc} keeps segment_record_stats + the builder, now
including offset_index.h; the cloud_storage monolith depends on :offset_index.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Add an optional descriptor that lets an L1 object/extent point at a segment that physically lives in the tiered-storage bucket instead of an L1-native object. Metadata/encoding only; no read, write, or delete path consumes it yet.
- imported_ts_info (object_id.h): the per-segment descriptor -- ts_path, delta_offset/delta_offset_end (log<->kafka offset translation), segment_term, and last_kafka_offset. Stored split by ownership: ts_path on the object row (imported_ts_object_location), delta/term on the extent row (imported_ts_segment_info); absent means a native L1 object.
- Thread it through the metastore read/interface types on both backends: an optional imported_ts_info on object_response / extent_object_info (recomposed from the storage split on read), plus rpc_types and the replicated/domain passthrough.
- Encode it: serde-versioned on both backends (object_entry/extent and the LSM rows), the LSM debug-serde path, and the ImportedTsObjectLocation / ImportedTsSegmentInfo protobufs. Old-version decode defaults to absent.
Testing: serde round-trip unit tests for the new fields, including old-version decode.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
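The "old-version decode defaults to absent" property can be illustrated with a toy versioned encoding. This is only a sketch: the wire format, `extent_value`, `encode_v0`, `encode_v1`, and `decode` are all hypothetical and do not reflect how Redpanda's serde actually lays out bytes. It shows the general idea that a decoder for the newer version leaves the optional field unset when the input was written before the field existed.

```cpp
#include <cstddef>
#include <cstdint>
#include <optional>
#include <stdexcept>
#include <vector>

using bytes = std::vector<uint8_t>;

// Current (v1) shape. Version 0 carried only base_offset.
struct extent_value {
    int64_t base_offset{0};
    std::optional<int64_t> delta_offset; // absent means a native object
};

inline void put_i64(bytes& out, int64_t v) {
    const auto u = static_cast<uint64_t>(v);
    for (int i = 0; i < 8; ++i) {
        out.push_back(static_cast<uint8_t>((u >> (8 * i)) & 0xff));
    }
}

inline int64_t get_i64(const bytes& in, std::size_t& pos) {
    if (pos + 8 > in.size()) {
        throw std::runtime_error("short read");
    }
    uint64_t u = 0;
    for (int i = 0; i < 8; ++i) {
        u |= static_cast<uint64_t>(in[pos + i]) << (8 * i);
    }
    pos += 8;
    return static_cast<int64_t>(u);
}

// Toy wire format (an assumption for this sketch):
//   [version:u8][base:i64], and v1 appends [present:u8][delta:i64 if present]
inline bytes encode_v0(int64_t base_offset) {
    bytes out;
    out.push_back(0);
    put_i64(out, base_offset);
    return out;
}

inline bytes encode_v1(const extent_value& v) {
    bytes out;
    out.push_back(1);
    put_i64(out, v.base_offset);
    out.push_back(v.delta_offset.has_value() ? 1 : 0);
    if (v.delta_offset) {
        put_i64(out, *v.delta_offset);
    }
    return out;
}

// Decodes either version into the current struct.
inline extent_value decode(const bytes& in) {
    if (in.empty() || in[0] > 1) {
        throw std::runtime_error("unsupported version");
    }
    std::size_t pos = 1;
    extent_value v;
    v.base_offset = get_i64(in, pos);
    if (in[0] >= 1) {
        if (pos >= in.size()) {
            throw std::runtime_error("short read");
        }
        if (in[pos++] == 1) {
            v.delta_offset = get_i64(in, pos);
        }
    }
    return v; // for v0 input, delta_offset stays std::nullopt
}
```

Decoding `encode_v0(42)` yields `base_offset == 42` with `delta_offset` unset, which is the shape of check the old-version decode tests in this commit are described as making.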
Add a durable, per-partition boolean migrating to each backend's partition record, set true while a partition is mid tiered->cloud migration.
- state: add the field to partition_state (serde v0->v1) and the LSM metadata_row_value (v1->v2), defaulting to false so snapshots predating the field decode as native cloud topics; carry it through the MetadataValue debug proto.
- op: add set_migrating(partition, bool) across the metastore interface, the RPC service/client, the leader router (+probe), both domain managers, and both backends. It is idempotent (setting the current value is a no-op), applies in either direction, and creates the partition record when set on an absent partition; metadata-row rewrites (add_objects, set_start_offset) preserve the flag.
- read: surface the flag on get_offsets.
Tests (both backends): create-on-absent, set-then-clear, idempotent set, and preservation across add_objects; serde back-compat for both records.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
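The semantics listed for set_migrating (idempotent, bidirectional, create-on-absent, preserved across metadata-row rewrites) can be sketched against a toy in-memory store. Everything here is hypothetical: `toy_metastore`, `partition_record`, and the string partition key stand in for the real state machines, which are replicated and asynchronous.

```cpp
#include <cstdint>
#include <map>
#include <string>

// Hypothetical in-memory stand-in for a partition record.
struct partition_record {
    int64_t next_offset{0};
    bool migrating{false}; // false by default, like pre-field snapshots
};

class toy_metastore {
public:
    // Returns true if state changed, false if the call was a no-op.
    bool set_migrating(const std::string& partition, bool value) {
        // try_emplace creates the record when the partition is absent.
        auto [it, inserted] = partitions_.try_emplace(partition);
        if (!inserted && it->second.migrating == value) {
            return false; // idempotent: already at the requested value
        }
        it->second.migrating = value;
        return true;
    }

    // Stand-in for a metadata-row rewrite: only the offset is touched,
    // so the migrating flag is carried forward unchanged.
    void add_objects(const std::string& partition, int64_t new_next_offset) {
        partitions_[partition].next_offset = new_next_offset;
    }

    // Returns nullptr when the partition has no record.
    const partition_record* find(const std::string& partition) const {
        auto it = partitions_.find(partition);
        return it == partitions_.end() ? nullptr : &it->second;
    }

private:
    std::map<std::string, partition_record> partitions_;
};
```

The four test cases named in the commit message map directly onto calls against this sketch: set on an absent partition, set then clear, a repeated set, and a set followed by add_objects.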
Let the metastore register tiered-storage segments as imported L1 extents
by reference, reusing the existing add_objects extent-addition path rather
than a dedicated command.
- add_objects now handles imported objects:
- skips the pre-registration check for imported objects (they reference an
existing tiered-storage segment; nothing is written to L1)
- copies imported_ts_location / imported_ts_delta onto the object and
extent rows
- sets start_offset from the first extent for an empty migrating partition
(a non-zero start -- the TS->CT mount), rather than requiring offset 0
- the object builder's add_imported(ntp_metadata) registers one imported
segment as a finished, single-extent object: with no L1 write to reserve it
creates the object id and records it as finished without pre-registration,
to be committed by a subsequent add_objects. meta.imported must be set; the
read-replica metastore rejects it.
Tests (both backends): a migrating partition adopts a non-zero start at the
first imported extent; successive batches forward-append; a non-contiguous
batch is rejected.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
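The start-offset and contiguity rules described in this commit message can be sketched as follows. This is a simplified model under stated assumptions: `extent`, `partition_log`, and `add_extents` are hypothetical names, offsets are plain integers, and the real add_objects path also handles object rows, pre-registration, and replication, none of which appear here.

```cpp
#include <cstdint>
#include <optional>
#include <vector>

// Hypothetical extent: an inclusive offset range.
struct extent {
    int64_t base_offset;
    int64_t last_offset;
};

struct partition_log {
    bool migrating{false};
    std::optional<int64_t> start_offset; // unset while the partition is empty
    int64_t next_offset{0};
};

// Appends a batch of extents. Returns false and leaves `log` untouched if
// the batch does not line up with the current end of the log.
inline bool add_extents(partition_log& log, const std::vector<extent>& batch) {
    if (batch.empty()) {
        return true;
    }
    const bool empty = !log.start_offset.has_value();
    int64_t expected = log.next_offset;
    if (empty) {
        // A native partition must begin at 0. An empty migrating partition
        // adopts the first extent's base as its start.
        expected = log.migrating ? batch.front().base_offset : 0;
    }
    for (const auto& e : batch) {
        if (e.base_offset != expected || e.last_offset < e.base_offset) {
            return false; // gap, overlap, or malformed extent
        }
        expected = e.last_offset + 1;
    }
    if (empty) {
        log.start_offset = batch.front().base_offset;
    }
    log.next_offset = expected;
    return true;
}
```

A migrating partition that first receives extents starting at 500 ends up with a start offset of 500, later batches must begin exactly at the next offset, and the same first batch is rejected on a non-migrating partition.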
… abstraction
Introduce an object_handle/object_index indirection for the L1 read path so a
reader can seek and stream from an object without knowing its on-disk format.
This is the seam a later commit uses to serve imported tiered-storage segments;
here it is wired for native L1 only and is behavior-preserving.
- object_handle.{h,cc}: object_index (seek_to_offset/seek_to_timestamp ->
seek_result) and object_handle (index() + open_reader) interfaces, plus the
native implementations l1_footer_index and l1_native_object_handle. The
handle reads byte ranges via the owning io's read_object, so file_io and
fake_io share one implementation.
- io::open_object (abstract_io.h): the dispatch entry point; file_io and
fake_io return an l1_native_object_handle (footer-indexed).
- level_one_reader obtains an object_handle via open_object and reads through
its index/open_reader instead of reading the footer directly; group_id is
threaded through.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
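A rough sketch of the indirection this commit describes is shown below. The names `object_index`, `object_handle`, `seek_result`, `seek_to_offset`, and `index()` come from the commit message; everything else (`read_from`, `sparse_index`, `in_memory_handle`, `read_at_offset`, and the synchronous signatures) is an assumption made to keep the example self-contained. The real interfaces are presumably asynchronous and expose open_reader rather than returning a string.

```cpp
#include <cstdint>
#include <iterator>
#include <map>
#include <optional>
#include <string>
#include <utility>

struct seek_result {
    uint64_t byte_pos; // where to start reading inside the object
};

// Format-agnostic index: maps a target offset to a byte position.
class object_index {
public:
    virtual ~object_index() = default;
    virtual std::optional<seek_result> seek_to_offset(int64_t offset) const = 0;
};

// Format-agnostic handle: an index plus a way to read bytes.
class object_handle {
public:
    virtual ~object_handle() = default;
    virtual const object_index& index() const = 0;
    virtual std::string read_from(uint64_t byte_pos) const = 0; // hypothetical
};

// Hypothetical index implementation: base offset -> byte position.
class sparse_index final : public object_index {
public:
    explicit sparse_index(std::map<int64_t, uint64_t> entries)
      : entries_(std::move(entries)) {}

    std::optional<seek_result> seek_to_offset(int64_t offset) const override {
        auto it = entries_.upper_bound(offset);
        if (it == entries_.begin()) {
            return std::nullopt; // target precedes the first indexed entry
        }
        return seek_result{std::prev(it)->second};
    }

private:
    std::map<int64_t, uint64_t> entries_;
};

// Hypothetical handle backed by an in-memory buffer.
class in_memory_handle final : public object_handle {
public:
    in_memory_handle(std::string data, std::map<int64_t, uint64_t> entries)
      : data_(std::move(data))
      , index_(std::move(entries)) {}

    const object_index& index() const override { return index_; }
    std::string read_from(uint64_t byte_pos) const override {
        return data_.substr(byte_pos);
    }

private:
    std::string data_;
    sparse_index index_;
};

// The reader only sees the abstract handle, never the on-disk format.
inline std::optional<std::string>
read_at_offset(const object_handle& h, int64_t offset) {
    auto pos = h.index().seek_to_offset(offset);
    if (!pos) {
        return std::nullopt;
    }
    return h.read_from(pos->byte_pos);
}
```

Because `read_at_offset` depends only on the two interfaces, a second handle type for another object format can be added without touching the reader, which is the seam the commit message refers to.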
Add the building blocks for serving an imported tiered-storage segment, not yet
wired to any io backend (inert on their own):
- tiered_storage_object_reader (ts_reader.{h,cc}): an object_reader over a raw
Kafka-format segment stream. Maintains a running offset delta, emits only data
batches, drops aborted-transaction ranges (from the .tx manifest) so the
region is committed-only, and stamps the segment term as each batch's leader
epoch.
- ts_segment_index (ts_object.{h,cc}): object_index over a segment's
offset_index -- translates a target offset/timestamp into a byte position +
delta via find_kaf_offset/find_timestamp.
- ts_object_handle (ts_object.{h,cc}): object_handle whose byte
transport is injected as a fetch_range_fn, so file_io (download + cache) and
fake_io (in-memory) can share the seek/reader wiring.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
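The filtering behavior described for tiered_storage_object_reader (drop control batches, drop aborted-transaction ranges, stamp the segment term as the leader epoch) can be sketched on a simplified batch model. The types `raw_batch` and `tx_range` and the function `committed_view` are hypothetical, and the sketch assumes aborted ranges and batch offsets are expressed in the same offset space, which may not match the actual .tx manifest format.

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

// Hypothetical, simplified batch header.
struct raw_batch {
    int64_t base_offset;
    int64_t last_offset;
    int64_t producer_id;
    bool is_control; // transaction marker, not user data
    int32_t leader_epoch;
};

// Hypothetical aborted-transaction range for one producer (inclusive).
struct tx_range {
    int64_t producer_id;
    int64_t first;
    int64_t last;
};

inline bool
in_aborted_range(const raw_batch& b, const std::vector<tx_range>& aborted) {
    return std::any_of(
      aborted.begin(), aborted.end(), [&b](const tx_range& r) {
          return r.producer_id == b.producer_id && b.base_offset >= r.first
                 && b.last_offset <= r.last;
      });
}

// Produce the committed-only view of a segment's batches.
inline std::vector<raw_batch> committed_view(
  const std::vector<raw_batch>& in,
  const std::vector<tx_range>& aborted,
  int32_t segment_term) {
    std::vector<raw_batch> out;
    for (const auto& b : in) {
        if (b.is_control) {
            continue; // control batches are never emitted
        }
        if (in_aborted_range(b, aborted)) {
            continue; // data from an aborted transaction
        }
        raw_batch copy = b;
        copy.leader_epoch = segment_term; // stamp the segment term
        out.push_back(copy);
    }
    return out;
}
```

A consumer of this view never has to apply read-committed filtering itself, since aborted data and markers are already gone by the time batches are emitted.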
Extract read_object's loop -- look up the cache; on miss reserve space, download the byte range, and cache it -- into a free download_cached_range(remote, cache, bucket, key, cache_key, offset, size, group, label) helper, and fold the one-line save_to_cache member into a free function it uses. read_object now delegates to it.
Behavior-preserving; the imported tiered-storage read path in a later commit reuses the same helper for its segment downloads.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
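The lookup, download, and cache flow that this helper factors out can be sketched synchronously. This is not the real download_cached_range: `range_cache`, `fetch_fn`, and `cached_range_sketch` are hypothetical, the space-reservation step is omitted, and the actual helper takes remote, bucket, group, and label arguments that have no counterpart here.

```cpp
#include <cstdint>
#include <functional>
#include <map>
#include <optional>
#include <string>
#include <utility>

// Hypothetical byte transport: returns nullopt when the download fails.
using fetch_fn = std::function<std::optional<std::string>(
  const std::string& key, uint64_t offset, uint64_t size)>;

// Hypothetical in-memory cache keyed by an opaque cache key.
class range_cache {
public:
    std::optional<std::string> get(const std::string& cache_key) const {
        auto it = entries_.find(cache_key);
        if (it == entries_.end()) {
            return std::nullopt;
        }
        return it->second;
    }

    void put(std::string cache_key, std::string data) {
        entries_[std::move(cache_key)] = std::move(data);
    }

private:
    std::map<std::string, std::string> entries_;
};

// Cache hit: return the cached bytes. Miss: download the byte range, store
// it, and return it. A failed download caches nothing.
inline std::optional<std::string> cached_range_sketch(
  range_cache& cache,
  const fetch_fn& fetch,
  const std::string& key,
  const std::string& cache_key,
  uint64_t offset,
  uint64_t size) {
    if (auto hit = cache.get(cache_key)) {
        return hit;
    }
    auto data = fetch(key, offset, size);
    if (!data) {
        return std::nullopt;
    }
    cache.put(cache_key, *data);
    return data;
}
```

Keeping the transport behind a callable is what lets two read paths share one helper: each caller supplies its own bucket and key while the hit/miss logic stays in one place.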
Wire io::open_object to dispatch an imported extent (one carrying imported_ts_info) to the tiered-storage read path, so a single L1 reader serves native L1 objects and imported segments interchangeably.
- file_io: an imported extent downloads the segment's .index and .tx manifest, builds a ts_segment_index, and returns a ts_object_handle whose fetch callback downloads (and caches) byte ranges from the TS bucket. Adds the ts_bucket constructor parameter (wired in app.cc) and a download_raw_iobuf helper.
- fake_io: an injected segment (put_ts_segment, optionally with a serialized offset_index) is served through the same ts_object_handle with an in-memory fetch; without an index the ts_segment_index is empty (a full-segment scan), matching file_io's missing-.index fallback.
Testing: imported open_object/reader unit tests (offset translation, the index-backed seek, read-committed strip, leader-epoch stamping, control-batch drop) against fake_io, plus the imported reader fixture.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
When an imported L1 object is removed, delete its backing tiered-storage objects (the segment, its .tx range manifest, and its .index -- the same set the archiver would have deleted) so nothing is orphaned.
- delete_objects (abstract_io/file_io/fake_io) takes object_location instead of object_id, so it routes by the imported location: present -> delete from the tiered-storage bucket; absent -> delete from the L1 bucket as before.
- Both garbage collectors (metastore and lsm) build the removal list from each object's imported_ts_location. Native L1 deletion is unchanged.
Testing: an imported-segment GC test (import via add_objects on a migrating partition, then trim + GC) asserts the backing TS segment is removed, routed by ts_path; fake_io grows has_ts_segment to observe it.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
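The routing rule for deletion can be sketched as a pure planning function. The names `object_location_sketch`, `delete_request`, and `plan_deletes` are hypothetical, and two details are assumptions made for illustration: that the .tx and .index keys are formed by appending a suffix to the segment path, and that a native object's key is its id.

```cpp
#include <optional>
#include <string>
#include <vector>

// Hypothetical stand-in for object_location: a native id plus an optional
// tiered-storage path for imported objects.
struct object_location_sketch {
    std::string object_id;
    std::optional<std::string> imported_ts_path;
};

struct delete_request {
    std::string bucket;
    std::string key;
};

// Build the list of deletes for a set of removed objects.
inline std::vector<delete_request> plan_deletes(
  const std::vector<object_location_sketch>& objects,
  const std::string& l1_bucket,
  const std::string& ts_bucket) {
    std::vector<delete_request> out;
    for (const auto& o : objects) {
        if (o.imported_ts_path) {
            // Imported: remove the segment and its side files from the
            // tiered-storage bucket. Suffix naming is an assumption.
            const std::string& path = *o.imported_ts_path;
            out.push_back({ts_bucket, path});
            out.push_back({ts_bucket, path + ".tx"});
            out.push_back({ts_bucket, path + ".index"});
        } else {
            // Native: a single object in the L1 bucket, as before.
            out.push_back({l1_bucket, o.object_id});
        }
    }
    return out;
}
```

Separating planning from the actual delete calls also makes the routing easy to unit test, in the same spirit as the has_ts_segment check mentioned in the commit message.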
Backports Required
Release Notes
Features
Add the L1 metastore and reader foundation for an imported segment: one that physically lives in the tiered-storage bucket but is described and addressed by reference from L1. First PR of the tiered→cloud migration stack.
Changes by area
Schema / encoding -- teach the metastore to describe a segment that lives in the tiered-storage bucket, referenced from L1 rather than written natively. The encoding changes, all serde-versioned (old snapshots decode with no imported info and `migrating = false`):
- `imported_ts_info` (object_id.h) -- optional metadata for object_extent required to read TS extents, absent on L1 object_extent. Carried as an optional; absent = a native L1 object. Fields:
  - `ts_path`
  - `delta_offset` / `delta_offset_end`
  - `segment_term`
  - `last_kafka_offset`
- `object_entry` gains `imported_ts_object_location` -- the backing segment's `ts_path`.
- `imported_ts_segment_info` (`imported_ts_delta`) -- the per-data delta/term; the extent's own offsets carry the Kafka bounds.
- `partition_state` (v0→v1) and LSM `metadata_row_value` (v1→v2) gain the boolean `migrating` flag. Used by recovery/read-replica to determine whether to treat the partition as TS or CT.
- `ImportedTsObjectLocation` (`ObjectValue.imported_ts_location`), `ImportedTsSegmentInfo` (`ExtentValue.imported_ts_delta`), and `MetadataValue.migrating`.
Write path
- `add_objects` now also supports imported segments via object_metadata_builder, with appropriate additions to new_object.
- A `migrating` partition (set via the idempotent `set_migrating` op, surfaced on `get_offsets`) adopts its log at the first imported extent's base rather than at 0.
Read path
- `object_handle` abstraction through `io::open_object`, with implementations for the existing L1 behavior.
- `ts_object_handle` / `ts_segment_index`, responsible for dealing with TS index and transaction files.
Deletion -- route object GC by the imported info: an imported extent's backing tiered-storage objects (segment + `.tx` + `.index`) are deleted from the tiered-storage bucket; native objects route to the L1 bucket as before.
Testing
Unit tests, green standalone (`bazel test //src/v/cloud_topics/level_one/...`): serde round-trip / old-version decode, `state_update` op semantics on both backends, the imported reader (offset translation, txn strip, epoch stamping, download errors), and imported-segment GC.
Design Doc: https://docs.google.com/document/d/1HmzBsabpu28Oh8d2PW_HoEQA4rB3Jo1jcj1D3Rsh5mo/edit?usp=sharing