Skip to content

bugfix: race among _connecting and cluster metadata #2189

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 20 additions & 12 deletions kafka/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,18 +368,26 @@ def _maybe_connect(self, node_id):
conn = self._conns.get(node_id)

if conn is None:
broker = self.cluster.broker_metadata(node_id)
assert broker, 'Broker id %s not in current metadata' % (node_id,)

log.debug("Initiating connection to node %s at %s:%s",
node_id, broker.host, broker.port)
host, port, afi = get_ip_port_afi(broker.host)
cb = WeakMethod(self._conn_state_change)
conn = BrokerConnection(host, broker.port, afi,
state_change_callback=cb,
node_id=node_id,
**self.config)
self._conns[node_id] = conn
broker_metadata = self.cluster.broker_metadata(node_id)

# The broker may have been removed from the cluster after the
# call to `maybe_connect`. At this point there is no way to
# recover, so just ignore the connection
if broker_metadata is None:
log.debug("Node %s is not available anymore, discarding connection", node_id)
if node_id in self._connecting:
self._connecting.remove(node_id)
return False
else:
log.debug("Initiating connection to node %s at %s:%s",
node_id, broker_metadata.host, broker_metadata.port)
host, port, afi = get_ip_port_afi(broker_metadata.host)
cb = WeakMethod(self._conn_state_change)
conn = BrokerConnection(host, broker_metadata.port, afi,
state_change_callback=cb,
node_id=node_id,
**self.config)
self._conns[node_id] = conn

# Check if existing connection should be recreated because host/port changed
elif self._should_recycle_connection(conn):
Expand Down
9 changes: 2 additions & 7 deletions test/test_client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,8 @@ def test_can_connect(cli, conn):


def test_maybe_connect(cli, conn):
try:
# Node not in metadata, raises AssertionError
cli._maybe_connect(2)
except AssertionError:
pass
else:
assert False, 'Exception not raised'
# Node not in metadata should be ignored
cli._maybe_connect(2)

# New node_id creates a conn object
assert 0 not in cli._conns
Expand Down