Skip to content

Commit e15ed37

Browse files
committed
bugfix: race among _connecting and cluster metadata
A call to `maybe_connect` can be performed while the cluster metadata is being updated. If that happens, the assumption that every entry in `_connecting` has metadata no longer holds. The existing assert then raises on every subsequent call to `poll`, rendering the client instance unusable. This fixes the issue by ignoring connection requests to nodes whose metadata is no longer available.
1 parent 9feeb79 commit e15ed37

File tree

2 files changed

+22
-19
lines changed

2 files changed

+22
-19
lines changed

kafka/client_async.py

+20-12
Original file line numberDiff line numberDiff line change
@@ -368,18 +368,26 @@ def _maybe_connect(self, node_id):
368368
conn = self._conns.get(node_id)
369369

370370
if conn is None:
371-
broker = self.cluster.broker_metadata(node_id)
372-
assert broker, 'Broker id %s not in current metadata' % (node_id,)
373-
374-
log.debug("Initiating connection to node %s at %s:%s",
375-
node_id, broker.host, broker.port)
376-
host, port, afi = get_ip_port_afi(broker.host)
377-
cb = WeakMethod(self._conn_state_change)
378-
conn = BrokerConnection(host, broker.port, afi,
379-
state_change_callback=cb,
380-
node_id=node_id,
381-
**self.config)
382-
self._conns[node_id] = conn
371+
broker_metadata = self.cluster.broker_metadata(node_id)
372+
373+
# The broker may have been removed from the cluster after the
374+
# call to `maybe_connect`. At this point there is no way to
375+
# recover, so just ignore the connection
376+
if broker_metadata is None:
377+
log.debug("Node %s is not available anymore, discarding connection", node_id)
378+
if node_id in self._connecting:
379+
self._connecting.remove(node_id)
380+
return False
381+
else:
382+
log.debug("Initiating connection to node %s at %s:%s",
383+
node_id, broker_metadata.host, broker_metadata.port)
384+
host, port, afi = get_ip_port_afi(broker_metadata.host)
385+
cb = WeakMethod(self._conn_state_change)
386+
conn = BrokerConnection(host, broker_metadata.port, afi,
387+
state_change_callback=cb,
388+
node_id=node_id,
389+
**self.config)
390+
self._conns[node_id] = conn
383391

384392
# Check if existing connection should be recreated because host/port changed
385393
elif self._should_recycle_connection(conn):

test/test_client_async.py

+2-7
Original file line numberDiff line numberDiff line change
@@ -71,13 +71,8 @@ def test_can_connect(cli, conn):
7171

7272

7373
def test_maybe_connect(cli, conn):
74-
try:
75-
# Node not in metadata, raises AssertionError
76-
cli._maybe_connect(2)
77-
except AssertionError:
78-
pass
79-
else:
80-
assert False, 'Exception not raised'
74+
# Node not in metadata should be ignored
75+
cli._maybe_connect(2)
8176

8277
# New node_id creates a conn object
8378
assert 0 not in cli._conns

0 commit comments

Comments
 (0)