cluster/utils: unified partition notifications API #26929

Merged (4 commits into redpanda-data:dev, Jul 28, 2025)

Conversation

@bharathv (Contributor) commented Jul 21, 2025

A one-stop shop for all partition notifications. Supports leadership changes, manage/unmanage events, and property-change notifications. Lately we have been implementing a lot of follow-the-current-leader scenarios, and this API simplifies that flow while avoiding an explicit dependency on raft::group_mgr and friends.

Preparatory change for a future PR.
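
The follow-the-leader flow this enables can be sketched with a mocked-up notifier (the class and callback names follow the PR discussion; the enum values, string-typed ntp, and the fire() hook are illustrative stand-ins, not the real cluster/utils implementation):

```cpp
#include <functional>
#include <map>
#include <string>
#include <utility>

// Illustrative stand-ins for model::ntp and the event taxonomy.
enum class event_type {
    leadership_change,
    replica_assigned,
    replica_unassigned,
    properties_change,
};

struct notification {
    event_type type;
    std::string ntp; // stand-in for model::ntp
    bool is_leader = false;
};

using notification_cb_t = std::function<void(const notification&)>;
using notification_id_type = int;

class partition_change_notifier {
public:
    notification_id_type register_partition_notifications(notification_cb_t cb) {
        auto id = _next_id++;
        _subs.emplace(id, std::move(cb));
        return id;
    }
    void unregister_partition_notifications(notification_id_type id) {
        _subs.erase(id);
    }
    // test hook: fan one event out to every subscriber
    void fire(const notification& n) {
        for (auto& [id, cb] : _subs) {
            cb(n);
        }
    }

private:
    notification_id_type _next_id = 0;
    std::map<notification_id_type, notification_cb_t> _subs;
};

// A subscriber that "follows the leader": remembers the last ntp it
// became leader for and ignores every other event type.
std::string follow_leader_demo() {
    partition_change_notifier notifier;
    std::string leading;
    auto id = notifier.register_partition_notifications(
      [&](const notification& n) {
          if (n.type == event_type::leadership_change && n.is_leader) {
              leading = n.ntp;
          }
      });
    notifier.fire({event_type::replica_assigned, "kafka/foo/0", false});
    notifier.fire({event_type::leadership_change, "kafka/foo/0", true});
    notifier.unregister_partition_notifications(id);
    return leading;
}
```

A subscriber registers one callback and dispatches on the event type, instead of wiring separate callbacks into raft::group_mgr, the partition manager, and the topic table.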

Backports Required

  • none - not a bug fix
  • none - this is a backport
  • none - issue does not exist in previous branches
  • none - papercut/not impactful enough to backport
  • v25.2.x
  • v25.1.x
  • v24.3.x
  • v24.2.x

Release Notes

  • none

@bharathv (Contributor, Author)

/dt

@bharathv (Contributor, Author)

/dt

@rockwotj self-requested a review July 21, 2025 23:36
@vbotbuildovich (Collaborator) commented Jul 22, 2025

Retry command for Build#69395

please wait until all jobs are finished before running the slash command

/ci-repeat 1
tests/rptest/tests/datalake/recovery_mode_test.py::DatalakeRecoveryModeTest.test_disabled_partitions@{"catalog_type":"rest_hadoop","cloud_storage_type":1}
tests/rptest/tests/datalake/recovery_mode_test.py::DatalakeRecoveryModeTest.test_disabled_partitions@{"catalog_type":"rest_jdbc","cloud_storage_type":1}
tests/rptest/tests/datalake/datalake_e2e_test.py::DatalakeE2ETests.test_topic_lifecycle@{"catalog_type":"nessie","cloud_storage_type":1}
tests/rptest/tests/datalake/recovery_mode_test.py::DatalakeRecoveryModeTest.test_disabled_partitions@{"catalog_type":"nessie","cloud_storage_type":1}
tests/rptest/tests/datalake/datalake_e2e_test.py::DatalakeE2ETests.test_topic_lifecycle@{"catalog_type":"rest_jdbc","cloud_storage_type":1}
tests/rptest/tests/datalake/datalake_e2e_test.py::DatalakeE2ETests.test_topic_lifecycle@{"catalog_type":"rest_hadoop","cloud_storage_type":1}
tests/rptest/tests/datalake/partition_movement_test.py::PartitionMovementTest.test_cross_core_movements@{"cloud_storage_type":1}
tests/rptest/tests/datalake/datalake_e2e_test.py::DatalakeMetricsTest.test_rest_catalog_metrics@{"cloud_storage_type":1}
tests/rptest/tests/datalake/datalake_usage_test.py::IcebergUsageTest.test_iceberg_usage@{"catalog_type":"rest_hadoop","cloud_storage_type":1,"query_engine":"spark"}
tests/rptest/tests/datalake/mount_unmount_test.py::MountUnmountIcebergTest.test_simple_remount@{"cloud_storage_type":1}

@rockwotj (Contributor) left a comment:

I like the direction - overall I do think we could simplify it further.

Comment on lines 54 to 70
virtual notification_id_type
register_leadership_notification(notification_cb_t)
= 0;
virtual void unregister_leadership_notification(notification_id_type) = 0;

virtual notification_id_type
register_partition_manage_notification(const model::ns&, notification_cb_t)
= 0;
virtual void unregister_partition_manage_notification(notification_id_type)
= 0;

virtual notification_id_type register_partition_unmanage_notification(
const model::ns&, notification_cb_t)
= 0;
virtual void
unregister_partition_unmanage_notification(notification_id_type)
= 0;
Contributor:

Do we ever care about the distinction between these 3 cases anywhere for this "follow leadership" use case? Can we just merge them?

Contributor (Author):

That's a good point... we could probably merge all of them into a single call.

// Unified interface for partition level notifications.
// Supports notifications for partition leadership changes,
// partition management (creation, deletion), and partition properties changes.
class notifications {
Contributor:

Will think about naming, perhaps a bit too generic. Maybe partition_leadership_notifier?

Contributor (Author):

how about partition_change_notifier? (since this is not limited to leadership alone)

Contributor:

yeah SGTM 👍

, is_leader(is_leader)
, topic_cfg(std::move(topic_cfg)) {}
model::ntp ntp;
model::term_id term;
Contributor:

YAGNI? I don't know where we would care about term for these use cases...

Contributor (Author):

I started without term but will probably need it for DR... the rationale is that term_id acts as a simple fence in some cases. For example, we try to process a notification at term=0 but the processing doesn't succeed (we queue it again); meanwhile a notification at term=1 is processed successfully, so the requeued term=0 op becomes a noop.
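
That fencing pattern can be sketched as a small helper (hypothetical; the PR only carries the term in the notification, it does not ship this class):

```cpp
#include <cstdint>

// Track the highest term processed so far; work retried from an older
// term is dropped once a newer term has been observed.
class term_fence {
public:
    // Returns true if work at `term` should proceed; false if a newer
    // term already succeeded (the retried op becomes a noop).
    bool try_advance(int64_t term) {
        if (term < _highest_seen) {
            return false;
        }
        _highest_seen = term;
        return true;
    }

private:
    int64_t _highest_seen = -1;
};
```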

Contributor:

Yeah, I'd personally rather see it added when needed, but fine to keep it...

@vbotbuildovich (Collaborator) commented Jul 22, 2025

CI test results

test results on build#69395
test_class test_method test_arguments test_kind job_url test_status passed reason
DatalakeE2ETests test_topic_lifecycle {"catalog_type": "nessie", "cloud_storage_type": 1} ducktape https://buildkite.com/redpanda/redpanda/builds/69395#01982f74-642b-457b-a652-8ea8b10e6c9c FAIL 0/21 The test has failed across all retries
DatalakeE2ETests test_topic_lifecycle {"catalog_type": "nessie", "cloud_storage_type": 1} ducktape https://buildkite.com/redpanda/redpanda/builds/69395#01982f78-5049-46b9-9ecb-86c6f3e60567 FAIL 0/21 The test has failed across all retries
DatalakeE2ETests test_topic_lifecycle {"catalog_type": "rest_hadoop", "cloud_storage_type": 1} ducktape https://buildkite.com/redpanda/redpanda/builds/69395#01982f74-642b-4f9d-8a0a-0475a6cfde82 FAIL 0/21 The test has failed across all retries
DatalakeE2ETests test_topic_lifecycle {"catalog_type": "rest_hadoop", "cloud_storage_type": 1} ducktape https://buildkite.com/redpanda/redpanda/builds/69395#01982f78-5043-4a12-bfa3-011c78cceab3 FAIL 0/21 The test has failed across all retries
DatalakeE2ETests test_topic_lifecycle {"catalog_type": "rest_jdbc", "cloud_storage_type": 1} ducktape https://buildkite.com/redpanda/redpanda/builds/69395#01982f74-6426-440c-a335-3e949108888d FAIL 0/21 The test has failed across all retries
DatalakeE2ETests test_topic_lifecycle {"catalog_type": "rest_jdbc", "cloud_storage_type": 1} ducktape https://buildkite.com/redpanda/redpanda/builds/69395#01982f78-5044-436f-bfed-404a3cdbdde5 FAIL 0/21 The test has failed across all retries
DatalakeMetricsTest test_rest_catalog_metrics {"cloud_storage_type": 1} ducktape https://buildkite.com/redpanda/redpanda/builds/69395#01982f74-6429-46d9-a896-fc5c8c246b33 FAIL 0/21 The test has failed across all retries
DatalakeMetricsTest test_rest_catalog_metrics {"cloud_storage_type": 1} ducktape https://buildkite.com/redpanda/redpanda/builds/69395#01982f78-5047-4483-873d-0a4e3068e7b0 FAIL 0/21 The test has failed across all retries
IcebergUsageTest test_iceberg_usage {"catalog_type": "rest_hadoop", "cloud_storage_type": 1, "query_engine": "spark"} ducktape https://buildkite.com/redpanda/redpanda/builds/69395#01982f74-642b-457b-a652-8ea8b10e6c9c FLAKY 1/21 upstream reliability is '100.0'. current run reliability is '4.761904761904762'. drift is 95.2381 and the allowed drift is set to 50. The test should FAIL
IcebergUsageTest test_iceberg_usage {"catalog_type": "rest_hadoop", "cloud_storage_type": 1, "query_engine": "spark"} ducktape https://buildkite.com/redpanda/redpanda/builds/69395#01982f78-5048-4ba6-b48f-887c80f4dabe FLAKY 2/21 upstream reliability is '100.0'. current run reliability is '9.523809523809524'. drift is 90.47619 and the allowed drift is set to 50. The test should FAIL
MountUnmountIcebergTest test_simple_remount {"cloud_storage_type": 1} ducktape https://buildkite.com/redpanda/redpanda/builds/69395#01982f74-642b-457b-a652-8ea8b10e6c9c FAIL 0/21 The test has failed across all retries
PartitionMovementTest test_cross_core_movements {"cloud_storage_type": 1} ducktape https://buildkite.com/redpanda/redpanda/builds/69395#01982f74-6429-4b8c-be1e-e949a2b477ea FAIL 0/1
PartitionMovementTest test_cross_core_movements {"cloud_storage_type": 1} ducktape https://buildkite.com/redpanda/redpanda/builds/69395#01982f78-5043-4a12-bfa3-011c78cceab3 FAIL 0/1 The test has failed across all retries
DatalakeRecoveryModeTest test_disabled_partitions {"catalog_type": "nessie", "cloud_storage_type": 1} ducktape https://buildkite.com/redpanda/redpanda/builds/69395#01982f74-6429-46d9-a896-fc5c8c246b33 FAIL 0/1 The test has failed across all retries
DatalakeRecoveryModeTest test_disabled_partitions {"catalog_type": "nessie", "cloud_storage_type": 1} ducktape https://buildkite.com/redpanda/redpanda/builds/69395#01982f78-5044-436f-bfed-404a3cdbdde5 FAIL 0/1 The test has failed across all retries
DatalakeRecoveryModeTest test_disabled_partitions {"catalog_type": "rest_hadoop", "cloud_storage_type": 1} ducktape https://buildkite.com/redpanda/redpanda/builds/69395#01982f74-6429-474f-b938-34eb511ced62 FAIL 0/1 The test has failed across all retries
DatalakeRecoveryModeTest test_disabled_partitions {"catalog_type": "rest_hadoop", "cloud_storage_type": 1} ducktape https://buildkite.com/redpanda/redpanda/builds/69395#01982f78-5044-4a3c-8996-60e83a4bedca FAIL 0/1
DatalakeRecoveryModeTest test_disabled_partitions {"catalog_type": "rest_jdbc", "cloud_storage_type": 1} ducktape https://buildkite.com/redpanda/redpanda/builds/69395#01982f74-642a-4475-a988-c0d0a1841a1d FAIL 0/1
DatalakeRecoveryModeTest test_disabled_partitions {"catalog_type": "rest_jdbc", "cloud_storage_type": 1} ducktape https://buildkite.com/redpanda/redpanda/builds/69395#01982f78-5045-427e-8de0-6b9dc61a3a14 FAIL 0/1
RandomNodeOperationsTest test_node_operations {"cloud_storage_type": 1, "compaction_mode": "adjacent_merge", "enable_failures": false, "mixed_versions": false, "with_iceberg": false} ducktape https://buildkite.com/redpanda/redpanda/builds/69395#01982f74-6429-474f-b938-34eb511ced62 FLAKY 20/21 upstream reliability is '99.62121212121212'. current run reliability is '95.23809523809523'. drift is 4.38312 and the allowed drift is set to 50. The test should PASS
test results on build#69528
test_class test_method test_arguments test_kind job_url test_status passed reason
DataMigrationsApiTest test_migrated_topic_data_integrity {"include_groups": true, "params": {"cancellation": {"dir": "in", "stage": "preparing"}, "use_alias": false}, "transfer_leadership": true} integration https://buildkite.com/redpanda/redpanda/builds/69528#019837eb-b4ed-40b9-bb53-af291e120155 FAIL 0/1
test results on build#69557
test_class test_method test_arguments test_kind job_url test_status passed reason
cluster_metadata_uploader_fixture test_upload_in_term unit https://buildkite.com/redpanda/redpanda/builds/69557#01983916-db16-4ade-a738-f8ff9990485e FAIL 0/1
test results on build#69779
test_class test_method test_arguments test_kind job_url test_status passed reason
RandomNodeOperationsTest test_node_operations {"cloud_storage_type": 1, "compaction_mode": "chunked_sliding_window", "enable_failures": true, "mixed_versions": false, "with_iceberg": true} integration https://buildkite.com/redpanda/redpanda/builds/69779#01985243-f0d7-458a-a239-36b5c963a194 FLAKY 20/21 upstream reliability is '100.0'. current run reliability is '95.23809523809523'. drift is 4.7619 and the allowed drift is set to 50. The test should PASS

@bharathv (Contributor, Author)

/dt

@bharathv (Contributor, Author)

Please hold off on reviews until CI is green (will request reviews then).

@vbotbuildovich (Collaborator)

Retry command for Build#69498

please wait until all jobs are finished before running the slash command

/ci-repeat 1
tests/rptest/tests/datalake/partition_movement_test.py::PartitionMovementTest.test_cross_core_movements@{"cloud_storage_type":1}

@bharathv (Contributor, Author)

/dt

@bharathv (Contributor, Author)

/dt

@rockwotj (Contributor) left a comment:

Did you mean to keep as draft?

Comment on lines +54 to +73
virtual notification_id_type
register_partition_notifications(notification_cb_t)
= 0;
Contributor:

Some use cases (at least Wasm) don't care about topic configuration changes... maybe those changes are rare enough that it doesn't matter?

Contributor (Author):

Configuration changes are rare, but they probably are useful (e.g. datalake and DR both need them)...

void partition_change_notifier_impl::unregister_partition_notifications(
notification_id_type id) {
auto it = _notification_ids.find(id);
if (it != _notification_ids.end()) [[unlikely]] {
Contributor:

not [[unlikely]]

Contributor (Author):

Oops, I mixed up early return and [[unlikely]]... nice spot, fixed.
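
For reference, the fixed shape is presumably an early return on the missing-id path, with no [[unlikely]] on the common case (simplified stand-in types, not the actual merged code):

```cpp
#include <map>

using notification_id_type = int;

// Stand-in for the notifier's registration table (the real code maps
// ids to a notification_state struct).
std::map<notification_id_type, int> _notification_ids;

// Early-return when the id is unknown; the found path is the common
// case, so it carries no branch-probability attribute.
bool unregister_partition_notifications(notification_id_type id) {
    auto it = _notification_ids.find(id);
    if (it == _notification_ids.end()) {
        return false; // unknown id: nothing to do
    }
    _notification_ids.erase(it);
    return true;
}
```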

leadership_change, id, partition->ntp(), partition);
});
nstate.managed = _partition_mgr.local().register_manage_notification(
// do we need any other?
Contributor:

For cloud topics I need it for kafka_internal.

Contributor (Author):

Done.

Comment on lines 28 to 43
enum notification_type {
leadership_change,
partition_management,
partition_unmanagement,
partition_properties_change
};
Contributor:

As a newcomer it was not clear to me what "partition_management" or "partition_unmanagement" mean. Can we add a brief description of these events and when they happen?

Contributor (Author):

Done, added some docs.

Contributor (Author):

Also renamed to replica_assigned and replica_unassigned... feels like that is a bit clearer than manage/unmanage.
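
With the rename, the documented enum might read something like this (illustrative wording; the merged PR's actual comments may differ):

```cpp
#include <string_view>

enum class notification_type {
    // the local replica gained or lost raft leadership for the partition
    leadership_change,
    // a replica of the partition was created on (assigned to) this shard,
    // e.g. on topic creation or partition movement onto this node
    replica_assigned,
    // the replica was removed from this shard, e.g. on topic deletion or
    // partition movement away from this node
    replica_unassigned,
    // topic-level properties of the partition changed
    partition_properties_change,
};

// Small helper so the enum is exercisable in isolation.
std::string_view describe(notification_type t) {
    switch (t) {
    case notification_type::leadership_change:
        return "leadership_change";
    case notification_type::replica_assigned:
        return "replica_assigned";
    case notification_type::replica_unassigned:
        return "replica_unassigned";
    case notification_type::partition_properties_change:
        return "partition_properties_change";
    }
    return "unknown";
}
```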

Comment on lines 23 to 25
// Unified interface for partition level notifications.
// Supports notifications for partition leadership changes,
// partition management (creation, deletion), and partition properties changes.
Contributor:

Can we elaborate on this? If you register a notification using this class then you get notifications for all kafka namespace'd NTPs that are owned by this shard, as well as notifications when they are not "owned" by this shard.

Contributor (Author):

Done, it's only for partition replicas on the shard.

@vbotbuildovich (Collaborator)

Retry command for Build#69528

please wait until all jobs are finished before running the slash command

/ci-repeat 1
tests/rptest/tests/data_migrations_api_test.py::DataMigrationsApiTest.test_migrated_topic_data_integrity@{"include_groups":true,"params":{"cancellation":{"dir":"in","stage":"preparing"},"use_alias":false},"transfer_leadership":true}

@bharathv (Contributor, Author)

Did you mean to keep as draft?

I didn't want to kill the running BK job when moving out of draft.

@bharathv force-pushed the notif_mgr branch 2 times, most recently from b590390 to 0eda5fc, July 23, 2025 18:51
@bharathv marked this pull request as ready for review July 23, 2025 18:51
@bharathv requested a review from rockwotj July 23, 2025 19:00
@rockwotj previously approved these changes Jul 24, 2025
@rockwotj (Contributor) left a comment:

A few nits/suggestions, but otherwise LGTM. Thanks for this!

Comment on lines +112 to +122
nstate.properties = _topic_table.local().register_ntp_delta_notification(
[this, id](cluster::topic_table::ntp_delta_range_t range) mutable {
Contributor:

Do we need to filter out the redpanda namespace here?

Contributor (Author):

Left it as-is for now since it's easy to filter them out at the subscription site.
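
Subscription-site filtering is indeed a one-liner; a hypothetical helper over a "ns/topic/partition"-style string (the real code would match on the namespace field of model::ntp instead):

```cpp
#include <string_view>

// True only for NTPs in the kafka namespace; controller/internal
// (redpanda-namespaced) deltas are dropped by the subscriber.
bool is_kafka_ntp(std::string_view ntp) {
    constexpr std::string_view prefix = "kafka/";
    return ntp.substr(0, prefix.size()) == prefix;
}
```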

// kafka namespace
nstate.kafka_replica_assigned
= _partition_mgr.local().register_manage_notification(
model::kafka_namespace,
Contributor:

If most use cases only need one or the other, maybe we want to pass in a namespace to the constructor. For Cloud Topics, I only need a single internal topic to follow leadership on.

Contributor (Author):

DR needs all updates (e.g. controller and kafka), while the datalake API only needs kafka... so it's a bit nonuniform. For now I left it as-is since it's straightforward to filter on the caller side.

, is_leader(is_leader)
, topic_cfg(std::move(topic_cfg)) {}
model::term_id term;
bool is_leader;
Member:

I am wondering if it would sometimes be helpful to actually include the current leader_id?

Contributor (Author):

The use cases this is intended for right now (Datalake and DR) don't use leader_id; how about we extend this if needed?

auto id = _notification_id++;
auto exceptional_cleanup = ss::defer(
[this, &nstate] { do_unregister_partition_notifications(nstate); });
nstate.leadership = _group_mgr.local().register_leadership_notification(
Member:

Here the notification will only be triggered on a node/shard that is currently hosting a replica for the given partition. Is that intentional?

Contributor (Author):

Yes, that's intentional. Currently, non-local changes are only supported for leadership changes, not for the manage/unmanage API. To maintain uniformity, this API only supports shard-local updates.

This design targets use cases that follow leadership. For other patterns there are low-level APIs anyway.

bharathv added 4 commits July 28, 2025 10:34
Unified interface for all partition level notifications. Avoids direct
dependencies on raft::group_mgr/partition_mgr/topic_table.
This is a bit half-baked because the cluster partition code is deeply
intertwined with datalake_mgr, but it still showcases the usage of this API.
@bharathv merged commit 9ff8ef9 into redpanda-data:dev Jul 28, 2025
17 checks passed
@bharathv deleted the notif_mgr branch July 28, 2025 23:33
@dotnwat (Member) commented Jul 29, 2025

great stuff

, is_leader(is_leader)
, topic_cfg(std::move(topic_cfg)) {}
model::term_id term;
bool is_leader;
Contributor:

nit: bool_class?
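
The nit refers to Seastar's ss::bool_class, a tag-typed wrapper that keeps two bool parameters (like is_leader here) from being swapped silently. A minimal local equivalent for illustration; in Seastar code this would just be seastar::bool_class<struct is_leader_tag>:

```cpp
// Tag-typed boolean: each Tag instantiation is a distinct type, and the
// explicit constructor/conversion block accidental bool mix-ups.
template<typename Tag>
class bool_class {
public:
    constexpr explicit bool_class(bool v) : _v(v) {}
    constexpr explicit operator bool() const { return _v; }

private:
    bool _v;
};

using is_leader = bool_class<struct is_leader_tag>;
```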

Comment on lines +65 to +66
chunked_hash_map<notification_id_type, notification_state>
_notification_ids;
Contributor:

nit: You can use src/v/utils/notification_list.h to remove this boilerplate code
