Skip to content

Commit 9ff8ef9

Browse files
authored
Merge pull request #26929 from bharathv/notif_mgr
cluster/utils: unified partition notifications API
2 parents 20c6cf9 + 93f374d commit 9ff8ef9

File tree

15 files changed

+473
-204
lines changed

15 files changed

+473
-204
lines changed

src/v/cluster/utils/BUILD

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
load("//bazel:build.bzl", "redpanda_cc_library")
2+
3+
redpanda_cc_library(
4+
name = "partition_change_notifier_api",
5+
hdrs = [
6+
"partition_change_notifier.h",
7+
],
8+
include_prefix = "cluster/utils",
9+
visibility = ["//visibility:public"],
10+
deps = [
11+
"//src/v/cluster:notification",
12+
"//src/v/cluster:topic_configuration",
13+
"//src/v/model",
14+
"@seastar",
15+
],
16+
)
17+
18+
redpanda_cc_library(
19+
name = "partition_change_notifier_impl",
20+
srcs = [
21+
"partition_change_notifier_impl.cc",
22+
],
23+
hdrs = [
24+
"partition_change_notifier_impl.h",
25+
],
26+
include_prefix = "cluster/utils",
27+
visibility = ["//visibility:public"],
28+
deps = [
29+
":partition_change_notifier_api",
30+
"//src/v/cluster",
31+
"//src/v/container:chunked_hash_map",
32+
"//src/v/raft",
33+
"@seastar",
34+
],
35+
)
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Copyright 2025 Redpanda Data, Inc.
3+
*
4+
* Use of this software is governed by the Business Source License
5+
* included in the file licenses/BSL.md
6+
*
7+
* As of the Change Date specified in that file, in accordance with
8+
* the Business Source License, use of this software will be governed
9+
* by the Apache License, Version 2.0
10+
*/
11+
12+
#pragma once
13+
14+
#include "base/seastarx.h"
15+
#include "cluster/notification.h"
16+
#include "cluster/topic_configuration.h"
17+
#include "model/fundamental.h"
18+
19+
#include <seastar/util/noncopyable_function.hh>
20+
21+
namespace cluster {
22+
23+
// Unified interface for partition level notifications.
24+
// Supports notifications for partitions with replicas on the current shard.
25+
class partition_change_notifier {
26+
public:
27+
enum class notification_type : int8_t {
28+
// Notification for leadership change of a partition.
29+
// Notified on assuming leadership or losing leadership.
30+
// For leadership changes for all partitions, not just those on the
31+
// current shard, use cluster::partition_leaders_table.
32+
leadership_change,
33+
// A new replica is now being managed by the current shard.
34+
// New replicas assignments typically come from topic creations and
35+
// partition movements.
36+
partition_replica_assigned,
37+
// An existing replica is no longer managed by the current shard.
38+
// There will be a separate notification for leadership drop if the
39+
// replica was a leader.
40+
partition_replica_unassigned,
41+
// Properties of the replica's partition have changed.
42+
partition_properties_change
43+
};
44+
struct partition_state {
45+
partition_state(
46+
model::term_id term,
47+
bool is_leader,
48+
std::optional<cluster::topic_configuration> topic_cfg)
49+
: term(term)
50+
, is_leader(is_leader)
51+
, topic_cfg(std::move(topic_cfg)) {}
52+
model::term_id term;
53+
bool is_leader;
54+
std::optional<cluster::topic_configuration> topic_cfg;
55+
};
56+
// partition_state is only set if the partition is still active
57+
// on the shard. If unset the processor can assume that the partition
58+
// replica is no longer active on the shard.
59+
using notification_cb_t = ss::noncopyable_function<void(
60+
notification_type, const model::ntp&, std::optional<partition_state>)>;
61+
62+
partition_change_notifier() = default;
63+
partition_change_notifier(const partition_change_notifier&) = delete;
64+
partition_change_notifier(partition_change_notifier&&) = delete;
65+
partition_change_notifier& operator=(const partition_change_notifier&)
66+
= delete;
67+
partition_change_notifier& operator=(partition_change_notifier&&) = delete;
68+
69+
virtual ~partition_change_notifier() = default;
70+
71+
virtual notification_id_type
72+
register_partition_notifications(notification_cb_t)
73+
= 0;
74+
75+
virtual void unregister_partition_notifications(notification_id_type) = 0;
76+
};
77+
} // namespace cluster
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
/*
2+
* Copyright 2025 Redpanda Data, Inc.
3+
*
4+
* Use of this software is governed by the Business Source License
5+
* included in the file licenses/BSL.md
6+
*
7+
* As of the Change Date specified in that file, in accordance with
8+
* the Business Source License, use of this software will be governed
9+
* by the Apache License, Version 2.0
10+
*/
11+
12+
#include "cluster/utils/partition_change_notifier_impl.h"
13+
14+
#include "cluster/partition_manager.h"
15+
#include "cluster/topic_table.h"
16+
#include "raft/group_manager.h"
17+
18+
#include <seastar/util/defer.hh>
19+
20+
namespace cluster {
21+
22+
// Keeps references to the sharded services used to source notifications.
// Nothing is registered with them until register_partition_notifications()
// is called.
partition_change_notifier_impl::partition_change_notifier_impl(
  ss::sharded<raft::group_manager>& groups,
  ss::sharded<cluster::partition_manager>& partitions,
  ss::sharded<cluster::topic_table>& topics)
  : _group_mgr(groups)
  , _partition_mgr(partitions)
  , _topic_table(topics) {}
29+
30+
// Dispatches a single event to the subscriber registered under `id`.
//
// If no subscriber is registered under `id` the event is dropped. When
// `partition` is non-null, the callback receives its term, leadership flag
// and the topic configuration looked up in the shard-local topic table;
// otherwise it receives an empty state.
void partition_change_notifier_impl::do_notify_call_back(
  notification_type type,
  notification_id_type id,
  const model::ntp& ntp,
  ss::lw_shared_ptr<partition> partition) {
    auto subscriber = _notification_ids.find(id);
    if (subscriber == _notification_ids.end()) [[unlikely]] {
        return;
    }

    std::optional<partition_state> snapshot;
    if (partition) {
        // Read in the same order as the fields of partition_state.
        auto term = partition->term();
        auto leader = partition->is_leader();
        auto topic_cfg = _topic_table.local().get_topic_cfg(
          model::topic_namespace_view{partition->ntp()});
        snapshot.emplace(term, leader, std::move(topic_cfg));
    }

    subscriber->second.cb(type, ntp, std::move(snapshot));
}
49+
50+
// Subscribes `cb` to all partition notifications on the current shard and
// returns the id identifying the subscription.
//
// One subscription fans out into six underlying registrations: raft
// leadership changes, replica manage/unmanage events for the kafka and
// kafka-internal namespaces, and topic table ntp deltas (filtered down to
// properties updates). The ids of those registrations are kept in a
// notification_state stored under the returned id.
//
// If any of the registrations throws, the deferred action unregisters the
// ids collected in `nstate` before the exception propagates.
notification_id_type
partition_change_notifier_impl::register_partition_notifications(
  notification_cb_t cb) {
    notification_state nstate;
    auto id = _notification_id++;
    auto exceptional_cleanup = ss::defer(
      [this, &nstate] { do_unregister_partition_notifications(nstate); });
    nstate.leadership = _group_mgr.local().register_leadership_notification(
      [this, id](
        raft::group_id group,
        model::term_id,
        std::optional<model::node_id>) mutable {
          auto partition = _partition_mgr.local().partition_for(group);
          if (!partition) [[unlikely]] {
              // If the partition is not found, it means that the group is
              // being deleted concurrently, ignore
              return;
          }
          do_notify_call_back(
            notification_type::leadership_change,
            id,
            partition->ntp(),
            partition);
      });
    // kafka namespace
    nstate.kafka_replica_assigned
      = _partition_mgr.local().register_manage_notification(
        model::kafka_namespace,
        [this, id](ss::lw_shared_ptr<cluster::partition> partition) mutable {
            vassert(partition, "Invalid partition in managed notification");
            do_notify_call_back(
              notification_type::partition_replica_assigned,
              id,
              partition->ntp(),
              partition);
        });
    nstate.kafka_replica_unassigned
      = _partition_mgr.local().register_unmanage_notification(
        model::kafka_namespace,
        [this, id](model::topic_partition_view tp) mutable {
            model::ntp ntp{model::kafka_namespace, tp.topic, tp.partition};
            // The lookup may return null, in which case the subscriber
            // receives an empty partition_state.
            do_notify_call_back(
              notification_type::partition_replica_unassigned,
              id,
              ntp,
              _partition_mgr.local().get(ntp));
        });
    // kafka internal namespace
    nstate.kafka_int_replica_assigned
      = _partition_mgr.local().register_manage_notification(
        model::kafka_internal_namespace,
        [this, id](ss::lw_shared_ptr<cluster::partition> partition) mutable {
            vassert(partition, "Invalid partition in managed notification");
            do_notify_call_back(
              notification_type::partition_replica_assigned,
              id,
              partition->ntp(),
              partition);
        });
    nstate.kafka_int_replica_unassigned
      = _partition_mgr.local().register_unmanage_notification(
        model::kafka_internal_namespace,
        [this, id](model::topic_partition_view tp) mutable {
            // The ntp must be rebuilt with the namespace this callback was
            // registered for; using the kafka namespace here would report
            // (and look up) a different partition.
            model::ntp ntp{
              model::kafka_internal_namespace, tp.topic, tp.partition};
            do_notify_call_back(
              notification_type::partition_replica_unassigned,
              id,
              ntp,
              _partition_mgr.local().get(ntp));
        });
    // partition properties changes
    nstate.properties = _topic_table.local().register_ntp_delta_notification(
      [this, id](cluster::topic_table::ntp_delta_range_t range) mutable {
          for (const auto& entry : range) {
              if (
                entry.type
                == cluster::topic_table_ntp_delta_type::properties_updated) {
                  do_notify_call_back(
                    notification_type::partition_properties_change,
                    id,
                    entry.ntp,
                    _partition_mgr.local().get(entry.ntp));
              }
          }
      });
    // NOTE(review): the cleanup guard is cancelled before the emplace below,
    // so a throwing emplace would leave the six registrations in place
    // without an entry to unregister them through - confirm this is
    // acceptable.
    exceptional_cleanup.cancel();
    nstate.cb = std::move(cb);
    _notification_ids.emplace(id, std::move(nstate));
    return id;
}
140+
141+
void partition_change_notifier_impl::do_unregister_partition_notifications(
142+
const notification_state& ids) {
143+
_group_mgr.local().unregister_leadership_notification(ids.leadership);
144+
_partition_mgr.local().unregister_manage_notification(
145+
ids.kafka_replica_assigned);
146+
_partition_mgr.local().unregister_unmanage_notification(
147+
ids.kafka_replica_unassigned);
148+
_partition_mgr.local().unregister_manage_notification(
149+
ids.kafka_int_replica_assigned);
150+
_partition_mgr.local().unregister_unmanage_notification(
151+
ids.kafka_int_replica_unassigned);
152+
_topic_table.local().unregister_ntp_delta_notification(ids.properties);
153+
}
154+
155+
// Removes the subscription registered under `id`: unregisters its
// underlying registrations and then erases its entry. An unknown id is
// silently ignored.
void partition_change_notifier_impl::unregister_partition_notifications(
  notification_id_type id) {
    auto subscription = _notification_ids.find(id);
    if (subscription != _notification_ids.end()) [[likely]] {
        do_unregister_partition_notifications(subscription->second);
        _notification_ids.erase(subscription);
    }
}
164+
165+
std::unique_ptr<cluster::partition_change_notifier>
166+
partition_change_notifier_impl::make_default(
167+
ss::sharded<raft::group_manager>& group_mgr,
168+
ss::sharded<cluster::partition_manager>& partition_mgr,
169+
ss::sharded<cluster::topic_table>& topic_table) {
170+
return std::make_unique<partition_change_notifier_impl>(
171+
group_mgr, partition_mgr, topic_table);
172+
}
173+
} // namespace cluster
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Copyright 2025 Redpanda Data, Inc.
3+
*
4+
* Use of this software is governed by the Business Source License
5+
* included in the file licenses/BSL.md
6+
*
7+
* As of the Change Date specified in that file, in accordance with
8+
* the Business Source License, use of this software will be governed
9+
* by the Apache License, Version 2.0
10+
*/
11+
12+
#pragma once
13+
14+
#include "cluster/fwd.h"
15+
#include "cluster/utils/partition_change_notifier.h"
16+
#include "container/chunked_hash_map.h"
17+
#include "raft/fwd.h"
18+
#include "raft/notification.h"
19+
#include "seastar/core/sharded.hh"
20+
21+
namespace cluster {
22+
23+
class partition_change_notifier_impl final : public partition_change_notifier {
24+
public:
25+
partition_change_notifier_impl(
26+
ss::sharded<raft::group_manager>&,
27+
ss::sharded<cluster::partition_manager>&,
28+
ss::sharded<cluster::topic_table>&);
29+
30+
notification_id_type
31+
register_partition_notifications(notification_cb_t) final;
32+
33+
void unregister_partition_notifications(notification_id_type) final;
34+
35+
static std::unique_ptr<partition_change_notifier> make_default(
36+
ss::sharded<raft::group_manager>&,
37+
ss::sharded<cluster::partition_manager>&,
38+
ss::sharded<cluster::topic_table>&);
39+
40+
private:
41+
struct notification_state {
42+
raft::group_manager_notification_id leadership
43+
= raft::notification_id_type_invalid;
44+
notification_id_type kafka_replica_assigned
45+
= notification_id_type_invalid;
46+
notification_id_type kafka_replica_unassigned
47+
= notification_id_type_invalid;
48+
notification_id_type kafka_int_replica_assigned
49+
= notification_id_type_invalid;
50+
notification_id_type kafka_int_replica_unassigned
51+
= notification_id_type_invalid;
52+
notification_id_type properties = notification_id_type_invalid;
53+
notification_cb_t cb;
54+
};
55+
void do_notify_call_back(
56+
notification_type,
57+
notification_id_type,
58+
const model::ntp&,
59+
ss::lw_shared_ptr<partition>);
60+
void do_unregister_partition_notifications(const notification_state&);
61+
ss::sharded<raft::group_manager>& _group_mgr;
62+
ss::sharded<cluster::partition_manager>& _partition_mgr;
63+
ss::sharded<cluster::topic_table>& _topic_table;
64+
notification_id_type _notification_id{0};
65+
chunked_hash_map<notification_id_type, notification_state>
66+
_notification_ids;
67+
};
68+
69+
} // namespace cluster

src/v/cluster_link/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ redpanda_cc_library(
9696
":fwd",
9797
"//src/v/base",
9898
"//src/v/cluster",
99+
"//src/v/cluster/utils:partition_change_notifier_api",
99100
"//src/v/model",
100101
"//src/v/raft",
101102
"@seastar",

src/v/cluster_link/manager.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,8 @@ void manager::on_link_change(model::id_t id) {
7070
_queue.submit([this, id] { return handle_on_link_change(id); });
7171
}
7272

73-
void manager::on_leadership_change(::model::ntp ntp, ntp_leader is_ntp_leader) {
73+
void manager::handle_partition_state_change(
74+
::model::ntp ntp, ntp_leader is_ntp_leader) {
7475
vlog(cllog.trace, "NTP={} leadership changed to {}", ntp, is_ntp_leader);
7576
_queue.submit([this, ntp{std::move(ntp)}, is_ntp_leader]() mutable {
7677
return handle_on_leadership_change(std::move(ntp), is_ntp_leader);

src/v/cluster_link/manager.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,8 @@ class manager {
9494
/// Used to notify that a cluster link has been updated
9595
void on_link_change(model::id_t id);
9696
/// Used to notify manager in a change of NTP leadership
97-
void on_leadership_change(::model::ntp ntp, ntp_leader is_ntp_leader);
97+
void
98+
handle_partition_state_change(::model::ntp ntp, ntp_leader is_ntp_leader);
9899
/// Handles creation and start of a link
99100
ss::future<> handle_on_link_change(model::id_t id);
100101
/// Handles leadership changes for a given NTP

0 commit comments

Comments
 (0)