From 39df941846fc4922ab9444ecbc2c98d419168dca Mon Sep 17 00:00:00 2001 From: Rik Date: Sat, 2 Feb 2019 11:33:22 -0500 Subject: [PATCH 1/2] Add a concurrent extension of CompositeHealthIndicator Unit test to verify output is identical to CompositeHealthIndicator --- .../health/CompositeHealthIndicator.java | 8 + .../ConcurrentCompositeHealthIndicator.java | 169 ++++++++++++++++++ ...oncurrentCompositeHealthIndicatorTest.java | 131 ++++++++++++++ 3 files changed, 308 insertions(+) create mode 100644 spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/health/ConcurrentCompositeHealthIndicator.java create mode 100644 spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/health/ConcurrentCompositeHealthIndicatorTest.java diff --git a/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/health/CompositeHealthIndicator.java b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/health/CompositeHealthIndicator.java index 6cd60dfa44ba..5c663c8b5520 100644 --- a/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/health/CompositeHealthIndicator.java +++ b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/health/CompositeHealthIndicator.java @@ -90,6 +90,14 @@ public HealthIndicatorRegistry getRegistry() { return this.registry; } + /** + * Return the {@link HealthAggregator} of this instance. + * @return the aggregator for {@link HealthIndicator health indicators} + */ + public HealthAggregator getAggregator() { + return this.aggregator; + } + @Override public Health health() { Map healths = new LinkedHashMap<>(); diff --git a/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/health/ConcurrentCompositeHealthIndicator.java b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/health/ConcurrentCompositeHealthIndicator.java new file mode 100644 index 000000000000..86d803bee432 --- /dev/null +++ b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/health/ConcurrentCompositeHealthIndicator.java @@ -0,0 +1,169 @@ +/* + * Copyright 2012-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.actuate.health; + +import java.time.Duration; +import java.util.AbstractMap.SimpleEntry; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Function; + +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +/** + * Version of {@link CompositeHealthIndicator} that gathers health indications from all + * registered delegates concurrently. + * + * @author Rik vd Ende + */ +public class ConcurrentCompositeHealthIndicator extends CompositeHealthIndicator { + + private final ThreadPoolTaskExecutor executor; + + private final Function, Health> futureFunction; + + /** + * Create a new {@link ConcurrentCompositeHealthIndicator} from the specified + * indicators. + * @param healthAggregator the health aggregator + * @param indicators a map of {@link HealthIndicator HealthIndicators} with the key + * being used as an indicator name. + * @param executor the {@link ThreadPoolTaskExecutor} to submit HealthIndicators on + */ + public ConcurrentCompositeHealthIndicator(HealthAggregator healthAggregator, + Map indicators, ThreadPoolTaskExecutor executor) { + this(healthAggregator, new DefaultHealthIndicatorRegistry(indicators), executor); + } + + /** + * Create a new {@link ConcurrentCompositeHealthIndicator} from the specified + * indicators. + * @param healthAggregator the health aggregator + * @param indicators a map of {@link HealthIndicator HealthIndicators} with the key + * being used as an indicator name. + * @param executor the {@link ThreadPoolTaskExecutor} to submit HealthIndicators on + * @param timeout the maximum time to wait for a HealthIndicator to complete + */ + public ConcurrentCompositeHealthIndicator(HealthAggregator healthAggregator, + Map indicators, ThreadPoolTaskExecutor executor, + Duration timeout) { + this(healthAggregator, new DefaultHealthIndicatorRegistry(indicators), executor, + timeout); + } + + /** + * Create a new + * {@link org.springframework.boot.actuate.health.ConcurrentCompositeHealthIndicator} + * from the indicators in the given {@code registry} and provide a + * ThreadPoolTaskExecutor to submit the HealthIndicators to, without a timeout. + * @param healthAggregator the health aggregator + * @param registry the registry of {@link HealthIndicator HealthIndicators}. + * @param executor the {@link ThreadPoolTaskExecutor} to submit HealthIndicators on + */ + public ConcurrentCompositeHealthIndicator(HealthAggregator healthAggregator, + HealthIndicatorRegistry registry, ThreadPoolTaskExecutor executor) { + this(healthAggregator, registry, executor, (future) -> { + try { + return future.get(); + } + catch (InterruptedException | ExecutionException ex) { + return Health.down() + .withDetail("error", + ex.getClass().getName() + + ": Health check did not compete successfully") + .build(); + } + }); + } + + /** + * Create a new + * {@link org.springframework.boot.actuate.health.ConcurrentCompositeHealthIndicator} + * from the indicators in the given {@code registry} and provide a + * ThreadPoolTaskExecutor for submitting the health checks. + * @param healthAggregator the health aggregator + * @param registry the registry of {@link HealthIndicator HealthIndicators}. + * @param executor the {@link ThreadPoolTaskExecutor} to submit HealthIndicators on + * @param timeout the maximum time to wait for a HealthIndicator to complete + */ + public ConcurrentCompositeHealthIndicator(HealthAggregator healthAggregator, + HealthIndicatorRegistry registry, ThreadPoolTaskExecutor executor, + Duration timeout) { + this(healthAggregator, registry, executor, (future) -> { + try { + return future.get(timeout.toNanos(), TimeUnit.NANOSECONDS); + } + catch (InterruptedException | ExecutionException ex) { + return Health.down() + .withDetail("error", + ex.getClass().getName() + + ": health check did not compete successfully") + .build(); + } + catch (TimeoutException ex) { + return Health.down() + .withDetail("error", + ex.getClass().getName() + + ": health check timed out after " + timeout) + .build(); + } + }); + } + + /** + * Create a new + * {@link org.springframework.boot.actuate.health.ConcurrentCompositeHealthIndicator} + * from the indicators in the given {@code registry} and provide a + * ThreadPoolTaskExecutor for submitting the health checks. + * @param healthAggregator the health aggregator + * @param registry the registry of {@link HealthIndicator HealthIndicators}. + * @param executor the {@link ThreadPoolTaskExecutor} to submit HealthIndicators on + * @param futureFunction function to select Future::get with or without a timeout + */ + private ConcurrentCompositeHealthIndicator(HealthAggregator healthAggregator, + HealthIndicatorRegistry registry, ThreadPoolTaskExecutor executor, + Function, Health> futureFunction) { + super(healthAggregator, registry); + this.executor = executor; + this.futureFunction = futureFunction; + } + + @Override + public Health health() { + Map> futureHealths = getRegistry().getAll().entrySet() + .stream() + .map((entry) -> new SimpleEntry<>(entry.getKey(), + this.executor.submit(() -> entry.getValue().health()))) + .collect(LinkedHashMap::new, + (map, entry) -> map.put(entry.getKey(), entry.getValue()), + Map::putAll); + + Map healths = futureHealths.entrySet().stream() + .map((entry) -> new SimpleEntry<>(entry.getKey(), + this.futureFunction.apply(entry.getValue()))) + .collect(LinkedHashMap::new, + (map, entry) -> map.put(entry.getKey(), entry.getValue()), + Map::putAll); + + return getAggregator().aggregate(healths); + } + +} diff --git a/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/health/ConcurrentCompositeHealthIndicatorTest.java b/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/health/ConcurrentCompositeHealthIndicatorTest.java new file mode 100644 index 000000000000..d47e7b2c3b8a --- /dev/null +++ b/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/health/ConcurrentCompositeHealthIndicatorTest.java @@ -0,0 +1,131 @@ +/* + * Copyright 2012-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.actuate.health; + +import java.time.Duration; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.mockito.invocation.InvocationOnMock; + +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.isA; +import static org.mockito.BDDMockito.given; + +/** + * Tests for {@link ConcurrentCompositeHealthIndicator} + * + * @author Rik vd Ende + */ +public class ConcurrentCompositeHealthIndicatorTest { + + private HealthAggregator healthAggregator; + + @Mock + private HealthIndicator one; + + @Mock + private HealthIndicator two; + + @Mock + private ThreadPoolTaskExecutor executor; + + private ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) Executors + .newFixedThreadPool(2); + + @Before + @SuppressWarnings("unchecked") + public void setup() { + MockitoAnnotations.initMocks(this); + given(this.one.health()) + .willReturn(new Health.Builder().unknown().withDetail("1", "1").build()); + given(this.two.health()) + .willReturn(new Health.Builder().unknown().withDetail("2", "2").build()); + given(this.executor.getThreadPoolExecutor()).willReturn(this.threadPoolExecutor); + given(this.executor.submit(isA(Callable.class))) + .will((InvocationOnMock invocation) -> this.threadPoolExecutor + .submit((Callable) invocation.getArgument(0))); + + this.healthAggregator = new OrderedHealthAggregator(); + } + + @Test + public void createWithIndicators() { + Map indicators = new HashMap<>(); + indicators.put("one", this.one); + indicators.put("two", this.two); + ConcurrentCompositeHealthIndicator composite = new ConcurrentCompositeHealthIndicator( + this.healthAggregator, indicators, this.executor); + Health result = composite.health(); + assertThat(result.getDetails()).hasSize(2); + assertThat(result.getDetails()).containsEntry("one", + new Health.Builder().unknown().withDetail("1", "1").build()); + assertThat(result.getDetails()).containsEntry("two", + new Health.Builder().unknown().withDetail("2", "2").build()); + } + + @Test + public void testSerialization() throws Exception { + Map indicators = new HashMap<>(); + indicators.put("db1", this.one); + indicators.put("db2", this.two); + ConcurrentCompositeHealthIndicator innerComposite = new ConcurrentCompositeHealthIndicator( + this.healthAggregator, indicators, this.executor); + ConcurrentCompositeHealthIndicator composite = new ConcurrentCompositeHealthIndicator( + this.healthAggregator, Collections.singletonMap("db", innerComposite), + this.executor); + Health result = composite.health(); + ObjectMapper mapper = new ObjectMapper(); + assertThat(mapper.writeValueAsString(result)).isEqualTo( + "{\"status\":\"UNKNOWN\",\"details\":{\"db\":{\"status\":\"UNKNOWN\"" + + ",\"details\":{\"db1\":{\"status\":\"UNKNOWN\",\"details\"" + + ":{\"1\":\"1\"}},\"db2\":{\"status\":\"UNKNOWN\",\"details\"" + + ":{\"2\":\"2\"}}}}}}"); + } + + @Test + public void testWithTimeout() throws Exception { + Map indicators = new HashMap<>(); + indicators.put("one", this.one); + indicators.put("two", this.two); + ConcurrentCompositeHealthIndicator innerComposite = new ConcurrentCompositeHealthIndicator( + this.healthAggregator, indicators, this.executor); + ConcurrentCompositeHealthIndicator composite = new ConcurrentCompositeHealthIndicator( + this.healthAggregator, Collections.singletonMap("db", innerComposite), + this.executor, Duration.ZERO); + + Health result = composite.health(); + assertThat(result.getDetails()).hasSize(1); + assertThat(result.getDetails()).containsEntry("db", + new Health.Builder().down().withException( + new IllegalStateException("Health check timed out after PT0S")) + .build()); + + } + +} From 5a99a824b54517f30f6143e675eefa7b318498e9 Mon Sep 17 00:00:00 2001 From: Rik Date: Sat, 2 Feb 2019 12:56:11 -0500 Subject: [PATCH 2/2] Add a concurrent extension of CompositeHealthIndicator Unit test to verify output is identical to CompositeHealthIndicator test must work --- .../health/ConcurrentCompositeHealthIndicatorTest.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/health/ConcurrentCompositeHealthIndicatorTest.java b/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/health/ConcurrentCompositeHealthIndicatorTest.java index d47e7b2c3b8a..55fc9b0ce4a3 100644 --- a/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/health/ConcurrentCompositeHealthIndicatorTest.java +++ b/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/health/ConcurrentCompositeHealthIndicatorTest.java @@ -23,6 +23,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeoutException; import com.fasterxml.jackson.databind.ObjectMapper; import org.junit.Before; @@ -122,8 +123,9 @@ public void testWithTimeout() throws Exception { Health result = composite.health(); assertThat(result.getDetails()).hasSize(1); assertThat(result.getDetails()).containsEntry("db", - new Health.Builder().down().withException( - new IllegalStateException("Health check timed out after PT0S")) + Health.down() + .withDetail("error", TimeoutException.class.getName() + + ": health check timed out after " + Duration.ZERO) .build()); }