From 13d3f76112eac8103ae26357e01b186653ddd913 Mon Sep 17 00:00:00 2001 From: Willem Kaufmann Date: Fri, 9 Jan 2026 10:43:34 -0500 Subject: [PATCH 1/3] `kafka`: pre-validate sizes when reading off the wire For string, array, and number of tags. --- src/v/kafka/protocol/wire.h | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/src/v/kafka/protocol/wire.h b/src/v/kafka/protocol/wire.h index f02cba7fa9adb..f3f65c4a3e03f 100644 --- a/src/v/kafka/protocol/wire.h +++ b/src/v/kafka/protocol/wire.h @@ -261,6 +261,13 @@ class decoder { tagged_fields read_tags() { tagged_fields::type tags; auto num_tags = read_unsigned_varint(); // consume total num of tags + if (unlikely(static_cast(num_tags) > _parser.bytes_left())) { + throw std::out_of_range( + fmt::format( + "Number of tags {} exceeds remaining bytes {}", + num_tags, + _parser.bytes_left())); + } int64_t prev_tag_id = -1; while (num_tags-- > 0) { auto id = read_unsigned_varint(); // consume tag id @@ -299,6 +306,15 @@ class decoder { if (unlikely(n < 0)) { throw std::out_of_range("Asked to read a negative byte string"); } + + if (unlikely(static_cast(n) > _parser.bytes_left())) { + throw std::out_of_range( + fmt::format( + "String length {} exceeds remaining bytes {}", + n, + _parser.bytes_left())); + } + return _parser.read_string(n); } @@ -306,6 +322,15 @@ class decoder { if (unlikely(n == 0)) { throw std::out_of_range("Asked to read a 0 byte flex string"); } + + if (unlikely(static_cast(n - 1) > _parser.bytes_left())) { + throw std::out_of_range( + fmt::format( + "Flex string length {} exceeds remaining bytes {}", + n, + _parser.bytes_left())); + } + return _parser.read_string(n - 1); } @@ -321,6 +346,15 @@ class decoder { if (len < 0) { throw std::out_of_range("Attempt to parse array w/ negative len"); } + + if (unlikely(static_cast(len) > _parser.bytes_left())) { + throw std::out_of_range( + fmt::format( + "Array length {} exceeds remaining bytes {}", + len, + _parser.bytes_left())); + } + Container res; if constexpr (detail::reserveable>) { res.reserve(len); From 61bfa92dafe62acd9ad34a414349175b53d42778 Mon Sep 17 00:00:00 2001 From: Willem Kaufmann Date: Fri, 9 Jan 2026 10:44:10 -0500 Subject: [PATCH 2/3] `kafka`: add extra logging detail in `connection_context` --- src/v/kafka/server/connection_context.cc | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/v/kafka/server/connection_context.cc b/src/v/kafka/server/connection_context.cc index e7b4a3434b445..c07afbcd26dfa 100644 --- a/src/v/kafka/server/connection_context.cc +++ b/src/v/kafka/server/connection_context.cc @@ -1130,9 +1130,10 @@ ss::future<> connection_context::client_protocol_state::handle_response( if (disconnected) { vlog( klog.info, - "Disconnected {} ({})", + "Disconnected {} ({}), {}", connection_ctx->conn->addr, - disconnected.value()); + disconnected.value(), + e); } else { vlog(klog.warn, "Error processing request: {}", e); } From a78c238a220001bf8605bbf846de2ddc9866ba4d Mon Sep 17 00:00:00 2001 From: Willem Kaufmann Date: Fri, 9 Jan 2026 12:29:01 -0500 Subject: [PATCH 3/3] `kafka`: add unit tests for wire protocol validation Add wire_validation_test.cc with tests that verify bounds checking and error handling in the Kafka protocol decoder. Tests cover: - Array length validation (exceeds buffer, negative, max int32) - Flex array validation (exceeds buffer, zero length) - String/flex string validation (exceeds buffer, negative/zero length) - Bytes/flex bytes validation - Tagged fields validation (count exceeds buffer, duplicate/non-ascending IDs, size exceeds buffer) Co-Authored-By: Claude Opus 4.5 --- src/v/kafka/protocol/tests/BUILD | 16 + .../protocol/tests/wire_validation_test.cc | 275 ++++++++++++++++++ 2 files changed, 291 insertions(+) create mode 100644 src/v/kafka/protocol/tests/wire_validation_test.cc diff --git a/src/v/kafka/protocol/tests/BUILD b/src/v/kafka/protocol/tests/BUILD index ab2433b8e4195..655724aef874b 100644 --- a/src/v/kafka/protocol/tests/BUILD +++ b/src/v/kafka/protocol/tests/BUILD @@ -85,3 +85,19 @@ redpanda_cc_btest( "@seastar//:testing", ], ) + +redpanda_cc_btest( + name = "wire_validation_test", + timeout = "short", + srcs = [ + "wire_validation_test.cc", + ], + deps = [ + "//src/v/bytes:iobuf", + "//src/v/kafka/protocol", + "//src/v/test_utils:seastar_boost", + "@boost//:test", + "@seastar", + "@seastar//:testing", + ], +) diff --git a/src/v/kafka/protocol/tests/wire_validation_test.cc b/src/v/kafka/protocol/tests/wire_validation_test.cc new file mode 100644 index 0000000000000..cda6a55ae455d --- /dev/null +++ b/src/v/kafka/protocol/tests/wire_validation_test.cc @@ -0,0 +1,275 @@ +/* + * Copyright 2026 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#include "bytes/iobuf.h" +#include "kafka/protocol/wire.h" + +#include + +#include + +#include +#include + +/// Test that read_array throws when array length exceeds remaining bytes +SEASTAR_THREAD_TEST_CASE(read_array_length_exceeds_buffer) { + // Create a buffer that claims array length of 1000 but only has space for 1 + // element Format: int32 length (1000) + one int32 element + iobuf buf; + kafka::protocol::encoder writer(buf); + writer.write(int32_t{1000}); // Claim 1000 elements + writer.write(int32_t{42}); // But only provide 1 element (4 bytes) + + kafka::protocol::decoder reader(std::move(buf)); + + // This should fail because we claim 1000 elements but don't have the data + BOOST_CHECK_THROW( + reader.read_array( + [](kafka::protocol::decoder& r) { return r.read_int32(); }), + std::out_of_range); +} + +/// Test that read_array throws for negative length (not -1 which means null) +SEASTAR_THREAD_TEST_CASE(read_array_negative_length) { + iobuf buf; + kafka::protocol::encoder writer(buf); + writer.write(int32_t{-2}); // Invalid negative length + + kafka::protocol::decoder reader(std::move(buf)); + + BOOST_CHECK_THROW( + reader.read_array( + [](kafka::protocol::decoder& r) { return r.read_int32(); }), + std::out_of_range); +} + +/// Test that read_array with max int32 length fails appropriately +SEASTAR_THREAD_TEST_CASE(read_array_max_length) { + iobuf buf; + kafka::protocol::encoder writer(buf); + writer.write(std::numeric_limits::max()); // 2^31 - 1 elements + writer.write(int32_t{42}); // Only 1 element of data + + kafka::protocol::decoder reader(std::move(buf)); + + BOOST_CHECK_THROW( + reader.read_array( + [](kafka::protocol::decoder& r) { return r.read_int32(); }), + std::out_of_range); +} + +/// Test that read_flex_array throws when length exceeds remaining bytes +SEASTAR_THREAD_TEST_CASE(read_flex_array_length_exceeds_buffer) { + iobuf buf; + kafka::protocol::encoder writer(buf); + // Flex array uses varint where value = actual_length + 1 + // So varint 1001 means 1000 elements + writer.write_unsigned_varint(1001); + writer.write(int32_t{42}); // Only 1 element + + kafka::protocol::decoder reader(std::move(buf)); + + BOOST_CHECK_THROW( + reader.read_flex_array( + [](kafka::protocol::decoder& r) { return r.read_int32(); }), + std::out_of_range); +} + +/// Test that read_flex_array with 0 length throws (0 means null in flex) +SEASTAR_THREAD_TEST_CASE(read_flex_array_zero_length) { + iobuf buf; + kafka::protocol::encoder writer(buf); + writer.write_unsigned_varint(0); // 0 = null, invalid for non-nullable + + kafka::protocol::decoder reader(std::move(buf)); + + BOOST_CHECK_THROW( + reader.read_flex_array( + [](kafka::protocol::decoder& r) { return r.read_int32(); }), + std::out_of_range); +} + +/// Test that read_flex_string throws when length exceeds remaining bytes +SEASTAR_THREAD_TEST_CASE(read_flex_string_length_exceeds_buffer) { + iobuf buf; + kafka::protocol::encoder writer(buf); + // Flex string: varint 1001 means 1000 bytes + writer.write_unsigned_varint(1001); + // Only provide 3 bytes of actual data + buf.append("abc", 3); + + kafka::protocol::decoder reader(std::move(buf)); + + BOOST_CHECK_THROW(reader.read_flex_string(), std::out_of_range); +} + +/// Test that read_flex_string with 0 length throws +SEASTAR_THREAD_TEST_CASE(read_flex_string_zero_length) { + iobuf buf; + kafka::protocol::encoder writer(buf); + writer.write_unsigned_varint(0); // 0 = invalid for non-nullable flex string + + kafka::protocol::decoder reader(std::move(buf)); + + BOOST_CHECK_THROW(reader.read_flex_string(), std::out_of_range); +} + +/// Test that read_flex_bytes throws when length exceeds remaining bytes +SEASTAR_THREAD_TEST_CASE(read_flex_bytes_length_exceeds_buffer) { + iobuf buf; + kafka::protocol::encoder writer(buf); + writer.write_unsigned_varint(1001); // 1000 bytes + buf.append("abc", 3); // Only 3 bytes + + kafka::protocol::decoder reader(std::move(buf)); + + BOOST_CHECK_THROW(reader.read_flex_bytes(), std::out_of_range); +} + +/// Test that read_string throws when length exceeds remaining bytes +SEASTAR_THREAD_TEST_CASE(read_string_length_exceeds_buffer) { + iobuf buf; + kafka::protocol::encoder writer(buf); + writer.write(int16_t{1000}); // Claim 1000 bytes + buf.append("abc", 3); // Only 3 bytes + + kafka::protocol::decoder reader(std::move(buf)); + + BOOST_CHECK_THROW(reader.read_string(), std::out_of_range); +} + +/// Test that read_string with negative length (not -1) throws +SEASTAR_THREAD_TEST_CASE(read_string_negative_length) { + iobuf buf; + kafka::protocol::encoder writer(buf); + writer.write(int16_t{-2}); // Invalid negative + + kafka::protocol::decoder reader(std::move(buf)); + + BOOST_CHECK_THROW(reader.read_string(), std::out_of_range); +} + +/// Test that read_tags throws when num_tags exceeds available data +SEASTAR_THREAD_TEST_CASE(read_tags_count_exceeds_buffer) { + iobuf buf; + kafka::protocol::encoder writer(buf); + // Claim 1000 tags but provide none + writer.write_unsigned_varint(1000); + + kafka::protocol::decoder reader(std::move(buf)); + + BOOST_CHECK_THROW(reader.read_tags(), std::out_of_range); +} + +/// Test that read_tags throws for duplicate tag IDs +SEASTAR_THREAD_TEST_CASE(read_tags_duplicate_ids) { + iobuf buf; + kafka::protocol::encoder writer(buf); + writer.write_unsigned_varint(2); // 2 tags + writer.write_unsigned_varint(0); // tag id 0 + writer.write_unsigned_varint(0); // tag size 0 + writer.write_unsigned_varint(0); // tag id 0 again (duplicate!) + writer.write_unsigned_varint(0); // tag size 0 + + kafka::protocol::decoder reader(std::move(buf)); + + BOOST_CHECK_THROW(reader.read_tags(), std::out_of_range); +} + +/// Test that read_tags throws for non-ascending tag IDs +SEASTAR_THREAD_TEST_CASE(read_tags_non_ascending_ids) { + iobuf buf; + kafka::protocol::encoder writer(buf); + writer.write_unsigned_varint(2); // 2 tags + writer.write_unsigned_varint(5); // tag id 5 + writer.write_unsigned_varint(0); // tag size 0 + writer.write_unsigned_varint(3); // tag id 3 (less than 5, invalid!) + writer.write_unsigned_varint(0); // tag size 0 + + kafka::protocol::decoder reader(std::move(buf)); + + BOOST_CHECK_THROW(reader.read_tags(), std::out_of_range); +} + +/// Test that read_tags throws when tag size exceeds remaining bytes +SEASTAR_THREAD_TEST_CASE(read_tags_size_exceeds_buffer) { + iobuf buf; + kafka::protocol::encoder writer(buf); + writer.write_unsigned_varint(1); // 1 tag + writer.write_unsigned_varint(0); // tag id 0 + writer.write_unsigned_varint(1000); // tag size 1000 (but no data follows) + + kafka::protocol::decoder reader(std::move(buf)); + + BOOST_CHECK_THROW(reader.read_tags(), std::out_of_range); +} + +/// Test that read_bytes throws when length exceeds remaining bytes +SEASTAR_THREAD_TEST_CASE(read_bytes_length_exceeds_buffer) { + iobuf buf; + kafka::protocol::encoder writer(buf); + writer.write(int32_t{1000}); // Claim 1000 bytes + buf.append("abc", 3); // Only 3 bytes + + kafka::protocol::decoder reader(std::move(buf)); + + BOOST_CHECK_THROW(reader.read_bytes(), std::out_of_range); +} + +/// Test valid array parsing still works +SEASTAR_THREAD_TEST_CASE(read_array_valid) { + iobuf buf; + kafka::protocol::encoder writer(buf); + writer.write(int32_t{3}); // 3 elements + writer.write(int32_t{1}); + writer.write(int32_t{2}); + writer.write(int32_t{3}); + + kafka::protocol::decoder reader(std::move(buf)); + + auto result = reader.read_array( + [](kafka::protocol::decoder& r) { return r.read_int32(); }); + + BOOST_REQUIRE_EQUAL(result.size(), 3); + BOOST_CHECK_EQUAL(result[0], 1); + BOOST_CHECK_EQUAL(result[1], 2); + BOOST_CHECK_EQUAL(result[2], 3); +} + +/// Test valid flex string parsing still works +SEASTAR_THREAD_TEST_CASE(read_flex_string_valid) { + iobuf buf; + kafka::protocol::encoder writer(buf); + writer.write_flex(std::string_view{"abc"}); + + kafka::protocol::decoder reader(std::move(buf)); + + auto result = reader.read_flex_string(); + BOOST_CHECK_EQUAL(result, "abc"); +} + +/// Test valid tags parsing still works +SEASTAR_THREAD_TEST_CASE(read_tags_valid) { + iobuf buf; + kafka::protocol::encoder writer(buf); + writer.write_unsigned_varint(2); // 2 tags + writer.write_unsigned_varint(0); // tag id 0 + writer.write_unsigned_varint(2); // tag size 2 + buf.append("ab", 2); // tag data + writer.write_unsigned_varint(5); // tag id 5 (ascending) + writer.write_unsigned_varint(1); // tag size 1 + buf.append("c", 1); // tag data + + kafka::protocol::decoder reader(std::move(buf)); + + auto tags = reader.read_tags(); + BOOST_CHECK_EQUAL(tags().size(), 2); +}