
Commit 1bd6573

Remove legacy/v1 consumer message iterator (#2543)
1 parent b60a266 commit 1bd6573
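
The removed option was a plain constructor config. A minimal sketch of how it could be used before this commit (topic and broker values are illustrative, not from the commit); after this change the key is gone from DEFAULT_CONFIG, so passing it would be rejected as an unrecognized config:

from kafka import KafkaConsumer

# Pre-change only: re-enable the old (< 1.4.7) iterator code path.
# The 'legacy_iterator' key is removed by this commit, so this kwarg
# would now fail KafkaConsumer's unknown-config check.
consumer = KafkaConsumer(
    'example-topic',                     # illustrative topic
    bootstrap_servers='localhost:9092',  # illustrative broker
    legacy_iterator=True,
)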

1 file changed: +4, -81 lines

kafka/consumer/group.py

+4 -81
@@ -327,7 +327,6 @@ class KafkaConsumer(six.Iterator):
         'sasl_kerberos_domain_name': None,
         'sasl_oauth_token_provider': None,
         'socks5_proxy': None,
-        'legacy_iterator': False, # enable to revert to < 1.4.7 iterator
         'kafka_client': KafkaClient,
     }
     DEFAULT_SESSION_TIMEOUT_MS_0_9 = 30000
@@ -845,8 +844,7 @@ def seek(self, partition, offset):
         assert partition in self._subscription.assigned_partitions(), 'Unassigned partition'
         log.debug("Seeking to offset %s for partition %s", offset, partition)
         self._subscription.assignment[partition].seek(offset)
-        if not self.config['legacy_iterator']:
-            self._iterator = None
+        self._iterator = None

     def seek_to_beginning(self, *partitions):
         """Seek to the oldest available offset for partitions.
@@ -871,8 +869,7 @@ def seek_to_beginning(self, *partitions):
         for tp in partitions:
             log.debug("Seeking to beginning of partition %s", tp)
             self._subscription.need_offset_reset(tp, OffsetResetStrategy.EARLIEST)
-        if not self.config['legacy_iterator']:
-            self._iterator = None
+        self._iterator = None

     def seek_to_end(self, *partitions):
         """Seek to the most recent available offset for partitions.
@@ -897,8 +894,7 @@ def seek_to_end(self, *partitions):
         for tp in partitions:
             log.debug("Seeking to end of partition %s", tp)
             self._subscription.need_offset_reset(tp, OffsetResetStrategy.LATEST)
-        if not self.config['legacy_iterator']:
-            self._iterator = None
+        self._iterator = None

     def subscribe(self, topics=(), pattern=None, listener=None):
         """Subscribe to a list of topics, or a topic regex pattern.
@@ -974,8 +970,7 @@ def unsubscribe(self):
         self._client.cluster.need_all_topic_metadata = False
         self._client.set_topics([])
         log.debug("Unsubscribed all topics or patterns and assigned partitions")
-        if not self.config['legacy_iterator']:
-            self._iterator = None
+        self._iterator = None

     def metrics(self, raw=False):
         """Get metrics on consumer performance.
@@ -1157,73 +1152,12 @@ def _message_generator_v2(self):
                 self._subscription.assignment[tp].position = OffsetAndMetadata(record.offset + 1, '', -1)
                 yield record

-    def _message_generator(self):
-        assert self.assignment() or self.subscription() is not None, 'No topic subscription or manual partition assignment'
-
-        def inner_poll_ms():
-            return max(0, min((1000 * (self._consumer_timeout - time.time())), self.config['retry_backoff_ms']))
-
-        while time.time() < self._consumer_timeout:
-
-            if not self._coordinator.poll(timeout_ms=inner_poll_ms()):
-                continue
-
-            # Fetch offsets for any subscribed partitions that we arent tracking yet
-            if not self._subscription.has_all_fetch_positions():
-                partitions = self._subscription.missing_fetch_positions()
-                self._update_fetch_positions(partitions)
-
-            self._client.poll(timeout_ms=inner_poll_ms())
-
-            # after the long poll, we should check whether the group needs to rebalance
-            # prior to returning data so that the group can stabilize faster
-            if self._coordinator.need_rejoin():
-                continue
-
-            # We need to make sure we at least keep up with scheduled tasks,
-            # like heartbeats, auto-commits, and metadata refreshes
-            timeout_at = self._next_timeout()
-
-            # Short-circuit the fetch iterator if we are already timed out
-            # to avoid any unintentional interaction with fetcher setup
-            if time.time() > timeout_at:
-                continue
-
-            for msg in self._fetcher:
-                yield msg
-                if time.time() > timeout_at:
-                    log.debug("internal iterator timeout - breaking for poll")
-                    break
-                self._client.poll(timeout_ms=0)
-
-            # An else block on a for loop only executes if there was no break
-            # so this should only be called on a StopIteration from the fetcher
-            # We assume that it is safe to init_fetches when fetcher is done
-            # i.e., there are no more records stored internally
-            else:
-                self._fetcher.send_fetches()
-
-    def _next_timeout(self):
-        timeout = min(self._consumer_timeout,
-                      self._client.cluster.ttl() / 1000.0 + time.time(),
-                      self._coordinator.time_to_next_poll() + time.time())
-        return timeout
-
     def __iter__(self): # pylint: disable=non-iterator-returned
         return self

     def __next__(self):
         if self._closed:
             raise StopIteration('KafkaConsumer closed')
-        # Now that the heartbeat thread runs in the background
-        # there should be no reason to maintain a separate iterator
-        # but we'll keep it available for a few releases just in case
-        if self.config['legacy_iterator']:
-            return self.next_v1()
-        else:
-            return self.next_v2()
-
-    def next_v2(self):
         self._set_consumer_timeout()
         while time.time() < self._consumer_timeout:
             if not self._iterator:
@@ -1234,17 +1168,6 @@ def next_v2(self):
                 self._iterator = None
         raise StopIteration()

-    def next_v1(self):
-        if not self._iterator:
-            self._iterator = self._message_generator()
-
-        self._set_consumer_timeout()
-        try:
-            return next(self._iterator)
-        except StopIteration:
-            self._iterator = None
-            raise
-
     def _set_consumer_timeout(self):
         # consumer_timeout_ms can be used to stop iteration early
         if self.config['consumer_timeout_ms'] >= 0:
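
With the legacy path gone, iteration always runs through __next__() and _message_generator_v2(). A minimal usage sketch of the surviving, single code path (topic, broker, and timeout values are illustrative, not taken from this commit):

from kafka import KafkaConsumer

# consumer_timeout_ms bounds how long iteration blocks waiting for records;
# it is the same setting consulted by _set_consumer_timeout() above.
consumer = KafkaConsumer(
    'example-topic',
    bootstrap_servers='localhost:9092',
    consumer_timeout_ms=10000,
)

# __next__() lazily creates _message_generator_v2() and pulls records from it;
# seek(), seek_to_beginning(), seek_to_end(), and unsubscribe() now always
# reset self._iterator so the generator is rebuilt on the next loop pass.
for record in consumer:
    print(record.topic, record.partition, record.offset, record.value)

consumer.close()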
