Skip to content

Commit 7260664

Browse files
committed
bugfix: fix infinite loop on KafkaAdminClient (dpkp#2194)
An infinite loop may happen with the following pattern: self._send_request_to_node(self._client.least_loaded_node(), request) The problem happens when `self._client`'s cluster metadata is out-of-date, and the result of `least_loaded_node()` is a node that has been removed from the cluster but the client is unware of it. When this happens `_send_request_to_node` will enter an infinite loop waiting for the chosen node to become available, which won't happen, resulting in an infinite loop. This commit introduces a new method named `_send_request_to_least_loaded_node` which handles the case above. This is done by regularly checking if the target node is available in the cluster metadata, and if not, a new node is chosen. Notes: - This does not yet cover every call site to `_send_request_to_node`, there are some other places were similar race conditions may happen. - The code above does not guarantee that the request itself will be sucessful, since it is still possible for the target node to exit, however, it does remove the infinite loop which can render client code unusable.
1 parent 9feeb79 commit 7260664

File tree

1 file changed

+36
-14
lines changed

1 file changed

+36
-14
lines changed

kafka/admin/client.py

Lines changed: 36 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,7 @@ def _refresh_controller_id(self):
272272
version = self._matching_api_version(MetadataRequest)
273273
if 1 <= version <= 6:
274274
request = MetadataRequest[version]()
275-
future = self._send_request_to_node(self._client.least_loaded_node(), request)
275+
future = self._send_request_to_least_loaded_node(request)
276276

277277
self._wait_for_futures([future])
278278

@@ -310,7 +310,7 @@ def _find_coordinator_id_send_request(self, group_id):
310310
raise NotImplementedError(
311311
"Support for GroupCoordinatorRequest_v{} has not yet been added to KafkaAdminClient."
312312
.format(version))
313-
return self._send_request_to_node(self._client.least_loaded_node(), request)
313+
return self._send_request_to_least_loaded_node(request)
314314

315315
def _find_coordinator_id_process_response(self, response):
316316
"""Process a FindCoordinatorResponse.
@@ -355,9 +355,36 @@ def _find_coordinator_ids(self, group_ids):
355355
}
356356
return groups_coordinators
357357

358+
def _send_request_to_least_loaded_node(self, request):
359+
"""Send a Kafka protocol message to the least loaded broker.
360+
361+
Returns a future that may be polled for status and results.
362+
363+
:param request: The message to send.
364+
:return: A future object that may be polled for status and results.
365+
:exception: The exception if the message could not be sent.
366+
"""
367+
node_id = self._client.least_loaded_node()
368+
while not self._client.ready(node_id):
369+
# poll until the connection to broker is ready, otherwise send()
370+
# will fail with NodeNotReadyError
371+
self._client.poll()
372+
373+
# node_id is not part of the cluster anymore, choose a new broker
374+
# to connect to
375+
if self._client.cluster.broker_metadata(node_id) is None:
376+
node_id = self._client.least_loaded_node()
377+
378+
return self._client.send(node_id, request)
379+
358380
def _send_request_to_node(self, node_id, request):
359381
"""Send a Kafka protocol message to a specific broker.
360382
383+
.. note::
384+
385+
This function will enter in an infinite loop if `node_id` is
386+
removed from the cluster.
387+
361388
Returns a future that may be polled for status and results.
362389
363390
:param node_id: The broker id to which to send the message.
@@ -506,10 +533,7 @@ def _get_cluster_metadata(self, topics=None, auto_topic_creation=False):
506533
allow_auto_topic_creation=auto_topic_creation
507534
)
508535

509-
future = self._send_request_to_node(
510-
self._client.least_loaded_node(),
511-
request
512-
)
536+
future = self._send_request_to_least_loaded_node(request)
513537
self._wait_for_futures([future])
514538
return future.value
515539

@@ -601,7 +625,7 @@ def describe_acls(self, acl_filter):
601625
.format(version)
602626
)
603627

604-
future = self._send_request_to_node(self._client.least_loaded_node(), request)
628+
future = self._send_request_to_least_loaded_node(request)
605629
self._wait_for_futures([future])
606630
response = future.value
607631

@@ -692,7 +716,7 @@ def create_acls(self, acls):
692716
.format(version)
693717
)
694718

695-
future = self._send_request_to_node(self._client.least_loaded_node(), request)
719+
future = self._send_request_to_least_loaded_node(request)
696720
self._wait_for_futures([future])
697721
response = future.value
698722

@@ -786,7 +810,7 @@ def delete_acls(self, acl_filters):
786810
.format(version)
787811
)
788812

789-
future = self._send_request_to_node(self._client.least_loaded_node(), request)
813+
future = self._send_request_to_least_loaded_node(request)
790814
self._wait_for_futures([future])
791815
response = future.value
792816

@@ -846,8 +870,7 @@ def describe_configs(self, config_resources, include_synonyms=False):
846870
))
847871

848872
if len(topic_resources) > 0:
849-
futures.append(self._send_request_to_node(
850-
self._client.least_loaded_node(),
873+
futures.append(self._send_request_to_least_loaded_node(
851874
DescribeConfigsRequest[version](resources=topic_resources)
852875
))
853876

@@ -867,8 +890,7 @@ def describe_configs(self, config_resources, include_synonyms=False):
867890
))
868891

869892
if len(topic_resources) > 0:
870-
futures.append(self._send_request_to_node(
871-
self._client.least_loaded_node(),
893+
futures.append(self._send_request_to_least_loaded_node(
872894
DescribeConfigsRequest[version](resources=topic_resources, include_synonyms=include_synonyms)
873895
))
874896
else:
@@ -915,7 +937,7 @@ def alter_configs(self, config_resources):
915937
# // a single request that may be sent to any broker.
916938
#
917939
# So this is currently broken as it always sends to the least_loaded_node()
918-
future = self._send_request_to_node(self._client.least_loaded_node(), request)
940+
future = self._send_request_to_least_loaded_node(request)
919941

920942
self._wait_for_futures([future])
921943
response = future.value

0 commit comments

Comments
 (0)