Skip to content

Commit c0510f7

Browse files
committed
spring-projectsGH-11435: Add Kafka health indicator
- refactor code to create and destroy admin client when health monitoring
1 parent 2a1cb28 commit c0510f7

File tree

6 files changed

+54
-72
lines changed

6 files changed

+54
-72
lines changed

spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorAutoConfiguration.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919
import java.time.Duration;
2020
import java.util.Map;
2121

22-
import org.apache.kafka.clients.admin.AdminClient;
23-
2422
import org.springframework.boot.actuate.autoconfigure.health.CompositeHealthIndicatorConfiguration;
2523
import org.springframework.boot.actuate.autoconfigure.health.ConditionalOnEnabledHealthIndicator;
2624
import org.springframework.boot.actuate.autoconfigure.health.HealthIndicatorAutoConfiguration;
@@ -35,6 +33,7 @@
3533
import org.springframework.boot.context.properties.EnableConfigurationProperties;
3634
import org.springframework.context.annotation.Bean;
3735
import org.springframework.context.annotation.Configuration;
36+
import org.springframework.kafka.core.KafkaAdmin;
3837

3938
/**
4039
* {@link EnableAutoConfiguration Auto-configuration} for {@link KafkaHealthIndicator}.
@@ -48,16 +47,16 @@
4847
public class KafkaHealthIndicatorAutoConfiguration {
4948

5049
@Configuration
51-
@ConditionalOnBean(AdminClient.class)
50+
@ConditionalOnBean(KafkaAdmin.class)
5251
@EnableConfigurationProperties(KafkaHealthIndicatorProperties.class)
5352
static class KafkaClientHealthIndicatorConfiguration extends
54-
CompositeHealthIndicatorConfiguration<KafkaHealthIndicator, AdminClient> {
53+
CompositeHealthIndicatorConfiguration<KafkaHealthIndicator, KafkaAdmin> {
5554

56-
private final Map<String, AdminClient> admins;
55+
private final Map<String, KafkaAdmin> admins;
5756

5857
private final KafkaHealthIndicatorProperties properties;
5958

60-
KafkaClientHealthIndicatorConfiguration(Map<String, AdminClient> admins,
59+
KafkaClientHealthIndicatorConfiguration(Map<String, KafkaAdmin> admins,
6160
KafkaHealthIndicatorProperties properties) {
6261
this.admins = admins;
6362
this.properties = properties;
@@ -70,7 +69,7 @@ public HealthIndicator kafkaHealthIndicator() {
7069
}
7170

7271
@Override
73-
protected KafkaHealthIndicator createHealthIndicator(AdminClient source) {
72+
protected KafkaHealthIndicator createHealthIndicator(KafkaAdmin source) {
7473
Duration responseTimeout = this.properties.getResponseTimeout();
7574

7675
return new KafkaHealthIndicator(source,

spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/package-info.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,6 @@
1515
*/
1616

1717
/**
18-
* Auto-configuration for actuator JDBC concerns.
18+
* Auto-configuration for actuator kafka support.
1919
*/
2020
package org.springframework.boot.actuate.autoconfigure.kafka;

spring-boot-project/spring-boot-actuator-autoconfigure/src/test/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorAutoConfigurationTests.java

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,16 @@
1616

1717
package org.springframework.boot.actuate.autoconfigure.kafka;
1818

19-
import org.apache.kafka.clients.admin.AdminClient;
2019
import org.junit.Test;
2120

2221
import org.springframework.boot.actuate.autoconfigure.health.HealthIndicatorAutoConfiguration;
2322
import org.springframework.boot.actuate.health.ApplicationHealthIndicator;
2423
import org.springframework.boot.actuate.kafka.KafkaHealthIndicator;
2524
import org.springframework.boot.autoconfigure.AutoConfigurations;
25+
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
2626
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
27-
import org.springframework.context.annotation.Bean;
28-
import org.springframework.context.annotation.Configuration;
2927

3028
import static org.assertj.core.api.Assertions.assertThat;
31-
import static org.mockito.Mockito.mock;
3229

3330
/**
3431
* Tests for {@link KafkaHealthIndicatorAutoConfiguration}.
@@ -38,7 +35,7 @@
3835
public class KafkaHealthIndicatorAutoConfigurationTests {
3936

4037
private ApplicationContextRunner contextRunner = new ApplicationContextRunner()
41-
.withConfiguration(AutoConfigurations.of(KafkaConfiguration.class,
38+
.withConfiguration(AutoConfigurations.of(KafkaAutoConfiguration.class,
4239
KafkaHealthIndicatorAutoConfiguration.class,
4340
HealthIndicatorAutoConfiguration.class));
4441

@@ -56,14 +53,4 @@ public void runWhenDisabledShouldNotCreateIndicator() {
5653
.doesNotHaveBean(KafkaHealthIndicator.class)
5754
.hasSingleBean(ApplicationHealthIndicator.class));
5855
}
59-
60-
@Configuration
61-
protected static class KafkaConfiguration {
62-
63-
@Bean
64-
public AdminClient kafkaAdminClient() {
65-
return mock(AdminClient.class);
66-
}
67-
68-
}
6956
}

spring-boot-project/spring-boot-actuator/pom.xml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,11 @@
246246
<artifactId>spring-boot-test-support</artifactId>
247247
<scope>test</scope>
248248
</dependency>
249+
<dependency>
250+
<groupId>org.springframework.kafka</groupId>
251+
<artifactId>spring-kafka-test</artifactId>
252+
<scope>test</scope>
253+
</dependency>
249254
<dependency>
250255
<groupId>org.springframework.boot</groupId>
251256
<artifactId>spring-boot-autoconfigure</artifactId>
@@ -256,6 +261,11 @@
256261
<artifactId>log4j-slf4j-impl</artifactId>
257262
<scope>test</scope>
258263
</dependency>
264+
<dependency>
265+
<groupId>org.slf4j</groupId>
266+
<artifactId>log4j-over-slf4j</artifactId>
267+
<scope>test</scope>
268+
</dependency>
259269
<dependency>
260270
<groupId>org.apache.logging.log4j</groupId>
261271
<artifactId>log4j-api</artifactId>

spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicator.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.springframework.boot.actuate.health.AbstractHealthIndicator;
2424
import org.springframework.boot.actuate.health.Health.Builder;
2525
import org.springframework.boot.actuate.health.HealthIndicator;
26+
import org.springframework.kafka.core.KafkaAdmin;
2627
import org.springframework.util.Assert;
2728

2829
/**
@@ -32,26 +33,30 @@
3233
*/
3334
public class KafkaHealthIndicator extends AbstractHealthIndicator {
3435

35-
private final AdminClient adminClient;
36+
private final KafkaAdmin kafkaAdmin;
3637
private final DescribeClusterOptions describeOptions;
3738

3839
/**
3940
* Create a new {@link KafkaHealthIndicator} instance.
4041
*
41-
* @param adminClient the kafka admin client
42+
* @param kafkaAdmin the kafka admin
4243
* @param responseTimeout the describe cluster request timeout in milliseconds
4344
*/
44-
public KafkaHealthIndicator(AdminClient adminClient, long responseTimeout) {
45-
Assert.notNull(adminClient, "KafkaAdmin must not be null");
45+
public KafkaHealthIndicator(KafkaAdmin kafkaAdmin, long responseTimeout) {
46+
Assert.notNull(kafkaAdmin, "KafkaAdmin must not be null");
4647

47-
this.adminClient = adminClient;
48-
this.describeOptions = new DescribeClusterOptions().timeoutMs((int) responseTimeout);
48+
this.kafkaAdmin = kafkaAdmin;
49+
this.describeOptions = new DescribeClusterOptions()
50+
.timeoutMs((int) responseTimeout);
4951
}
5052

5153
@Override
5254
protected void doHealthCheck(Builder builder) throws Exception {
53-
DescribeClusterResult result = this.adminClient.describeCluster(this.describeOptions);
54-
builder.up().withDetail("clusterId", result.clusterId().get());
55+
try (AdminClient adminClient = AdminClient.create(this.kafkaAdmin.getConfig())) {
56+
DescribeClusterResult result = adminClient
57+
.describeCluster(this.describeOptions);
58+
builder.up().withDetail("clusterId", result.clusterId().get());
59+
}
5560
}
5661
}
5762

spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicatorTests.java

Lines changed: 22 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -16,25 +16,20 @@
1616

1717
package org.springframework.boot.actuate.kafka;
1818

19-
import org.apache.kafka.clients.admin.AdminClient;
20-
import org.apache.kafka.clients.admin.DescribeClusterOptions;
21-
import org.apache.kafka.clients.admin.DescribeClusterResult;
22-
import org.apache.kafka.common.KafkaFuture;
19+
import java.util.Collections;
20+
21+
import org.apache.kafka.clients.producer.ProducerConfig;
2322
import org.assertj.core.data.MapEntry;
2423
import org.junit.Before;
24+
import org.junit.Rule;
2525
import org.junit.Test;
26-
import org.mockito.ArgumentCaptor;
27-
import org.mockito.Captor;
28-
import org.mockito.Mock;
29-
import org.mockito.MockitoAnnotations;
3026

3127
import org.springframework.boot.actuate.health.Health;
3228
import org.springframework.boot.actuate.health.Status;
29+
import org.springframework.kafka.core.KafkaAdmin;
30+
import org.springframework.kafka.test.rule.KafkaEmbedded;
3331

3432
import static org.assertj.core.api.Assertions.assertThat;
35-
import static org.mockito.ArgumentMatchers.any;
36-
import static org.mockito.BDDMockito.given;
37-
import static org.mockito.Mockito.verify;
3833

3934
/**
4035
* Test for {@link KafkaHealthIndicator}
@@ -43,48 +38,34 @@
4338
*/
4439
public class KafkaHealthIndicatorTests {
4540

46-
private static final Long RESPONSE_TIME = 10L;
47-
private static final String CLUSTER_ID = "abc_123";
48-
49-
@Mock
50-
private AdminClient adminClient;
51-
52-
@Mock
53-
private DescribeClusterResult describeClusterResult;
54-
55-
@Mock
56-
private KafkaFuture<String> clusterIdFuture;
41+
private static final Long RESPONSE_TIME = 1000L;
5742

58-
@Captor
59-
private ArgumentCaptor<DescribeClusterOptions> describeOptionsCaptor;
43+
@Rule
44+
public KafkaEmbedded kafkaEmbedded = new KafkaEmbedded(1, true);
6045

61-
private KafkaHealthIndicator healthIndicator;
46+
private KafkaAdmin kafkaAdmin;
6247

6348
@Before
6449
public void setup() {
65-
MockitoAnnotations.initMocks(this);
66-
this.healthIndicator = new KafkaHealthIndicator(this.adminClient, RESPONSE_TIME);
67-
given(this.describeClusterResult.clusterId()).willReturn(this.clusterIdFuture);
50+
this.kafkaAdmin = new KafkaAdmin(Collections.singletonMap(
51+
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaEmbedded.getBrokersAsString()));
6852
}
6953

7054
@Test
71-
public void kafkaIsUp() throws Exception {
72-
given(this.adminClient.describeCluster(any(DescribeClusterOptions.class)))
73-
.willReturn(this.describeClusterResult);
74-
given(this.clusterIdFuture.get()).willReturn(CLUSTER_ID);
75-
Health health = this.healthIndicator.health();
55+
public void kafkaIsUp() {
56+
KafkaHealthIndicator healthIndicator = new KafkaHealthIndicator(this.kafkaAdmin, RESPONSE_TIME);
57+
Health health = healthIndicator.health();
7658
assertThat(health.getStatus()).isEqualTo(Status.UP);
77-
assertThat(health.getDetails()).containsOnly(MapEntry.entry("clusterId", CLUSTER_ID));
78-
verify(this.adminClient).describeCluster(this.describeOptionsCaptor.capture());
79-
assertThat(this.describeOptionsCaptor.getValue().timeoutMs()).isEqualTo(RESPONSE_TIME.intValue());
59+
assertThat(health.getDetails()).containsOnly(MapEntry.entry(
60+
"clusterId", this.kafkaEmbedded.getKafkaServer(0).clusterId()));
8061
}
8162

8263
@Test
83-
public void kafkaIsDown() {
84-
given(this.adminClient.describeCluster(any(DescribeClusterOptions.class)))
85-
.willThrow(new IllegalStateException("test, expected"));
86-
Health health = this.healthIndicator.health();
64+
public void kafkaIsDown() throws Exception {
65+
this.kafkaEmbedded.destroy();
66+
KafkaHealthIndicator healthIndicator = new KafkaHealthIndicator(this.kafkaAdmin, RESPONSE_TIME);
67+
Health health = healthIndicator.health();
8768
assertThat(health.getStatus()).isEqualTo(Status.DOWN);
88-
assertThat((String) health.getDetails().get("error")).contains("test, expected");
69+
assertThat((String) health.getDetails().get("error")).isNotEmpty();
8970
}
9071
}

0 commit comments

Comments
 (0)