diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index dd40bf5d4..0edd50616 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -252,12 +252,16 @@ def ensure_coordinator_ready(self, timeout_ms=None): # so we will just pick a node at random and treat # it as the "coordinator" if self.config['api_version'] < (0, 8, 2): - self.coordinator_id = self._client.least_loaded_node() - if self.coordinator_id is not None: + maybe_coordinator_id = self._client.least_loaded_node() + if maybe_coordinator_id is None or self._client.cluster.is_bootstrap(maybe_coordinator_id): + future = Future().failure(Errors.NoBrokersAvailable()) + else: + self.coordinator_id = maybe_coordinator_id self._client.maybe_connect(self.coordinator_id) - continue + continue + else: + future = self.lookup_coordinator() - future = self.lookup_coordinator() self._client.poll(future=future, timeout_ms=inner_timeout_ms()) if not future.is_done: @@ -677,7 +681,7 @@ def _send_group_coordinator_request(self): Future: resolves to the node id of the coordinator """ node_id = self._client.least_loaded_node() - if node_id is None: + if node_id is None or self._client.cluster.is_bootstrap(node_id): return Future().failure(Errors.NoBrokersAvailable()) elif not self._client.ready(node_id, metadata_priority=False):