Skip to content
This repository was archived by the owner on Nov 20, 2024. It is now read-only.

Commit 577ffbb

Browse files
sobychackogaryrussell
authored andcommitted
Enable custom binder health check impelementation
Currently, KafkaBinderHealthIndicator is not customizable and included by default when Spring Boot actuator is on the classpath. Fix this by allowing the application to provide a custom implementation. A new marker interface called KafkaBinderHealth can be used by the applicaiton to provide a custom HealthIndicator implementation, in which case, the binder's default implementation will be excluded. Tests and docs changes. Resolves #1180
1 parent 3770db7 commit 577ffbb

File tree

5 files changed

+136
-7
lines changed

5 files changed

+136
-7
lines changed

docs/src/main/asciidoc/overview.adoc

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -971,3 +971,28 @@ public AdminClientConfigCustomizer adminClientConfigCustomizer() {
971971
};
972972
}
973973
```
974+
975+
[[custom-kafka-binder-health-indicator]]
976+
=== Custom Kafka Binder Health Indicator
977+
978+
Kafka binder activates a default health indicator when Spring Boot actuator is on the classpath.
979+
This health indicator checks the health of the binder and any communication issues with the Kafka broker.
980+
If an application wants to disable this default health check implementation and include a custom implementation, then it can provide an implementation for `KafkaBinderHealth` interface.
981+
`KafkaBinderHealth` is a marker interface that extends from `HealthIndicator`.
982+
In the custom implementation, it must provide an implementation for the `health()` method.
983+
The custom implementation must be present in the application configuration as a bean.
984+
When the binder discovers the custom implementation, it will use that instead of the default implementation.
985+
Here is an example of such a custom implementation bean in the application.
986+
987+
```
988+
@Bean
989+
public KafkaBinderHealth kafkaBinderHealthIndicator() {
990+
return new KafkaBinderHealth() {
991+
@Override
992+
public Health health() {
993+
// custom implementation details.
994+
}
995+
};
996+
}
997+
```
998+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright 2022-2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.stream.binder.kafka;
18+
19+
import org.springframework.boot.actuate.health.HealthIndicator;
20+
21+
/**
22+
* Marker interface used for custom KafkaBinderHealth indicator implementations.
23+
*
24+
* @author Soby Chacko
25+
* @since 3.2.2
26+
*/
27+
public interface KafkaBinderHealth extends HealthIndicator {
28+
29+
}

spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderHealthIndicator.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2021 the original author or authors.
2+
* Copyright 2016-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -35,7 +35,6 @@
3535

3636
import org.springframework.beans.factory.DisposableBean;
3737
import org.springframework.boot.actuate.health.Health;
38-
import org.springframework.boot.actuate.health.HealthIndicator;
3938
import org.springframework.boot.actuate.health.Status;
4039
import org.springframework.boot.actuate.health.StatusAggregator;
4140
import org.springframework.kafka.core.ConsumerFactory;
@@ -55,7 +54,7 @@
5554
* @author Chukwubuikem Ume-Ugwa
5655
* @author Taras Danylchuk
5756
*/
58-
public class KafkaBinderHealthIndicator implements HealthIndicator, DisposableBean {
57+
public class KafkaBinderHealthIndicator implements KafkaBinderHealth, DisposableBean {
5958

6059
private static final int DEFAULT_TIMEOUT = 60;
6160

@@ -73,7 +72,7 @@ public class KafkaBinderHealthIndicator implements HealthIndicator, DisposableBe
7372
private boolean considerDownWhenAnyPartitionHasNoLeader;
7473

7574
public KafkaBinderHealthIndicator(KafkaMessageChannelBinder binder,
76-
ConsumerFactory<?, ?> consumerFactory) {
75+
ConsumerFactory<?, ?> consumerFactory) {
7776
this.binder = binder;
7877
this.consumerFactory = consumerFactory;
7978
}
@@ -219,7 +218,7 @@ private Health buildListenerContainersHealth() {
219218
}
220219

221220
@Override
222-
public void destroy() throws Exception {
221+
public void destroy() {
223222
executor.shutdown();
224223
}
225224

spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/config/KafkaBinderHealthIndicatorConfiguration.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2019 the original author or authors.
2+
* Copyright 2018-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -24,6 +24,8 @@
2424

2525
import org.springframework.boot.actuate.autoconfigure.health.ConditionalOnEnabledHealthIndicator;
2626
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
27+
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
28+
import org.springframework.cloud.stream.binder.kafka.KafkaBinderHealth;
2729
import org.springframework.cloud.stream.binder.kafka.KafkaBinderHealthIndicator;
2830
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
2931
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
@@ -38,11 +40,13 @@
3840
*
3941
* @author Oleg Zhurakousky
4042
* @author Chukwubuikem Ume-Ugwa
43+
* @author Soby Chacko
4144
*/
4245

43-
@Configuration
46+
@Configuration(proxyBeanMethods = false)
4447
@ConditionalOnClass(name = "org.springframework.boot.actuate.health.HealthIndicator")
4548
@ConditionalOnEnabledHealthIndicator("binders")
49+
@ConditionalOnMissingBean(KafkaBinderHealth.class)
4650
public class KafkaBinderHealthIndicatorConfiguration {
4751

4852
@Bean
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Copyright 2022-2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.stream.binder.kafka.integration2;
18+
19+
import org.junit.ClassRule;
20+
import org.junit.Test;
21+
22+
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
23+
import org.springframework.boot.WebApplicationType;
24+
import org.springframework.boot.actuate.health.Health;
25+
import org.springframework.boot.autoconfigure.SpringBootApplication;
26+
import org.springframework.boot.builder.SpringApplicationBuilder;
27+
import org.springframework.cloud.stream.binder.kafka.KafkaBinderHealth;
28+
import org.springframework.cloud.stream.binder.kafka.KafkaBinderHealthIndicator;
29+
import org.springframework.context.ConfigurableApplicationContext;
30+
import org.springframework.context.annotation.Bean;
31+
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
32+
33+
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
34+
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
35+
36+
/**
37+
* @author Soby Chacko
38+
*/
39+
public class KafkaBinderCustomHealthCheckTests {
40+
41+
@ClassRule
42+
public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, 10);
43+
44+
@Test
45+
public void testCustomHealthIndicatorIsActivated() {
46+
ConfigurableApplicationContext applicationContext = new SpringApplicationBuilder(
47+
CustomHealthCheckApplication.class).web(WebApplicationType.NONE).run(
48+
"--spring.cloud.stream.kafka.binder.brokers="
49+
+ embeddedKafka.getEmbeddedKafka().getBrokersAsString());
50+
final KafkaBinderHealth kafkaBinderHealth = applicationContext.getBean(KafkaBinderHealth.class);
51+
assertThat(kafkaBinderHealth).isInstanceOf(CustomHealthIndicator.class);
52+
assertThatThrownBy(() -> applicationContext.getBean(KafkaBinderHealthIndicator.class)).isInstanceOf(NoSuchBeanDefinitionException.class);
53+
applicationContext.close();
54+
}
55+
56+
@SpringBootApplication
57+
static class CustomHealthCheckApplication {
58+
59+
@Bean
60+
public CustomHealthIndicator kafkaBinderHealthIndicator() {
61+
return new CustomHealthIndicator();
62+
}
63+
}
64+
65+
static class CustomHealthIndicator implements KafkaBinderHealth {
66+
67+
@Override
68+
public Health health() {
69+
return null;
70+
}
71+
}
72+
}

0 commit comments

Comments
 (0)