From dca65020d5a89440f46563e55ae5db77bbb1a8ff Mon Sep 17 00:00:00 2001 From: Rik Ende Date: Wed, 23 Jan 2019 15:13:15 -0500 Subject: [PATCH 1/3] Gather health checks in parallel and preserve original order --- .../actuate/health/CompositeHealthIndicator.java | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/health/CompositeHealthIndicator.java b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/health/CompositeHealthIndicator.java index 6cd60dfa44ba..4ad28e50fcf6 100644 --- a/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/health/CompositeHealthIndicator.java +++ b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/health/CompositeHealthIndicator.java @@ -16,6 +16,7 @@ package org.springframework.boot.actuate.health; +import java.util.AbstractMap.SimpleEntry; import java.util.LinkedHashMap; import java.util.Map; @@ -90,13 +91,17 @@ public HealthIndicatorRegistry getRegistry() { return this.registry; } + // TODO Guarantee preserved ordering + // TODO it doesn't seem important but don't make assumptions on other people's behalf @Override public Health health() { - Map healths = new LinkedHashMap<>(); - for (Map.Entry entry : this.registry.getAll() - .entrySet()) { - healths.put(entry.getKey(), entry.getValue().health()); - } + /* Gather healths in parallel */ + Map healths = this.registry.getAll().entrySet().parallelStream() + .map((e) -> new SimpleEntry<>(e.getKey(), e.getValue().health())) + /* Merge the streams together */ + .collect(LinkedHashMap::new, + (map, e) -> map.put(e.getKey(), e.getValue()), Map::putAll); + // .collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue, return this.aggregator.aggregate(healths); } From 8a4370e1cbcb760f8c001569d73e88b61efdfb72 Mon Sep 17 00:00:00 2001 From: Rik Ende Date: Wed, 23 Jan 2019 16:01:15 -0500 Subject: [PATCH 2/3] Preserve order, clean up --- .../boot/actuate/health/CompositeHealthIndicator.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/health/CompositeHealthIndicator.java b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/health/CompositeHealthIndicator.java index 4ad28e50fcf6..425f0e56ec13 100644 --- a/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/health/CompositeHealthIndicator.java +++ b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/health/CompositeHealthIndicator.java @@ -91,17 +91,14 @@ public HealthIndicatorRegistry getRegistry() { return this.registry; } - // TODO Guarantee preserved ordering - // TODO it doesn't seem important but don't make assumptions on other people's behalf @Override public Health health() { /* Gather healths in parallel */ Map healths = this.registry.getAll().entrySet().parallelStream() .map((e) -> new SimpleEntry<>(e.getKey(), e.getValue().health())) - /* Merge the streams together */ + /* Merge the entries, preserving original order */ .collect(LinkedHashMap::new, (map, e) -> map.put(e.getKey(), e.getValue()), Map::putAll); - // .collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue, return this.aggregator.aggregate(healths); } From 54ca12343bebb0f51e6b05dc8a0c53801e30bdab Mon Sep 17 00:00:00 2001 From: Rik Date: Fri, 25 Jan 2019 06:41:12 -0500 Subject: [PATCH 3/3] Gather health checks in parallel --- .../actuate/health/CompositeReactiveHealthIndicator.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/health/CompositeReactiveHealthIndicator.java b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/health/CompositeReactiveHealthIndicator.java index 007f454fcf62..8dfd12d23845 100644 --- a/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/health/CompositeReactiveHealthIndicator.java +++ b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/health/CompositeReactiveHealthIndicator.java @@ -23,7 +23,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.util.function.Tuple2; +import reactor.core.scheduler.Schedulers; /** * {@link ReactiveHealthIndicator} that returns health indications from all registered @@ -122,10 +122,13 @@ public CompositeReactiveHealthIndicator timeoutStrategy(long timeout, @Override public Mono health() { - return Flux.fromIterable(this.registry.getAll().entrySet()) + return Flux.fromIterable(this.registry.getAll().entrySet()).parallel() + .runOn(Schedulers.elastic()) .flatMap((entry) -> Mono.zip(Mono.just(entry.getKey()), entry.getValue().health().compose(this.timeoutCompose))) - .collectMap(Tuple2::getT1, Tuple2::getT2) + .sequential() + .collect(LinkedHashMap::new, + (map, entry) -> map.put(entry.getT1(), entry.getT2())) .map(this.healthAggregator::aggregate); }