diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml
index f32792fea..bc0724e4a 100644
--- a/.github/workflows/python-package.yml
+++ b/.github/workflows/python-package.yml
@@ -29,8 +29,10 @@ jobs:
           - "0.11.0.3"
           - "1.1.1"
           - "2.4.0"
-          - "2.5.0"
-          - "2.6.0"
+          - "2.8.2"
+          - "3.0.2"
+          - "3.5.2"
+          - "3.9.0"
         python:
           - "3.12"
         include:
@@ -41,13 +43,13 @@ jobs:
           #  kafka: "2.6.0"
           #  experimental: true
           - python: "3.8"
-            kafka: "2.6.0"
+            kafka: "3.9.0"
          - python: "3.9"
-            kafka: "2.6.0"
+            kafka: "3.9.0"
          - python: "3.10"
-            kafka: "2.6.0"
+            kafka: "3.9.0"
          - python: "3.11"
-            kafka: "2.6.0"
+            kafka: "3.9.0"
 
     steps:
       - uses: actions/checkout@v4
diff --git a/kafka/conn.py b/kafka/conn.py
index 7dab7995c..c9ad9cc27 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -1181,6 +1181,7 @@ def _infer_broker_version_from_api_versions(self, api_versions):
         # in reverse order. As soon as we find one that works, return it
         test_cases = [
             # format (<broker version>, <needed struct>)
+            # Make sure to update consumer_integration test check when adding newer versions.
             ((2, 6, 0), DescribeClientQuotasRequest[0]),
             ((2, 5, 0), DescribeAclsRequest_v2),
             ((2, 4, 0), ProduceRequest[8]),
diff --git a/servers/resources/default/sasl_command.conf b/servers/resources/default/sasl_command.conf
new file mode 100644
index 000000000..f4ae7bafa
--- /dev/null
+++ b/servers/resources/default/sasl_command.conf
@@ -0,0 +1,3 @@
+security.protocol={transport}
+sasl.mechanism={sasl_mechanism}
+sasl.jaas.config={jaas_config}
diff --git a/test/fixtures.py b/test/fixtures.py
index 38ea6f047..673c0ac66 100644
--- a/test/fixtures.py
+++ b/test/fixtures.py
@@ -116,6 +116,12 @@ def test_resource(cls, filename):
             return path
         return os.path.join(cls.project_root, "servers", "resources", "default", filename)
 
+    @classmethod
+    def run_script(cls, script, *args):
+        result = [os.path.join(cls.kafka_root, 'bin', script)]
+        result.extend([str(arg) for arg in args])
+        return result
+
     @classmethod
     def kafka_run_class_args(cls, *args):
         result = [os.path.join(cls.kafka_root, 'bin', 'kafka-run-class.sh')]
@@ -202,6 +208,7 @@ def open(self):
         # Configure Zookeeper child process
         template = self.test_resource("zookeeper.properties")
         properties = self.tmp_dir.join("zookeeper.properties")
+        # Consider replacing w/ run_script('zookeeper-server-start.sh', ...)
args = self.kafka_run_class_args("org.apache.zookeeper.server.quorum.QuorumPeerMain", properties.strpath) env = self.kafka_run_class_env() @@ -334,13 +341,13 @@ def _jaas_config(self): elif self.sasl_mechanism == 'PLAIN': jaas_config = ( - 'org.apache.kafka.common.security.plain.PlainLoginModule required\n' - ' username="{user}" password="{password}" user_{user}="{password}";\n' + 'org.apache.kafka.common.security.plain.PlainLoginModule required' + ' username="{user}" password="{password}" user_{user}="{password}";\n' ) elif self.sasl_mechanism in ("SCRAM-SHA-256", "SCRAM-SHA-512"): jaas_config = ( - 'org.apache.kafka.common.security.scram.ScramLoginModule required\n' - ' username="{user}" password="{password}";\n' + 'org.apache.kafka.common.security.scram.ScramLoginModule required' + ' username="{user}" password="{password}";\n' ) else: raise ValueError("SASL mechanism {} currently not supported".format(self.sasl_mechanism)) @@ -348,18 +355,16 @@ def _jaas_config(self): def _add_scram_user(self): self.out("Adding SCRAM credentials for user {} to zookeeper.".format(self.broker_user)) - args = self.kafka_run_class_args( - "kafka.admin.ConfigCommand", - "--zookeeper", - "%s:%d/%s" % (self.zookeeper.host, - self.zookeeper.port, - self.zk_chroot), - "--alter", - "--entity-type", "users", - "--entity-name", self.broker_user, - "--add-config", - "{}=[password={}]".format(self.sasl_mechanism, self.broker_password), - ) + args = self.run_script('kafka-configs.sh', + '--zookeeper', + '%s:%d/%s' % (self.zookeeper.host, + self.zookeeper.port, + self.zk_chroot), + '--alter', + '--entity-type', 'users', + '--entity-name', self.broker_user, + '--add-config', + '{}=[password={}]'.format(self.sasl_mechanism, self.broker_password)) env = self.kafka_run_class_env() proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE) @@ -390,13 +395,12 @@ def out(self, message): def _create_zk_chroot(self): self.out("Creating Zookeeper chroot node...") - args = self.kafka_run_class_args("org.apache.zookeeper.ZooKeeperMain", - "-server", - "%s:%d" % (self.zookeeper.host, - self.zookeeper.port), - "create", - "/%s" % (self.zk_chroot,), - "kafka-python") + args = self.run_script('zookeeper-shell.sh', + '%s:%d' % (self.zookeeper.host, + self.zookeeper.port), + 'create', + '/%s' % (self.zk_chroot,), + 'kafka-python') env = self.kafka_run_class_env() proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE) @@ -416,6 +420,7 @@ def start(self): properties_template = self.test_resource("kafka.properties") jaas_conf_template = self.test_resource("kafka_server_jaas.conf") + # Consider replacing w/ run_script('kafka-server-start.sh', ...) 
args = self.kafka_run_class_args("kafka.Kafka", properties.strpath) env = self.kafka_run_class_env() if self.sasl_enabled: @@ -590,17 +595,15 @@ def _create_topic_via_admin_api(self, topic_name, num_partitions, replication_fa raise errors.for_code(error_code) def _create_topic_via_cli(self, topic_name, num_partitions, replication_factor): - args = self.kafka_run_class_args('kafka.admin.TopicCommand', - '--zookeeper', '%s:%s/%s' % (self.zookeeper.host, - self.zookeeper.port, - self.zk_chroot), - '--create', - '--topic', topic_name, - '--partitions', self.partitions \ - if num_partitions is None else num_partitions, - '--replication-factor', self.replicas \ - if replication_factor is None \ - else replication_factor) + args = self.run_script('kafka-topics.sh', + '--create', + '--topic', topic_name, + '--partitions', self.partitions \ + if num_partitions is None else num_partitions, + '--replication-factor', self.replicas \ + if replication_factor is None \ + else replication_factor, + *self._cli_connect_args()) if env_kafka_version() >= (0, 10): args.append('--if-not-exists') env = self.kafka_run_class_env() @@ -613,16 +616,23 @@ def _create_topic_via_cli(self, topic_name, num_partitions, replication_factor): self.out(stderr) raise RuntimeError("Failed to create topic %s" % (topic_name,)) + def _cli_connect_args(self): + if env_kafka_version() < (3, 0, 0): + return ['--zookeeper', '%s:%s/%s' % (self.zookeeper.host, self.zookeeper.port, self.zk_chroot)] + else: + args = ['--bootstrap-server', '%s:%s' % (self.host, self.port)] + if self.sasl_enabled: + command_conf = self.tmp_dir.join("sasl_command.conf") + self.render_template(self.test_resource("sasl_command.conf"), command_conf, vars(self)) + args.append('--command-config') + args.append(command_conf.strpath) + return args + def get_topic_names(self): - args = self.kafka_run_class_args('kafka.admin.TopicCommand', - '--zookeeper', '%s:%s/%s' % (self.zookeeper.host, - self.zookeeper.port, - self.zk_chroot), - '--list' - ) + cmd = self.run_script('kafka-topics.sh', '--list', *self._cli_connect_args()) env = self.kafka_run_class_env() env.pop('KAFKA_LOG4J_OPTS') - proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + proc = subprocess.Popen(cmd, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdout, stderr = proc.communicate() if proc.returncode != 0: self.out("Failed to list topics!") diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index 90b7ed203..554589aab 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -13,6 +13,7 @@ @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") +@pytest.mark.skipif(env_kafka_version()[:2] > (2, 6, 0), reason="KAFKA_VERSION newer than max inferred version") def test_kafka_version_infer(kafka_consumer_factory): consumer = kafka_consumer_factory() actual_ver_major_minor = env_kafka_version()[:2]