From dd388ecb0c9b1040b7f398e29ad350a916a54ae6 Mon Sep 17 00:00:00 2001 From: Laityned Date: Mon, 16 Oct 2023 11:17:14 +0200 Subject: [PATCH] Change loglevel of cancelled errors to info --- kafka/consumer/fetcher.py | 10 +++++++++- test/test_fetcher.py | 18 ++++++++++++++++++ 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 7ff9daf7b..0b5df4e9a 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -125,7 +125,7 @@ def send_fetches(self): log.debug("Sending FetchRequest to node %s", node_id) future = self._client.send(node_id, request, wakeup=False) future.add_callback(self._handle_fetch_response, request, time.time()) - future.add_errback(log.error, 'Fetch to node %s failed: %s', node_id) + future.add_errback(self._handle_fetch_error, node_id) futures.append(future) self._fetch_futures.extend(futures) self._clean_done_fetch_futures() @@ -778,6 +778,14 @@ def _handle_fetch_response(self, request, send_time, response): self._sensors.fetch_throttle_time_sensor.record(response.throttle_time_ms) self._sensors.fetch_latency.record((time.time() - send_time) * 1000) + def _handle_fetch_error(self, node_id, exception): + log.log( + logging.INFO if isinstance(exception, Errors.Cancelled) else logging.ERROR, + 'Fetch to node %s failed: %s', + node_id, + exception + ) + def _parse_fetched_data(self, completed_fetch): tp = completed_fetch.topic_partition fetch_offset = completed_fetch.fetched_offset diff --git a/test/test_fetcher.py b/test/test_fetcher.py index 697f8be1f..f8311ac79 100644 --- a/test/test_fetcher.py +++ b/test/test_fetcher.py @@ -1,5 +1,6 @@ # pylint: skip-file from __future__ import absolute_import +import logging import pytest @@ -12,6 +13,7 @@ CompletedFetch, ConsumerRecord, Fetcher, NoOffsetForPartitionError ) from kafka.consumer.subscription_state import SubscriptionState +import kafka.errors as Errors from kafka.future import Future from kafka.metrics import Metrics from kafka.protocol.fetch import FetchRequest, FetchResponse @@ -378,6 +380,22 @@ def test__handle_fetch_response(fetcher, fetch_request, fetch_response, num_part assert len(fetcher._completed_fetches) == num_partitions +@pytest.mark.parametrize(("exception", "log_level"), [ +( + Errors.Cancelled(), + logging.INFO +), +( + Errors.KafkaError(), + logging.ERROR +) +]) +def test__handle_fetch_error(fetcher, caplog, exception, log_level): + fetcher._handle_fetch_error(3, exception) + assert len(caplog.records) == 1 + assert caplog.records[0].levelname == logging.getLevelName(log_level) + + def test__unpack_message_set(fetcher): fetcher.config['check_crcs'] = False tp = TopicPartition('foo', 0)