diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/applier/ApplierNettyPoolClientFactory.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/applier/ApplierNettyPoolClientFactory.java new file mode 100644 index 0000000000..6eb51be840 --- /dev/null +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/applier/ApplierNettyPoolClientFactory.java @@ -0,0 +1,35 @@ +package com.ctrip.xpipe.redis.keeper.applier; + +import com.ctrip.xpipe.netty.commands.NettyKeyedPoolClientFactory; +import com.ctrip.xpipe.pool.ChannelHandlerFactory; +import io.netty.channel.AdaptiveRecvByteBufAllocator; +import io.netty.channel.ChannelOption; + +/** + * @author TB + *
+ * 2025/11/13 13:58 + */ +public class ApplierNettyPoolClientFactory extends NettyKeyedPoolClientFactory { + + private static final int MIN_SIZE = 512; + + private static final int INITIAL_SIZE = 2048; + + private static final int MAX_SIZE = 128 * 1024; + + public ApplierNettyPoolClientFactory(ChannelHandlerFactory channelHandlerFactory) { + super(channelHandlerFactory); + } + + @Override + protected void doStart() throws Exception { + super.doStart(); + } + + @Override + protected void initBootstrap() { + super.initBootstrap(); + b.option(ChannelOption.RCVBUF_ALLOCATOR,new AdaptiveRecvByteBufAllocator(MIN_SIZE,INITIAL_SIZE,MAX_SIZE)); + } +} diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/applier/DefaultApplierServer.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/applier/DefaultApplierServer.java index 6d425fe74b..6f9ec1876f 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/applier/DefaultApplierServer.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/applier/DefaultApplierServer.java @@ -209,7 +209,7 @@ public DefaultApplierServer(String clusterName, ClusterId clusterId, ShardId sha ClusterShardAwareThreadFactory.create(clusterId, shardId, "sch-" + makeApplierThreadName())); pool = new InstanceComponentWrapper<>(new XpipeNettyClientKeyedObjectPool(DEFAULT_KEYED_CLIENT_POOL_SIZE, - new NettyKeyedPoolClientFactory(new ApplierChannelHandlerFactory(keeperConfig.getApplierReadIdleSeconds())))); + new ApplierNettyPoolClientFactory(new ApplierChannelHandlerFactory(keeperConfig.getApplierReadIdleSeconds())))); applierConfigRef = new AtomicReference<>(new ApplierConfig()); applierStatisticRef = new AtomicReference<>(new ApplierStatistic()); diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/DefaultKeeperConfig.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/DefaultKeeperConfig.java index 1f7577d924..e9fd770ec7 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/DefaultKeeperConfig.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/DefaultKeeperConfig.java @@ -26,6 +26,7 @@ public class DefaultKeeperConfig extends AbstractCoreConfig implements KeeperCon public static final String KEY_REPLICATION_STORE_MINITIME_GC_AFTERCREATE = "replicationstore.mintime.gc.aftercreate"; public static final String KEY_REPLICATION_STORE_MAX_COMMANDS_TO_TRANSFER_BEFORE_CREATE_RDB = "replicationstore.max.commands.to.transfer"; public static final String KEY_REPLICATION_STORE_MAX_LWM_DISTANCE_TO_TRANSFER_BEFORE_CREATE_RDB = "replicationstore.max.lwm.distance.to.transfer"; + public static final String KEY_REPLICATION_STORE_COMMANDFILE_RETAIN_TIMEOUT_MILLI = "replicationstore.commandfile.retain.timeout.milli"; public static final String KEY_COMMAND_READER_FLYING_THRESHOLD = "command.reader.flying.threshold"; private static final String KEY_COMMAND_INDEX_BYTES_INTERVAL = "command.index.bytes.interval"; public static final String KEY_RDB_DUMP_MIN_INTERVAL = "rdbdump.min.interval"; @@ -113,6 +114,11 @@ public int getReplicationStoreCommandFileNumToKeep() { return getIntProperty(KEY_REPLICATION_STORE_COMMANDFILE_NUM_KEEP, 2); } + @Override + public int getReplicationStoreCommandFileRetainTimeoutMilli() { + return getIntProperty(KEY_REPLICATION_STORE_COMMANDFILE_RETAIN_TIMEOUT_MILLI,1800 * 1000); + } + @Override public long getReplicationStoreMaxCommandsToTransferBeforeCreateRdb() { return getLongProperty(KEY_REPLICATION_STORE_MAX_COMMANDS_TO_TRANSFER_BEFORE_CREATE_RDB, 100L); diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/KeeperConfig.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/KeeperConfig.java index 62cebcb0e0..fa00c984e2 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/KeeperConfig.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/KeeperConfig.java @@ -32,6 +32,9 @@ public interface KeeperConfig extends CoreConfig{ int getReplicationStoreCommandFileNumToKeep(); + int getReplicationStoreCommandFileRetainTimeoutMilli(); + + /** * max commands transfered before create new rdb * @return diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/TestKeeperConfig.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/TestKeeperConfig.java index 59003a993a..b4fb15d58a 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/TestKeeperConfig.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/TestKeeperConfig.java @@ -89,7 +89,12 @@ public void setReplicationStoreGcIntervalSeconds(int replicationStoreGcIntervalS public int getReplicationStoreCommandFileNumToKeep() { return replicationStoreCommandFileNumToKeep; } - + + @Override + public int getReplicationStoreCommandFileRetainTimeoutMilli() { + return 2 * 3600 * 1000; + } + @Override public long getReplicationStoreMaxCommandsToTransferBeforeCreateRdb() { return replicationStoreMaxCommandsToTransferBeforeCreateRdb; diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/store/DefaultReplicationStore.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/store/DefaultReplicationStore.java index 39f1abc886..1888afe358 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/store/DefaultReplicationStore.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/store/DefaultReplicationStore.java @@ -66,9 +66,7 @@ public boolean accept(File path) { private KeeperMonitor keeperMonitor; - public static final String KEY_CMD_RETAIN_TIMEOUT_MILLI = "commandsRetainTimeoutMilli"; - - protected int commandsRetainTimeoutMilli = Integer.parseInt(System.getProperty(KEY_CMD_RETAIN_TIMEOUT_MILLI, "1800000")); + protected int commandsRetainTimeoutMilli; private CommandReaderWriterFactory cmdReaderWriterFactory; @@ -87,6 +85,7 @@ protected DefaultReplicationStore(File baseDir, KeeperConfig config, String keep ) throws IOException { this.baseDir = baseDir; this.cmdFileSize = config.getReplicationStoreCommandFileSize(); + this.commandsRetainTimeoutMilli = config.getReplicationStoreCommandFileRetainTimeoutMilli(); this.config = config; this.keeperMonitor = keeperMonitor; this.cmdReaderWriterFactory = cmdReaderWriterFactory; diff --git a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/applier/ApplierNettyPoolClientFactoryTest.java b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/applier/ApplierNettyPoolClientFactoryTest.java new file mode 100644 index 0000000000..a60f74daf9 --- /dev/null +++ b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/applier/ApplierNettyPoolClientFactoryTest.java @@ -0,0 +1,55 @@ +package com.ctrip.xpipe.redis.keeper.applier; + +import com.ctrip.xpipe.endpoint.DefaultEndPoint; +import com.ctrip.xpipe.netty.commands.NettyClient; +import com.ctrip.xpipe.pool.ChannelHandlerFactory; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import org.apache.commons.pool2.PooledObject; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import java.net.InetSocketAddress; + +/** + * @author TB + *
+ * 2025/11/13 14:05
+ */
+@RunWith(MockitoJUnitRunner.Silent.class)
+public class ApplierNettyPoolClientFactoryTest {
+ @Mock
+ private ChannelHandlerFactory channelHandlerFactory;
+ @Test
+ public void testBootstrapRecvBufAllocator() throws Exception {
+ ServerBootstrap serverBootstrap = new ServerBootstrap();
+ serverBootstrap.group(new NioEventLoopGroup(1), new NioEventLoopGroup(1))
+ .channel(NioServerSocketChannel.class)
+ .childHandler(new SimpleChannelInboundHandler