Skip to content

Call ApiVersionsRequest during connection, prior to Sasl Handshake #2493

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged: 19 commits merged on Mar 8, 2025
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 55 additions & 47 deletions kafka/client_async.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -303,7 +303,7 @@ def _can_connect(self, node_id):

def _conn_state_change(self, node_id, sock, conn):
with self._lock:
if conn.connecting():
if conn.state is ConnectionStates.CONNECTING:
# SSL connections can enter this state 2x (second during Handshake)
if node_id not in self._connecting:
self._connecting.add(node_id)
Expand All @@ -315,7 +315,19 @@ def _conn_state_change(self, node_id, sock, conn):
if self.cluster.is_bootstrap(node_id):
self._last_bootstrap = time.time()

elif conn.connected():
elif conn.state is ConnectionStates.API_VERSIONS_SEND:
try:
self._selector.register(sock, selectors.EVENT_WRITE, conn)
except KeyError:
self._selector.modify(sock, selectors.EVENT_WRITE, conn)

elif conn.state in (ConnectionStates.API_VERSIONS_RECV, ConnectionStates.AUTHENTICATING):
try:
self._selector.register(sock, selectors.EVENT_READ, conn)
except KeyError:
self._selector.modify(sock, selectors.EVENT_READ, conn)

elif conn.state is ConnectionStates.CONNECTED:
log.debug("Node %s connected", node_id)
if node_id in self._connecting:
self._connecting.remove(node_id)
Expand All @@ -332,6 +344,8 @@ def _conn_state_change(self, node_id, sock, conn):

if self.cluster.is_bootstrap(node_id):
self._bootstrap_fails = 0
if self._api_versions is None:
self._api_versions = conn._api_versions

else:
for node_id in list(self._conns.keys()):
Expand Down Expand Up @@ -970,15 +984,14 @@ def refresh_done(val_or_error):
def get_api_versions(self):
"""Return the ApiVersions map, if available.

Note: A call to check_version must previously have succeeded and returned
version 0.10.0 or later
Note: Only available after bootstrap; requires broker version 0.10.0 or later.

Returns: a map of dict mapping {api_key : (min_version, max_version)},
or None if ApiVersion is not supported by the kafka cluster.
"""
return self._api_versions

def check_version(self, node_id=None, timeout=None, strict=False):
def check_version(self, node_id=None, timeout=None, **kwargs):
"""Attempt to guess the version of a Kafka broker.

Keyword Arguments:
Expand All @@ -994,50 +1007,45 @@ def check_version(self, node_id=None, timeout=None, strict=False):
Raises:
NodeNotReadyError (if node_id is provided)
NoBrokersAvailable (if node_id is None)
UnrecognizedBrokerVersion: please file bug if seen!
AssertionError (if strict=True): please file bug if seen!
"""
timeout = timeout or (self.config['api_version_auto_timeout_ms'] / 1000)
self._lock.acquire()
end = time.time() + timeout
while time.time() < end:

# It is possible that least_loaded_node falls back to bootstrap,
# which can block for an increasing backoff period
try_node = node_id or self.least_loaded_node()
if try_node is None:
self._lock.release()
raise Errors.NoBrokersAvailable()
if not self._init_connect(try_node):
if try_node == node_id:
raise Errors.NodeNotReadyError("Connection failed to %s" % node_id)
else:
with self._lock:
end = time.time() + timeout
while time.time() < end:
time_remaining = max(end - time.time(), 0)
if node_id is not None and self.connection_delay(node_id) > 0:
sleep_time = min(time_remaining, self.connection_delay(node_id) / 1000.0)
if sleep_time > 0:
time.sleep(sleep_time)
continue

conn = self._conns[try_node]

# We will intentionally cause socket failures
# These should not trigger metadata refresh
self._refresh_on_disconnects = False
try:
remaining = end - time.time()
version = conn.check_version(timeout=remaining, strict=strict, topics=list(self.config['bootstrap_topics_filter']))
if not self._api_versions:
self._api_versions = conn.get_api_versions()
self._lock.release()
return version
except Errors.NodeNotReadyError:
# Only raise to user if this is a node-specific request
try_node = node_id or self.least_loaded_node()
if try_node is None:
sleep_time = min(time_remaining, self.least_loaded_node_refresh_ms() / 1000.0)
if sleep_time > 0:
log.warning('No node available during check_version; sleeping %.2f secs', sleep_time)
time.sleep(sleep_time)
continue
log.debug('Attempting to check version with node %s', try_node)
if not self._init_connect(try_node):
if try_node == node_id:
raise Errors.NodeNotReadyError("Connection failed to %s" % node_id)
else:
continue
conn = self._conns[try_node]

while conn.connecting() and time.time() < end:
timeout_ms = min((end - time.time()) * 1000, 200)
self.poll(timeout_ms=timeout_ms)

if conn._api_version is not None:
return conn._api_version

# Timeout
else:
if node_id is not None:
self._lock.release()
raise
finally:
self._refresh_on_disconnects = True

# Timeout
else:
self._lock.release()
raise Errors.NoBrokersAvailable()
raise Errors.NodeNotReadyError(node_id)
else:
raise Errors.NoBrokersAvailable()

def api_version(self, operation, max_version=None):
"""Find the latest version of the protocol operation supported by both
Expand All @@ -1063,15 +1071,15 @@ def api_version(self, operation, max_version=None):
broker_api_versions = self._api_versions
api_key = operation[0].API_KEY
if broker_api_versions is None or api_key not in broker_api_versions:
raise IncompatibleBrokerVersion(
raise Errors.IncompatibleBrokerVersion(
"Kafka broker does not support the '{}' Kafka protocol."
.format(operation[0].__name__))
broker_min_version, broker_max_version = broker_api_versions[api_key]
version = min(max_version, broker_max_version)
if version < broker_min_version:
# max library version is less than min broker version. Currently,
# no Kafka versions specify a min msg version. Maybe in the future?
raise IncompatibleBrokerVersion(
raise Errors.IncompatibleBrokerVersion(
"No version of the '{}' Kafka protocol is supported by both the client and broker."
.format(operation[0].__name__))
return version
Expand Down
Loading