Skip to content

Commit 39df941

Browse files
RikEndeRik Ende
authored and
Rik Ende
committed
Add a concurrent extension of CompositeHealthIndicator
Unit test to verify output is identical to CompositeHealthIndicator
1 parent 26da45a commit 39df941

File tree

3 files changed

+308
-0
lines changed

3 files changed

+308
-0
lines changed

spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/health/CompositeHealthIndicator.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,14 @@ public HealthIndicatorRegistry getRegistry() {
9090
return this.registry;
9191
}
9292

93+
/**
94+
* Return the {@link HealthAggregator} of this instance.
95+
* @return the aggregator for {@link HealthIndicator health indicators}
96+
*/
97+
public HealthAggregator getAggregator() {
98+
return this.aggregator;
99+
}
100+
93101
@Override
94102
public Health health() {
95103
Map<String, Health> healths = new LinkedHashMap<>();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
/*
2+
* Copyright 2012-2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.boot.actuate.health;
18+
19+
import java.time.Duration;
20+
import java.util.AbstractMap.SimpleEntry;
21+
import java.util.LinkedHashMap;
22+
import java.util.Map;
23+
import java.util.concurrent.ExecutionException;
24+
import java.util.concurrent.Future;
25+
import java.util.concurrent.TimeUnit;
26+
import java.util.concurrent.TimeoutException;
27+
import java.util.function.Function;
28+
29+
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
30+
31+
/**
32+
* Version of {@link CompositeHealthIndicator} that gathers health indications from all
33+
* registered delegates concurrently.
34+
*
35+
* @author Rik vd Ende
36+
*/
37+
public class ConcurrentCompositeHealthIndicator extends CompositeHealthIndicator {
38+
39+
private final ThreadPoolTaskExecutor executor;
40+
41+
private final Function<Future<Health>, Health> futureFunction;
42+
43+
/**
44+
* Create a new {@link ConcurrentCompositeHealthIndicator} from the specified
45+
* indicators.
46+
* @param healthAggregator the health aggregator
47+
* @param indicators a map of {@link HealthIndicator HealthIndicators} with the key
48+
* being used as an indicator name.
49+
* @param executor the {@link ThreadPoolTaskExecutor} to submit HealthIndicators on
50+
*/
51+
public ConcurrentCompositeHealthIndicator(HealthAggregator healthAggregator,
52+
Map<String, HealthIndicator> indicators, ThreadPoolTaskExecutor executor) {
53+
this(healthAggregator, new DefaultHealthIndicatorRegistry(indicators), executor);
54+
}
55+
56+
/**
57+
* Create a new {@link ConcurrentCompositeHealthIndicator} from the specified
58+
* indicators.
59+
* @param healthAggregator the health aggregator
60+
* @param indicators a map of {@link HealthIndicator HealthIndicators} with the key
61+
* being used as an indicator name.
62+
* @param executor the {@link ThreadPoolTaskExecutor} to submit HealthIndicators on
63+
* @param timeout the maximum time to wait for a HealthIndicator to complete
64+
*/
65+
public ConcurrentCompositeHealthIndicator(HealthAggregator healthAggregator,
66+
Map<String, HealthIndicator> indicators, ThreadPoolTaskExecutor executor,
67+
Duration timeout) {
68+
this(healthAggregator, new DefaultHealthIndicatorRegistry(indicators), executor,
69+
timeout);
70+
}
71+
72+
/**
73+
* Create a new
74+
* {@link org.springframework.boot.actuate.health.ConcurrentCompositeHealthIndicator}
75+
* from the indicators in the given {@code registry} and provide a
76+
* ThreadPoolTaskExecutor to submit the HealthIndicators to, without a timeout.
77+
* @param healthAggregator the health aggregator
78+
* @param registry the registry of {@link HealthIndicator HealthIndicators}.
79+
* @param executor the {@link ThreadPoolTaskExecutor} to submit HealthIndicators on
80+
*/
81+
public ConcurrentCompositeHealthIndicator(HealthAggregator healthAggregator,
82+
HealthIndicatorRegistry registry, ThreadPoolTaskExecutor executor) {
83+
this(healthAggregator, registry, executor, (future) -> {
84+
try {
85+
return future.get();
86+
}
87+
catch (InterruptedException | ExecutionException ex) {
88+
return Health.down()
89+
.withDetail("error",
90+
ex.getClass().getName()
91+
+ ": Health check did not compete successfully")
92+
.build();
93+
}
94+
});
95+
}
96+
97+
/**
98+
* Create a new
99+
* {@link org.springframework.boot.actuate.health.ConcurrentCompositeHealthIndicator}
100+
* from the indicators in the given {@code registry} and provide a
101+
* ThreadPoolTaskExecutor for submitting the health checks.
102+
* @param healthAggregator the health aggregator
103+
* @param registry the registry of {@link HealthIndicator HealthIndicators}.
104+
* @param executor the {@link ThreadPoolTaskExecutor} to submit HealthIndicators on
105+
* @param timeout the maximum time to wait for a HealthIndicator to complete
106+
*/
107+
public ConcurrentCompositeHealthIndicator(HealthAggregator healthAggregator,
108+
HealthIndicatorRegistry registry, ThreadPoolTaskExecutor executor,
109+
Duration timeout) {
110+
this(healthAggregator, registry, executor, (future) -> {
111+
try {
112+
return future.get(timeout.toNanos(), TimeUnit.NANOSECONDS);
113+
}
114+
catch (InterruptedException | ExecutionException ex) {
115+
return Health.down()
116+
.withDetail("error",
117+
ex.getClass().getName()
118+
+ ": health check did not compete successfully")
119+
.build();
120+
}
121+
catch (TimeoutException ex) {
122+
return Health.down()
123+
.withDetail("error",
124+
ex.getClass().getName()
125+
+ ": health check timed out after " + timeout)
126+
.build();
127+
}
128+
});
129+
}
130+
131+
/**
132+
* Create a new
133+
* {@link org.springframework.boot.actuate.health.ConcurrentCompositeHealthIndicator}
134+
* from the indicators in the given {@code registry} and provide a
135+
* ThreadPoolTaskExecutor for submitting the health checks.
136+
* @param healthAggregator the health aggregator
137+
* @param registry the registry of {@link HealthIndicator HealthIndicators}.
138+
* @param executor the {@link ThreadPoolTaskExecutor} to submit HealthIndicators on
139+
* @param futureFunction function to select Future::get with or without a timeout
140+
*/
141+
private ConcurrentCompositeHealthIndicator(HealthAggregator healthAggregator,
142+
HealthIndicatorRegistry registry, ThreadPoolTaskExecutor executor,
143+
Function<Future<Health>, Health> futureFunction) {
144+
super(healthAggregator, registry);
145+
this.executor = executor;
146+
this.futureFunction = futureFunction;
147+
}
148+
149+
@Override
150+
public Health health() {
151+
Map<String, Future<Health>> futureHealths = getRegistry().getAll().entrySet()
152+
.stream()
153+
.map((entry) -> new SimpleEntry<>(entry.getKey(),
154+
this.executor.submit(() -> entry.getValue().health())))
155+
.collect(LinkedHashMap::new,
156+
(map, entry) -> map.put(entry.getKey(), entry.getValue()),
157+
Map::putAll);
158+
159+
Map<String, Health> healths = futureHealths.entrySet().stream()
160+
.map((entry) -> new SimpleEntry<>(entry.getKey(),
161+
this.futureFunction.apply(entry.getValue())))
162+
.collect(LinkedHashMap::new,
163+
(map, entry) -> map.put(entry.getKey(), entry.getValue()),
164+
Map::putAll);
165+
166+
return getAggregator().aggregate(healths);
167+
}
168+
169+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/*
2+
* Copyright 2012-2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.boot.actuate.health;
18+
19+
import java.time.Duration;
20+
import java.util.Collections;
21+
import java.util.HashMap;
22+
import java.util.Map;
23+
import java.util.concurrent.Callable;
24+
import java.util.concurrent.Executors;
25+
import java.util.concurrent.ThreadPoolExecutor;
26+
27+
import com.fasterxml.jackson.databind.ObjectMapper;
28+
import org.junit.Before;
29+
import org.junit.Test;
30+
import org.mockito.Mock;
31+
import org.mockito.MockitoAnnotations;
32+
import org.mockito.invocation.InvocationOnMock;
33+
34+
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
35+
36+
import static org.assertj.core.api.Assertions.assertThat;
37+
import static org.mockito.ArgumentMatchers.isA;
38+
import static org.mockito.BDDMockito.given;
39+
40+
/**
41+
* Tests for {@link ConcurrentCompositeHealthIndicator}
42+
*
43+
* @author Rik vd Ende
44+
*/
45+
public class ConcurrentCompositeHealthIndicatorTest {
46+
47+
private HealthAggregator healthAggregator;
48+
49+
@Mock
50+
private HealthIndicator one;
51+
52+
@Mock
53+
private HealthIndicator two;
54+
55+
@Mock
56+
private ThreadPoolTaskExecutor executor;
57+
58+
private ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) Executors
59+
.newFixedThreadPool(2);
60+
61+
@Before
62+
@SuppressWarnings("unchecked")
63+
public void setup() {
64+
MockitoAnnotations.initMocks(this);
65+
given(this.one.health())
66+
.willReturn(new Health.Builder().unknown().withDetail("1", "1").build());
67+
given(this.two.health())
68+
.willReturn(new Health.Builder().unknown().withDetail("2", "2").build());
69+
given(this.executor.getThreadPoolExecutor()).willReturn(this.threadPoolExecutor);
70+
given(this.executor.submit(isA(Callable.class)))
71+
.will((InvocationOnMock invocation) -> this.threadPoolExecutor
72+
.submit((Callable<Health>) invocation.getArgument(0)));
73+
74+
this.healthAggregator = new OrderedHealthAggregator();
75+
}
76+
77+
@Test
78+
public void createWithIndicators() {
79+
Map<String, HealthIndicator> indicators = new HashMap<>();
80+
indicators.put("one", this.one);
81+
indicators.put("two", this.two);
82+
ConcurrentCompositeHealthIndicator composite = new ConcurrentCompositeHealthIndicator(
83+
this.healthAggregator, indicators, this.executor);
84+
Health result = composite.health();
85+
assertThat(result.getDetails()).hasSize(2);
86+
assertThat(result.getDetails()).containsEntry("one",
87+
new Health.Builder().unknown().withDetail("1", "1").build());
88+
assertThat(result.getDetails()).containsEntry("two",
89+
new Health.Builder().unknown().withDetail("2", "2").build());
90+
}
91+
92+
@Test
93+
public void testSerialization() throws Exception {
94+
Map<String, HealthIndicator> indicators = new HashMap<>();
95+
indicators.put("db1", this.one);
96+
indicators.put("db2", this.two);
97+
ConcurrentCompositeHealthIndicator innerComposite = new ConcurrentCompositeHealthIndicator(
98+
this.healthAggregator, indicators, this.executor);
99+
ConcurrentCompositeHealthIndicator composite = new ConcurrentCompositeHealthIndicator(
100+
this.healthAggregator, Collections.singletonMap("db", innerComposite),
101+
this.executor);
102+
Health result = composite.health();
103+
ObjectMapper mapper = new ObjectMapper();
104+
assertThat(mapper.writeValueAsString(result)).isEqualTo(
105+
"{\"status\":\"UNKNOWN\",\"details\":{\"db\":{\"status\":\"UNKNOWN\""
106+
+ ",\"details\":{\"db1\":{\"status\":\"UNKNOWN\",\"details\""
107+
+ ":{\"1\":\"1\"}},\"db2\":{\"status\":\"UNKNOWN\",\"details\""
108+
+ ":{\"2\":\"2\"}}}}}}");
109+
}
110+
111+
@Test
112+
public void testWithTimeout() throws Exception {
113+
Map<String, HealthIndicator> indicators = new HashMap<>();
114+
indicators.put("one", this.one);
115+
indicators.put("two", this.two);
116+
ConcurrentCompositeHealthIndicator innerComposite = new ConcurrentCompositeHealthIndicator(
117+
this.healthAggregator, indicators, this.executor);
118+
ConcurrentCompositeHealthIndicator composite = new ConcurrentCompositeHealthIndicator(
119+
this.healthAggregator, Collections.singletonMap("db", innerComposite),
120+
this.executor, Duration.ZERO);
121+
122+
Health result = composite.health();
123+
assertThat(result.getDetails()).hasSize(1);
124+
assertThat(result.getDetails()).containsEntry("db",
125+
new Health.Builder().down().withException(
126+
new IllegalStateException("Health check timed out after PT0S"))
127+
.build());
128+
129+
}
130+
131+
}

0 commit comments

Comments
 (0)