Skip to content

Commit d050098

Browse files
authored
Merge pull request #28979 from daisukebe/print-empty-partitions
rpk: print why we avoid consuming
2 parents 406808a + cf9a6da commit d050098

2 files changed

Lines changed: 116 additions & 0 deletions

File tree

src/go/rpk/pkg/cli/topic/consume.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -682,8 +682,14 @@ func (c *consumer) filterEmptyPartitions() (allEmpty bool) {
682682
for lp, lat := range lps {
683683
rat, exists := rps[lp]
684684
if exists && rat <= lat {
685+
zap.L().Sugar().Warnf("no data to consume for %s/%d: requested end offset %d, available start offset %d",
686+
lt, lp, rat, lat)
685687
delete(rps, lp)
686688
delete(lps, lp)
689+
} else if rat > lat {
690+
// There is data to consume. We consume from the available start offset (lat),
691+
// which automatically adjusts if the user's requested start was too low.
692+
zap.L().Sugar().Debugf("consuming %s/%d from offset %d", lt, lp, lat)
687693
}
688694
}
689695
}
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
# Copyright 2025 Redpanda Data, Inc.
2+
#
3+
# Use of this software is governed by the Business Source License
4+
# included in the file licenses/BSL.md
5+
#
6+
# As of the Change Date specified in that file, in accordance with
7+
# the Business Source License, use of this software will be governed
8+
# by the Apache License, Version 2.0
9+
10+
import subprocess
11+
12+
from ducktape.tests.test import TestContext
13+
from rptest.clients.rpk import RpkTool
14+
from rptest.clients.types import TopicSpec
15+
from rptest.services.cluster import cluster
16+
from rptest.tests.redpanda_test import RedpandaTest
17+
from rptest.services.kgo_verifier_services import KgoVerifierProducer
18+
19+
20+
class RpkConsumeEmptyPartitionWarningTest(RedpandaTest):
21+
"""
22+
Test that rpk topic consume prints a warning when trying to consume from
23+
a partition where the requested end offset is <= the available start offset.
24+
This verifies the change in https://github.com/redpanda-data/redpanda/pull/28979
25+
"""
26+
27+
def __init__(self, test_context: TestContext):
28+
super(RpkConsumeEmptyPartitionWarningTest, self).__init__(
29+
test_context=test_context,
30+
)
31+
32+
@cluster(num_nodes=4)
33+
def test_warning_on_empty_partition_consume(self):
34+
"""
35+
Test that rpk prints a warning when the requested end offset is
36+
less than or equal to the available start offset (e.g., after
37+
data has been deleted by trim_prefix).
38+
"""
39+
40+
topic = TopicSpec(
41+
partition_count=1,
42+
replication_factor=3,
43+
cleanup_policy=TopicSpec.CLEANUP_DELETE,
44+
)
45+
self.client().create_topic(topic)
46+
47+
rpk = RpkTool(self.redpanda)
48+
49+
# Produce 10 messages (offsets 0-9)
50+
producer = KgoVerifierProducer(
51+
self.test_context,
52+
self.redpanda,
53+
topic.name,
54+
msg_size=1024,
55+
msg_count=10,
56+
)
57+
producer.start()
58+
producer.wait()
59+
producer.free()
60+
61+
# Verify initial state
62+
partitions = list(rpk.describe_topic(topic.name))
63+
p = partitions[0]
64+
assert p.start_offset == 0
65+
assert p.high_watermark == 10
66+
67+
# Use trim_prefix to move start offset to 8
68+
# This simulates data deletion (offsets 0-7 are now gone)
69+
new_start_offset = 8
70+
rpk.trim_prefix(topic.name, offset=new_start_offset, partitions=[0])
71+
72+
# Verify that the start offset has moved forward
73+
partitions = list(rpk.describe_topic(topic.name))
74+
p = partitions[0]
75+
assert p.start_offset == new_start_offset, (
76+
f"Expected start offset to be {new_start_offset}, but got {p.start_offset}"
77+
)
78+
79+
# Try to consume from offset 0 to 7 (before the new start offset)
80+
# This should trigger the warning message
81+
end_offset = new_start_offset - 1
82+
offset_range = f"0:{end_offset}"
83+
84+
# Run rpk consume directly with subprocess to capture stderr
85+
cmd = [
86+
rpk._rpk_binary(),
87+
"topic",
88+
"consume",
89+
topic.name,
90+
"-o",
91+
offset_range,
92+
"--brokers",
93+
self.redpanda.brokers(),
94+
"-v",
95+
]
96+
97+
result = subprocess.run(
98+
cmd,
99+
capture_output=True,
100+
text=True,
101+
timeout=10,
102+
)
103+
104+
stderr = result.stderr
105+
106+
# Verify that the warning message appears in stderr
107+
expected_warning = f"no data to consume for {topic.name}/0: requested end offset {end_offset}, available start offset {new_start_offset}"
108+
assert expected_warning in stderr, (
109+
f"Expected warning message not found in stderr. Expected substring: '{expected_warning}', Got stderr: '{stderr}'"
110+
)

0 commit comments

Comments
 (0)