Skip to content

Commit 37d0b7f

Browse files
committed
Retry failed partitions and add integration tests
1 parent 9ab8415 commit 37d0b7f

File tree

2 files changed

+45
-1
lines changed

2 files changed

+45
-1
lines changed

kafka/consumer/simple.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -348,6 +348,8 @@ def _fetch(self):
348348
"Resetting partition offset...",
349349
resp.topic, resp.partition)
350350
self.reset_partition_offset(resp.partition)
351+
# Retry this partition
352+
retry_partitions[resp.partition] = partitions[resp.partition]
351353
continue
352354

353355
partition = resp.partition

test/test_consumer_integration.py

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
from kafka import SimpleConsumer, MultiProcessConsumer, KafkaConsumer, create_message
77
from kafka.common import (
8-
ProduceRequest, ConsumerFetchSizeTooSmall, ConsumerTimeout
8+
ProduceRequest, ConsumerFetchSizeTooSmall, ConsumerTimeout, OffsetOutOfRangeError
99
)
1010
from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES
1111

@@ -85,6 +85,48 @@ def test_simple_consumer(self):
8585

8686
consumer.stop()
8787

88+
@kafka_versions('all')
89+
def test_simple_consumer_smallest_offset_reset(self):
90+
self.send_messages(0, range(0, 100))
91+
self.send_messages(1, range(100, 200))
92+
93+
consumer = self.consumer(auto_offset_reset='smallest')
94+
# Move fetch offset ahead of 300 message (out of range)
95+
consumer.seek(300, 2)
96+
# Since auto_offset_reset is set to smallest we should read all 200
97+
# messages from beginning.
98+
self.assert_message_count([message for message in consumer], 200)
99+
100+
@kafka_versions('all')
101+
def test_simple_consumer_largest_offset_reset(self):
102+
self.send_messages(0, range(0, 100))
103+
self.send_messages(1, range(100, 200))
104+
105+
# Default largest
106+
consumer = self.consumer()
107+
# Move fetch offset ahead of 300 message (out of range)
108+
consumer.seek(300, 2)
109+
# Since auto_offset_reset is set to largest we should not read any
110+
# messages.
111+
self.assert_message_count([message for message in consumer], 0)
112+
# Send 200 new messages to the queue
113+
self.send_messages(0, range(200, 300))
114+
self.send_messages(1, range(300, 400))
115+
# Since the offset is set to largest we should read all the new messages.
116+
self.assert_message_count([message for message in consumer], 200)
117+
118+
@kafka_versions('all')
119+
def test_simple_consumer_no_reset(self):
120+
self.send_messages(0, range(0, 100))
121+
self.send_messages(1, range(100, 200))
122+
123+
# Default largest
124+
consumer = self.consumer(auto_offset_reset=None)
125+
# Move fetch offset ahead of 300 message (out of range)
126+
consumer.seek(300, 2)
127+
with self.assertRaises(OffsetOutOfRangeError):
128+
consumer.get_message()
129+
88130
@kafka_versions("all")
89131
def test_simple_consumer__seek(self):
90132
self.send_messages(0, range(0, 100))

0 commit comments

Comments
 (0)