Skip to content

Fix testing with external fixtures #2533

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 17 additions & 5 deletions test/conftest.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,36 @@
from __future__ import absolute_import

import os
import uuid

import pytest

from kafka.vendor.six.moves.urllib.parse import urlparse # pylint: disable=E0611,F0401
from test.testutil import env_kafka_version, random_string
from test.fixtures import KafkaFixture, ZookeeperFixture

@pytest.fixture(scope="module")
def zookeeper():
"""Return a Zookeeper fixture"""
zk_instance = ZookeeperFixture.instance()
yield zk_instance
zk_instance.close()
if "ZOOKEEPER_URI" in os.environ:
parse = urlparse(os.environ["ZOOKEEPER_URI"])
(host, port) = (parse.hostname, parse.port)
yield ZookeeperFixture.instance(host=host, port=port, external=True)
else:
zk_instance = ZookeeperFixture.instance()
yield zk_instance
zk_instance.close()


@pytest.fixture(scope="module")
def kafka_broker(kafka_broker_factory):
def kafka_broker(kafka_broker_factory, zookeeper):
"""Return a Kafka broker fixture"""
return kafka_broker_factory()[0]
if "KAFKA_URI" in os.environ:
parse = urlparse(os.environ["KAFKA_URI"])
(host, port) = (parse.hostname, parse.port)
return KafkaFixture.instance(0, zookeeper, host=host, port=port, external=True)
else:
return kafka_broker_factory()[0]


@pytest.fixture(scope="module")
Expand Down
108 changes: 37 additions & 71 deletions test/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@
import uuid

import py
from kafka.vendor.six.moves import urllib, range
from kafka.vendor.six.moves.urllib.parse import urlparse # pylint: disable=E0611,F0401
from kafka.vendor.six.moves import range

from kafka import errors, KafkaAdminClient, KafkaClient, KafkaConsumer, KafkaProducer
from kafka.errors import InvalidReplicationFactorError, KafkaTimeoutError
Expand Down Expand Up @@ -74,43 +73,6 @@ def __init__(self):
if not os.path.isdir(self.kafka_root):
raise FileNotFoundError(self.kafka_root)

@classmethod
def download_official_distribution(cls,
kafka_version=None,
scala_version=None,
output_dir=None):
if not kafka_version:
kafka_version = cls.kafka_version
if not scala_version:
scala_version = cls.scala_version
if not output_dir:
output_dir = os.path.join(cls.project_root, 'servers', 'dist')

distfile = 'kafka_%s-%s' % (scala_version, kafka_version,)
url_base = 'https://archive.apache.org/dist/kafka/%s/' % (kafka_version,)
output_file = os.path.join(output_dir, distfile + '.tgz')

if os.path.isfile(output_file):
log.info("Found file already on disk: %s", output_file)
return output_file

# New tarballs are .tgz, older ones are sometimes .tar.gz
try:
url = url_base + distfile + '.tgz'
log.info("Attempting to download %s", url)
response = urllib.request.urlopen(url)
except urllib.error.HTTPError:
log.exception("HTTP Error")
url = url_base + distfile + '.tar.gz'
log.info("Attempting to download %s", url)
response = urllib.request.urlopen(url)

log.info("Saving distribution file to %s", output_file)
with open(output_file, 'w') as output_file_fd:
output_file_fd.write(response.read())

return output_file

@classmethod
def test_resource(cls, filename):
path = os.path.join(cls.project_root, "servers", cls.kafka_version, "resources", filename)
Expand Down Expand Up @@ -169,23 +131,18 @@ def dump_logs(self):

class ZookeeperFixture(Fixture):
@classmethod
def instance(cls):
if "ZOOKEEPER_URI" in os.environ:
parse = urlparse(os.environ["ZOOKEEPER_URI"])
(host, port) = (parse.hostname, parse.port)
fixture = ExternalService(host, port)
else:
(host, port) = ("127.0.0.1", None)
fixture = cls(host, port)

def instance(cls, host=None, port=None, external=False):
if host is None:
host = "127.0.0.1"
fixture = cls(host, port, external=external)
fixture.open()
return fixture

def __init__(self, host, port, tmp_dir=None):
def __init__(self, host, port, external=False, tmp_dir=None):
super(ZookeeperFixture, self).__init__()
self.host = host
self.port = port

self.running = external
self.tmp_dir = tmp_dir

def kafka_run_class_env(self):
Expand All @@ -198,6 +155,8 @@ def out(self, message):
log.info("*** Zookeeper [%s:%s]: %s", self.host, self.port or '(auto)', message)

def open(self):
if self.running:
return
if self.tmp_dir is None:
self.tmp_dir = py.path.local.mkdtemp() #pylint: disable=no-member
self.tmp_dir.ensure(dir=True)
Expand Down Expand Up @@ -262,34 +221,30 @@ class KafkaFixture(Fixture):

@classmethod
def instance(cls, broker_id, zookeeper, zk_chroot=None,
host=None, port=None,
transport='PLAINTEXT', replicas=1, partitions=2,
host=None, port=None, external=False,
transport='PLAINTEXT', replicas=1, partitions=4,
sasl_mechanism=None, auto_create_topic=True, tmp_dir=None):

if zk_chroot is None:
zk_chroot = "kafka-python_" + str(uuid.uuid4()).replace("-", "_")
if "KAFKA_URI" in os.environ:
parse = urlparse(os.environ["KAFKA_URI"])
(host, port) = (parse.hostname, parse.port)
fixture = ExternalService(host, port)
else:
if host is None:
host = "localhost"
fixture = KafkaFixture(host, port, broker_id,
zookeeper, zk_chroot,
transport=transport,
replicas=replicas, partitions=partitions,
sasl_mechanism=sasl_mechanism,
auto_create_topic=auto_create_topic,
tmp_dir=tmp_dir)

fixture.open()
if host is None:
host = "localhost"
fixture = KafkaFixture(host, port, broker_id,
zookeeper, zk_chroot,
external=external,
transport=transport,
replicas=replicas, partitions=partitions,
sasl_mechanism=sasl_mechanism,
auto_create_topic=auto_create_topic,
tmp_dir=tmp_dir)

fixture.open()
return fixture

def __init__(self, host, port, broker_id, zookeeper, zk_chroot,
replicas=1, partitions=2, transport='PLAINTEXT',
sasl_mechanism=None, auto_create_topic=True,
tmp_dir=None):
tmp_dir=None, external=False):
super(KafkaFixture, self).__init__()

self.host = host
Expand Down Expand Up @@ -321,9 +276,16 @@ def __init__(self, host, port, broker_id, zookeeper, zk_chroot,
self.partitions = partitions

self.tmp_dir = tmp_dir
self.running = False
self.external = external

if self.external:
self.child = ExternalService(self.host, self.port)
(self._client,) = self.get_clients(1, client_id='_internal_client')
self.running = True
else:
self._client = None
self.running = False

self._client = None
self.sasl_config = ''
self.jaas_config = ''

Expand Down Expand Up @@ -416,6 +378,8 @@ def _create_zk_chroot(self):
self.out("Kafka chroot created in Zookeeper!")

def start(self):
if self.running:
return True
# Configure Kafka child process
properties = self.tmp_dir.join("kafka.properties")
jaas_conf = self.tmp_dir.join("kafka_server_jaas.conf")
Expand Down Expand Up @@ -515,6 +479,8 @@ def __del__(self):
self.close()

def stop(self):
if self.external:
return
if not self.running:
self.out("Instance already stopped")
return
Expand Down
10 changes: 8 additions & 2 deletions test/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ def open(self):
def close(self):
pass

def dump_logs(self):
pass

def wait_for(self, pattern, timeout=30):
pass

class SpawnedService(threading.Thread):
def __init__(self, args=None, env=None):
Expand All @@ -52,8 +57,8 @@ def __init__(self, args=None, env=None):
log.debug(" {key}={value}".format(key=key, value=value))

def _spawn(self):
if self.alive: return
if self.child and self.child.poll() is None: return
if self.alive or (self.child and self.child.poll() is None):
return

self.child = subprocess.Popen(
self.args,
Expand All @@ -76,6 +81,7 @@ def _despawn(self):
else:
self.child.kill()

# via threading.Thread
def run(self):
self._spawn()
while True:
Expand Down
Loading