30
30
import java .util .concurrent .Executors ;
31
31
import java .util .concurrent .TimeUnit ;
32
32
33
+ import org .junit .After ;
34
+ import org .junit .Before ;
33
35
import org .junit .Ignore ;
34
36
import org .junit .Test ;
35
37
import org .junit .runner .RunWith ;
41
43
import org .springframework .beans .factory .InitializingBean ;
42
44
import org .springframework .beans .factory .annotation .Autowired ;
43
45
import org .springframework .context .ApplicationEvent ;
46
+ import org .springframework .context .Lifecycle ;
44
47
import org .springframework .data .redis .RedisConnectionFailureException ;
45
48
import org .springframework .data .redis .RedisSystemException ;
46
49
import org .springframework .data .redis .connection .RedisConnectionFactory ;
78
81
@ DirtiesContext
79
82
public class RedisQueueMessageDrivenEndpointTests extends RedisAvailableTests {
80
83
84
+ public static final String TEST_QUEUE = "testQueue" ;
85
+
81
86
@ Autowired
82
87
private RedisConnectionFactory connectionFactory ;
83
88
84
89
@ Autowired
85
90
private PollableChannel fromChannel ;
86
91
92
+ @ Autowired
93
+ private Lifecycle fromChannelEndpoint ;
94
+
87
95
@ Autowired
88
96
private MessageChannel symmetricalInputChannel ;
89
97
98
+ @ Autowired
99
+ private Lifecycle symmetricalRedisChannelEndpoint ;
100
+
90
101
@ Autowired
91
102
private PollableChannel symmetricalOutputChannel ;
92
103
104
+ @ Before
105
+ @ After
106
+ public void setUpTearDown () {
107
+ RedisTemplate <String , ?> redisTemplate = new RedisTemplate <>();
108
+ redisTemplate .setConnectionFactory (this .connectionFactory );
109
+ redisTemplate .delete (TEST_QUEUE );
110
+ }
111
+
93
112
@ Test
94
113
@ RedisAvailable
95
114
@ SuppressWarnings ("unchecked" )
96
115
public void testInt3014Default () {
97
- String queueName = "si.test.redisQueueInboundChannelAdapterTests" ;
98
-
99
116
RedisTemplate <String , Object > redisTemplate = new RedisTemplate <>();
100
117
redisTemplate .setConnectionFactory (this .connectionFactory );
101
118
redisTemplate .setEnableDefaultSerializer (false );
@@ -105,16 +122,16 @@ public void testInt3014Default() {
105
122
106
123
String payload = "testing" ;
107
124
108
- redisTemplate .boundListOps (queueName ).leftPush (payload );
125
+ redisTemplate .boundListOps (TEST_QUEUE ).leftPush (payload );
109
126
110
127
Date payload2 = new Date ();
111
128
112
- redisTemplate .boundListOps (queueName ).leftPush (payload2 );
129
+ redisTemplate .boundListOps (TEST_QUEUE ).leftPush (payload2 );
113
130
114
131
PollableChannel channel = new QueueChannel ();
115
132
116
133
RedisQueueMessageDrivenEndpoint endpoint =
117
- new RedisQueueMessageDrivenEndpoint (queueName , this .connectionFactory );
134
+ new RedisQueueMessageDrivenEndpoint (TEST_QUEUE , this .connectionFactory );
118
135
endpoint .setBeanFactory (Mockito .mock (BeanFactory .class ));
119
136
endpoint .setBeanClassLoader (ClassUtils .getDefaultClassLoader ());
120
137
endpoint .setOutputChannel (channel );
@@ -137,8 +154,6 @@ public void testInt3014Default() {
137
154
@ RedisAvailable
138
155
@ SuppressWarnings ("unchecked" )
139
156
public void testInt3014ExpectMessageTrue () {
140
- String queueName = "si.test.redisQueueInboundChannelAdapterTests2" ;
141
-
142
157
RedisTemplate <String , Object > redisTemplate = new RedisTemplate <>();
143
158
redisTemplate .setConnectionFactory (this .connectionFactory );
144
159
redisTemplate .setEnableDefaultSerializer (false );
@@ -148,16 +163,16 @@ public void testInt3014ExpectMessageTrue() {
148
163
149
164
Message <?> message = MessageBuilder .withPayload ("testing" ).build ();
150
165
151
- redisTemplate .boundListOps (queueName ).leftPush (message );
166
+ redisTemplate .boundListOps (TEST_QUEUE ).leftPush (message );
152
167
153
- redisTemplate .boundListOps (queueName ).leftPush ("test" );
168
+ redisTemplate .boundListOps (TEST_QUEUE ).leftPush ("test" );
154
169
155
170
PollableChannel channel = new QueueChannel ();
156
171
157
172
PollableChannel errorChannel = new QueueChannel ();
158
173
159
174
RedisQueueMessageDrivenEndpoint endpoint =
160
- new RedisQueueMessageDrivenEndpoint (queueName , this .connectionFactory );
175
+ new RedisQueueMessageDrivenEndpoint (TEST_QUEUE , this .connectionFactory );
161
176
endpoint .setBeanFactory (Mockito .mock (BeanFactory .class ));
162
177
endpoint .setExpectMessage (true );
163
178
endpoint .setOutputChannel (channel );
@@ -186,53 +201,55 @@ public void testInt3014ExpectMessageTrue() {
186
201
@ Test
187
202
@ RedisAvailable
188
203
public void testInt3017IntegrationInbound () {
204
+ this .fromChannelEndpoint .start ();
189
205
String payload = new Date ().toString ();
190
206
191
207
RedisTemplate <String , String > redisTemplate = new StringRedisTemplate ();
192
208
redisTemplate .setConnectionFactory (this .connectionFactory );
193
209
redisTemplate .afterPropertiesSet ();
194
210
195
- redisTemplate .boundListOps ("si.test.Int3017IntegrationInbound" )
211
+ redisTemplate .boundListOps (TEST_QUEUE )
196
212
.leftPush ("{\" payload\" :\" " + payload + "\" ,\" headers\" :{}}" );
197
213
198
214
Message <?> receive = this .fromChannel .receive (10000 );
199
215
assertThat (receive ).isNotNull ();
200
216
assertThat (receive .getPayload ()).isEqualTo (payload );
217
+ this .fromChannelEndpoint .stop ();
201
218
}
202
219
203
220
@ Test
204
221
@ RedisAvailable
205
222
public void testInt3017IntegrationSymmetrical () {
223
+ this .symmetricalRedisChannelEndpoint .start ();
206
224
UUID payload = UUID .randomUUID ();
207
225
Message <UUID > message = MessageBuilder .withPayload (payload )
208
- .setHeader ("redis_queue" , "si.test.Int3017IntegrationSymmetrical" )
226
+ .setHeader ("redis_queue" , TEST_QUEUE )
209
227
.build ();
210
228
211
229
this .symmetricalInputChannel .send (message );
212
230
213
231
Message <?> receive = this .symmetricalOutputChannel .receive (10000 );
214
232
assertThat (receive ).isNotNull ();
215
233
assertThat (receive .getPayload ()).isEqualTo (payload );
234
+ this .symmetricalRedisChannelEndpoint .stop ();
216
235
}
217
236
218
237
@ Test
219
238
@ RedisAvailable
220
239
@ SuppressWarnings ("unchecked" )
221
240
public void testInt3442ProperlyStop () throws Exception {
222
- String queueName = "si.test.testInt3442ProperlyStopTest" ;
223
-
224
241
RedisTemplate <String , Object > redisTemplate = new RedisTemplate <>();
225
242
redisTemplate .setConnectionFactory (this .connectionFactory );
226
243
redisTemplate .setEnableDefaultSerializer (false );
227
244
redisTemplate .setKeySerializer (new StringRedisSerializer ());
228
245
redisTemplate .setValueSerializer (new JdkSerializationRedisSerializer ());
229
246
redisTemplate .afterPropertiesSet ();
230
247
231
- while (redisTemplate .boundListOps (queueName ).rightPop () != null ) {
248
+ while (redisTemplate .boundListOps (TEST_QUEUE ).rightPop () != null ) {
232
249
// drain
233
250
}
234
251
235
- RedisQueueMessageDrivenEndpoint endpoint = new RedisQueueMessageDrivenEndpoint (queueName ,
252
+ RedisQueueMessageDrivenEndpoint endpoint = new RedisQueueMessageDrivenEndpoint (TEST_QUEUE ,
236
253
this .connectionFactory );
237
254
BoundListOperations <String , byte []> boundListOperations =
238
255
TestUtils .getPropertyValue (endpoint , "boundListOperations" , BoundListOperations .class );
@@ -252,7 +269,7 @@ public void testInt3442ProperlyStop() throws Exception {
252
269
waitListening (endpoint );
253
270
dfa .setPropertyValue ("listening" , false );
254
271
255
- redisTemplate .boundListOps (queueName ).leftPush ("foo" );
272
+ redisTemplate .boundListOps (TEST_QUEUE ).leftPush ("foo" );
256
273
257
274
CountDownLatch stopLatch = new CountDownLatch (1 );
258
275
@@ -271,14 +288,13 @@ public void testInt3442ProperlyStop() throws Exception {
271
288
@ RedisAvailable
272
289
@ Ignore ("LettuceConnectionFactory doesn't support proper reinitialization after 'destroy()'" )
273
290
public void testInt3196Recovery () throws Exception {
274
- String queueName = "test.si.Int3196Recovery" ;
275
291
QueueChannel channel = new QueueChannel ();
276
292
277
293
final List <ApplicationEvent > exceptionEvents = new ArrayList <>();
278
294
279
295
final CountDownLatch exceptionsLatch = new CountDownLatch (2 );
280
296
281
- RedisQueueMessageDrivenEndpoint endpoint = new RedisQueueMessageDrivenEndpoint (queueName ,
297
+ RedisQueueMessageDrivenEndpoint endpoint = new RedisQueueMessageDrivenEndpoint (TEST_QUEUE ,
282
298
this .connectionFactory );
283
299
endpoint .setBeanFactory (Mockito .mock (BeanFactory .class ));
284
300
endpoint .setApplicationEventPublisher (event -> {
@@ -315,7 +331,7 @@ public void testInt3196Recovery() throws Exception {
315
331
316
332
String payload = "testing" ;
317
333
318
- redisTemplate .boundListOps (queueName ).leftPush (payload );
334
+ redisTemplate .boundListOps (TEST_QUEUE ).leftPush (payload );
319
335
320
336
Message <?> receive = channel .receive (10000 );
321
337
assertThat (receive ).isNotNull ();
@@ -328,8 +344,6 @@ public void testInt3196Recovery() throws Exception {
328
344
@ RedisAvailable
329
345
@ SuppressWarnings ("unchecked" )
330
346
public void testInt3932ReadFromLeft () {
331
- String queueName = "si.test.redisQueueInboundChannelAdapterTests3932" ;
332
-
333
347
RedisTemplate <String , Object > redisTemplate = new RedisTemplate <>();
334
348
redisTemplate .setConnectionFactory (this .connectionFactory );
335
349
redisTemplate .setEnableDefaultSerializer (false );
@@ -339,16 +353,16 @@ public void testInt3932ReadFromLeft() {
339
353
340
354
String payload = "testing" ;
341
355
342
- redisTemplate .boundListOps (queueName ).rightPush (payload );
356
+ redisTemplate .boundListOps (TEST_QUEUE ).rightPush (payload );
343
357
344
358
Date payload2 = new Date ();
345
359
346
- redisTemplate .boundListOps (queueName ).rightPush (payload2 );
360
+ redisTemplate .boundListOps (TEST_QUEUE ).rightPush (payload2 );
347
361
348
362
PollableChannel channel = new QueueChannel ();
349
363
350
364
RedisQueueMessageDrivenEndpoint endpoint =
351
- new RedisQueueMessageDrivenEndpoint (queueName , this .connectionFactory );
365
+ new RedisQueueMessageDrivenEndpoint (TEST_QUEUE , this .connectionFactory );
352
366
endpoint .setBeanFactory (Mockito .mock (BeanFactory .class ));
353
367
endpoint .setBeanClassLoader (ClassUtils .getDefaultClassLoader ());
354
368
endpoint .setOutputChannel (channel );
0 commit comments