Skip to content

Commit 6b88a8f

Browse files
committed
kafka: improve log lines in produce_validation
These log lines aren't actually that helpful for fixing a bad client (which partition is being produced to? Which client is producing the batches?), especially in a severless environment. Add the `ntp` and `client_id` to the log line to help with isolating the client which needs fixing.
1 parent f8004ef commit 6b88a8f

4 files changed

Lines changed: 55 additions & 32 deletions

File tree

src/v/kafka/server/handlers/produce.cc

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -214,13 +214,14 @@ ss::future<produce_response::partition> do_produce_topic_partition(
214214
ntp_produce_request req,
215215
std::unique_ptr<ss::promise<>> dispatched) {
216216
auto start = std::chrono::steady_clock::now();
217-
auto validate_batch_res = co_await validate_batch({
218-
.batch = *req.batch,
219-
.timestamp_type = req.timestamp_type,
220-
.message_timestamp_before_max_ms = req.message_timestamp_before_max_ms,
221-
.message_timestamp_after_max_ms = req.message_timestamp_after_max_ms,
222-
.probe = octx.rctx.probe(),
223-
});
217+
auto validate_batch_res = co_await validate_batch(
218+
{.batch = *req.batch,
219+
.timestamp_type = req.timestamp_type,
220+
.message_timestamp_before_max_ms = req.message_timestamp_before_max_ms,
221+
.message_timestamp_after_max_ms = req.message_timestamp_after_max_ms,
222+
.probe = octx.rctx.probe(),
223+
.ntp = req.ntp,
224+
.client_id = octx.rctx.header().client_id});
224225

225226
if (validate_batch_res.has_value()) {
226227
co_return finalize_request_with_error_code(

src/v/kafka/server/handlers/produce_validation.cc

Lines changed: 42 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ std::optional<error_code_and_msg> validate_timestamp(
3434
model::timestamp_type timestamp_type,
3535
std::chrono::milliseconds message_timestamp_before_max_ms,
3636
std::chrono::milliseconds message_timestamp_after_max_ms,
37-
kafka::kafka_probe& probe) {
37+
kafka::kafka_probe& probe,
38+
const model::ntp& ntp) {
3839
if (timestamp_type == model::timestamp_type::append_time) {
3940
// These validations are skipped for `APPEND_TIME`.
4041
// https://kafka.apache.org/documentation/#brokerconfigs_log.message.timestamp.after.max.ms
@@ -49,9 +50,10 @@ std::optional<error_code_and_msg> validate_timestamp(
4950
if (is_invalid) {
5051
thread_local static ss::logger::rate_limit rate(validate_rate_interval);
5152
auto msg = ssx::sformat(
52-
"Timestamp {} of message with offset {} is out of range. The "
53-
"timestamp should be within [{}, {}] of the broker time.",
53+
"Timestamp {} of message for partition {} with offset {} is out of "
54+
"range. The timestamp should be within [{}, {}] of the broker time.",
5455
timestamp,
56+
ntp,
5557
offset,
5658
message_timestamp_before_max_ms,
5759
message_timestamp_after_max_ms);
@@ -78,7 +80,8 @@ std::optional<error_code_and_msg> validate_batch_timestamps(
7880
model::timestamp_type timestamp_type,
7981
std::chrono::milliseconds message_timestamp_before_max_ms,
8082
std::chrono::milliseconds message_timestamp_after_max_ms,
81-
kafka::kafka_probe& probe) {
83+
kafka::kafka_probe& probe,
84+
const model::ntp& ntp) {
8285
if (timestamp_type == model::timestamp_type::append_time) {
8386
// These validations are skipped for `APPEND_TIME`.
8487
// https://kafka.apache.org/documentation/#brokerconfigs_log.message.timestamp.after.max.ms
@@ -97,7 +100,8 @@ std::optional<error_code_and_msg> validate_batch_timestamps(
97100
timestamp_type,
98101
message_timestamp_before_max_ms,
99102
message_timestamp_after_max_ms,
100-
probe);
103+
probe,
104+
ntp);
101105

102106
if (res.has_value()) {
103107
return res;
@@ -111,7 +115,8 @@ std::optional<error_code_and_msg> validate_batch_timestamps(
111115
timestamp_type,
112116
message_timestamp_before_max_ms,
113117
message_timestamp_after_max_ms,
114-
probe);
118+
probe,
119+
ntp);
115120

116121
return res;
117122
}
@@ -197,7 +202,8 @@ validate_records_and_compute_max_timestamp(
197202
model::timestamp_type timestamp_type,
198203
std::chrono::milliseconds message_timestamp_before_max_ms,
199204
std::chrono::milliseconds message_timestamp_after_max_ms,
200-
kafka::kafka_probe& probe) {
205+
kafka::kafka_probe& probe,
206+
const model::ntp& ntp) {
201207
std::optional<error_code_and_msg> res;
202208
int64_t max_timestamp = -1;
203209
auto iterable_res = iterate_over_records(
@@ -212,7 +218,8 @@ validate_records_and_compute_max_timestamp(
212218
timestamp_type,
213219
message_timestamp_before_max_ms,
214220
message_timestamp_after_max_ms,
215-
probe);
221+
probe,
222+
ntp);
216223
max_timestamp = std::max(timestamp(), max_timestamp);
217224

218225
return res.has_value() ? ss::stop_iteration::yes
@@ -269,7 +276,9 @@ std::optional<error_code_and_msg> validate_batch(
269276
model::timestamp_type timestamp_type,
270277
std::chrono::milliseconds message_timestamp_before_max_ms,
271278
std::chrono::milliseconds message_timestamp_after_max_ms,
272-
kafka::kafka_probe& probe) {
279+
kafka::kafka_probe& probe,
280+
const model::ntp& ntp,
281+
std::optional<std::string_view> client_id) {
273282
std::optional<error_code_and_msg> res{std::nullopt};
274283
const auto broker_time = model::timestamp::now();
275284
const auto has_iterable_batch = iterable_batch_ref.has_value();
@@ -301,7 +310,8 @@ std::optional<error_code_and_msg> validate_batch(
301310
timestamp_type,
302311
message_timestamp_before_max_ms,
303312
message_timestamp_after_max_ms,
304-
probe);
313+
probe,
314+
ntp);
305315

306316
if (res.has_value()) {
307317
return res;
@@ -315,10 +325,12 @@ std::optional<error_code_and_msg> validate_batch(
315325
klog.log(
316326
ss::log_level::warn,
317327
rate,
318-
"Produced batch has max_timestamp left unset ({{-1}}) by "
319-
"client. Accepting batch since '{}' is set to '{}'. It is "
320-
"strongly recommended that you update your client to set the "
321-
"max_timestamp when producing: {}.",
328+
"Produced batch for partition {} has max_timestamp left unset "
329+
"({{-1}}) by client (client_id: {}). Accepting batch since '{}' "
330+
"is set to '{}'. It is strongly recommended that you update your "
331+
"client to set the max_timestamp when producing: {}.",
332+
ntp,
333+
client_id,
322334
config::shard_local_cfg().kafka_produce_batch_validation.name(),
323335
validation_mode,
324336
batch.header());
@@ -343,10 +355,13 @@ std::optional<error_code_and_msg> validate_batch(
343355
klog.log(
344356
ss::log_level::warn,
345357
rate,
346-
"Produced batch has max_timestamp left unset ({{-1}}) by "
347-
"client. Decompressing batch and setting max_timestamp manually "
348-
"since '{}' to set to '{}'. It is strongly recommended that you "
349-
"update your client to set the max_timestamp when producing: {}.",
358+
"Produced batch for partition {} has max_timestamp left unset "
359+
"({{-1}}) by client (client_id: {}). Decompressing batch and "
360+
"setting max_timestamp manually since '{}' is set to '{}'. It is "
361+
"strongly recommended that you update your client to set the "
362+
"max_timestamp when producing: {}.",
363+
ntp,
364+
client_id,
350365
config::shard_local_cfg().kafka_produce_batch_validation.name(),
351366
validation_mode,
352367
batch.header());
@@ -362,7 +377,8 @@ std::optional<error_code_and_msg> validate_batch(
362377
timestamp_type,
363378
message_timestamp_before_max_ms,
364379
message_timestamp_after_max_ms,
365-
probe);
380+
probe,
381+
ntp);
366382
if (!max_ts_res.has_value()) {
367383
return max_ts_res.error();
368384
} else {
@@ -382,7 +398,8 @@ std::optional<error_code_and_msg> validate_batch(
382398
timestamp_type,
383399
message_timestamp_before_max_ms,
384400
message_timestamp_after_max_ms,
385-
probe);
401+
probe,
402+
ntp);
386403
}
387404

388405
if (res.has_value()) {
@@ -413,7 +430,8 @@ std::optional<error_code_and_msg> validate_batch(
413430
timestamp_type,
414431
message_timestamp_before_max_ms,
415432
message_timestamp_after_max_ms,
416-
probe);
433+
probe,
434+
ntp);
417435

418436
if (!max_ts_res.has_value()) {
419437
return max_ts_res.error();
@@ -467,7 +485,9 @@ validate_batch(const validation_args& args) {
467485
args.timestamp_type,
468486
args.message_timestamp_before_max_ms,
469487
args.message_timestamp_after_max_ms,
470-
args.probe);
488+
args.probe,
489+
args.ntp,
490+
args.client_id);
471491
}
472492

473493
} // namespace kafka

src/v/kafka/server/handlers/produce_validation.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ struct validation_args {
4040
std::chrono::milliseconds message_timestamp_before_max_ms;
4141
std::chrono::milliseconds message_timestamp_after_max_ms;
4242
kafka::kafka_probe& probe;
43+
const model::ntp& ntp;
44+
std::optional<std::string_view> client_id;
4345
};
4446

4547
// Entry point for batch validation.

tests/rptest/tests/compatibility/sarama_produce_test.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,11 +90,11 @@ class SaramaLegacyProduceTest(RedpandaTest):
9090
expected_log_lines = defaultdict(dict)
9191
expected_log_lines[ValidationMode.RELAXED][False] = None
9292
expected_log_lines[ValidationMode.RELAXED][True] = (
93-
"Produced batch has max_timestamp left unset ({-1}) by client. Decompressing batch and setting max_timestamp manually since 'kafka_produce_batch_validation' to set to 'relaxed'. It is strongly recommended that you update your client to set the max_timestamp when producing"
93+
"Produced batch for partition {kafka/topic/0} has max_timestamp left unset ({-1}) by client (client_id: {sarama}). Decompressing batch and setting max_timestamp manually since 'kafka_produce_batch_validation' is set to 'relaxed'. It is strongly recommended that you update your client to set the max_timestamp when producing"
9494
)
9595
expected_log_lines[ValidationMode.LEGACY][False] = None
9696
expected_log_lines[ValidationMode.LEGACY][True] = (
97-
"Produced batch has max_timestamp left unset ({-1}) by client. Accepting batch since 'kafka_produce_batch_validation' is set to 'legacy'. It is strongly recommended that you update your client to set the max_timestamp when producing"
97+
"Produced batch for partition {kafka/topic/0} has max_timestamp left unset ({-1}) by client (client_id: {sarama}). Accepting batch since 'kafka_produce_batch_validation' is set to 'legacy'. It is strongly recommended that you update your client to set the max_timestamp when producing"
9898
)
9999
expected_log_lines[ValidationMode.STRICT][False] = None
100100
expected_log_lines[ValidationMode.STRICT][True] = None
@@ -151,7 +151,7 @@ def test_produce(self, validation_mode, compression_type):
151151

152152
if expected_log_line is None:
153153
# We shouldn't see any warning for an unset max_timestamp if it is not expected.
154-
pattern = "Produced batch has max_timestamp left unset ({-1}) by client"
154+
pattern = "Produced batch for partition {kafka/topic/0} has max_timestamp left unset ({-1}) by client"
155155
assert not self.redpanda.search_log_all(pattern), (
156156
f"Saw unexpected log line in redpanda logs for {validation_mode=}, {compression_type=} when maximum timestamp is unset in a produced batch."
157157
)

0 commit comments

Comments
 (0)