diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 5b4752bf8..4c1b38644 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -54,7 +54,7 @@ def __init__(self, client, subscription, metrics, **configs): auto_commit_interval_ms (int): milliseconds between automatic offset commits, if enable_auto_commit is True. Default: 5000. default_offset_commit_callback (callable): called as - callback(offsets, exception) response will be either an Exception + callback(offsets, response) response will be either an Exception or None. This callback can be used to trigger custom actions when a commit request completes. assignors (list): List of objects to use to distribute partition @@ -453,8 +453,8 @@ def close(self, autocommit=True, timeout_ms=None): def _invoke_completed_offset_commit_callbacks(self): while self.completed_offset_commits: - callback, offsets, exception = self.completed_offset_commits.popleft() - callback(offsets, exception) + callback, offsets, res_or_exc = self.completed_offset_commits.popleft() + callback(offsets, res_or_exc) def commit_offsets_async(self, offsets, callback=None): """Commit specific offsets asynchronously. @@ -859,20 +859,19 @@ def _handle_offset_fetch_response(self, future, response): " %s", self.group_id, tp) future.success(offsets) - def _default_offset_commit_callback(self, offsets, exception): - if exception is not None: - log.error("Offset commit failed: %s", exception) - - def _commit_offsets_async_on_complete(self, offsets, exception): - if exception is not None: + def _default_offset_commit_callback(self, offsets, res_or_exc): + if isinstance(res_or_exc, Exception): log.warning("Auto offset commit failed for group %s: %s", - self.group_id, exception) - if getattr(exception, 'retriable', False): - self.next_auto_commit_deadline = min(time.time() + self.config['retry_backoff_ms'] / 1000, self.next_auto_commit_deadline) + self.group_id, res_or_exc) else: log.debug("Completed autocommit of offsets %s for group %s", offsets, self.group_id) + def _commit_offsets_async_on_complete(self, offsets, res_or_exc): + if isinstance(res_or_exc, Exception) and getattr(res_or_exc, 'retriable', False): + self.next_auto_commit_deadline = min(time.time() + self.config['retry_backoff_ms'] / 1000, self.next_auto_commit_deadline) + self.config['default_offset_commit_callback'](offsets, res_or_exc) + def _maybe_auto_commit_offsets_async(self): if self.config['enable_auto_commit']: if self.coordinator_unknown():