Skip to content

Commit d6c53f0

Browse files
varmenisesobychacko
authored andcommitted
GH-3402: Fix KafkaAdmin clusterId config with observability enabled
Fixes: #3402 Re-set clusterId after creating new KafkaAdmin to ensure proper configuration when observability is enabled and bootstrap supplier is not set. This addresses the issue where kafkaAdmin clusterId configuration was being ignored under specific conditions. **Auto-cherry-pick to `3.2.x` & `3.1.x`**
1 parent fe66062 commit d6c53f0

File tree

2 files changed

+32
-1
lines changed

2 files changed

+32
-1
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@
7474
* @author Gary Russell
7575
* @author Artem Bilan
7676
* @author Adrian Gygax
77+
* @author Sanghyeok An
78+
* @author Valentina Armenise
7779
*
7880
* @since 1.3
7981
*/
@@ -213,6 +215,15 @@ public void setClusterId(String clusterId) {
213215
this.clusterId = clusterId;
214216
}
215217

218+
/**
219+
* Get the clusterId property.
220+
* @return the cluster id.
221+
* @since 3.1.8
222+
*/
223+
public String getClusterId() {
224+
return this.clusterId;
225+
}
226+
216227
@Override
217228
public Map<String, Object> getConfigurationProperties() {
218229
Map<String, Object> configs2 = new HashMap<>(this.configs);

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@
102102
* @author Thomas Strauß
103103
* @author Soby Chacko
104104
* @author Gurps Bassi
105+
* @author Valentina Armenise
105106
*/
106107
public class KafkaTemplate<K, V> implements KafkaOperations<K, V>, ApplicationContextAware, BeanNameAware,
107108
ApplicationListener<ContextStoppedEvent>, DisposableBean, SmartInitializingSingleton {
@@ -485,13 +486,17 @@ public void afterSingletonsInstantiated() {
485486
if (this.kafkaAdmin != null) {
486487
Object producerServers = this.producerFactory.getConfigurationProperties()
487488
.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
488-
String adminServers = this.kafkaAdmin.getBootstrapServers();
489+
String adminServers = getAdminBootstrapAddress();
489490
if (!producerServers.equals(adminServers)) {
490491
Map<String, Object> props = new HashMap<>(this.kafkaAdmin.getConfigurationProperties());
491492
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, producerServers);
492493
int opTo = this.kafkaAdmin.getOperationTimeout();
494+
String clusterId = this.kafkaAdmin.getClusterId();
493495
this.kafkaAdmin = new KafkaAdmin(props);
494496
this.kafkaAdmin.setOperationTimeout(opTo);
497+
if (clusterId != null && !clusterId.isEmpty()) {
498+
this.kafkaAdmin.setClusterId(clusterId);
499+
}
495500
}
496501
}
497502
}
@@ -501,6 +506,21 @@ else if (this.micrometerEnabled) {
501506
}
502507
}
503508

509+
private String getAdminBootstrapAddress() {
510+
// Retrieve bootstrap servers from KafkaAdmin bootstrap supplier if available
511+
String adminServers = this.kafkaAdmin.getBootstrapServers();
512+
513+
// Fallback to configuration properties if bootstrap servers are not set
514+
if (adminServers == null) {
515+
adminServers = this.kafkaAdmin.getConfigurationProperties().getOrDefault(
516+
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
517+
""
518+
).toString();
519+
}
520+
521+
return adminServers;
522+
}
523+
504524
@Nullable
505525
private String clusterId() {
506526
if (this.kafkaAdmin != null && this.clusterId == null) {

0 commit comments

Comments
 (0)