Skip to content

Client connection / maybe_refresh_metadata changes #2507

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Mar 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 37 additions & 23 deletions kafka/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ def _should_recycle_connection(self, conn):

return False

def _maybe_connect(self, node_id):
def _init_connect(self, node_id):
"""Idempotent non-blocking connection attempt to the given node id.

Returns True if connection object exists and is connected / connecting
Expand Down Expand Up @@ -427,10 +427,8 @@ def _maybe_connect(self, node_id):
**self.config)
self._conns[node_id] = conn

elif conn.connected():
return True

conn.connect()
if conn.disconnected():
conn.connect()
return not conn.disconnected()

def ready(self, node_id, metadata_priority=True):
Expand Down Expand Up @@ -621,15 +619,18 @@ def poll(self, timeout_ms=None, future=None):
if self._closed:
break

# Send a metadata request if needed (or initiate new connection)
metadata_timeout_ms = self._maybe_refresh_metadata()

# Attempt to complete pending connections
for node_id in list(self._connecting):
# False return means no more connection progress is possible
# Connected nodes will update _connecting via state_change callback
if not self._maybe_connect(node_id):
self._connecting.remove(node_id)
if not self._init_connect(node_id):
# The connection attempt may have triggered a state change that already
# removed this node; if not, make sure to remove it from the _connecting set
if node_id in self._connecting:
self._connecting.remove(node_id)

# Send a metadata request if needed (or initiate new connection)
metadata_timeout_ms = self._maybe_refresh_metadata()

# If we got a future that is already done, don't block in _poll
if future is not None and future.is_done:
Expand Down Expand Up @@ -679,6 +680,8 @@ def _poll(self, timeout):
self._register_send_sockets()

start_select = time.time()
if timeout == float('inf'):
timeout = None
ready = self._selector.select(timeout)
end_select = time.time()
if self._sensors:
Expand Down Expand Up @@ -893,6 +896,26 @@ def _maybe_refresh_metadata(self, wakeup=False):
log.debug("Give up sending metadata request since no node is available. (reconnect delay %d ms)", next_connect_ms)
return next_connect_ms

if not self._can_send_request(node_id):
# If there's any connection establishment underway, wait until it completes. This prevents
# the client from unnecessarily connecting to additional nodes while a previous connection
# attempt has not been completed.
if self._connecting:
return float('inf')

elif self._can_connect(node_id):
log.debug("Initializing connection to node %s for metadata request", node_id)
self._connecting.add(node_id)
if not self._init_connect(node_id):
if node_id in self._connecting:
self._connecting.remove(node_id)
# Connection attempt failed immediately, need to retry with a different node
return self.config['reconnect_backoff_ms']
else:
# Existing connection with max in flight requests. Wait for request to complete.
return self.config['request_timeout_ms']

# Recheck node_id in case we were able to connect immediately above
if self._can_send_request(node_id):
topics = list(self._topics)
if not topics and self.cluster.is_bootstrap(node_id):
Expand All @@ -917,20 +940,11 @@ def refresh_done(val_or_error):
future.add_errback(refresh_done)
return self.config['request_timeout_ms']

# If there's any connection establishment underway, wait until it completes. This prevents
# the client from unnecessarily connecting to additional nodes while a previous connection
# attempt has not been completed.
# Should only get here if still connecting
if self._connecting:
return float('inf')

if self.maybe_connect(node_id, wakeup=wakeup):
log.debug("Initializing connection to node %s for metadata request", node_id)
return float('inf')

# connected but can't send more, OR connecting
# In either case we just need to wait for a network event
# to let us know the selected connection might be usable again.
return float('inf')
else:
return self.config['reconnect_backoff_ms']

def get_api_versions(self):
"""Return the ApiVersions map, if available.
Expand Down Expand Up @@ -973,7 +987,7 @@ def check_version(self, node_id=None, timeout=None, strict=False):
if try_node is None:
self._lock.release()
raise Errors.NoBrokersAvailable()
if not self._maybe_connect(try_node):
if not self._init_connect(try_node):
if try_node == node_id:
raise Errors.NodeNotReadyError("Connection failed to %s" % node_id)
else:
Expand Down
25 changes: 12 additions & 13 deletions test/test_client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def test_can_connect(cli, conn):
assert cli._can_connect(0)

# Node is connected, can't reconnect
assert cli._maybe_connect(0) is True
assert cli._init_connect(0) is True
assert not cli._can_connect(0)

# Node is disconnected, can connect
Expand All @@ -70,15 +70,15 @@ def test_can_connect(cli, conn):
assert not cli._can_connect(0)


def test_maybe_connect(cli, conn):
def test_init_connect(cli, conn):
# Node not in metadata, return False
assert not cli._maybe_connect(2)
assert not cli._init_connect(2)

# New node_id creates a conn object
assert 0 not in cli._conns
conn.state = ConnectionStates.DISCONNECTED
conn.connect.side_effect = lambda: conn._set_conn_state(ConnectionStates.CONNECTING)
assert cli._maybe_connect(0) is True
assert cli._init_connect(0) is True
assert cli._conns[0] is conn


Expand Down Expand Up @@ -122,8 +122,8 @@ def test_ready(mocker, cli, conn):


def test_is_ready(mocker, cli, conn):
cli._maybe_connect(0)
cli._maybe_connect(1)
cli._init_connect(0)
cli._init_connect(1)

# metadata refresh blocks ready nodes
assert cli.is_ready(0)
Expand Down Expand Up @@ -166,14 +166,14 @@ def test_close(mocker, cli, conn):
assert conn.close.call_count == call_count

# Single node close
cli._maybe_connect(0)
cli._init_connect(0)
assert conn.close.call_count == call_count
cli.close(0)
call_count += 1
assert conn.close.call_count == call_count

# All node close
cli._maybe_connect(1)
cli._init_connect(1)
cli.close()
# +2 close: node 1, node bootstrap (node 0 already closed)
call_count += 2
Expand All @@ -185,7 +185,7 @@ def test_is_disconnected(cli, conn):
conn.state = ConnectionStates.DISCONNECTED
assert not cli.is_disconnected(0)

cli._maybe_connect(0)
cli._init_connect(0)
assert cli.is_disconnected(0)

conn.state = ConnectionStates.CONNECTING
Expand All @@ -210,7 +210,7 @@ def test_send(cli, conn):
assert isinstance(f.exception, Errors.NodeNotReadyError)

conn.state = ConnectionStates.CONNECTED
cli._maybe_connect(0)
cli._init_connect(0)
# ProduceRequest w/ 0 required_acks -> no response
request = ProduceRequest[0](0, 0, [])
assert request.expect_response() is False
Expand Down Expand Up @@ -339,8 +339,7 @@ def test_maybe_refresh_metadata_cant_send(mocker, client):
mocker.patch.object(client, 'least_loaded_node', return_value='foobar')
mocker.patch.object(client, '_can_send_request', return_value=False)
mocker.patch.object(client, '_can_connect', return_value=True)
mocker.patch.object(client, '_maybe_connect', return_value=True)
mocker.patch.object(client, 'maybe_connect', return_value=True)
mocker.patch.object(client, '_init_connect', return_value=True)

now = time.time()
t = mocker.patch('time.time')
Expand All @@ -349,7 +348,7 @@ def test_maybe_refresh_metadata_cant_send(mocker, client):
# first poll attempts connection
client.poll(timeout_ms=12345678)
client._poll.assert_called_with(12345.678)
client.maybe_connect.assert_called_once_with('foobar', wakeup=False)
client._init_connect.assert_called_once_with('foobar')

# poll while connecting should not attempt a new connection
client._connecting.add('foobar')
Expand Down