Skip to content

Commit 672dfc8

Browse files
authored
More Effective RemoteSolver Circuit Check #526 (#528)
* refactoring * release notes, Log Remote Servers circuit states * [Gradle Release Plugin] - new version commit: '3.25.4-snapshot'. * ignoring to half open and from half open to open transition * refactoring * refactoring * comment * add logs * fixme notes * comments * adjusting fixme notes * docs * clean code
1 parent bc22a7a commit 672dfc8

File tree

12 files changed

+170
-17
lines changed

12 files changed

+170
-17
lines changed

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
Thumbs.db
77

88
# Build Files
9-
*.log
9+
*.log*
1010
bin
1111
target
1212
build/

RELEASE-NOTES.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
## 3.25.4
2+
* Disable the ping-based CircuitBreaker check; keep using the real queries to check the server status #526
3+
* Disable cache flush when the Circuit Breaker changes state to Half Open #526
4+
* Log Remote Servers circuit states #526
5+
16
## 3.25.3
27
* Resources utilization optimization by disabling connection check at every query #524
38

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=3.25.3-snapshot
1+
version=3.25.4-snapshot

src/main/java/com/mageddo/dnsproxyserver/solver/SolverCacheFactory.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,17 @@ public Map<String, Integer> findInstancesSizeMap(Name name) {
7272
;
7373
}
7474

75+
/**
76+
* This method must be called from a single thread; concurrent calls may cause a deadlock.
77+
*/
7578
public void clearCaches() {
79+
// fixme #526 possible solutions for the deadlock:
80+
// 1 - only one thread can clear the cache at a time
81+
// 2 - move the locks to one centralized thread responsible for the cache management
7682
for (final var cache : this.getCaches()) {
83+
log.trace("status=clearing, cache={}", cache.name());
7784
cache.clear();
85+
log.trace("status=cleared, cache={}", cache.name());
7886
}
7987
}
8088
}

src/main/java/com/mageddo/dnsproxyserver/solver/SolverRemote.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public Response handle(Message query) {
5454

5555
Result queryResultFromAvailableResolvers(Message query, StopWatch stopWatch) {
5656
final var lastErrorMsg = new AtomicReference<Message>();
57-
57+
// fixme #526 it would be better to skip resolvers whose circuit is open.
5858
for (int i = 0; i < this.delegate.resolvers().size(); i++) {
5959

6060
final var resolver = this.delegate.resolvers().get(i);

src/main/java/com/mageddo/dnsproxyserver/solver/remote/application/CircuitBreakerCheckerService.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,10 @@ void check(InetSocketAddress server, CircuitBreaker<Result> circuitBreaker) {
4343
});
4444
}
4545

46+
/**
47+
* Note: ping has not been effective for the DPS circuit breaker use case.
48+
* @see https://github.com/mageddo/dns-proxy-server/issues/526#issuecomment-2261421618
49+
*/
4650
boolean ping(InetSocketAddress server) {
4751
return Networks.ping(server, SolverRemote.PING_TIMEOUT_IN_MS);
4852
}

src/main/java/com/mageddo/dnsproxyserver/solver/remote/application/CircuitBreakerFactory.java

Lines changed: 56 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,21 @@
77
import com.mageddo.dnsproxyserver.solver.remote.dataprovider.SolverConsistencyGuaranteeDAO;
88
import com.mageddo.dnsproxyserver.solver.remote.mapper.CircuitBreakerStateMapper;
99
import dev.failsafe.CircuitBreaker;
10+
import dev.failsafe.Failsafe;
1011
import dev.failsafe.event.CircuitBreakerStateChangedEvent;
1112
import dev.failsafe.event.EventListener;
1213
import lombok.RequiredArgsConstructor;
14+
import lombok.Value;
1315
import lombok.extern.slf4j.Slf4j;
1416
import org.apache.commons.lang3.time.StopWatch;
1517

1618
import javax.inject.Inject;
1719
import javax.inject.Singleton;
1820
import java.net.InetSocketAddress;
21+
import java.util.List;
1922
import java.util.Map;
2023
import java.util.concurrent.ConcurrentHashMap;
24+
import java.util.function.Supplier;
2125

2226
@Slf4j
2327
@Singleton
@@ -29,7 +33,14 @@ public class CircuitBreakerFactory {
2933
private final CircuitBreakerCheckerService circuitBreakerCheckerService;
3034
private final SolverConsistencyGuaranteeDAO solverConsistencyGuaranteeDAO;
3135

32-
public CircuitBreaker<Result> createCircuitBreakerFor(InetSocketAddress address) {
36+
public Result check(InetSocketAddress remoteAddress, Supplier<Result> sup) {
37+
final var circuitBreaker = this.createOrGetCircuitBreaker(remoteAddress);
38+
return Failsafe
39+
.with(circuitBreaker)
40+
.get((ctx) -> sup.get());
41+
}
42+
43+
CircuitBreaker<Result> createOrGetCircuitBreaker(InetSocketAddress address) {
3344
final var config = this.findCircuitBreakerConfig();
3445
return this.circuitBreakerMap.computeIfAbsent(address, addr -> buildCircuitBreaker(addr, config));
3546
}
@@ -44,21 +55,34 @@ CircuitBreaker<Result> buildCircuitBreaker(
4455
.withDelay(config.getTestDelay())
4556
.onClose(build("CLOSED", address))
4657
.onOpen(build("OPEN", address))
47-
.onHalfOpen(build("HALF_OPEN", address))
4858
.build();
4959
}
5060

5161
EventListener<CircuitBreakerStateChangedEvent> build(String actualStateName, InetSocketAddress address) {
5262
return event -> {
5363
final var previousStateName = CircuitBreakerStateMapper.toStateNameFrom(event);
54-
this.solverConsistencyGuaranteeDAO.flushCachesFromCircuitBreakerStateChange();
64+
if (isHalfOpenToOpen(previousStateName, actualStateName)) {
65+
log.trace("status=ignoredTransition, from={}, to={}", previousStateName, actualStateName);
66+
return;
67+
}
68+
log.trace(
69+
"status=beforeFlushCaches, address={}, previous={}, actual={}", address, previousStateName, actualStateName
70+
);
71+
this.flushCache();
5572
log.debug(
56-
"status=clearedCache, address={}, previousStateName={}, actualStateName={}",
57-
address, previousStateName, actualStateName
73+
"status=clearedCache, address={}, previous={}, actual={}", address, previousStateName, actualStateName
5874
);
5975
};
6076
}
6177

78+
private static boolean isHalfOpenToOpen(String previousStateName, String actualStateName) {
79+
return "HALF_OPEN".equals(previousStateName) && "OPEN".equals(actualStateName);
80+
}
81+
82+
void flushCache() {
83+
this.solverConsistencyGuaranteeDAO.flushCachesFromCircuitBreakerStateChange();
84+
}
85+
6286
com.mageddo.dnsproxyserver.config.CircuitBreaker findCircuitBreakerConfig() {
6387
return this.configService.findCurrentConfig()
6488
.getSolverRemote()
@@ -87,7 +111,33 @@ boolean circuitBreakerSafeCheck(Map.Entry<InetSocketAddress, CircuitBreaker<Resu
87111
return this.circuitBreakerCheckerService.safeCheck(entry.getKey(), entry.getValue());
88112
}
89113

90-
public void reset(){
114+
public void reset() {
91115
this.circuitBreakerMap.clear();
92116
}
117+
118+
public List<Stats> stats() {
119+
return this.circuitBreakerMap.keySet()
120+
.stream()
121+
.map(this::toStats)
122+
.toList();
123+
}
124+
125+
private Stats toStats(InetSocketAddress remoteAddr) {
126+
final var circuitBreaker = this.circuitBreakerMap.get(remoteAddr);
127+
final var state = circuitBreaker.getState().name();
128+
return Stats.of(remoteAddr.toString(), state);
129+
}
130+
131+
132+
@Value
133+
public static class Stats {
134+
135+
private String remoteServerAddress;
136+
private String state;
137+
138+
public static Stats of(String remoteServerAddress, String state) {
139+
return new Stats(remoteServerAddress, state);
140+
}
141+
}
142+
93143
}

src/main/java/com/mageddo/dnsproxyserver/solver/remote/application/CircuitBreakerFailSafeService.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import com.mageddo.dnsproxyserver.solver.remote.CircuitBreakerService;
55
import com.mageddo.dnsproxyserver.solver.remote.Result;
66
import dev.failsafe.CircuitBreakerOpenException;
7-
import dev.failsafe.Failsafe;
87
import lombok.RequiredArgsConstructor;
98
import lombok.extern.slf4j.Slf4j;
109
import org.apache.commons.lang3.ClassUtils;
@@ -36,10 +35,7 @@ public Result safeHandle(InetSocketAddress resolverAddress, Supplier<Result> sup
3635
}
3736

3837
private Result handle(InetSocketAddress resolverAddress, Supplier<Result> sup) {
39-
final var circuitBreaker = this.circuitBreakerFactory.createCircuitBreakerFor(resolverAddress);
40-
return Failsafe
41-
.with(circuitBreaker)
42-
.get((ctx) -> sup.get());
38+
return this.circuitBreakerFactory.check(resolverAddress, sup);
4339
}
4440

4541
public String getStatus() {

src/main/java/com/mageddo/dnsproxyserver/solver/remote/configurator/CircuitBreakerWatchDogScheduler.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,14 @@
44
import com.mageddo.dnsproxyserver.di.StartupEvent;
55
import com.mageddo.dnsproxyserver.solver.remote.application.CircuitBreakerFactory;
66
import lombok.RequiredArgsConstructor;
7+
import lombok.extern.slf4j.Slf4j;
78

89
import javax.inject.Inject;
910
import javax.inject.Singleton;
1011
import java.util.concurrent.ScheduledExecutorService;
1112
import java.util.concurrent.TimeUnit;
1213

14+
@Slf4j
1315
@Singleton
1416
@RequiredArgsConstructor(onConstructor = @__({@Inject}))
1517
public class CircuitBreakerWatchDogScheduler implements StartupEvent {
@@ -19,6 +21,14 @@ public class CircuitBreakerWatchDogScheduler implements StartupEvent {
1921

2022
@Override
2123
public void onStart() {
22-
this.executor.scheduleWithFixedDelay(this.circuitBreakerFactory::checkCreatedCircuits, 0, 10, TimeUnit.SECONDS);
24+
this.executor.scheduleWithFixedDelay(this::logStats, 0, 10, TimeUnit.SECONDS);
25+
}
26+
27+
void logStats() {
28+
this.circuitBreakerFactory
29+
.stats()
30+
.forEach(stats -> {
31+
log.debug("stats={}", stats);
32+
});
2333
}
2434
}

src/test/java/com/mageddo/dnsproxyserver/solver/remote/application/CircuitBreakerFactoryTest.java

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,26 @@
11
package com.mageddo.dnsproxyserver.solver.remote.application;
22

3+
import com.mageddo.commons.circuitbreaker.CircuitCheckException;
4+
import com.mageddo.commons.concurrent.Threads;
5+
import com.mageddo.dnsproxyserver.solver.remote.Result;
36
import org.junit.jupiter.api.Test;
47
import org.junit.jupiter.api.extension.ExtendWith;
58
import org.mockito.InjectMocks;
69
import org.mockito.Spy;
710
import org.mockito.junit.jupiter.MockitoExtension;
811
import testing.templates.CircuitBreakerConfigTemplates;
912
import testing.templates.InetSocketAddressTemplates;
13+
import testing.templates.solver.remote.ResultSupplierTemplates;
14+
15+
import java.net.InetSocketAddress;
16+
import java.util.function.Supplier;
1017

1118
import static org.junit.jupiter.api.Assertions.assertEquals;
19+
import static org.junit.jupiter.api.Assertions.assertThrows;
1220
import static org.mockito.ArgumentMatchers.any;
1321
import static org.mockito.Mockito.doReturn;
22+
import static org.mockito.Mockito.times;
23+
import static org.mockito.Mockito.verify;
1424

1525
@ExtendWith(MockitoExtension.class)
1626
class CircuitBreakerFactoryTest {
@@ -29,7 +39,7 @@ void mustCheckAllExistentCircuitsAndCountSuccessWhenSafeCheckReturnsTrue() {
2939
doReturn(true).when(this.factory).circuitBreakerSafeCheck(any());
3040

3141
final var addr = InetSocketAddressTemplates._8_8_8_8();
32-
this.factory.createCircuitBreakerFor(addr);
42+
this.factory.createOrGetCircuitBreaker(addr);
3343

3444
// act
3545
final var result = this.factory.checkCreatedCircuits();
@@ -50,7 +60,7 @@ void mustCheckAndCountErrorWhenSafeCheckReturnsFalse() {
5060
doReturn(false).when(this.factory).circuitBreakerSafeCheck(any());
5161

5262
final var addr = InetSocketAddressTemplates._8_8_8_8();
53-
this.factory.createCircuitBreakerFor(addr);
63+
this.factory.createOrGetCircuitBreaker(addr);
5464

5565
// act
5666
final var result = this.factory.checkCreatedCircuits();
@@ -59,4 +69,42 @@ void mustCheckAndCountErrorWhenSafeCheckReturnsFalse() {
5969
assertEquals(0, result.getKey());
6070
assertEquals(1, result.getValue());
6171
}
72+
73+
@Test
74+
void mustNotFlushCacheWhenChangeStateToHalfOpen(){
75+
76+
// arrange
77+
assertEquals("[]", this.factory.stats().toString());
78+
79+
final var addr = InetSocketAddressTemplates._8_8_8_8();
80+
final var supError = ResultSupplierTemplates.alwaysFail();
81+
final var supSuccess = ResultSupplierTemplates.alwaysSuccess();
82+
83+
doReturn(CircuitBreakerConfigTemplates.oneTryFailSuccess())
84+
.when(this.factory)
85+
.findCircuitBreakerConfig()
86+
;
87+
88+
this.checkFailAndSleep(addr, supError);
89+
this.checkFailAndSleep(addr, supError);
90+
91+
this.factory.check(addr, supSuccess);
92+
assertEquals(
93+
"[CircuitBreakerFactory.Stats(remoteServerAddress=/8.8.8.8:53, state=CLOSED)]",
94+
this.factory.stats().toString()
95+
);
96+
verify(this.factory, times(2)).flushCache();
97+
98+
99+
}
100+
101+
void checkFailAndSleep(InetSocketAddress addr, Supplier<Result> supError) {
102+
assertThrows(CircuitCheckException.class, () -> this.factory.check(addr, supError));
103+
assertEquals(
104+
"[CircuitBreakerFactory.Stats(remoteServerAddress=/8.8.8.8:53, state=OPEN)]",
105+
this.factory.stats().toString()
106+
);
107+
verify(this.factory).flushCache();
108+
Threads.sleep(100);
109+
}
62110
}

0 commit comments

Comments
 (0)