Skip to content

Poll hangs forever if consumer was closed concurrently #2059

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
lukesolo opened this issue May 28, 2020 · 2 comments
Closed

Poll hangs forever if consumer was closed concurrently #2059

lukesolo opened this issue May 28, 2020 · 2 comments

Comments

@lukesolo
Copy link

lukesolo commented May 28, 2020

import threading
import time

from kafka import KafkaConsumer

consumer = KafkaConsumer('topic', bootstrap_servers='localhost:9092', group_id='group')


def poll():
    print('polling')
    consumer.poll(timeout_ms=2000, max_records=1)
    print('polled')


t = threading.Thread(target=poll)
t.start()
time.sleep(1)

print('closing')
consumer.close(autocommit=False)
print('closed')

t.join()

Poll function hangs forever if the consumer was closed concurrently.
version: 2.0.1

@tvoinarovskyi
Copy link
Collaborator

The consumer is strictly single-threaded. It will fail in many multithreaded scenarios. If you need to poll from different threads please either synchronize access or use several isolated Consumers.

@lukesolo
Copy link
Author

Thanks for information.
If consumer is single-threaded, how can I close it when poll function hangs forever after disconnect?

import logging
import subprocess
import threading
import time

from kafka import KafkaConsumer

logging.basicConfig(level='ERROR')


def start_kafka():
    subprocess.run(['start_kafka.sh'])


def stop_kafka():
    subprocess.run(['stop_kafka.sh'])


def poll():
    print('polling')
    consumer.poll(timeout_ms=2000, max_records=1)
    print('polled')


start_kafka()
consumer = KafkaConsumer('topic', bootstrap_servers='localhost:9092', group_id='group')

t = threading.Thread(target=poll)
t.start()
time.sleep(1)

stop_kafka()
t.join()

This code produces eternal

ERROR:kafka.conn:Connect attempt to <BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]> returned error 111. Disconnecting.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants