Skip to content

Commit 2ca10e2

Browse files
committed
Merge pull request #296 from ecanzonieri/validate_consumer_offset
Validate consumer offset in SimpleConsumer
2 parents 9ad0be6 + 37d0b7f commit 2ca10e2

File tree

2 files changed

+97
-4
lines changed

2 files changed

+97
-4
lines changed

kafka/consumer/simple.py

+54-3
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import time
99

1010
import six
11+
import sys
1112

1213
try:
1314
from Queue import Empty, Queue
@@ -16,7 +17,9 @@
1617

1718
from kafka.common import (
1819
FetchRequest, OffsetRequest,
19-
ConsumerFetchSizeTooSmall, ConsumerNoMoreData
20+
ConsumerFetchSizeTooSmall, ConsumerNoMoreData,
21+
UnknownTopicOrPartitionError, NotLeaderForPartitionError,
22+
OffsetOutOfRangeError, check_error
2023
)
2124
from .base import (
2225
Consumer,
@@ -94,6 +97,10 @@ class SimpleConsumer(Consumer):
9497
message in the iterator before exiting. None means no
9598
timeout, so it will wait forever.
9699
100+
auto_offset_reset: default largest. Reset partition offsets upon
101+
OffsetOutOfRangeError. Valid values are largest and smallest.
102+
Otherwise, do not reset the offsets and raise OffsetOutOfRangeError.
103+
97104
Auto commit details:
98105
If both auto_commit_every_n and auto_commit_every_t are set, they will
99106
reset one another when one is triggered. These triggers simply call the
@@ -106,7 +113,8 @@ def __init__(self, client, group, topic, auto_commit=True, partitions=None,
106113
fetch_size_bytes=FETCH_MIN_BYTES,
107114
buffer_size=FETCH_BUFFER_SIZE_BYTES,
108115
max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES,
109-
iter_timeout=None):
116+
iter_timeout=None,
117+
auto_offset_reset='largest'):
110118
super(SimpleConsumer, self).__init__(
111119
client, group, topic,
112120
partitions=partitions,
@@ -125,12 +133,38 @@ def __init__(self, client, group, topic, auto_commit=True, partitions=None,
125133
self.fetch_min_bytes = fetch_size_bytes
126134
self.fetch_offsets = self.offsets.copy()
127135
self.iter_timeout = iter_timeout
136+
self.auto_offset_reset = auto_offset_reset
128137
self.queue = Queue()
129138

130139
def __repr__(self):
131140
return '<SimpleConsumer group=%s, topic=%s, partitions=%s>' % \
132141
(self.group, self.topic, str(self.offsets.keys()))
133142

143+
def reset_partition_offset(self, partition):
144+
LATEST = -1
145+
EARLIEST = -2
146+
if self.auto_offset_reset == 'largest':
147+
reqs = [OffsetRequest(self.topic, partition, LATEST, 1)]
148+
elif self.auto_offset_reset == 'smallest':
149+
reqs = [OffsetRequest(self.topic, partition, EARLIEST, 1)]
150+
else:
151+
# Let's raise a reasonable exception type if the user calls
152+
# outside of an exception context
153+
if sys.exc_info() == (None, None, None):
154+
raise OffsetOutOfRangeError('Cannot reset partition offsets without a '
155+
'valid auto_offset_reset setting '
156+
'(largest|smallest)')
157+
# Otherwise we should re-raise the upstream exception
158+
# b/c it typically includes additional data about
159+
# the request that triggered it, and we do not want to drop that
160+
raise
161+
162+
# send_offset_request
163+
(resp, ) = self.client.send_offset_request(reqs)
164+
check_error(resp)
165+
self.offsets[partition] = resp.offsets[0]
166+
self.fetch_offsets[partition] = resp.offsets[0]
167+
134168
def provide_partition_info(self):
135169
"""
136170
Indicates that partition info must be returned by the consumer
@@ -297,10 +331,27 @@ def _fetch(self):
297331
responses = self.client.send_fetch_request(
298332
requests,
299333
max_wait_time=int(self.fetch_max_wait_time),
300-
min_bytes=self.fetch_min_bytes)
334+
min_bytes=self.fetch_min_bytes,
335+
fail_on_error=False
336+
)
301337

302338
retry_partitions = {}
303339
for resp in responses:
340+
341+
try:
342+
check_error(resp)
343+
except (UnknownTopicOrPartitionError, NotLeaderForPartitionError):
344+
self.client.reset_topic_metadata(resp.topic)
345+
raise
346+
except OffsetOutOfRangeError:
347+
log.warning("OffsetOutOfRangeError for %s - %d. "
348+
"Resetting partition offset...",
349+
resp.topic, resp.partition)
350+
self.reset_partition_offset(resp.partition)
351+
# Retry this partition
352+
retry_partitions[resp.partition] = partitions[resp.partition]
353+
continue
354+
304355
partition = resp.partition
305356
buffer_size = partitions[partition]
306357
try:

test/test_consumer_integration.py

+43-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
from kafka import SimpleConsumer, MultiProcessConsumer, KafkaConsumer, create_message
77
from kafka.common import (
8-
ProduceRequest, ConsumerFetchSizeTooSmall, ConsumerTimeout
8+
ProduceRequest, ConsumerFetchSizeTooSmall, ConsumerTimeout, OffsetOutOfRangeError
99
)
1010
from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES
1111

@@ -85,6 +85,48 @@ def test_simple_consumer(self):
8585

8686
consumer.stop()
8787

88+
@kafka_versions('all')
89+
def test_simple_consumer_smallest_offset_reset(self):
90+
self.send_messages(0, range(0, 100))
91+
self.send_messages(1, range(100, 200))
92+
93+
consumer = self.consumer(auto_offset_reset='smallest')
94+
# Move the fetch offset 300 messages past the end (out of range)
95+
consumer.seek(300, 2)
96+
# Since auto_offset_reset is set to smallest we should read all 200
97+
# messages from beginning.
98+
self.assert_message_count([message for message in consumer], 200)
99+
100+
@kafka_versions('all')
101+
def test_simple_consumer_largest_offset_reset(self):
102+
self.send_messages(0, range(0, 100))
103+
self.send_messages(1, range(100, 200))
104+
105+
# Default largest
106+
consumer = self.consumer()
107+
# Move the fetch offset 300 messages past the end (out of range)
108+
consumer.seek(300, 2)
109+
# Since auto_offset_reset is set to largest we should not read any
110+
# messages.
111+
self.assert_message_count([message for message in consumer], 0)
112+
# Send 200 new messages to the queue
113+
self.send_messages(0, range(200, 300))
114+
self.send_messages(1, range(300, 400))
115+
# Since the offset is set to largest we should read all the new messages.
116+
self.assert_message_count([message for message in consumer], 200)
117+
118+
@kafka_versions('all')
119+
def test_simple_consumer_no_reset(self):
120+
self.send_messages(0, range(0, 100))
121+
self.send_messages(1, range(100, 200))
122+
123+
# Default largest
124+
consumer = self.consumer(auto_offset_reset=None)
125+
# Move the fetch offset 300 messages past the end (out of range)
126+
consumer.seek(300, 2)
127+
with self.assertRaises(OffsetOutOfRangeError):
128+
consumer.get_message()
129+
88130
@kafka_versions("all")
89131
def test_simple_consumer__seek(self):
90132
self.send_messages(0, range(0, 100))

0 commit comments

Comments
 (0)