diff --git a/kafka/client_async.py b/kafka/client_async.py index 9e57efd5e..96663b58c 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -398,7 +398,7 @@ def _should_recycle_connection(self, conn): return False - def _maybe_connect(self, node_id): + def _init_connect(self, node_id): """Idempotent non-blocking connection attempt to the given node id. Returns True if connection object exists and is connected / connecting @@ -427,10 +427,8 @@ def _maybe_connect(self, node_id): **self.config) self._conns[node_id] = conn - elif conn.connected(): - return True - - conn.connect() + if conn.disconnected(): + conn.connect() return not conn.disconnected() def ready(self, node_id, metadata_priority=True): @@ -621,15 +619,18 @@ def poll(self, timeout_ms=None, future=None): if self._closed: break - # Send a metadata request if needed (or initiate new connection) - metadata_timeout_ms = self._maybe_refresh_metadata() - # Attempt to complete pending connections for node_id in list(self._connecting): # False return means no more connection progress is possible # Connected nodes will update _connecting via state_change callback - if not self._maybe_connect(node_id): - self._connecting.remove(node_id) + if not self._init_connect(node_id): + # It's possible that the connection attempt triggered a state change + # but if not, make sure to remove from _connecting list + if node_id in self._connecting: + self._connecting.remove(node_id) + + # Send a metadata request if needed (or initiate new connection) + metadata_timeout_ms = self._maybe_refresh_metadata() # If we got a future that is already done, don't block in _poll if future is not None and future.is_done: @@ -679,6 +680,8 @@ def _poll(self, timeout): self._register_send_sockets() start_select = time.time() + if timeout == float('inf'): + timeout = None ready = self._selector.select(timeout) end_select = time.time() if self._sensors: @@ -893,6 +896,26 @@ def _maybe_refresh_metadata(self, wakeup=False): log.debug("Give up sending metadata request since no node is available. (reconnect delay %d ms)", next_connect_ms) return next_connect_ms + if not self._can_send_request(node_id): + # If there's any connection establishment underway, wait until it completes. This prevents + # the client from unnecessarily connecting to additional nodes while a previous connection + # attempt has not been completed. + if self._connecting: + return float('inf') + + elif self._can_connect(node_id): + log.debug("Initializing connection to node %s for metadata request", node_id) + self._connecting.add(node_id) + if not self._init_connect(node_id): + if node_id in self._connecting: + self._connecting.remove(node_id) + # Connection attempt failed immediately, need to retry with a different node + return self.config['reconnect_backoff_ms'] + else: + # Existing connection with max in flight requests. Wait for request to complete. + return self.config['request_timeout_ms'] + + # Recheck node_id in case we were able to connect immediately above if self._can_send_request(node_id): topics = list(self._topics) if not topics and self.cluster.is_bootstrap(node_id): @@ -917,20 +940,11 @@ def refresh_done(val_or_error): future.add_errback(refresh_done) return self.config['request_timeout_ms'] - # If there's any connection establishment underway, wait until it completes. This prevents - # the client from unnecessarily connecting to additional nodes while a previous connection - # attempt has not been completed. + # Should only get here if still connecting if self._connecting: return float('inf') - - if self.maybe_connect(node_id, wakeup=wakeup): - log.debug("Initializing connection to node %s for metadata request", node_id) - return float('inf') - - # connected but can't send more, OR connecting - # In either case we just need to wait for a network event - # to let us know the selected connection might be usable again. - return float('inf') + else: + return self.config['reconnect_backoff_ms'] def get_api_versions(self): """Return the ApiVersions map, if available. @@ -973,7 +987,7 @@ def check_version(self, node_id=None, timeout=None, strict=False): if try_node is None: self._lock.release() raise Errors.NoBrokersAvailable() - if not self._maybe_connect(try_node): + if not self._init_connect(try_node): if try_node == node_id: raise Errors.NodeNotReadyError("Connection failed to %s" % node_id) else: diff --git a/test/test_client_async.py b/test/test_client_async.py index 16ee4291d..015f39365 100644 --- a/test/test_client_async.py +++ b/test/test_client_async.py @@ -58,7 +58,7 @@ def test_can_connect(cli, conn): assert cli._can_connect(0) # Node is connected, can't reconnect - assert cli._maybe_connect(0) is True + assert cli._init_connect(0) is True assert not cli._can_connect(0) # Node is disconnected, can connect @@ -70,15 +70,15 @@ def test_can_connect(cli, conn): assert not cli._can_connect(0) -def test_maybe_connect(cli, conn): +def test_init_connect(cli, conn): # Node not in metadata, return False - assert not cli._maybe_connect(2) + assert not cli._init_connect(2) # New node_id creates a conn object assert 0 not in cli._conns conn.state = ConnectionStates.DISCONNECTED conn.connect.side_effect = lambda: conn._set_conn_state(ConnectionStates.CONNECTING) - assert cli._maybe_connect(0) is True + assert cli._init_connect(0) is True assert cli._conns[0] is conn @@ -122,8 +122,8 @@ def test_ready(mocker, cli, conn): def test_is_ready(mocker, cli, conn): - cli._maybe_connect(0) - cli._maybe_connect(1) + cli._init_connect(0) + cli._init_connect(1) # metadata refresh blocks ready nodes assert cli.is_ready(0) @@ -166,14 +166,14 @@ def test_close(mocker, cli, conn): assert conn.close.call_count == call_count # Single node close - cli._maybe_connect(0) + cli._init_connect(0) assert conn.close.call_count == call_count cli.close(0) call_count += 1 assert conn.close.call_count == call_count # All node close - cli._maybe_connect(1) + cli._init_connect(1) cli.close() # +2 close: node 1, node bootstrap (node 0 already closed) call_count += 2 @@ -185,7 +185,7 @@ def test_is_disconnected(cli, conn): conn.state = ConnectionStates.DISCONNECTED assert not cli.is_disconnected(0) - cli._maybe_connect(0) + cli._init_connect(0) assert cli.is_disconnected(0) conn.state = ConnectionStates.CONNECTING @@ -210,7 +210,7 @@ def test_send(cli, conn): assert isinstance(f.exception, Errors.NodeNotReadyError) conn.state = ConnectionStates.CONNECTED - cli._maybe_connect(0) + cli._init_connect(0) # ProduceRequest w/ 0 required_acks -> no response request = ProduceRequest[0](0, 0, []) assert request.expect_response() is False @@ -339,8 +339,7 @@ def test_maybe_refresh_metadata_cant_send(mocker, client): mocker.patch.object(client, 'least_loaded_node', return_value='foobar') mocker.patch.object(client, '_can_send_request', return_value=False) mocker.patch.object(client, '_can_connect', return_value=True) - mocker.patch.object(client, '_maybe_connect', return_value=True) - mocker.patch.object(client, 'maybe_connect', return_value=True) + mocker.patch.object(client, '_init_connect', return_value=True) now = time.time() t = mocker.patch('time.time') @@ -349,7 +348,7 @@ def test_maybe_refresh_metadata_cant_send(mocker, client): # first poll attempts connection client.poll(timeout_ms=12345678) client._poll.assert_called_with(12345.678) - client.maybe_connect.assert_called_once_with('foobar', wakeup=False) + client._init_connect.assert_called_once_with('foobar') # poll while connecting should not attempt a new connection client._connecting.add('foobar')