Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,20 @@ service LevelZeroGcService {
authz: SUPERUSER
};
}

// Advance a cloud topic partition to the epoch carried in
// AdvanceEpochRequest.new_epoch and return the resulting epoch state.
// Declared with SUPERUSER authorization.
// NOTE(review): this comment previously said "to the current cluster epoch",
// but the request carries an explicit new_epoch — confirm the intended
// contract with callers.
rpc AdvanceEpoch(AdvanceEpochRequest) returns (AdvanceEpochResponse) {
option (pbgen.rpc) = {
authz: SUPERUSER
};
}

// Query epoch state for a single cloud topic partition.
// The request names exactly one partition and the response carries exactly
// one EpochInfo. Declared with SUPERUSER authorization.
rpc GetEpochInfo(GetEpochInfoRequest) returns (GetEpochInfoResponse) {
option (pbgen.rpc) = {
authz: SUPERUSER
};
}
}

// Request to query GC worker status.
Expand Down Expand Up @@ -109,6 +123,39 @@ message PauseResult {
optional string error = 3;
}

// Request to advance a partition to the epoch given in new_epoch.
// NOTE(review): this comment previously said "to the current cluster epoch";
// confirm whether callers are expected to pass the current cluster epoch.
message AdvanceEpochRequest {
// Topic and partition whose epoch should be advanced.
common.v1.TopicPartition partition = 1;
// Target epoch to advance the partition to.
int64 new_epoch = 2;
}

// Response containing epoch state after a successful advance.
message AdvanceEpochResponse {
// Epoch state reported for the partition by the advance operation.
EpochInfo epoch = 1;
}

// Request epoch info for a single partition.
message GetEpochInfoRequest {
// Topic and partition to query.
common.v1.TopicPartition partition = 1;
}

// Response with epoch info for the requested partition.
message GetEpochInfoResponse {
// Epoch state of the partition named in the request.
EpochInfo epoch = 1;
}

// Epoch state snapshot for a cloud topic partition.
// Returned by both AdvanceEpoch and GetEpochInfo.
message EpochInfo {
// Largest epoch no longer referenced by the partition.
int64 estimated_inactive_epoch = 1;
// Highest epoch applied to the partition STM.
int64 max_applied_epoch = 2;
// Log offset of the last reconciled record.
int64 last_reconciled_log_offset = 3;
// Log offset at which max_applied_epoch was set.
int64 current_epoch_window_offset = 4;
}

// GC worker lifecycle states.
// Maps to cloud_topics::level_zero_gc::state.
enum Status {
Expand Down
2 changes: 2 additions & 0 deletions src/v/redpanda/admin/services/internal/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ redpanda_cc_library(
deps = [
"//proto/redpanda/core/admin/internal/cloud_topics/v1:level_zero_gc_redpanda_proto",
"//src/v/base",
"//src/v/cloud_topics:state_accessors",
"//src/v/cloud_topics/frontend",
"//src/v/cloud_topics/level_zero/gc:level_zero_gc",
"//src/v/cluster",
"//src/v/model",
Expand Down
156 changes: 154 additions & 2 deletions src/v/redpanda/admin/services/internal/level_zero_gc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,14 @@
#include "redpanda/admin/services/internal/level_zero_gc.h"

#include "base/vassert.h"
#include "cloud_topics/frontend/frontend.h"
#include "cloud_topics/level_zero/gc/level_zero_gc.h"
#include "cloud_topics/state_accessors.h"
#include "cluster/partition_leaders_table.h"
#include "cluster/partition_manager.h"
#include "cluster/shard_table.h"
#include "model/fundamental.h"
#include "model/timeout_clock.h"
#include "serde/protobuf/rpc.h"
#include "ssx/sformat.h"

Expand All @@ -21,6 +27,8 @@

namespace {
// Logger for the level zero GC admin service.
ss::logger gclog("level_zero_gc_service");
// How long advance_epoch waits for the partition leader to become known.
constexpr model::timeout_clock::duration leader_timeout = 5s;
// Deadline budget passed to cloud_topics::frontend::advance_epoch.
constexpr model::timeout_clock::duration advance_epoch_timeout = 10s;
} // namespace

namespace admin {
Expand Down Expand Up @@ -48,11 +56,17 @@ level_zero_gc_service_impl::level_zero_gc_service_impl(
model::node_id self,
admin::proxy::client pc,
ss::sharded<cloud_topics::level_zero_gc>* gc,
ss::sharded<cluster::members_table>* mt)
ss::sharded<cluster::members_table>* mt,
ss::sharded<cluster::partition_manager>* pm,
ss::sharded<cluster::partition_leaders_table>* pl,
ss::sharded<cluster::shard_table>* st)
: _self(self)
, _proxy_client(std::move(pc))
, _gc(gc)
, _members_table(mt) {}
, _members_table(mt)
, _partition_manager(pm)
, _partition_leaders(pl)
, _shard_table(st) {}

seastar::future<proto::admin::level_zero_gc::get_status_response>
level_zero_gc_service_impl::get_status(
Expand Down Expand Up @@ -183,4 +197,142 @@ level_zero_gc_service_impl::pause(
co_return response;
}

namespace {

/// Build a cloud_topics::frontend for the partition `ntp` looked up in `pm`.
///
/// Throws:
///  - not_found_exception if `pm` does not hold the partition;
///  - failed_precondition_exception if the partition is not cloud-topic
///    enabled, or if its cloud topics state is absent or not initialized
///    locally.
std::unique_ptr<cloud_topics::frontend>
make_ct_frontend(cluster::partition_manager& pm, const model::ntp& ntp) {
    auto part = pm.get(ntp);
    if (part == nullptr) {
        throw serde::pb::rpc::not_found_exception(
          ssx::sformat("TopicPartition {} not found", ntp.tp));
    }

    const bool is_cloud_topic = part->get_ntp_config().cloud_topic_enabled();
    if (!is_cloud_topic) {
        throw serde::pb::rpc::failed_precondition_exception(
          ssx::sformat("TopicPartition {} is not a cloud topic", ntp.tp));
    }

    auto state = part->get_cloud_topics_state();
    // Short-circuit keeps the null check ahead of the dereference.
    const bool state_ready = state != nullptr
                             && state->local_is_initialized();
    if (!state_ready) {
        throw serde::pb::rpc::failed_precondition_exception(
          "Cloud topics subsystem is not initialized");
    }

    return std::make_unique<cloud_topics::frontend>(
      part, state->local().get_data_plane());
}

/// Copy the four fields of a frontend epoch_info into its protobuf
/// counterpart, field for field.
proto::admin::level_zero_gc::epoch_info
epoch_info_to_pb(cloud_topics::frontend::epoch_info info) {
    proto::admin::level_zero_gc::epoch_info pb;

    // Epoch fields.
    pb.set_estimated_inactive_epoch(info.estimated_inactive_epoch);
    pb.set_max_applied_epoch(info.max_applied_epoch);

    // Log offset fields.
    pb.set_last_reconciled_log_offset(info.last_reconciled_log_offset);
    pb.set_current_epoch_window_offset(info.current_epoch_window_offset);

    return pb;
}

/// Advance the epoch of partition `ntp` (looked up in `pm`) to `new_epoch`
/// and return the resulting epoch state as an RPC response.
///
/// The frontend call is bounded by `advance_epoch_timeout`. Frontend errors
/// are translated to RPC exceptions:
///  - not_leader_for_partition -> failed_precondition_exception
///  - timeout                  -> deadline_exceeded_exception
///  - anything else            -> internal_exception
/// Exceptions from make_ct_frontend propagate unchanged.
///
/// `ntp` is only read before the first suspension point, so the reference
/// does not need to outlive it.
seastar::future<proto::admin::level_zero_gc::advance_epoch_response>
do_advance_epoch(
  cluster::partition_manager& pm,
  const model::ntp& ntp,
  cloud_topics::cluster_epoch new_epoch) {
    auto frontend = make_ct_frontend(pm, ntp);

    const auto deadline = model::timeout_clock::now() + advance_epoch_timeout;
    auto outcome = co_await frontend->advance_epoch(new_epoch, deadline);

    if (outcome.has_value()) {
        proto::admin::level_zero_gc::advance_epoch_response response;
        response.set_epoch(epoch_info_to_pb(outcome.value()));
        co_return response;
    }

    // Failure path: every branch throws the same message, so format it once.
    const auto errc = outcome.error();
    auto message = ssx::sformat("advance_epoch failed: {}", errc);

    if (errc == cloud_topics::frontend_errc::not_leader_for_partition) {
        throw serde::pb::rpc::failed_precondition_exception(
          std::move(message));
    }
    if (errc == cloud_topics::frontend_errc::timeout) {
        throw serde::pb::rpc::deadline_exceeded_exception(std::move(message));
    }
    throw serde::pb::rpc::internal_exception(std::move(message));
}

} // namespace

/// AdvanceEpoch RPC handler.
///
/// Builds the ntp in the kafka namespace from the request and waits up to
/// `leader_timeout` for the partition leader to be known. If another node is
/// the leader, the request is forwarded to it through the proxy client; a
/// request that was itself proxied is rejected with unavailable instead of
/// being forwarded again. On the leader node the advance runs on the shard
/// that the shard table reports for the partition.
///
/// NOTE(review): failures of wait_for_leader (e.g. a timeout) are not
/// translated here — confirm how they surface to the RPC client.
seastar::future<proto::admin::level_zero_gc::advance_epoch_response>
level_zero_gc_service_impl::advance_epoch(
  serde::pb::rpc::context ctx,
  proto::admin::level_zero_gc::advance_epoch_request req) {
    using namespace proto::admin::level_zero_gc;

    auto& target = req.get_partition();
    model::ntp ntp{
      model::kafka_namespace, target.get_topic(), target.get_partition()};

    const auto leader_deadline = model::timeout_clock::now() + leader_timeout;
    auto leader_id = co_await _partition_leaders->local().wait_for_leader(
      ntp, leader_deadline, std::nullopt);

    const bool led_elsewhere = leader_id != _self;
    if (led_elsewhere) {
        // Forward at most once: a proxied request must land on the leader.
        if (proxy::is_proxied(ctx)) {
            throw serde::pb::rpc::unavailable_exception("Not leader");
        }
        co_return co_await _proxy_client
          .make_client_for_node<level_zero_gc_service_client>(leader_id)
          .advance_epoch(ctx, std::move(req));
    }

    auto owner_shard = _shard_table->local().shard_for(ntp);
    if (!owner_shard) {
        throw serde::pb::rpc::unavailable_exception("Not leader");
    }

    // Plain (non-coroutine) lambda: the captures live in the functor handed
    // to invoke_on, and do_advance_epoch reads `ntp` before it first suspends.
    auto run_on_owner = [ntp, new_epoch = req.get_new_epoch()](
                          cluster::partition_manager& pm)
      -> ss::future<advance_epoch_response> {
        return do_advance_epoch(
          pm, ntp, cloud_topics::cluster_epoch{new_epoch});
    };

    co_return co_await _partition_manager->invoke_on(
      *owner_shard, std::move(run_on_owner));
}

/// GetEpochInfo RPC handler.
///
/// Builds the ntp in the kafka namespace from the request and looks up the
/// currently known leader without waiting. With no known leader the call
/// fails with unavailable. If another node is the leader, the request is
/// forwarded to it through the proxy client; a request that was itself
/// proxied is rejected with unavailable. On the leader node the epoch info is
/// read on the shard that the shard table reports for the partition.
seastar::future<proto::admin::level_zero_gc::get_epoch_info_response>
level_zero_gc_service_impl::get_epoch_info(
  serde::pb::rpc::context ctx,
  proto::admin::level_zero_gc::get_epoch_info_request req) {
    using namespace proto::admin::level_zero_gc;

    auto& target = req.get_partition();
    model::ntp ntp{
      model::kafka_namespace, target.get_topic(), target.get_partition()};

    auto leader_id = _partition_leaders->local().get_leader(ntp);
    if (!leader_id) {
        throw serde::pb::rpc::unavailable_exception(
          ssx::sformat("No leader for {}", ntp.tp));
    }

    const bool led_elsewhere = *leader_id != _self;
    if (led_elsewhere) {
        // Forward at most once: a proxied request must land on the leader.
        if (proxy::is_proxied(ctx)) {
            throw serde::pb::rpc::unavailable_exception("Not leader");
        }
        co_return co_await _proxy_client
          .make_client_for_node<level_zero_gc_service_client>(*leader_id)
          .get_epoch_info(ctx, std::move(req));
    }

    auto owner_shard = _shard_table->local().shard_for(ntp);
    if (!owner_shard) {
        throw serde::pb::rpc::unavailable_exception("Not leader");
    }

    // NOTE(review): the frontend is destroyed as soon as this lambda returns,
    // which presumes frontend::get_epoch_info() is synchronous — confirm.
    auto snapshot = co_await _partition_manager->invoke_on(
      *owner_shard, [ntp](cluster::partition_manager& pm) {
          return make_ct_frontend(pm, ntp)->get_epoch_info();
      });

    get_epoch_info_response response;
    response.set_epoch(epoch_info_to_pb(snapshot));
    co_return response;
}

} // namespace admin
23 changes: 22 additions & 1 deletion src/v/redpanda/admin/services/internal/level_zero_gc.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,18 @@
#pragma once

#include "cloud_topics/level_zero/gc/level_zero_gc.h"
#include "cluster/fwd.h"
#include "cluster/members_table.h"
#include "proto/redpanda/core/admin/internal/cloud_topics/v1/level_zero_gc.proto.h"
#include "redpanda/admin/proxy/client.h"
#include "redpanda/admin/proxy/context.h"

#include <seastar/core/sharded.hh>

// Forward declaration of the cloud topics frontend. The full definition is
// included by level_zero_gc.cc via cloud_topics/frontend/frontend.h.
namespace cloud_topics {
class frontend;
}

namespace admin {

namespace level_zero_gc::detail {
Expand Down Expand Up @@ -48,7 +53,10 @@ class level_zero_gc_service_impl
model::node_id self,
admin::proxy::client pc,
ss::sharded<cloud_topics::level_zero_gc>* gc,
ss::sharded<cluster::members_table>* mt);
ss::sharded<cluster::members_table>* mt,
ss::sharded<cluster::partition_manager>* pm,
ss::sharded<cluster::partition_leaders_table>* pl,
ss::sharded<cluster::shard_table>* st);

seastar::future<proto::admin::level_zero_gc::get_status_response>
get_status(
Expand All @@ -63,6 +71,16 @@ class level_zero_gc_service_impl
serde::pb::rpc::context,
proto::admin::level_zero_gc::pause_request) override;

seastar::future<proto::admin::level_zero_gc::advance_epoch_response>
advance_epoch(
serde::pb::rpc::context,
proto::admin::level_zero_gc::advance_epoch_request) override;

seastar::future<proto::admin::level_zero_gc::get_epoch_info_response>
get_epoch_info(
serde::pb::rpc::context,
proto::admin::level_zero_gc::get_epoch_info_request) override;

private:
using apply_local = ss::bool_class<struct apply_local_tag>;
using apply_remote = ss::bool_class<struct apply_remote_tag>;
Expand Down Expand Up @@ -162,6 +180,9 @@ class level_zero_gc_service_impl
admin::proxy::client _proxy_client;
ss::sharded<cloud_topics::level_zero_gc>* _gc;
ss::sharded<cluster::members_table>* _members_table;
ss::sharded<cluster::partition_manager>* _partition_manager;
ss::sharded<cluster::partition_leaders_table>* _partition_leaders;
ss::sharded<cluster::shard_table>* _shard_table;
};

} // namespace admin
5 changes: 4 additions & 1 deletion src/v/redpanda/application_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,10 @@ void application::configure_admin_server(model::node_id node_id) {
node_id,
create_client(),
cloud_topics_app->get_level_zero_gc(),
&controller->get_members_table()));
&controller->get_members_table(),
&controller->get_partition_manager(),
&controller->get_partition_leaders(),
&controller->get_shard_table()));
}
s.add_service(
std::make_unique<
Expand Down
Loading
Loading