Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/v/kafka/protocol/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ redpanda_cc_library(
"//src/v/kafka:__subpackages__",
"//src/v/security/audit:__subpackages__",
],
deps = MESSAGE_TYPES,
deps = [
"//src/v/kafka/protocol",
] + MESSAGE_TYPES,
)

redpanda_cc_library(
Expand All @@ -25,6 +27,8 @@ redpanda_cc_library(
"types.cc",
],
hdrs = [
"api_key_indexed_array.h",
"api_key_table.h",
"batch_consumer.h",
"batch_reader.h",
"errors.h",
Expand All @@ -34,6 +38,7 @@ redpanda_cc_library(
"legacy_message.h",
"timeout.h",
"topic_properties.h",
"type_list.h",
"types.h",
"wire.h",
],
Expand Down
195 changes: 195 additions & 0 deletions src/v/kafka/protocol/api_key_indexed_array.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
/*
* Copyright 2026 Redpanda Data, Inc.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/
#pragma once

#include "kafka/protocol/types.h"

#include <fmt/format.h>

#include <array>
#include <cstddef>
#include <stdexcept>

namespace kafka {

// Cold helper kept out of line so fmt::format doesn't bloat at() and block it
// from inlining at its hot call sites.
[[noreturn]] inline void throw_api_key_out_of_range(api_key key) {
throw std::out_of_range(
fmt::format("api_key_indexed_array::at: {} out of range", key()));
}

/// \brief A dense, array-backed map from kafka::api_key to T spanning two
/// disjoint key regions: the standard Kafka range [0, StandardSize) and the
/// reserved Redpanda range [ReservedBase, ReservedBase + ReservedSize).
///
/// Indexing routes to whichever region a key falls in, so callers treat it as
/// a single array keyed by api_key without paying for the large gap between the
/// two regions. Every member is constexpr, so the same type backs the
/// compile-time dispatch/flex tables and the run-time probe/throughput tables.
template<
typename T,
std::size_t StandardSize,
std::size_t ReservedSize,
api_key::type ReservedBase = redpanda_api_key_base()>
class api_key_indexed_array {
// Regions must not overlap, or standard keys in [ReservedBase,
// StandardSize) would silently alias the reserved region.
static_assert(
static_cast<std::size_t>(ReservedBase) >= StandardSize,
"reserved region overlaps the standard range");

public:
constexpr api_key_indexed_array() = default;

/// Fill both regions with \p init (e.g. the flex map's invalid sentinel).
explicit constexpr api_key_indexed_array(const T& init) {
_standard.fill(init);
_reserved.fill(init);
}

/// Unchecked access, like std::array::operator[]. Out-of-range keys are
/// undefined at run time and a compile error under constant evaluation.
/// Standard keys (the common case) are routed first.
constexpr T& operator[](api_key key) noexcept {
if (!is_reserved(key)) [[likely]] {
return _standard[static_cast<std::size_t>(key())];
}
return _reserved[reserved_index(key)];
}
constexpr const T& operator[](api_key key) const noexcept {
if (!is_reserved(key)) [[likely]] {
return _standard[static_cast<std::size_t>(key())];
}
return _reserved[reserved_index(key)];
}

/// Checked access with std::array::at semantics: throws std::out_of_range
/// for any key that is not a valid index in either region. Standard keys
/// (the common case) are checked and returned first.
constexpr T& at(api_key key) {
if (in_standard_range(key)) [[likely]] {
return _standard[static_cast<std::size_t>(key())];
}
if (is_reserved(key)) {
return _reserved.at(reserved_index(key));
}
throw_api_key_out_of_range(key);
}
constexpr const T& at(api_key key) const {
if (in_standard_range(key)) [[likely]] {
return _standard[static_cast<std::size_t>(key())];
}
if (is_reserved(key)) {
return _reserved.at(reserved_index(key));
}
throw_api_key_out_of_range(key);
}

/// True iff \p key is a valid index in either region.
constexpr bool contains(api_key key) const noexcept {
if (in_standard_range(key)) [[likely]] {
return true;
}
return in_reserved_range(key);
}

/// Pointer to the element for \p key, or nullptr if out of range.
constexpr const T* find(api_key key) const noexcept {
if (in_standard_range(key)) [[likely]] {
return &_standard[static_cast<std::size_t>(key())];
}
if (in_reserved_range(key)) {
return &_reserved[reserved_index(key)];
}
return nullptr;
}
constexpr T* find(api_key key) noexcept {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fyi, you should be able to use the c++23 deducing self feature to avoid duplicating code for each of the const/non-const versions of these functions.

if (in_standard_range(key)) [[likely]] {
return &_standard[static_cast<std::size_t>(key())];
}
if (in_reserved_range(key)) {
return &_reserved[reserved_index(key)];
}
return nullptr;
}

/// Pointer to the first element (standard region then reserved) for which
/// pred(api_key, element) is true, or nullptr if none match.
template<typename Pred>
// NOLINTNEXTLINE(cppcoreguidelines-missing-std-forward)
constexpr const T* find_if(Pred&& pred) const noexcept {
for (std::size_t i = 0; i < StandardSize; ++i) {
if (pred(api_key(static_cast<api_key::type>(i)), _standard[i])) {
return &_standard[i];
}
}
for (std::size_t i = 0; i < ReservedSize; ++i) {
if (
pred(
api_key(static_cast<api_key::type>(ReservedBase + i)),
_reserved[i])) {
return &_reserved[i];
}
}
return nullptr;
}

/// Invoke f(api_key, T&) for every slot, standard region then reserved
/// region, passing the real api_key (not a 0..N index).
template<typename F>
// NOLINTNEXTLINE(cppcoreguidelines-missing-std-forward)
constexpr void for_each(F&& f) {
for (std::size_t i = 0; i < StandardSize; ++i) {
f(api_key(static_cast<api_key::type>(i)), _standard[i]);
}
for (std::size_t i = 0; i < ReservedSize; ++i) {
f(api_key(static_cast<api_key::type>(ReservedBase + i)),
_reserved[i]);
}
}
template<typename F>
// NOLINTNEXTLINE(cppcoreguidelines-missing-std-forward)
constexpr void for_each(F&& f) const {
for (std::size_t i = 0; i < StandardSize; ++i) {
f(api_key(static_cast<api_key::type>(i)), _standard[i]);
}
for (std::size_t i = 0; i < ReservedSize; ++i) {
f(api_key(static_cast<api_key::type>(ReservedBase + i)),
_reserved[i]);
}
}

constexpr bool operator==(const api_key_indexed_array&) const = default;

static constexpr std::size_t standard_size() { return StandardSize; }
static constexpr std::size_t reserved_size() { return ReservedSize; }
static constexpr api_key reserved_base() { return api_key(ReservedBase); }

private:
static constexpr bool in_standard_range(api_key key) noexcept {
return key() >= 0 && static_cast<std::size_t>(key()) < StandardSize;
}
static constexpr bool in_reserved_range(api_key key) noexcept {
return is_reserved(key) && reserved_index(key) < ReservedSize;
}
static constexpr bool is_reserved(api_key key) noexcept {
return key() >= ReservedBase;
}
static constexpr std::size_t reserved_index(api_key key) noexcept {
return static_cast<std::size_t>(key() - ReservedBase);
}

std::array<T, StandardSize> _standard{};
std::array<T, ReservedSize> _reserved{};
};

} // namespace kafka
35 changes: 35 additions & 0 deletions src/v/kafka/protocol/api_key_table.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2026 Redpanda Data, Inc.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/
#pragma once

#include "kafka/protocol/api_key_indexed_array.h"

#include <cstddef>

namespace kafka {

// Sizes of the two api_key_indexed_array regions, kept here as literals so this
// lean header -- and its many transitive includers via
// handler_probe.h -> connection_context.h -> request_context.h -- need not pull
// in the request schemata that messages.h includes. messages.h static_asserts
// these against the live request type lists, so they cannot silently drift: add
// an API past these bounds and messages.h fails to compile, pointing back here.
inline constexpr std::size_t standard_api_key_table_size = 67;
inline constexpr std::size_t reserved_api_key_table_size = 1;

/// Kafka-key-indexed dense table sized to the standard and reserved ranges.
template<typename T>
using api_key_table = api_key_indexed_array<
T,
standard_api_key_table_size,
reserved_api_key_table_size>;

} // namespace kafka
57 changes: 57 additions & 0 deletions src/v/kafka/protocol/describe_redpanda_roles.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright 2026 Redpanda Data, Inc.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/
#pragma once
#include "bytes/iobuf.h"
#include "kafka/protocol/errors.h"
#include "kafka/protocol/schemata/describe_redpanda_roles_request.h"
#include "kafka/protocol/schemata/describe_redpanda_roles_response.h"

#include <seastar/core/future.hh>

namespace kafka {

struct describe_redpanda_roles_request final {
using api_type = describe_redpanda_roles_api;

describe_redpanda_roles_request_data data;

void encode(protocol::encoder& writer, api_version version) {
data.encode(writer, version);
}

void decode(protocol::decoder& reader, api_version version) {
data.decode(reader, version);
}

fmt::iterator format_to(fmt::iterator it) const {
return fmt::format_to(it, "{}", data);
}
};

struct describe_redpanda_roles_response final {
using api_type = describe_redpanda_roles_api;

describe_redpanda_roles_response_data data;

void encode(protocol::encoder& writer, api_version version) {
data.encode(writer, version);
}

void decode(iobuf buf, api_version version) {
data.decode(std::move(buf), version);
}

fmt::iterator format_to(fmt::iterator it) const {
return fmt::format_to(it, "{}", data);
}
};

} // namespace kafka
44 changes: 18 additions & 26 deletions src/v/kafka/protocol/flex_versions.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,31 +22,26 @@ namespace kafka {

namespace {

template<typename... RequestTypes>
consteval size_t max_api_key(type_list<RequestTypes...>) {
/// Black magic here is an overload of std::max() that takes an
/// std::initializer_list
return std::max({RequestTypes::key()...});
}

/// Not every value from 0 -> max_api_key is a valid request, non-supported
/// requests will map to a value of api_key(-2)
/// requests will map to a value of api_version(-2)
constexpr api_version invalid_api = api_version(-2);

template<typename... RequestTypes>
consteval auto
get_flexible_request_min_versions_list(type_list<RequestTypes...> r) {
/// An std::array where the indicies map to api_keys and values at an index
/// map to the first flex version for a given api. If an api doesn't exist
/// at an index -2 or \ref invalid_api will be the value at the index.
std::array<api_version, max_api_key(r) + 1> versions;
versions.fill(invalid_api);
((versions[RequestTypes::key()] = RequestTypes::min_flexible), ...);
template<typename... Ts>
consteval void
fill_flex(api_key_table<api_version>& versions, type_list<Ts...>) {
/// Values map to the first flex version for a given api; apis absent from
/// the lists keep \ref invalid_api.
((versions[Ts::key] = Ts::min_flexible), ...);
}

consteval auto get_flexible_request_min_versions_list() {
api_key_table<api_version> versions{invalid_api};
fill_flex(versions, request_types{});
fill_flex(versions, redpanda_request_types{});
return versions;
}

constexpr auto g_flex_mapping = get_flexible_request_min_versions_list(
request_types());
constexpr auto g_flex_mapping = get_flexible_request_min_versions_list();

struct protocol_parse_exception : public net::parsing_exception {
explicit protocol_parse_exception(const std::string& m)
Expand All @@ -57,18 +52,15 @@ struct protocol_parse_exception : public net::parsing_exception {

bool flex_versions::is_flexible_request(api_key key, api_version version) {
/// If bounds checking is desired call is_api_in_schema(key) beforehand
const api_version first_flex_version = g_flex_mapping[key()];
const api_version first_flex_version = g_flex_mapping[key];
return (version >= first_flex_version)
&& (first_flex_version != never_flexible);
}

bool flex_versions::is_api_in_schema(api_key key) noexcept {
constexpr auto max_version = g_flex_mapping.max_size() - 1;
if (key() < 0 || static_cast<size_t>(key()) > max_version) {
return false;
}
const api_version first_flex_version = g_flex_mapping[key()];
return first_flex_version != invalid_api;
// find() does the region routing once and returns null for out-of-range.
const api_version* first_flex_version = g_flex_mapping.find(key);
return first_flex_version != nullptr && *first_flex_version != invalid_api;
}

namespace {
Expand Down
Loading