
Help: KafkaConsumer iterator ignores consumer_timeout_ms if entire cluster is down #1322


Closed
KunTjz opened this issue Dec 9, 2017 · 5 comments · Fixed by #2526

Comments

@KunTjz

KunTjz commented Dec 9, 2017

Hi!
Please forgive my poor English, but I need your help, @dpkp.
While stress testing Kafka, I found that the consumer can block forever when all brokers are down. Once every broker is down, the consumer falls into a loop trying to fetch metadata from the brokers, and this loop never breaks until one or more brokers restart.
Is there a way for the consumer to break out of this loop? I don't want to block here; I want to do other things while the brokers are down.

My code looks like this:

from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers=['xxxxx', 'xxxxx'],
                         group_id="test1",
                         consumer_timeout_ms=2000)
partition = TopicPartition("topic1", 0)
consumer.assign([partition])
consumer.seek_to_end()
for message in consumer:  # blocks here when all brokers are down
    print(message)

After the consumer blocked, I got this on KeyboardInterrupt:
[Screenshot: output after KeyboardInterrupt]

Other info:
kafka_python-1.3.2
python2.6.3
kafka_2.11-0.10.2.1
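Independently of the timeout bug, one way to keep doing other work while the brokers are down is to run the consumer iterator on a background thread and have the main thread wait on a queue with its own timeout. The sketch below assumes Python 3 (the reporter used 2.6); `consume_in_background`, `get_message` and `do_other_work` are hypothetical names, not part of kafka-python. It does not unstick the consumer: the worker thread can still block inside the iterator, but the main thread stays responsive. Since kafka-python's KafkaConsumer is not thread-safe, only the worker thread should touch it, and in this pattern the queue timeout takes the place of consumer_timeout_ms (if that option is set, the worker simply exits after the first idle period).

```python
import queue
import threading


def consume_in_background(iterable, maxsize=1000):
    """Drain `iterable` on a daemon thread and return the queue it feeds.

    Hypothetical helper: `iterable` can be any iterator, e.g. a KafkaConsumer.
    If iteration blocks (for example, while the whole cluster is down), only
    the worker thread is stuck; the caller keeps control.
    """
    q = queue.Queue(maxsize=maxsize)

    def worker():
        for item in iterable:
            q.put(item)  # blocks when the queue is full (backpressure)

    threading.Thread(target=worker, daemon=True).start()
    return q


def get_message(q, timeout_s):
    """Return the next item, or None if nothing arrived within timeout_s."""
    try:
        return q.get(timeout=timeout_s)
    except queue.Empty:
        return None


# Example (do_other_work is a placeholder for your own logic):
#   q = consume_in_background(consumer)
#   while True:
#       message = get_message(q, 2.0)
#       if message is None:
#           do_other_work()
#       else:
#           print(message)
```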

@dpkp dpkp changed the title from "comsumer block forever" to "KafkaConsumer iterator ignores consumer_timeout_ms if entire cluster is down" Dec 12, 2017
@KunTjz
Author

KunTjz commented Dec 17, 2017

I have also found that the KafkaConsumer iterator ignores consumer_timeout_ms if the consumer is assigned to a topic that doesn't exist. It seems to be the same problem.
Is this a bug, or is my configuration wrong?
@dpkp, thanks a lot!
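For the non-existent-topic case, one defensive option is to look the partition up in cluster metadata before calling assign(). A minimal sketch, assuming a kafka-python KafkaConsumer: only its `partitions_for_topic()` method is used, and `check_partition_exists` is a hypothetical helper. This is a guard, not a fix: the metadata lookup itself needs a reachable broker, so it catches a misspelled or missing topic but does nothing for a cluster that is down.

```python
def check_partition_exists(consumer, topic, partition):
    """Raise ValueError unless `partition` of `topic` is in cluster metadata.

    `consumer` is assumed to be a kafka-python KafkaConsumer (or anything with
    a compatible partitions_for_topic method). That method returns a set of
    partition ids; for an unknown topic it may return None or an empty set.
    """
    partitions = consumer.partitions_for_topic(topic)
    if not partitions or partition not in partitions:
        raise ValueError("%s[%d] not found in cluster metadata" % (topic, partition))


# Example, before assigning:
#   check_partition_exists(consumer, "topic1", 0)
#   consumer.assign([TopicPartition("topic1", 0)])
```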

@stigok

stigok commented Jan 30, 2018

I'm seeing the same thing, and the loop drives CPU usage to 100%.

@dpkp
Owner

dpkp commented Mar 1, 2018

@RechardTheGrey I can't reproduce this with the latest release. Have you tried upgrading?

@dpkp dpkp changed the title from "KafkaConsumer iterator ignores consumer_timeout_ms if entire cluster is down" to "Help: KafkaConsumer iterator ignores consumer_timeout_ms if entire cluster is down" Mar 1, 2018
@JamesMackerel

JamesMackerel commented Mar 8, 2018

I am experiencing the same problem; here are the steps to reproduce:

First, save this code as test_error.py:

from kafka import KafkaProducer, KafkaConsumer
import time

config = {
    'bootstrap_servers': 'localhost:9092',
    'group_id': 'test',
    'consumer_timeout_ms': 1000
}

consumer = KafkaConsumer('test-consumer', **config)

while True:
    for message in consumer:
        print(message)
    
    print('no message')

Second, start a Kafka server, create a topic named test-consumer, and run test_error.py. You should see "no message" printed in the terminal roughly every second.

Third, kill the Kafka server. Back in the terminal, you will see that the "no message" output stops.


So I traced through the kafka-python code and eventually ended up here. I found that the only ways to break out of the infinite loop are for the future variable to be None or for future.is_done to become True, but no code in the loop ever modifies future.

Maybe this call stack will help:

poll (/home/jm/tmp/test/kafka/client_async.py:544)
ensure_coordinator_ready (/home/jm/tmp/test/kafka/coordinator/base.py:260)
poll (/home/jm/tmp/test/kafka/coordinator/consumer.py:262)
_message_generator (/home/jm/tmp/test/kafka/consumer/group.py:1053)
__next__ (/home/jm/tmp/test/kafka/consumer/group.py:1119)
<module> (/home/jm/tmp/test/test_error.py:23)

And thank you for this great open-source project; you are doing a great job.
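The loop described in this comment can be illustrated with a simplified model. This is not kafka-python's code: `FakeFuture`, `poll_once`, `wait_unbounded` and `wait_with_deadline` are hypothetical names. The point is that a loop whose only exit is `future.is_done` cannot honor consumer_timeout_ms when no broker ever answers; threading a deadline through the loop is one way to bound it.

```python
import time


class FakeFuture:
    """Stand-in for a request future; is_done flips once a response arrives."""

    def __init__(self):
        self.is_done = False


def wait_unbounded(future, poll_once):
    # Shape of the problem: the only exit is future.is_done, so if no broker
    # ever responds, this loop spins forever (and burns CPU).
    while not future.is_done:
        poll_once(0.1)


def wait_with_deadline(future, poll_once, timeout_s):
    """Same loop, bounded by timeout_s.

    Returns True if the future completed, False if the deadline passed first.
    `poll_once(t)` is assumed to wait up to t seconds for network I/O.
    """
    deadline = time.monotonic() + timeout_s
    while not future.is_done:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return False
        poll_once(min(remaining, 0.1))
    return True
```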

@JamesMackerel

After more testing, I found something surprising: it blocks until the cluster comes back.
But I would still like to know whether there is any way for the consumer to detect that the cluster is down.
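On detecting whether the cluster is down: one crude check that does not depend on kafka-python internals is to try a plain TCP connection to each bootstrap server before (or between) iterations. A sketch, assuming `host:port` strings like the ones passed to bootstrap_servers; `any_broker_reachable` is a hypothetical helper, and the 9092 fallback is Kafka's conventional default port. An open port only shows that something is listening; it says nothing about partition leaders or the group coordinator, so treat a True result as necessary rather than sufficient.

```python
import socket


def any_broker_reachable(bootstrap_servers, timeout_s=1.0):
    """Return True if a TCP connection to at least one server succeeds."""
    for server in bootstrap_servers:
        host, sep, port = server.rpartition(":")
        if not sep:
            host, port = server, "9092"  # assumed default when no port is given
        host = host.strip("[]")  # allow bracketed IPv6 literals like [::1]
        try:
            conn = socket.create_connection((host, int(port)), timeout=timeout_s)
        except OSError:  # refused, timed out, or name resolution failed
            continue
        conn.close()
        return True
    return False


# Example:
#   if not any_broker_reachable(['xxxxx:9092', 'xxxxx:9092']):
#       ...  # cluster looks down; do other work instead of iterating
```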


4 participants