Skip to content

Commit 4a26f09

Browse files
committed
Addition of more test cases with one using MessageChannel.
Some tests with explicit RedisSerializationContext fail.
1 parent 586fae4 commit 4a26f09

File tree

4 files changed

+241
-25
lines changed

4 files changed

+241
-25
lines changed

spring-integration-redis/src/main/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandler.java

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
/*
2-
* Copyright 2020-2021 the original author or authors.
2+
* Copyright 2020 the original author or authors.
33
*
4-
* Licensed under the Apache License, Version 2.0 ( the "License" );
4+
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
66
* You may obtain a copy of the License at
77
*
@@ -16,13 +16,12 @@
1616

1717
package org.springframework.integration.redis.outbound;
1818

19-
import reactor.core.publisher.Mono;
20-
2119
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
2220
import org.springframework.data.redis.connection.stream.ObjectRecord;
2321
import org.springframework.data.redis.connection.stream.StreamRecords;
2422
import org.springframework.data.redis.core.ReactiveRedisTemplate;
2523
import org.springframework.data.redis.core.ReactiveStreamOperations;
24+
import org.springframework.data.redis.hash.HashMapper;
2625
import org.springframework.data.redis.serializer.RedisSerializationContext;
2726
import org.springframework.expression.EvaluationContext;
2827
import org.springframework.expression.Expression;
@@ -33,14 +32,17 @@
3332
import org.springframework.util.Assert;
3433
import org.springframework.util.StringUtils;
3534

35+
import reactor.core.publisher.Mono;
36+
3637
/**
3738
* Implementation of {@link org.springframework.messaging.ReactiveMessageHandler} which writes
3839
* Message payload into a Redis stream , using reactive stream operation.
3940
*
40-
* @author Attoumane AHAMADI
41+
* @author Attoumane Ahamadi
4142
*
42-
* @since 5.3
43+
* @since 5.4
4344
*/
45+
@SuppressWarnings({"rawtypes", "unchecked"})
4446
public class ReactiveRedisStreamMessageHandler extends AbstractReactiveMessageHandler {
4547

4648
private final Expression streamKeyExpression;
@@ -57,6 +59,8 @@ public class ReactiveRedisStreamMessageHandler extends AbstractReactiveMessageHa
5759

5860
private ReactiveRedisConnectionFactory connectionFactory;
5961

62+
private HashMapper hashMapper;
63+
6064
public ReactiveRedisStreamMessageHandler(Expression streamKeyExpression,
6165
ReactiveRedisConnectionFactory connectionFactory) {
6266
Assert.notNull(streamKeyExpression, "'streamKeyExpression' must not be null");
@@ -82,6 +86,11 @@ public void setSerializationContext(RedisSerializationContext serializationConte
8286
this.serializationContext = serializationContext;
8387
}
8488

89+
public void setHashMapper(HashMapper hashMapper) {
90+
Assert.notNull(hashMapper, "'hashMapper' must not be null");
91+
this.hashMapper = hashMapper;
92+
}
93+
8594
public void setExtractPayload(boolean extractPayload) {
8695
this.extractPayload = extractPayload;
8796
}
@@ -97,8 +106,13 @@ public String getComponentType() {
97106
}
98107

99108
@Override
100-
protected Mono<Void> handleMessageInternal(Message<?> message) {
109+
protected void onInit() {
110+
super.onInit();
101111
initStreamOperations();
112+
}
113+
114+
@Override
115+
protected Mono<Void> handleMessageInternal(Message<?> message) {
102116

103117
if (!StringUtils.hasText(this.streamKey)) {
104118
this.streamKey = this.streamKeyExpression.getValue(this.evaluationContext, message, String.class);
@@ -109,20 +123,20 @@ protected Mono<Void> handleMessageInternal(Message<?> message) {
109123
value = message.getPayload();
110124
}
111125

112-
ObjectRecord<String, Object> record = StreamRecords
113-
.<String, Object>objectBacked(value)
126+
ObjectRecord record = StreamRecords
127+
.objectBacked(value)
114128
.withStreamKey(this.streamKey);
115129

116130
return this.reactiveStreamOperations.add(record);
117131
}
118132

119133
private void initStreamOperations() {
120-
if (this.evaluationContext == null) {
134+
if (this.evaluationContext == null && getBeanFactory() != null) {
121135
this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory());
122136
}
123137
ReactiveRedisTemplate template = new ReactiveRedisTemplate<>(this.connectionFactory,
124138
this.serializationContext);
125-
this.reactiveStreamOperations = template.opsForStream();
139+
this.reactiveStreamOperations = this.hashMapper == null ? template.opsForStream() : template.opsForStream(this.hashMapper);
126140
}
127141

128142
}

spring-integration-redis/src/test/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTests-context.xml

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,19 @@
66
xsi:schemaLocation="http://www.springframework.org/schema/integration https://www.springframework.org/schema/integration/spring-integration.xsd
77
http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/util https://www.springframework.org/schema/util/spring-util.xsd">
88

9-
<bean id="input" class="org.springframework.integration.channel.FluxMessageChannel"/>
9+
<bean id="forRedisStreamChannel" class="org.springframework.integration.channel.FluxMessageChannel"/>
1010

11+
<bean id="streamMessageHandler"
12+
class="org.springframework.integration.redis.outbound.ReactiveRedisStreamMessageHandler">
13+
<constructor-arg index="0" value="myStream"/>
14+
<constructor-arg index="1" ref="redisConnectionFactory"/>
15+
</bean>
1116

12-
<int:service-activator input-channel="input">
13-
<bean class="org.springframework.integration.redis.outbound.ReactiveRedisStreamMessageHandler">
14-
<constructor-arg index="0" value="stream"/>
15-
<constructor-arg index="1" ref="redisConnectionFactory"/>
16-
</bean>
17-
</int:service-activator>
17+
<bean id="handlerAdapter" class="org.springframework.integration.handler.ReactiveMessageHandlerAdapter">
18+
<constructor-arg ref="streamMessageHandler"/>
19+
</bean>
20+
21+
<int:service-activator input-channel="forRedisStreamChannel" ref="streamMessageHandler"/>
1822

1923
<util:constant id="redisConnectionFactory"
2024
static-field="org.springframework.integration.redis.rules.RedisAvailableRule.connectionFactory"/>

spring-integration-redis/src/test/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTests.java

Lines changed: 192 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,73 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
117
package org.springframework.integration.redis.outbound;
218

19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
21+
22+
import java.lang.reflect.Method;
23+
import java.util.Arrays;
24+
import java.util.List;
25+
326
import org.junit.Test;
427
import org.junit.runner.RunWith;
528

6-
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
29+
import org.springframework.beans.factory.annotation.Autowired;
30+
import org.springframework.beans.factory.annotation.Qualifier;
31+
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
32+
import org.springframework.data.redis.connection.stream.ObjectRecord;
33+
import org.springframework.data.redis.connection.stream.StreamOffset;
34+
import org.springframework.data.redis.core.ReactiveRedisTemplate;
35+
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
36+
import org.springframework.data.redis.serializer.RedisSerializationContext;
37+
import org.springframework.data.redis.serializer.RedisSerializer;
38+
import org.springframework.data.redis.serializer.StringRedisSerializer;
39+
import org.springframework.integration.handler.ReactiveMessageHandlerAdapter;
740
import org.springframework.integration.redis.rules.RedisAvailable;
841
import org.springframework.integration.redis.rules.RedisAvailableTests;
42+
import org.springframework.integration.redis.store.RedisMessageStoreTests;
43+
import org.springframework.messaging.Message;
44+
import org.springframework.messaging.MessageChannel;
45+
import org.springframework.messaging.support.GenericMessage;
946
import org.springframework.test.annotation.DirtiesContext;
47+
import org.springframework.test.context.ContextConfiguration;
1048
import org.springframework.test.context.junit4.SpringRunner;
1149

12-
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
13-
1450
/**
15-
* @author Attoumane AHAMADI
51+
* @author Attoumane Ahamadi
52+
*
53+
* @since 5.4
1654
*/
55+
@ContextConfiguration
1756
@RunWith(SpringRunner.class)
1857
@DirtiesContext
58+
@SuppressWarnings({"unchecked", "rawtypes"})
1959
public class ReactiveRedisStreamMessageHandlerTests extends RedisAvailableTests {
2060

61+
@Autowired
62+
@Qualifier("forRedisStreamChannel")
63+
private MessageChannel messageChannel;
64+
65+
@Autowired
66+
private ReactiveRedisConnectionFactory redisConnectionFactory;
67+
68+
@Autowired
69+
private ReactiveMessageHandlerAdapter handlerAdapter;
70+
2171
@Test
2272
@RedisAvailable
2373
public void emptyStreamKeyTest() {
@@ -32,4 +82,142 @@ public void nullConnectionFactoryTest() {
3282
.isThrownBy(() -> new ReactiveRedisStreamMessageHandler("stream", null));
3383
}
3484

85+
86+
@Test
87+
@RedisAvailable
88+
public void simpleStringInsertionTest() {
89+
String streamKey = "myStream";
90+
String messagePayload = "Bonjour à tous les confinés";
91+
92+
handlerAdapter.handleMessage(new GenericMessage<>(messagePayload));
93+
94+
RedisSerializationContext<String, String> serializationContext = redisStringOrJsonSerializationContext(true, null);
95+
96+
ReactiveRedisTemplate<String, String> template = new ReactiveRedisTemplate(redisConnectionFactory, serializationContext);
97+
98+
ObjectRecord<String, String> record = template.opsForStream().read(String.class, StreamOffset.fromStart(streamKey)).blockFirst();
99+
assertThat(record.getStream()).isEqualTo(streamKey);
100+
assertThat(record.getValue()).isEqualTo(messagePayload);
101+
template.delete(streamKey).block();
102+
}
103+
104+
@Test
105+
@RedisAvailable
106+
public void integrationStreamOutboundTest() {
107+
String streamKey = "myStream";
108+
String messagePayload = "Bonjour à tous les confinés";
109+
110+
messageChannel.send(new GenericMessage<>(messagePayload));
111+
112+
RedisSerializationContext<String, String> serializationContext = redisStringOrJsonSerializationContext(true, null);
113+
114+
ReactiveRedisTemplate<String, String> template = new ReactiveRedisTemplate(redisConnectionFactory, serializationContext);
115+
116+
ObjectRecord<String, String> record = template.opsForStream().read(String.class, StreamOffset.fromStart(streamKey)).blockFirst();
117+
assertThat(record.getStream()).isEqualTo(streamKey);
118+
assertThat(record.getValue()).isEqualTo(messagePayload);
119+
template.delete(streamKey).block();
120+
}
121+
122+
//TODO Find why the deserialization fail does not work
123+
/*@Test
124+
@RedisAvailable*/
125+
public void explicitJsonSerializationContextTest() {
126+
String streamKey = "myStream";
127+
List<String> messagePayload = Arrays.asList("Bonjour", "à", "tous", "les", "confinés");
128+
129+
RedisSerializationContext<String, Object> jsonSerializationContext = redisStringOrJsonSerializationContext(false, List.class);
130+
131+
ReactiveRedisStreamMessageHandler streamMessageHandler = new ReactiveRedisStreamMessageHandler(streamKey, redisConnectionFactory);
132+
streamMessageHandler.setSerializationContext(jsonSerializationContext);
133+
//initializes reactiveRedisStreamOperations
134+
invokeOnInitMethod(streamMessageHandler);
135+
136+
ReactiveMessageHandlerAdapter handlerAdapter = new ReactiveMessageHandlerAdapter(streamMessageHandler);
137+
handlerAdapter.handleMessage(new GenericMessage<>(messagePayload));
138+
139+
ReactiveRedisTemplate<String, String> template = new ReactiveRedisTemplate(redisConnectionFactory,
140+
jsonSerializationContext);
141+
ObjectRecord<String, List> record = template.opsForStream().read(List.class, StreamOffset.fromStart(streamKey))
142+
.blockFirst();
143+
144+
assertThat(record.getStream()).isEqualTo(streamKey);
145+
assertThat(record.getValue()).isEqualTo("[\"Bonjour\", \"à\", \"tous\", \"les\", \"confinés\"]");
146+
template.delete(streamKey).block();
147+
}
148+
149+
//TODO Find why the deserialization does not work
150+
/*@Test
151+
@RedisAvailable*/
152+
public void explicitJsonSerializationContextWithModelTest() {
153+
String streamKey = "myStream";
154+
155+
RedisMessageStoreTests.Address address = new RedisMessageStoreTests.Address().withAddress("Rennes, France");
156+
RedisMessageStoreTests.Person person = new RedisMessageStoreTests.Person(address, "Attoumane");
157+
158+
Message message = new GenericMessage(person);
159+
160+
RedisSerializationContext<String, Object> jsonSerializationContext = redisStringOrJsonSerializationContext(false, RedisMessageStoreTests.Person.class);
161+
162+
ReactiveRedisStreamMessageHandler streamMessageHandler = new ReactiveRedisStreamMessageHandler(streamKey, redisConnectionFactory);
163+
streamMessageHandler.setSerializationContext(jsonSerializationContext);
164+
//initializes reactiveRedisStreamOperations
165+
invokeOnInitMethod(streamMessageHandler);
166+
167+
ReactiveMessageHandlerAdapter handlerAdapter = new ReactiveMessageHandlerAdapter(streamMessageHandler);
168+
handlerAdapter.handleMessage(message);
169+
170+
ReactiveRedisTemplate<String, String> template = new ReactiveRedisTemplate(redisConnectionFactory, jsonSerializationContext);
171+
ObjectRecord<String, RedisMessageStoreTests.Person> record = template.opsForStream().read(RedisMessageStoreTests.Person.class, StreamOffset.fromStart(streamKey)).blockFirst();
172+
assertThat(record.getStream()).isEqualTo(streamKey);
173+
assertThat(record.getValue().getName()).isEqualTo("Attoumane");
174+
assertThat(record.getValue().getAddress().getAddress()).isEqualTo("Rennes, France");
175+
template.delete(streamKey).block();
176+
}
177+
178+
179+
private RedisSerializationContext redisStringOrJsonSerializationContext(boolean string, Class jsonTargetType) {
180+
181+
RedisSerializationContext redisSerializationContext;
182+
RedisSerializer jsonSerializer = null;
183+
184+
if (jsonTargetType != null) {
185+
jsonSerializer = new Jackson2JsonRedisSerializer(jsonTargetType);
186+
}
187+
RedisSerializer stringSerializer = StringRedisSerializer.UTF_8;
188+
RedisSerializationContext.SerializationPair stringSerializerPair = RedisSerializationContext.SerializationPair
189+
.fromSerializer(stringSerializer);
190+
191+
if (string) {
192+
redisSerializationContext = RedisSerializationContext
193+
.newSerializationContext()
194+
.key(stringSerializerPair)
195+
.value(stringSerializer)
196+
.hashKey(stringSerializer)
197+
.hashValue(stringSerializer)
198+
.build();
199+
}
200+
else {
201+
redisSerializationContext = RedisSerializationContext
202+
.newSerializationContext()
203+
.key(stringSerializerPair)
204+
.value(jsonSerializer)
205+
.hashKey(jsonSerializer)
206+
.hashValue(jsonSerializer)
207+
.build();
208+
}
209+
210+
return redisSerializationContext;
211+
}
212+
213+
private void invokeOnInitMethod(ReactiveRedisStreamMessageHandler streamMessageHandler) {
214+
try {
215+
Method onInit = ReactiveRedisStreamMessageHandler.class.getDeclaredMethod("onInit");
216+
onInit.setAccessible(true);
217+
onInit.invoke(streamMessageHandler);
218+
}
219+
catch (Exception e) {
220+
e.printStackTrace();
221+
}
222+
}
35223
}

0 commit comments

Comments
 (0)