1
1
/*
2
- * Copyright 2013-2017 the original author or authors.
2
+ * Copyright 2013-2019 the original author or authors.
3
3
*
4
4
* Licensed under the Apache License, Version 2.0 (the "License");
5
5
* you may not use this file except in compliance with the License.
47
47
import org .springframework .beans .factory .InitializingBean ;
48
48
import org .springframework .beans .factory .annotation .Autowired ;
49
49
import org .springframework .context .ApplicationEvent ;
50
+ import org .springframework .context .Lifecycle ;
50
51
import org .springframework .data .redis .RedisConnectionFailureException ;
51
52
import org .springframework .data .redis .RedisSystemException ;
52
53
import org .springframework .data .redis .connection .RedisConnectionFactory ;
@@ -89,9 +90,15 @@ public class RedisQueueMessageDrivenEndpointTests extends RedisAvailableTests {
89
90
@ Autowired
90
91
private PollableChannel fromChannel ;
91
92
93
+ @ Autowired
94
+ private Lifecycle fromChannelEndpoint ;
95
+
92
96
@ Autowired
93
97
private MessageChannel symmetricalInputChannel ;
94
98
99
+ @ Autowired
100
+ private Lifecycle symmetricalRedisChannelEndpoint ;
101
+
95
102
@ Autowired
96
103
private PollableChannel symmetricalOutputChannel ;
97
104
@@ -107,6 +114,7 @@ public void testInt3014Default() throws Exception {
107
114
redisTemplate .setKeySerializer (new StringRedisSerializer ());
108
115
redisTemplate .setValueSerializer (new JdkSerializationRedisSerializer ());
109
116
redisTemplate .afterPropertiesSet ();
117
+ redisTemplate .delete (queueName );
110
118
111
119
String payload = "testing" ;
112
120
@@ -136,6 +144,7 @@ public void testInt3014Default() throws Exception {
136
144
assertEquals (payload2 , receive .getPayload ());
137
145
138
146
endpoint .stop ();
147
+ redisTemplate .delete (queueName );
139
148
}
140
149
141
150
@ Test
@@ -150,6 +159,7 @@ public void testInt3014ExpectMessageTrue() throws Exception {
150
159
redisTemplate .setKeySerializer (new StringRedisSerializer ());
151
160
redisTemplate .setValueSerializer (new JdkSerializationRedisSerializer ());
152
161
redisTemplate .afterPropertiesSet ();
162
+ redisTemplate .delete (queueName );
153
163
154
164
Message <?> message = MessageBuilder .withPayload ("testing" ).build ();
155
165
@@ -188,38 +198,53 @@ public void testInt3014ExpectMessageTrue() throws Exception {
188
198
Matchers .containsString ("java.lang.String cannot be cast to org.springframework.messaging.Message" ));
189
199
190
200
endpoint .stop ();
201
+ redisTemplate .delete (queueName );
191
202
}
192
203
193
204
@ Test
194
205
@ RedisAvailable
195
206
public void testInt3017IntegrationInbound () throws Exception {
196
- String payload = new Date ().toString ();
197
-
207
+ String queueName = "si.test.redisQueueInboundChannelAdapterTests2" ;
198
208
RedisTemplate <String , String > redisTemplate = new StringRedisTemplate ();
199
209
redisTemplate .setConnectionFactory (this .connectionFactory );
200
210
redisTemplate .afterPropertiesSet ();
211
+ redisTemplate .delete (queueName );
212
+
213
+ this .fromChannelEndpoint .start ();
214
+ String payload = new Date ().toString ();
215
+
201
216
202
217
redisTemplate .boundListOps ("si.test.Int3017IntegrationInbound" )
203
218
.leftPush ("{\" payload\" :\" " + payload + "\" ,\" headers\" :{}}" );
204
219
205
220
Message <?> receive = this .fromChannel .receive (10000 );
206
221
assertNotNull (receive );
207
222
assertEquals (payload , receive .getPayload ());
223
+ this .fromChannelEndpoint .stop ();
224
+ redisTemplate .delete (queueName );
208
225
}
209
226
210
227
@ Test
211
228
@ RedisAvailable
212
229
public void testInt3017IntegrationSymmetrical () throws Exception {
230
+ String queueName = "si.test.Int3017IntegrationSymmetrical" ;
231
+ RedisTemplate <String , String > redisTemplate = new StringRedisTemplate ();
232
+ redisTemplate .setConnectionFactory (this .connectionFactory );
233
+ redisTemplate .afterPropertiesSet ();
234
+ redisTemplate .delete (queueName );
235
+ this .symmetricalRedisChannelEndpoint .start ();
213
236
UUID payload = UUID .randomUUID ();
214
237
Message <UUID > message = MessageBuilder .withPayload (payload )
215
- .setHeader ("redis_queue" , "si.test.Int3017IntegrationSymmetrical" )
238
+ .setHeader ("redis_queue" , queueName )
216
239
.build ();
217
240
218
241
this .symmetricalInputChannel .send (message );
219
242
220
243
Message <?> receive = this .symmetricalOutputChannel .receive (10000 );
221
244
assertNotNull (receive );
222
245
assertEquals (payload , receive .getPayload ());
246
+ this .symmetricalRedisChannelEndpoint .stop ();
247
+ redisTemplate .delete (queueName );
223
248
}
224
249
225
250
@ Test
@@ -234,6 +259,7 @@ public void testInt3442ProperlyStop() throws Exception {
234
259
redisTemplate .setKeySerializer (new StringRedisSerializer ());
235
260
redisTemplate .setValueSerializer (new JdkSerializationRedisSerializer ());
236
261
redisTemplate .afterPropertiesSet ();
262
+ redisTemplate .delete (queueName );
237
263
238
264
while (redisTemplate .boundListOps (queueName ).rightPop () != null ) {
239
265
// drain
@@ -271,6 +297,7 @@ public void testInt3442ProperlyStop() throws Exception {
271
297
assertTrue (stopLatch .await (21 , TimeUnit .SECONDS ));
272
298
273
299
verify (boundListOperations , atLeastOnce ()).rightPush (any (byte [].class ));
300
+ redisTemplate .delete (queueName );
274
301
}
275
302
276
303
@@ -285,7 +312,8 @@ public void testInt3196Recovery() throws Exception {
285
312
286
313
final CountDownLatch exceptionsLatch = new CountDownLatch (2 );
287
314
288
- RedisQueueMessageDrivenEndpoint endpoint = new RedisQueueMessageDrivenEndpoint (queueName , this .connectionFactory );
315
+ RedisQueueMessageDrivenEndpoint endpoint = new RedisQueueMessageDrivenEndpoint (queueName ,
316
+ this .connectionFactory );
289
317
endpoint .setBeanFactory (Mockito .mock (BeanFactory .class ));
290
318
endpoint .setApplicationEventPublisher (event -> {
291
319
exceptionEvents .add ((ApplicationEvent ) event );
@@ -328,6 +356,7 @@ public void testInt3196Recovery() throws Exception {
328
356
assertEquals (payload , receive .getPayload ());
329
357
330
358
endpoint .stop ();
359
+ redisTemplate .delete (queueName );
331
360
}
332
361
333
362
@ Test
@@ -342,6 +371,7 @@ public void testInt3932ReadFromLeft() throws Exception {
342
371
redisTemplate .setKeySerializer (new StringRedisSerializer ());
343
372
redisTemplate .setValueSerializer (new JdkSerializationRedisSerializer ());
344
373
redisTemplate .afterPropertiesSet ();
374
+ redisTemplate .delete (queueName );
345
375
346
376
String payload = "testing" ;
347
377
@@ -372,6 +402,7 @@ public void testInt3932ReadFromLeft() throws Exception {
372
402
assertEquals (payload2 , receive .getPayload ());
373
403
374
404
endpoint .stop ();
405
+ redisTemplate .delete (queueName );
375
406
}
376
407
377
408
private void waitListening (RedisQueueMessageDrivenEndpoint endpoint ) throws InterruptedException {
0 commit comments