Description
In what version(s) of Spring for Apache Kafka are you seeing this issue?
2.9.7
Describe the bug
The AdminClient created in AbstractMessageListenerContainer#checkTopics does not use the container properties to connect to Kafka. It uses the properties from the consumer factory instead, which are not in sync with the container properties.
To Reproduce
- Configure and create an instance of ConcurrentKafkaListenerContainerFactory with the container property "MissingTopicsFatal" set to true.
- Create a container using that container factory.
- Configure the container properties with the SSL settings needed to connect to Kafka.
- Start the container.
- An "SslAuthenticationException: SSL handshake failed" exception is thrown, whether or not the topic exists.
If I set "MissingTopicsFatal" to false, everything works fine.
After debugging, I noticed that the checkTopics method in the AbstractMessageListenerContainer class uses an AdminClient to check whether the topics exist. This admin client takes its connection properties from the consumer factory. These properties are not in sync with the container properties set on the container.
Expected behavior
Starting a listener container with the "MissingTopicsFatal" property set to true should check for topic existence using the connection parameters configured on the container, and fail only if the topic doesn't exist.
In that case, the following exception should be thrown:
java.lang.IllegalStateException: Topic(s) [my-topic] is/are not present and missingTopicsFatal is true
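To illustrate the expected behavior, here is a minimal sketch of how the admin client configuration could be derived. It is not the actual Spring for Apache Kafka code: the class AdminConfigSketch and the helper mergedAdminConfig are hypothetical, and plain java.util types stand in for the consumer factory configuration and the container's Kafka consumer properties. The idea is only that the container's settings should win over the consumer factory's settings for the same key.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class AdminConfigSketch {

    // Hypothetical helper: start from the consumer factory configuration and
    // let the container's Kafka consumer properties override matching keys.
    public static Map<String, Object> mergedAdminConfig(Map<String, Object> factoryConfig,
            Properties containerOverrides) {
        Map<String, Object> merged = new HashMap<>(factoryConfig);
        // Iterate over all entries so that non-String values are kept as well.
        containerOverrides.forEach((key, value) -> merged.put(String.valueOf(key), value));
        return merged;
    }

    public static void main(String[] args) {
        // Stand-in for the consumer factory configuration (placeholder values).
        Map<String, Object> factoryConfig = new HashMap<>();
        factoryConfig.put("bootstrap.servers", "localhost:9093");
        factoryConfig.put("ssl.keystore.type", "JKS");

        // Stand-in for the container's Kafka consumer properties.
        Properties containerOverrides = new Properties();
        containerOverrides.put("ssl.keystore.type", "PEM");

        Map<String, Object> merged = mergedAdminConfig(factoryConfig, containerOverrides);
        System.out.println(merged.get("ssl.keystore.type")); // prints PEM
    }
}
```

With a merge like this, the topic check would connect with the same SSL settings as the consumers that the container creates.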
Workaround
Set the same Kafka connection properties on both the consumer factory and the container.
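The workaround can be sketched as follows, assuming the SSL settings are collected once and then applied in both places. The class WorkaroundSketch and its methods are hypothetical, and plain java.util types stand in for the map passed to new DefaultKafkaConsumerFactory<>(configs) and for the Properties returned by getContainerProperties().getKafkaConsumerProperties(). The keys are Kafka's configuration names for the SslConfigs constants used in the sample; the engine factory entry is left out for brevity.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class WorkaroundSketch {

    // The SSL settings from the sample, defined in a single place.
    public static Map<String, Object> sslSettings() {
        Map<String, Object> ssl = new HashMap<>();
        ssl.put("ssl.keystore.type", "PEM");
        ssl.put("ssl.truststore.type", "PEM");
        ssl.put("ssl.keystore.certificate.chain", "cert-chain");
        ssl.put("ssl.keystore.key", "private key");
        ssl.put("ssl.truststore.certificates", "truststore");
        return ssl;
    }

    // Apply the same settings to both targets so they cannot drift apart.
    public static void applyToBoth(Map<String, Object> consumerFactoryConfig,
            Properties containerConsumerProperties) {
        Map<String, Object> ssl = sslSettings();
        consumerFactoryConfig.putAll(ssl);
        containerConsumerProperties.putAll(ssl);
    }

    public static void main(String[] args) {
        // Stand-in for kafkaProperties.buildConsumerProperties().
        Map<String, Object> consumerFactoryConfig = new HashMap<>();
        consumerFactoryConfig.put("bootstrap.servers", "localhost:9093"); // placeholder

        // Stand-in for the container's Kafka consumer properties.
        Properties containerConsumerProperties = new Properties();

        applyToBoth(consumerFactoryConfig, containerConsumerProperties);

        // Both targets now carry the same keystore type.
        System.out.println(consumerFactoryConfig.get("ssl.keystore.type")
                .equals(containerConsumerProperties.get("ssl.keystore.type"))); // prints true
    }
}
```

In the real setup, the map would be passed to the consumer factory before it is constructed, so that the admin client used by the topic check sees the SSL settings too.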
Sample
private void test(KafkaProperties kafkaProperties) {
    Map<String, Object> configs = kafkaProperties.buildConsumerProperties();
    var consumerFactory = new DefaultKafkaConsumerFactory<>(configs);
    var listenerContainerFactory = new ConcurrentKafkaListenerContainerFactory<>();
    listenerContainerFactory.setConsumerFactory(consumerFactory);
    listenerContainerFactory.setAutoStartup(Boolean.FALSE);
    listenerContainerFactory.setMissingTopicsFatal(Boolean.TRUE);

    Map<String, Object> connectionConfig = new HashMap<>();
    connectionConfig.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "PEM");
    connectionConfig.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "PEM");
    connectionConfig.put(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG, "cert-chain");
    connectionConfig.put(SslConfigs.SSL_KEYSTORE_KEY_CONFIG, "private key");
    connectionConfig.put(SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG, DefaultSslEngineFactory.class);
    connectionConfig.put(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG, "truststore");
    listenerContainerFactory.getContainerProperties().getKafkaConsumerProperties().putAll(connectionConfig);

    var listener = listenerContainerFactory.createContainer("my-topic");
    listener.start();
}