diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 22c29878d..62527838f 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -72,7 +72,7 @@ class KafkaAdminClient(object): reconnection attempts will continue periodically with this fixed rate. To avoid connection storms, a randomization factor of 0.2 will be applied to the backoff resulting in a random range between - 20% below and 20% above the computed value. Default: 1000. + 20% below and 20% above the computed value. Default: 30000. request_timeout_ms (int): Client request timeout in milliseconds. Default: 30000. connections_max_idle_ms: Close idle connections after the number of @@ -156,7 +156,7 @@ class KafkaAdminClient(object): 'request_timeout_ms': 30000, 'connections_max_idle_ms': 9 * 60 * 1000, 'reconnect_backoff_ms': 50, - 'reconnect_backoff_max_ms': 1000, + 'reconnect_backoff_max_ms': 30000, 'max_in_flight_requests_per_connection': 5, 'receive_buffer_bytes': None, 'send_buffer_bytes': None, diff --git a/kafka/client_async.py b/kafka/client_async.py index 58f22d4ec..ea5e606cb 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -75,7 +75,7 @@ class KafkaClient(object): reconnection attempts will continue periodically with this fixed rate. To avoid connection storms, a randomization factor of 0.2 will be applied to the backoff resulting in a random range between - 20% below and 20% above the computed value. Default: 1000. + 20% below and 20% above the computed value. Default: 30000. request_timeout_ms (int): Client request timeout in milliseconds. Default: 30000. connections_max_idle_ms: Close idle connections after the number of @@ -164,7 +164,7 @@ class KafkaClient(object): 'wakeup_timeout_ms': 3000, 'connections_max_idle_ms': 9 * 60 * 1000, 'reconnect_backoff_ms': 50, - 'reconnect_backoff_max_ms': 1000, + 'reconnect_backoff_max_ms': 30000, 'max_in_flight_requests_per_connection': 5, 'receive_buffer_bytes': None, 'send_buffer_bytes': None, @@ -464,9 +464,8 @@ def is_disconnected(self, node_id): def connection_delay(self, node_id): """ Return the number of milliseconds to wait, based on the connection - state, before attempting to send data. When disconnected, this respects - the reconnect backoff time. When connecting, returns 0 to allow - non-blocking connect to finish. When connected, returns a very large + state, before attempting to send data. When connecting or disconnected, + this respects the reconnect backoff time. When connected, returns a very large number to handle slow/stalled connections. Arguments: @@ -537,7 +536,8 @@ def send(self, node_id, request, wakeup=True): # we will need to call send_pending_requests() # to trigger network I/O future = conn.send(request, blocking=False) - self._sending.add(conn) + if not future.is_done: + self._sending.add(conn) # Wakeup signal is useful in case another thread is # blocked waiting for incoming network traffic while holding @@ -563,9 +563,7 @@ def poll(self, timeout_ms=None, future=None): Returns: list: responses received (can be empty) """ - if future is not None: - timeout_ms = 100 - elif timeout_ms is None: + if timeout_ms is None: timeout_ms = self.config['request_timeout_ms'] elif not isinstance(timeout_ms, (int, float)): raise TypeError('Invalid type for timeout: %s' % type(timeout_ms)) @@ -577,26 +575,25 @@ def poll(self, timeout_ms=None, future=None): if self._closed: break + # Send a metadata request if needed (or initiate new connection) + metadata_timeout_ms = self._maybe_refresh_metadata() + # Attempt to complete pending connections for node_id in list(self._connecting): self._maybe_connect(node_id) - # Send a metadata request if needed - metadata_timeout_ms = self._maybe_refresh_metadata() - # If we got a future that is already done, don't block in _poll if future is not None and future.is_done: timeout = 0 else: idle_connection_timeout_ms = self._idle_expiry_manager.next_check_ms() + request_timeout_ms = self._next_ifr_request_timeout_ms() + log.debug("Timeouts: user %f, metadata %f, idle connection %f, request %f", timeout_ms, metadata_timeout_ms, idle_connection_timeout_ms, request_timeout_ms) timeout = min( timeout_ms, metadata_timeout_ms, idle_connection_timeout_ms, - self.config['request_timeout_ms']) - # if there are no requests in flight, do not block longer than the retry backoff - if self.in_flight_request_count() == 0: - timeout = min(timeout, self.config['retry_backoff_ms']) + request_timeout_ms) timeout = max(0, timeout) # avoid negative timeouts self._poll(timeout / 1000) @@ -615,6 +612,8 @@ def poll(self, timeout_ms=None, future=None): def _register_send_sockets(self): while self._sending: conn = self._sending.pop() + if conn._sock is None: + continue try: key = self._selector.get_key(conn._sock) events = key.events | selectors.EVENT_WRITE @@ -772,6 +771,17 @@ def least_loaded_node(self): return found + def least_loaded_node_refresh_ms(self): + """Return connection delay in milliseconds for next available node. + + This method is used primarily for retry/backoff during metadata refresh + during / after a cluster outage, in which there are no available nodes. + + Returns: + float: delay_ms + """ + return min([self.connection_delay(broker.nodeId) for broker in self.cluster.brokers()]) + def set_topics(self, topics): """Set specific topics to track for metadata. @@ -803,12 +813,18 @@ def add_topic(self, topic): self._topics.add(topic) return self.cluster.request_update() + def _next_ifr_request_timeout_ms(self): + if self._conns: + return min([conn.next_ifr_request_timeout_ms() for conn in six.itervalues(self._conns)]) + else: + return float('inf') + # This method should be locked when running multi-threaded def _maybe_refresh_metadata(self, wakeup=False): """Send a metadata request if needed. Returns: - int: milliseconds until next refresh + float: milliseconds until next refresh """ ttl = self.cluster.ttl() wait_for_in_progress_ms = self.config['request_timeout_ms'] if self._metadata_refresh_in_progress else 0 @@ -822,8 +838,9 @@ def _maybe_refresh_metadata(self, wakeup=False): # least_loaded_node() node_id = self.least_loaded_node() if node_id is None: - log.debug("Give up sending metadata request since no node is available"); - return self.config['reconnect_backoff_ms'] + next_connect_ms = self.least_loaded_node_refresh_ms() + log.debug("Give up sending metadata request since no node is available. (reconnect delay %d ms)", next_connect_ms) + return next_connect_ms if self._can_send_request(node_id): topics = list(self._topics) @@ -850,11 +867,11 @@ def refresh_done(val_or_error): # the client from unnecessarily connecting to additional nodes while a previous connection # attempt has not been completed. if self._connecting: - return self.config['reconnect_backoff_ms'] + return float('inf') if self.maybe_connect(node_id, wakeup=wakeup): log.debug("Initializing connection to node %s for metadata request", node_id) - return self.config['reconnect_backoff_ms'] + return float('inf') # connected but can't send more, OR connecting # In either case we just need to wait for a network event diff --git a/kafka/conn.py b/kafka/conn.py index 4fd8bc759..7dab7995c 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -120,7 +120,7 @@ class BrokerConnection(object): reconnection attempts will continue periodically with this fixed rate. To avoid connection storms, a randomization factor of 0.2 will be applied to the backoff resulting in a random range between - 20% below and 20% above the computed value. Default: 1000. + 20% below and 20% above the computed value. Default: 30000. request_timeout_ms (int): Client request timeout in milliseconds. Default: 30000. max_in_flight_requests_per_connection (int): Requests are pipelined @@ -198,7 +198,7 @@ class BrokerConnection(object): 'node_id': 0, 'request_timeout_ms': 30000, 'reconnect_backoff_ms': 50, - 'reconnect_backoff_max_ms': 1000, + 'reconnect_backoff_max_ms': 30000, 'max_in_flight_requests_per_connection': 5, 'receive_buffer_bytes': None, 'send_buffer_bytes': None, @@ -848,20 +848,22 @@ def blacked_out(self): re-establish a connection yet """ if self.state is ConnectionStates.DISCONNECTED: - if time.time() < self.last_attempt + self._reconnect_backoff: - return True + return self.connection_delay() > 0 return False def connection_delay(self): """ Return the number of milliseconds to wait, based on the connection - state, before attempting to send data. When disconnected, this respects - the reconnect backoff time. When connecting or connected, returns a very + state, before attempting to send data. When connecting or disconnected, + this respects the reconnect backoff time. When connected, returns a very large number to handle slow/stalled connections. """ - time_waited = time.time() - (self.last_attempt or 0) - if self.state is ConnectionStates.DISCONNECTED: - return max(self._reconnect_backoff - time_waited, 0) * 1000 + if self.disconnected() or self.connecting(): + if len(self._gai) > 0: + return 0 + else: + time_waited = time.time() - self.last_attempt + return max(self._reconnect_backoff - time_waited, 0) * 1000 else: # When connecting or connected, we should be able to delay # indefinitely since other events (connection or data acked) will @@ -887,6 +889,9 @@ def _reset_reconnect_backoff(self): self._failures = 0 self._reconnect_backoff = self.config['reconnect_backoff_ms'] / 1000.0 + def _reconnect_jitter_pct(self): + return uniform(0.8, 1.2) + def _update_reconnect_backoff(self): # Do not mark as failure if there are more dns entries available to try if len(self._gai) > 0: @@ -895,7 +900,7 @@ def _update_reconnect_backoff(self): self._failures += 1 self._reconnect_backoff = self.config['reconnect_backoff_ms'] * 2 ** (self._failures - 1) self._reconnect_backoff = min(self._reconnect_backoff, self.config['reconnect_backoff_max_ms']) - self._reconnect_backoff *= uniform(0.8, 1.2) + self._reconnect_backoff *= self._reconnect_jitter_pct() self._reconnect_backoff /= 1000.0 log.debug('%s: reconnect backoff %s after %s failures', self, self._reconnect_backoff, self._failures) @@ -1136,15 +1141,18 @@ def _recv(self): return () def requests_timed_out(self): + return self.next_ifr_request_timeout_ms() == 0 + + def next_ifr_request_timeout_ms(self): with self._lock: if self.in_flight_requests: get_timestamp = lambda v: v[1] oldest_at = min(map(get_timestamp, self.in_flight_requests.values())) - timeout = self.config['request_timeout_ms'] / 1000.0 - if time.time() >= oldest_at + timeout: - return True - return False + next_timeout = oldest_at + self.config['request_timeout_ms'] / 1000.0 + return max(0, (next_timeout - time.time()) * 1000) + else: + return float('inf') def _handle_api_version_response(self, response): error_type = Errors.for_code(response.error_code) diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index a1d1dfa37..2d7571d1b 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -98,7 +98,7 @@ class KafkaConsumer(six.Iterator): reconnection attempts will continue periodically with this fixed rate. To avoid connection storms, a randomization factor of 0.2 will be applied to the backoff resulting in a random range between - 20% below and 20% above the computed value. Default: 1000. + 20% below and 20% above the computed value. Default: 30000. max_in_flight_requests_per_connection (int): Requests are pipelined to kafka brokers up to this number of maximum requests per broker connection. Default: 5. @@ -263,7 +263,7 @@ class KafkaConsumer(six.Iterator): 'request_timeout_ms': 305000, # chosen to be higher than the default of max_poll_interval_ms 'retry_backoff_ms': 100, 'reconnect_backoff_ms': 50, - 'reconnect_backoff_max_ms': 1000, + 'reconnect_backoff_max_ms': 30000, 'max_in_flight_requests_per_connection': 5, 'auto_offset_reset': 'latest', 'enable_auto_commit': True, diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index dd1cc508c..eb6e91961 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -216,7 +216,7 @@ class KafkaProducer(object): reconnection attempts will continue periodically with this fixed rate. To avoid connection storms, a randomization factor of 0.2 will be applied to the backoff resulting in a random range between - 20% below and 20% above the computed value. Default: 1000. + 20% below and 20% above the computed value. Default: 30000. max_in_flight_requests_per_connection (int): Requests are pipelined to kafka brokers up to this number of maximum requests per broker connection. Note that if this setting is set to be greater @@ -311,7 +311,7 @@ class KafkaProducer(object): 'sock_chunk_bytes': 4096, # undocumented experimental option 'sock_chunk_buffer_count': 1000, # undocumented experimental option 'reconnect_backoff_ms': 50, - 'reconnect_backoff_max_ms': 1000, + 'reconnect_backoff_max_ms': 30000, 'max_in_flight_requests_per_connection': 5, 'security_protocol': 'PLAINTEXT', 'ssl_context': None, diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py index 35688d3f1..581064ca5 100644 --- a/kafka/producer/sender.py +++ b/kafka/producer/sender.py @@ -103,14 +103,14 @@ def run_once(self): self._metadata.request_update() # remove any nodes we aren't ready to send to - not_ready_timeout = float('inf') + not_ready_timeout_ms = float('inf') for node in list(ready_nodes): if not self._client.is_ready(node): - log.debug('Node %s not ready; delaying produce of accumulated batch', node) + node_delay_ms = self._client.connection_delay(node) + log.debug('Node %s not ready; delaying produce of accumulated batch (%f ms)', node, node_delay_ms) self._client.maybe_connect(node, wakeup=False) ready_nodes.remove(node) - not_ready_timeout = min(not_ready_timeout, - self._client.connection_delay(node)) + not_ready_timeout_ms = min(not_ready_timeout_ms, node_delay_ms) # create produce requests batches_by_node = self._accumulator.drain( @@ -136,7 +136,7 @@ def run_once(self): # off). Note that this specifically does not include nodes with # sendable data that aren't ready to send since they would cause busy # looping. - poll_timeout_ms = min(next_ready_check_delay * 1000, not_ready_timeout) + poll_timeout_ms = min(next_ready_check_delay * 1000, not_ready_timeout_ms) if ready_nodes: log.debug("Nodes with data ready to send: %s", ready_nodes) # trace log.debug("Created %d produce requests: %s", len(requests), requests) # trace diff --git a/test/conftest.py b/test/conftest.py index 3fa0262fd..d54a91243 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -137,7 +137,9 @@ def conn(mocker): MetadataResponse[0]( [(0, 'foo', 12), (1, 'bar', 34)], # brokers [])) # topics + conn.connection_delay.return_value = 0 conn.blacked_out.return_value = False + conn.next_ifr_request_timeout_ms.return_value = float('inf') def _set_conn_state(state): conn.state = state return state diff --git a/test/test_client_async.py b/test/test_client_async.py index 66b227aa9..ec5e2c0ae 100644 --- a/test/test_client_async.py +++ b/test/test_client_async.py @@ -230,29 +230,25 @@ def test_send(cli, conn): def test_poll(mocker): metadata = mocker.patch.object(KafkaClient, '_maybe_refresh_metadata') + ifr_request_timeout = mocker.patch.object(KafkaClient, '_next_ifr_request_timeout_ms') _poll = mocker.patch.object(KafkaClient, '_poll') - ifrs = mocker.patch.object(KafkaClient, 'in_flight_request_count') - ifrs.return_value = 1 cli = KafkaClient(api_version=(0, 9)) # metadata timeout wins + ifr_request_timeout.return_value = float('inf') metadata.return_value = 1000 cli.poll() _poll.assert_called_with(1.0) # user timeout wins - cli.poll(250) + cli.poll(timeout_ms=250) _poll.assert_called_with(0.25) - # default is request_timeout_ms + # ifr request timeout wins + ifr_request_timeout.return_value = 30000 metadata.return_value = 1000000 cli.poll() - _poll.assert_called_with(cli.config['request_timeout_ms'] / 1000.0) - - # If no in-flight-requests, drop timeout to retry_backoff_ms - ifrs.return_value = 0 - cli.poll() - _poll.assert_called_with(cli.config['retry_backoff_ms'] / 1000.0) + _poll.assert_called_with(30.0) def test__poll(): @@ -309,25 +305,24 @@ def client(mocker): def test_maybe_refresh_metadata_ttl(mocker, client): client.cluster.ttl.return_value = 1234 - mocker.patch.object(KafkaClient, 'in_flight_request_count', return_value=1) client.poll(timeout_ms=12345678) client._poll.assert_called_with(1.234) def test_maybe_refresh_metadata_backoff(mocker, client): - mocker.patch.object(KafkaClient, 'in_flight_request_count', return_value=1) + mocker.patch.object(client, 'least_loaded_node', return_value=None) + mocker.patch.object(client, 'least_loaded_node_refresh_ms', return_value=4321) now = time.time() t = mocker.patch('time.time') t.return_value = now client.poll(timeout_ms=12345678) - client._poll.assert_called_with(2.222) # reconnect backoff + client._poll.assert_called_with(4.321) def test_maybe_refresh_metadata_in_progress(mocker, client): client._metadata_refresh_in_progress = True - mocker.patch.object(KafkaClient, 'in_flight_request_count', return_value=1) client.poll(timeout_ms=12345678) client._poll.assert_called_with(9999.999) # request_timeout_ms @@ -336,7 +331,6 @@ def test_maybe_refresh_metadata_in_progress(mocker, client): def test_maybe_refresh_metadata_update(mocker, client): mocker.patch.object(client, 'least_loaded_node', return_value='foobar') mocker.patch.object(client, '_can_send_request', return_value=True) - mocker.patch.object(KafkaClient, 'in_flight_request_count', return_value=1) send = mocker.patch.object(client, 'send') client.poll(timeout_ms=12345678) @@ -348,10 +342,10 @@ def test_maybe_refresh_metadata_update(mocker, client): def test_maybe_refresh_metadata_cant_send(mocker, client): mocker.patch.object(client, 'least_loaded_node', return_value='foobar') + mocker.patch.object(client, '_can_send_request', return_value=False) mocker.patch.object(client, '_can_connect', return_value=True) mocker.patch.object(client, '_maybe_connect', return_value=True) mocker.patch.object(client, 'maybe_connect', return_value=True) - mocker.patch.object(KafkaClient, 'in_flight_request_count', return_value=1) now = time.time() t = mocker.patch('time.time') @@ -359,14 +353,14 @@ def test_maybe_refresh_metadata_cant_send(mocker, client): # first poll attempts connection client.poll(timeout_ms=12345678) - client._poll.assert_called_with(2.222) # reconnect backoff + client._poll.assert_called_with(12345.678) client.maybe_connect.assert_called_once_with('foobar', wakeup=False) # poll while connecting should not attempt a new connection client._connecting.add('foobar') client._can_connect.reset_mock() client.poll(timeout_ms=12345678) - client._poll.assert_called_with(2.222) # connection timeout (reconnect timeout) + client._poll.assert_called_with(12345.678) assert not client._can_connect.called assert not client._metadata_refresh_in_progress diff --git a/test/test_conn.py b/test/test_conn.py index 966f7b34d..3afa9422d 100644 --- a/test/test_conn.py +++ b/test/test_conn.py @@ -80,15 +80,35 @@ def test_blacked_out(conn): assert conn.blacked_out() is True -def test_connection_delay(conn): +def test_connection_delay(conn, mocker): + mocker.patch.object(conn, '_reconnect_jitter_pct', return_value=1.0) with mock.patch("time.time", return_value=1000): conn.last_attempt = 1000 assert conn.connection_delay() == conn.config['reconnect_backoff_ms'] conn.state = ConnectionStates.CONNECTING - assert conn.connection_delay() == float('inf') + assert conn.connection_delay() == conn.config['reconnect_backoff_ms'] conn.state = ConnectionStates.CONNECTED assert conn.connection_delay() == float('inf') + conn._gai.clear() + conn._update_reconnect_backoff() + conn.state = ConnectionStates.DISCONNECTED + assert conn.connection_delay() == 1.0 * conn.config['reconnect_backoff_ms'] + conn.state = ConnectionStates.CONNECTING + assert conn.connection_delay() == 1.0 * conn.config['reconnect_backoff_ms'] + + conn._update_reconnect_backoff() + conn.state = ConnectionStates.DISCONNECTED + assert conn.connection_delay() == 2.0 * conn.config['reconnect_backoff_ms'] + conn.state = ConnectionStates.CONNECTING + assert conn.connection_delay() == 2.0 * conn.config['reconnect_backoff_ms'] + + conn._update_reconnect_backoff() + conn.state = ConnectionStates.DISCONNECTED + assert conn.connection_delay() == 4.0 * conn.config['reconnect_backoff_ms'] + conn.state = ConnectionStates.CONNECTING + assert conn.connection_delay() == 4.0 * conn.config['reconnect_backoff_ms'] + def test_connected(conn): assert conn.connected() is False