From e15ed3764f8eafd340c22f72b157dc30bc04400a Mon Sep 17 00:00:00 2001 From: "Augusto F. Hack" Date: Tue, 5 Jan 2021 13:20:38 +0100 Subject: [PATCH] bugfix: race between _connecting and cluster metadata A call to `maybe_connect` can be performed while the cluster metadata is being updated. If that happens, the assumption that every entry in `_connecting` has metadata won't hold. The existing assert will then raise on every subsequent call to `poll`, rendering the client instance unusable. This fixes the issue by ignoring connection requests to nodes whose metadata is no longer available. --- kafka/client_async.py | 32 ++++++++++++++++++++------------ test/test_client_async.py | 9 ++------- 2 files changed, 22 insertions(+), 19 deletions(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index 58f22d4ec..95c5f7cb5 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -368,18 +368,26 @@ def _maybe_connect(self, node_id): conn = self._conns.get(node_id) if conn is None: - broker = self.cluster.broker_metadata(node_id) - assert broker, 'Broker id %s not in current metadata' % (node_id,) - - log.debug("Initiating connection to node %s at %s:%s", - node_id, broker.host, broker.port) - host, port, afi = get_ip_port_afi(broker.host) - cb = WeakMethod(self._conn_state_change) - conn = BrokerConnection(host, broker.port, afi, - state_change_callback=cb, - node_id=node_id, - **self.config) - self._conns[node_id] = conn + broker_metadata = self.cluster.broker_metadata(node_id) + + # The broker may have been removed from the cluster after the + # call to `maybe_connect`. 
At this point there is no way to + # recover, so just ignore the connection + if broker_metadata is None: + log.debug("Node %s is not available anymore, discarding connection", node_id) + if node_id in self._connecting: + self._connecting.remove(node_id) + return False + else: + log.debug("Initiating connection to node %s at %s:%s", + node_id, broker_metadata.host, broker_metadata.port) + host, port, afi = get_ip_port_afi(broker_metadata.host) + cb = WeakMethod(self._conn_state_change) + conn = BrokerConnection(host, broker_metadata.port, afi, + state_change_callback=cb, + node_id=node_id, + **self.config) + self._conns[node_id] = conn # Check if existing connection should be recreated because host/port changed elif self._should_recycle_connection(conn): diff --git a/test/test_client_async.py b/test/test_client_async.py index 74da66a36..1eb66c691 100644 --- a/test/test_client_async.py +++ b/test/test_client_async.py @@ -71,13 +71,8 @@ def test_can_connect(cli, conn): def test_maybe_connect(cli, conn): - try: - # Node not in metadata, raises AssertionError - cli._maybe_connect(2) - except AssertionError: - pass - else: - assert False, 'Exception not raised' + # Node not in metadata should be ignored + cli._maybe_connect(2) # New node_id creates a conn object assert 0 not in cli._conns