Skip to content

Improve connection state logging #2574

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion kafka/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ def _conn_state_change(self, node_id, sock, conn):
self._connecting.remove(node_id)
try:
self._selector.unregister(sock)
except KeyError:
except (KeyError, ValueError):
pass

if self._sensors:
Expand Down
49 changes: 25 additions & 24 deletions kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,8 +319,8 @@ def _init_sasl_mechanism(self):
def _dns_lookup(self):
self._gai = dns_lookup(self.host, self.port, self.afi)
if not self._gai:
log.error('DNS lookup failed for %s:%i (%s)',
self.host, self.port, self.afi)
log.error('%s: DNS lookup failed for %s:%i (%s)',
self, self.host, self.port, self.afi)
return False
return True

Expand Down Expand Up @@ -366,6 +366,7 @@ def connect_blocking(self, timeout=float('inf')):
def connect(self):
"""Attempt to connect and return ConnectionState"""
if self.state is ConnectionStates.DISCONNECTED and not self.blacked_out():
self.state = ConnectionStates.CONNECTING
self.last_attempt = time.time()
next_lookup = self._next_afi_sockaddr()
if not next_lookup:
Expand All @@ -390,7 +391,6 @@ def connect(self):
self._sock.setsockopt(*option)

self._sock.setblocking(False)
self.state = ConnectionStates.CONNECTING
self.config['state_change_callback'](self.node_id, self._sock, self)
log.info('%s: connecting to %s:%d [%s %s]', self, self.host,
self.port, self._sock_addr, AFI_NAMES[self._sock_afi])
Expand All @@ -412,20 +412,20 @@ def connect(self):
log.debug('%s: established TCP connection', self)

if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
log.debug('%s: initiating SSL handshake', self)
self.state = ConnectionStates.HANDSHAKE
log.debug('%s: initiating SSL handshake', self)
self.config['state_change_callback'](self.node_id, self._sock, self)
# _wrap_ssl can alter the connection state -- disconnects on failure
self._wrap_ssl()
else:
log.debug('%s: checking broker Api Versions', self)
self.state = ConnectionStates.API_VERSIONS_SEND
log.debug('%s: checking broker Api Versions', self)
self.config['state_change_callback'](self.node_id, self._sock, self)

# Connection failed
# WSAEINVAL == 10022, but errno.WSAEINVAL is not available on non-win systems
elif ret not in (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK, 10022):
log.error('Connect attempt to %s returned error %s.'
log.error('%s: Connect attempt returned error %s.'
' Disconnecting.', self, ret)
errstr = errno.errorcode.get(ret, 'UNKNOWN')
self.close(Errors.KafkaConnectionError('{} {}'.format(ret, errstr)))
Expand All @@ -438,22 +438,22 @@ def connect(self):
if self.state is ConnectionStates.HANDSHAKE:
if self._try_handshake():
log.debug('%s: completed SSL handshake.', self)
log.debug('%s: checking broker Api Versions', self)
self.state = ConnectionStates.API_VERSIONS_SEND
log.debug('%s: checking broker Api Versions', self)
self.config['state_change_callback'](self.node_id, self._sock, self)

if self.state in (ConnectionStates.API_VERSIONS_SEND, ConnectionStates.API_VERSIONS_RECV):
if self._try_api_versions_check():
# _try_api_versions_check has side-effects: possibly disconnected on socket errors
if self.state in (ConnectionStates.API_VERSIONS_SEND, ConnectionStates.API_VERSIONS_RECV):
if self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL'):
log.debug('%s: initiating SASL authentication', self)
self.state = ConnectionStates.AUTHENTICATING
log.debug('%s: initiating SASL authentication', self)
self.config['state_change_callback'](self.node_id, self._sock, self)
else:
# security_protocol PLAINTEXT
log.info('%s: Connection complete.', self)
self.state = ConnectionStates.CONNECTED
log.info('%s: Connection complete.', self)
self._reset_reconnect_backoff()
self.config['state_change_callback'](self.node_id, self._sock, self)

Expand All @@ -462,8 +462,8 @@ def connect(self):
if self._try_authenticate():
# _try_authenticate has side-effects: possibly disconnected on socket errors
if self.state is ConnectionStates.AUTHENTICATING:
log.info('%s: Connection complete.', self)
self.state = ConnectionStates.CONNECTED
log.info('%s: Connection complete.', self)
self._reset_reconnect_backoff()
self.config['state_change_callback'](self.node_id, self._sock, self)

Expand All @@ -472,7 +472,7 @@ def connect(self):
# Connection timed out
request_timeout = self.config['request_timeout_ms'] / 1000.0
if time.time() > request_timeout + self.last_attempt:
log.error('Connection attempt to %s timed out', self)
log.error('%s: Connection attempt timed out', self)
self.close(Errors.KafkaConnectionError('timeout'))
return self.state

Expand Down Expand Up @@ -531,7 +531,7 @@ def _try_handshake(self):
except (SSLWantReadError, SSLWantWriteError):
pass
except (SSLZeroReturnError, ConnectionError, TimeoutError, SSLEOFError):
log.warning('SSL connection closed by server during handshake.')
log.warning('%s: SSL connection closed by server during handshake.', self)
self.close(Errors.KafkaConnectionError('SSL connection closed by server during handshake'))
# Other SSLErrors will be raised to user

Expand Down Expand Up @@ -611,7 +611,7 @@ def _handle_api_versions_response(self, future, response):
for api_key, min_version, max_version, *rest in response.api_versions
])
self._api_version = self._infer_broker_version_from_api_versions(self._api_versions)
log.info('Broker version identified as %s', '.'.join(map(str, self._api_version)))
log.info('%s: Broker version identified as %s', self, '.'.join(map(str, self._api_version)))
future.success(self._api_version)
self.connect()

Expand All @@ -621,7 +621,7 @@ def _handle_api_versions_failure(self, future, ex):
# after failure connection is closed, so state should already be DISCONNECTED

def _handle_check_version_response(self, future, version, _response):
log.info('Broker version identified as %s', '.'.join(map(str, version)))
log.info('%s: Broker version identified as %s', self, '.'.join(map(str, version)))
log.info('Set configuration api_version=%s to skip auto'
' check_version requests on startup', version)
self._api_versions = BROKER_API_VERSIONS[version]
Expand Down Expand Up @@ -751,7 +751,7 @@ def _send_sasl_authenticate(self, sasl_auth_bytes):
request = SaslAuthenticateRequest[0](sasl_auth_bytes)
self._send(request, blocking=True)
else:
log.debug('Sending %d raw sasl auth bytes to server', len(sasl_auth_bytes))
log.debug('%s: Sending %d raw sasl auth bytes to server', self, len(sasl_auth_bytes))
try:
self._send_bytes_blocking(Int32.encode(len(sasl_auth_bytes)) + sasl_auth_bytes)
except (ConnectionError, TimeoutError) as e:
Expand Down Expand Up @@ -781,7 +781,7 @@ def _recv_sasl_authenticate(self):
latency_ms = (time.time() - timestamp) * 1000
if self._sensors:
self._sensors.request_time.record(latency_ms)
log.debug('%s Response %d (%s ms): %s', self, correlation_id, latency_ms, response)
log.debug('%s: Response %d (%s ms): %s', self, correlation_id, latency_ms, response)

error_type = Errors.for_code(response.error_code)
if error_type is not Errors.NoError:
Expand All @@ -792,7 +792,7 @@ def _recv_sasl_authenticate(self):
return response.auth_bytes
else:
# unframed bytes w/ SaslHandhake v0
log.debug('Received %d raw sasl auth bytes from server', nbytes)
log.debug('%s: Received %d raw sasl auth bytes from server', self, nbytes)
return data[4:]

def _sasl_authenticate(self, future):
Expand Down Expand Up @@ -956,7 +956,8 @@ def close(self, error=None):

# drop lock before state change callback and processing futures
self.config['state_change_callback'](self.node_id, sock, self)
sock.close()
if sock:
sock.close()
for (_correlation_id, (future, _timestamp, _timeout)) in ifrs:
future.failure(error)

Expand Down Expand Up @@ -1002,7 +1003,7 @@ def _send(self, request, blocking=True, request_timeout_ms=None):

correlation_id = self._protocol.send_request(request)

log.debug('%s Request %d (timeout_ms %s): %s', self, correlation_id, request_timeout_ms, request)
log.debug('%s: Request %d (timeout_ms %s): %s', self, correlation_id, request_timeout_ms, request)
if request.expect_response():
assert correlation_id not in self.in_flight_requests, 'Correlation ID already in-flight!'
sent_time = time.time()
Expand Down Expand Up @@ -1036,7 +1037,7 @@ def send_pending_requests(self):
return True

except (ConnectionError, TimeoutError) as e:
log.exception("Error sending request data to %s", self)
log.exception("%s: Error sending request data", self)
error = Errors.KafkaConnectionError("%s: %s" % (self, e))
self.close(error=error)
return False
Expand Down Expand Up @@ -1069,7 +1070,7 @@ def send_pending_requests_v2(self):
return len(self._send_buffer) == 0

except (ConnectionError, TimeoutError, Exception) as e:
log.exception("Error sending request data to %s", self)
log.exception("%s: Error sending request data", self)
error = Errors.KafkaConnectionError("%s: %s" % (self, e))
self.close(error=error)
return False
Expand Down Expand Up @@ -1106,7 +1107,7 @@ def recv(self):
if not responses and self.requests_timed_out():
timed_out = self.timed_out_ifrs()
timeout_ms = (timed_out[0][2] - timed_out[0][1]) * 1000
log.warning('%s timed out after %s ms. Closing connection.',
log.warning('%s: timed out after %s ms. Closing connection.',
self, timeout_ms)
self.close(error=Errors.RequestTimedOutError(
'Request timed out after %s ms' %
Expand All @@ -1125,7 +1126,7 @@ def recv(self):
if self._sensors:
self._sensors.request_time.record(latency_ms)

log.debug('%s Response %d (%s ms): %s', self, correlation_id, latency_ms, response)
log.debug('%s: Response %d (%s ms): %s', self, correlation_id, latency_ms, response)
self._maybe_throttle(response)
responses[i] = (response, future)

Expand All @@ -1137,7 +1138,7 @@ def _recv(self):
err = None
with self._lock:
if not self._can_send_recv():
log.warning('%s cannot recv: socket not connected', self)
log.warning('%s: cannot recv: socket not connected', self)
return ()

while len(recvd) < self.config['sock_chunk_buffer_count']:
Expand Down
Loading