Skip to content

Commit 159f5a3

Browse files
author
Dana Powers
committed
KAFKA-4160: Ensure rebalance listener not called with coordinator lock
1 parent ec9049c commit 159f5a3

File tree

1 file changed

+112
-84
lines changed

1 file changed

+112
-84
lines changed

kafka/coordinator/base.py

+112-84
Original file line numberDiff line numberDiff line change
@@ -323,90 +323,108 @@ def time_to_next_heartbeat(self):
323323
return sys.maxsize
324324
return self.heartbeat.time_to_next_heartbeat()
325325

326+
def _reset_join_group_future(self):
327+
with self._lock:
328+
self.join_future = None
329+
330+
def _initiate_join_group(self):
331+
with self._lock:
332+
# we store the join future in case we are woken up by the user
333+
# after beginning the rebalance in the call to poll below.
334+
# This ensures that we do not mistakenly attempt to rejoin
335+
# before the pending rebalance has completed.
336+
if self.join_future is None:
337+
self.state = MemberState.REBALANCING
338+
self.join_future = self._send_join_group_request()
339+
340+
# handle join completion in the callback so that the
341+
# callback will be invoked even if the consumer is woken up
342+
# before finishing the rebalance
343+
self.join_future.add_callback(self._handle_join_success)
344+
345+
# we handle failures below after the request finishes.
346+
# If the join completes after having been woken up, the
347+
# exception is ignored and we will rejoin
348+
self.join_future.add_errback(self._handle_join_failure)
349+
350+
return self.join_future
351+
326352
def _handle_join_success(self, member_assignment_bytes):
353+
# handle join completion in the callback so that the callback
354+
# will be invoked even if the consumer is woken up before
355+
# finishing the rebalance
327356
with self._lock:
328357
log.info("Successfully joined group %s with generation %s",
329358
self.group_id, self._generation.generation_id)
330-
self.join_future = None
331359
self.state = MemberState.STABLE
332-
self.rejoining = False
333-
self._heartbeat_thread.enable()
334-
self._on_join_complete(self._generation.generation_id,
335-
self._generation.member_id,
336-
self._generation.protocol,
337-
member_assignment_bytes)
360+
if self._heartbeat_thread is not None:
361+
self._heartbeat_thread.enable()
338362

339363
def _handle_join_failure(self, _):
364+
# we handle failures below after the request finishes.
365+
# if the join completes after having been woken up,
366+
# the exception is ignored and we will rejoin
340367
with self._lock:
341-
self.join_future = None
342368
self.state = MemberState.UNJOINED
343369

344370
def ensure_active_group(self):
345371
"""Ensure that the group is active (i.e. joined and synced)"""
346-
with self._lock:
347-
if self._heartbeat_thread is None:
348-
self._start_heartbeat_thread()
349-
350-
while self.need_rejoin():
351-
self.ensure_coordinator_ready()
352-
353-
# call on_join_prepare if needed. We set a flag
354-
# to make sure that we do not call it a second
355-
# time if the client is woken up before a pending
356-
# rebalance completes. This must be called on each
357-
# iteration of the loop because an event requiring
358-
# a rebalance (such as a metadata refresh which
359-
# changes the matched subscription set) can occur
360-
# while another rebalance is still in progress.
361-
if not self.rejoining:
362-
self._on_join_prepare(self._generation.generation_id,
363-
self._generation.member_id)
364-
self.rejoining = True
365-
366-
# ensure that there are no pending requests to the coordinator.
367-
# This is important in particular to avoid resending a pending
368-
# JoinGroup request.
369-
while not self.coordinator_unknown():
370-
if not self._client.in_flight_request_count(self.coordinator_id):
371-
break
372-
self._client.poll()
373-
else:
372+
self.ensure_coordinator_ready()
373+
self._start_heartbeat_thread()
374+
self.join_group()
375+
376+
def join_group(self):
377+
while self.need_rejoin():
378+
self.ensure_coordinator_ready()
379+
380+
# call on_join_prepare if needed. We set a flag
381+
# to make sure that we do not call it a second
382+
# time if the client is woken up before a pending
383+
# rebalance completes. This must be called on each
384+
# iteration of the loop because an event requiring
385+
# a rebalance (such as a metadata refresh which
386+
# changes the matched subscription set) can occur
387+
# while another rebalance is still in progress.
388+
if not self.rejoining:
389+
self._on_join_prepare(self._generation.generation_id,
390+
self._generation.member_id)
391+
self.rejoining = True
392+
393+
# fence off the heartbeat thread explicitly so that it cannot
394+
# interfere with the join group. # Note that this must come after
395+
# the call to onJoinPrepare since we must be able to continue
396+
# sending heartbeats if that callback takes some time.
397+
self._disable_heartbeat_thread()
398+
399+
# ensure that there are no pending requests to the coordinator.
400+
# This is important in particular to avoid resending a pending
401+
# JoinGroup request.
402+
while not self.coordinator_unknown():
403+
if not self._client.in_flight_request_count(self.coordinator_id):
404+
break
405+
self._client.poll()
406+
else:
407+
continue
408+
409+
future = self._initiate_join_group()
410+
self._client.poll(future=future)
411+
self._reset_join_group_future()
412+
413+
if future.succeeded():
414+
self.rejoining = False
415+
self._on_join_complete(self._generation.generation_id,
416+
self._generation.member_id,
417+
self._generation.protocol,
418+
future.value)
419+
else:
420+
exception = future.exception
421+
if isinstance(exception, (Errors.UnknownMemberIdError,
422+
Errors.RebalanceInProgressError,
423+
Errors.IllegalGenerationError)):
374424
continue
375-
376-
# we store the join future in case we are woken up by the user
377-
# after beginning the rebalance in the call to poll below.
378-
# This ensures that we do not mistakenly attempt to rejoin
379-
# before the pending rebalance has completed.
380-
if self.join_future is None:
381-
self.state = MemberState.REBALANCING
382-
future = self._send_join_group_request()
383-
384-
self.join_future = future # this should happen before adding callbacks
385-
386-
# handle join completion in the callback so that the
387-
# callback will be invoked even if the consumer is woken up
388-
# before finishing the rebalance
389-
future.add_callback(self._handle_join_success)
390-
391-
# we handle failures below after the request finishes.
392-
# If the join completes after having been woken up, the
393-
# exception is ignored and we will rejoin
394-
future.add_errback(self._handle_join_failure)
395-
396-
else:
397-
future = self.join_future
398-
399-
self._client.poll(future=future)
400-
401-
if future.failed():
402-
exception = future.exception
403-
if isinstance(exception, (Errors.UnknownMemberIdError,
404-
Errors.RebalanceInProgressError,
405-
Errors.IllegalGenerationError)):
406-
continue
407-
elif not future.retriable():
408-
raise exception # pylint: disable-msg=raising-bad-type
409-
time.sleep(self.config['retry_backoff_ms'] / 1000)
425+
elif not future.retriable():
426+
raise exception # pylint: disable-msg=raising-bad-type
427+
time.sleep(self.config['retry_backoff_ms'] / 1000)
410428

411429
def _send_join_group_request(self):
412430
"""Join the group and return the assignment for the next generation.
@@ -716,20 +734,27 @@ def request_rejoin(self):
716734
self.rejoin_needed = True
717735

718736
def _start_heartbeat_thread(self):
719-
if self._heartbeat_thread is None:
720-
log.info('Starting new heartbeat thread')
721-
self._heartbeat_thread = HeartbeatThread(weakref.proxy(self))
722-
self._heartbeat_thread.daemon = True
723-
self._heartbeat_thread.start()
737+
with self._lock:
738+
if self._heartbeat_thread is None:
739+
log.info('Starting new heartbeat thread')
740+
self._heartbeat_thread = HeartbeatThread(weakref.proxy(self))
741+
self._heartbeat_thread.daemon = True
742+
self._heartbeat_thread.start()
743+
744+
def _disable_heartbeat_thread(self):
745+
with self._lock:
746+
if self._heartbeat_thread is not None:
747+
self._heartbeat_thread.disable()
724748

725749
def _close_heartbeat_thread(self):
726-
if self._heartbeat_thread is not None:
727-
log.info('Stopping heartbeat thread')
728-
try:
729-
self._heartbeat_thread.close()
730-
except ReferenceError:
731-
pass
732-
self._heartbeat_thread = None
750+
with self._lock:
751+
if self._heartbeat_thread is not None:
752+
log.info('Stopping heartbeat thread')
753+
try:
754+
self._heartbeat_thread.close()
755+
except ReferenceError:
756+
pass
757+
self._heartbeat_thread = None
733758

734759
def __del__(self):
735760
self._close_heartbeat_thread()
@@ -892,12 +917,15 @@ def __init__(self, coordinator):
892917

893918
def enable(self):
894919
with self.coordinator._lock:
920+
log.debug('Enabling heartbeat thread')
895921
self.enabled = True
896922
self.coordinator.heartbeat.reset_timeouts()
897923
self.coordinator._lock.notify()
898924

899925
def disable(self):
900-
self.enabled = False
926+
with self.coordinator._lock:
927+
log.debug('Disabling heartbeat thread')
928+
self.enabled = False
901929

902930
def close(self):
903931
self.closed = True

0 commit comments

Comments
 (0)