Skip to content

Commit 215b626

Browse files
committed
Support client.poll with future and timeout_ms
1 parent 034b4bd commit 215b626

File tree

5 files changed

+36
-12
lines changed

5 files changed

+36
-12
lines changed

kafka/client_async.py

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -636,11 +636,14 @@ def poll(self, timeout_ms=None, future=None):
636636
Returns:
637637
list: responses received (can be empty)
638638
"""
639-
if timeout_ms is None:
640-
timeout_ms = self.config['request_timeout_ms']
641-
elif not isinstance(timeout_ms, (int, float)):
639+
if not isinstance(timeout_ms, (int, float, type(None))):
642640
raise TypeError('Invalid type for timeout: %s' % type(timeout_ms))
643641

642+
begin = time.time()
643+
if timeout_ms is not None:
644+
timeout_at = begin + (timeout_ms / 1000)
645+
else:
646+
timeout_at = begin + (self.config['request_timeout_ms'] / 1000)
644647
# Loop for futures, break after first loop if None
645648
responses = []
646649
while True:
@@ -665,11 +668,12 @@ def poll(self, timeout_ms=None, future=None):
665668
if future is not None and future.is_done:
666669
timeout = 0
667670
else:
671+
user_timeout_ms = 1000 * max(0, timeout_at - time.time())
668672
idle_connection_timeout_ms = self._idle_expiry_manager.next_check_ms()
669673
request_timeout_ms = self._next_ifr_request_timeout_ms()
670-
log.debug("Timeouts: user %f, metadata %f, idle connection %f, request %f", timeout_ms, metadata_timeout_ms, idle_connection_timeout_ms, request_timeout_ms)
674+
log.debug("Timeouts: user %f, metadata %f, idle connection %f, request %f", user_timeout_ms, metadata_timeout_ms, idle_connection_timeout_ms, request_timeout_ms)
671675
timeout = min(
672-
timeout_ms,
676+
user_timeout_ms,
673677
metadata_timeout_ms,
674678
idle_connection_timeout_ms,
675679
request_timeout_ms)
@@ -683,7 +687,11 @@ def poll(self, timeout_ms=None, future=None):
683687

684688
# If all we had was a timeout (future is None) - only do one poll
685689
# If we do have a future, we keep looping until it is done
686-
if future is None or future.is_done:
690+
if future is None:
691+
break
692+
elif future.is_done:
693+
break
694+
elif timeout_ms is not None and time.time() >= timeout_at:
687695
break
688696

689697
return responses

kafka/consumer/fetcher.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,10 @@ def _retrieve_offsets(self, timestamps, timeout_ms=None):
276276
future = self._send_list_offsets_requests(timestamps)
277277
self._client.poll(future=future, timeout_ms=inner_timeout_ms())
278278

279+
# Timeout w/o future completion
280+
if not future.is_done:
281+
break
282+
279283
if future.succeeded():
280284
return future.value
281285
if not future.retriable():

kafka/coordinator/base.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,12 +260,17 @@ def ensure_coordinator_ready(self, timeout_ms=None):
260260
future = self.lookup_coordinator()
261261
self._client.poll(future=future, timeout_ms=inner_timeout_ms())
262262

263+
if not future.is_done:
264+
raise Errors.KafkaTimeoutError()
265+
263266
if future.failed():
264267
if future.retriable():
265268
if getattr(future.exception, 'invalid_metadata', False):
266269
log.debug('Requesting metadata for group coordinator request: %s', future.exception)
267270
metadata_update = self._client.cluster.request_update()
268271
self._client.poll(future=metadata_update, timeout_ms=inner_timeout_ms())
272+
if not metadata_update.is_done:
273+
raise Errors.KafkaTimeoutError()
269274
else:
270275
time.sleep(inner_timeout_ms(self.config['retry_backoff_ms']) / 1000)
271276
else:
@@ -416,6 +421,9 @@ def ensure_active_group(self, timeout_ms=None):
416421

417422
self._client.poll(future=future, timeout_ms=inner_timeout_ms())
418423

424+
if not future.is_done:
425+
raise Errors.KafkaTimeoutError()
426+
419427
if future.succeeded():
420428
self._on_join_complete(self._generation.generation_id,
421429
self._generation.member_id,

test/fixtures.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
from kafka.vendor.six.moves.urllib.parse import urlparse # pylint: disable=E0611,F0401
1515

1616
from kafka import errors, KafkaAdminClient, KafkaClient, KafkaConsumer, KafkaProducer
17-
from kafka.errors import InvalidReplicationFactorError
17+
from kafka.errors import InvalidReplicationFactorError, KafkaTimeoutError
1818
from kafka.protocol.admin import CreateTopicsRequest
1919
from kafka.protocol.metadata import MetadataRequest
2020
from test.testutil import env_kafka_version, random_string
@@ -555,6 +555,8 @@ def _failure(error):
555555
future.error_on_callbacks = True
556556
future.add_errback(_failure)
557557
self._client.poll(future=future, timeout_ms=timeout)
558+
if not future.is_done:
559+
raise KafkaTimeoutError()
558560
return future.value
559561
except Exception as exc:
560562
time.sleep(1)

test/test_client_async.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,9 @@ def test_poll(mocker):
228228
ifr_request_timeout = mocker.patch.object(KafkaClient, '_next_ifr_request_timeout_ms')
229229
_poll = mocker.patch.object(KafkaClient, '_poll')
230230
cli = KafkaClient(api_version=(0, 9))
231+
now = time.time()
232+
t = mocker.patch('time.time')
233+
t.return_value = now
231234

232235
# metadata timeout wins
233236
ifr_request_timeout.return_value = float('inf')
@@ -346,17 +349,16 @@ def test_maybe_refresh_metadata_cant_send(mocker, client):
346349
t.return_value = now
347350

348351
# first poll attempts connection
349-
client.poll(timeout_ms=12345678)
350-
client._poll.assert_called_with(12345.678)
352+
client.poll()
353+
client._poll.assert_called()
351354
client._init_connect.assert_called_once_with('foobar')
352355

353356
# poll while connecting should not attempt a new connection
354357
client._connecting.add('foobar')
355358
client._can_connect.reset_mock()
356-
client.poll(timeout_ms=12345678)
357-
client._poll.assert_called_with(12345.678)
359+
client.poll()
360+
client._poll.assert_called()
358361
assert not client._can_connect.called
359-
360362
assert not client._metadata_refresh_in_progress
361363

362364

0 commit comments

Comments
 (0)