
Consumer gets stuck when consuming messages with incremental fetch sessions enabled #2575

Closed
yjh126yjh opened this issue Mar 28, 2025 · 1 comment · Fixed by #2576
yjh126yjh commented Mar 28, 2025

Environment

  • Kafka version: 2.8.0
  • kafka-python version: 2.1.3

Steps to Reproduce

  1. Create a consumer for a single-partition topic
  2. Call partitions_for_topic(topic) before starting consumption
  3. Seek to a specific offset
  4. Start consuming messages

Reproduction Code

import logging
import time
from kafka import KafkaConsumer, TopicPartition

logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s | %(levelname)s %(message)s',
    datefmt='%H:%M:%S'
)

bootstrap_servers = 'kafka-headless.kafka:9092'
topic = 'test_topic'
start_offset = 306105260
end_offset = 306189327

consumer = KafkaConsumer(
    bootstrap_servers=bootstrap_servers,
    auto_offset_reset='earliest',
    enable_auto_commit=False,
    # max_partition_fetch_bytes=104857600,
    # enable_incremental_fetch_sessions=False
)
partition = TopicPartition(topic, 0)

# Force metadata update to trigger the bug. This is equivalent to calling consumer._fetch_all_topic_metadata() directly.
consumer.partitions_for_topic(topic)
# time.sleep(0.1)

consumer.assign([partition])
consumer.seek(partition, start_offset)

total_messages = 0

while True:
    messages = consumer.poll(timeout_ms=3000)
    if not messages:
        logging.info("No messages received")
        continue
    for tp, records in messages.items():
        for message in records:
            if message.offset >= end_offset:
                logging.info(f"Total messages: {total_messages}")
                consumer.close()
                exit(0)

            total_messages += 1

            if total_messages % 10000 == 0:
                logging.info(f"Processed message at offset: {message.offset}")

Expected Behavior

The consumer should continue consuming messages until reaching the end_offset (306189327).

Actual Behavior

The call to consumer.partitions_for_topic(topic), or more specifically its internal call to _fetch_all_topic_metadata(), changes the consumer's behavior.
The consumer gets stuck at offset 306114680: every poll() call returns empty after the 3-second timeout, and the DEBUG log shows the broker repeatedly returning the same data from offset 306105257 to 306114680. If the partitions_for_topic(topic) line is removed, the problem disappears.
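A likely workaround, hinted at by the commented-out option in the reproduction code, is to disable incremental fetch sessions so every FetchRequest is a full, sessionless request and the broker has no stale session state to replay. This is an assumption on my part, not a confirmed fix:

```python
# Assumed workaround (untested against a live broker here): force full
# FetchRequests by disabling incremental fetch sessions, using the option
# that is commented out in the reproduction code above.
workaround_config = {
    'bootstrap_servers': 'kafka-headless.kafka:9092',
    'auto_offset_reset': 'earliest',
    'enable_auto_commit': False,
    'enable_incremental_fetch_sessions': False,
}
# consumer = KafkaConsumer(**workaround_config)
```

With max_partition_fetch_bytes raised (the other commented-out option) the symptom may also shift, but disabling sessions targets the mechanism directly.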

Log

13:43:23 | DEBUG Added sensor with name connections-closed
13:43:23 | DEBUG Added sensor with name connections-created
13:43:23 | DEBUG Added sensor with name select-time
13:43:23 | DEBUG Added sensor with name io-time
13:43:23 | DEBUG Attempting to check version with node bootstrap-0
13:43:23 | DEBUG Initiating connection to node bootstrap-0 at kafka-headless.kafka:9092
13:43:23 | DEBUG Added sensor with name bytes-sent-received
13:43:23 | DEBUG Added sensor with name bytes-sent
13:43:23 | DEBUG Added sensor with name bytes-received
13:43:23 | DEBUG Added sensor with name request-latency
13:43:23 | DEBUG Added sensor with name throttle-time
13:43:23 | DEBUG Added sensor with name node-bootstrap-0.bytes-sent
13:43:23 | DEBUG Added sensor with name node-bootstrap-0.bytes-received
13:43:23 | DEBUG Added sensor with name node-bootstrap-0.latency
13:43:23 | DEBUG Added sensor with name node-bootstrap-0.throttle
13:43:23 | DEBUG <BrokerConnection client_id=kafka-python-2.1.3, node_id=bootstrap-0 host=kafka-headless.kafka:9092 <disconnected> [unspecified None]>: creating new socket
13:43:23 | DEBUG <BrokerConnection client_id=kafka-python-2.1.3, node_id=bootstrap-0 host=kafka-headless.kafka:9092 <disconnected> [IPv4 ('192.168.1.55', 9092)]>: setting socket option (6, 1, 1)
13:43:23 | INFO <BrokerConnection client_id=kafka-python-2.1.3, node_id=bootstrap-0 host=kafka-headless.kafka:9092 <connecting> [IPv4 ('192.168.1.55', 9092)]>: connecting to kafka-headless.kafka:9092 [('192.168.1.55', 9092) IPv4]
13:43:23 | DEBUG Timeouts: user 199.969292, metadata inf, idle connection inf, request inf
13:43:23 | DEBUG <BrokerConnection client_id=kafka-python-2.1.3, node_id=bootstrap-0 host=kafka-headless.kafka:9092 <connecting> [IPv4 ('192.168.1.55', 9092)]>: established TCP connection
13:43:23 | DEBUG <BrokerConnection client_id=kafka-python-2.1.3, node_id=bootstrap-0 host=kafka-headless.kafka:9092 <connecting> [IPv4 ('192.168.1.55', 9092)]>: checking broker Api Versions
13:43:23 | DEBUG Sending request ApiVersionsRequest_v4(client_software_name='kafka-python', client_software_version='2.1.3', _tagged_fields={})
13:43:23 | DEBUG <BrokerConnection client_id=kafka-python-2.1.3, node_id=bootstrap-0 host=kafka-headless.kafka:9092 <checking_api_versions_send> [IPv4 ('192.168.1.55', 9092)]> Request 1 (timeout_ms 1600.0): ApiVersionsRequest_v4(client_software_name='kafka-python', client_software_version='2.1.3', _tagged_fields={})
13:43:23 | DEBUG Timeouts: user 199.975967, metadata inf, idle connection inf, request 1599.713326
13:43:23 | DEBUG Received correlation id: 1
13:43:23 | DEBUG Processing response ApiVersionsResponse_v4
13:43:23 | DEBUG <BrokerConnection client_id=kafka-python-2.1.3, node_id=bootstrap-0 host=kafka-headless.kafka:9092 <checking_api_versions_recv> [IPv4 ('192.168.1.55', 9092)]> Response 1 (0.5891323089599609 ms): ApiVersionsResponse_v0(error_code=35, api_versions=[(api_key=18, min_version=0, max_version=3)])
13:43:23 | DEBUG Timeouts: user 199.982405, metadata inf, idle connection 539994.000000, request inf
13:43:23 | DEBUG Sending request ApiVersionsRequest_v3(client_software_name='kafka-python', client_software_version='2.1.3', _tagged_fields={})
13:43:23 | DEBUG <BrokerConnection client_id=kafka-python-2.1.3, node_id=bootstrap-0 host=kafka-headless.kafka:9092 <checking_api_versions_send> [IPv4 ('192.168.1.55', 9092)]> Request 2 (timeout_ms 1600.0): ApiVersionsRequest_v3(client_software_name='kafka-python', client_software_version='2.1.3', _tagged_fields={})
13:43:23 | DEBUG Timeouts: user 199.982405, metadata inf, idle connection 539994.000000, request 1599.795341
13:43:23 | DEBUG Received correlation id: 2
13:43:23 | DEBUG Processing response ApiVersionsResponse_v3
13:43:23 | DEBUG <BrokerConnection client_id=kafka-python-2.1.3, node_id=bootstrap-0 host=kafka-headless.kafka:9092 <checking_api_versions_recv> [IPv4 ('192.168.1.55', 9092)]> Response 2 (0.8835792541503906 ms): ApiVersionsResponse_v3(error_code=0, api_versions=[(api_key=0, min_version=0, max_version=9, _tagged_fields={}), (api_key=1, min_version=0, max_version=12, _tagged_fields={}), (api_key=2, min_version=0, max_version=6, _tagged_fields={}), (api_key=3, min_version=0, max_version=11, _tagged_fields={}), (api_key=4, min_version=0, max_version=5, _tagged_fields={}), (api_key=5, min_version=0, max_version=3, _tagged_fields={}), (api_key=6, min_version=0, max_version=7, _tagged_fields={}), (api_key=7, min_version=0, max_version=3, _tagged_fields={}), (api_key=8, min_version=0, max_version=8, _tagged_fields={}), (api_key=9, min_version=0, max_version=7, _tagged_fields={}), (api_key=10, min_version=0, max_version=3, _tagged_fields={}), (api_key=11, min_version=0, max_version=7, _tagged_fields={}), (api_key=12, min_version=0, max_version=4, _tagged_fields={}), (api_key=13, min_version=0, max_version=4, _tagged_fields={}), (api_key=14, min_version=0, max_version=5, _tagged_fields={}), (api_key=15, min_version=0, max_version=5, _tagged_fields={}), (api_key=16, min_version=0, max_version=4, _tagged_fields={}), (api_key=17, min_version=0, max_version=1, _tagged_fields={}), (api_key=18, min_version=0, max_version=3, _tagged_fields={}), (api_key=19, min_version=0, max_version=7, _tagged_fields={}), (api_key=20, min_version=0, max_version=6, _tagged_fields={}), (api_key=21, min_version=0, max_version=2, _tagged_fields={}), (api_key=22, min_version=0, max_version=4, _tagged_fields={}), (api_key=23, min_version=0, max_version=4, _tagged_fields={}), (api_key=24, min_version=0, max_version=3, _tagged_fields={}), (api_key=25, min_version=0, max_version=3, _tagged_fields={}), (api_key=26, min_version=0, max_version=3, _tagged_fields={}), (api_key=27, min_version=0, max_version=1, 
_tagged_fields={}), (api_key=28, min_version=0, max_version=3, _tagged_fields={}), (api_key=29, min_version=0, max_version=2, _tagged_fields={}), (api_key=30, min_version=0, max_version=2, _tagged_fields={}), (api_key=31, min_version=0, max_version=2, _tagged_fields={}), (api_key=32, min_version=0, max_version=4, _tagged_fields={}), (api_key=33, min_version=0, max_version=2, _tagged_fields={}), (api_key=34, min_version=0, max_version=2, _tagged_fields={}), (api_key=35, min_version=0, max_version=2, _tagged_fields={}), (api_key=36, min_version=0, max_version=2, _tagged_fields={}), (api_key=37, min_version=0, max_version=3, _tagged_fields={}), (api_key=38, min_version=0, max_version=2, _tagged_fields={}), (api_key=39, min_version=0, max_version=2, _tagged_fields={}), (api_key=40, min_version=0, max_version=2, _tagged_fields={}), (api_key=41, min_version=0, max_version=2, _tagged_fields={}), (api_key=42, min_version=0, max_version=2, _tagged_fields={}), (api_key=43, min_version=0, max_version=2, _tagged_fields={}), (api_key=44, min_version=0, max_version=1, _tagged_fields={}), (api_key=45, min_version=0, max_version=0, _tagged_fields={}), (api_key=46, min_version=0, max_version=0, _tagged_fields={}), (api_key=47, min_version=0, max_version=0, _tagged_fields={}), (api_key=48, min_version=0, max_version=1, _tagged_fields={}), (api_key=49, min_version=0, max_version=1, _tagged_fields={}), (api_key=50, min_version=0, max_version=0, _tagged_fields={}), (api_key=51, min_version=0, max_version=0, _tagged_fields={}), (api_key=56, min_version=0, max_version=0, _tagged_fields={}), (api_key=57, min_version=0, max_version=0, _tagged_fields={}), (api_key=60, min_version=0, max_version=0, _tagged_fields={}), (api_key=61, min_version=0, max_version=0, _tagged_fields={})], throttle_time_ms=0, _tagged_fields={1: b'\x00\x00\x00\x00\x00\x00\x00\x00'})
13:43:23 | INFO Broker version identified as 2.6
13:43:23 | INFO <BrokerConnection client_id=kafka-python-2.1.3, node_id=bootstrap-0 host=kafka-headless.kafka:9092 <checking_api_versions_recv> [IPv4 ('192.168.1.55', 9092)]>: Connection complete.
13:43:23 | DEBUG Node bootstrap-0 connected
13:43:23 | DEBUG Added sensor with name bytes-fetched
13:43:23 | DEBUG Added sensor with name records-fetched
13:43:23 | DEBUG Added sensor with name fetch-latency
13:43:23 | DEBUG Added sensor with name records-lag
13:43:23 | DEBUG Added sensor with name heartbeat-latency
13:43:23 | DEBUG Added sensor with name join-latency
13:43:23 | DEBUG Added sensor with name sync-latency
13:43:23 | DEBUG Added sensor with name commit-latency
13:43:23 | DEBUG Sending metadata request MetadataRequest_v7(topics=NULL, allow_auto_topic_creation=True) to node bootstrap-0
13:43:23 | DEBUG Sending request MetadataRequest_v7(topics=NULL, allow_auto_topic_creation=True)
13:43:23 | DEBUG <BrokerConnection client_id=kafka-python-2.1.3, node_id=bootstrap-0 host=kafka-headless.kafka:9092 <connected> [IPv4 ('192.168.1.55', 9092)]> Request 3 (timeout_ms 305000): MetadataRequest_v7(topics=NULL, allow_auto_topic_creation=True)
13:43:23 | DEBUG Timeouts: user 304999.702215, metadata 305000.000000, idle connection 539991.000000, request 304999.981403
13:43:23 | DEBUG Timeouts: user 304999.510050, metadata 305000.000000, idle connection 539991.000000, request 304999.791622
13:43:23 | DEBUG Received correlation id: 3
13:43:23 | DEBUG Processing response MetadataResponse_v7
13:43:23 | DEBUG <BrokerConnection client_id=kafka-python-2.1.3, node_id=bootstrap-0 host=kafka-headless.kafka:9092 <connected> [IPv4 ('192.168.1.55', 9092)]> Response 3 (1.697540283203125 ms): MetadataResponse_v7(throttle_time_ms=0, brokers=[(node_id=0, host='kafka-headless.kafka', port=9092, rack=None)], cluster_id='vk3qwR3XRuONV2fLzyt99Q', controller_id=0, topics=[(error_code=0, topic='bukong_frame_result', is_internal=False, partitions=[(error_code=0, partition=0, leader=0, leader_epoch=0, replicas=[0], isr=[0], offline_replicas=[])]), (error_code=0, topic='HitEvent', is_internal=False, partitions=[(error_code=0, partition=0, leader=0, leader_epoch=0, replicas=[0], isr=[0], offline_replicas=[])]), (error_code=0, topic='__consumer_offsets', is_internal=True, partitions=[(error_code=0, partition=0, leader=0, leader_epoch=0, replicas=[0], isr=[0], offline_replicas=[]), (error_code=0, partition=10, leader=0, leader_epoch=0, replicas=[0], isr=[0], offline_replicas=[]), (error_code=0, partition=20, leader=0, leader_epoch=0, replicas=[0], isr=[0], offline_replicas=[]), (error_code=0, partition=40, leader=0, leader_epoch=0, replicas=[0], isr=[0], offline_replicas=[]), (error_code=0, partition=30, leader=0, leader_epoch=0, replicas=[0], isr=[0], offline_replicas=[]), (error_code=0, partition=9, leader=0, leader_epoch=0, replicas=[0], isr=[0], offline_replicas=[]), (error_code=0, partition=39, leader=0, leader_epoch=0, replicas=[0], isr=[0], offline_replicas=[]), (error_code=0, partition=11, leader=0, leader_epoch=0, replicas=[0], isr=[0], offline_replicas=[]), (error_code=0, partition=31, leader=0, leader_epoch=0, replicas=[0], isr=[0], offline_replicas=[]), (error_code=0, partition=13, leader=0, leader_epoch=0, replicas=[0], isr=[0], offline_replicas=[]), (error_code=0, partition=18, leader=0, leader_epoch=0, replicas=[0], isr=[0], offline_replicas=[]), (error_code=0, partition=22, leader=0, leader_epoch=0, replicas=[0], isr=[0], offline_replicas=[]), (error_code=0, 
partition=8, leader=0, leader_epoch=0, replicas=[0], isr=[0], offline_replicas=[]), (error_code=0, partition=32, leader=0, leader_epoch=0, replicas=[0], isr=[0], offline_replicas=[]), (error_code=0, partition=43, leader=0, leader_epoch=0, replicas=[0], isr=[0], offline_replicas=[]), (error_code=0, partition=29, leader=0, leader_epoch=0, replicas=[0], isr=[0], offline_replicas=[]), (error_code=0, partition=34, leader=0, leader_epoch=0, replicas=[0], isr=[0], offline_replicas=[]), (error_code=0, partition=1, leader=0, leader_epoch=0, replicas=[0], isr=[0], offline_replicas=[]), (error_code=0, partition=6, leader=0, leader_epoch=0, replicas=[0], isr=[0], offline_replicas=[]), (error_code=0, partition=41, leader=0, leader_epoch=0, replicas=[0], isr=[0], offline_replicas=[]), (error_code=0, partition=27, leader=0, leader_epoch=0, replicas=[0], isr=[0], offline_replicas=[]), (error_code=0, partition=48, leader=0, leader_epoch=0, replicas=[0], isr=[0], offline_replicas=[]), (error_code=0, partition=5, leader=0, leader_epoch=0, replicas=[0], isr=[0], offline_replicas=[]), (error_code=0, partition=15, leader=0, leader_epoch=0, replicas=[0], isr=[0], offline_replicas=[]), (error_code=0, partition=35, leader=0, leader_epoch=0, replicas=[0], isr=[0], offline_replicas=[]), (error_code=0, partition=25, leader=0, leader_epoch=0, replicas=[0], isr=[0], offline_replicas=[]), (error_code=0, partition=46, leader=0, leader_epoch=0, replicas=[0], isr=[0], offline_replicas=[]), (error_code=0, partition=26, leader=0, leader_epoch=0, replicas=[0], isr=[0], offline_replicas=[]), (error_code=0, partition=36, leader=0, leader_epoch=0, replicas=[0], isr=[0], offline_replicas=[]), (error_code=0, partition=44, leader=0, leader_epoch=0, replicas=[0], isr=[0], offline_replicas=[]), (error_code=0, partition=16, leader=0, leader_epoch=0, replicas=[0], isr=[0], offline_replicas=[]), (error_code=0, partition=37, leader=0, leader_epoch=0, replicas=[0], isr=[0], offline_replicas=[]), (error_code=0, 
partition=17, leader=0, leader_epoch=0, replicas=[0], isr=[0], offline_replicas=[]), (error_code=0, partition=45, leader=0, leader_epoch=0, replicas=[0], isr=[0], offline_replicas=[]), (error_code=0, partition=3, leader=0, leader_epoch=0, replicas=[0], isr=[0], offline_replicas=[]), (error_code=0, partition=4, leader=0, leader_epoch=0, replicas=[0], isr=[0], offline_replicas=[]), (error_code=0, partition=24, leader=0, leader_epoch=0, replicas=[0], isr=[0], offline_replicas=[]), (error_code=0, partition=38, leader=0, leader_epoch=0, replicas=[0], isr=[0], offline_replicas=[]), (error_code=0, partition=33, leader=0, leader_epoch=0, replicas=[0], isr=[0], offline_replicas=[]), (error_code=0, partition=23, leader=0, leader_epoch=0, replicas=[0], isr=[0], offline_replicas=[]), (error_code=0, partition=28, leader=0, leader_epoch=0, replicas=[0], isr=[0], offline_replicas=[]), (error_code=0, partition=2, leader=0, leader_epoch=0, replicas=[0], isr=[0], offline_replicas=[]), (error_code=0, partition=12, leader=0, leader_epoch=0, replicas=[0], isr=[0], offline_replicas=[]), (error_code=0, partition=19, leader=0, leader_epoch=0, replicas=[0], isr=[0], offline_replicas=[]), (error_code=0, partition=14, leader=0, leader_epoch=0, replicas=[0], isr=[0], offline_replicas=[]), (error_code=0, partition=47, leader=0, leader_epoch=0, replicas=[0], isr=[0], offline_replicas=[]), (error_code=0, partition=49, leader=0, leader_epoch=0, replicas=[0], isr=[0], offline_replicas=[]), (error_code=0, partition=42, leader=0, leader_epoch=0, replicas=[0], isr=[0], offline_replicas=[]), (error_code=0, partition=7, leader=0, leader_epoch=0, replicas=[0], isr=[0], offline_replicas=[]), (error_code=0, partition=21, leader=0, leader_epoch=0, replicas=[0], isr=[0], offline_replicas=[])]), (error_code=0, topic='train_frame_result', is_internal=False, partitions=[(error_code=0, partition=0, leader=0, leader_epoch=0, replicas=[0], isr=[0], offline_replicas=[])]), (error_code=0, topic='test_topic', 
is_internal=False, partitions=[(error_code=0, partition=0, leader=0, leader_epoch=0, replicas=[0], isr=[0], offline_replicas=[])])])
13:43:23 | DEBUG Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 5, groups: 0)
13:43:23 | DEBUG Subscribed to partition(s): [TopicPartition(topic='test_topic', partition=0)]
13:43:23 | DEBUG Seeking to offset 306105260 for partition TopicPartition(topic='test_topic', partition=0)
13:43:23 | DEBUG Adding fetch request for partition TopicPartition(topic='test_topic', partition=0) at offset 306105260
13:43:23 | DEBUG Built full fetch <kafka.consumer.fetcher.FetchMetadata object at 0x7fe67c145570> for node 0 with 1 partition(s).
13:43:23 | DEBUG Initiating connection to node 0 at kafka-headless.kafka:9092
13:43:23 | DEBUG Added sensor with name node-0.bytes-sent
13:43:23 | DEBUG Added sensor with name node-0.bytes-received
13:43:23 | DEBUG Added sensor with name node-0.latency
13:43:23 | DEBUG Added sensor with name node-0.throttle
13:43:23 | DEBUG <BrokerConnection client_id=kafka-python-2.1.3, node_id=0 host=kafka-headless.kafka:9092 <disconnected> [unspecified None]>: creating new socket
13:43:23 | DEBUG <BrokerConnection client_id=kafka-python-2.1.3, node_id=0 host=kafka-headless.kafka:9092 <disconnected> [IPv4 ('192.168.1.55', 9092)]>: setting socket option (6, 1, 1)
13:43:23 | INFO <BrokerConnection client_id=kafka-python-2.1.3, node_id=0 host=kafka-headless.kafka:9092 <connecting> [IPv4 ('192.168.1.55', 9092)]>: connecting to kafka-headless.kafka:9092 [('192.168.1.55', 9092) IPv4]
13:43:23 | DEBUG Timeouts: user 2999.025345, metadata 98.763916, idle connection 539987.000000, request inf
13:43:23 | DEBUG Adding fetch request for partition TopicPartition(topic='test_topic', partition=0) at offset 306105260
13:43:23 | DEBUG Built full fetch <kafka.consumer.fetcher.FetchMetadata object at 0x7fe67c145570> for node 0 with 1 partition(s).
13:43:23 | DEBUG Timeouts: user 2998.739243, metadata 98.478760, idle connection 539987.000000, request inf
13:43:23 | DEBUG <BrokerConnection client_id=kafka-python-2.1.3, node_id=0 host=kafka-headless.kafka:9092 <connecting> [IPv4 ('192.168.1.55', 9092)]>: established TCP connection
13:43:23 | DEBUG <BrokerConnection client_id=kafka-python-2.1.3, node_id=0 host=kafka-headless.kafka:9092 <connecting> [IPv4 ('192.168.1.55', 9092)]>: checking broker Api Versions
13:43:23 | DEBUG <BrokerConnection client_id=kafka-python-2.1.3, node_id=0 host=kafka-headless.kafka:9092 <checking_api_versions_send> [IPv4 ('192.168.1.55', 9092)]>: Using pre-configured api_version (2, 6) for ApiVersions
13:43:23 | INFO <BrokerConnection client_id=kafka-python-2.1.3, node_id=0 host=kafka-headless.kafka:9092 <checking_api_versions_send> [IPv4 ('192.168.1.55', 9092)]>: Connection complete.
13:43:23 | DEBUG Node 0 connected
13:43:23 | INFO <BrokerConnection client_id=kafka-python-2.1.3, node_id=bootstrap-0 host=kafka-headless.kafka:9092 <connected> [IPv4 ('192.168.1.55', 9092)]>: Closing connection. 
13:43:23 | DEBUG <BrokerConnection client_id=kafka-python-2.1.3, node_id=bootstrap-0 host=kafka-headless.kafka:9092 <connected> [IPv4 ('192.168.1.55', 9092)]>: reconnect backoff 0.04152804945611029 after 1 failures
13:43:23 | DEBUG Adding fetch request for partition TopicPartition(topic='test_topic', partition=0) at offset 306105260
13:43:23 | DEBUG Built full fetch <kafka.consumer.fetcher.FetchMetadata object at 0x7fe67c145570> for node 0 with 1 partition(s).
13:43:23 | DEBUG Sending FetchRequest to node 0
13:43:23 | DEBUG Sending request FetchRequest_v10(replica_id=-1, max_wait_time=500, min_bytes=1, max_bytes=52428800, isolation_level=0, session_id=0, session_epoch=0, topics=[(topic='test_topic', partitions=[(partition=0, current_leader_epoch=-1, fetch_offset=306105260, log_start_offset=-1, max_bytes=1048576)])], forgotten_topics_data=[])
13:43:23 | DEBUG <BrokerConnection client_id=kafka-python-2.1.3, node_id=0 host=kafka-headless.kafka:9092 <connected> [IPv4 ('192.168.1.55', 9092)]> Request 1 (timeout_ms 305000): FetchRequest_v10(replica_id=-1, max_wait_time=500, min_bytes=1, max_bytes=52428800, isolation_level=0, session_id=0, session_epoch=0, topics=[(topic='test_topic', partitions=[(partition=0, current_leader_epoch=-1, fetch_offset=306105260, log_start_offset=-1, max_bytes=1048576)])], forgotten_topics_data=[])
13:43:23 | DEBUG Timeouts: user 0.000000, metadata 97.528320, idle connection 539986.000000, request 304999.972105
13:43:23 | DEBUG Timeouts: user 2997.584105, metadata 97.324463, idle connection 539986.000000, request 304999.770641
13:43:23 | DEBUG Timeouts: user 2996.448755, metadata 96.188965, idle connection 539985.000000, request 304998.635530
13:43:23 | DEBUG Timeouts: user 2996.097565, metadata 95.837646, idle connection 539984.000000, request 304998.284578
13:43:23 | DEBUG Timeouts: user 2995.845079, metadata 95.585449, idle connection 539984.000000, request 304998.033285
13:43:23 | DEBUG Timeouts: user 2995.561838, metadata 95.302002, idle connection 539984.000000, request 304997.749329
13:43:23 | DEBUG Timeouts: user 2994.659901, metadata 94.400146, idle connection 539983.000000, request 304996.846914
13:43:23 | DEBUG Timeouts: user 2993.324041, metadata 93.063965, idle connection 539981.000000, request 304995.511293
13:43:23 | DEBUG Received correlation id: 1
13:43:23 | DEBUG Processing response FetchResponse_v10
13:43:23 | DEBUG <BrokerConnection client_id=kafka-python-2.1.3, node_id=0 host=kafka-headless.kafka:9092 <connected> [IPv4 ('192.168.1.55', 9092)]> Response 1 (6.2007904052734375 ms): FetchResponse_v10(throttle_time_ms=0, error_code=0, session_id=237120174, topics=[(topics='test_topic', partitions=[(partition=0, error_code=0, highwater_offset=314091112, last_stable_offset=314091112, log_start_offset=294308917, aborted_transactions=NULL, records=b'\x00\x00\x00\x00\x12>\xcb\xa9\x00\x00\x00P\x00\x00\x00\x00\x02\xfe-Z\xbc\x00\x00\x00\x00\x00\x03\x00\x00\x01\x95\xda\x9aJ\x83\x00\x00\x01\x95\xda\x9aL5\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x04\x0c\x00\x00\x00\x00\x00\x00\x0e\x00\x98\x02\x02\x00\x00\x00\x0e\x00\x9a\x04\x04\x00\x00\x00\x0e\x00\xe4\x06\x06\x00\x00\x00\x00\x00\x00\x00\x12>\xcb\xad...')])])
13:43:23 | DEBUG Node 0 sent a full fetch response that created a new incremental fetch session 237120174 with 1 response partitions
13:43:23 | DEBUG Preparing to read 1048576 bytes of data for partition TopicPartition(topic='test_topic', partition=0) with offset 306105260
13:43:23 | DEBUG Returning fetched records at offset 306105260 for assigned partition TopicPartition(topic='test_topic', partition=0)
13:43:23 | DEBUG Skipping message offset: 306105257 (expecting 306105260)
13:43:23 | DEBUG Skipping message offset: 306105258 (expecting 306105260)
13:43:23 | DEBUG Skipping message offset: 306105259 (expecting 306105260)
13:43:23 | DEBUG Updating fetch position for assigned partition TopicPartition(topic='test_topic', partition=0) to 306105760 (leader epoch 0)
13:43:23 | DEBUG Returning fetched records at offset 306105760 for assigned partition TopicPartition(topic='test_topic', partition=0)
13:43:23 | DEBUG Updating fetch position for assigned partition TopicPartition(topic='test_topic', partition=0) to 306106260 (leader epoch 0)
13:43:23 | DEBUG Returning fetched records at offset 306106260 for assigned partition TopicPartition(topic='test_topic', partition=0)
13:43:24 | DEBUG Updating fetch position for assigned partition TopicPartition(topic='test_topic', partition=0) to 306106760 (leader epoch 0)
13:43:24 | DEBUG Returning fetched records at offset 306106760 for assigned partition TopicPartition(topic='test_topic', partition=0)
13:43:24 | DEBUG Updating fetch position for assigned partition TopicPartition(topic='test_topic', partition=0) to 306107260 (leader epoch 0)
13:43:24 | DEBUG Returning fetched records at offset 306107260 for assigned partition TopicPartition(topic='test_topic', partition=0)
13:43:24 | DEBUG Updating fetch position for assigned partition TopicPartition(topic='test_topic', partition=0) to 306107760 (leader epoch 0)
13:43:24 | DEBUG Returning fetched records at offset 306107760 for assigned partition TopicPartition(topic='test_topic', partition=0)
13:43:24 | DEBUG Updating fetch position for assigned partition TopicPartition(topic='test_topic', partition=0) to 306108260 (leader epoch 0)
13:43:24 | DEBUG Returning fetched records at offset 306108260 for assigned partition TopicPartition(topic='test_topic', partition=0)
13:43:24 | DEBUG Updating fetch position for assigned partition TopicPartition(topic='test_topic', partition=0) to 306108760 (leader epoch 0)
13:43:24 | DEBUG Returning fetched records at offset 306108760 for assigned partition TopicPartition(topic='test_topic', partition=0)
13:43:24 | DEBUG Updating fetch position for assigned partition TopicPartition(topic='test_topic', partition=0) to 306109260 (leader epoch 0)
13:43:24 | DEBUG Returning fetched records at offset 306109260 for assigned partition TopicPartition(topic='test_topic', partition=0)
13:43:24 | DEBUG Updating fetch position for assigned partition TopicPartition(topic='test_topic', partition=0) to 306109760 (leader epoch 0)
13:43:24 | DEBUG Returning fetched records at offset 306109760 for assigned partition TopicPartition(topic='test_topic', partition=0)
13:43:24 | DEBUG Updating fetch position for assigned partition TopicPartition(topic='test_topic', partition=0) to 306110260 (leader epoch 0)
13:43:24 | DEBUG Returning fetched records at offset 306110260 for assigned partition TopicPartition(topic='test_topic', partition=0)
13:43:24 | DEBUG Updating fetch position for assigned partition TopicPartition(topic='test_topic', partition=0) to 306110760 (leader epoch 0)
13:43:24 | DEBUG Returning fetched records at offset 306110760 for assigned partition TopicPartition(topic='test_topic', partition=0)
13:43:24 | DEBUG Updating fetch position for assigned partition TopicPartition(topic='test_topic', partition=0) to 306111260 (leader epoch 0)
13:43:24 | DEBUG Returning fetched records at offset 306111260 for assigned partition TopicPartition(topic='test_topic', partition=0)
13:43:24 | DEBUG Updating fetch position for assigned partition TopicPartition(topic='test_topic', partition=0) to 306111760 (leader epoch 0)
13:43:24 | DEBUG Returning fetched records at offset 306111760 for assigned partition TopicPartition(topic='test_topic', partition=0)
13:43:24 | DEBUG Updating fetch position for assigned partition TopicPartition(topic='test_topic', partition=0) to 306112260 (leader epoch 0)
13:43:24 | DEBUG Returning fetched records at offset 306112260 for assigned partition TopicPartition(topic='test_topic', partition=0)
13:43:24 | DEBUG Updating fetch position for assigned partition TopicPartition(topic='test_topic', partition=0) to 306112760 (leader epoch 0)
13:43:24 | DEBUG Returning fetched records at offset 306112760 for assigned partition TopicPartition(topic='test_topic', partition=0)
13:43:24 | DEBUG Updating fetch position for assigned partition TopicPartition(topic='test_topic', partition=0) to 306113260 (leader epoch 0)
13:43:24 | DEBUG Returning fetched records at offset 306113260 for assigned partition TopicPartition(topic='test_topic', partition=0)
13:43:24 | DEBUG Updating fetch position for assigned partition TopicPartition(topic='test_topic', partition=0) to 306113760 (leader epoch 0)
13:43:24 | DEBUG Returning fetched records at offset 306113760 for assigned partition TopicPartition(topic='test_topic', partition=0)
13:43:24 | DEBUG Updating fetch position for assigned partition TopicPartition(topic='test_topic', partition=0) to 306114260 (leader epoch 0)
13:43:24 | DEBUG Returning fetched records at offset 306114260 for assigned partition TopicPartition(topic='test_topic', partition=0)
13:43:24 | DEBUG Added sensor with name topic.test_topic.bytes-fetched
13:43:24 | DEBUG Added sensor with name topic.test_topic.records-fetched
13:43:24 | DEBUG Updating fetch position for assigned partition TopicPartition(topic='test_topic', partition=0) to 306114681 (leader epoch 0)
13:43:24 | DEBUG Adding fetch request for partition TopicPartition(topic='test_topic', partition=0) at offset 306114681
13:43:24 | DEBUG Building incremental partitions from next: {TopicPartition(topic='test_topic', partition=0)}, previous: {TopicPartition(topic='test_topic', partition=0)}
13:43:24 | DEBUG Built incremental fetch <kafka.consumer.fetcher.FetchMetadata object at 0x7fe67c9835e0> for node 0. Added set(), altered {TopicPartition(topic='test_topic', partition=0)}, removed set() out of odict_keys([TopicPartition(topic='test_topic', partition=0)])
13:43:24 | DEBUG Adding fetch request for partition TopicPartition(topic='test_topic', partition=0) at offset 306114681
13:43:24 | DEBUG Building incremental partitions from next: {TopicPartition(topic='test_topic', partition=0)}, previous: {TopicPartition(topic='test_topic', partition=0)}
13:43:24 | DEBUG Built incremental fetch <kafka.consumer.fetcher.FetchMetadata object at 0x7fe67c9835e0> for node 0. Added set(), altered set(), removed set() out of odict_keys([TopicPartition(topic='test_topic', partition=0)])
13:43:24 | DEBUG Sending metadata request MetadataRequest_v7(topics=['test_topic'], allow_auto_topic_creation=True) to node 0
13:43:24 | DEBUG Sending request MetadataRequest_v7(topics=['test_topic'], allow_auto_topic_creation=True)
13:43:24 | DEBUG <BrokerConnection client_id=kafka-python-2.1.3, node_id=0 host=kafka-headless.kafka:9092 <connected> [IPv4 ('192.168.1.55', 9092)]> Request 2 (timeout_ms 305000): MetadataRequest_v7(topics=['test_topic'], allow_auto_topic_creation=True)
13:43:24 | DEBUG Timeouts: user 2999.427080, metadata 305000.000000, idle connection 539583.000000, request 304999.972343
13:43:24 | DEBUG Timeouts: user 2999.133110, metadata 305000.000000, idle connection 539583.000000, request 304999.684811
13:43:24 | DEBUG Received correlation id: 2
13:43:24 | DEBUG Processing response MetadataResponse_v7
13:43:24 | DEBUG <BrokerConnection client_id=kafka-python-2.1.3, node_id=0 host=kafka-headless.kafka:9092 <connected> [IPv4 ('192.168.1.55', 9092)]> Response 2 (0.9360313415527344 ms): MetadataResponse_v7(throttle_time_ms=0, brokers=[(node_id=0, host='kafka-headless.kafka', port=9092, rack=None)], cluster_id='vk3qwR3XRuONV2fLzyt99Q', controller_id=0, topics=[(error_code=0, topic='test_topic', is_internal=False, partitions=[(error_code=0, partition=0, leader=0, leader_epoch=0, replicas=[0], isr=[0], offline_replicas=[])])])
13:43:24 | DEBUG Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 1, groups: 0)
13:43:24 | DEBUG Adding fetch request for partition TopicPartition(topic='test_topic', partition=0) at offset 306114681
13:43:24 | DEBUG Building incremental partitions from next: {TopicPartition(topic='test_topic', partition=0)}, previous: {TopicPartition(topic='test_topic', partition=0)}
13:43:24 | DEBUG Built incremental fetch <kafka.consumer.fetcher.FetchMetadata object at 0x7fe67c9835e0> for node 0. Added set(), altered set(), removed set() out of odict_keys([TopicPartition(topic='test_topic', partition=0)])
13:43:24 | DEBUG Sending FetchRequest to node 0
13:43:24 | DEBUG Sending request FetchRequest_v10(replica_id=-1, max_wait_time=500, min_bytes=1, max_bytes=52428800, isolation_level=0, session_id=237120174, session_epoch=1, topics=[], forgotten_topics_data=[])
13:43:24 | DEBUG <BrokerConnection client_id=kafka-python-2.1.3, node_id=0 host=kafka-headless.kafka:9092 <connected> [IPv4 ('192.168.1.55', 9092)]> Request 3 (timeout_ms 305000): FetchRequest_v10(replica_id=-1, max_wait_time=500, min_bytes=1, max_bytes=52428800, isolation_level=0, session_id=237120174, session_epoch=1, topics=[], forgotten_topics_data=[])
13:43:24 | DEBUG Timeouts: user 0.000000, metadata 299999.457031, idle connection 539581.000000, request 304999.968052
13:43:24 | DEBUG Timeouts: user 2997.504473, metadata 299999.253174, idle connection 539581.000000, request 304999.767780
13:43:24 | DEBUG Timeouts: user 2996.280193, metadata 299998.029053, idle connection 539580.000000, request 304998.546600
13:43:24 | DEBUG Timeouts: user 2996.028423, metadata 299997.777344, idle connection 539580.000000, request 304998.295307
13:43:24 | DEBUG Timeouts: user 2995.790243, metadata 299997.539062, idle connection 539579.000000, request 304998.056412
13:43:24 | DEBUG Timeouts: user 2995.472908, metadata 299997.222412, idle connection 539579.000000, request 304997.740269
13:43:24 | DEBUG Timeouts: user 2995.191097, metadata 299996.940918, idle connection 539579.000000, request 304997.458696
13:43:24 | DEBUG Timeouts: user 2994.910240, metadata 299996.659424, idle connection 539579.000000, request 304997.177362
13:43:24 | DEBUG Timeouts: user 2994.642735, metadata 299996.392822, idle connection 539578.000000, request 304996.910095
13:43:24 | DEBUG Timeouts: user 2994.364262, metadata 299996.113525, idle connection 539578.000000, request 304996.631384
13:43:24 | DEBUG Timeouts: user 2994.100571, metadata 299995.849609, idle connection 539578.000000, request 304996.367216
13:43:24 | DEBUG Received correlation id: 3
13:43:24 | DEBUG Processing response FetchResponse_v10
13:43:24 | DEBUG <BrokerConnection client_id=kafka-python-2.1.3, node_id=0 host=kafka-headless.kafka:9092 <connected> [IPv4 ('192.168.1.55', 9092)]> Response 3 (5.171298980712891 ms): FetchResponse_v10(throttle_time_ms=0, error_code=0, session_id=237120174, topics=[(topics='test_topic', partitions=[(partition=0, error_code=0, highwater_offset=314091176, last_stable_offset=314091176, log_start_offset=294308917, aborted_transactions=NULL, records=b'\x00\x00\x00\x00\x12\xcb\xad...')])])
13:43:24 | DEBUG Node 0 sent an incremental fetch response for session 237120174 with 1 response partitions (0 implied)
13:43:24 | DEBUG Preparing to read 1048576 bytes of data for partition TopicPartition(topic='test_topic', partition=0) with offset 306114681
13:43:24 | DEBUG Returning fetched records at offset 306114681 for assigned partition TopicPartition(topic='test_topic', partition=0)
13:43:24 | DEBUG Skipping message offset: 306105257 (expecting 306114681)
13:43:24 | DEBUG Skipping message offset: 306105258 (expecting 306114681)
13:43:24 | DEBUG Skipping message offset: 306105259 (expecting 306114681)
13:43:24 | DEBUG Skipping message offset: 306105260 (expecting 306114681)
13:43:24 | DEBUG Skipping message offset: 306105261 (expecting 306114681)

......

13:43:24 | DEBUG Skipping message offset: 306114676 (expecting 306114681)
13:43:24 | DEBUG Skipping message offset: 306114677 (expecting 306114681)
13:43:24 | DEBUG Skipping message offset: 306114678 (expecting 306114681)
13:43:24 | DEBUG Skipping message offset: 306114679 (expecting 306114681)
13:43:24 | DEBUG Skipping message offset: 306114680 (expecting 306114681)
13:43:24 | DEBUG Updating fetch position for assigned partition TopicPartition(topic='test_topic', partition=0) to 306114681 (leader epoch 0)
13:43:24 | DEBUG Adding fetch request for partition TopicPartition(topic='test_topic', partition=0) at offset 306114681
13:43:24 | DEBUG Building incremental partitions from next: {TopicPartition(topic='test_topic', partition=0)}, previous: {TopicPartition(topic='test_topic', partition=0)}
13:43:24 | DEBUG Built incremental fetch <kafka.consumer.fetcher.FetchMetadata object at 0x7fe67c9b0220> for node 0. Added set(), altered set(), removed set() out of odict_keys([TopicPartition(topic='test_topic', partition=0)])
13:43:24 | DEBUG Sending FetchRequest to node 0
13:43:24 | DEBUG Sending request FetchRequest_v10(replica_id=-1, max_wait_time=500, min_bytes=1, max_bytes=52428800, isolation_level=0, session_id=237120174, session_epoch=2, topics=[], forgotten_topics_data=[])
13:43:24 | DEBUG <BrokerConnection client_id=kafka-python-2.1.3, node_id=0 host=kafka-headless.kafka:9092 <connected> [IPv4 ('192.168.1.55', 9092)]> Request 4 (timeout_ms 305000): FetchRequest_v10(replica_id=-1, max_wait_time=500, min_bytes=1, max_bytes=52428800, isolation_level=0, session_id=237120174, session_epoch=2, topics=[], forgotten_topics_data=[])
13:43:24 | DEBUG Timeouts: user 0.000000, metadata 299356.477295, idle connection 538938.000000, request 304999.770880
13:43:24 | DEBUG Timeouts: user 2354.520559, metadata 299356.268066, idle connection 538938.000000, request 304999.568462
13:43:24 | DEBUG Timeouts: user 2353.551626, metadata 299355.301025, idle connection 538937.000000, request 304998.601675
13:43:24 | DEBUG Timeouts: user 2353.361368, metadata 299355.111084, idle connection 538937.000000, request 304998.412371
13:43:24 | DEBUG Timeouts: user 2353.099346, metadata 299354.848633, idle connection 538937.000000, request 304998.150110
13:43:24 | DEBUG Timeouts: user 2352.871895, metadata 299354.620850, idle connection 538937.000000, request 304997.922421
13:43:24 | DEBUG Timeouts: user 2352.630854, metadata 299354.380615, idle connection 538936.000000, request 304997.680664
13:43:24 | DEBUG Timeouts: user 2352.317333, metadata 299354.065918, idle connection 538936.000000, request 304997.364521
13:43:24 | DEBUG Timeouts: user 2351.977110, metadata 299353.725586, idle connection 538936.000000, request 304997.024059
13:43:24 | DEBUG Timeouts: user 2351.496458, metadata 299353.245605, idle connection 538935.000000, request 304996.546030
13:43:24 | DEBUG Timeouts: user 2351.246119, metadata 299352.995605, idle connection 538935.000000, request 304996.296644
13:43:24 | DEBUG Timeouts: user 2350.994587, metadata 299352.743408, idle connection 538935.000000, request 304996.045113
13:43:24 | DEBUG Timeouts: user 2350.752831, metadata 299352.502197, idle connection 538934.000000, request 304995.803356
13:43:24 | DEBUG Received correlation id: 4
13:43:24 | DEBUG Processing response FetchResponse_v10
13:43:24 | DEBUG <BrokerConnection client_id=kafka-python-2.1.3, node_id=0 host=kafka-headless.kafka:9092 <connected> [IPv4 ('192.168.1.55', 9092)]> Response 4 (5.615949630737305 ms): FetchResponse_v10(throttle_time_ms=0, error_code=0, session_id=237120174, topics=[(topics='test_topic', partitions=[(partition=0, error_code=0, highwater_offset=314091287, last_stable_offset=314091287, log_start_offset=294308917, aborted_transactions=NULL, records=b'\x00\x00\x00\x00\x12>>\xcb\xad...')])])
13:43:24 | DEBUG Node 0 sent an incremental fetch response for session 237120174 with 1 response partitions (0 implied)
13:43:24 | DEBUG Preparing to read 1048576 bytes of data for partition TopicPartition(topic='test_topic', partition=0) with offset 306114681
13:43:24 | DEBUG Returning fetched records at offset 306114681 for assigned partition TopicPartition(topic='test_topic', partition=0)
13:43:24 | DEBUG Skipping message offset: 306105257 (expecting 306114681)
13:43:24 | DEBUG Skipping message offset: 306105258 (expecting 306114681)
13:43:24 | DEBUG Skipping message offset: 306105259 (expecting 306114681)
13:43:24 | DEBUG Skipping message offset: 306105260 (expecting 306114681)
13:43:24 | DEBUG Skipping message offset: 306105261 (expecting 306114681)
......

FYI, I found several workarounds for this issue in my case:

  1. Increase max_partition_fetch_bytes. With 10485760 the consumer consumes up to offset 306184436 before getting stuck; with 104857600 it consumes all the data successfully.
  2. Set enable_incremental_fetch_sessions=False.
  3. Call time.sleep(0.1) immediately after consumer.partitions_for_topic(topic).
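As a minimal sketch of workarounds (1) and (2), the consumer configuration from the reproduction script above would look like this (broker address and parameter values taken from that script; this is illustrative, not a tested fix):

```python
from kafka import KafkaConsumer

# Same broker as in the reproduction script above.
consumer = KafkaConsumer(
    bootstrap_servers='kafka-headless.kafka:9092',
    auto_offset_reset='earliest',
    enable_auto_commit=False,
    # Workaround (2): disable incremental fetch sessions entirely, so every
    # FetchRequest carries the full partition list instead of a delta.
    enable_incremental_fetch_sessions=False,
    # Workaround (1), alternatively: raise the per-partition fetch size.
    # max_partition_fetch_bytes=104857600,
)
```

Workaround (2) avoids the session-state desync at the cost of larger fetch requests; workaround (1) only delays the point at which the consumer gets stuck unless the value is large enough to cover the whole backlog.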

Potentially Related Issues

#1571
#1701

@dpkp

dpkp commented Mar 28, 2025

Thanks! The logs suggest this may be a bug in the incremental fetch session logic where we fail to send a topic/partition update and then forget about it. The broker's incremental session state ends up out of sync with the client's, which causes the broker to keep returning the same offsets while the client skips them. We need to handle this. I think in this case the forced in-flight metadata request is what causes us to get out of sync: we update the client-side session state, but then don't send the fetch request to the broker because we're still waiting for the metadata request to complete.
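The failure mode described here can be modeled with a toy simulation (a hypothetical sketch, not kafka-python internals): the client records a session update as sent before the request actually goes out, so when the send is skipped, later incremental requests are empty and the broker serves stale offsets forever.

```python
class BrokerSession:
    """Broker side: remembers the last fetch offset it was told per partition."""
    def __init__(self):
        self.offsets = {}  # partition -> offset the broker will serve from

    def fetch(self, updates):
        # 'updates' is the incremental part of the request: changed partitions only.
        self.offsets.update(updates)
        # Serve a batch of 5 records from the broker's notion of the offset.
        return {p: list(range(off, off + 5)) for p, off in self.offsets.items()}


class ClientSession:
    """Client side: tracks what it believes the broker already knows."""
    def __init__(self):
        self.session_partitions = {}  # partition -> offset client thinks broker has

    def build(self, wanted):
        # Incremental build: include only partitions whose offset changed.
        updates = {p: off for p, off in wanted.items()
                   if self.session_partitions.get(p) != off}
        # Bug modeled here: the update is recorded as part of the session
        # *before* the request is actually sent.
        self.session_partitions.update(wanted)
        return updates


broker = BrokerSession()
client = ClientSession()

# Epoch 0: initial fetch at offset 90 -- built, sent, applied on both sides.
broker.fetch(client.build({"tp0": 90}))

# Client moves to offset 100 and builds an incremental request, but the send
# is skipped (e.g. a metadata request is in flight). The client has already
# marked the update as known to the broker.
dropped = client.build({"tp0": 100})
assert dropped == {"tp0": 100}   # the update was built...
# ...but broker.fetch(dropped) never happens.

# Next poll: from the client's point of view nothing changed, so the
# incremental request is empty -- the "altered set()" seen in the logs.
req = client.build({"tp0": 100})
assert req == {}

records = broker.fetch(req)      # broker still serves from offset 90
skipped = [r for r in records["tp0"] if r < 100]
```

Every subsequent poll repeats the last two steps: empty incremental request, same stale batch from the broker, all records skipped by the client — which matches the repeated "Skipping message offset" lines in the logs above.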

I think the best workaround is enable_incremental_fetch_sessions=False.

@yjh126yjh yjh126yjh changed the title Consumer gets stuck when consuming messages Consumer gets stuck when consuming messages with incremental fetch sessions enabled Mar 28, 2025