diff --git a/kafka/client_async.py b/kafka/client_async.py index 58f22d4ec..95c5f7cb5 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -368,18 +368,26 @@ def _maybe_connect(self, node_id): conn = self._conns.get(node_id) if conn is None: - broker = self.cluster.broker_metadata(node_id) - assert broker, 'Broker id %s not in current metadata' % (node_id,) - - log.debug("Initiating connection to node %s at %s:%s", - node_id, broker.host, broker.port) - host, port, afi = get_ip_port_afi(broker.host) - cb = WeakMethod(self._conn_state_change) - conn = BrokerConnection(host, broker.port, afi, - state_change_callback=cb, - node_id=node_id, - **self.config) - self._conns[node_id] = conn + broker_metadata = self.cluster.broker_metadata(node_id) + + # The broker may have been removed from the cluster after the + # call to `maybe_connect`. At this point there is no way to + # recover, so just ignore the connection + if broker_metadata is None: + log.debug("Node %s is not available anymore, discarding connection", node_id) + if node_id in self._connecting: + self._connecting.remove(node_id) + return False + else: + log.debug("Initiating connection to node %s at %s:%s", + node_id, broker_metadata.host, broker_metadata.port) + host, port, afi = get_ip_port_afi(broker_metadata.host) + cb = WeakMethod(self._conn_state_change) + conn = BrokerConnection(host, broker_metadata.port, afi, + state_change_callback=cb, + node_id=node_id, + **self.config) + self._conns[node_id] = conn # Check if existing connection should be recreated because host/port changed elif self._should_recycle_connection(conn): diff --git a/test/test_client_async.py b/test/test_client_async.py index 74da66a36..1eb66c691 100644 --- a/test/test_client_async.py +++ b/test/test_client_async.py @@ -71,13 +71,8 @@ def test_can_connect(cli, conn): def test_maybe_connect(cli, conn): - try: - # Node not in metadata, raises AssertionError - cli._maybe_connect(2) - except AssertionError: - pass - else: - assert False, 'Exception not raised' + # Node not in metadata should be ignored + cli._maybe_connect(2) # New node_id creates a conn object assert 0 not in cli._conns