Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package com.ctrip.xpipe.redis.keeper.applier;

import com.ctrip.xpipe.netty.commands.NettyKeyedPoolClientFactory;
import com.ctrip.xpipe.pool.ChannelHandlerFactory;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.ChannelOption;

/**
* @author TB
* <p>
* 2025/11/13 13:58
*/
public class ApplierNettyPoolClientFactory extends NettyKeyedPoolClientFactory {

private static final int MIN_SIZE = 512;

private static final int INITIAL_SIZE = 2048;

private static final int MAX_SIZE = 128 * 1024;

public ApplierNettyPoolClientFactory(ChannelHandlerFactory channelHandlerFactory) {
super(channelHandlerFactory);
}

@Override
protected void doStart() throws Exception {
super.doStart();
}

@Override
protected void initBootstrap() {
super.initBootstrap();
b.option(ChannelOption.RCVBUF_ALLOCATOR,new AdaptiveRecvByteBufAllocator(MIN_SIZE,INITIAL_SIZE,MAX_SIZE));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ public DefaultApplierServer(String clusterName, ClusterId clusterId, ShardId sha
ClusterShardAwareThreadFactory.create(clusterId, shardId, "sch-" + makeApplierThreadName()));

pool = new InstanceComponentWrapper<>(new XpipeNettyClientKeyedObjectPool(DEFAULT_KEYED_CLIENT_POOL_SIZE,
new NettyKeyedPoolClientFactory(new ApplierChannelHandlerFactory(keeperConfig.getApplierReadIdleSeconds()))));
new ApplierNettyPoolClientFactory(new ApplierChannelHandlerFactory(keeperConfig.getApplierReadIdleSeconds()))));

applierConfigRef = new AtomicReference<>(new ApplierConfig());
applierStatisticRef = new AtomicReference<>(new ApplierStatistic());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public class DefaultKeeperConfig extends AbstractCoreConfig implements KeeperCon
public static final String KEY_REPLICATION_STORE_MINITIME_GC_AFTERCREATE = "replicationstore.mintime.gc.aftercreate";
public static final String KEY_REPLICATION_STORE_MAX_COMMANDS_TO_TRANSFER_BEFORE_CREATE_RDB = "replicationstore.max.commands.to.transfer";
public static final String KEY_REPLICATION_STORE_MAX_LWM_DISTANCE_TO_TRANSFER_BEFORE_CREATE_RDB = "replicationstore.max.lwm.distance.to.transfer";
public static final String KEY_REPLICATION_STORE_COMMANDFILE_RETAIN_TIMEOUT_MILLI = "replicationstore.commandfile.retain.timeout.milli";
public static final String KEY_COMMAND_READER_FLYING_THRESHOLD = "command.reader.flying.threshold";
private static final String KEY_COMMAND_INDEX_BYTES_INTERVAL = "command.index.bytes.interval";
public static final String KEY_RDB_DUMP_MIN_INTERVAL = "rdbdump.min.interval";
Expand Down Expand Up @@ -113,6 +114,11 @@ public int getReplicationStoreCommandFileNumToKeep() {
return getIntProperty(KEY_REPLICATION_STORE_COMMANDFILE_NUM_KEEP, 2);
}

@Override
public int getReplicationStoreCommandFileRetainTimeoutMilli() {
return getIntProperty(KEY_REPLICATION_STORE_COMMANDFILE_RETAIN_TIMEOUT_MILLI,1800 * 1000);
}

@Override
public long getReplicationStoreMaxCommandsToTransferBeforeCreateRdb() {
return getLongProperty(KEY_REPLICATION_STORE_MAX_COMMANDS_TO_TRANSFER_BEFORE_CREATE_RDB, 100L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ public interface KeeperConfig extends CoreConfig{

int getReplicationStoreCommandFileNumToKeep();

int getReplicationStoreCommandFileRetainTimeoutMilli();


/**
* max commands transfered before create new rdb
* @return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,12 @@ public void setReplicationStoreGcIntervalSeconds(int replicationStoreGcIntervalS
public int getReplicationStoreCommandFileNumToKeep() {
return replicationStoreCommandFileNumToKeep;
}


@Override
public int getReplicationStoreCommandFileRetainTimeoutMilli() {
return 2 * 3600 * 1000;
}

@Override
public long getReplicationStoreMaxCommandsToTransferBeforeCreateRdb() {
return replicationStoreMaxCommandsToTransferBeforeCreateRdb;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,7 @@ public boolean accept(File path) {

private KeeperMonitor keeperMonitor;

public static final String KEY_CMD_RETAIN_TIMEOUT_MILLI = "commandsRetainTimeoutMilli";

protected int commandsRetainTimeoutMilli = Integer.parseInt(System.getProperty(KEY_CMD_RETAIN_TIMEOUT_MILLI, "1800000"));
protected int commandsRetainTimeoutMilli;

private CommandReaderWriterFactory cmdReaderWriterFactory;

Expand All @@ -87,6 +85,7 @@ protected DefaultReplicationStore(File baseDir, KeeperConfig config, String keep
) throws IOException {
this.baseDir = baseDir;
this.cmdFileSize = config.getReplicationStoreCommandFileSize();
this.commandsRetainTimeoutMilli = config.getReplicationStoreCommandFileRetainTimeoutMilli();
this.config = config;
this.keeperMonitor = keeperMonitor;
this.cmdReaderWriterFactory = cmdReaderWriterFactory;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package com.ctrip.xpipe.redis.keeper.applier;

import com.ctrip.xpipe.endpoint.DefaultEndPoint;
import com.ctrip.xpipe.netty.commands.NettyClient;
import com.ctrip.xpipe.pool.ChannelHandlerFactory;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.apache.commons.pool2.PooledObject;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

import java.net.InetSocketAddress;

/**
* @author TB
* <p>
* 2025/11/13 14:05
*/
@RunWith(MockitoJUnitRunner.Silent.class)
public class ApplierNettyPoolClientFactoryTest {
@Mock
private ChannelHandlerFactory channelHandlerFactory;
@Test
public void testBootstrapRecvBufAllocator() throws Exception {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(new NioEventLoopGroup(1), new NioEventLoopGroup(1))
.channel(NioServerSocketChannel.class)
.childHandler(new SimpleChannelInboundHandler<ByteBuf>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
// 回显接收到的数据
ctx.writeAndFlush(msg.retain());
}
});

ChannelFuture serverFuture = serverBootstrap.bind(0);
Assert.assertTrue(serverFuture.sync().isSuccess());

Channel serverChannel = serverFuture.channel();
int port = ((InetSocketAddress) serverChannel.localAddress()).getPort();


ApplierNettyPoolClientFactory applierNettyPoolClientFactory = new ApplierNettyPoolClientFactory(channelHandlerFactory);
applierNettyPoolClientFactory.doStart();
PooledObject<NettyClient> pooledObject = applierNettyPoolClientFactory.makeObject(new DefaultEndPoint("localhost",port));
RecvByteBufAllocator recvByteBufAllocator = pooledObject.getObject().channel().config().getRecvByteBufAllocator();
Assert.assertTrue(recvByteBufAllocator instanceof AdaptiveRecvByteBufAllocator);
}
}
Loading