Skip to content

Commit c1d7d5e

Browse files
committed
Merge pull request #11515 from Jcamilorada:GH-11435
* pr/11515: Polish "Add Kafka health indicator" Add Kafka health indicator
2 parents 76a450d + 7cd1982 commit c1d7d5e

File tree

13 files changed

+440
-0
lines changed

13 files changed

+440
-0
lines changed

spring-boot-project/spring-boot-actuator-autoconfigure/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,11 @@
306306
<artifactId>spring-integration-core</artifactId>
307307
<optional>true</optional>
308308
</dependency>
309+
<dependency>
310+
<groupId>org.springframework.kafka</groupId>
311+
<artifactId>spring-kafka</artifactId>
312+
<optional>true</optional>
313+
</dependency>
309314
<dependency>
310315
<groupId>org.springframework.security</groupId>
311316
<artifactId>spring-security-config</artifactId>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Copyright 2012-2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.boot.actuate.autoconfigure.kafka;
18+
19+
import java.time.Duration;
20+
import java.util.Map;
21+
22+
import org.springframework.boot.actuate.autoconfigure.health.CompositeHealthIndicatorConfiguration;
23+
import org.springframework.boot.actuate.autoconfigure.health.ConditionalOnEnabledHealthIndicator;
24+
import org.springframework.boot.actuate.autoconfigure.health.HealthIndicatorAutoConfiguration;
25+
import org.springframework.boot.actuate.health.HealthIndicator;
26+
import org.springframework.boot.actuate.kafka.KafkaHealthIndicator;
27+
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
28+
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
29+
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
30+
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
31+
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
32+
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
33+
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
34+
import org.springframework.boot.context.properties.EnableConfigurationProperties;
35+
import org.springframework.context.annotation.Bean;
36+
import org.springframework.context.annotation.Configuration;
37+
import org.springframework.kafka.core.KafkaAdmin;
38+
39+
/**
40+
* {@link EnableAutoConfiguration Auto-configuration} for {@link KafkaHealthIndicator}.
41+
*
42+
* @author Juan Rada
43+
*/
44+
@Configuration
45+
@ConditionalOnClass(KafkaAdmin.class)
46+
@ConditionalOnBean(KafkaAdmin.class)
47+
@ConditionalOnEnabledHealthIndicator("kafka")
48+
@AutoConfigureBefore(HealthIndicatorAutoConfiguration.class)
49+
@AutoConfigureAfter(KafkaAutoConfiguration.class)
50+
@EnableConfigurationProperties(KafkaHealthIndicatorProperties.class)
51+
public class KafkaHealthIndicatorAutoConfiguration extends
52+
CompositeHealthIndicatorConfiguration<KafkaHealthIndicator, KafkaAdmin> {
53+
54+
private final Map<String, KafkaAdmin> admins;
55+
56+
private final KafkaHealthIndicatorProperties properties;
57+
58+
KafkaHealthIndicatorAutoConfiguration(Map<String, KafkaAdmin> admins,
59+
KafkaHealthIndicatorProperties properties) {
60+
this.admins = admins;
61+
this.properties = properties;
62+
}
63+
64+
@Bean
65+
@ConditionalOnMissingBean(name = "kafkaHealthIndicator")
66+
public HealthIndicator kafkaHealthIndicator() {
67+
return createHealthIndicator(this.admins);
68+
}
69+
70+
@Override
71+
protected KafkaHealthIndicator createHealthIndicator(KafkaAdmin source) {
72+
Duration responseTimeout = this.properties.getResponseTimeout();
73+
return new KafkaHealthIndicator(source, responseTimeout.toMillis());
74+
}
75+
76+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright 2012-2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.boot.actuate.autoconfigure.kafka;
18+
19+
import java.time.Duration;
20+
21+
import org.springframework.boot.actuate.kafka.KafkaHealthIndicator;
22+
import org.springframework.boot.context.properties.ConfigurationProperties;
23+
24+
/**
25+
* Configuration properties for {@link KafkaHealthIndicator}.
26+
*
27+
* @author Juan Rada
28+
* @since 2.0.0
29+
*/
30+
@ConfigurationProperties(prefix = "management.health.kafka", ignoreUnknownFields = false)
31+
public class KafkaHealthIndicatorProperties {
32+
33+
/**
34+
* Time to wait for a response from the cluster description operation.
35+
*/
36+
private Duration responseTimeout = Duration.ofMillis(1000);
37+
38+
public Duration getResponseTimeout() {
39+
return this.responseTimeout;
40+
}
41+
42+
public void setResponseTimeout(Duration responseTimeout) {
43+
this.responseTimeout = responseTimeout;
44+
}
45+
46+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* Copyright 2012-2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
/**
18+
* Auto-configuration for actuator Apache Kafka support.
19+
*/
20+
package org.springframework.boot.actuate.autoconfigure.kafka;

spring-boot-project/spring-boot-actuator-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,12 @@
103103
"description": "Whether to enable JMS health check.",
104104
"defaultValue": true
105105
},
106+
{
107+
"name": "management.health.kafka.enabled",
108+
"type": "java.lang.Boolean",
109+
"description": "Whether to enable Kafka health check.",
110+
"defaultValue": true
111+
},
106112
{
107113
"name": "management.health.ldap.enabled",
108114
"type": "java.lang.Boolean",

spring-boot-project/spring-boot-actuator-autoconfigure/src/main/resources/META-INF/spring.factories

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ org.springframework.boot.actuate.autoconfigure.info.InfoEndpointAutoConfiguratio
2424
org.springframework.boot.actuate.autoconfigure.jdbc.DataSourceHealthIndicatorAutoConfiguration,\
2525
org.springframework.boot.actuate.autoconfigure.jms.JmsHealthIndicatorAutoConfiguration,\
2626
org.springframework.boot.actuate.autoconfigure.jolokia.JolokiaEndpointAutoConfiguration,\
27+
org.springframework.boot.actuate.autoconfigure.kafka.KafkaHealthIndicatorAutoConfiguration,\
2728
org.springframework.boot.actuate.autoconfigure.ldap.LdapHealthIndicatorAutoConfiguration,\
2829
org.springframework.boot.actuate.autoconfigure.liquibase.LiquibaseEndpointAutoConfiguration,\
2930
org.springframework.boot.actuate.autoconfigure.logging.LogFileWebEndpointAutoConfiguration,\
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Copyright 2012-2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.boot.actuate.autoconfigure.kafka;
18+
19+
import org.junit.Test;
20+
21+
import org.springframework.boot.actuate.autoconfigure.health.HealthIndicatorAutoConfiguration;
22+
import org.springframework.boot.actuate.health.ApplicationHealthIndicator;
23+
import org.springframework.boot.actuate.kafka.KafkaHealthIndicator;
24+
import org.springframework.boot.autoconfigure.AutoConfigurations;
25+
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
26+
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
27+
28+
import static org.assertj.core.api.Assertions.assertThat;
29+
30+
/**
31+
* Tests for {@link KafkaHealthIndicatorAutoConfiguration}.
32+
*
33+
* @author Juan Rada
34+
*/
35+
public class KafkaHealthIndicatorAutoConfigurationTests {
36+
37+
private ApplicationContextRunner contextRunner = new ApplicationContextRunner()
38+
.withConfiguration(AutoConfigurations.of(KafkaAutoConfiguration.class,
39+
KafkaHealthIndicatorAutoConfiguration.class,
40+
HealthIndicatorAutoConfiguration.class));
41+
42+
@Test
43+
public void runShouldCreateIndicator() {
44+
this.contextRunner.run((context) -> assertThat(context)
45+
.hasSingleBean(KafkaHealthIndicator.class)
46+
.doesNotHaveBean(ApplicationHealthIndicator.class));
47+
}
48+
49+
@Test
50+
public void runWhenDisabledShouldNotCreateIndicator() {
51+
this.contextRunner.withPropertyValues("management.health.kafka.enabled:false")
52+
.run((context) -> assertThat(context)
53+
.doesNotHaveBean(KafkaHealthIndicator.class)
54+
.hasSingleBean(ApplicationHealthIndicator.class));
55+
}
56+
}

spring-boot-project/spring-boot-actuator/pom.xml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,11 @@
225225
<artifactId>spring-integration-core</artifactId>
226226
<optional>true</optional>
227227
</dependency>
228+
<dependency>
229+
<groupId>org.springframework.kafka</groupId>
230+
<artifactId>spring-kafka</artifactId>
231+
<optional>true</optional>
232+
</dependency>
228233
<dependency>
229234
<groupId>org.springframework.security</groupId>
230235
<artifactId>spring-security-core</artifactId>
@@ -262,11 +267,21 @@
262267
<artifactId>spring-boot-autoconfigure</artifactId>
263268
<scope>test</scope>
264269
</dependency>
270+
<dependency>
271+
<groupId>org.springframework.kafka</groupId>
272+
<artifactId>spring-kafka-test</artifactId>
273+
<scope>test</scope>
274+
</dependency>
265275
<dependency>
266276
<groupId>org.apache.logging.log4j</groupId>
267277
<artifactId>log4j-slf4j-impl</artifactId>
268278
<scope>test</scope>
269279
</dependency>
280+
<dependency>
281+
<groupId>org.slf4j</groupId>
282+
<artifactId>log4j-over-slf4j</artifactId>
283+
<scope>test</scope>
284+
</dependency>
270285
<dependency>
271286
<groupId>org.apache.logging.log4j</groupId>
272287
<artifactId>log4j-api</artifactId>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Copyright 2012-2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.boot.actuate.kafka;
18+
19+
import java.util.Collections;
20+
import java.util.Map;
21+
import java.util.concurrent.ExecutionException;
22+
23+
import org.apache.kafka.clients.admin.AdminClient;
24+
import org.apache.kafka.clients.admin.Config;
25+
import org.apache.kafka.clients.admin.DescribeClusterOptions;
26+
import org.apache.kafka.clients.admin.DescribeClusterResult;
27+
import org.apache.kafka.common.config.ConfigResource;
28+
import org.apache.kafka.common.config.ConfigResource.Type;
29+
30+
import org.springframework.boot.actuate.health.AbstractHealthIndicator;
31+
import org.springframework.boot.actuate.health.Health.Builder;
32+
import org.springframework.boot.actuate.health.HealthIndicator;
33+
import org.springframework.boot.actuate.health.Status;
34+
import org.springframework.kafka.core.KafkaAdmin;
35+
import org.springframework.util.Assert;
36+
37+
/**
38+
* {@link HealthIndicator} for Kafka cluster.
39+
*
40+
* @author Juan Rada
41+
*/
42+
public class KafkaHealthIndicator extends AbstractHealthIndicator {
43+
44+
static final String REPLICATION_PROPERTY = "transaction.state.log.replication.factor";
45+
46+
private final KafkaAdmin kafkaAdmin;
47+
48+
private final DescribeClusterOptions describeOptions;
49+
50+
/**
51+
* Create a new {@link KafkaHealthIndicator} instance.
52+
*
53+
* @param kafkaAdmin the kafka admin
54+
* @param requestTimeout the request timeout in milliseconds
55+
*/
56+
public KafkaHealthIndicator(KafkaAdmin kafkaAdmin, long requestTimeout) {
57+
Assert.notNull(kafkaAdmin, "KafkaAdmin must not be null");
58+
this.kafkaAdmin = kafkaAdmin;
59+
this.describeOptions = new DescribeClusterOptions()
60+
.timeoutMs((int) requestTimeout);
61+
}
62+
63+
@Override
64+
protected void doHealthCheck(Builder builder) throws Exception {
65+
try (AdminClient adminClient = AdminClient.create(this.kafkaAdmin.getConfig())) {
66+
DescribeClusterResult result = adminClient.describeCluster(
67+
this.describeOptions);
68+
String brokerId = result.controller().get().idString();
69+
int replicationFactor = getReplicationFactor(brokerId, adminClient);
70+
int nodes = result.nodes().get().size();
71+
Status status = nodes >= replicationFactor ? Status.UP : Status.DOWN;
72+
builder.status(status)
73+
.withDetail("clusterId", result.clusterId().get())
74+
.withDetail("brokerId", brokerId)
75+
.withDetail("nodes", nodes);
76+
}
77+
}
78+
79+
private int getReplicationFactor(String brokerId,
80+
AdminClient adminClient) throws ExecutionException, InterruptedException {
81+
ConfigResource configResource = new ConfigResource(Type.BROKER, brokerId);
82+
Map<ConfigResource, Config> kafkaConfig = adminClient
83+
.describeConfigs(Collections.singletonList(configResource)).all().get();
84+
Config brokerConfig = kafkaConfig.get(configResource);
85+
return Integer.parseInt(brokerConfig.get(REPLICATION_PROPERTY).value());
86+
}
87+
88+
}
89+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* Copyright 2012-2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
/**
18+
* Actuator support for Apache Kafka.
19+
*/
20+
package org.springframework.boot.actuate.kafka;

0 commit comments

Comments
 (0)