Skip to content

Commit eefeeee

Browse files
authored
Merge pull request #981 from ctripcorp/feature/check-network-isolate
check network isolate & markdown isolated cross region dc
2 parents 48765e4 + bd03007 commit eefeeee

File tree

47 files changed

+1706
-857
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+1706
-857
lines changed

core/src/main/java/com/ctrip/xpipe/api/migration/OuterClientService.java

Lines changed: 73 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import com.ctrip.xpipe.api.lifecycle.Ordered;
44
import com.ctrip.xpipe.codec.JsonCodec;
5-
import com.ctrip.xpipe.endpoint.ClusterShardHostPort;
65
import com.ctrip.xpipe.endpoint.HostPort;
76
import com.ctrip.xpipe.utils.ServicesUtil;
87
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
@@ -26,17 +25,7 @@ public interface OuterClientService extends Ordered{
2625

2726
String serviceName();
2827

29-
void markInstanceUp(ClusterShardHostPort clusterShardHostPort) throws OuterClientException;
30-
31-
void markInstanceUpIfNoModifyFor(ClusterShardHostPort clusterShardHostPort, long noModifySeconds) throws OuterClientException;
32-
33-
boolean isInstanceUp(ClusterShardHostPort clusterShardHostPort) throws OuterClientException;
34-
35-
Map<HostPort, Boolean> batchQueryInstanceStatus(String cluster, Set<HostPort> instances) throws OuterClientException;
36-
37-
void markInstanceDown(ClusterShardHostPort clusterShardHostPort) throws OuterClientException;
38-
39-
void markInstanceDownIfNoModifyFor(ClusterShardHostPort clusterShardHostPort, long noModifySeconds) throws OuterClientException;
28+
Map<HostPort, OutClientInstanceStatus> batchQueryInstanceStatus(String cluster, Set<HostPort> instances) throws OuterClientException;
4029

4130
void batchMarkInstance(MarkInstanceRequest markInstanceRequest) throws OuterClientException;
4231

@@ -808,6 +797,7 @@ class HostPortDcStatus{
808797
private int port;
809798
private String dc;
810799
private boolean canRead;
800+
private boolean suspect;
811801

812802
public HostPortDcStatus() {
813803
}
@@ -851,9 +841,18 @@ public void setCanRead(boolean canRead) {
851841
this.canRead = canRead;
852842
}
853843

844+
public boolean isSuspect() {
845+
return suspect;
846+
}
847+
848+
public HostPortDcStatus setSuspect(boolean suspect) {
849+
this.suspect = suspect;
850+
return this;
851+
}
852+
854853
@Override
855854
public String toString() {
856-
return String.format("{%s:%d-%s|%s}", host, port, dc, canRead);
855+
return String.format("{%s:%d-%s|%s|%s}", host, port, dc, canRead, suspect);
857856
}
858857
}
859858

@@ -942,4 +941,65 @@ public static InstanceStatus valueOf(Integer intVal){
942941
}
943942
}
944943

944+
@JsonIgnoreProperties(ignoreUnknown = true)
945+
class OutClientInstanceStatus {
946+
947+
private boolean canRead;
948+
private String env;
949+
private String IPAddress;
950+
private int port;
951+
private boolean suspect;
952+
953+
public int getPort() {
954+
return port;
955+
}
956+
957+
public OutClientInstanceStatus setPort(int port) {
958+
this.port = port;
959+
return this;
960+
}
961+
962+
public String getIPAddress() {
963+
964+
return IPAddress;
965+
}
966+
967+
public OutClientInstanceStatus setIPAddress(String IPAddress) {
968+
this.IPAddress = IPAddress;
969+
return this;
970+
}
971+
972+
public String getEnv() {
973+
return env;
974+
}
975+
976+
public OutClientInstanceStatus setEnv(String env) {
977+
this.env = env;
978+
return this;
979+
}
980+
981+
public boolean isCanRead() {
982+
return canRead;
983+
}
984+
985+
public OutClientInstanceStatus setCanRead(boolean canRead) {
986+
this.canRead = canRead;
987+
return this;
988+
}
989+
990+
public boolean isSuspect() {
991+
return suspect;
992+
}
993+
994+
public OutClientInstanceStatus setSuspect(boolean suspect) {
995+
this.suspect = suspect;
996+
return this;
997+
}
998+
999+
@Override
1000+
public String toString() {
1001+
return String.format("canRead:%s, env:%s, %s:%d, suspect:%s", canRead, env, IPAddress, port, suspect);
1002+
}
1003+
}
1004+
9451005
}

core/src/main/java/com/ctrip/xpipe/migration/AbstractOuterClientService.java

Lines changed: 1 addition & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import com.ctrip.xpipe.api.migration.OuterClientException;
44
import com.ctrip.xpipe.api.migration.OuterClientService;
5-
import com.ctrip.xpipe.endpoint.ClusterShardHostPort;
65
import com.ctrip.xpipe.endpoint.HostPort;
76
import org.slf4j.Logger;
87
import org.slf4j.LoggerFactory;
@@ -33,25 +32,10 @@ public String serviceName() {
3332
}
3433

3534
@Override
36-
public void markInstanceUp(ClusterShardHostPort hostPort) throws OuterClientException {
37-
38-
}
39-
40-
@Override
41-
public boolean isInstanceUp(ClusterShardHostPort hostPort) throws OuterClientException {
42-
return false;
43-
}
44-
45-
@Override
46-
public Map<HostPort, Boolean> batchQueryInstanceStatus(String cluster, Set<HostPort> instances) throws OuterClientException {
35+
public Map<HostPort, OutClientInstanceStatus> batchQueryInstanceStatus(String cluster, Set<HostPort> instances) throws OuterClientException {
4736
return Collections.emptyMap();
4837
}
4938

50-
@Override
51-
public void markInstanceDown(ClusterShardHostPort hostPort) throws OuterClientException {
52-
53-
}
54-
5539
@Override
5640
public boolean clusterMigratePreCheck(String clusterName) throws OuterClientException {
5741
return false;
@@ -87,16 +71,6 @@ public boolean batchExcludeIdcs(List<ClusterExcludedIdcInfo> excludedClusterIdcs
8771
return true;
8872
}
8973

90-
@Override
91-
public void markInstanceUpIfNoModifyFor(ClusterShardHostPort clusterShardHostPort, long noModifySeconds) throws OuterClientException {
92-
93-
}
94-
95-
@Override
96-
public void markInstanceDownIfNoModifyFor(ClusterShardHostPort clusterShardHostPort, long noModifySeconds) throws OuterClientException {
97-
98-
}
99-
10074
@Override
10175
public void batchMarkInstance(MarkInstanceRequest markInstanceRequest) throws OuterClientException {
10276

core/src/main/java/com/ctrip/xpipe/migration/DefaultOuterClientService.java

Lines changed: 16 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
11
package com.ctrip.xpipe.migration;
22

33
import com.ctrip.xpipe.api.migration.OuterClientException;
4-
import com.ctrip.xpipe.endpoint.ClusterShardHostPort;
54
import com.ctrip.xpipe.endpoint.HostPort;
65
import com.ctrip.xpipe.utils.DateTimeUtils;
7-
import com.ctrip.xpipe.utils.VisibleForTesting;
86
import com.google.common.collect.Lists;
97

108
import java.net.InetSocketAddress;
@@ -18,42 +16,34 @@
1816
*/
1917
public class DefaultOuterClientService extends AbstractOuterClientService {
2018

21-
private Map<HostPort, Boolean> instanceStatus = new ConcurrentHashMap<>();
19+
private Map<HostPort, HostPortDcStatus> instanceStatus = new ConcurrentHashMap<>();
2220

2321
private Map<String, Integer> cntMap = new ConcurrentHashMap<>();
2422

25-
@Override
26-
public void markInstanceUp(ClusterShardHostPort clusterShardHostPort) throws OuterClientException {
27-
logger.info("[markInstanceUp]{}", clusterShardHostPort);
28-
instanceStatus.put(clusterShardHostPort.getHostPort(), true);
29-
30-
}
31-
32-
@Override
33-
public boolean isInstanceUp(ClusterShardHostPort clusterShardHostPort) throws OuterClientException {
3423

35-
Boolean result = instanceStatus.get(clusterShardHostPort.getHostPort());
36-
if(result == null){
37-
return Boolean.parseBoolean(System.getProperty("InstanceUp", "true"));
24+
private OutClientInstanceStatus instanceStatus(HostPort hostPort) throws OuterClientException {
25+
HostPortDcStatus result = instanceStatus.get(hostPort);
26+
if (result == null) {
27+
return new OutClientInstanceStatus();
3828
}
39-
return result;
29+
OutClientInstanceStatus outClientInstanceStatus = new OutClientInstanceStatus();
30+
outClientInstanceStatus.setEnv(result.getDc());
31+
outClientInstanceStatus.setIPAddress(result.getHost());
32+
outClientInstanceStatus.setPort(result.getPort());
33+
outClientInstanceStatus.setCanRead(result.isCanRead());
34+
outClientInstanceStatus.setSuspect(result.isSuspect());
35+
return outClientInstanceStatus;
4036
}
4137

4238
@Override
43-
public Map<HostPort, Boolean> batchQueryInstanceStatus(String cluster, Set<HostPort> instances) throws OuterClientException {
44-
Map<HostPort, Boolean> result = new HashMap<>();
39+
public Map<HostPort, OutClientInstanceStatus> batchQueryInstanceStatus(String cluster, Set<HostPort> instances) throws OuterClientException {
40+
Map<HostPort, OutClientInstanceStatus> result = new HashMap<>();
4541
for (HostPort instance: instances) {
46-
result.put(instance, isInstanceUp(new ClusterShardHostPort(instance)));
42+
result.put(instance, instanceStatus(instance));
4743
}
4844
return result;
4945
}
5046

51-
@Override
52-
public void markInstanceDown(ClusterShardHostPort clusterShardHostPort) throws OuterClientException {
53-
logger.info("[markInstanceDown]{}", clusterShardHostPort);
54-
instanceStatus.put(clusterShardHostPort.getHostPort(), false);
55-
}
56-
5747
@Override
5848
public boolean clusterMigratePreCheck(String clusterName) throws OuterClientException {
5949
logger.info("[clusterMigratePreCheck]{}", clusterName);
@@ -116,23 +106,11 @@ public boolean batchExcludeIdcs(List<ClusterExcludedIdcInfo> excludedClusterIdcs
116106
return true;
117107
}
118108

119-
@Override
120-
public void markInstanceUpIfNoModifyFor(ClusterShardHostPort clusterShardHostPort, long noModifySeconds) throws OuterClientException {
121-
logger.info("[markInstanceUpIfNoModifyFor]{}", clusterShardHostPort);
122-
instanceStatus.put(clusterShardHostPort.getHostPort(), true);
123-
}
124-
125-
@Override
126-
public void markInstanceDownIfNoModifyFor(ClusterShardHostPort clusterShardHostPort, long noModifySeconds) throws OuterClientException {
127-
logger.info("[markInstanceDownIfNoModifyFor]{}", clusterShardHostPort);
128-
instanceStatus.put(clusterShardHostPort.getHostPort(), false);
129-
}
130-
131109
@Override
132110
public void batchMarkInstance(MarkInstanceRequest markInstanceRequest) throws OuterClientException {
133111
logger.info("[batchMarkInstance]{}", markInstanceRequest);
134112
for (HostPortDcStatus hostPortDcStatus : markInstanceRequest.getHostPortDcStatuses()) {
135-
instanceStatus.put(new HostPort(hostPortDcStatus.getHost(), hostPortDcStatus.getPort()), hostPortDcStatus.isCanRead());
113+
instanceStatus.put(new HostPort(hostPortDcStatus.getHost(), hostPortDcStatus.getPort()), hostPortDcStatus);
136114
}
137115
this.cntMap = markInstanceRequest.getInstanceCnt();
138116
}

core/src/main/java/com/ctrip/xpipe/netty/TcpPortCheckCommand.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,16 @@ public class TcpPortCheckCommand extends AbstractCommand<Boolean> {
3131
private Bootstrap b = new Bootstrap();
3232

3333
public TcpPortCheckCommand(String host, int port) {
34+
this(host, port, CHECK_TIMEOUT_MILLI);
35+
}
36+
37+
public TcpPortCheckCommand(String host, int port, int connectTimeoutMilli) {
3438
this.host = host;
3539
this.port = port;
3640
this.b.group(nioEventLoopGroup);
3741
this.b.option(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT);
3842
this.b.channel(NioSocketChannel.class);
39-
this.b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CHECK_TIMEOUT_MILLI);
43+
this.b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMilli);
4044

4145
this.b.handler(new ChannelInitializer<SocketChannel>() {
4246
@Override

core/src/test/java/com/ctrip/xpipe/netty/TcpPortCheckCommandTest.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.ctrip.xpipe.AbstractTest;
44
import com.ctrip.xpipe.api.command.CommandFuture;
55
import com.ctrip.xpipe.simpleserver.Server;
6+
import io.netty.channel.ConnectTimeoutException;
67
import org.junit.Assert;
78
import org.junit.Test;
89

@@ -46,11 +47,32 @@ public void testFail() throws InterruptedException, ExecutionException, TimeoutE
4647
future.get(500, TimeUnit.SECONDS);
4748
Assert.fail();
4849
} catch (Exception e) {
50+
e.printStackTrace();
4951
}
5052
}
5153

5254
}
5355

56+
@Test
57+
public void testFail2() throws InterruptedException, ExecutionException, TimeoutException {
58+
59+
int port = randomPort();
60+
61+
long start = System.currentTimeMillis();
62+
63+
TcpPortCheckCommand checkCommand = new TcpPortCheckCommand("10.0.0.1", port, 500);
64+
CommandFuture<Boolean> future = checkCommand.execute();
65+
try {
66+
future.get(5, TimeUnit.SECONDS);
67+
Assert.fail();
68+
} catch (Exception e) {
69+
long end = System.currentTimeMillis();
70+
Assert.assertTrue(end - start < 1000);
71+
Assert.assertTrue(e.getCause() instanceof ConnectTimeoutException);
72+
}
73+
74+
}
75+
5476
// @Test
5577
public void testResource() throws Exception {
5678

redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/CheckerConsoleService.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@
66
import com.ctrip.xpipe.endpoint.HostPort;
77
import com.ctrip.xpipe.redis.checker.alert.AlertMessageEntity;
88
import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckInstance;
9-
import com.ctrip.xpipe.redis.checker.model.*;
10-
import com.ctrip.xpipe.redis.core.entity.SentinelMeta;
9+
import com.ctrip.xpipe.redis.checker.model.CheckerStatus;
10+
import com.ctrip.xpipe.redis.checker.model.HealthCheckResult;
11+
import com.ctrip.xpipe.redis.checker.model.ProxyTunnelInfo;
12+
import com.ctrip.xpipe.redis.checker.model.RedisMsg;
1113
import com.ctrip.xpipe.redis.core.entity.XpipeMeta;
1214
import com.fasterxml.jackson.annotation.JsonIgnore;
1315
import org.xml.sax.SAXException;
@@ -55,9 +57,9 @@ public interface CheckerConsoleService {
5557

5658
Map<String, OuterClientService.ClusterInfo> loadCurrentDcOneWayClusterInfo(String console, String dc);
5759

58-
void bindShardSentinel(String console, String dc, String cluster, String shard, SentinelMeta sentinelMeta);
60+
boolean dcIsolated(String console);
5961

60-
public class AlertMessage {
62+
class AlertMessage {
6163
private AlertMessageEntity message;
6264
private Properties properties;
6365
private String eventOperator;

redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/RemoteCheckerManager.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.ctrip.xpipe.redis.checker;
22

3+
import com.ctrip.xpipe.api.command.CommandFuture;
34
import com.ctrip.xpipe.endpoint.HostPort;
45
import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.HEALTH_STATE;
56
import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.HealthStatusDesc;
@@ -19,4 +20,11 @@ public interface RemoteCheckerManager {
1920

2021
List<CheckerService> getAllCheckerServices();
2122

23+
Map<String,Boolean> getAllDcIsolatedCheckResult();
24+
25+
Boolean getDcIsolatedCheckResult(String dcId);
26+
27+
CommandFuture<Boolean> connectDc(String dc, int connectTimeoutMilli);
28+
29+
List<String> dcsInCurrentRegion();
2230
}

redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/config/impl/CheckConfigBean.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -311,7 +311,7 @@ public Boolean getSiteStable() {
311311
}
312312

313313
public int getStableRecoverAfterRounds() {
314-
return getIntProperty(KEY_CHECKER_STABLE_RECOVER_AFTER_ROUNDS, 30 * 30);
314+
return getIntProperty(KEY_CHECKER_STABLE_RECOVER_AFTER_ROUNDS, 30);
315315
}
316316

317317
public float getSiteStableThreshold() {

0 commit comments

Comments
 (0)