Skip to content

Commit bc2fbcc

Browse files
pulkitmehraRobWin
authored andcommitted
Issue ReactiveX#657: Added future decorator to CircuitBreaker Interface (ReactiveX#714)
1 parent 90f3c0b commit bc2fbcc

File tree

7 files changed

+755
-24
lines changed

7 files changed

+755
-24
lines changed

resilience4j-circuitbreaker/src/main/java/io/github/resilience4j/circuitbreaker/CircuitBreaker.java

Lines changed: 114 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,15 @@
2121
import io.github.resilience4j.circuitbreaker.event.*;
2222
import io.github.resilience4j.circuitbreaker.internal.CircuitBreakerStateMachine;
2323
import io.github.resilience4j.core.EventConsumer;
24+
import io.github.resilience4j.core.functions.OnceConsumer;
2425
import io.vavr.*;
2526
import io.vavr.control.Either;
2627
import io.vavr.control.Try;
2728

2829
import java.util.Arrays;
2930
import java.util.Map;
30-
import java.util.concurrent.Callable;
31-
import java.util.concurrent.CompletableFuture;
32-
import java.util.concurrent.CompletionStage;
33-
import java.util.concurrent.TimeUnit;
31+
import java.util.Objects;
32+
import java.util.concurrent.*;
3433
import java.util.function.Consumer;
3534
import java.util.function.Function;
3635
import java.util.function.Supplier;
@@ -403,6 +402,20 @@ default <T> CheckedConsumer<T> decorateCheckedConsumer(CheckedConsumer<T> consum
403402
return decorateCheckedConsumer(this, consumer);
404403
}
405404

405+
/**
406+
* Returns a supplier of type Future which is decorated by a CircuitBreaker.
407+
* The elapsed time includes {@link Future#get()} evaluation time even if the
408+
* underlying call took less time to return. Any delays in evaluating Future by caller will
409+
* add towards total time.
410+
*
411+
* @param supplier the original supplier
412+
* @param <T> the type of the returned CompletionStage's result
413+
* @return a supplier which is decorated by a CircuitBreaker.
414+
*/
415+
default <T> Supplier<Future<T>> decorateFuture(Supplier<Future<T>> supplier) {
416+
return decorateFuture(this, supplier);
417+
}
418+
406419
/**
407420
* States of the CircuitBreaker state machine.
408421
*/
@@ -997,4 +1010,101 @@ static CircuitBreaker of(String name, Supplier<CircuitBreakerConfig> circuitBrea
9971010
static CircuitBreaker of(String name, Supplier<CircuitBreakerConfig> circuitBreakerConfigSupplier, io.vavr.collection.Map<String, String> tags){
9981011
return new CircuitBreakerStateMachine(name, circuitBreakerConfigSupplier, tags);
9991012
}
1013+
1014+
/**
1015+
* Returns a supplier of type Future which is decorated by a CircuitBreaker.
1016+
* The elapsed time includes {@link Future#get()} evaluation time even if the
1017+
* underlying call took less time to return. Any delays in evaluating Future by
1018+
* caller will add towards total time.
1019+
*
1020+
* @param circuitBreaker the CircuitBreaker
1021+
* @param supplier the original supplier
1022+
* @param <T> the type of the returned Future's result
1023+
* @return a supplier which is decorated by a CircuitBreaker.
1024+
*/
1025+
static <T> Supplier<Future<T>> decorateFuture(CircuitBreaker circuitBreaker, Supplier<Future<T>> supplier) {
1026+
return () -> {
1027+
if (!circuitBreaker.tryAcquirePermission()) {
1028+
CompletableFuture<T> promise = new CompletableFuture<>();
1029+
promise.completeExceptionally(CallNotPermittedException.createCallNotPermittedException(circuitBreaker));
1030+
return promise;
1031+
} else {
1032+
final long start = System.nanoTime();
1033+
try {
1034+
return new CircuitBreakerFuture<>(circuitBreaker, supplier.get(), start);
1035+
} catch (Exception e) {
1036+
long durationInNanos = System.nanoTime() - start;
1037+
circuitBreaker.onError(durationInNanos, TimeUnit.NANOSECONDS, e);
1038+
throw e;
1039+
}
1040+
}
1041+
};
1042+
}
1043+
1044+
/**
1045+
* This class decorates future to add CircuitBreaking functionality around invocation.
1046+
*
1047+
* @param <T> of return type
1048+
*/
1049+
final class CircuitBreakerFuture<T> implements Future<T> {
1050+
final private Future<T> future;
1051+
final private OnceConsumer<CircuitBreaker> onceToCircuitbreaker;
1052+
final private long start;
1053+
1054+
CircuitBreakerFuture(CircuitBreaker circuitBreaker, Future<T> future) {
1055+
this(circuitBreaker, future, System.nanoTime());
1056+
}
1057+
1058+
CircuitBreakerFuture(CircuitBreaker circuitBreaker, Future<T> future, long start) {
1059+
Objects.requireNonNull(future, "Non null Future is required to decorate");
1060+
this.onceToCircuitbreaker = OnceConsumer.of(circuitBreaker);
1061+
this.future = future;
1062+
this.start = start;
1063+
}
1064+
1065+
@Override
1066+
public boolean cancel(boolean mayInterruptIfRunning) {
1067+
return future.cancel(mayInterruptIfRunning);
1068+
}
1069+
1070+
@Override
1071+
public boolean isCancelled() {
1072+
return future.isCancelled();
1073+
}
1074+
1075+
@Override
1076+
public boolean isDone() {
1077+
return future.isDone();
1078+
}
1079+
1080+
@Override
1081+
public T get() throws InterruptedException, ExecutionException {
1082+
try {
1083+
T v = future.get();
1084+
onceToCircuitbreaker.applyOnce(cb -> cb.onSuccess(System.nanoTime() - start, TimeUnit.NANOSECONDS));
1085+
return v;
1086+
} catch (CancellationException | InterruptedException e) {
1087+
onceToCircuitbreaker.applyOnce(cb -> cb.releasePermission());
1088+
throw e;
1089+
} catch (Exception e) {
1090+
onceToCircuitbreaker.applyOnce(cb -> cb.onError(System.nanoTime() - start, TimeUnit.NANOSECONDS, e));
1091+
throw e;
1092+
}
1093+
}
1094+
1095+
@Override
1096+
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
1097+
try {
1098+
T v = future.get(timeout, unit);
1099+
onceToCircuitbreaker.applyOnce(cb -> cb.onSuccess(System.nanoTime() - start, TimeUnit.NANOSECONDS));
1100+
return v;
1101+
} catch (CancellationException | InterruptedException e) {
1102+
onceToCircuitbreaker.applyOnce(cb -> cb.releasePermission());
1103+
throw e;
1104+
} catch (Exception e) {
1105+
onceToCircuitbreaker.applyOnce(cb -> cb.onError(System.nanoTime() - start, TimeUnit.NANOSECONDS, e));
1106+
throw e;
1107+
}
1108+
}
1109+
}
10001110
}

resilience4j-circuitbreaker/src/main/java/io/github/resilience4j/circuitbreaker/internal/CircuitBreakerStateMachine.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.time.Instant;
3535
import java.util.Objects;
3636
import java.util.concurrent.CompletionException;
37+
import java.util.concurrent.ExecutionException;
3738
import java.util.concurrent.ScheduledExecutorService;
3839
import java.util.concurrent.TimeUnit;
3940
import java.util.concurrent.atomic.AtomicBoolean;
@@ -190,7 +191,7 @@ public void acquirePermission() {
190191
public void onError(long duration, TimeUnit durationUnit, Throwable throwable) {
191192
// Handle the case if the completable future throws a CompletionException wrapping the original exception
192193
// where original exception is the the one to retry not the CompletionException.
193-
if (throwable instanceof CompletionException) {
194+
if (throwable instanceof CompletionException || throwable instanceof ExecutionException) {
194195
Throwable cause = throwable.getCause();
195196
handleThrowable(duration, durationUnit, cause);
196197
}else{
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
package io.github.resilience4j.circuitbreaker;
2+
3+
import io.github.resilience4j.circuitbreaker.CircuitBreaker.CircuitBreakerFuture;
4+
import org.junit.Test;
5+
6+
import java.util.concurrent.*;
7+
8+
import static org.assertj.core.api.Assertions.assertThat;
9+
import static org.assertj.core.api.Assertions.catchThrowable;
10+
import static org.mockito.ArgumentMatchers.any;
11+
import static org.mockito.ArgumentMatchers.anyLong;
12+
import static org.mockito.BDDMockito.then;
13+
import static org.mockito.Mockito.mock;
14+
import static org.mockito.Mockito.times;
15+
import static org.mockito.Mockito.when;
16+
17+
/**
18+
* Class CircuitBreakerFutureTest.
19+
*/
20+
public class CircuitBreakerFutureTest {
21+
22+
@Test
23+
public void shouldDecorateFutureAndReturnSuccess() throws Exception {
24+
CircuitBreaker circuitBreaker = CircuitBreaker.ofDefaults("testName");
25+
26+
final Future<String> future = mock(Future.class);
27+
when(future.get()).thenReturn("Hello World");
28+
29+
CircuitBreakerFuture<String> decoratedFuture = new CircuitBreakerFuture<>(circuitBreaker, future);
30+
String value = decoratedFuture.get();
31+
32+
assertThat(value).isEqualTo("Hello World");
33+
34+
assertThat(circuitBreaker.getMetrics().getNumberOfBufferedCalls()).isEqualTo(1);
35+
assertThat(circuitBreaker.getMetrics().getNumberOfFailedCalls()).isEqualTo(0);
36+
assertThat(circuitBreaker.getMetrics().getNumberOfSuccessfulCalls()).isEqualTo(1);
37+
assertThat(circuitBreaker.getMetrics().getNumberOfNotPermittedCalls()).isEqualTo(0);
38+
assertThat(circuitBreaker.getState()).isEqualTo(CircuitBreaker.State.CLOSED);
39+
40+
then(future).should().get();
41+
}
42+
43+
@Test
44+
public void shouldDecorateFutureAndCircuitBreakingLogicApplyOnceOnMultipleFutureEval() throws Exception {
45+
CircuitBreaker circuitBreaker = CircuitBreaker.ofDefaults("testName");
46+
47+
final Future<String> future = mock(Future.class);
48+
when(future.get()).thenReturn("Hello World");
49+
50+
CircuitBreakerFuture<String> decoratedFuture = new CircuitBreakerFuture<>(circuitBreaker, future);
51+
52+
//called twice but circuit breaking should be evaluated once.
53+
decoratedFuture.get();
54+
decoratedFuture.get();
55+
56+
assertThat(circuitBreaker.getMetrics().getNumberOfBufferedCalls()).isEqualTo(1);
57+
assertThat(circuitBreaker.getMetrics().getNumberOfFailedCalls()).isEqualTo(0);
58+
assertThat(circuitBreaker.getMetrics().getNumberOfSuccessfulCalls()).isEqualTo(1);
59+
assertThat(circuitBreaker.getMetrics().getNumberOfNotPermittedCalls()).isEqualTo(0);
60+
assertThat(circuitBreaker.getState()).isEqualTo(CircuitBreaker.State.CLOSED);
61+
62+
then(future).should(times(2)).get();
63+
}
64+
65+
@Test
66+
public void shouldDecorateFutureAndThrowExecutionException() throws Exception {
67+
CircuitBreaker circuitBreaker = CircuitBreaker.ofDefaults("testName");
68+
69+
final Future<String> future = mock(Future.class);
70+
when(future.get()).thenThrow(new ExecutionException(new RuntimeException("BAM!")));
71+
72+
CircuitBreakerFuture<String> decoratedFuture = new CircuitBreakerFuture<>(circuitBreaker, future);
73+
74+
Throwable thrown = catchThrowable(() -> decoratedFuture.get());
75+
76+
assertThat(thrown).isInstanceOf(ExecutionException.class)
77+
.hasCauseInstanceOf(RuntimeException.class);
78+
79+
assertThat(circuitBreaker.getMetrics().getNumberOfBufferedCalls()).isEqualTo(1);
80+
assertThat(circuitBreaker.getMetrics().getNumberOfFailedCalls()).isEqualTo(1);
81+
assertThat(circuitBreaker.getMetrics().getNumberOfSuccessfulCalls()).isEqualTo(0);
82+
assertThat(circuitBreaker.getMetrics().getNumberOfNotPermittedCalls()).isEqualTo(0);
83+
assertThat(circuitBreaker.getState()).isEqualTo(CircuitBreaker.State.CLOSED);
84+
85+
then(future).should().get();
86+
}
87+
88+
@Test
89+
public void shouldDecorateFutureAndThrowTimeoutException() throws Exception {
90+
CircuitBreaker circuitBreaker = CircuitBreaker.ofDefaults("testName");
91+
92+
final Future<String> future = mock(Future.class);
93+
when(future.get(anyLong(), any(TimeUnit.class))).thenThrow(new TimeoutException());
94+
95+
CircuitBreakerFuture<String> decoratedFuture = new CircuitBreakerFuture<>(circuitBreaker, future);
96+
97+
Throwable thrown = catchThrowable(() -> decoratedFuture.get(5, TimeUnit.SECONDS));
98+
99+
assertThat(thrown).isInstanceOf(TimeoutException.class);
100+
101+
assertThat(circuitBreaker.getMetrics().getNumberOfBufferedCalls()).isEqualTo(1);
102+
assertThat(circuitBreaker.getMetrics().getNumberOfFailedCalls()).isEqualTo(1);
103+
assertThat(circuitBreaker.getMetrics().getNumberOfSuccessfulCalls()).isEqualTo(0);
104+
assertThat(circuitBreaker.getMetrics().getNumberOfNotPermittedCalls()).isEqualTo(0);
105+
assertThat(circuitBreaker.getState()).isEqualTo(CircuitBreaker.State.CLOSED);
106+
107+
then(future).should().get(anyLong(), any(TimeUnit.class));
108+
}
109+
110+
@Test
111+
public void shouldDecorateFutureAndCallerRequestCancelled() throws Exception {
112+
CircuitBreaker circuitBreaker = CircuitBreaker.ofDefaults("testName");
113+
114+
final Future<String> future = mock(Future.class);
115+
when(future.get()).thenThrow(new CancellationException());
116+
117+
CircuitBreakerFuture<String> decoratedFuture = new CircuitBreakerFuture<>(circuitBreaker, future);
118+
Throwable thrown = catchThrowable(() -> decoratedFuture.get());
119+
120+
assertThat(thrown).isInstanceOf(CancellationException.class);
121+
122+
assertThat(circuitBreaker.getMetrics().getNumberOfBufferedCalls()).isEqualTo(0);
123+
assertThat(circuitBreaker.getMetrics().getNumberOfFailedCalls()).isEqualTo(0);
124+
assertThat(circuitBreaker.getMetrics().getNumberOfSuccessfulCalls()).isEqualTo(0);
125+
assertThat(circuitBreaker.getMetrics().getNumberOfNotPermittedCalls()).isEqualTo(0);
126+
assertThat(circuitBreaker.getState()).isEqualTo(CircuitBreaker.State.CLOSED);
127+
128+
then(future).should().get();
129+
}
130+
131+
@Test
132+
public void shouldDecorateFutureAndInterruptedExceptionThrownByTaskThread() throws Exception {
133+
CircuitBreaker circuitBreaker = CircuitBreaker.ofDefaults("testName");
134+
135+
final Future<String> future = mock(Future.class);
136+
when(future.get(anyLong(), any(TimeUnit.class))).thenThrow(new ExecutionException(new InterruptedException()));
137+
138+
CircuitBreakerFuture<String> decoratedFuture = new CircuitBreakerFuture<>(circuitBreaker, future);
139+
Throwable thrown = catchThrowable(() -> decoratedFuture.get(5, TimeUnit.SECONDS));
140+
141+
//If interrupt is called on the Task thread than InterruptedException is thrown wrapped in
142+
// ExecutionException where as if current thread gets interrupted it throws
143+
// InterruptedException directly.
144+
assertThat(thrown).isInstanceOf(ExecutionException.class).hasCauseInstanceOf(InterruptedException.class);
145+
146+
assertThat(circuitBreaker.getMetrics().getNumberOfBufferedCalls()).isEqualTo(1);
147+
assertThat(circuitBreaker.getMetrics().getNumberOfFailedCalls()).isEqualTo(1);
148+
assertThat(circuitBreaker.getMetrics().getNumberOfSuccessfulCalls()).isEqualTo(0);
149+
assertThat(circuitBreaker.getMetrics().getNumberOfNotPermittedCalls()).isEqualTo(0);
150+
assertThat(circuitBreaker.getState()).isEqualTo(CircuitBreaker.State.CLOSED);
151+
152+
then(future).should().get(anyLong(), any(TimeUnit.class));
153+
}
154+
155+
@Test
156+
public void shouldDecorateFutureAndInterruptedExceptionThrownByCallingThread() throws Exception {
157+
CircuitBreaker circuitBreaker = CircuitBreaker.ofDefaults("testName");
158+
159+
//long running task
160+
final Future<String> future = mock(Future.class);
161+
when(future.get()).thenThrow(new InterruptedException());
162+
163+
CircuitBreakerFuture<String> decoratedFuture = new CircuitBreakerFuture<>(circuitBreaker, future);
164+
165+
Throwable thrown = catchThrowable(() -> decoratedFuture.get());
166+
//If interrupt is called on the Task thread than InterruptedException is thrown wrapped in
167+
// ExecutionException where as if current thread gets interrupted it throws
168+
// InterruptedException directly.
169+
assertThat(thrown).isInstanceOf(InterruptedException.class);
170+
171+
assertThat(circuitBreaker.getMetrics().getNumberOfBufferedCalls()).isEqualTo(0);
172+
assertThat(circuitBreaker.getMetrics().getNumberOfFailedCalls()).isEqualTo(0);
173+
assertThat(circuitBreaker.getMetrics().getNumberOfSuccessfulCalls()).isEqualTo(0);
174+
assertThat(circuitBreaker.getMetrics().getNumberOfNotPermittedCalls()).isEqualTo(0);
175+
assertThat(circuitBreaker.getState()).isEqualTo(CircuitBreaker.State.CLOSED);
176+
177+
then(future).should().get();
178+
}
179+
180+
}

0 commit comments

Comments
 (0)