Skip to content

Commit b5f4010

Browse files
author
vzc祝辰
committed
Merge branch 'bugfix/migrate-and-delaycheck-and-crossdcelector' into 'master'
bugfix for migration pre-check, cross dc leader elector and redis delay get api See merge request framework/x-pipe!23
2 parents fbd4b2f + a0b4017 commit b5f4010

File tree

8 files changed

+81
-24
lines changed

8 files changed

+81
-24
lines changed

core/src/main/java/com/ctrip/xpipe/migration/DefaultOuterClientService.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ public MigrationPublishResult doMigrationPublish(String clusterName, String shar
7575
@Override
7676
public ClusterInfo getClusterInfo(String clusterName) {
7777
ClusterInfo clusterInfo = new ClusterInfo();
78+
clusterInfo.setName(clusterName);
7879
clusterInfo.setGroups(Lists.newArrayList(new GroupInfo()));
7980
return clusterInfo;
8081
}

redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/console/impl/ConsoleServiceManager.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.ctrip.xpipe.redis.console.console.impl;
22

33
import com.ctrip.xpipe.endpoint.HostPort;
4+
import com.ctrip.xpipe.exception.XpipeRuntimeException;
45
import com.ctrip.xpipe.redis.console.config.ConsoleConfig;
56
import com.ctrip.xpipe.redis.console.console.ConsoleService;
67
import com.ctrip.xpipe.redis.console.healthcheck.actions.interaction.HEALTH_STATE;
@@ -93,15 +94,20 @@ public UnhealthyInfoModel getAllUnhealthyInstanceFromParallelService() {
9394
return parallelService.getAllUnhealthyInstance();
9495
}
9596

96-
private ConsoleService getServiceByDc(String activeIdc) {
97-
String dcId = activeIdc.toUpperCase();
98-
ConsoleService service = services.get(dcId);
97+
private ConsoleService getServiceByDc(String dcId) {
98+
String upperCaseDcId = dcId.toUpperCase();
99+
ConsoleService service = services.get(upperCaseDcId);
99100
if (service == null) {
100101
synchronized (this) {
101-
service = services.get(dcId);
102+
service = services.get(upperCaseDcId);
102103
if (service == null) {
103-
service = new DefaultConsoleService(consoleConfig.getConsoleDomains().get(dcId));
104-
services.put(activeIdc, service);
104+
Optional<String> optionalKey = consoleConfig.getConsoleDomains().keySet().stream().filter(dcId::equalsIgnoreCase).findFirst();
105+
if (!optionalKey.isPresent()) {
106+
throw new XpipeRuntimeException("unknown dc id " + dcId);
107+
}
108+
109+
service = new DefaultConsoleService(consoleConfig.getConsoleDomains().get(optionalKey.get()));
110+
services.put(upperCaseDcId, service);
105111
}
106112
}
107113
}

redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/election/CrossDcLeaderElectionAction.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.ctrip.xpipe.redis.console.election;
22

33
import com.ctrip.xpipe.api.foundation.FoundationService;
4+
import com.ctrip.xpipe.cluster.ClusterType;
45
import com.ctrip.xpipe.redis.console.config.ConsoleConfig;
56
import com.ctrip.xpipe.redis.console.dao.ConfigDao;
67
import com.ctrip.xpipe.redis.console.model.ConfigModel;
@@ -146,14 +147,20 @@ private long calculateElectDelay() {
146147

147148
private float calculateActiveClusterRatio() {
148149
XpipeMeta xpipeMeta = metaCache.getXpipeMeta();
150+
if (null == xpipeMeta) return 1L;
151+
149152
long count;
150153
long totalCluster = 0;
151154
long activeClusterCount = 0;
152155

153156
for (DcMeta dcMeta : xpipeMeta.getDcs().values()) {
154157
count = 0;
155158
for (ClusterMeta clusterMeta : dcMeta.getClusters().values()) {
156-
if (!clusterMeta.getActiveDc().equals(dcMeta.getId())) {
159+
ClusterType clusterType = ClusterType.lookup(clusterMeta.getType());
160+
if (clusterType.supportSingleActiveDC() && !clusterMeta.getActiveDc().equals(dcMeta.getId())) {
161+
continue;
162+
}
163+
if (clusterType.supportMultiActiveDC() && !dcMeta.getId().equalsIgnoreCase(dataCenter)) {
157164
continue;
158165
}
159166

redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/healthcheck/actions/delay/CrossMasterDelayService.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.springframework.stereotype.Component;
2020

2121
import java.util.Collections;
22+
import java.util.HashMap;
2223
import java.util.Map;
2324
import java.util.Optional;
2425
import java.util.concurrent.TimeUnit;
@@ -67,13 +68,18 @@ public void stopWatch(HealthCheckAction action) {
6768

6869
public Map<String, Pair<HostPort, Long>> getPeerMasterDelayFromCurrentDc(String clusterId, String shardId) {
6970
Map<String, Pair<HostPort, Long>> peerMasterDelays = crossMasterDelays.get(new DcClusterShard(currentDcId, clusterId, shardId));
70-
if (null != peerMasterDelays) {
71-
peerMasterDelays.forEach((key, delay) -> {
72-
if (delay.getValue() > 0) delay.setValue(TimeUnit.NANOSECONDS.toMillis(delay.getValue()));
73-
});
74-
}
71+
if (null == peerMasterDelays) return null;
72+
73+
Map<String, Pair<HostPort, Long>> result = new HashMap<>(peerMasterDelays.size());
74+
peerMasterDelays.forEach((targetDc, delay) -> {
75+
if (delay.getValue() > 0) {
76+
result.put(targetDc, Pair.of(delay.getKey(), TimeUnit.NANOSECONDS.toMillis(delay.getValue())));
77+
} else {
78+
result.put(targetDc, Pair.of(delay.getKey(), delay.getValue()));
79+
}
80+
});
7581

76-
return peerMasterDelays;
82+
return result;
7783
}
7884

7985
public Map<String, Pair<HostPort, Long>> getPeerMasterDelayFromSourceDc(String sourceDcId, String clusterId, String shardId) {

redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/AllTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import com.ctrip.xpipe.redis.console.cluster.ConsoleCrossDcServerTest;
77
import com.ctrip.xpipe.redis.console.config.impl.DefaultConsoleConfigTest;
88
import com.ctrip.xpipe.redis.console.config.impl.DefaultConsoleDbConfigTest;
9+
import com.ctrip.xpipe.redis.console.console.impl.ConsoleServiceManagerTest;
910
import com.ctrip.xpipe.redis.console.controller.api.HealthControllerTest;
1011
import com.ctrip.xpipe.redis.console.controller.api.data.*;
1112
import com.ctrip.xpipe.redis.console.controller.api.data.meta.CheckPrepareRequestTest;
@@ -225,7 +226,8 @@
225226
TombstoneSizeCheckActionTest.class,
226227
TombstoneSizeMetricListenerTest.class,
227228
DefaultRedisMasterActionListenerTest.class,
228-
DefaultMetaCacheTest.class
229+
DefaultMetaCacheTest.class,
230+
ConsoleServiceManagerTest.class
229231
})
230232
public class AllTests {
231233

redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/console/impl/ConsoleServiceManagerTest.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,19 @@
11
package com.ctrip.xpipe.redis.console.console.impl;
22

3+
import com.ctrip.xpipe.exception.ExceptionUtils;
4+
import com.ctrip.xpipe.exception.XpipeRuntimeException;
35
import com.ctrip.xpipe.redis.console.AbstractConsoleTest;
46
import com.ctrip.xpipe.redis.console.config.ConsoleConfig;
7+
import com.ctrip.xpipe.redis.console.console.ConsoleService;
58
import com.ctrip.xpipe.redis.console.healthcheck.actions.interaction.HEALTH_STATE;
9+
import org.junit.Assert;
610
import org.junit.Test;
711
import org.junit.runner.RunWith;
812
import org.mockito.Mock;
913
import org.mockito.runners.MockitoJUnitRunner;
1014

15+
import java.lang.reflect.Method;
16+
import java.util.HashMap;
1117
import java.util.List;
1218

1319
import static org.mockito.Mockito.when;
@@ -50,4 +56,29 @@ public void testMulti(){
5056
logger.info("{}", health_states);
5157

5258
}
59+
60+
@Test
61+
public void testGetServiceByDcIgnoreCase() throws Exception {
62+
when(consoleConfig.getConsoleDomains()).thenReturn(new HashMap<String, String>(){{
63+
put("jq", "http://127.0.0.1:8080");
64+
put("OY", "http://127.0.0.1:8081");
65+
}});
66+
67+
ConsoleServiceManager manager = new ConsoleServiceManager(consoleConfig);
68+
Method method = ConsoleServiceManager.class.getDeclaredMethod("getServiceByDc", String.class);
69+
method.setAccessible(true);
70+
71+
ConsoleService consoleService = (ConsoleService) method.invoke(manager, "jq");
72+
Assert.assertNotNull(consoleService);
73+
74+
consoleService = (ConsoleService) method.invoke(manager, "oy");
75+
Assert.assertNotNull(consoleService);
76+
77+
try {
78+
method.invoke(manager, "rb");
79+
Assert.fail();
80+
} catch (Exception e) {
81+
logger.info("[testGetServiceByDc] get unknown dc", e);
82+
}
83+
}
5384
}

redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/election/CrossDcLeaderElectionActionTest.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import com.ctrip.xpipe.AbstractTest;
55
import com.ctrip.xpipe.api.observer.Observable;
66
import com.ctrip.xpipe.api.observer.Observer;
7+
import com.ctrip.xpipe.cluster.ClusterType;
78
import com.ctrip.xpipe.redis.console.config.ConsoleConfig;
89
import com.ctrip.xpipe.redis.console.dao.ConfigDao;
910
import com.ctrip.xpipe.redis.console.exception.DalInsertException;
@@ -153,18 +154,21 @@ private XpipeMeta mockXpipeMeta() {
153154
xpipeMeta.addDc(dcMeta);
154155
});
155156

156-
ClusterMeta oyClusterMeta = new ClusterMeta();
157-
ClusterMeta oyClusterMeta2 = new ClusterMeta();
158-
ClusterMeta rbClusterMeta = new ClusterMeta();
157+
ClusterMeta oyClusterMeta = new ClusterMeta("oyCluster");
158+
ClusterMeta oyClusterMeta2 = new ClusterMeta("oyCluster2");
159+
ClusterMeta rbClusterMeta = new ClusterMeta("rbCluster");
160+
ClusterMeta biClusterMeta = new ClusterMeta("bi-cluster");
159161
oyClusterMeta.setActiveDc("oy");
160162
oyClusterMeta2.setActiveDc("oy");
161163
rbClusterMeta.setActiveDc("rb");
162-
oyClusterMeta.setId("oyCluster");
163-
oyClusterMeta2.setId("oyCluster2");
164-
rbClusterMeta.setId("rbCluster");
165-
166-
xpipeMeta.getDcs().get("oy").addCluster(oyClusterMeta).addCluster(rbClusterMeta).addCluster(oyClusterMeta2);
167-
xpipeMeta.getDcs().get("rb").addCluster(oyClusterMeta).addCluster(rbClusterMeta).addCluster(oyClusterMeta2);
164+
oyClusterMeta.setType(ClusterType.ONE_WAY.toString());
165+
oyClusterMeta2.setType(ClusterType.ONE_WAY.toString());
166+
rbClusterMeta.setType(ClusterType.ONE_WAY.toString());
167+
biClusterMeta.setType(ClusterType.BI_DIRECTION.toString());
168+
169+
xpipeMeta.getDcs().get("fq").addCluster(biClusterMeta);
170+
xpipeMeta.getDcs().get("oy").addCluster(oyClusterMeta).addCluster(rbClusterMeta).addCluster(oyClusterMeta2).addCluster(biClusterMeta);
171+
xpipeMeta.getDcs().get("rb").addCluster(oyClusterMeta).addCluster(rbClusterMeta).addCluster(oyClusterMeta2).addCluster(biClusterMeta);
168172
return xpipeMeta;
169173
}
170174

redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/healthcheck/actions/delay/CrossMasterDelayServiceTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public void setupCrossMasterDelayServiceTest() {
5454

5555
@Test
5656
public void testOnAction() {
57-
service.onAction(new DelayActionContext(instance, 10L));
57+
service.onAction(new DelayActionContext(instance, 10 * 1000 * 1000L));
5858
Assert.assertEquals(Collections.singletonMap(remoteDcId, Pair.of(new HostPort(), 10L)), service.getPeerMasterDelayFromCurrentDc(clusterId, shardId));
5959
Assert.assertEquals(Collections.singletonMap(remoteDcId, Pair.of(new HostPort(), 10L)), service.getPeerMasterDelayFromSourceDc(dcId, clusterId, shardId));
6060
}

0 commit comments

Comments
 (0)