Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions src/v/kafka/protocol/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,19 @@ redpanda_cc_btest(
"@seastar//:testing",
],
)

# Boost unit test target built from wire_validation_test.cc. It links against
# the iobuf and kafka protocol libraries together with the Seastar/Boost test
# support libraries, and is declared with the "short" test timeout.
redpanda_cc_btest(
name = "wire_validation_test",
timeout = "short",
srcs = [
"wire_validation_test.cc",
],
deps = [
"//src/v/bytes:iobuf",
"//src/v/kafka/protocol",
"//src/v/test_utils:seastar_boost",
"@boost//:test",
"@seastar",
"@seastar//:testing",
],
)
275 changes: 275 additions & 0 deletions src/v/kafka/protocol/tests/wire_validation_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,275 @@
/*
* Copyright 2026 Redpanda Data, Inc.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

#include "bytes/iobuf.h"
#include "kafka/protocol/wire.h"

#include <seastar/testing/thread_test_case.hh>

#include <boost/test/unit_test.hpp>

#include <cstdint>
#include <limits>

/// read_array must reject a declared element count that is larger than the
/// number of bytes left in the buffer.
SEASTAR_THREAD_TEST_CASE(read_array_length_exceeds_buffer) {
    // Wire layout: an int32 element count of 1000 followed by one int32
    // element, i.e. far less data than the count announces.
    iobuf payload;
    kafka::protocol::encoder enc(payload);
    enc.write(int32_t{1000}); // announced element count
    enc.write(int32_t{42});   // the single element actually present (4 bytes)

    kafka::protocol::decoder dec(std::move(payload));
    auto read_element = [](kafka::protocol::decoder& d) {
        return d.read_int32();
    };

    // 1000 elements were promised but the data is not there, so decoding
    // has to fail.
    BOOST_CHECK_THROW(dec.read_array(read_element), std::out_of_range);
}

/// read_array must reject a negative element count other than -1 (-1 is the
/// encoding of a null array).
SEASTAR_THREAD_TEST_CASE(read_array_negative_length) {
    iobuf payload;
    kafka::protocol::encoder enc(payload);
    enc.write(int32_t{-2}); // negative count that is not the null marker

    kafka::protocol::decoder dec(std::move(payload));
    auto read_element = [](kafka::protocol::decoder& d) {
        return d.read_int32();
    };

    BOOST_CHECK_THROW(dec.read_array(read_element), std::out_of_range);
}

/// read_array must fail cleanly when the declared element count is the
/// largest representable int32.
SEASTAR_THREAD_TEST_CASE(read_array_max_length) {
    iobuf payload;
    kafka::protocol::encoder enc(payload);
    // Announce 2^31 - 1 elements ...
    enc.write(std::numeric_limits<int32_t>::max());
    // ... but back the claim with just one element.
    enc.write(int32_t{42});

    kafka::protocol::decoder dec(std::move(payload));
    auto read_element = [](kafka::protocol::decoder& d) {
        return d.read_int32();
    };

    BOOST_CHECK_THROW(dec.read_array(read_element), std::out_of_range);
}

/// read_flex_array must reject a declared element count that is larger than
/// the number of bytes left in the buffer.
SEASTAR_THREAD_TEST_CASE(read_flex_array_length_exceeds_buffer) {
    iobuf payload;
    kafka::protocol::encoder enc(payload);
    // A flex array stores (element count + 1) as an unsigned varint, so the
    // value 1001 announces 1000 elements.
    enc.write_unsigned_varint(1001);
    enc.write(int32_t{42}); // only one element follows

    kafka::protocol::decoder dec(std::move(payload));
    auto read_element = [](kafka::protocol::decoder& d) {
        return d.read_int32();
    };

    BOOST_CHECK_THROW(dec.read_flex_array(read_element), std::out_of_range);
}

/// read_flex_array must throw on an encoded length of 0, which denotes a null
/// array in the flex encoding.
SEASTAR_THREAD_TEST_CASE(read_flex_array_zero_length) {
    iobuf payload;
    kafka::protocol::encoder enc(payload);
    // 0 is the null marker and therefore not valid for a non-nullable array.
    enc.write_unsigned_varint(0);

    kafka::protocol::decoder dec(std::move(payload));
    auto read_element = [](kafka::protocol::decoder& d) {
        return d.read_int32();
    };

    BOOST_CHECK_THROW(dec.read_flex_array(read_element), std::out_of_range);
}

/// read_flex_string must reject a declared length that is larger than the
/// number of bytes left in the buffer.
SEASTAR_THREAD_TEST_CASE(read_flex_string_length_exceeds_buffer) {
    iobuf payload;
    kafka::protocol::encoder enc(payload);
    // Flex strings store (length + 1): the varint 1001 announces 1000 bytes.
    enc.write_unsigned_varint(1001);
    // Supply just three bytes of string data.
    payload.append("abc", 3);

    kafka::protocol::decoder dec(std::move(payload));

    BOOST_CHECK_THROW(
      dec.read_flex_string(),
      std::out_of_range);
}

/// read_flex_string must throw when the encoded length is 0.
SEASTAR_THREAD_TEST_CASE(read_flex_string_zero_length) {
    iobuf payload;
    kafka::protocol::encoder enc(payload);
    // An encoded length of 0 is not valid for a non-nullable flex string.
    enc.write_unsigned_varint(0);

    kafka::protocol::decoder dec(std::move(payload));

    BOOST_CHECK_THROW(
      dec.read_flex_string(),
      std::out_of_range);
}

/// read_flex_bytes must reject a declared length that is larger than the
/// number of bytes left in the buffer.
SEASTAR_THREAD_TEST_CASE(read_flex_bytes_length_exceeds_buffer) {
    iobuf payload;
    kafka::protocol::encoder enc(payload);
    // The varint 1001 announces 1000 bytes of data ...
    enc.write_unsigned_varint(1001);
    // ... yet only three bytes are present.
    payload.append("abc", 3);

    kafka::protocol::decoder dec(std::move(payload));

    BOOST_CHECK_THROW(
      dec.read_flex_bytes(),
      std::out_of_range);
}

/// read_string must reject a declared length that is larger than the number
/// of bytes left in the buffer.
SEASTAR_THREAD_TEST_CASE(read_string_length_exceeds_buffer) {
    iobuf payload;
    kafka::protocol::encoder enc(payload);
    // The int16 length prefix announces 1000 bytes ...
    enc.write(int16_t{1000});
    // ... yet only three bytes are present.
    payload.append("abc", 3);

    kafka::protocol::decoder dec(std::move(payload));

    BOOST_CHECK_THROW(
      dec.read_string(),
      std::out_of_range);
}

/// read_string must throw for a negative length prefix other than -1.
SEASTAR_THREAD_TEST_CASE(read_string_negative_length) {
    iobuf payload;
    kafka::protocol::encoder enc(payload);
    // -2 is a negative length and therefore invalid.
    enc.write(int16_t{-2});

    kafka::protocol::decoder dec(std::move(payload));

    BOOST_CHECK_THROW(
      dec.read_string(),
      std::out_of_range);
}

/// read_tags must throw when the declared number of tags cannot be backed by
/// the data left in the buffer.
SEASTAR_THREAD_TEST_CASE(read_tags_count_exceeds_buffer) {
    iobuf payload;
    kafka::protocol::encoder enc(payload);
    // Announce 1000 tagged fields and write nothing after the count.
    enc.write_unsigned_varint(1000);

    kafka::protocol::decoder dec(std::move(payload));

    BOOST_CHECK_THROW(
      dec.read_tags(),
      std::out_of_range);
}

/// read_tags must throw when the same tag id appears twice.
SEASTAR_THREAD_TEST_CASE(read_tags_duplicate_ids) {
    iobuf payload;
    kafka::protocol::encoder enc(payload);
    enc.write_unsigned_varint(2); // number of tagged fields
    // First field: id 0 with an empty payload.
    enc.write_unsigned_varint(0);
    enc.write_unsigned_varint(0);
    // Second field: id 0 once more (the duplicate), also with an empty
    // payload.
    enc.write_unsigned_varint(0);
    enc.write_unsigned_varint(0);

    kafka::protocol::decoder dec(std::move(payload));

    BOOST_CHECK_THROW(
      dec.read_tags(),
      std::out_of_range);
}

/// read_tags must throw when tag ids are not in ascending order.
SEASTAR_THREAD_TEST_CASE(read_tags_non_ascending_ids) {
    iobuf payload;
    kafka::protocol::encoder enc(payload);
    enc.write_unsigned_varint(2); // number of tagged fields
    // First field: id 5 with an empty payload.
    enc.write_unsigned_varint(5);
    enc.write_unsigned_varint(0);
    // Second field: id 3, which is lower than the preceding id 5 and hence
    // invalid; its payload is empty as well.
    enc.write_unsigned_varint(3);
    enc.write_unsigned_varint(0);

    kafka::protocol::decoder dec(std::move(payload));

    BOOST_CHECK_THROW(
      dec.read_tags(),
      std::out_of_range);
}

/// read_tags must throw when a tag declares a payload size that is larger
/// than the number of bytes left in the buffer.
SEASTAR_THREAD_TEST_CASE(read_tags_size_exceeds_buffer) {
    iobuf payload;
    kafka::protocol::encoder enc(payload);
    enc.write_unsigned_varint(1); // number of tagged fields
    enc.write_unsigned_varint(0); // id of the only field
    // The field claims a 1000 byte payload, yet nothing is written after it.
    enc.write_unsigned_varint(1000);

    kafka::protocol::decoder dec(std::move(payload));

    BOOST_CHECK_THROW(
      dec.read_tags(),
      std::out_of_range);
}

/// read_bytes must reject a declared length that is larger than the number of
/// bytes left in the buffer.
SEASTAR_THREAD_TEST_CASE(read_bytes_length_exceeds_buffer) {
    iobuf payload;
    kafka::protocol::encoder enc(payload);
    // The int32 length prefix announces 1000 bytes ...
    enc.write(int32_t{1000});
    // ... yet only three bytes are present.
    payload.append("abc", 3);

    kafka::protocol::decoder dec(std::move(payload));

    BOOST_CHECK_THROW(
      dec.read_bytes(),
      std::out_of_range);
}

/// Sanity check: a well-formed array is still decoded correctly.
SEASTAR_THREAD_TEST_CASE(read_array_valid) {
    iobuf payload;
    kafka::protocol::encoder enc(payload);
    enc.write(int32_t{3}); // element count
    // The three elements themselves.
    enc.write(int32_t{1});
    enc.write(int32_t{2});
    enc.write(int32_t{3});

    kafka::protocol::decoder dec(std::move(payload));
    auto read_element = [](kafka::protocol::decoder& d) {
        return d.read_int32();
    };

    auto values = dec.read_array(read_element);

    // The size is a hard requirement: the element checks below index into
    // the result.
    BOOST_REQUIRE_EQUAL(values.size(), 3);
    BOOST_CHECK_EQUAL(values[0], 1);
    BOOST_CHECK_EQUAL(values[1], 2);
    BOOST_CHECK_EQUAL(values[2], 3);
}

/// Sanity check: a well-formed flex string is still decoded correctly.
SEASTAR_THREAD_TEST_CASE(read_flex_string_valid) {
    iobuf payload;
    kafka::protocol::encoder enc(payload);
    // Let the encoder produce the flex encoding of "abc".
    enc.write_flex(std::string_view{"abc"});

    kafka::protocol::decoder dec(std::move(payload));

    auto decoded = dec.read_flex_string();
    BOOST_CHECK_EQUAL(decoded, "abc");
}

/// Sanity check: well-formed tagged fields are still decoded correctly.
SEASTAR_THREAD_TEST_CASE(read_tags_valid) {
    iobuf payload;
    kafka::protocol::encoder enc(payload);
    enc.write_unsigned_varint(2); // number of tagged fields

    // First field: id 0 carrying the two bytes "ab".
    enc.write_unsigned_varint(0);
    enc.write_unsigned_varint(2);
    payload.append("ab", 2);

    // Second field: id 5 (ascending) carrying the single byte "c".
    enc.write_unsigned_varint(5);
    enc.write_unsigned_varint(1);
    payload.append("c", 1);

    kafka::protocol::decoder dec(std::move(payload));

    auto parsed = dec.read_tags();
    BOOST_CHECK_EQUAL(parsed().size(), 2);
}
34 changes: 34 additions & 0 deletions src/v/kafka/protocol/wire.h
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,13 @@ class decoder {
tagged_fields read_tags() {
tagged_fields::type tags;
auto num_tags = read_unsigned_varint(); // consume total num of tags
if (unlikely(static_cast<size_t>(num_tags) > _parser.bytes_left())) {
throw std::out_of_range(
fmt::format(
"Number of tags {} exceeds remaining bytes {}",
num_tags,
_parser.bytes_left()));
}
int64_t prev_tag_id = -1;
while (num_tags-- > 0) {
auto id = read_unsigned_varint(); // consume tag id
Comment on lines +264 to 273

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@michael-redpanda what was the concern for this case? It looks like if num_tags is too large, the loop will simply hit an end-of-buffer exception. The other cases were large allocations because the bogus value was plugged directly into malloc.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry my concern isn't here, but below in the iobuf_to_bytes(_parser.share(size)) call. Maybe that one is ok as well

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pretty sure _parser.share() won't malloc() either, it iterates over fragments.

Expand Down Expand Up @@ -299,13 +306,31 @@ class decoder {
if (unlikely(n < 0)) {
throw std::out_of_range("Asked to read a negative byte string");
}

if (unlikely(static_cast<size_t>(n) > _parser.bytes_left())) {
throw std::out_of_range(
fmt::format(
"String length {} exceeds remaining bytes {}",
n,
_parser.bytes_left()));
}

return _parser.read_string(n);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe the above validations should be inside the parser read_string method?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could be, though iobuf_parser is used a lot internally and performing validation here at the kafka layer seems reasonable as well. WDYT?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

iobuf_parser is used a lot internally and performing validation here at the kafka layer seems reasonable as well. WDYT?

maybe i'm missing something, but isn't this an argument for putting the validation in the parser itself, so that the validation happens for all users not just this one spot?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not if two extra if statements are a concern for performance 👍

WDYT about putting these checks into iobuf_parser::read_string_safe() and calling that from wire.h instead?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh sorry, you're right. I was thinking that the iobuf_parser interfaces were reading the size, but in fact we are providing it as parsed out of the Kafka protocol. So yes, doing the check above the iobuf_parser makes sense!

}

/// Read the body of a non-nullable flex string.
///
/// \param n length prefix as decoded from the wire, i.e. the string's byte
///          length plus one; a value of zero is rejected.
/// \return the next `n - 1` bytes of the buffer as a string.
/// \throws std::out_of_range if `n` is zero, or if the string length exceeds
///         the number of bytes remaining in the parser.
ss::sstring do_read_flex_string(uint32_t n) {
    if (unlikely(n == 0)) {
        throw std::out_of_range("Asked to read a 0 byte flex string");
    }

    // The wire value is length + 1. Compute the real length once so that the
    // bounds check, the diagnostic and the read all use the same number; the
    // message previously reported the raw wire value as the "length".
    const auto len = static_cast<size_t>(n - 1);
    if (unlikely(len > _parser.bytes_left())) {
        throw std::out_of_range(
          fmt::format(
            "Flex string length {} exceeds remaining bytes {}",
            len,
            _parser.bytes_left()));
    }

    return _parser.read_string(len);
}

Expand All @@ -321,6 +346,15 @@ class decoder {
if (len < 0) {
throw std::out_of_range("Attempt to parse array w/ negative len");
}

if (unlikely(static_cast<size_t>(len) > _parser.bytes_left())) {
throw std::out_of_range(
fmt::format(
"Array length {} exceeds remaining bytes {}",
len,
_parser.bytes_left()));
}

Container<T> res;
if constexpr (detail::reserveable<Container<T>>) {
res.reserve(len);
Expand Down
5 changes: 3 additions & 2 deletions src/v/kafka/server/connection_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1130,9 +1130,10 @@ ss::future<> connection_context::client_protocol_state::handle_response(
if (disconnected) {
vlog(
klog.info,
"Disconnected {} ({})",
"Disconnected {} ({}), {}",
connection_ctx->conn->addr,
disconnected.value());
disconnected.value(),
e);
} else {
vlog(klog.warn, "Error processing request: {}", e);
}
Expand Down