diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 27be4588d..30f14e3de 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -694,7 +694,8 @@ def _poll_once(self, timeout_ms, max_records, update_offsets=True): Returns: dict: Map of topic to list of records (may be empty). """ - self._coordinator.poll() + begin = time.time() + self._coordinator.poll(timeout_ms=timeout_ms) # Fetch positions if we have partitions we're subscribed to that we # don't know the offset for @@ -720,7 +721,8 @@ def _poll_once(self, timeout_ms, max_records, update_offsets=True): if len(futures): self._client.poll(timeout_ms=0) - timeout_ms = min(timeout_ms, self._coordinator.time_to_next_poll() * 1000) + timeout_ms -= (time.time() - begin) * 1000 + timeout_ms = max(0, min(timeout_ms, self._coordinator.time_to_next_poll() * 1000)) self._client.poll(timeout_ms=timeout_ms) # after the long poll, we should check whether the group needs to rebalance # prior to returning data so that the group can stabilize faster @@ -1134,7 +1136,7 @@ def _update_fetch_positions(self, partitions): self._fetcher.update_fetch_positions(partitions) def _message_generator_v2(self): - timeout_ms = 1000 * (self._consumer_timeout - time.time()) + timeout_ms = 1000 * max(0, self._consumer_timeout - time.time()) record_map = self.poll(timeout_ms=timeout_ms, update_offsets=False) for tp, records in six.iteritems(record_map): # Generators are stateful, and it is possible that the tp / records @@ -1154,17 +1156,20 @@ def _message_generator_v2(self): def _message_generator(self): assert self.assignment() or self.subscription() is not None, 'No topic subscription or manual partition assignment' + + def inner_poll_ms(): + return max(0, min((1000 * (self._consumer_timeout - time.time())), self.config['retry_backoff_ms'])) + while time.time() < self._consumer_timeout: - self._coordinator.poll() + self._coordinator.poll(timeout_ms=inner_poll_ms()) # Fetch offsets for any subscribed partitions that 
we arent tracking yet if not self._subscription.has_all_fetch_positions(): partitions = self._subscription.missing_fetch_positions() self._update_fetch_positions(partitions) - poll_ms = min((1000 * (self._consumer_timeout - time.time())), self.config['retry_backoff_ms']) - self._client.poll(timeout_ms=poll_ms) + self._client.poll(timeout_ms=inner_poll_ms()) # after the long poll, we should check whether the group needs to rebalance # prior to returning data so that the group can stabilize faster diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index a30b5a9b8..0d4aedb88 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -234,10 +234,25 @@ def coordinator(self): else: return self.coordinator_id - def ensure_coordinator_ready(self): - """Block until the coordinator for this group is known - (and we have an active connection -- java client uses unsent queue). + def ensure_coordinator_ready(self, timeout_ms=None): + """Block until the coordinator for this group is known. + + Keyword Arguments: + timeout_ms (numeric, optional): Maximum number of milliseconds to + block waiting to find coordinator. Default: None. 
+ + Raises: KafkaTimeoutError if timeout_ms is not None """ + elapsed = 0.0 # noqa: F841 + begin = time.time() + def inner_timeout_ms(fallback=None): + if timeout_ms is None: + return fallback + elapsed = (time.time() - begin) * 1000 + if elapsed >= timeout_ms: + raise Errors.KafkaTimeoutError('Timeout attempting to find coordinator') + return max(0, timeout_ms - elapsed) if fallback is None else max(0, min(timeout_ms - elapsed, fallback)) + with self._client._lock, self._lock: while self.coordinator_unknown(): @@ -251,16 +266,16 @@ def ensure_coordinator_ready(self): continue future = self.lookup_coordinator() - self._client.poll(future=future) + self._client.poll(future=future, timeout_ms=inner_timeout_ms()) if future.failed(): if future.retriable(): if getattr(future.exception, 'invalid_metadata', False): log.debug('Requesting metadata for group coordinator request: %s', future.exception) metadata_update = self._client.cluster.request_update() - self._client.poll(future=metadata_update) + self._client.poll(future=metadata_update, timeout_ms=inner_timeout_ms()) else: - time.sleep(self.config['retry_backoff_ms'] / 1000) + time.sleep(inner_timeout_ms(self.config['retry_backoff_ms']) / 1000) else: raise future.exception # pylint: disable-msg=raising-bad-type @@ -339,14 +354,31 @@ def _handle_join_failure(self, _): with self._lock: self.state = MemberState.UNJOINED - def ensure_active_group(self): - """Ensure that the group is active (i.e. joined and synced)""" + def ensure_active_group(self, timeout_ms=None): + """Ensure that the group is active (i.e. joined and synced) + + Keyword Arguments: + timeout_ms (numeric, optional): Maximum number of milliseconds to + block waiting to join group. Default: None. 
+ + Raises: KafkaTimeoutError if timeout_ms is not None + """ with self._client._lock, self._lock: if self._heartbeat_thread is None: self._start_heartbeat_thread() + elapsed = 0.0 # noqa: F841 + begin = time.time() + def inner_timeout_ms(fallback=None): + if timeout_ms is None: + return fallback + elapsed = (time.time() - begin) * 1000 + if elapsed >= timeout_ms: + raise Errors.KafkaTimeoutError('Timeout attempting to join consumer group') + return max(0, timeout_ms - elapsed) if fallback is None else max(0, min(timeout_ms - elapsed, fallback)) + while self.need_rejoin() or self._rejoin_incomplete(): - self.ensure_coordinator_ready() + self.ensure_coordinator_ready(timeout_ms=inner_timeout_ms()) # call on_join_prepare if needed. We set a flag # to make sure that we do not call it a second @@ -367,7 +399,7 @@ def ensure_active_group(self): while not self.coordinator_unknown(): if not self._client.in_flight_request_count(self.coordinator_id): break - self._client.poll(timeout_ms=200) + self._client.poll(timeout_ms=inner_timeout_ms(200)) else: continue @@ -400,7 +432,7 @@ def ensure_active_group(self): else: future = self.join_future - self._client.poll(future=future) + self._client.poll(future=future, timeout_ms=inner_timeout_ms()) if future.succeeded(): self._on_join_complete(self._generation.generation_id, @@ -419,7 +451,7 @@ def ensure_active_group(self): continue elif not future.retriable(): raise exception # pylint: disable-msg=raising-bad-type - time.sleep(self.config['retry_backoff_ms'] / 1000) + time.sleep(inner_timeout_ms(self.config['retry_backoff_ms']) / 1000) def _rejoin_incomplete(self): return self.join_future is not None diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 9c662ce7f..73cf25297 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -258,7 +258,7 @@ def _on_join_complete(self, generation, member_id, protocol, self._subscription.listener, self.group_id, assigned) - def poll(self): + def poll(self, timeout_ms=None): """ Poll for coordinator events. 
Only applicable if group_id is set, and broker version supports GroupCoordinators. This ensures that the @@ -269,31 +269,45 @@ def poll(self): if self.group_id is None: return - self._invoke_completed_offset_commit_callbacks() - self.ensure_coordinator_ready() - - if self.config['api_version'] >= (0, 9) and self._subscription.partitions_auto_assigned(): - if self.need_rejoin(): - # due to a race condition between the initial metadata fetch and the - # initial rebalance, we need to ensure that the metadata is fresh - # before joining initially, and then request the metadata update. If - # metadata update arrives while the rebalance is still pending (for - # example, when the join group is still inflight), then we will lose - # track of the fact that we need to rebalance again to reflect the - # change to the topic subscription. Without ensuring that the - # metadata is fresh, any metadata update that changes the topic - # subscriptions and arrives while a rebalance is in progress will - # essentially be ignored. See KAFKA-3949 for the complete - # description of the problem. 
- if self._subscription.subscribed_pattern: - metadata_update = self._client.cluster.request_update() - self._client.poll(future=metadata_update) - - self.ensure_active_group() - - self.poll_heartbeat() - - self._maybe_auto_commit_offsets_async() + elapsed = 0.0 # noqa: F841 + begin = time.time() + def inner_timeout_ms(): + if timeout_ms is None: + return None + elapsed = (time.time() - begin) * 1000 + if elapsed >= timeout_ms: + raise Errors.KafkaTimeoutError() + return max(0, timeout_ms - elapsed) + + try: + self._invoke_completed_offset_commit_callbacks() + self.ensure_coordinator_ready(timeout_ms=inner_timeout_ms()) + + if self.config['api_version'] >= (0, 9) and self._subscription.partitions_auto_assigned(): + if self.need_rejoin(): + # due to a race condition between the initial metadata fetch and the + # initial rebalance, we need to ensure that the metadata is fresh + # before joining initially, and then request the metadata update. If + # metadata update arrives while the rebalance is still pending (for + # example, when the join group is still inflight), then we will lose + # track of the fact that we need to rebalance again to reflect the + # change to the topic subscription. Without ensuring that the + # metadata is fresh, any metadata update that changes the topic + # subscriptions and arrives while a rebalance is in progress will + # essentially be ignored. See KAFKA-3949 for the complete + # description of the problem. + if self._subscription.subscribed_pattern: + metadata_update = self._client.cluster.request_update() + self._client.poll(future=metadata_update, timeout_ms=inner_timeout_ms()) + + self.ensure_active_group(timeout_ms=inner_timeout_ms()) + + self.poll_heartbeat() + + self._maybe_auto_commit_offsets_async() + + except Errors.KafkaTimeoutError: + return def time_to_next_poll(self): """Return seconds (float) remaining until :meth:`.poll` should be called again"""