Skip to content

Commit 05798fb

Browse files
mp911dechristophstrobl
authored andcommitted
DATAREDIS-976 - Allow extension of Lettuce Connection and Subscription classes.
LettuceConnection, LettuceClusterConnection, and LettuceSubscription can now be properly subclassed so they can be extended and created by LettuceConnectionFactory. LettuceConnectionFactory provides template methods doCreateLettuceConnection and doCreateLettuceClusterConnection. Original pull request: #450 & #457
1 parent 6d4be98 commit 05798fb

File tree

7 files changed

+305
-49
lines changed

7 files changed

+305
-49
lines changed

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnection.java

+14-21
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434

3535
import org.apache.commons.logging.Log;
3636
import org.apache.commons.logging.LogFactory;
37+
3738
import org.springframework.beans.factory.DisposableBean;
3839
import org.springframework.dao.DataAccessException;
3940
import org.springframework.dao.DataAccessResourceFailureException;
@@ -66,7 +67,6 @@ public class LettuceClusterConnection extends LettuceConnection implements Defau
6667
new LettuceExceptionConverter());
6768

6869
private final Log log = LogFactory.getLog(getClass());
69-
private final RedisClusterClient clusterClient;
7070

7171
private ClusterCommandExecutor clusterCommandExecutor;
7272
private ClusterTopologyProvider topologyProvider;
@@ -121,8 +121,7 @@ public LettuceClusterConnection(LettuceConnectionProvider connectionProvider) {
121121
Assert.isTrue(connectionProvider instanceof ClusterConnectionProvider,
122122
"LettuceConnectionProvider must be a ClusterConnectionProvider.");
123123

124-
this.clusterClient = getClient();
125-
this.topologyProvider = new LettuceClusterTopologyProvider(this.clusterClient);
124+
this.topologyProvider = new LettuceClusterTopologyProvider(getClient());
126125
this.clusterCommandExecutor = new ClusterCommandExecutor(this.topologyProvider,
127126
new LettuceClusterNodeResourceProvider(getConnectionProvider()), exceptionConverter);
128127
this.disposeClusterCommandExecutorOnClose = true;
@@ -158,8 +157,7 @@ public LettuceClusterConnection(LettuceConnectionProvider connectionProvider, Cl
158157
Assert.isTrue(connectionProvider instanceof ClusterConnectionProvider,
159158
"LettuceConnectionProvider must be a ClusterConnectionProvider.");
160159

161-
this.clusterClient = getClient();
162-
this.topologyProvider = new LettuceClusterTopologyProvider(this.clusterClient);
160+
this.topologyProvider = new LettuceClusterTopologyProvider(getClient());
163161
this.clusterCommandExecutor = executor;
164162
this.disposeClusterCommandExecutorOnClose = false;
165163
}
@@ -170,22 +168,20 @@ public LettuceClusterConnection(LettuceConnectionProvider connectionProvider, Cl
170168
*
171169
* @param sharedConnection may be {@literal null} if no shared connection used.
172170
* @param connectionProvider must not be {@literal null}.
173-
* @param clusterClient must not be {@literal null}.
171+
* @param clusterTopologyProvider must not be {@literal null}.
174172
* @param executor must not be {@literal null}.
175173
* @param timeout must not be {@literal null}.
176174
* @since 2.1
177175
*/
178-
LettuceClusterConnection(@Nullable StatefulRedisClusterConnection<byte[], byte[]> sharedConnection,
179-
LettuceConnectionProvider connectionProvider, RedisClusterClient clusterClient, ClusterCommandExecutor executor,
180-
Duration timeout) {
176+
protected LettuceClusterConnection(@Nullable StatefulRedisClusterConnection<byte[], byte[]> sharedConnection,
177+
LettuceConnectionProvider connectionProvider, ClusterTopologyProvider clusterTopologyProvider,
178+
ClusterCommandExecutor executor, Duration timeout) {
181179

182180
super(sharedConnection, connectionProvider, timeout.toMillis(), 0);
183181

184182
Assert.notNull(executor, "ClusterCommandExecutor must not be null.");
185-
Assert.notNull(clusterClient, "RedisClusterClient must not be null.");
186183

187-
this.clusterClient = clusterClient;
188-
this.topologyProvider = new LettuceClusterTopologyProvider(clusterClient);
184+
this.topologyProvider = clusterTopologyProvider;
189185
this.clusterCommandExecutor = executor;
190186
this.disposeClusterCommandExecutorOnClose = false;
191187
}
@@ -205,13 +201,6 @@ private RedisClusterClient getClient() {
205201
connectionProvider.getClass().getName()));
206202
}
207203

208-
/**
209-
* @return access to {@link RedisClusterClient} for non-connection access.
210-
*/
211-
private Partitions getPartitions() {
212-
return clusterClient.getPartitions();
213-
}
214-
215204
/*
216205
* (non-Javadoc)
217206
* @see org.springframework.data.redis.connection.lettuce.LettuceConnection#geoCommands()
@@ -330,7 +319,11 @@ public Integer clusterGetSlotForKey(byte[] key) {
330319
@Override
331320
public RedisClusterNode clusterGetNodeForSlot(int slot) {
332321

333-
return LettuceConverters.toRedisClusterNode(getPartitions().getPartitionBySlot(slot));
322+
Set<RedisClusterNode> nodes = topologyProvider.getTopology().getSlotServingNodes(slot);
323+
if (nodes.isEmpty()) {
324+
return null;
325+
}
326+
return nodes.iterator().next();
334327
}
335328

336329
/*
@@ -574,7 +567,7 @@ public void select(int dbIndex) {
574567
*/
575568
@Override
576569
public List<RedisClusterNode> clusterGetNodes() {
577-
return LettuceConverters.partitionsToClusterNodes(getPartitions());
570+
return new ArrayList<>(topologyProvider.getTopology().getNodes());
578571
}
579572

580573
/*

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnection.java

+16-2
Original file line numberDiff line numberDiff line change
@@ -879,7 +879,21 @@ protected StatefulRedisPubSubConnection<byte[], byte[]> switchToPubSub() {
879879
}
880880

881881
private LettuceSubscription initSubscription(MessageListener listener) {
882-
return new LettuceSubscription(listener, switchToPubSub(), connectionProvider);
882+
return doCreateSubscription(listener, switchToPubSub(), connectionProvider);
883+
}
884+
885+
/**
886+
* Customization hook to create a {@link LettuceSubscription}.
887+
*
888+
* @param listener the {@link MessageListener} to notify.
889+
* @param connection Pub/Sub connection.
890+
* @param connectionProvider the {@link LettuceConnectionProvider} for connection release.
891+
* @return a {@link LettuceSubscription}.
892+
* @since 2.2
893+
*/
894+
protected LettuceSubscription doCreateSubscription(MessageListener listener,
895+
StatefulRedisPubSubConnection<byte[], byte[]> connection, LettuceConnectionProvider connectionProvider) {
896+
return new LettuceSubscription(listener, connection, connectionProvider);
883897
}
884898

885899
void pipeline(LettuceResult result) {
@@ -1250,7 +1264,7 @@ public CommandOutput getTypeHint(CommandType type, CommandOutput defaultType) {
12501264
}
12511265

12521266
@RequiredArgsConstructor
1253-
private class LettucePoolConnectionProvider implements LettuceConnectionProvider {
1267+
static class LettucePoolConnectionProvider implements LettuceConnectionProvider {
12541268

12551269
private final LettucePool pool;
12561270

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactory.java

+54-14
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
*/
1616
package org.springframework.data.redis.connection.lettuce;
1717

18+
import static org.springframework.data.redis.connection.lettuce.LettuceConnection.*;
19+
1820
import io.lettuce.core.AbstractRedisClient;
1921
import io.lettuce.core.ClientOptions;
2022
import io.lettuce.core.ReadFrom;
@@ -40,6 +42,7 @@
4042

4143
import org.apache.commons.logging.Log;
4244
import org.apache.commons.logging.LogFactory;
45+
4346
import org.springframework.beans.factory.DisposableBean;
4447
import org.springframework.beans.factory.InitializingBean;
4548
import org.springframework.dao.DataAccessException;
@@ -273,7 +276,7 @@ public void afterPropertiesSet() {
273276

274277
this.client = createClient();
275278

276-
this.connectionProvider = createConnectionProvider(client, LettuceConnection.CODEC);
279+
this.connectionProvider = createConnectionProvider(client, CODEC);
277280
this.reactiveConnectionProvider = createConnectionProvider(client, LettuceReactiveRedisConnection.CODEC);
278281

279282
if (isClusterAware()) {
@@ -341,13 +344,7 @@ public RedisConnection getConnection() {
341344
}
342345

343346
LettuceConnection connection;
344-
345-
if (pool != null) {
346-
connection = new LettuceConnection(getSharedConnection(), getTimeout(), null, pool, getDatabase());
347-
} else {
348-
connection = new LettuceConnection(getSharedConnection(), connectionProvider, getTimeout(), getDatabase());
349-
}
350-
347+
connection = doCreateLettuceConnection(getSharedConnection(), connectionProvider, getTimeout(), getDatabase());
351348
connection.setConvertPipelineAndTxResults(convertPipelineAndTxResults);
352349
return connection;
353350
}
@@ -365,12 +362,51 @@ public RedisClusterConnection getClusterConnection() {
365362

366363
RedisClusterClient clusterClient = (RedisClusterClient) client;
367364

368-
return getShareNativeConnection()
369-
? new LettuceClusterConnection(
370-
(StatefulRedisClusterConnection<byte[], byte[]>) getOrCreateSharedConnection().getConnection(),
371-
connectionProvider, clusterClient, clusterCommandExecutor, clientConfiguration.getCommandTimeout())
372-
: new LettuceClusterConnection(null, connectionProvider, clusterClient, clusterCommandExecutor,
373-
clientConfiguration.getCommandTimeout());
365+
StatefulRedisClusterConnection<byte[], byte[]> sharedConnection = getShareNativeConnection()
366+
? (StatefulRedisClusterConnection<byte[], byte[]>) getOrCreateSharedConnection().getConnection()
367+
: null;
368+
369+
LettuceClusterTopologyProvider topologyProvider = new LettuceClusterTopologyProvider(clusterClient);
370+
return doCreateLettuceClusterConnection(sharedConnection, connectionProvider, topologyProvider,
371+
clusterCommandExecutor, clientConfiguration.getCommandTimeout());
372+
}
373+
374+
/**
375+
* Customization hook for {@link LettuceConnection} creation.
376+
*
377+
* @param sharedConnection the shared {@link StatefulRedisConnection} if {@link #getShareNativeConnection()} is
378+
* {@literal true}; {@literal null} otherwise.
379+
* @param connectionProvider the {@link LettuceConnectionProvider} to release connections.
380+
* @param timeout command timeout in {@link TimeUnit#MILLISECONDS}.
381+
* @param database database index to operate on.
382+
* @return the {@link LettuceConnection}.
383+
* @since 2.2
384+
*/
385+
protected LettuceConnection doCreateLettuceConnection(StatefulRedisConnection<byte[], byte[]> sharedConnection,
386+
LettuceConnectionProvider connectionProvider, long timeout, int database) {
387+
388+
return new LettuceConnection(sharedConnection, connectionProvider, timeout, database);
389+
}
390+
391+
/**
392+
* Customization hook for {@link LettuceClusterConnection} creation.
393+
*
394+
* @param sharedConnection the shared {@link StatefulRedisConnection} if {@link #getShareNativeConnection()} is
395+
* {@literal true}; {@literal null} otherwise.
396+
* @param connectionProvider the {@link LettuceConnectionProvider} to release connections.
397+
* @param topologyProvider the {@link ClusterTopologyProvider}.
398+
* @param clusterCommandExecutor the {@link ClusterCommandExecutor} to release connections.
399+
* @param commandTimeout command timeout {@link Duration}.
400+
* @return the {@link LettuceConnection}.
401+
* @since 2.2
402+
*/
403+
protected LettuceClusterConnection doCreateLettuceClusterConnection(
404+
StatefulRedisClusterConnection<byte[], byte[]> sharedConnection, LettuceConnectionProvider connectionProvider,
405+
ClusterTopologyProvider topologyProvider, ClusterCommandExecutor clusterCommandExecutor,
406+
Duration commandTimeout) {
407+
408+
return new LettuceClusterConnection(sharedConnection, connectionProvider, topologyProvider, clusterCommandExecutor,
409+
commandTimeout);
374410
}
375411

376412
/*
@@ -909,6 +945,10 @@ protected StatefulConnection<ByteBuffer, ByteBuffer> getSharedReactiveConnection
909945

910946
private LettuceConnectionProvider createConnectionProvider(AbstractRedisClient client, RedisCodec<?, ?> codec) {
911947

948+
if (this.pool != null) {
949+
return new LettucePoolConnectionProvider(this.pool);
950+
}
951+
912952
LettuceConnectionProvider connectionProvider = doCreateConnectionProvider(client, codec);
913953

914954
if (this.clientConfiguration instanceof LettucePoolingClientConfiguration) {

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceSubscription.java

+18-10
Original file line numberDiff line numberDiff line change
@@ -28,15 +28,23 @@
2828
* @author Mark Paluch
2929
* @author Christoph Strobl
3030
*/
31-
class LettuceSubscription extends AbstractSubscription {
31+
public class LettuceSubscription extends AbstractSubscription {
3232

3333
private final StatefulRedisPubSubConnection<byte[], byte[]> connection;
3434
private final LettuceMessageListener listener;
3535
private final LettuceConnectionProvider connectionProvider;
3636
private final RedisPubSubCommands<byte[], byte[]> pubsub;
3737

38-
LettuceSubscription(MessageListener listener, StatefulRedisPubSubConnection<byte[], byte[]> pubsubConnection,
39-
LettuceConnectionProvider connectionProvider) {
38+
/**
39+
* Creates a new {@link LettuceSubscription} given {@link MessageListener}, {@link StatefulRedisPubSubConnection}, and
40+
* {@link LettuceConnectionProvider}.
41+
*
42+
* @param listener the listener to notify, must not be {@literal null}.
43+
* @param pubsubConnection must not be {@literal null}.
44+
* @param connectionProvider must not be {@literal null}.
45+
*/
46+
protected LettuceSubscription(MessageListener listener,
47+
StatefulRedisPubSubConnection<byte[], byte[]> pubsubConnection, LettuceConnectionProvider connectionProvider) {
4048

4149
super(listener);
4250

@@ -52,33 +60,33 @@ protected StatefulRedisPubSubConnection<byte[], byte[]> getNativeConnection() {
5260
return connection;
5361
}
5462

55-
/*
63+
/*
5664
* (non-Javadoc)
5765
* @see org.springframework.data.redis.connection.util.AbstractSubscription#doClose()
5866
*/
5967
protected void doClose() {
6068

6169
if (!getChannels().isEmpty()) {
62-
pubsub.unsubscribe(new byte[0]);
70+
doUnsubscribe(true, new byte[0]);
6371
}
6472

6573
if (!getPatterns().isEmpty()) {
66-
pubsub.punsubscribe(new byte[0]);
74+
doPUnsubscribe(true, new byte[0]);
6775
}
6876

6977
connection.removeListener(this.listener);
7078
connectionProvider.release(connection);
7179
}
7280

73-
/*
81+
/*
7482
* (non-Javadoc)
7583
* @see org.springframework.data.redis.connection.util.AbstractSubscription#doPsubscribe(byte[][])
7684
*/
7785
protected void doPsubscribe(byte[]... patterns) {
7886
pubsub.psubscribe(patterns);
7987
}
8088

81-
/*
89+
/*
8290
* (non-Javadoc)
8391
* @see org.springframework.data.redis.connection.util.AbstractSubscription#doPUnsubscribe(boolean, byte[][])
8492
*/
@@ -88,15 +96,15 @@ protected void doPUnsubscribe(boolean all, byte[]... patterns) {
8896
pubsub.punsubscribe(patterns);
8997
}
9098

91-
/*
99+
/*
92100
* (non-Javadoc)
93101
* @see org.springframework.data.redis.connection.util.AbstractSubscription#doSubscribe(byte[][])
94102
*/
95103
protected void doSubscribe(byte[]... channels) {
96104
pubsub.subscribe(channels);
97105
}
98106

99-
/*
107+
/*
100108
* (non-Javadoc)
101109
* @see org.springframework.data.redis.connection.util.AbstractSubscription#doUnsubscribe(boolean, byte[][])
102110
*/

src/test/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnectionUnitTests.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.mockito.junit.MockitoJUnitRunner;
4949
import org.springframework.data.redis.connection.ClusterCommandExecutor;
5050
import org.springframework.data.redis.connection.ClusterNodeResourceProvider;
51+
import org.springframework.data.redis.connection.ClusterTopologyProvider;
5152
import org.springframework.data.redis.connection.RedisClusterCommands.AddSlots;
5253
import org.springframework.data.redis.connection.RedisClusterNode;
5354

@@ -67,6 +68,7 @@ public class LettuceClusterConnectionUnitTests {
6768
static final byte[] KEY_3_BYTES = KEY_3.getBytes();
6869

6970
@Mock RedisClusterClient clusterMock;
71+
@Mock ClusterTopologyProvider topologyProviderMock;
7072

7173
@Mock LettuceConnectionProvider connectionProviderMock;
7274
@Mock ClusterCommandExecutor executorMock;
@@ -363,7 +365,7 @@ public void shouldExecuteOnSharedConnection() {
363365
when(sharedConnectionMock.sync()).thenReturn(sync);
364366

365367
LettuceClusterConnection connection = new LettuceClusterConnection(sharedConnectionMock, connectionProviderMock,
366-
clusterMock, executorMock, Duration.ZERO);
368+
topologyProviderMock, executorMock, Duration.ZERO);
367369

368370
connection.keyCommands().del(KEY_1_BYTES);
369371

@@ -381,7 +383,7 @@ public void shouldExecuteOnDedicatedConnection() {
381383
when(dedicatedConnection.sync()).thenReturn(sync);
382384

383385
LettuceClusterConnection connection = new LettuceClusterConnection(sharedConnectionMock, connectionProviderMock,
384-
clusterMock, executorMock, Duration.ZERO);
386+
topologyProviderMock, executorMock, Duration.ZERO);
385387

386388
connection.listCommands().bLPop(1, KEY_1_BYTES);
387389

0 commit comments

Comments
 (0)