Skip to content

Improve client networking backoff / retry #2480

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Feb 14, 2025
4 changes: 2 additions & 2 deletions kafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class KafkaAdminClient(object):
reconnection attempts will continue periodically with this fixed
rate. To avoid connection storms, a randomization factor of 0.2
will be applied to the backoff resulting in a random range between
20% below and 20% above the computed value. Default: 1000.
20% below and 20% above the computed value. Default: 30000.
request_timeout_ms (int): Client request timeout in milliseconds.
Default: 30000.
connections_max_idle_ms: Close idle connections after the number of
Expand Down Expand Up @@ -156,7 +156,7 @@ class KafkaAdminClient(object):
'request_timeout_ms': 30000,
'connections_max_idle_ms': 9 * 60 * 1000,
'reconnect_backoff_ms': 50,
'reconnect_backoff_max_ms': 1000,
'reconnect_backoff_max_ms': 30000,
'max_in_flight_requests_per_connection': 5,
'receive_buffer_bytes': None,
'send_buffer_bytes': None,
Expand Down
59 changes: 38 additions & 21 deletions kafka/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class KafkaClient(object):
reconnection attempts will continue periodically with this fixed
rate. To avoid connection storms, a randomization factor of 0.2
will be applied to the backoff resulting in a random range between
20% below and 20% above the computed value. Default: 1000.
20% below and 20% above the computed value. Default: 30000.
request_timeout_ms (int): Client request timeout in milliseconds.
Default: 30000.
connections_max_idle_ms: Close idle connections after the number of
Expand Down Expand Up @@ -164,7 +164,7 @@ class KafkaClient(object):
'wakeup_timeout_ms': 3000,
'connections_max_idle_ms': 9 * 60 * 1000,
'reconnect_backoff_ms': 50,
'reconnect_backoff_max_ms': 1000,
'reconnect_backoff_max_ms': 30000,
'max_in_flight_requests_per_connection': 5,
'receive_buffer_bytes': None,
'send_buffer_bytes': None,
Expand Down Expand Up @@ -464,9 +464,8 @@ def is_disconnected(self, node_id):
def connection_delay(self, node_id):
"""
Return the number of milliseconds to wait, based on the connection
state, before attempting to send data. When disconnected, this respects
the reconnect backoff time. When connecting, returns 0 to allow
non-blocking connect to finish. When connected, returns a very large
state, before attempting to send data. When connecting or disconnected,
this respects the reconnect backoff time. When connected, returns a very large
number to handle slow/stalled connections.

Arguments:
Expand Down Expand Up @@ -537,7 +536,8 @@ def send(self, node_id, request, wakeup=True):
# we will need to call send_pending_requests()
# to trigger network I/O
future = conn.send(request, blocking=False)
self._sending.add(conn)
if not future.is_done:
self._sending.add(conn)

# Wakeup signal is useful in case another thread is
# blocked waiting for incoming network traffic while holding
Expand All @@ -563,9 +563,7 @@ def poll(self, timeout_ms=None, future=None):
Returns:
list: responses received (can be empty)
"""
if future is not None:
timeout_ms = 100
elif timeout_ms is None:
if timeout_ms is None:
timeout_ms = self.config['request_timeout_ms']
elif not isinstance(timeout_ms, (int, float)):
raise TypeError('Invalid type for timeout: %s' % type(timeout_ms))
Expand All @@ -577,26 +575,25 @@ def poll(self, timeout_ms=None, future=None):
if self._closed:
break

# Send a metadata request if needed (or initiate new connection)
metadata_timeout_ms = self._maybe_refresh_metadata()

# Attempt to complete pending connections
for node_id in list(self._connecting):
self._maybe_connect(node_id)

# Send a metadata request if needed
metadata_timeout_ms = self._maybe_refresh_metadata()

# If we got a future that is already done, don't block in _poll
if future is not None and future.is_done:
timeout = 0
else:
idle_connection_timeout_ms = self._idle_expiry_manager.next_check_ms()
request_timeout_ms = self._next_ifr_request_timeout_ms()
log.debug("Timeouts: user %f, metadata %f, idle connection %f, request %f", timeout_ms, metadata_timeout_ms, idle_connection_timeout_ms, request_timeout_ms)
timeout = min(
timeout_ms,
metadata_timeout_ms,
idle_connection_timeout_ms,
self.config['request_timeout_ms'])
# if there are no requests in flight, do not block longer than the retry backoff
if self.in_flight_request_count() == 0:
timeout = min(timeout, self.config['retry_backoff_ms'])
request_timeout_ms)
timeout = max(0, timeout) # avoid negative timeouts

self._poll(timeout / 1000)
Expand All @@ -615,6 +612,8 @@ def poll(self, timeout_ms=None, future=None):
def _register_send_sockets(self):
while self._sending:
conn = self._sending.pop()
if conn._sock is None:
continue
try:
key = self._selector.get_key(conn._sock)
events = key.events | selectors.EVENT_WRITE
Expand Down Expand Up @@ -772,6 +771,17 @@ def least_loaded_node(self):

return found

def least_loaded_node_refresh_ms(self):
    """Return connection delay in milliseconds for next available node.

    This method is used primarily for retry/backoff during metadata refresh
    during / after a cluster outage, in which there are no available nodes.

    Returns:
        float: milliseconds to wait before the next node becomes connectable;
            falls back to reconnect_backoff_ms when no brokers are known.
    """
    # Guard against an empty broker list (e.g. metadata wiped during a full
    # cluster outage before any refresh succeeds): min() over an empty
    # sequence raises ValueError, so fall back to the configured backoff.
    delays = [self.connection_delay(broker.nodeId)
              for broker in self.cluster.brokers()]
    if not delays:
        return self.config['reconnect_backoff_ms']
    return min(delays)

def set_topics(self, topics):
"""Set specific topics to track for metadata.

Expand Down Expand Up @@ -803,12 +813,18 @@ def add_topic(self, topic):
self._topics.add(topic)
return self.cluster.request_update()

def _next_ifr_request_timeout_ms(self):
if self._conns:
return min([conn.next_ifr_request_timeout_ms() for conn in six.itervalues(self._conns)])
else:
return float('inf')

# This method should be locked when running multi-threaded
def _maybe_refresh_metadata(self, wakeup=False):
"""Send a metadata request if needed.

Returns:
int: milliseconds until next refresh
float: milliseconds until next refresh
"""
ttl = self.cluster.ttl()
wait_for_in_progress_ms = self.config['request_timeout_ms'] if self._metadata_refresh_in_progress else 0
Expand All @@ -822,8 +838,9 @@ def _maybe_refresh_metadata(self, wakeup=False):
# least_loaded_node()
node_id = self.least_loaded_node()
if node_id is None:
log.debug("Give up sending metadata request since no node is available");
return self.config['reconnect_backoff_ms']
next_connect_ms = self.least_loaded_node_refresh_ms()
log.debug("Give up sending metadata request since no node is available. (reconnect delay %d ms)", next_connect_ms)
return next_connect_ms

if self._can_send_request(node_id):
topics = list(self._topics)
Expand All @@ -850,11 +867,11 @@ def refresh_done(val_or_error):
# the client from unnecessarily connecting to additional nodes while a previous connection
# attempt has not been completed.
if self._connecting:
return self.config['reconnect_backoff_ms']
return float('inf')

if self.maybe_connect(node_id, wakeup=wakeup):
log.debug("Initializing connection to node %s for metadata request", node_id)
return self.config['reconnect_backoff_ms']
return float('inf')

# connected but can't send more, OR connecting
# In either case we just need to wait for a network event
Expand Down
36 changes: 22 additions & 14 deletions kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ class BrokerConnection(object):
reconnection attempts will continue periodically with this fixed
rate. To avoid connection storms, a randomization factor of 0.2
will be applied to the backoff resulting in a random range between
20% below and 20% above the computed value. Default: 1000.
20% below and 20% above the computed value. Default: 30000.
request_timeout_ms (int): Client request timeout in milliseconds.
Default: 30000.
max_in_flight_requests_per_connection (int): Requests are pipelined
Expand Down Expand Up @@ -198,7 +198,7 @@ class BrokerConnection(object):
'node_id': 0,
'request_timeout_ms': 30000,
'reconnect_backoff_ms': 50,
'reconnect_backoff_max_ms': 1000,
'reconnect_backoff_max_ms': 30000,
'max_in_flight_requests_per_connection': 5,
'receive_buffer_bytes': None,
'send_buffer_bytes': None,
Expand Down Expand Up @@ -848,20 +848,22 @@ def blacked_out(self):
re-establish a connection yet
"""
if self.state is ConnectionStates.DISCONNECTED:
if time.time() < self.last_attempt + self._reconnect_backoff:
return True
return self.connection_delay() > 0
return False

def connection_delay(self):
"""
Return the number of milliseconds to wait, based on the connection
state, before attempting to send data. When disconnected, this respects
the reconnect backoff time. When connecting or connected, returns a very
state, before attempting to send data. When connecting or disconnected,
this respects the reconnect backoff time. When connected, returns a very
large number to handle slow/stalled connections.
"""
time_waited = time.time() - (self.last_attempt or 0)
if self.state is ConnectionStates.DISCONNECTED:
return max(self._reconnect_backoff - time_waited, 0) * 1000
if self.disconnected() or self.connecting():
if len(self._gai) > 0:
return 0
else:
time_waited = time.time() - self.last_attempt
return max(self._reconnect_backoff - time_waited, 0) * 1000
else:
# When connecting or connected, we should be able to delay
# indefinitely since other events (connection or data acked) will
Expand All @@ -887,6 +889,9 @@ def _reset_reconnect_backoff(self):
self._failures = 0
self._reconnect_backoff = self.config['reconnect_backoff_ms'] / 1000.0

def _reconnect_jitter_pct(self):
return uniform(0.8, 1.2)

def _update_reconnect_backoff(self):
# Do not mark as failure if there are more dns entries available to try
if len(self._gai) > 0:
Expand All @@ -895,7 +900,7 @@ def _update_reconnect_backoff(self):
self._failures += 1
self._reconnect_backoff = self.config['reconnect_backoff_ms'] * 2 ** (self._failures - 1)
self._reconnect_backoff = min(self._reconnect_backoff, self.config['reconnect_backoff_max_ms'])
self._reconnect_backoff *= uniform(0.8, 1.2)
self._reconnect_backoff *= self._reconnect_jitter_pct()
self._reconnect_backoff /= 1000.0
log.debug('%s: reconnect backoff %s after %s failures', self, self._reconnect_backoff, self._failures)

Expand Down Expand Up @@ -1136,15 +1141,18 @@ def _recv(self):
return ()

def requests_timed_out(self):
    """Return True if the oldest in-flight request has exceeded the
    configured request_timeout_ms (remaining timeout has reached 0)."""
    return self.next_ifr_request_timeout_ms() == 0

def next_ifr_request_timeout_ms(self):
    """Return milliseconds until the oldest in-flight request times out.

    Returns:
        float: ms remaining before the oldest in-flight request exceeds
            request_timeout_ms; 0 if it has already timed out; float('inf')
            when there are no in-flight requests.
    """
    with self._lock:
        if not self.in_flight_requests:
            return float('inf')
        # Each in_flight_requests value carries its send timestamp (in
        # seconds) at index [1]; the oldest one determines the next timeout.
        oldest_at = min(ifr[1] for ifr in self.in_flight_requests.values())
        next_timeout = oldest_at + self.config['request_timeout_ms'] / 1000.0
        return max(0, (next_timeout - time.time()) * 1000)

def _handle_api_version_response(self, response):
error_type = Errors.for_code(response.error_code)
Expand Down
4 changes: 2 additions & 2 deletions kafka/consumer/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class KafkaConsumer(six.Iterator):
reconnection attempts will continue periodically with this fixed
rate. To avoid connection storms, a randomization factor of 0.2
will be applied to the backoff resulting in a random range between
20% below and 20% above the computed value. Default: 1000.
20% below and 20% above the computed value. Default: 30000.
max_in_flight_requests_per_connection (int): Requests are pipelined
to kafka brokers up to this number of maximum requests per
broker connection. Default: 5.
Expand Down Expand Up @@ -263,7 +263,7 @@ class KafkaConsumer(six.Iterator):
'request_timeout_ms': 305000, # chosen to be higher than the default of max_poll_interval_ms
'retry_backoff_ms': 100,
'reconnect_backoff_ms': 50,
'reconnect_backoff_max_ms': 1000,
'reconnect_backoff_max_ms': 30000,
'max_in_flight_requests_per_connection': 5,
'auto_offset_reset': 'latest',
'enable_auto_commit': True,
Expand Down
4 changes: 2 additions & 2 deletions kafka/producer/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ class KafkaProducer(object):
reconnection attempts will continue periodically with this fixed
rate. To avoid connection storms, a randomization factor of 0.2
will be applied to the backoff resulting in a random range between
20% below and 20% above the computed value. Default: 1000.
20% below and 20% above the computed value. Default: 30000.
max_in_flight_requests_per_connection (int): Requests are pipelined
to kafka brokers up to this number of maximum requests per
broker connection. Note that if this setting is set to be greater
Expand Down Expand Up @@ -311,7 +311,7 @@ class KafkaProducer(object):
'sock_chunk_bytes': 4096, # undocumented experimental option
'sock_chunk_buffer_count': 1000, # undocumented experimental option
'reconnect_backoff_ms': 50,
'reconnect_backoff_max_ms': 1000,
'reconnect_backoff_max_ms': 30000,
'max_in_flight_requests_per_connection': 5,
'security_protocol': 'PLAINTEXT',
'ssl_context': None,
Expand Down
10 changes: 5 additions & 5 deletions kafka/producer/sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,14 @@ def run_once(self):
self._metadata.request_update()

# remove any nodes we aren't ready to send to
not_ready_timeout = float('inf')
not_ready_timeout_ms = float('inf')
for node in list(ready_nodes):
if not self._client.is_ready(node):
log.debug('Node %s not ready; delaying produce of accumulated batch', node)
node_delay_ms = self._client.connection_delay(node)
log.debug('Node %s not ready; delaying produce of accumulated batch (%f ms)', node, node_delay_ms)
self._client.maybe_connect(node, wakeup=False)
ready_nodes.remove(node)
not_ready_timeout = min(not_ready_timeout,
self._client.connection_delay(node))
not_ready_timeout_ms = min(not_ready_timeout_ms, node_delay_ms)

# create produce requests
batches_by_node = self._accumulator.drain(
Expand All @@ -136,7 +136,7 @@ def run_once(self):
# off). Note that this specifically does not include nodes with
# sendable data that aren't ready to send since they would cause busy
# looping.
poll_timeout_ms = min(next_ready_check_delay * 1000, not_ready_timeout)
poll_timeout_ms = min(next_ready_check_delay * 1000, not_ready_timeout_ms)
if ready_nodes:
log.debug("Nodes with data ready to send: %s", ready_nodes) # trace
log.debug("Created %d produce requests: %s", len(requests), requests) # trace
Expand Down
2 changes: 2 additions & 0 deletions test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,9 @@ def conn(mocker):
MetadataResponse[0](
[(0, 'foo', 12), (1, 'bar', 34)], # brokers
[])) # topics
conn.connection_delay.return_value = 0
conn.blacked_out.return_value = False
conn.next_ifr_request_timeout_ms.return_value = float('inf')
def _set_conn_state(state):
conn.state = state
return state
Expand Down
Loading