Update kafka broker test matrix; test against 3.9.0 #2486

Merged · 4 commits · Feb 21, 2025

14 changes: 8 additions & 6 deletions .github/workflows/python-package.yml
@@ -29,8 +29,10 @@ jobs:
           - "0.11.0.3"
           - "1.1.1"
           - "2.4.0"
-          - "2.5.0"
-          - "2.6.0"
+          - "2.8.2"
+          - "3.0.2"
+          - "3.5.2"
+          - "3.9.0"
         python:
           - "3.12"
         include:
@@ -41,13 +43,13 @@ jobs:
           #  kafka: "2.6.0"
           #  experimental: true
           - python: "3.8"
-            kafka: "2.6.0"
+            kafka: "3.9.0"
           - python: "3.9"
-            kafka: "2.6.0"
+            kafka: "3.9.0"
           - python: "3.10"
-            kafka: "2.6.0"
+            kafka: "3.9.0"
           - python: "3.11"
-            kafka: "2.6.0"
+            kafka: "3.9.0"

steps:
- uses: actions/checkout@v4
1 change: 1 addition & 0 deletions kafka/conn.py
@@ -1181,6 +1181,7 @@ def _infer_broker_version_from_api_versions(self, api_versions):
         # in reverse order. As soon as we find one that works, return it
         test_cases = [
             # format (<broker version>, <needed struct>)
+            # Make sure to update consumer_integration test check when adding newer versions.
             ((2, 6, 0), DescribeClientQuotasRequest[0]),
             ((2, 5, 0), DescribeAclsRequest_v2),
             ((2, 4, 0), ProduceRequest[8]),
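The table maps a probe request to the oldest broker release that supports it, and the surrounding function walks it newest-first, returning the first match. A minimal sketch of that loop, assuming api_versions maps API key to the (min, max) versions the broker advertises; API_KEY/API_VERSION are the attributes kafka-python request structs carry, and the fallback value is an assumption, not taken from this diff:

    def infer_broker_version(api_versions, test_cases):
        for broker_version, struct in test_cases:
            if struct.API_KEY not in api_versions:
                continue
            min_version, max_version = api_versions[struct.API_KEY]
            # Newest-first: the first probe the broker can serve wins.
            if min_version <= struct.API_VERSION <= max_version:
                return broker_version
        return (0, 10, 0)  # assumed fallback for brokers older than every probe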
3 changes: 3 additions & 0 deletions servers/resources/default/sasl_command.conf
@@ -0,0 +1,3 @@
+security.protocol={transport}
+sasl.mechanism={sasl_mechanism}
+sasl.jaas.config={jaas_config}
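These three keys are client-side settings for Kafka's CLI tools. Judging by the new _cli_connect_args in test/fixtures.py below, render_template fills the placeholders from the broker fixture's attributes (vars(self)) and the rendered file is handed to the tools via --command-config; {jaas_config} is the string built by _jaas_config. An illustrative rendering for SASL/PLAIN over plaintext (values made up):

    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=PLAIN
    sasl.jaas.config=<one-line JAAS entry from _jaas_config>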
92 changes: 51 additions & 41 deletions test/fixtures.py
@@ -116,6 +116,12 @@ def test_resource(cls, filename):
                 return path
         return os.path.join(cls.project_root, "servers", "resources", "default", filename)

+    @classmethod
+    def run_script(cls, script, *args):
+        result = [os.path.join(cls.kafka_root, 'bin', script)]
+        result.extend([str(arg) for arg in args])
+        return result
+
     @classmethod
     def kafka_run_class_args(cls, *args):
         result = [os.path.join(cls.kafka_root, 'bin', 'kafka-run-class.sh')]
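run_script only assembles an argv list rooted at the Kafka distribution's bin/ directory (arguments are str()'d, so port numbers and the like can be passed as ints); callers still hand it to subprocess.Popen themselves. A quick illustration, assuming kafka_root is /opt/kafka:

    >>> Fixture.run_script('kafka-topics.sh', '--list', '--bootstrap-server', 'localhost:9092')
    ['/opt/kafka/bin/kafka-topics.sh', '--list', '--bootstrap-server', 'localhost:9092']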
@@ -202,6 +208,7 @@ def open(self):
         # Configure Zookeeper child process
         template = self.test_resource("zookeeper.properties")
         properties = self.tmp_dir.join("zookeeper.properties")
+        # Consider replacing w/ run_script('zookeeper-server-start.sh', ...)
         args = self.kafka_run_class_args("org.apache.zookeeper.server.quorum.QuorumPeerMain",
                                          properties.strpath)
         env = self.kafka_run_class_env()
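If the comment above were acted on, the Zookeeper launch would presumably collapse to the same pattern used elsewhere in this PR (a sketch, not part of the diff):

    args = self.run_script('zookeeper-server-start.sh', properties.strpath)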
@@ -334,32 +341,30 @@ def _jaas_config(self):

         elif self.sasl_mechanism == 'PLAIN':
             jaas_config = (
-                'org.apache.kafka.common.security.plain.PlainLoginModule required\n'
-                ' username="{user}" password="{password}" user_{user}="{password}";\n'
+                'org.apache.kafka.common.security.plain.PlainLoginModule required'
+                ' username="{user}" password="{password}" user_{user}="{password}";\n'
             )
         elif self.sasl_mechanism in ("SCRAM-SHA-256", "SCRAM-SHA-512"):
             jaas_config = (
-                'org.apache.kafka.common.security.scram.ScramLoginModule required\n'
-                ' username="{user}" password="{password}";\n'
+                'org.apache.kafka.common.security.scram.ScramLoginModule required'
+                ' username="{user}" password="{password}";\n'
             )
         else:
             raise ValueError("SASL mechanism {} currently not supported".format(self.sasl_mechanism))
         return jaas_config.format(user=self.broker_user, password=self.broker_password)
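Dropping the '\n' after "required" presumably matters now that the JAAS entry is embedded as a value in sasl_command.conf: a Java properties value ends at the first unescaped newline, so the whole module clause has to fit on one line. Rendered for PLAIN it becomes roughly (credentials illustrative):

    org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="secret" user_user="secret";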

     def _add_scram_user(self):
         self.out("Adding SCRAM credentials for user {} to zookeeper.".format(self.broker_user))
-        args = self.kafka_run_class_args(
-            "kafka.admin.ConfigCommand",
-            "--zookeeper",
-            "%s:%d/%s" % (self.zookeeper.host,
-                          self.zookeeper.port,
-                          self.zk_chroot),
-            "--alter",
-            "--entity-type", "users",
-            "--entity-name", self.broker_user,
-            "--add-config",
-            "{}=[password={}]".format(self.sasl_mechanism, self.broker_password),
-        )
+        args = self.run_script('kafka-configs.sh',
+                               '--zookeeper',
+                               '%s:%d/%s' % (self.zookeeper.host,
+                                             self.zookeeper.port,
+                                             self.zk_chroot),
+                               '--alter',
+                               '--entity-type', 'users',
+                               '--entity-name', self.broker_user,
+                               '--add-config',
+                               '{}=[password={}]'.format(self.sasl_mechanism, self.broker_password))
env = self.kafka_run_class_env()
proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

@@ -390,13 +395,12 @@ def out(self, message):

def _create_zk_chroot(self):
self.out("Creating Zookeeper chroot node...")
args = self.kafka_run_class_args("org.apache.zookeeper.ZooKeeperMain",
"-server",
"%s:%d" % (self.zookeeper.host,
self.zookeeper.port),
"create",
"/%s" % (self.zk_chroot,),
"kafka-python")
args = self.run_script('zookeeper-shell.sh',
'%s:%d' % (self.zookeeper.host,
self.zookeeper.port),
'create',
'/%s' % (self.zk_chroot,),
'kafka-python')
env = self.kafka_run_class_env()
proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

@@ -416,6 +420,7 @@ def start(self):
properties_template = self.test_resource("kafka.properties")
jaas_conf_template = self.test_resource("kafka_server_jaas.conf")

+        # Consider replacing w/ run_script('kafka-server-start.sh', ...)
args = self.kafka_run_class_args("kafka.Kafka", properties.strpath)
env = self.kafka_run_class_env()
if self.sasl_enabled:
@@ -590,17 +595,15 @@ def _create_topic_via_admin_api(self, topic_name, num_partitions, replication_factor):
raise errors.for_code(error_code)

def _create_topic_via_cli(self, topic_name, num_partitions, replication_factor):
-        args = self.kafka_run_class_args('kafka.admin.TopicCommand',
-                                         '--zookeeper', '%s:%s/%s' % (self.zookeeper.host,
-                                                                      self.zookeeper.port,
-                                                                      self.zk_chroot),
-                                         '--create',
-                                         '--topic', topic_name,
-                                         '--partitions', self.partitions \
-                                             if num_partitions is None else num_partitions,
-                                         '--replication-factor', self.replicas \
-                                             if replication_factor is None \
-                                             else replication_factor)
+        args = self.run_script('kafka-topics.sh',
+                               '--create',
+                               '--topic', topic_name,
+                               '--partitions', self.partitions \
+                                   if num_partitions is None else num_partitions,
+                               '--replication-factor', self.replicas \
+                                   if replication_factor is None \
+                                   else replication_factor,
+                               *self._cli_connect_args())
if env_kafka_version() >= (0, 10):
args.append('--if-not-exists')
env = self.kafka_run_class_env()
@@ -613,16 +616,23 @@ def _create_topic_via_cli(self, topic_name, num_partitions, replication_factor):
self.out(stderr)
raise RuntimeError("Failed to create topic %s" % (topic_name,))

+    def _cli_connect_args(self):
+        if env_kafka_version() < (3, 0, 0):
+            return ['--zookeeper', '%s:%s/%s' % (self.zookeeper.host, self.zookeeper.port, self.zk_chroot)]
+        else:
+            args = ['--bootstrap-server', '%s:%s' % (self.host, self.port)]
+            if self.sasl_enabled:
+                command_conf = self.tmp_dir.join("sasl_command.conf")
+                self.render_template(self.test_resource("sasl_command.conf"), command_conf, vars(self))
+                args.append('--command-config')
+                args.append(command_conf.strpath)
+            return args

     def get_topic_names(self):
-        args = self.kafka_run_class_args('kafka.admin.TopicCommand',
-                                         '--zookeeper', '%s:%s/%s' % (self.zookeeper.host,
-                                                                      self.zookeeper.port,
-                                                                      self.zk_chroot),
-                                         '--list'
-                                         )
+        cmd = self.run_script('kafka-topics.sh', '--list', *self._cli_connect_args())
         env = self.kafka_run_class_env()
         env.pop('KAFKA_LOG4J_OPTS')
-        proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+        proc = subprocess.Popen(cmd, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = proc.communicate()
if proc.returncode != 0:
self.out("Failed to list topics!")
1 change: 1 addition & 0 deletions test/test_consumer_integration.py
@@ -13,6 +13,7 @@


@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
+@pytest.mark.skipif(env_kafka_version()[:2] > (2, 6, 0), reason="KAFKA_VERSION newer than max inferred version")
def test_kafka_version_infer(kafka_consumer_factory):
consumer = kafka_consumer_factory()
actual_ver_major_minor = env_kafka_version()[:2]
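The new skipif above pairs with the conn.py comment earlier in this diff: the inference table tops out at (2, 6, 0), so against a newer broker (e.g. 3.9.0) the client still infers 2.6, and an equality check against env_kafka_version()[:2] would fail; skipping is presumably the intended trade-off until newer probe structs are added.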