diff --git a/kafka/client_async.py b/kafka/client_async.py index 78ff1c118..69f91c0f0 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -236,7 +236,6 @@ def __init__(self, **configs): self._api_versions = None self._connecting = set() self._sending = set() - self._refresh_on_disconnects = True # Not currently used, but data is collected internally self._last_bootstrap = 0 @@ -382,7 +381,7 @@ def _conn_state_change(self, node_id, sock, conn): elif self.cluster.is_bootstrap(node_id): self._bootstrap_fails += 1 - elif self._refresh_on_disconnects and not self._closed and not idle_disconnect: + elif conn.connect_failed() and not self._closed and not idle_disconnect: log.warning("Node %s connection failed -- refreshing metadata", node_id) self.cluster.request_update() diff --git a/kafka/conn.py b/kafka/conn.py index 2f8c2491c..247e88fd7 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -850,6 +850,10 @@ def disconnected(self): """Return True iff socket is closed""" return self.state is ConnectionStates.DISCONNECTED + def connect_failed(self): + """Return True iff connection attempt failed after attempting all dns records""" + return self.disconnected() and self.last_attempt >= 0 and len(self._gai) == 0 + def _reset_reconnect_backoff(self): self._failures = 0 self._reconnect_backoff = self.config['reconnect_backoff_ms'] / 1000.0