File tree 1 file changed +3
-2
lines changed 1 file changed +3
-2
lines changed Original file line number Diff line number Diff line change @@ -355,21 +355,22 @@ def _find_coordinator_ids(self, group_ids):
355
355
}
356
356
return groups_coordinators
357
357
358
- def _send_request_to_node (self , node_id , request ):
358
+ def _send_request_to_node (self , node_id , request , wakeup = True ):
359
359
"""Send a Kafka protocol message to a specific broker.
360
360
361
361
Returns a future that may be polled for status and results.
362
362
363
363
:param node_id: The broker id to which to send the message.
364
364
:param request: The message to send.
365
+ :param wakeup: Optional flag to disable thread-wakeup.
365
366
:return: A future object that may be polled for status and results.
366
367
:exception: The exception if the message could not be sent.
367
368
"""
368
369
while not self ._client .ready (node_id ):
369
370
# poll until the connection to broker is ready, otherwise send()
370
371
# will fail with NodeNotReadyError
371
372
self ._client .poll ()
372
- return self ._client .send (node_id , request )
373
+ return self ._client .send (node_id , request , wakeup )
373
374
374
375
def _send_request_to_controller (self , request ):
375
376
"""Send a Kafka protocol message to the cluster controller.
You can’t perform that action at this time.
0 commit comments