diff --git a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/store/MetaStore.java b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/store/MetaStore.java index 7142fcaf9f..c8db7c549e 100644 --- a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/store/MetaStore.java +++ b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/store/MetaStore.java @@ -106,6 +106,8 @@ public interface MetaStore { boolean increaseLost(GtidSet lost) throws IOException; + int removeLost(GtidSet gtidSet) throws IOException; + boolean xsyncContinue(String replId, long beginReplOffset, long backlogOff, String masterUuid, GtidSet gtidCont, GtidSet gtidIndexed) throws IOException; ReplicationStoreMeta switchToXsync(String replId, long beginReplOffset, long backlogOff, String masterUuid, GtidSet gtidCont, GtidSet gtidLost) throws IOException; diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/handler/CommandHandlerManager.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/handler/CommandHandlerManager.java index ee93c4aa7e..7e87ae7d02 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/handler/CommandHandlerManager.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/handler/CommandHandlerManager.java @@ -43,6 +43,7 @@ protected void initCommands() { putHandler(new ConfigHandler()); putHandler(new GapAllowPSyncHandler()); putHandler(new GapAllowXSyncHandler()); + putHandler(new GtidxHandler()); } protected void putHandler(CommandHandler handler) { diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/handler/keeper/GapAllowSyncHandler.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/handler/keeper/GapAllowSyncHandler.java index bab869e2b1..e6336047f5 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/handler/keeper/GapAllowSyncHandler.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/handler/keeper/GapAllowSyncHandler.java @@ -85,7 +85,7 @@ public void run() { protected abstract SyncRequest parseRequest(final String[] args, RedisSlave redisSlave); - protected SyncAction anaRequest(SyncRequest request, RedisKeeperServer redisKeeperServer, RedisSlave slave) throws Exception { + protected SyncAction anaRequest(SyncRequest request, RedisKeeperServer redisKeeperServer, RedisSlave slave) throws Exception { KeeperRepl keeperRepl = redisKeeperServer.getKeeperRepl(); KeeperConfig keeperConfig = redisKeeperServer.getKeeperConfig(); diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/handler/keeper/GtidxHandler.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/handler/keeper/GtidxHandler.java new file mode 100644 index 0000000000..55bfbf714d --- /dev/null +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/handler/keeper/GtidxHandler.java @@ -0,0 +1,129 @@ +package com.ctrip.xpipe.redis.keeper.handler.keeper; + +import com.ctrip.xpipe.gtid.GtidSet; +import com.ctrip.xpipe.redis.core.protocal.protocal.CommandBulkStringParser; +import com.ctrip.xpipe.redis.core.protocal.protocal.LongParser; +import com.ctrip.xpipe.redis.core.store.MetaStore; +import com.ctrip.xpipe.redis.core.store.ReplicationStore; +import com.ctrip.xpipe.redis.keeper.RedisClient; +import com.ctrip.xpipe.redis.keeper.RedisKeeperServer; +import com.ctrip.xpipe.redis.keeper.handler.AbstractCommandHandler; +import com.ctrip.xpipe.utils.StringUtil; +import com.google.common.collect.Maps; +import io.netty.buffer.ByteBuf; + +import java.util.Map; + +/** + * @author TB + *

+ * 2025/11/10 13:52 + */ +public class GtidxHandler extends AbstractCommandHandler { + private Map sections = Maps.newConcurrentMap(); + + public GtidxHandler(){ + register(new GtidxRemove()); + } + + private void register(GtidxSection section) { + sections.put(section.name().toLowerCase().trim(), section); + } + + + @Override + protected void doHandle(String[] args, RedisClient redisClient) throws Exception { + logger.debug("[doHandle]{},{}", redisClient, StringUtil.join(" ", args)); + RedisKeeperServer redisKeeperServer = (RedisKeeperServer)redisClient.getRedisServer(); + ByteBuf result = doSectionHandler(args[0], args,redisKeeperServer); + redisClient.sendMessage(result); + } + + @Override + public String[] getCommands() { + return new String[]{"gtidx"}; + } + + private ByteBuf doSectionHandler(String section, String args[],RedisKeeperServer redisKeeperServer) throws Exception{ + GtidxSection gtidxSection = sections.get(section.toLowerCase().trim()); + if(gtidxSection == null){ + return new CommandBulkStringParser("ERR "+section+ " subcommand not supported!").format(); + } + return gtidxSection.gtidx(args,redisKeeperServer); + } + + private interface GtidxSection { + ByteBuf gtidx(String args[], RedisKeeperServer redisKeeperServer) throws Exception; + String name(); + } + + private abstract class AbstractGtidxSection implements GtidxSection{ + protected String validateArgs(String[] args, int minLength) { + if (args.length < minLength) { + return "ERR wrong number of arguments"; + } + return null; + } + + protected boolean isExecuted(String type) { + return "executed".equalsIgnoreCase(type); + } + + protected boolean isLost(String type) { + return "lost".equalsIgnoreCase(type); + } + + protected String validateType(String type) { + if (!isExecuted(type) && !isLost(type)) { + return "ERR type must be EXECUTED or LOST"; + } + return null; + } + } + + private class GtidxRemove extends AbstractGtidxSection{ + + @Override + public ByteBuf gtidx(String args[],RedisKeeperServer redisKeeperServer) throws Exception { + String result; + result = validateArgs(args, 5); + if(result != null){ + return new CommandBulkStringParser(result).format(); + } + result = validateType(args[1]); + if(result != null){ + return new CommandBulkStringParser(result).format(); + } + GtidSet gtidSet = new GtidSet(Maps.newLinkedHashMap()); + String uuid = args[2]; + long startGno = parseGno(args[3]); + long endGno = parseGno(args[4]); + + if (startGno > endGno) { + return new CommandBulkStringParser("ERR start_gno cannot be greater than end_gno").format(); + } + + gtidSet.compensate(uuid,startGno,endGno); + if(isLost(args[1])) { + ReplicationStore replicationStore = redisKeeperServer.getReplicationStore(); + MetaStore metaStore = replicationStore.getMetaStore(); + int removeCnt = metaStore.removeLost(gtidSet); + return new LongParser(removeCnt).format(); + } + return new CommandBulkStringParser("ERR only lost supported").format(); + } + + @Override + public String name() { + return "remove"; + } + } + + private long parseGno(String gnoStr) { + try { + return Long.parseLong(gnoStr); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("ERR Invalid gno format: " + gnoStr); + } + } +} diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/store/RdbOnlyReplicationStore.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/store/RdbOnlyReplicationStore.java index 1db226d891..c5af13af7e 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/store/RdbOnlyReplicationStore.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/store/RdbOnlyReplicationStore.java @@ -216,6 +216,11 @@ public boolean increaseLost(GtidSet lost) throws IOException { throw new UnsupportedOperationException(); } + @Override + public int removeLost(GtidSet gtidSet) throws IOException { + return 0; + } + @Override public UPDATE_RDB_RESULT checkReplIdAndUpdateRdbInfoPsync(String rdbFile, RdbStore.Type type, EofType eofType, long rdbOffset, String rdbReplId, long backlogBeginOffset, long backlogEndOffset) throws IOException { throw new UnsupportedOperationException(); diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/store/meta/DefaultMetaStore.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/store/meta/DefaultMetaStore.java index 386995fea9..ba19ed019e 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/store/meta/DefaultMetaStore.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/store/meta/DefaultMetaStore.java @@ -427,6 +427,28 @@ public boolean increaseLost(GtidSet lost) throws IOException { } } + @Override + public int removeLost(GtidSet gtidSet) throws IOException { + synchronized (metaRef) { + ReplicationStoreMeta metaDup = dupReplicationStoreMeta(); + + ReplStage curReplStage = metaDup.getCurReplStage(); + if (curReplStage.getProto() != ReplStage.ReplProto.XSYNC) { + throw new IllegalStateException("xcontinue in psync replstage"); + } + + GtidSet oldLost = curReplStage.getGtidLost(); + GtidSet newLost = oldLost.subtract(gtidSet); + int diffCnt = oldLost.subtract(newLost).itemCnt(); + if (diffCnt == 0) { + return diffCnt; + } + curReplStage.setGtidLost(newLost); + saveMeta(metaDup); + return diffCnt; + } + } + @Override public boolean gtidSetContains(String uuid, long gno) { synchronized (metaRef) { diff --git a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/handler/keeper/GtidxHandlerTest.java b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/handler/keeper/GtidxHandlerTest.java new file mode 100644 index 0000000000..57904a6802 --- /dev/null +++ b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/handler/keeper/GtidxHandlerTest.java @@ -0,0 +1,196 @@ +package com.ctrip.xpipe.redis.keeper.handler.keeper; + +import com.ctrip.xpipe.AbstractTest; +import com.ctrip.xpipe.api.command.CommandFuture; +import com.ctrip.xpipe.api.command.CommandFutureListener; +import com.ctrip.xpipe.api.pool.SimpleObjectPool; +import com.ctrip.xpipe.endpoint.DefaultEndPoint; +import com.ctrip.xpipe.exception.XpipeException; +import com.ctrip.xpipe.gtid.GtidSet; +import com.ctrip.xpipe.lifecycle.LifecycleHelper; +import com.ctrip.xpipe.netty.NettyPoolUtil; +import com.ctrip.xpipe.netty.commands.NettyClient; +import com.ctrip.xpipe.redis.core.protocal.GapAllowedSync; +import com.ctrip.xpipe.redis.core.protocal.GapAllowedSyncObserver; +import com.ctrip.xpipe.redis.core.protocal.cmd.AbstractGapAllowedSync; +import com.ctrip.xpipe.redis.core.protocal.cmd.DefaultGapAllowedSync; +import com.ctrip.xpipe.redis.core.protocal.protocal.EofType; +import com.ctrip.xpipe.redis.core.protocal.protocal.LenEofType; +import com.ctrip.xpipe.redis.core.redis.RunidGenerator; +import com.ctrip.xpipe.redis.core.redis.rdb.RdbConstant; +import com.ctrip.xpipe.redis.core.store.*; +import com.ctrip.xpipe.redis.keeper.AbstractRedisKeeperTest; +import com.ctrip.xpipe.redis.keeper.RedisClient; +import com.ctrip.xpipe.redis.keeper.RedisKeeperServer; +import com.ctrip.xpipe.redis.keeper.impl.DefaultRedisClient; +import com.ctrip.xpipe.redis.keeper.protocal.cmd.GapAllowedSyncTest; +import com.google.common.collect.Maps; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.embedded.EmbeddedChannel; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Map; + +import static com.ctrip.xpipe.redis.core.protocal.GapAllowedSync.DEFAULT_XSYNC_MAXGAP; + +/** + * @author TB + *

+ * 2025/11/10 14:49 + */ +@RunWith(MockitoJUnitRunner.Silent.class) +public class GtidxHandlerTest extends AbstractRedisKeeperTest { + + + private DefaultGapAllowedSync gasync; + private ReplicationStoreManager replicationStoreManager; + private ReplicationStore replicationStore; + private String replIdA = "000000000000000000000000000000000000000A"; + private String replIdB = "000000000000000000000000000000000000000B"; + private String replIdC = "000000000000000000000000000000000000000C"; + private String uuidB = "111111111111111111111111111111111111111B"; + private String uuidC = "111111111111111111111111111111111111111C"; + + @Mock + private RedisKeeperServer redisKeeperServer; + + @Mock + private RedisClient redisClient; + + @Before + public void before() throws Exception { + replicationStoreManager = createReplicationStoreManager(); + LifecycleHelper.initializeIfPossible(replicationStoreManager); + replicationStore = replicationStoreManager.create(); + + Mockito.when(redisClient.getRedisServer()).thenReturn(redisKeeperServer); + Mockito.when(redisKeeperServer.getReplicationStore()).thenReturn(replicationStore); + + SimpleObjectPool clientPool = NettyPoolUtil.createNettyPool(new DefaultEndPoint("127.0.0.1", 1234)); + gasync = new DefaultGapAllowedSync(clientPool, new DefaultEndPoint("127.0.0.1", 1234), replicationStoreManager, scheduled, ()->DEFAULT_XSYNC_MAXGAP); + gasync.future().addListener(new CommandFutureListener() { + @Override + public void operationComplete(CommandFuture commandFuture) throws Exception { + if(!commandFuture.isSuccess()){ + logger.error("[operationComplete]", commandFuture.cause()); + } + } + }); + } + + + @Test + public void testRemoveGtidLost() throws Exception{ + int gnoBaseX = 1, gnoCountX = 100; + setupReplicationStorePX(replIdA, 100000000, 1000, + uuidB, replIdB, 200000000, 1, 100); + + long replOffC = 300000000; + String gtidBaseRepr = uuidB + ":" + gnoBaseX + "-" + (gnoBaseX+2*gnoCountX-1); + String gtidLostRepr = uuidC + ":" + gnoBaseX + "-" + (gnoBaseX+gnoCountX-1); + String gtidContRepr = gtidBaseRepr + "," + gtidLostRepr; + String reply = "+" + GapAllowedSync.XPARTIAL_SYNC + " " + + AbstractGapAllowedSync.SyncReply.XSYNC_REPLY_OPT_REPLID + " " + replIdC + " " + + AbstractGapAllowedSync.SyncReply.XSYNC_REPLY_OPT_REPLOFF + " " + replOffC + " " + + AbstractGapAllowedSync.SyncReply.XSYNC_REPLY_OPT_MASTER_UUID + " " + uuidC + " " + + AbstractGapAllowedSync.SyncReply.XSYNC_REPLY_OPT_GTID_SET + " " + gtidContRepr + " " + + "\r\n"; + + gasync.getRequest(); + + byte[] rawCmds = generateGtidCommands(uuidC, gnoBaseX+gnoCountX, gnoCountX); + runData(new byte[][]{ + reply.getBytes(), + rawCmds + }); + replicationStore = replicationStoreManager.getCurrent(); + Assert.assertEquals(replicationStore.getGtidSet().getValue().toString(), gtidLostRepr); + new GtidxHandler().doHandle(new String[]{"remove","lost",uuidC,"1","100"}, redisClient); + Assert.assertEquals(replicationStore.getGtidSet().getValue().toString(), "\"\""); + } + + @Test + public void testGtidSet(){ + GtidSet gtidSet = new GtidSet(Maps.newLinkedHashMap()); + GtidSet gtidSet1 = new GtidSet(Maps.newLinkedHashMap()); + gtidSet.compensate("a",1,10); + System.out.println(gtidSet.itemCnt()); + gtidSet1.compensate("a",1,100); + System.out.println(gtidSet1.itemCnt()); + System.out.println(gtidSet); + System.out.println(gtidSet1); + System.out.println(gtidSet.subtract(gtidSet1)); + System.out.println(gtidSet1.subtract(gtidSet)); + } + + private void setupReplicationStorePX(String replidP, long replOffP, int cmdLenP, + String uuidX, String replidX, long replOffX, + int gnoBaseX, int gnoCountX) throws IOException { + int gnoCmdX = gnoBaseX + gnoCountX; + String gtidContRepr = uuidX + ":" + gnoBaseX + "-" + (gnoBaseX + gnoCountX - 1); + + Assert.assertTrue(replicationStore.isFresh()); + Assert.assertNull(replicationStore.getMetaStore().getPreReplStage()); + Assert.assertNull(replicationStore.getMetaStore().getCurrentReplStage()); + + RdbStore rdbStore = replicationStore.prepareRdb(replidP, replOffP, new LenEofType(0), + ReplStage.ReplProto.PSYNC, null, null); + rdbStore.updateRdbType(RdbStore.Type.NORMAL); + rdbStore.updateRdbGtidSet(GtidSet.EMPTY_GTIDSET); + replicationStore.confirmRdbGapAllowed(rdbStore); + + replicationStore.appendCommands(Unpooled.wrappedBuffer(generateVanillaCommands(cmdLenP))); + + replicationStore.switchToXSync(replidX,replOffX,uuidX,new GtidSet(gtidContRepr), null); + + replicationStore.appendCommands(Unpooled.wrappedBuffer(generateGtidCommands(uuidX, gnoCmdX, gnoCountX))); + + + } + + private byte[] generateVanillaCommands(int contentLen) { + return randomString(contentLen).getBytes(); + } + + private byte[] generateGtidCommands(String uuid, long startGno, int cmdCount) throws IOException { + ByteArrayOutputStream os = new ByteArrayOutputStream(); + for (int i = 0; i < cmdCount; i++) { + String uuidGno = uuid + ":" + String.valueOf(startGno+i); + os.write("*6\r\n".getBytes()); + os.write("$4\r\nGTID\r\n".getBytes()); + os.write('$'); os.write(String.valueOf(uuidGno.length()).getBytes()); os.write("\r\n".getBytes()); os.write(uuidGno.getBytes()); os.write("\r\n".getBytes()); + os.write("$1\r\n0\r\n".getBytes()); + os.write("$3\r\nSET\r\n".getBytes()); + os.write("$3\r\nFOO\r\n".getBytes()); + os.write("$3\r\nBAR\r\n".getBytes()); + } + return os.toByteArray(); + } + + private void runData(byte [][]data) { + + ByteBuf[]byteBufs = new ByteBuf[data.length]; + + for(int i=0;i