[CORE-15271] kafka/server: expose user based client quotas#29425
Conversation
There was a problem hiding this comment.
Pull request overview
This PR implements user-based client quotas for Redpanda, enabling the Kafka API to support quota management at the user (principal) level in addition to the existing client-id based quotas.
Changes:
- Added support for user entity type in Kafka client quota APIs (describe/alter)
- Wired up user principal tracking throughout the quota system
- Implemented feature flag gating for user quotas to support graceful upgrades
- Added comprehensive test coverage for user quota functionality
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/rptest/tests/quota_management_test.py | Added test cases for user quotas, compound user+client quotas, upgrade scenarios, and comparison operators for entity types |
| tests/rptest/clients/kcl.py | Added optional node parameter to raw_alter_quotas for routing requests during upgrade tests |
| src/v/kafka/server/server.cc | Wired user principal into partition mutation quota tracking for delete_topics |
| src/v/kafka/server/handlers/create_topics.cc | Wired user principal into partition mutation quota tracking for create_topics |
| src/v/kafka/server/handlers/create_partitions.cc | Wired user principal into partition mutation quota tracking for create_partitions |
| src/v/kafka/server/handlers/client_quotas.cc | Implemented user entity type support in describe/alter quota handlers with feature flag gating |
| src/v/kafka/server/connection_context.h | Made get_principal() public to enable quota tracking by user |
| src/v/kafka/server/connection_context.cc | Wired user principal into fetch/produce throughput quota tracking |
| src/v/features/feature_table.h | Added user_based_client_quota feature flag |
| src/v/features/feature_table.cc | Added string representation for user_based_client_quota feature |
50f6274 to
3608650
Compare
| def _get_ordering(self): | ||
| return {name: idx for idx, name in enumerate(self.__class__)} | ||
|
|
||
| def __lt__(self, other): | ||
| ordering = self._get_ordering() |
There was a problem hiding this comment.
The method name _get_ordering uses a single leading underscore, which in Python typically indicates a 'private' method. However, this method is called by the public __lt__ method and constructs ordering for comparison operations. Consider renaming it to something more descriptive like _create_enum_ordering or _build_ordering_dict to better convey its purpose of creating an ordering dictionary from the enum class.
| def _get_ordering(self): | |
| return {name: idx for idx, name in enumerate(self.__class__)} | |
| def __lt__(self, other): | |
| ordering = self._get_ordering() | |
| def _build_ordering_dict(self): | |
| return {name: idx for idx, name in enumerate(self.__class__)} | |
| def __lt__(self, other): | |
| ordering = self._build_ordering_dict() |
There was a problem hiding this comment.
I like that you felt then need to point out that it has a leading underscore and it is bad, but also to suggest another name with leading underscore! 😅
| bool has_feature_user_quota(const request_context& ctx) { | ||
| constexpr auto feature = features::feature::user_based_client_quota; | ||
| const auto& ft = ctx.feature_table().local(); | ||
| return ft.is_active(feature); | ||
| } | ||
|
|
There was a problem hiding this comment.
The function name has_feature_user_quota is inconsistent with the actual feature name user_based_client_quota. Consider renaming to has_user_based_client_quota or has_feature_user_based_client_quota to match the feature naming convention.
| bool has_feature_user_quota(const request_context& ctx) { | |
| constexpr auto feature = features::feature::user_based_client_quota; | |
| const auto& ft = ctx.feature_table().local(); | |
| return ft.is_active(feature); | |
| } | |
| bool has_user_based_client_quota(const request_context& ctx) { | |
| constexpr auto feature = features::feature::user_based_client_quota; | |
| const auto& ft = ctx.feature_table().local(); | |
| return ft.is_active(feature); | |
| } | |
| bool has_feature_user_quota(const request_context& ctx) { | |
| return has_user_based_client_quota(ctx); | |
| } |
| return ft.is_active(feature); | ||
| } | ||
|
|
||
| constexpr auto has_user = [](const auto& e) { return e.entity_type == "user"; }; |
There was a problem hiding this comment.
The lambda has_user is defined in the anonymous namespace but its name is too generic for namespace-level scope. Consider making it a local variable in the functions where it's used (lines 371, 502), or rename it to be more specific like is_user_entity_type to better indicate it's checking the entity_type field.
| constexpr auto has_user = [](const auto& e) { return e.entity_type == "user"; }; | |
| constexpr auto is_user_entity_type = [](const auto& e) { | |
| return e.entity_type == "user"; | |
| }; |
3608650 to
af744d8
Compare
CI test resultstest results on build#79721
|
| // either there's only a single key part or the key is a user+client | ||
| // compound key | ||
| if (entity.size() != 1) { | ||
| if (entity.size() > 2) { |
There was a problem hiding this comment.
Do we need to check for entity.size() == 0 as well here?
| schema_registry_authz = 1ULL << 3U, | ||
| topic_ids_api = 1ULL << 4U, | ||
| controller_forced_reconfiguration = 1ULL << 5U, | ||
| user_based_client_quota = 1ULL << 6U, |
There was a problem hiding this comment.
Should this be 13? What was 6 used for previously?
There was a problem hiding this comment.
6 used to be license in v24.3.x. 13 was replication_factor_change in the same version. I see more values that have been reused. 5 used to be serde_raft_0 and now it is controller_forced_reconfiguration
So it should be fine to reuse any number, as the header says.
af744d8 to
ebdb04b
Compare
|
changes in force-push:
|
michael-redpanda
left a comment
There was a problem hiding this comment.
looks good to me
| //} else if (component.entity_type == "ip") { | ||
| // return ip_predicate; |
There was a problem hiding this comment.
leave in? Remove?
There was a problem hiding this comment.
these are the "guides" on where to extend when/if we want to support ip-based quotas.
They were already there from previous work and they were quite useful while adding user. So i still kept the ip ones for future reference.
michael-redpanda
left a comment
There was a problem hiding this comment.
looks fine minus gellert's comments
For backwards compatibility, a feature flag is added to protect against using the user quota entity while the cluster during upgrade
ebdb04b to
49ccca4
Compare
|
changes in force-push:
|
| { | ||
| "Entity": [ | ||
| { | ||
| "Type": "user", |
There was a problem hiding this comment.
Maybe for a separate PR, but as we discussed earlier, let's ensure that we also have an upgrade test that ensures that the serde changes work end-to-end (create some quotas on an old cluster, upgrade one node, create some old quotas mid-upgrade, upgrade fully, verify everything works).
Implements : CORE-15271
Expose entity type
userthrough the describe/alter client quota kafka api.Backports Required
Release Notes
Features