Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions src/v/cluster/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,26 @@ redpanda_cc_library(
],
)

redpanda_cc_library(
name = "table_utils",
srcs = [
"cluster_link/table_utils.cc",
],
hdrs = [
"cluster_link/table_utils.h",
],
implementation_deps = [
"//src/v/ssx:async_algorithm",
],
visibility = ["__subpackages__"],
deps = [
"//src/v/base",
"//src/v/cluster_link/model",
"//src/v/container:chunked_hash_map",
"@seastar",
],
)

redpanda_cc_library(
name = "cluster",
srcs = [
Expand Down Expand Up @@ -647,6 +667,7 @@ redpanda_cc_library(
"version.h",
],
implementation_deps = [
":table_utils",
":topic_memory_per_partition_default",
"//src/v/cloud_storage:logger",
"//src/v/crash_tracker",
Expand Down
74 changes: 35 additions & 39 deletions src/v/cluster/cluster_link/frontend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ using ::cluster_link::model::add_mirror_topic_cmd;
using ::cluster_link::model::delete_mirror_topic_cmd;
using ::cluster_link::model::id_t;
using ::cluster_link::model::metadata;
using ::cluster_link::model::metadata_ptr;
using ::cluster_link::model::name_t;
using ::cluster_link::model::update_cluster_link_configuration_cmd;
using ::cluster_link::model::update_mirror_topic_properties_cmd;
Expand Down Expand Up @@ -205,13 +206,11 @@ frontend::find_link_id_by_topic(model::topic_view topic) const {
return _table->find_id_by_topic(topic);
}

std::optional<std::reference_wrapper<const metadata>>
frontend::find_link_by_id(id_t id) const {
metadata_ptr frontend::find_link_by_id(id_t id) const {
return _table->find_link_by_id(id);
}

std::optional<std::reference_wrapper<const metadata>>
frontend::find_link_by_name(const name_t& name) const {
metadata_ptr frontend::find_link_by_name(const name_t& name) const {
return _table->find_link_by_name(name);
}

Expand Down Expand Up @@ -240,8 +239,8 @@ frontend::get_mirror_topics_for_link(id_t id) const {
::model::topic,
::cluster_link::model::mirror_topic_metadata>
mirror_topics;
mirror_topics.reserve(link->get().state.mirror_topics.size());
for (const auto& [topic, metadata] : link->get().state.mirror_topics) {
mirror_topics.reserve(link->state.mirror_topics.size());
for (const auto& [topic, metadata] : link->state.mirror_topics) {
mirror_topics.emplace(topic, metadata.copy());
}
return mirror_topics;
Expand All @@ -259,10 +258,9 @@ bool frontend::is_autocreate_mirror_topic(const model::topic& topic) const {
}
auto link = _table->find_link_by_id(link_id.value());
vassert(
link.has_value(), "Expected value for link with id {}", link_id.value());
link != nullptr, "Expected value for link with id {}", link_id.value());
const auto& topic_filters
= link->get()
.configuration.topic_metadata_mirroring_cfg.topic_name_filters;
= link->configuration.topic_metadata_mirroring_cfg.topic_name_filters;
return ::cluster_link::model::select_topic(topic, topic_filters);
}

Expand Down Expand Up @@ -311,11 +309,11 @@ bool frontend::schema_registry_shadowing_active() const {
auto link_ids = get_all_link_ids();
return std::ranges::any_of(link_ids, [this](id_t link_id) -> bool {
const auto md = find_link_by_id(link_id);
if (!md.has_value()) {
if (!md) {
return false;
}
// Check to see if the schema registry topic is in the mirror topic list
const auto& mirror_topics = md->get().state.mirror_topics;
const auto& mirror_topics = md->state.mirror_topics;
auto topic_it = mirror_topics.find(
::model::schema_registry_internal_tp.topic);
if (topic_it != mirror_topics.end()) {
Expand All @@ -324,7 +322,7 @@ bool frontend::schema_registry_shadowing_active() const {
}
// If mirror_schema_registry_topic option is set and the topic is not
// yet in the mirror topic list, then shadowing for SR is active
const auto& sr_cfg = md->get().configuration.schema_registry_sync_cfg;
const auto& sr_cfg = md->configuration.schema_registry_sync_cfg;
if (
sr_cfg.sync_schema_registry_topic_mode.has_value()
&& std::holds_alternative<
Expand Down Expand Up @@ -573,10 +571,9 @@ errc frontend::validator::validate_mutation(const cluster_link_cmd& cmd) const {
cmd,
[this](const cluster::cluster_link_upsert_cmd& cmd) {
auto existing = _table->find_link_by_name(cmd.value.name);
if (existing.has_value()) {
if (existing) {
// upsert
const auto& meta = existing->get();
if (meta.uuid != cmd.value.uuid) {
if (existing->uuid != cmd.value.uuid) {
// If the UUIDs do not match, it means we are trying to
// update an existing link with a different UUID.
vlog(
Expand All @@ -585,7 +582,7 @@ errc frontend::validator::validate_mutation(const cluster_link_cmd& cmd) const {
"different UUID ({}) than the existing one ({})",
cmd.value.name,
cmd.value.uuid,
meta.uuid);
existing->uuid);
return errc::uuid_conflict;
}
auto ec = validate_connection_config(cmd.value.connection);
Expand Down Expand Up @@ -652,10 +649,10 @@ errc frontend::validator::validate_mutation(const cluster_link_cmd& cmd) const {
},
[this](const cluster::cluster_link_remove_cmd& cmd) {
auto meta = _table->find_link_by_name(cmd.value.link_name);
if (!meta.has_value()) {
if (!meta) {
return errc::does_not_exist;
}
const auto& md = meta->get();

const auto is_removable =
[](const ::cluster_link::model::mirror_topic_status s) {
switch (s) {
Expand All @@ -671,7 +668,7 @@ errc frontend::validator::validate_mutation(const cluster_link_cmd& cmd) const {
}
};
const auto mirror_topic_states
= md.state.mirror_topics | std::views::values
= meta->state.mirror_topics | std::views::values
| std::views::transform(
&::cluster_link::model::mirror_topic_metadata::status);
if (
Expand Down Expand Up @@ -704,18 +701,18 @@ errc frontend::validator::validate_mutation(const cluster_link_cmd& cmd) const {
return errc::invalid_create;
}
auto meta = _table->find_link_by_id(cmd.key);
if (!meta.has_value()) {
if (!meta) {
return errc::does_not_exist;
}
const auto status = meta->get().state.status;
const auto status = meta->state.status;
if (status != ::cluster_link::model::link_status::active) {
// fence any new topic additions if the link is not active
vlog(
cluster::clusterlog.warn,
"Attempting to add mirror topic {} to link {} which is not in "
"the active state (current state: {})",
cmd.value.topic,
meta->get().name,
meta->name,
status);
return errc::invalid_update;
}
Expand All @@ -728,14 +725,14 @@ errc frontend::validator::validate_mutation(const cluster_link_cmd& cmd) const {
"Attempting to add mirror topic '{}' to '{}', however it "
"is already mirrored by another link",
cmd.value.topic,
meta->get().name);
meta->name);
return errc::topic_being_mirrored_by_other_link;
} else {
vlog(
cluster::clusterlog.warn,
"Topic '{}' is already mirrored by link '{}'",
cmd.value.topic,
meta->get().name);
meta->name);
return errc::topic_already_being_mirrored;
}
}
Expand All @@ -744,7 +741,7 @@ errc frontend::validator::validate_mutation(const cluster_link_cmd& cmd) const {
cluster::clusterlog.warn,
"Invalid partition count for topic {} in link {}: {}",
cmd.value.topic,
meta->get().name,
meta->name,
cmd.value.metadata.partition_count);
return errc::invalid_update;
}
Expand All @@ -766,7 +763,7 @@ errc frontend::validator::validate_mutation(const cluster_link_cmd& cmd) const {
return errc::mirror_topic_name_invalid;
}
auto meta = _table->find_link_by_id(cmd.key);
if (!meta.has_value()) {
if (!meta) {
return errc::does_not_exist;
}
auto id = _table->find_id_by_topic(cmd.value.topic);
Expand All @@ -776,7 +773,7 @@ errc frontend::validator::validate_mutation(const cluster_link_cmd& cmd) const {
"Attempting to delete mirror topic '{}' from link '{}', "
"however topic is not being mirrored",
cmd.value.topic,
meta->get().name);
meta->name);
return errc::topic_not_being_mirrored;
}
if (id.value() != cmd.key) {
Expand All @@ -786,7 +783,7 @@ errc frontend::validator::validate_mutation(const cluster_link_cmd& cmd) const {
"however topic "
"is mirrored by another link",
cmd.value.topic,
meta->get().name);
meta->name);
return errc::topic_being_mirrored_by_other_link;
}
return errc::success;
Expand All @@ -798,7 +795,7 @@ errc frontend::validator::validate_mutation(const cluster_link_cmd& cmd) const {
return errc::mirror_topic_name_invalid;
}
auto meta = _table->find_link_by_id(cmd.key);
if (!meta.has_value()) {
if (!meta) {
return errc::does_not_exist;
}
auto id = _table->find_id_by_topic(cmd.value.topic);
Expand Down Expand Up @@ -847,7 +844,7 @@ errc frontend::validator::validate_mutation(const cluster_link_cmd& cmd) const {
return errc::mirror_topic_name_invalid;
}
auto meta = _table->find_link_by_id(cmd.key);
if (!meta.has_value()) {
if (!meta) {
return errc::does_not_exist;
}
auto id = _table->find_id_by_topic(cmd.value.topic);
Expand All @@ -865,7 +862,7 @@ errc frontend::validator::validate_mutation(const cluster_link_cmd& cmd) const {
cmd.value.topic);
return errc::topic_being_mirrored_by_other_link;
}
const auto& mirror_state = meta->get().state;
const auto& mirror_state = meta->state;
const auto it = mirror_state.mirror_topics.find(cmd.value.topic);

vassert(
Expand All @@ -881,7 +878,7 @@ errc frontend::validator::validate_mutation(const cluster_link_cmd& cmd) const {
"Attempting to update mirror topic {} on link {} which is not "
"in the active state (current state: {})",
cmd.value.topic,
meta->get().name,
meta->name,
mirror_state.status);
return errc::invalid_update;
}
Expand Down Expand Up @@ -912,7 +909,7 @@ errc frontend::validator::validate_mutation(const cluster_link_cmd& cmd) const {
[this](
const cluster::cluster_link_update_cluster_link_configuration_cmd&
cmd) {
if (!_table->find_link_by_id(cmd.key).has_value()) {
if (!_table->find_link_by_id(cmd.key)) {
vlog(
cluster::clusterlog.warn,
"Attempting to update a non-existant link id {}",
Expand Down Expand Up @@ -1091,21 +1088,20 @@ errc frontend::validator::validate_metadata_mirroring_config(
ss::future<errc> frontend::failover_link_topics(
::cluster_link::model::id_t id, model::timeout_clock::time_point timeout) {
auto meta = _table->find_link_by_id(id);
if (!meta.has_value()) {
if (!meta) {
co_return errc::does_not_exist;
}
const auto& md = meta->get();
if (md.state.status != ::cluster_link::model::link_status::active) {
if (meta->state.status != ::cluster_link::model::link_status::active) {
vlog(
cluster::clusterlog.warn,
"Attempting to failover topics of link {} which is not in the active "
"state (current state: {})",
md.name,
md.state.status);
meta->name,
meta->state.status);
co_return errc::invalid_update;
}

const auto& topics = md.state.mirror_topics;
const auto& topics = meta->state.mirror_topics;
chunked_vector<model::topic> topics_to_failover;
auto should_failover = [](::cluster_link::model::mirror_topic_status s) {
switch (s) {
Expand Down
4 changes: 2 additions & 2 deletions src/v/cluster/cluster_link/frontend.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,10 @@ class frontend : public ss::peering_sharded_service<frontend> {
notification_id register_for_updates(notification_callback);
void unregister_for_updates(notification_id);

std::optional<std::reference_wrapper<const ::cluster_link::model::metadata>>
::cluster_link::model::metadata_ptr
find_link_by_id(::cluster_link::model::id_t id) const;

std::optional<std::reference_wrapper<const ::cluster_link::model::metadata>>
::cluster_link::model::metadata_ptr
find_link_by_name(const ::cluster_link::model::name_t& name) const;

std::optional<::cluster_link::model::id_t>
Expand Down
Loading