Skip to content

KafkaAdminClient _send_request_to_node races with cluster metadata and can enter an infinite loop #2193

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
hackaugusto opened this issue Jan 14, 2021 · 0 comments

Comments

@hackaugusto
Copy link

hackaugusto commented Jan 14, 2021

The problem is this loop:

while not self._client.ready(node_id):
# poll until the connection to broker is ready, otherwise send()
# will fail with NodeNotReadyError
self._client.poll()

If node_id exits the cluster, the code above will be an infinite loop.

The code below reproduces the bug with these steps:

  • starts kafka-0 and kafka-1
  • instantiates KafkaAdminClient
  • stops kafka-1 but does not update the KafkaAdminClient metadata yet
  • simulates kafka-1 being chosen by least_loaded_node() and do a call to _send_request_to_node
  • even though _send_request_to_node calls poll, which will eventually update the metadata, the value of node_id never changes, so the method will block forever in an infinite loop

It seems the steps above can happen with any of the methods in the class, this is just one example:

future = self._send_request_to_node(self._client.least_loaded_node(), request)

To run the script below, pass the machine's local IP address as an argument, e.g. python test.py <ip_address>

#!/bin/env python
import sys
from contextlib import contextmanager
import subprocess
import logging
from time import sleep
from kafka.errors import UnrecognizedBrokerVersion
from kafka.protocol.metadata import MetadataRequest
from kafka.client_async import KafkaClient
from kafka.admin.client import KafkaAdminClient

PORT_START = 2000
ZK_PORT = 2181


def start_zk():
    args = [
        "podman",
        "run",
        "--detach",
        "--tty",
        "--rm",
        "--publish",
        f"{ZK_PORT}:{ZK_PORT}",
        "--name",
        "zk",
        "--env",
        f"ZOOKEEPER_CLIENT_PORT={ZK_PORT}",
        "confluentinc/cp-zookeeper",
    ]
    subprocess.run(args)
    print("zk started")


def start_kafka(host_ip: str, num: int):
    port = PORT_START + num * 2
    port_host = port + 1

    args = [
        "podman",
        "run",
        "--detach",
        "--tty",
        "--rm",
        "--publish",
        f"{port}:{port}",
        "--publish",
        f"{port_host}:{port_host}",
        "--name",
        f"kafka-{num}",
        "--env",
        "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT",
        "--env",
        f"KAFKA_ZOOKEEPER_CONNECT={host_ip}:{ZK_PORT}",
        "--env",
        f"KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:{port},PLAINTEXT_HOST://localhost:{port_host}",
        "--env",
        "KAFKA_AUTO_CREATE_TOPICS_ENABLE=false",
        "confluentinc/cp-kafka",
    ]
    subprocess.run(args)
    print(f"kafka-{num} started")


def stop_kafka(num: int):
    args = [
        "podman",
        "stop",
        f"kafka-{num}",
    ]
    subprocess.run(args)
    print(f"kafka-{num} stopped")


def stop_zk():
    args = [
        "podman",
        "stop",
        "zk",
    ]
    subprocess.run(args)
    print("zk stopped")


@contextmanager
def setup_env(host_ip):
    start_zk()
    start_kafka(host_ip, 0)
    sleep(2)  # make sure kafka-0 has the lowest id
    start_kafka(host_ip, 1)

    try:
        yield
    except:
        logging.getLogger().exception("An error occurred")
    finally:
        stop_kafka(1)
        stop_kafka(0)
        stop_zk()


def test(host_ip):
    logging.basicConfig()
    logging.getLogger().setLevel("DEBUG")

    port = PORT_START
    bootstrap = f"localhost:{port}"

    with setup_env(host_ip):
        while True:
            try:
                ka = KafkaAdminClient(bootstrap_servers=[bootstrap])
                kc = ka._client
                break
            except UnrecognizedBrokerVersion:
                print(f"Waiting broker to become available {bootstrap}")
                sleep(0.5)

        kc.poll()
        while len(kc.cluster.brokers()) != 2:
            print(f"Waiting metadata to have both brokers {kc.cluster.brokers()}")
            sleep(0.5)
            future = kc.cluster.request_update()
            kc.poll(future=future)

        # Simulate the second node being drawn in least_loaded_node
        second_node = max(broker.nodeId for broker in kc.cluster.brokers())

        # Make sure there are no connections open to kafka-1, otherwise the
        # code does not enter the loop
        assert second_node not in kc._conns

        stop_kafka(1)

        # this will block forever
        version = ka._matching_api_version(MetadataRequest)
        request = MetadataRequest[version]()
        future = ka._send_request_to_node(second_node, request)


if __name__ == "__main__":
    host_ip = sys.argv[1]
    test(host_ip)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants