Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.ctrip.xpipe.api.codec;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;

import java.io.IOException;

public class RawByteArraySerializer extends StdSerializer<byte[]> {

public RawByteArraySerializer() {
super(byte[].class);
}

@Override
public void serialize(byte[] value, JsonGenerator gen, SerializerProvider provider) throws IOException {
if (value == null) {
gen.writeNull();
return;
}
gen.writeStartArray();
for (byte b : value) {
gen.writeNumber(b & 0xFF);
}
gen.writeEndArray();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package com.ctrip.xpipe.payload;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.function.Consumer;

/**
* A WritableByteChannel implementation that limits memory usage by using a fixed-size buffer.
* When the buffer is full, it triggers processing of the buffered data.
*
* @author x-pipe
*/
public class BoundedWritableByteChannel implements WritableByteChannel {

private static final Logger logger = LoggerFactory.getLogger(BoundedWritableByteChannel.class);

private final int bufferSize;
private final ByteBuffer buffer;
private final Consumer<ByteBuf> processor;
private boolean closed = false;

/**
* Creates a new BoundedWritableByteChannel with the specified buffer size.
*
* @param bufferSize the maximum size of the buffer in bytes
* @param processor the callback to process buffered data when buffer is full or channel is closed
*/
public BoundedWritableByteChannel(int bufferSize, Consumer<ByteBuf> processor) {
if (bufferSize <= 0) {
throw new IllegalArgumentException("Buffer size must be positive: " + bufferSize);
}
if (processor == null) {
throw new IllegalArgumentException("Processor cannot be null");
}
this.bufferSize = bufferSize;
this.buffer = ByteBuffer.allocate(bufferSize);
this.processor = processor;
}

@Override
public boolean isOpen() {
return !closed;
}

@Override
public void close() throws IOException {
if (closed) {
return;
}
closed = true;

// Process any remaining data in the buffer
flush();
}

@Override
public int write(ByteBuffer src) throws IOException {
if (closed) {
throw new IOException("Channel is closed");
}

int totalWritten = 0;
int remaining = src.remaining();

while (remaining > 0) {
// If buffer is full, flush it first
if (buffer.remaining() == 0) {
flush();
}

// Copy data from src to buffer
int toCopy = Math.min(buffer.remaining(), remaining);
int oldLimit = src.limit();
src.limit(src.position() + toCopy);
buffer.put(src);
src.limit(oldLimit);

totalWritten += toCopy;
remaining -= toCopy;

// If buffer is full after this write, flush it
if (buffer.remaining() == 0) {
flush();
}
}

return totalWritten;
}

/**
* Flushes the current buffer content to the processor.
* This method is called automatically when the buffer is full or when the channel is closed.
*/
public void flush() throws IOException {
if (buffer.position() == 0) {
return; // Nothing to flush
}

buffer.flip();
byte[] data = new byte[buffer.remaining()];
buffer.get(data);
buffer.clear();

ByteBuf byteBuf = Unpooled.wrappedBuffer(data);
try {
processor.accept(byteBuf);
} catch (Exception e) {
logger.error("[flush] Error processing buffered data", e);
throw new IOException("Error processing buffered data", e);
} finally {
byteBuf.release();
}
}

/**
* Gets the current buffer size limit.
*/
public int getBufferSize() {
return bufferSize;
}

/**
* Gets the number of bytes currently in the buffer.
*/
public int getCurrentBufferSize() {
return buffer.position();
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.ctrip.xpipe.redis.core.redis.operation.stream;

import io.netty.buffer.ByteBuf;

import java.io.IOException;
import java.util.List;

public interface StreamTransactionListener {

boolean preAppend(String gtid, long offset) throws IOException;

int postAppend(ByteBuf commandBuf, Object[] payload) throws IOException;

int batchPostAppend(List<ByteBuf> commandBufs, List<Object[]> payloads) throws IOException;

boolean checkOffset(long offset);

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.List;

public interface CommandStore extends Initializable, Closeable, Destroyable {

Expand All @@ -32,6 +33,8 @@ public interface CommandStore extends Initializable, Closeable, Destroyable {

boolean retainCommands(CommandsGuarantee commandsGuarantee);

List<BacklogOffsetReplicationProgress> locateCmdSegment(String uuid, int begGno, int endGno) throws IOException;

long getCommandsLastUpdatedAt();

void gc();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,6 @@ public interface CommandsGuarantee {

boolean isTimeout();

void cancel();

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@
import io.netty.buffer.ByteBuf;

import java.io.IOException;
import java.util.List;

public interface IndexStore {

void write(ByteBuf byteBuf) throws IOException;
void rotateFileIfNecessary() throws IOException;
void openWriter(CommandWriter cmdWriter) throws IOException;
List<Pair<Long, Long>> locateGtidRange(String uuid, int begGno, int endGno) throws IOException;
Pair<Long, GtidSet> locateContinueGtidSet(GtidSet request) throws IOException;
Pair<Long, GtidSet> locateGtidSetWithFallbackToEnd(GtidSet request) throws IOException;
boolean increaseLost(GtidSet lost, IOSupplier<Boolean> supplier) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.List;

/**
* @author wenchao.meng
Expand Down Expand Up @@ -61,6 +62,10 @@ public interface ReplicationStore extends Closeable, Destroyable {

FULLSYNC_FAIL_CAUSE fullSyncIfPossible(FullSyncListener fullSyncListener, boolean masterSupportRordb) throws IOException;

boolean retainCommands(CommandsGuarantee commandsGuarantee);

List<BacklogOffsetReplicationProgress> locateCmdSegment(String uuid, int begGno, int endGno) throws IOException;

void addCommandsListener(ReplicationProgress<?> progress, CommandsListener commandsListener) throws IOException;
// meta related
MetaStore getMetaStore();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,10 @@ protected Object jedisExecCommand(String host, int port, String method, String..
}
} else if (method.equalsIgnoreCase("GET")) {
result = jedis.get(args[0]);
} else if (method.equalsIgnoreCase("MSET")) {
result = jedis.mset(args);
} else if (method.equalsIgnoreCase("SELECT")) {
result = jedis.select(Integer.parseInt(args[0]));
} else {
throw new IllegalArgumentException("method not supported:" + method);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
KeeperXsyncGapTest.class,
KeeperXsyncTest.class,
MasterSwitchMultDcTest.class,
KeeperXSyncCrossRegionTest.class
KeeperXSyncCrossRegionTest.class,
GtidCmdSearcherKeeperTest.class,
})
public class AllXsyncTest {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package com.ctrip.xpipe.redis.integratedtest.keeper;

import com.ctrip.xpipe.redis.core.entity.KeeperMeta;
import com.ctrip.xpipe.redis.core.meta.KeeperState;
import com.ctrip.xpipe.redis.core.protocal.cmd.InfoCommand;
import com.ctrip.xpipe.redis.keeper.store.searcher.CmdKeyItem;
import org.junit.Assert;
import org.junit.Test;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Transaction;

import java.util.List;

public class GtidCmdSearcherKeeperTest extends AbstractKeeperIntegratedSingleDc{

@Test
public void testCmdSearcher() throws Exception {
setRedisToGtidEnabled(redisMaster.getIp(), redisMaster.getPort());
setKeeperState(activeKeeper, KeeperState.ACTIVE, redisMaster.getIp(), redisMaster.getPort());

waitForKeeperSync(activeKeeper);

jedisExecCommand(redisMaster.getIp(), redisMaster.getPort(), "SET", "K1", "V1");
jedisExecCommand(redisMaster.getIp(), redisMaster.getPort(), "MSET", "K2", "V2", "K3", "V3");
try (Jedis jedis = new Jedis(redisMaster.getIp(), redisMaster.getPort())) {
Transaction transaction = jedis.multi();
transaction.set("K4", "V4");
transaction.mset("K5", "V5", "K6", "V6");
transaction.exec();
}

String raw_gtid = infoRedis(redisMaster.getIp(), redisMaster.getPort(), InfoCommand.INFO_TYPE.GTID, "gtid_executed");
String[] raw = raw_gtid.split(":");
String[] raw_gno = raw[1].split("-");
String UUID = raw[0];
int begGno = Integer.parseInt(raw_gno[0]);
int endGno = Integer.parseInt(raw_gno[1]);
Assert.assertEquals(1, begGno);
Assert.assertEquals(3, endGno);

waitForKeeperSync(activeKeeper);

List<CmdKeyItem> items = getRedisKeeperServer(activeKeeper).createCmdKeySearcher(UUID, begGno, endGno).execute().get();
Assert.assertEquals(6, items.size());
for (CmdKeyItem item : items) {
Assert.assertEquals(item.uuid, UUID);
Assert.assertTrue(item.seq >= begGno && item.seq <= endGno);
Assert.assertEquals(0, item.dbId);
Assert.assertNotNull(item.key);
}
}

private void waitForKeeperSync(KeeperMeta keeper) throws Exception {
waitConditionUntilTimeOut(() -> {
long masterReplOff;
long keeperReplOff;
try {
masterReplOff = Long.parseLong(infoRedis(redisMaster.getIp(), redisMaster.getPort(), InfoCommand.INFO_TYPE.REPLICATION, "master_repl_offset"));
} catch (Exception e) {
throw new RuntimeException(e);
}
keeperReplOff = getRedisKeeperServer(keeper).getReplicationStore().getCurReplStageReplOff();
return masterReplOff == keeperReplOff;
});
}

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.ctrip.xpipe.redis.keeper;


import com.ctrip.xpipe.api.command.CommandFuture;
import com.ctrip.xpipe.api.lifecycle.Destroyable;
import com.ctrip.xpipe.gtid.GtidSet;
import com.ctrip.xpipe.redis.core.entity.KeeperInstanceMeta;
Expand All @@ -14,9 +15,12 @@
import com.ctrip.xpipe.redis.keeper.exception.RedisSlavePromotionException;
import com.ctrip.xpipe.redis.keeper.impl.SetRdbDumperException;
import com.ctrip.xpipe.redis.keeper.monitor.KeeperMonitor;
import com.ctrip.xpipe.redis.keeper.store.searcher.CmdKeyItem;
import com.ctrip.xpipe.redis.keeper.store.searcher.GtidCommandSearcher;
import io.netty.channel.Channel;

import java.io.IOException;
import java.util.List;
import java.util.Set;

/**
Expand All @@ -30,6 +34,8 @@ public interface RedisKeeperServer extends RedisServer, GapAllowedSyncObserver,

KeeperRepl getKeeperRepl();

GtidCommandSearcher createCmdKeySearcher(String uuid, int begGno, int endGno);

XSyncContinue locateContinueGtidSet(GtidSet gtidSet) throws Exception;

XSyncContinue locateContinueGtidSetWithFallbackToEnd(GtidSet gtidSet) throws Exception;
Expand Down
Loading