Skip to content

Commit 84702c7

Browse files
Merge pull request #30731 from nguyen-andrew/kafka-redpanda-api-range
kafka: add a reserved Redpanda-specific Kafka API-key range
2 parents 3d5bc9d + 2c4b827 commit 84702c7

35 files changed

Lines changed: 1106 additions & 89 deletions

src/v/kafka/protocol/BUILD

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@ redpanda_cc_library(
1313
"//src/v/kafka:__subpackages__",
1414
"//src/v/security/audit:__subpackages__",
1515
],
16-
deps = MESSAGE_TYPES,
16+
deps = [
17+
"//src/v/kafka/protocol",
18+
] + MESSAGE_TYPES,
1719
)
1820

1921
redpanda_cc_library(
@@ -25,6 +27,8 @@ redpanda_cc_library(
2527
"types.cc",
2628
],
2729
hdrs = [
30+
"api_key_indexed_array.h",
31+
"api_key_table.h",
2832
"batch_consumer.h",
2933
"batch_reader.h",
3034
"errors.h",
@@ -34,6 +38,7 @@ redpanda_cc_library(
3438
"legacy_message.h",
3539
"timeout.h",
3640
"topic_properties.h",
41+
"type_list.h",
3742
"types.h",
3843
"wire.h",
3944
],
Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
/*
2+
* Copyright 2026 Redpanda Data, Inc.
3+
*
4+
* Use of this software is governed by the Business Source License
5+
* included in the file licenses/BSL.md
6+
*
7+
* As of the Change Date specified in that file, in accordance with
8+
* the Business Source License, use of this software will be governed
9+
* by the Apache License, Version 2.0
10+
*/
11+
#pragma once
12+
13+
#include "kafka/protocol/types.h"
14+
15+
#include <fmt/format.h>
16+
17+
#include <array>
18+
#include <cstddef>
19+
#include <stdexcept>
20+
21+
namespace kafka {
22+
23+
// Cold helper kept out of line so fmt::format doesn't bloat at() and block it
24+
// from inlining at its hot call sites.
25+
[[noreturn]] inline void throw_api_key_out_of_range(api_key key) {
26+
throw std::out_of_range(
27+
fmt::format("api_key_indexed_array::at: {} out of range", key()));
28+
}
29+
30+
/// \brief A dense, array-backed map from kafka::api_key to T spanning two
31+
/// disjoint key regions: the standard Kafka range [0, StandardSize) and the
32+
/// reserved Redpanda range [ReservedBase, ReservedBase + ReservedSize).
33+
///
34+
/// Indexing routes to whichever region a key falls in, so callers treat it as
35+
/// a single array keyed by api_key without paying for the large gap between the
36+
/// two regions. Every member is constexpr, so the same type backs the
37+
/// compile-time dispatch/flex tables and the run-time probe/throughput tables.
38+
template<
39+
typename T,
40+
std::size_t StandardSize,
41+
std::size_t ReservedSize,
42+
api_key::type ReservedBase = redpanda_api_key_base()>
43+
class api_key_indexed_array {
44+
// Regions must not overlap, or standard keys in [ReservedBase,
45+
// StandardSize) would silently alias the reserved region.
46+
static_assert(
47+
static_cast<std::size_t>(ReservedBase) >= StandardSize,
48+
"reserved region overlaps the standard range");
49+
50+
public:
51+
constexpr api_key_indexed_array() = default;
52+
53+
/// Fill both regions with \p init (e.g. the flex map's invalid sentinel).
54+
explicit constexpr api_key_indexed_array(const T& init) {
55+
_standard.fill(init);
56+
_reserved.fill(init);
57+
}
58+
59+
/// Unchecked access, like std::array::operator[]. Out-of-range keys are
60+
/// undefined at run time and a compile error under constant evaluation.
61+
/// Standard keys (the common case) are routed first.
62+
constexpr T& operator[](api_key key) noexcept {
63+
if (!is_reserved(key)) [[likely]] {
64+
return _standard[static_cast<std::size_t>(key())];
65+
}
66+
return _reserved[reserved_index(key)];
67+
}
68+
constexpr const T& operator[](api_key key) const noexcept {
69+
if (!is_reserved(key)) [[likely]] {
70+
return _standard[static_cast<std::size_t>(key())];
71+
}
72+
return _reserved[reserved_index(key)];
73+
}
74+
75+
/// Checked access with std::array::at semantics: throws std::out_of_range
76+
/// for any key that is not a valid index in either region. Standard keys
77+
/// (the common case) are checked and returned first.
78+
constexpr T& at(api_key key) {
79+
if (in_standard_range(key)) [[likely]] {
80+
return _standard[static_cast<std::size_t>(key())];
81+
}
82+
if (is_reserved(key)) {
83+
return _reserved.at(reserved_index(key));
84+
}
85+
throw_api_key_out_of_range(key);
86+
}
87+
constexpr const T& at(api_key key) const {
88+
if (in_standard_range(key)) [[likely]] {
89+
return _standard[static_cast<std::size_t>(key())];
90+
}
91+
if (is_reserved(key)) {
92+
return _reserved.at(reserved_index(key));
93+
}
94+
throw_api_key_out_of_range(key);
95+
}
96+
97+
/// True iff \p key is a valid index in either region.
98+
constexpr bool contains(api_key key) const noexcept {
99+
if (in_standard_range(key)) [[likely]] {
100+
return true;
101+
}
102+
return in_reserved_range(key);
103+
}
104+
105+
/// Pointer to the element for \p key, or nullptr if out of range.
106+
constexpr const T* find(api_key key) const noexcept {
107+
if (in_standard_range(key)) [[likely]] {
108+
return &_standard[static_cast<std::size_t>(key())];
109+
}
110+
if (in_reserved_range(key)) {
111+
return &_reserved[reserved_index(key)];
112+
}
113+
return nullptr;
114+
}
115+
constexpr T* find(api_key key) noexcept {
116+
if (in_standard_range(key)) [[likely]] {
117+
return &_standard[static_cast<std::size_t>(key())];
118+
}
119+
if (in_reserved_range(key)) {
120+
return &_reserved[reserved_index(key)];
121+
}
122+
return nullptr;
123+
}
124+
125+
/// Pointer to the first element (standard region then reserved) for which
126+
/// pred(api_key, element) is true, or nullptr if none match.
127+
template<typename Pred>
128+
// NOLINTNEXTLINE(cppcoreguidelines-missing-std-forward)
129+
constexpr const T* find_if(Pred&& pred) const noexcept {
130+
for (std::size_t i = 0; i < StandardSize; ++i) {
131+
if (pred(api_key(static_cast<api_key::type>(i)), _standard[i])) {
132+
return &_standard[i];
133+
}
134+
}
135+
for (std::size_t i = 0; i < ReservedSize; ++i) {
136+
if (
137+
pred(
138+
api_key(static_cast<api_key::type>(ReservedBase + i)),
139+
_reserved[i])) {
140+
return &_reserved[i];
141+
}
142+
}
143+
return nullptr;
144+
}
145+
146+
/// Invoke f(api_key, T&) for every slot, standard region then reserved
147+
/// region, passing the real api_key (not a 0..N index).
148+
template<typename F>
149+
// NOLINTNEXTLINE(cppcoreguidelines-missing-std-forward)
150+
constexpr void for_each(F&& f) {
151+
for (std::size_t i = 0; i < StandardSize; ++i) {
152+
f(api_key(static_cast<api_key::type>(i)), _standard[i]);
153+
}
154+
for (std::size_t i = 0; i < ReservedSize; ++i) {
155+
f(api_key(static_cast<api_key::type>(ReservedBase + i)),
156+
_reserved[i]);
157+
}
158+
}
159+
template<typename F>
160+
// NOLINTNEXTLINE(cppcoreguidelines-missing-std-forward)
161+
constexpr void for_each(F&& f) const {
162+
for (std::size_t i = 0; i < StandardSize; ++i) {
163+
f(api_key(static_cast<api_key::type>(i)), _standard[i]);
164+
}
165+
for (std::size_t i = 0; i < ReservedSize; ++i) {
166+
f(api_key(static_cast<api_key::type>(ReservedBase + i)),
167+
_reserved[i]);
168+
}
169+
}
170+
171+
constexpr bool operator==(const api_key_indexed_array&) const = default;
172+
173+
static constexpr std::size_t standard_size() { return StandardSize; }
174+
static constexpr std::size_t reserved_size() { return ReservedSize; }
175+
static constexpr api_key reserved_base() { return api_key(ReservedBase); }
176+
177+
private:
178+
static constexpr bool in_standard_range(api_key key) noexcept {
179+
return key() >= 0 && static_cast<std::size_t>(key()) < StandardSize;
180+
}
181+
static constexpr bool in_reserved_range(api_key key) noexcept {
182+
return is_reserved(key) && reserved_index(key) < ReservedSize;
183+
}
184+
static constexpr bool is_reserved(api_key key) noexcept {
185+
return key() >= ReservedBase;
186+
}
187+
static constexpr std::size_t reserved_index(api_key key) noexcept {
188+
return static_cast<std::size_t>(key() - ReservedBase);
189+
}
190+
191+
std::array<T, StandardSize> _standard{};
192+
std::array<T, ReservedSize> _reserved{};
193+
};
194+
195+
} // namespace kafka
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright 2026 Redpanda Data, Inc.
3+
*
4+
* Use of this software is governed by the Business Source License
5+
* included in the file licenses/BSL.md
6+
*
7+
* As of the Change Date specified in that file, in accordance with
8+
* the Business Source License, use of this software will be governed
9+
* by the Apache License, Version 2.0
10+
*/
11+
#pragma once
12+
13+
#include "kafka/protocol/api_key_indexed_array.h"
14+
15+
#include <cstddef>
16+
17+
namespace kafka {
18+
19+
// Sizes of the two api_key_indexed_array regions, kept here as literals so this
20+
// lean header -- and its many transitive includers via
21+
// handler_probe.h -> connection_context.h -> request_context.h -- need not pull
22+
// in the request schemata that messages.h includes. messages.h static_asserts
23+
// these against the live request type lists, so they cannot silently drift: add
24+
// an API past these bounds and messages.h fails to compile, pointing back here.
25+
inline constexpr std::size_t standard_api_key_table_size = 67;
26+
inline constexpr std::size_t reserved_api_key_table_size = 1;
27+
28+
/// Kafka-key-indexed dense table sized to the standard and reserved ranges.
29+
template<typename T>
30+
using api_key_table = api_key_indexed_array<
31+
T,
32+
standard_api_key_table_size,
33+
reserved_api_key_table_size>;
34+
35+
} // namespace kafka
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright 2026 Redpanda Data, Inc.
3+
*
4+
* Use of this software is governed by the Business Source License
5+
* included in the file licenses/BSL.md
6+
*
7+
* As of the Change Date specified in that file, in accordance with
8+
* the Business Source License, use of this software will be governed
9+
* by the Apache License, Version 2.0
10+
*/
11+
#pragma once
12+
#include "bytes/iobuf.h"
13+
#include "kafka/protocol/errors.h"
14+
#include "kafka/protocol/schemata/describe_redpanda_roles_request.h"
15+
#include "kafka/protocol/schemata/describe_redpanda_roles_response.h"
16+
17+
#include <seastar/core/future.hh>
18+
19+
namespace kafka {
20+
21+
struct describe_redpanda_roles_request final {
22+
using api_type = describe_redpanda_roles_api;
23+
24+
describe_redpanda_roles_request_data data;
25+
26+
void encode(protocol::encoder& writer, api_version version) {
27+
data.encode(writer, version);
28+
}
29+
30+
void decode(protocol::decoder& reader, api_version version) {
31+
data.decode(reader, version);
32+
}
33+
34+
fmt::iterator format_to(fmt::iterator it) const {
35+
return fmt::format_to(it, "{}", data);
36+
}
37+
};
38+
39+
struct describe_redpanda_roles_response final {
40+
using api_type = describe_redpanda_roles_api;
41+
42+
describe_redpanda_roles_response_data data;
43+
44+
void encode(protocol::encoder& writer, api_version version) {
45+
data.encode(writer, version);
46+
}
47+
48+
void decode(iobuf buf, api_version version) {
49+
data.decode(std::move(buf), version);
50+
}
51+
52+
fmt::iterator format_to(fmt::iterator it) const {
53+
return fmt::format_to(it, "{}", data);
54+
}
55+
};
56+
57+
} // namespace kafka

src/v/kafka/protocol/flex_versions.cc

Lines changed: 18 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -22,31 +22,26 @@ namespace kafka {
2222

2323
namespace {
2424

25-
template<typename... RequestTypes>
26-
consteval size_t max_api_key(type_list<RequestTypes...>) {
27-
/// Black magic here is an overload of std::max() that takes an
28-
/// std::initializer_list
29-
return std::max({RequestTypes::key()...});
30-
}
31-
3225
/// Not every value from 0 -> max_api_key is a valid request, non-supported
33-
/// requests will map to a value of api_key(-2)
26+
/// requests will map to a value of api_version(-2)
3427
constexpr api_version invalid_api = api_version(-2);
3528

36-
template<typename... RequestTypes>
37-
consteval auto
38-
get_flexible_request_min_versions_list(type_list<RequestTypes...> r) {
39-
/// An std::array where the indicies map to api_keys and values at an index
40-
/// map to the first flex version for a given api. If an api doesn't exist
41-
/// at an index -2 or \ref invalid_api will be the value at the index.
42-
std::array<api_version, max_api_key(r) + 1> versions;
43-
versions.fill(invalid_api);
44-
((versions[RequestTypes::key()] = RequestTypes::min_flexible), ...);
29+
template<typename... Ts>
30+
consteval void
31+
fill_flex(api_key_table<api_version>& versions, type_list<Ts...>) {
32+
/// Values map to the first flex version for a given api; apis absent from
33+
/// the lists keep \ref invalid_api.
34+
((versions[Ts::key] = Ts::min_flexible), ...);
35+
}
36+
37+
consteval auto get_flexible_request_min_versions_list() {
38+
api_key_table<api_version> versions{invalid_api};
39+
fill_flex(versions, request_types{});
40+
fill_flex(versions, redpanda_request_types{});
4541
return versions;
4642
}
4743

48-
constexpr auto g_flex_mapping = get_flexible_request_min_versions_list(
49-
request_types());
44+
constexpr auto g_flex_mapping = get_flexible_request_min_versions_list();
5045

5146
struct protocol_parse_exception : public net::parsing_exception {
5247
explicit protocol_parse_exception(const std::string& m)
@@ -57,18 +52,15 @@ struct protocol_parse_exception : public net::parsing_exception {
5752

5853
bool flex_versions::is_flexible_request(api_key key, api_version version) {
5954
/// If bounds checking is desired call is_api_in_schema(key) beforehand
60-
const api_version first_flex_version = g_flex_mapping[key()];
55+
const api_version first_flex_version = g_flex_mapping[key];
6156
return (version >= first_flex_version)
6257
&& (first_flex_version != never_flexible);
6358
}
6459

6560
bool flex_versions::is_api_in_schema(api_key key) noexcept {
66-
constexpr auto max_version = g_flex_mapping.max_size() - 1;
67-
if (key() < 0 || static_cast<size_t>(key()) > max_version) {
68-
return false;
69-
}
70-
const api_version first_flex_version = g_flex_mapping[key()];
71-
return first_flex_version != invalid_api;
61+
// find() does the region routing once and returns null for out-of-range.
62+
const api_version* first_flex_version = g_flex_mapping.find(key);
63+
return first_flex_version != nullptr && *first_flex_version != invalid_api;
7264
}
7365

7466
namespace {

0 commit comments

Comments
 (0)