Commit a582aeb

Propagate timeout to _retrieve_offsets
The default timeout of _retrieve_offsets is infinite, which makes the Consumer block indefinitely even if poll was called with a timeout. Propagate the timeout from the Consumer down to the Fetcher operation, shrinking the remaining budget as successive sub-operations consume the total allowed time. During a `poll` operation, the timeout for positions and offsets is kept separate from the poll timeout, because poll always keeps running and accumulating messages until its timeout expires. We want to be able to poll for messages for a short time (or for zero time, the default) while still allowing some delay when retrieving the positions.
1 parent e0ab864 commit a582aeb
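The core pattern of the change is deadline budgeting: convert the caller's timeout into an absolute deadline once, then give each sub-operation whatever time remains. A minimal standalone sketch of the idea (run_with_budget and steps are illustrative names, not part of the kafka-python API):

import time

def run_with_budget(steps, timeout_ms=float("inf")):
    # Convert the overall timeout into an absolute deadline once, then
    # hand each sub-operation whatever budget is left, clamped at zero.
    end_time = time.time() + timeout_ms / 1000
    for step in steps:
        remaining_ms = max(0.0, 1000 * (end_time - time.time()))
        step(timeout_ms=remaining_ms)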

3 files changed: 27 additions & 21 deletions

kafka/consumer/fetcher.py

Lines changed: 9 additions & 7 deletions

@@ -131,17 +131,18 @@ def send_fetches(self):
         self._clean_done_fetch_futures()
         return futures
 
-    def reset_offsets_if_needed(self, partitions):
+    def reset_offsets_if_needed(self, partitions, timeout_ms=float("inf")):
         """Lookup and set offsets for any partitions which are awaiting an
         explicit reset.
 
         Arguments:
             partitions (set of TopicPartitions): the partitions to reset
         """
+        end_time = time.time() + timeout_ms / 1000
         for tp in partitions:
             # TODO: If there are several offsets to reset, we could submit offset requests in parallel
             if self._subscriptions.is_assigned(tp) and self._subscriptions.is_offset_reset_needed(tp):
-                self._reset_offset(tp)
+                self._reset_offset(tp, timeout_ms=max(0.0, 1000 * (end_time - time.time())))
 
     def _clean_done_fetch_futures(self):
         while True:
@@ -156,7 +157,7 @@ def in_flight_fetches(self):
         self._clean_done_fetch_futures()
         return bool(self._fetch_futures)
 
-    def update_fetch_positions(self, partitions):
+    def update_fetch_positions(self, partitions, timeout_ms=float("inf")):
         """Update the fetch positions for the provided partitions.
 
         Arguments:
@@ -167,6 +168,7 @@ def update_fetch_positions(self, partitions):
                 partition and no reset policy is available
         """
         # reset the fetch position to the committed position
+        end_time = time.time() + timeout_ms / 1000
         for tp in partitions:
             if not self._subscriptions.is_assigned(tp):
                 log.warning("partition %s is not assigned - skipping offset"
@@ -178,12 +180,12 @@ def update_fetch_positions(self, partitions):
                 continue
 
             if self._subscriptions.is_offset_reset_needed(tp):
-                self._reset_offset(tp)
+                self._reset_offset(tp, timeout_ms=max(0.0, 1000 * (end_time - time.time())))
             elif self._subscriptions.assignment[tp].committed is None:
                 # there's no committed position, so we need to reset with the
                 # default strategy
                 self._subscriptions.need_offset_reset(tp)
-                self._reset_offset(tp)
+                self._reset_offset(tp, timeout_ms=max(0.0, 1000 * (end_time - time.time())))
             else:
                 committed = self._subscriptions.assignment[tp].committed.offset
                 log.debug("Resetting offset for partition %s to the committed"
@@ -215,7 +217,7 @@ def beginning_or_end_offset(self, partitions, timestamp, timeout_ms):
             offsets[tp] = offsets[tp][0]
         return offsets
 
-    def _reset_offset(self, partition):
+    def _reset_offset(self, partition, timeout_ms):
         """Reset offsets for the given partition using the offset reset strategy.
 
         Arguments:
@@ -234,7 +236,7 @@ def _reset_offset(self, partition):
 
         log.debug("Resetting offset for partition %s to %s offset.",
                   partition, strategy)
-        offsets = self._retrieve_offsets({partition: timestamp})
+        offsets = self._retrieve_offsets({partition: timestamp}, timeout_ms)
 
         if partition in offsets:
             offset = offsets[partition][0]
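A detail worth noting in these hunks is the max(0.0, 1000 * (end_time - time.time())) expression: once the deadline has passed, the difference goes negative, and the clamp turns it into a zero timeout for the remaining sub-operations rather than a negative one. A quick standalone illustration:

import time

end_time = time.time() - 0.5  # deadline passed 500 ms ago
remaining_ms = max(0.0, 1000 * (end_time - time.time()))
assert remaining_ms == 0.0  # an expired budget becomes a zero timeout, never negative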

kafka/consumer/group.py

Lines changed: 13 additions & 10 deletions

@@ -615,7 +615,7 @@ def partitions_for_topic(self, topic):
         partitions = cluster.partitions_for_topic(topic)
         return partitions
 
-    def poll(self, timeout_ms=0, max_records=None, update_offsets=True):
+    def poll(self, timeout_ms=0, positions_timeout_ms=float("inf"), max_records=None, update_offsets=True):
         """Fetch data from assigned topics / partitions.
 
         Records are fetched and returned in batches by topic-partition.
@@ -656,7 +656,7 @@ def poll(self, timeout_ms=0, max_records=None, update_offsets=True):
         start = time.time()
         remaining = timeout_ms
         while not self._closed:
-            records = self._poll_once(remaining, max_records, update_offsets=update_offsets)
+            records = self._poll_once(remaining, positions_timeout_ms, max_records, update_offsets=update_offsets)
             if records:
                 return records
 
@@ -668,7 +668,7 @@ def poll(self, timeout_ms=0, max_records=None, update_offsets=True):
 
         return {}
 
-    def _poll_once(self, timeout_ms, max_records, update_offsets=True):
+    def _poll_once(self, timeout_ms, positions_timeout_ms, max_records, update_offsets=True):
         """Do one round of polling. In addition to checking for new data, this does
         any needed heart-beating, auto-commits, and offset updates.
 
@@ -683,7 +683,7 @@ def _poll_once(self, timeout_ms, max_records, update_offsets=True):
         # Fetch positions if we have partitions we're subscribed to that we
         # don't know the offset for
         if not self._subscription.has_all_fetch_positions():
-            self._update_fetch_positions(self._subscription.missing_fetch_positions())
+            self._update_fetch_positions(self._subscription.missing_fetch_positions(), positions_timeout_ms)
 
         # If data is available already, e.g. from a previous network client
         # poll() call to commit, then just return it immediately
@@ -714,7 +714,7 @@ def _poll_once(self, timeout_ms, max_records, update_offsets=True):
             records, _ = self._fetcher.fetched_records(max_records, update_offsets=update_offsets)
             return records
 
-    def position(self, partition):
+    def position(self, partition, timeout_ms=float("inf")):
         """Get the offset of the next record that will be fetched
 
         Arguments:
@@ -728,7 +728,7 @@ def position(self, partition):
         assert self._subscription.is_assigned(partition), 'Partition is not assigned'
         offset = self._subscription.assignment[partition].position
         if offset is None:
-            self._update_fetch_positions([partition])
+            self._update_fetch_positions([partition], timeout_ms)
             offset = self._subscription.assignment[partition].position
         return offset
 
@@ -1087,7 +1087,7 @@ def _use_consumer_group(self):
             return False
         return True
 
-    def _update_fetch_positions(self, partitions):
+    def _update_fetch_positions(self, partitions, timeout_ms):
         """Set the fetch position to the committed position (if there is one)
         or reset it using the offset reset policy the user has configured.
 
@@ -1099,12 +1099,13 @@ def _update_fetch_positions(self, partitions):
             NoOffsetForPartitionError: If no offset is stored for a given
                 partition and no offset reset policy is defined.
         """
+        end_time = time.time() + timeout_ms / 1000
         # Lookup any positions for partitions which are awaiting reset (which may be the
         # case if the user called :meth:`seek_to_beginning` or :meth:`seek_to_end`. We do
         # this check first to avoid an unnecessary lookup of committed offsets (which
         # typically occurs when the user is manually assigning partitions and managing
         # their own offsets).
-        self._fetcher.reset_offsets_if_needed(partitions)
+        self._fetcher.reset_offsets_if_needed(partitions, timeout_ms)
 
         if not self._subscription.has_all_fetch_positions():
             # if we still don't have offsets for all partitions, then we should either seek
@@ -1115,7 +1116,8 @@ def _update_fetch_positions(self, partitions):
             self._coordinator.refresh_committed_offsets_if_needed()
 
         # Then, do any offset lookups in case some positions are not known
-        self._fetcher.update_fetch_positions(partitions)
+        update_timeout_ms = max(0.0, 1000 * (end_time - time.time()))
+        self._fetcher.update_fetch_positions(partitions, update_timeout_ms)
 
     def _message_generator_v2(self):
         timeout_ms = 1000 * (self._consumer_timeout - time.time())
@@ -1145,7 +1147,8 @@ def _message_generator(self):
         # Fetch offsets for any subscribed partitions that we arent tracking yet
         if not self._subscription.has_all_fetch_positions():
             partitions = self._subscription.missing_fetch_positions()
-            self._update_fetch_positions(partitions)
+            update_timeout_ms = max(0.0, 1000 * (self._consumer_timeout - time.time()))
+            self._update_fetch_positions(partitions, update_timeout_ms)
 
         poll_ms = min((1000 * (self._consumer_timeout - time.time())), self.config['retry_backoff_ms'])
         self._client.poll(timeout_ms=poll_ms)
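With these changes a caller can bound the position lookup separately from the poll itself. A sketch of the intended usage, assuming this patched build and a broker reachable at localhost:9092 (the topic name is illustrative):

from kafka import KafkaConsumer

consumer = KafkaConsumer('my-topic', bootstrap_servers='localhost:9092')

# Return buffered records immediately (timeout_ms=0), but allow up to
# 5 seconds for the initial position/offset lookup instead of blocking
# indefinitely on _retrieve_offsets.
records = consumer.poll(timeout_ms=0, positions_timeout_ms=5000)
for tp, messages in records.items():
    for message in messages:
        print(tp, message.offset, message.value)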

test/test_fetcher.py

Lines changed: 5 additions & 4 deletions

@@ -4,6 +4,7 @@
 import pytest
 
 from collections import OrderedDict
+from unittest.mock import ANY
 import itertools
 import time
 
@@ -114,11 +115,11 @@ def test_update_fetch_positions(fetcher, topic, mocker):
     # partition needs reset, no committed offset
     fetcher._subscriptions.need_offset_reset(partition)
     fetcher._subscriptions.assignment[partition].awaiting_reset = False
-    fetcher.update_fetch_positions([partition])
-    fetcher._reset_offset.assert_called_with(partition)
+    fetcher.update_fetch_positions([partition], timeout_ms=1234)
+    fetcher._reset_offset.assert_called_with(partition, timeout_ms=ANY)
     assert fetcher._subscriptions.assignment[partition].awaiting_reset is True
     fetcher.update_fetch_positions([partition])
-    fetcher._reset_offset.assert_called_with(partition)
+    fetcher._reset_offset.assert_called_with(partition, timeout_ms=ANY)
 
     # partition needs reset, has committed offset
     fetcher._reset_offset.reset_mock()
@@ -139,7 +140,7 @@ def test__reset_offset(fetcher, mocker):
     mocked = mocker.patch.object(fetcher, '_retrieve_offsets')
 
     mocked.return_value = {tp: (1001, None)}
-    fetcher._reset_offset(tp)
+    fetcher._reset_offset(tp, timeout_ms=1234)
     assert not fetcher._subscriptions.assignment[tp].awaiting_reset
     assert fetcher._subscriptions.assignment[tp].position == 1001
 
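The tests assert with unittest.mock.ANY because the timeout actually passed down is the remaining budget, which depends on wall-clock time; ANY checks that a timeout_ms keyword was supplied without pinning its exact value. A minimal standalone illustration of the matcher:

from unittest.mock import ANY, MagicMock

fn = MagicMock()
fn('partition-0', timeout_ms=987.6)  # exact value varies with wall-clock time

# ANY compares equal to anything, so this only asserts that some
# timeout_ms keyword argument was passed.
fn.assert_called_with('partition-0', timeout_ms=ANY)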