Skip to content

Commit 7cd1982

Browse files
committed
Polish "Add Kafka health indicator"
Closes gh-11515
1 parent 0dbd942 commit 7cd1982

File tree

12 files changed

+102
-96
lines changed

12 files changed

+102
-96
lines changed

spring-boot-project/spring-boot-actuator-autoconfigure/pom.xml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -193,11 +193,6 @@
193193
<artifactId>elasticsearch</artifactId>
194194
<optional>true</optional>
195195
</dependency>
196-
<dependency>
197-
<groupId>org.springframework.kafka</groupId>
198-
<artifactId>spring-kafka</artifactId>
199-
<optional>true</optional>
200-
</dependency>
201196
<dependency>
202197
<groupId>org.flywaydb</groupId>
203198
<artifactId>flyway-core</artifactId>
@@ -311,6 +306,11 @@
311306
<artifactId>spring-integration-core</artifactId>
312307
<optional>true</optional>
313308
</dependency>
309+
<dependency>
310+
<groupId>org.springframework.kafka</groupId>
311+
<artifactId>spring-kafka</artifactId>
312+
<optional>true</optional>
313+
</dependency>
314314
<dependency>
315315
<groupId>org.springframework.security</groupId>
316316
<artifactId>spring-security-config</artifactId>

spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorAutoConfiguration.java

Lines changed: 22 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
2929
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
3030
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
31+
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
3132
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
3233
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
3334
import org.springframework.boot.context.properties.EnableConfigurationProperties;
@@ -41,40 +42,35 @@
4142
* @author Juan Rada
4243
*/
4344
@Configuration
45+
@ConditionalOnClass(KafkaAdmin.class)
46+
@ConditionalOnBean(KafkaAdmin.class)
4447
@ConditionalOnEnabledHealthIndicator("kafka")
4548
@AutoConfigureBefore(HealthIndicatorAutoConfiguration.class)
4649
@AutoConfigureAfter(KafkaAutoConfiguration.class)
47-
public class KafkaHealthIndicatorAutoConfiguration {
50+
@EnableConfigurationProperties(KafkaHealthIndicatorProperties.class)
51+
public class KafkaHealthIndicatorAutoConfiguration extends
52+
CompositeHealthIndicatorConfiguration<KafkaHealthIndicator, KafkaAdmin> {
4853

49-
@Configuration
50-
@ConditionalOnBean(KafkaAdmin.class)
51-
@EnableConfigurationProperties(KafkaHealthIndicatorProperties.class)
52-
static class KafkaClientHealthIndicatorConfiguration extends
53-
CompositeHealthIndicatorConfiguration<KafkaHealthIndicator, KafkaAdmin> {
54+
private final Map<String, KafkaAdmin> admins;
5455

55-
private final Map<String, KafkaAdmin> admins;
56+
private final KafkaHealthIndicatorProperties properties;
5657

57-
private final KafkaHealthIndicatorProperties properties;
58-
59-
KafkaClientHealthIndicatorConfiguration(Map<String, KafkaAdmin> admins,
60-
KafkaHealthIndicatorProperties properties) {
61-
this.admins = admins;
62-
this.properties = properties;
63-
}
64-
65-
@Bean
66-
@ConditionalOnMissingBean(name = "kafkaHealthIndicator")
67-
public HealthIndicator kafkaHealthIndicator() {
68-
return createHealthIndicator(this.admins);
69-
}
58+
KafkaHealthIndicatorAutoConfiguration(Map<String, KafkaAdmin> admins,
59+
KafkaHealthIndicatorProperties properties) {
60+
this.admins = admins;
61+
this.properties = properties;
62+
}
7063

71-
@Override
72-
protected KafkaHealthIndicator createHealthIndicator(KafkaAdmin source) {
73-
Duration responseTimeout = this.properties.getResponseTimeout();
64+
@Bean
65+
@ConditionalOnMissingBean(name = "kafkaHealthIndicator")
66+
public HealthIndicator kafkaHealthIndicator() {
67+
return createHealthIndicator(this.admins);
68+
}
7469

75-
return new KafkaHealthIndicator(source,
76-
responseTimeout == null ? 100L : responseTimeout.toMillis());
77-
}
70+
@Override
71+
protected KafkaHealthIndicator createHealthIndicator(KafkaAdmin source) {
72+
Duration responseTimeout = this.properties.getResponseTimeout();
73+
return new KafkaHealthIndicator(source, responseTimeout.toMillis());
7874
}
7975

8076
}

spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorProperties.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,15 @@
2525
* Configuration properties for {@link KafkaHealthIndicator}.
2626
*
2727
* @author Juan Rada
28+
* @since 2.0.0
2829
*/
2930
@ConfigurationProperties(prefix = "management.health.kafka", ignoreUnknownFields = false)
3031
public class KafkaHealthIndicatorProperties {
3132

3233
/**
3334
* Time to wait for a response from the cluster description operation.
3435
*/
35-
private Duration responseTimeout = Duration.ofMillis(100);
36+
private Duration responseTimeout = Duration.ofMillis(1000);
3637

3738
public Duration getResponseTimeout() {
3839
return this.responseTimeout;

spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/package-info.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,6 @@
1515
*/
1616

1717
/**
18-
* Auto-configuration for actuator kafka support.
18+
* Auto-configuration for actuator Apache Kafka support.
1919
*/
2020
package org.springframework.boot.actuate.autoconfigure.kafka;

spring-boot-project/spring-boot-actuator-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,12 @@
103103
"description": "Whether to enable JMS health check.",
104104
"defaultValue": true
105105
},
106+
{
107+
"name": "management.health.kafka.enabled",
108+
"type": "java.lang.Boolean",
109+
"description": "Whether to enable Kafka health check.",
110+
"defaultValue": true
111+
},
106112
{
107113
"name": "management.health.ldap.enabled",
108114
"type": "java.lang.Boolean",
@@ -145,12 +151,6 @@
145151
"description": "Whether to enable Neo4j health check.",
146152
"defaultValue": true
147153
},
148-
{
149-
"name": "management.health.kafka.enabled",
150-
"type": "java.lang.Boolean",
151-
"description": "Whether to enable kafka health check.",
152-
"defaultValue": true
153-
},
154154
{
155155
"name": "management.info.build.enabled",
156156
"type": "java.lang.Boolean",

spring-boot-project/spring-boot-actuator-autoconfigure/src/main/resources/META-INF/spring.factories

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ org.springframework.boot.actuate.autoconfigure.context.properties.ConfigurationP
1111
org.springframework.boot.actuate.autoconfigure.context.ShutdownEndpointAutoConfiguration,\
1212
org.springframework.boot.actuate.autoconfigure.couchbase.CouchbaseHealthIndicatorAutoConfiguration,\
1313
org.springframework.boot.actuate.autoconfigure.elasticsearch.ElasticsearchHealthIndicatorAutoConfiguration,\
14-
org.springframework.boot.actuate.autoconfigure.kafka.KafkaHealthIndicatorAutoConfiguration,\
1514
org.springframework.boot.actuate.autoconfigure.endpoint.EndpointAutoConfiguration,\
1615
org.springframework.boot.actuate.autoconfigure.endpoint.jmx.JmxEndpointAutoConfiguration,\
1716
org.springframework.boot.actuate.autoconfigure.endpoint.web.WebEndpointAutoConfiguration,\
@@ -25,6 +24,7 @@ org.springframework.boot.actuate.autoconfigure.info.InfoEndpointAutoConfiguratio
2524
org.springframework.boot.actuate.autoconfigure.jdbc.DataSourceHealthIndicatorAutoConfiguration,\
2625
org.springframework.boot.actuate.autoconfigure.jms.JmsHealthIndicatorAutoConfiguration,\
2726
org.springframework.boot.actuate.autoconfigure.jolokia.JolokiaEndpointAutoConfiguration,\
27+
org.springframework.boot.actuate.autoconfigure.kafka.KafkaHealthIndicatorAutoConfiguration,\
2828
org.springframework.boot.actuate.autoconfigure.ldap.LdapHealthIndicatorAutoConfiguration,\
2929
org.springframework.boot.actuate.autoconfigure.liquibase.LiquibaseEndpointAutoConfiguration,\
3030
org.springframework.boot.actuate.autoconfigure.logging.LogFileWebEndpointAutoConfiguration,\

spring-boot-project/spring-boot-actuator/pom.xml

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -172,11 +172,6 @@
172172
<artifactId>spring-rabbit</artifactId>
173173
<optional>true</optional>
174174
</dependency>
175-
<dependency>
176-
<groupId>org.springframework.kafka</groupId>
177-
<artifactId>spring-kafka</artifactId>
178-
<optional>true</optional>
179-
</dependency>
180175
<dependency>
181176
<groupId>org.springframework.data</groupId>
182177
<artifactId>spring-data-cassandra</artifactId>
@@ -230,6 +225,11 @@
230225
<artifactId>spring-integration-core</artifactId>
231226
<optional>true</optional>
232227
</dependency>
228+
<dependency>
229+
<groupId>org.springframework.kafka</groupId>
230+
<artifactId>spring-kafka</artifactId>
231+
<optional>true</optional>
232+
</dependency>
233233
<dependency>
234234
<groupId>org.springframework.security</groupId>
235235
<artifactId>spring-security-core</artifactId>
@@ -263,13 +263,13 @@
263263
<scope>test</scope>
264264
</dependency>
265265
<dependency>
266-
<groupId>org.springframework.kafka</groupId>
267-
<artifactId>spring-kafka-test</artifactId>
266+
<groupId>org.springframework.boot</groupId>
267+
<artifactId>spring-boot-autoconfigure</artifactId>
268268
<scope>test</scope>
269269
</dependency>
270270
<dependency>
271-
<groupId>org.springframework.boot</groupId>
272-
<artifactId>spring-boot-autoconfigure</artifactId>
271+
<groupId>org.springframework.kafka</groupId>
272+
<artifactId>spring-kafka-test</artifactId>
273273
<scope>test</scope>
274274
</dependency>
275275
<dependency>

spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicator.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.springframework.boot.actuate.health.AbstractHealthIndicator;
3131
import org.springframework.boot.actuate.health.Health.Builder;
3232
import org.springframework.boot.actuate.health.HealthIndicator;
33+
import org.springframework.boot.actuate.health.Status;
3334
import org.springframework.kafka.core.KafkaAdmin;
3435
import org.springframework.util.Assert;
3536

@@ -43,37 +44,35 @@ public class KafkaHealthIndicator extends AbstractHealthIndicator {
4344
static final String REPLICATION_PROPERTY = "transaction.state.log.replication.factor";
4445

4546
private final KafkaAdmin kafkaAdmin;
47+
4648
private final DescribeClusterOptions describeOptions;
4749

4850
/**
4951
* Create a new {@link KafkaHealthIndicator} instance.
5052
*
5153
* @param kafkaAdmin the kafka admin
52-
* @param responseTimeout the describe cluster request timeout in milliseconds
54+
* @param requestTimeout the request timeout in milliseconds
5355
*/
54-
public KafkaHealthIndicator(KafkaAdmin kafkaAdmin, long responseTimeout) {
56+
public KafkaHealthIndicator(KafkaAdmin kafkaAdmin, long requestTimeout) {
5557
Assert.notNull(kafkaAdmin, "KafkaAdmin must not be null");
5658
this.kafkaAdmin = kafkaAdmin;
5759
this.describeOptions = new DescribeClusterOptions()
58-
.timeoutMs((int) responseTimeout);
60+
.timeoutMs((int) requestTimeout);
5961
}
6062

6163
@Override
6264
protected void doHealthCheck(Builder builder) throws Exception {
6365
try (AdminClient adminClient = AdminClient.create(this.kafkaAdmin.getConfig())) {
64-
DescribeClusterResult result = adminClient.describeCluster(this.describeOptions);
66+
DescribeClusterResult result = adminClient.describeCluster(
67+
this.describeOptions);
6568
String brokerId = result.controller().get().idString();
6669
int replicationFactor = getReplicationFactor(brokerId, adminClient);
6770
int nodes = result.nodes().get().size();
68-
if (nodes >= replicationFactor) {
69-
builder.up();
70-
}
71-
else {
72-
builder.down();
73-
}
74-
builder.withDetail("clusterId", result.clusterId().get());
75-
builder.withDetail("brokerId", brokerId);
76-
builder.withDetail("nodes", nodes);
71+
Status status = nodes >= replicationFactor ? Status.UP : Status.DOWN;
72+
builder.status(status)
73+
.withDetail("clusterId", result.clusterId().get())
74+
.withDetail("brokerId", brokerId)
75+
.withDetail("nodes", nodes);
7776
}
7877
}
7978

@@ -85,5 +84,6 @@ private int getReplicationFactor(String brokerId,
8584
Config brokerConfig = kafkaConfig.get(configResource);
8685
return Integer.parseInt(brokerConfig.get(REPLICATION_PROPERTY).value());
8786
}
87+
8888
}
8989

spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/kafka/package-info.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,6 @@
1515
*/
1616

1717
/**
18-
* Actuator support for Kafka.
18+
* Actuator support for Apache Kafka.
1919
*/
2020
package org.springframework.boot.actuate.kafka;

spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicatorTests.java

Lines changed: 37 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -20,78 +20,82 @@
2020
import java.util.Map;
2121

2222
import org.apache.kafka.clients.producer.ProducerConfig;
23+
import org.junit.After;
2324
import org.junit.Test;
2425

2526
import org.springframework.boot.actuate.health.Health;
2627
import org.springframework.boot.actuate.health.Status;
2728
import org.springframework.kafka.core.KafkaAdmin;
2829
import org.springframework.kafka.test.rule.KafkaEmbedded;
30+
import org.springframework.util.SocketUtils;
2931

3032
import static org.assertj.core.api.Assertions.assertThat;
3133

3234
/**
33-
* Test for {@link KafkaHealthIndicator}
35+
* Tests for {@link KafkaHealthIndicator}.
3436
*
3537
* @author Juan Rada
38+
* @author Stephane Nicoll
3639
*/
3740
public class KafkaHealthIndicatorTests {
3841

39-
private static final Long RESPONSE_TIME = 1000L;
40-
4142
private KafkaEmbedded kafkaEmbedded;
42-
private KafkaAdmin kafkaAdmin;
4343

44-
private void startKafka(int replicationFactor) throws Exception {
45-
this.kafkaEmbedded = new KafkaEmbedded(1, true);
46-
this.kafkaEmbedded.brokerProperties(Collections.singletonMap(
47-
KafkaHealthIndicator.REPLICATION_PROPERTY,
48-
String.valueOf(replicationFactor)));
49-
this.kafkaEmbedded.before();
50-
this.kafkaAdmin = new KafkaAdmin(Collections.singletonMap(
51-
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
52-
this.kafkaEmbedded.getBrokersAsString()));
53-
}
44+
private KafkaAdmin kafkaAdmin;
5445

55-
private void shutdownKafka() throws Exception {
56-
this.kafkaEmbedded.destroy();
46+
@After
47+
public void shutdownKafka() throws Exception {
48+
if (this.kafkaEmbedded != null) {
49+
this.kafkaEmbedded.destroy();
50+
}
5751
}
58-
5952
@Test
6053
public void kafkaIsUp() throws Exception {
6154
startKafka(1);
6255
KafkaHealthIndicator healthIndicator =
63-
new KafkaHealthIndicator(this.kafkaAdmin, RESPONSE_TIME);
56+
new KafkaHealthIndicator(this.kafkaAdmin, 1000L);
6457
Health health = healthIndicator.health();
6558
assertThat(health.getStatus()).isEqualTo(Status.UP);
6659
assertDetails(health.getDetails());
67-
shutdownKafka();
6860
}
6961

70-
private void assertDetails(Map<String, Object> details) {
71-
assertThat(details).containsEntry("brokerId", "0");
72-
assertThat(details).containsKey("clusterId");
73-
assertThat(details).containsEntry("nodes", 1);
62+
@Test
63+
public void kafkaIsDown() {
64+
int freePort = SocketUtils.findAvailableTcpPort();
65+
this.kafkaAdmin = new KafkaAdmin(Collections.singletonMap(
66+
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:" + freePort));
67+
KafkaHealthIndicator healthIndicator =
68+
new KafkaHealthIndicator(this.kafkaAdmin, 1L);
69+
Health health = healthIndicator.health();
70+
assertThat(health.getStatus()).isEqualTo(Status.DOWN);
71+
assertThat((String) health.getDetails().get("error")).isNotEmpty();
7472
}
7573

7674
@Test
7775
public void notEnoughNodesForReplicationFactor() throws Exception {
7876
startKafka(2);
7977
KafkaHealthIndicator healthIndicator =
80-
new KafkaHealthIndicator(this.kafkaAdmin, RESPONSE_TIME);
78+
new KafkaHealthIndicator(this.kafkaAdmin, 1000L);
8179
Health health = healthIndicator.health();
8280
assertThat(health.getStatus()).isEqualTo(Status.DOWN);
8381
assertDetails(health.getDetails());
84-
shutdownKafka();
8582
}
8683

87-
@Test
88-
public void kafkaIsDown() throws Exception {
84+
private void assertDetails(Map<String, Object> details) {
85+
assertThat(details).containsEntry("brokerId", "0");
86+
assertThat(details).containsKey("clusterId");
87+
assertThat(details).containsEntry("nodes", 1);
88+
}
89+
90+
private void startKafka(int replicationFactor) throws Exception {
91+
this.kafkaEmbedded = new KafkaEmbedded(1, true);
92+
this.kafkaEmbedded.brokerProperties(Collections.singletonMap(
93+
KafkaHealthIndicator.REPLICATION_PROPERTY,
94+
String.valueOf(replicationFactor)));
95+
this.kafkaEmbedded.before();
8996
this.kafkaAdmin = new KafkaAdmin(Collections.singletonMap(
90-
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:34987"));
91-
KafkaHealthIndicator healthIndicator =
92-
new KafkaHealthIndicator(this.kafkaAdmin, RESPONSE_TIME);
93-
Health health = healthIndicator.health();
94-
assertThat(health.getStatus()).isEqualTo(Status.DOWN);
95-
assertThat((String) health.getDetails().get("error")).isNotEmpty();
97+
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
98+
this.kafkaEmbedded.getBrokersAsString()));
9699
}
100+
97101
}

spring-boot-project/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1271,6 +1271,8 @@ content into your application. Rather, pick only the properties that you need.
12711271
management.health.elasticsearch.response-timeout=100ms # The time to wait for a response from the cluster.
12721272
management.health.influxdb.enabled=true # Whether to enable InfluxDB health check.
12731273
management.health.jms.enabled=true # Whether to enable JMS health check.
1274+
management.health.kafka.enabled=true # Whether to enable Kafka health check.
1275+
management.health.kafka.response-timeout=1000ms # Time to wait for a response from the cluster description operation.
12741276
management.health.ldap.enabled=true # Whether to enable LDAP health check.
12751277
management.health.mail.enabled=true # Whether to enable Mail health check.
12761278
management.health.mongo.enabled=true # Whether to enable MongoDB health check.

0 commit comments

Comments
 (0)