Skip to content

Commit 91a849d

Browse files
committed
ct/fe: frontend::advance_epoch
- fence - advance epoch thru ctp_stm_api - sync to next placeholder batch beyond the new epoch cmd - return epoch_info snapshot Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
1 parent 0c2af7b commit 91a849d

4 files changed

Lines changed: 102 additions & 0 deletions

File tree

src/v/cloud_topics/frontend/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ redpanda_cc_library(
4040
"//src/v/cloud_topics:data_plane_api",
4141
"//src/v/cloud_topics:log_reader_config",
4242
"//src/v/cloud_topics:state_accessors",
43+
"//src/v/cloud_topics:types",
4344
"//src/v/cloud_topics/level_one/frontend_reader:reader",
4445
"//src/v/cloud_topics/level_zero/common:extent_meta",
4546
"//src/v/cloud_topics/level_zero/frontend_reader",

src/v/cloud_topics/frontend/frontend.cc

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include "cloud_topics/level_zero/common/producer_queue.h"
1919
#include "cloud_topics/level_zero/frontend_reader/level_zero_reader.h"
2020
#include "cloud_topics/level_zero/stm/ctp_stm.h"
21+
#include "cloud_topics/level_zero/stm/ctp_stm_api.h"
2122
#include "cloud_topics/level_zero/stm/placeholder.h"
2223
#include "cloud_topics/logger.h"
2324
#include "cloud_topics/state_accessors.h"
@@ -1044,6 +1045,41 @@ frontend::epoch_info frontend::get_epoch_info() const {
10441045
};
10451046
}
10461047

1048+
auto frontend::advance_epoch(
1049+
cloud_topics::cluster_epoch new_epoch,
1050+
model::timeout_clock::time_point deadline)
1051+
-> ss::future<std::expected<epoch_info, frontend_errc>> {
1052+
vlog(cd_log.debug, "{}: advance epoch to {}", ntp(), new_epoch);
1053+
1054+
constexpr auto api_errc_to_fe_errc =
1055+
[](ctp_stm_api_errc ec) -> frontend_errc {
1056+
switch (ec) {
1057+
using enum ctp_stm_api_errc;
1058+
case not_leader:
1059+
return frontend_errc::not_leader_for_partition;
1060+
case shutdown:
1061+
case failure:
1062+
case timeout:
1063+
return frontend_errc::timeout;
1064+
}
1065+
};
1066+
1067+
ss::abort_source as;
1068+
1069+
auto result = co_await _ctp_stm_api->advance_epoch(new_epoch, deadline, as);
1070+
if (!result.has_value()) {
1071+
co_return std::unexpected{api_errc_to_fe_errc(result.error())};
1072+
}
1073+
1074+
auto adv_res = co_await _ctp_stm_api->sync_to_next_placeholder(
1075+
deadline, as);
1076+
if (!adv_res.has_value()) {
1077+
co_return std::unexpected{api_errc_to_fe_errc(adv_res.error())};
1078+
}
1079+
1080+
co_return get_epoch_info();
1081+
}
1082+
10471083
fmt::iterator
10481084
frontend::coarse_grained_timequery_result::format_to(fmt::iterator it) const {
10491085
return fmt::format_to(

src/v/cloud_topics/frontend/frontend.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,11 @@ class frontend final {
167167
/// Return current epoch state.
168168
epoch_info get_epoch_info() const;
169169

170+
/// Advance the partition to the current cluster epoch and return epoch
171+
/// state.
172+
ss::future<std::expected<epoch_info, frontend_errc>> advance_epoch(
173+
cloud_topics::cluster_epoch, model::timeout_clock::time_point);
174+
170175
private:
171176
// All timequeries work by first getting a coarse grained timequery result
172177
// from metadata indexes, then getting an exact answer using the datapath.

src/v/cloud_topics/frontend/tests/frontend_test.cc

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,3 +188,63 @@ TEST_F(frontend_fixture, test_replicate_epoch) {
188188
ASSERT_FALSE(res.has_value());
189189
}
190190
}
191+
192+
TEST_F(frontend_fixture, test_advance_epoch) {
193+
// This test verifies that frontend::advance_epoch() correctly integrates
194+
// with the underlying ctp_stm_api to advance the partition's epoch and
195+
// return consistent epoch_info.
196+
const model::topic topic_name("advance_epoch_test");
197+
model::ntp ntp(model::kafka_namespace, topic_name, 0);
198+
199+
cluster::topic_properties props;
200+
props.cloud_topic_enabled = true;
201+
props.shadow_indexing = model::shadow_indexing_mode::disabled;
202+
203+
add_topic({model::kafka_namespace, topic_name}, 1, props).get();
204+
wait_for_leader(ntp).get();
205+
206+
auto partition = app.partition_manager.local().get(ntp);
207+
ASSERT_TRUE(
208+
partition->raft()->stm_manager()->get<cloud_topics::ctp_stm>()
209+
!= nullptr);
210+
211+
cloud_topics::frontend frontend(std::move(partition), _data_plane.get());
212+
213+
// Initially, get_epoch_info should return min epochs (no data yet)
214+
auto initial_info = frontend.get_epoch_info();
215+
EXPECT_EQ(initial_info.max_applied_epoch, cluster_epoch::min());
216+
EXPECT_EQ(initial_info.estimated_inactive_epoch, cluster_epoch::min());
217+
218+
// Call advance_epoch to establish epoch 5
219+
auto first_advance
220+
= frontend.advance_epoch(cluster_epoch(5), model::no_timeout).get();
221+
ASSERT_TRUE(first_advance.has_value())
222+
<< "first advance_epoch should succeed";
223+
EXPECT_EQ(first_advance.value().max_applied_epoch, cluster_epoch(5));
224+
// first advance call reflects the new epoch right away because the stm
225+
// state was empty
226+
EXPECT_EQ(first_advance.value().estimated_inactive_epoch, cluster_epoch(4));
227+
228+
// Call advance_epoch with a higher epoch (10)
229+
auto advance_result
230+
= frontend.advance_epoch(cluster_epoch(10), model::no_timeout).get();
231+
ASSERT_TRUE(advance_result.has_value())
232+
<< "advance_epoch should succeed on leader";
233+
234+
auto epoch_info = advance_result.value();
235+
// max_applied_epoch should now be 10, inactive epoch is 4 because
236+
// prev_applied_epoch is 5
237+
EXPECT_EQ(epoch_info.max_applied_epoch, cluster_epoch(10));
238+
EXPECT_EQ(epoch_info.estimated_inactive_epoch, cluster_epoch(4));
239+
240+
// advance the epoch one more time to observe lower bound sync behavior
241+
242+
auto final_result
243+
= frontend.advance_epoch(cluster_epoch(15), model::no_timeout).get();
244+
ASSERT_TRUE(advance_result.has_value())
245+
<< "advance_epoch should succeed on leader";
246+
auto final_info = final_result.value();
247+
EXPECT_EQ(final_info.max_applied_epoch, cluster_epoch(15));
248+
EXPECT_EQ(final_info.estimated_inactive_epoch, cluster_epoch(9));
249+
EXPECT_EQ(final_info, frontend.get_epoch_info());
250+
}

0 commit comments

Comments
 (0)