Remove record copying in produce batch validation #29577
Pull request overview
This PR removes record copying from the produce batch validation path by parsing only per-record metadata (timestamp/offset deltas) and skipping key/value/headers, reducing allocations and runtime.
Changes:
- Add `model::record_metadata` and a lightweight parser `parse_record_metadata_from_buffer`.
- Update produce validation record iteration to use the metadata-only parser instead of `record_batch::for_each_record`.
- Add a unit test validating metadata parsing against the existing record iterator.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| src/v/model/tests/record_batch_test.cc | Adds a unit test for metadata-only parsing. |
| src/v/model/record_utils.h | Declares parse_record_metadata_from_buffer and forward-declares record_metadata. |
| src/v/model/record_utils.cc | Implements metadata-only parsing and skipping of record payloads. |
| src/v/model/record.h | Introduces model::record_metadata (lightweight metadata container). |
| src/v/kafka/server/handlers/produce_validation.cc | Switches record iteration in validation to metadata-only parsing to avoid copies. |

    + auto record_count = b.record_count();
    + auto parser = iobuf_const_parser(b.data());
      try {
    -     b.for_each_record(std::move(f));
    +     for (int32_t i = 0; i < record_count; ++i) {
    +         auto res = f(model::parse_record_metadata_from_buffer(parser));
    +         if (res == ss::stop_iteration::yes) {
    +             break;
    +         }
    +     }

Iteration is bounded only by b.record_count(). If the batch payload contains more encoded records than record_count (e.g., crafted input), the extra bytes/records would not be validated (timestamp/offset checks skipped) and no error would be raised. Consider adding a post-loop check that parser.bytes_left() == 0 (or otherwise validate full consumption) and return invalid_record when trailing bytes remain.
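The suggested post-loop check can be sketched roughly as follows. This is only an illustration under stated assumptions: `validate_all_records`, `validation_result`, and `toy_parser` are hypothetical names, and the only thing borrowed from the PR is the idea of a parser exposing `bytes_left()`.

```cpp
#include <cstddef>
#include <cstdint>

// Hypothetical result type; the real code reports invalid_record via its own error type.
enum class validation_result { ok, invalid_record };

// Hypothetical toy parser: it only tracks how many bytes remain unread.
struct toy_parser {
    size_t left;
    size_t bytes_left() const { return left; }
};

// Parse exactly record_count records, then require that nothing is left over.
// parse_one is any callable taking Parser& and returning false on a malformed record.
template <typename Parser, typename F>
validation_result
validate_all_records(Parser& parser, int32_t record_count, F&& parse_one) {
    for (int32_t i = 0; i < record_count; ++i) {
        if (!parse_one(parser)) {
            return validation_result::invalid_record;
        }
    }
    // Trailing bytes mean the payload holds more data than record_count claims,
    // so some records would otherwise go unvalidated.
    if (parser.bytes_left() != 0) {
        return validation_result::invalid_record;
    }
    return validation_result::ok;
}
```

With this shape, a batch whose header claims two records but whose payload encodes three is rejected instead of having the third record silently skipped.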
yeah exactly this. We need to validate everything, the input is completely untrusted
Good point, will add additional validation here.
I've added the post-loop validation check and a check to ensure the record size exceeds the small subset of fields we've read.
Since we already do a CRC check on the batch, the validation checks here seem to be for cases where the client generates an ill-formed record. For that, parsing every field of the record, as previously done, and ensuring that the record size field matches the actual size exactly is a stronger check.
Do you folks think we should still be parsing all fields here? It has a non-negligible impact on the performance, at least within the micro-benchmarks.
What we really want is full validation (ie checking offset deltas are monotonically increasing, the timestamps match expectations, etc).
I would prefer full validation - we could also gate full validation in the "strict" mode too, and let the existing lax default be... lax 😄
> we could also gate full validation in the "strict" mode too, and let the existing lax default be... lax 😄
I think that this would address all of my concerns as it avoids the full parsing in the default config.
> checking offset deltas are monotonically increasing
This seems like it's currently unimplemented in the strict mode though there is a TODO indicating it is desired. I'll go ahead and implement it in a commit for this PR. I can move it to its own PR as well if desired.
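For reference, the offset-delta check under discussion could look something like the sketch below. It assumes "monotonically increasing" means strictly increasing and non-negative within a batch; that interpretation and the function name `offset_deltas_strictly_increasing` are assumptions, not what the strict mode implements.

```cpp
#include <cstdint>
#include <vector>

// Returns true when every offset delta is non-negative and strictly greater
// than the one before it. An empty batch passes trivially.
bool offset_deltas_strictly_increasing(const std::vector<int32_t>& deltas) {
    int64_t prev = -1; // sentinel below any valid delta
    for (int32_t d : deltas) {
        if (d <= prev) {
            return false;
        }
        prev = d;
    }
    return true;
}
```

Gaps such as `{0, 2, 5}` pass under this definition; duplicates, reordering, and negative deltas do not.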
I'm OK with any option about the offset deltas: this pr, another or telling me you didn't sign up for that work 😆
Yeah, more in depth record validation feels like it belongs in a separate PR.
#29624 for the discussed follow-up PR to bring our strict validation mode in parity with what Apache Kafka does.
We may want to make users explicitly aware by renaming the function. Sounds like there are more uses of this in the codebase to audit/optimize from our talks at CKO.
Force-pushed: 9579e22 to bdb9660
Will be posting a follow-up PR renaming this method soon to avoid future accidental copies. Will be removing its use in other hot paths (compaction, cloud topics, and Iceberg, at a glance) soon as well.
Force-pushed: bdb9660 to c5d3fb0
It can be expensive to fully parse and then share/copy the underlying key/value/headers for a record. In cases where only some metadata is needed from each record, it's much cheaper to just skip over the keys/values of each record.
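A minimal sketch of the skip-over idea, assuming the Kafka v2 record layout (zigzag varint length, int8 attributes, varint timestamp delta, varint offset delta, then key/value/headers). The `cursor` class, `record_meta`, and `parse_metadata` are hypothetical stand-ins over a contiguous buffer; the PR itself works on `iobuf_const_parser` and `model::record_metadata`.

```cpp
#include <cstddef>
#include <cstdint>
#include <limits>
#include <optional>
#include <vector>

// Hypothetical stand-in for the per-record metadata the validator needs.
struct record_meta {
    int64_t timestamp_delta;
    int32_t offset_delta;
};

// Hypothetical cursor over a contiguous byte buffer.
class cursor {
public:
    explicit cursor(const std::vector<uint8_t>& buf) : _buf(buf) {}
    size_t bytes_left() const { return _buf.size() - _pos; }

    // Decode one zigzag varint; nullopt if the buffer ends early or the
    // encoding runs past 10 bytes.
    std::optional<int64_t> read_vint() {
        uint64_t raw = 0;
        for (int shift = 0; shift < 64; shift += 7) {
            if (_pos >= _buf.size()) { return std::nullopt; }
            uint8_t b = _buf[_pos++];
            raw |= static_cast<uint64_t>(b & 0x7f) << shift;
            if ((b & 0x80) == 0) {
                return static_cast<int64_t>(raw >> 1)
                       ^ -static_cast<int64_t>(raw & 1);
            }
        }
        return std::nullopt;
    }

    bool skip(size_t n) {
        if (n > bytes_left()) { return false; }
        _pos += n;
        return true;
    }

private:
    const std::vector<uint8_t>& _buf;
    size_t _pos = 0;
};

// Read only the deltas, then jump over key/value/headers using the record length.
std::optional<record_meta> parse_metadata(cursor& c) {
    auto len = c.read_vint(); // bytes following the length field
    if (!len || *len < 0 || static_cast<uint64_t>(*len) > c.bytes_left()) {
        return std::nullopt;
    }
    const size_t before = c.bytes_left();
    if (!c.skip(1)) { return std::nullopt; } // attributes (int8)
    auto ts = c.read_vint();
    auto off = c.read_vint();
    if (!ts || !off) { return std::nullopt; }
    if (*off < std::numeric_limits<int32_t>::min()
        || *off > std::numeric_limits<int32_t>::max()) {
        return std::nullopt;
    }
    const size_t consumed = before - c.bytes_left();
    if (consumed > static_cast<size_t>(*len)) { return std::nullopt; }
    // No copy and no parse of the payload: just advance past it.
    if (!c.skip(static_cast<size_t>(*len) - consumed)) { return std::nullopt; }
    return record_meta{*ts, static_cast<int32_t>(*off)};
}
```

Because the payload is never materialized, the cost per record is a handful of varint decodes plus one position bump, regardless of key or value size.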
Force-pushed: c5d3fb0 to 88aaba6
This commit removes the use of `model::record_batch::for_each_record` from batch validation as it internally copies all data in a record. In its place a parser that just reads the record metadata and skips everything else is used. This leads to a decent performance improvement in our microbenchmarks.

Prior to this commit:

| test | iters | runtime | allocs | tasks | inst | cycles |
| - | -: | -: | -: | -: | -: | -: |
| produce_partition_fixture.1_submitted | 1298 | 155.92µs ± 0.00% | 206.002 | 0.000 | 1138717.0 | 839553.3 |
| produce_partition_fixture.1_KiB_submitted | 99 | 170.12µs ± 0.00% | 206.030 | 0.000 | 1196613.3 | 904472.8 |
| produce_partition_fixture.4_KiB_submitted | 27 | 175.35µs ± 0.00% | 206.148 | 0.000 | 1221996.9 | 934508.4 |
| produce_partition_fixture.8_KiB_submitted | 14 | 184.96µs ± 0.00% | 206.071 | 0.000 | 1255062.4 | 985105.4 |
| produce_partition_fixture.1_dispatched | 1303 | 207.89µs ± 0.00% | 264.068 | 23.038 | 1333748.1 | 1121835.2 |
| produce_partition_fixture.1_KiB_dispatched | 99 | 243.84µs ± 0.00% | 462.061 | 23.000 | 1604919.2 | 1297880.6 |
| produce_partition_fixture.4_KiB_dispatched | 27 | 249.86µs ± 0.00% | 462.185 | 23.000 | 1629563.6 | 1331322.7 |
| produce_partition_fixture.8_KiB_dispatched | 14 | 260.99µs ± 0.00% | 462.929 | 23.143 | 1680685.2 | 1388548.6 |
| produce_partition_fixture.1_produced | 1304 | 404.47µs ± 0.00% | 396.669 | 129.457 | 2189375.1 | 2115771.1 |
| produce_partition_fixture.1_KiB_produced | 99 | 704.31µs ± 0.00% | 910.788 | 402.960 | 4018930.1 | 3496713.7 |
| produce_partition_fixture.4_KiB_produced | 27 | 1.01ms ± 0.00% | 1216.296 | 626.926 | 5405753.9 | 4654244.5 |
| produce_partition_fixture.8_KiB_produced | 14 | 1.43ms ± 0.00% | 1622.429 | 923.286 | 7287018.4 | 6180247.2 |

After this commit:

| test | iters | runtime | allocs | tasks | inst | cycles |
| - | -: | -: | -: | -: | -: | -: |
| produce_partition_fixture.1_submitted | 1467 | 66.97µs ± 0.00% | 6.000 | 0.000 | 411005.25 | 355761.6 |
| produce_partition_fixture.1_KiB_submitted | 94 | 76.23µs ± 0.00% | 6.000 | 0.000 | 450017.20 | 398133.1 |
| produce_partition_fixture.4_KiB_submitted | 27 | 76.23µs ± 0.00% | 6.000 | 0.000 | 450017.22 | 400184.3 |
| produce_partition_fixture.8_KiB_submitted | 14 | 81.09µs ± 0.00% | 6.000 | 0.000 | 460517.50 | 415534.4 |
| produce_partition_fixture.1_dispatched | 1466 | 118.17µs ± 0.00% | 64.005 | 23.000 | 604144.00 | 635594.0 |
| produce_partition_fixture.1_KiB_dispatched | 98 | 144.28µs ± 0.00% | 262.000 | 23.000 | 852617.63 | 766608.1 |
| produce_partition_fixture.4_KiB_dispatched | 27 | 147.39µs ± 0.00% | 262.000 | 23.000 | 854440.44 | 778284.8 |
| produce_partition_fixture.8_KiB_dispatched | 14 | 150.64µs ± 0.00% | 262.000 | 23.000 | 863854.14 | 797258.4 |
| produce_partition_fixture.1_produced | 1465 | 314.59µs ± 0.00% | 197.066 | 129.668 | 1461883.0 | 1630931.5 |
| produce_partition_fixture.1_KiB_produced | 100 | 644.00µs ± 0.00% | 706.590 | 401.140 | 3265578.4 | 2971754.3 |
| produce_partition_fixture.4_KiB_produced | 27 | 914.66µs ± 0.00% | 1019.148 | 628.778 | 4655887.0 | 4149024.8 |
| produce_partition_fixture.8_KiB_produced | 14 | 1.39ms ± 0.00% | 1422.000 | 923.857 | 6873481.9 | 6006453.5 |
Force-pushed: 88aaba6 to 7a721b9

      const model::record_batch& b,
    - ss::noncopyable_function<ss::stop_iteration(model::record)> f) {
    + ss::noncopyable_function<ss::stop_iteration(model::record_metadata)>&& f,
    + bool is_strict_validation = false) {

mega-nit: prefer not to have default parameters

      sizeof(model::record_attributes::type)
      + vint::vint_size(_timestamp_delta) + vint::vint_size(_offset_delta)
      + vint::vint_size(_key_size) + std::max<int32_t>(_key_size, 0)
      + vint::vint_size(_val_size) + std::max<int32_t>(_val_size, 0)
      + vint::vint_size(static_cast<int64_t>(_headers.size()))
      + std::accumulate(
        _headers.begin(),
        _headers.end(),
        size_t(0),
        [](size_t acc, const record_header& h) {
    -       return acc + h.memory_usage();
    -   }) //
    - );
    +       return acc + vint::vint_size(h.key_size())
    +              + std::max<int32_t>(h.key_size(), 0)
    +              + vint::vint_size(h.value_size())
    +              + std::max<int32_t>(h.value_size(), 0);
    +   }));

Added unit tests to ensure this new calculation matches the actual serialized size.
Prior to this commit, when the key or value field was set to std::nullopt, the serialized size of the field together with its length prefix was incorrectly calculated as 0. That is incorrect because the length prefix must still be written, with -1 indicating the missing key/value.
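To make the arithmetic concrete, here is a small sketch of the size of a length-prefixed field under zigzag varint encoding. The free functions `vint_size` and `field_size` are hypothetical stand-ins written for this example, not the `vint::vint_size` from the codebase.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>

// Number of bytes a value occupies as a zigzag-encoded varint.
size_t vint_size(int64_t v) {
    // Zigzag maps small negative numbers to small unsigned ones: -1 -> 1, 1 -> 2.
    uint64_t z = (static_cast<uint64_t>(v) << 1) ^ static_cast<uint64_t>(v >> 63);
    size_t n = 1;
    while (z >= 0x80) {
        z >>= 7;
        ++n;
    }
    return n;
}

// Serialized size of a length-prefixed field. A length of -1 marks a missing
// key/value: the prefix is still written, but no payload bytes follow.
size_t field_size(int32_t len) {
    return vint_size(len) + static_cast<size_t>(std::max<int32_t>(len, 0));
}
```

Under these assumptions a missing key costs one byte (the -1 prefix), not zero, which is the discrepancy the fix addresses.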
Force-pushed: 7a721b9 to 6a4366b
`model::record_batch::for_each_record` internally copies all data in each record. This really isn't needed or wanted in batch validation. Instead, all that is really needed is the timestamp and offset deltas of each record. This PR changes the produce batch validation path to just parse out these two fields of each record and skip over everything else.

Before this PR:
After this PR:
Backports Required
Release Notes