Description
Version: org.springframework.kafka:spring-kafka:jar:3.2.0
Describe the bug
If you enable observability and set a clusterId on the KafkaAdmin, then unless you also specify a bootstrap servers supplier (setBootstrapServersSupplier) that matches the producer bootstrap servers, the code re-creates the KafkaAdmin, passing the existing configuration properties together with the producer bootstrap config:
    if (this.kafkaAdmin != null) {
        Object producerServers = this.producerFactory.getConfigurationProperties()
                .get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
        String adminServers = this.kafkaAdmin.getBootstrapServers();
        if (!producerServers.equals(adminServers)) {
            Map<String, Object> props = new HashMap<>(this.kafkaAdmin.getConfigurationProperties());
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, producerServers);
            int opTo = this.kafkaAdmin.getOperationTimeout();
            this.kafkaAdmin = new KafkaAdmin(props);
            this.kafkaAdmin.setOperationTimeout(opTo);
        }
    }
Unfortunately, if you have specified a cluster id with
    kafkaAdmin.setClusterId(clusterId);
that value is not part of the property map, so the code above discards it and clusterId is null on the re-created KafkaAdmin.
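The mechanism can be illustrated without Spring at all. The following is a minimal sketch, assuming a hypothetical stand-in class FakeAdmin (not the real KafkaAdmin) that keeps a config map plus a setter-only clusterId. The method recreateLossy follows the shape of the snippet quoted above, and recreatePreserving shows one possible way to carry the value over; both names are invented for illustration and are not part of spring-kafka.

```java
import java.util.HashMap;
import java.util.Map;

public class ClusterIdLossSketch {

    // Hypothetical stand-in for KafkaAdmin: a config map plus setter-only state.
    static class FakeAdmin {
        private final Map<String, Object> configs;
        private String clusterId;            // set via setter, never stored in configs
        private int operationTimeout = 30;

        FakeAdmin(Map<String, Object> configs) {
            this.configs = new HashMap<>(configs);
        }

        Map<String, Object> getConfigurationProperties() {
            return new HashMap<>(configs);
        }

        String getClusterId() { return clusterId; }
        void setClusterId(String clusterId) { this.clusterId = clusterId; }
        int getOperationTimeout() { return operationTimeout; }
        void setOperationTimeout(int timeout) { this.operationTimeout = timeout; }
    }

    // Same shape as the quoted logic: only the map and the operation timeout are copied.
    static FakeAdmin recreateLossy(FakeAdmin original, Object producerServers) {
        Map<String, Object> props = original.getConfigurationProperties();
        props.put("bootstrap.servers", producerServers);
        FakeAdmin copy = new FakeAdmin(props);
        copy.setOperationTimeout(original.getOperationTimeout());
        return copy;
    }

    // One possible fix (an assumption, not the library's code): also copy the clusterId.
    static FakeAdmin recreatePreserving(FakeAdmin original, Object producerServers) {
        FakeAdmin copy = recreateLossy(original, producerServers);
        copy.setClusterId(original.getClusterId());
        return copy;
    }

    public static void main(String[] args) {
        FakeAdmin admin = new FakeAdmin(Map.of("bootstrap.servers", "admin:9092"));
        admin.setClusterId("my-cluster");
        System.out.println(recreateLossy(admin, "producer:9092").getClusterId());      // prints "null"
        System.out.println(recreatePreserving(admin, "producer:9092").getClusterId()); // prints "my-cluster"
    }
}
```

Any state that lives only in a setter-backed field is lost in the same way whenever an object is rebuilt from its property map alone.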
To Reproduce
- enable observability
- create a KafkaAdmin configuration that sets a clusterId and either only the bootstrap servers property or no property at all
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        KafkaAdmin kafkaAdmin = new KafkaAdmin(configs);
        kafkaAdmin.setClusterId(clusterId);
        //kafkaAdmin.setBootstrapServersSupplier(() -> bootstrapServer);
        return kafkaAdmin;
    }
- send an event
Expected behavior
The re-created KafkaAdmin should keep the configured properties, the bootstrap servers and the clusterId. Currently the clusterId is null.