33
33
import org .springframework .util .Assert ;
34
34
35
35
/**
36
- * Connection provider for Cluster connections.
36
+ * {@link LettuceConnectionProvider} and {@link RedisClientProvider} for Redis Cluster connections.
37
37
*
38
38
* @author Mark Paluch
39
39
* @author Christoph Strobl
40
40
* @author Bruce Cloud
41
+ * @author John Blum
41
42
* @since 2.0
42
43
*/
43
44
class ClusterConnectionProvider implements LettuceConnectionProvider , RedisClientProvider {
44
45
45
- private final RedisClusterClient client ;
46
- private final RedisCodec <?, ?> codec ;
47
- private final Optional <ReadFrom > readFrom ;
46
+ private volatile boolean initialized ;
48
47
49
48
private final Lock lock = new ReentrantLock ();
50
49
51
- private volatile boolean initialized ;
50
+ @ Nullable
51
+ private final ReadFrom readFrom ;
52
+
53
+ private final RedisClusterClient client ;
54
+
55
+ private final RedisCodec <?, ?> codec ;
52
56
53
57
/**
54
58
* Create new {@link ClusterConnectionProvider}.
@@ -75,18 +79,22 @@ class ClusterConnectionProvider implements LettuceConnectionProvider, RedisClien
75
79
76
80
this .client = client ;
77
81
this .codec = codec ;
78
- this .readFrom = Optional .ofNullable (readFrom );
82
+ this .readFrom = readFrom ;
83
+ }
84
+
85
+ private Optional <ReadFrom > getReadFrom () {
86
+ return Optional .ofNullable (this .readFrom );
79
87
}
80
88
81
89
@ Override
82
90
public <T extends StatefulConnection <?, ?>> CompletableFuture <T > getConnectionAsync (Class <T > connectionType ) {
83
91
84
92
if (!initialized ) {
85
93
86
- // partitions have to be initialized before asynchronous usage.
87
- // Needs to happen only once. Initialize eagerly if
88
- // blocking is not an options.
94
+ // Partitions have to be initialized before asynchronous usage.
95
+ // Needs to happen only once. Initialize eagerly if blocking is not an options.
89
96
lock .lock ();
97
+
90
98
try {
91
99
if (!initialized ) {
92
100
client .getPartitions ();
@@ -100,27 +108,25 @@ class ClusterConnectionProvider implements LettuceConnectionProvider, RedisClien
100
108
if (connectionType .equals (StatefulRedisPubSubConnection .class )
101
109
|| connectionType .equals (StatefulRedisClusterPubSubConnection .class )) {
102
110
103
- return client .connectPubSubAsync (codec ) //
104
- .thenApply (connectionType ::cast );
111
+ return client .connectPubSubAsync (codec ).thenApply (connectionType ::cast );
105
112
}
106
113
107
114
if (StatefulRedisClusterConnection .class .isAssignableFrom (connectionType )
108
115
|| connectionType .equals (StatefulConnection .class )) {
109
116
110
- return client .connectAsync (codec ) //
111
- .thenApply (connection -> {
112
-
113
- readFrom .ifPresent (connection ::setReadFrom );
117
+ return client .connectAsync (codec ).thenApply (connection -> {
118
+ getReadFrom ().ifPresent (connection ::setReadFrom );
114
119
return connectionType .cast (connection );
115
120
});
116
121
}
117
122
118
- return LettuceFutureUtils
119
- .failed (new InvalidDataAccessApiUsageException ("Connection type " + connectionType + " not supported" ));
123
+ String message = String .format ("Connection type %s not supported" , connectionType );
124
+
125
+ return LettuceFutureUtils .failed (new InvalidDataAccessApiUsageException (message ));
120
126
}
121
127
122
128
@ Override
123
129
public RedisClusterClient getRedisClient () {
124
- return client ;
130
+ return this . client ;
125
131
}
126
132
}
0 commit comments