Skip to content

Commit d404aa8

Browse files
committed
Improvements after PR review.
1 parent 21d18ce commit d404aa8

File tree

6 files changed

+202
-143
lines changed

6 files changed

+202
-143
lines changed

spring-integration-redis/src/main/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandler.java

Lines changed: 16 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@
2828
import org.springframework.expression.common.LiteralExpression;
2929
import org.springframework.integration.expression.ExpressionUtils;
3030
import org.springframework.integration.handler.AbstractReactiveMessageHandler;
31+
import org.springframework.lang.Nullable;
3132
import org.springframework.messaging.Message;
3233
import org.springframework.util.Assert;
33-
import org.springframework.util.StringUtils;
3434

3535
import reactor.core.publisher.Mono;
3636

@@ -53,53 +53,37 @@ public class ReactiveRedisStreamMessageHandler extends AbstractReactiveMessageHa
5353

5454
private ReactiveStreamOperations reactiveStreamOperations;
5555

56-
private String streamKey;
57-
5856
private RedisSerializationContext serializationContext = RedisSerializationContext.string();
5957

60-
private ReactiveRedisConnectionFactory connectionFactory;
58+
private final ReactiveRedisConnectionFactory connectionFactory;
6159

60+
@Nullable
6261
private HashMapper hashMapper;
6362

64-
public ReactiveRedisStreamMessageHandler(Expression streamKeyExpression,
65-
ReactiveRedisConnectionFactory connectionFactory) {
63+
public ReactiveRedisStreamMessageHandler(ReactiveRedisConnectionFactory connectionFactory, Expression streamKeyExpression) {
6664
Assert.notNull(streamKeyExpression, "'streamKeyExpression' must not be null");
6765
Assert.notNull(connectionFactory, "'connectionFactory' must not be null");
6866
this.streamKeyExpression = streamKeyExpression;
6967
this.connectionFactory = connectionFactory;
7068
}
7169

72-
public ReactiveRedisStreamMessageHandler(String streamKey, ReactiveRedisConnectionFactory connectionFactory) {
73-
this.setStreamKey(streamKey);
74-
Assert.notNull(connectionFactory, "'connectionFactory' must not be null");
75-
this.connectionFactory = connectionFactory;
76-
this.streamKeyExpression = new LiteralExpression(streamKey);
77-
}
78-
79-
public void setStreamKey(String streamKey) {
80-
Assert.hasText(streamKey, "'streamKey' must not be an empty string.");
81-
this.streamKey = streamKey;
70+
public ReactiveRedisStreamMessageHandler(ReactiveRedisConnectionFactory connectionFactory, String streamKey) {
71+
this(connectionFactory, new LiteralExpression(streamKey));
8272
}
8373

8474
public void setSerializationContext(RedisSerializationContext serializationContext) {
8575
Assert.notNull(serializationContext, "'serializationContext' must not be null");
8676
this.serializationContext = serializationContext;
8777
}
8878

89-
public void setHashMapper(HashMapper hashMapper) {
90-
Assert.notNull(hashMapper, "'hashMapper' must not be null");
79+
public void setHashMapper(@Nullable HashMapper hashMapper) {
9180
this.hashMapper = hashMapper;
9281
}
9382

9483
public void setExtractPayload(boolean extractPayload) {
9584
this.extractPayload = extractPayload;
9685
}
9786

98-
void setEvaluationContext(EvaluationContext evaluationContext) {
99-
Assert.notNull(evaluationContext, "'evaluationContext' must not be null");
100-
this.evaluationContext = evaluationContext;
101-
}
102-
10387
@Override
10488
public String getComponentType() {
10589
return "redis:stream-outbound-channel-adapter";
@@ -108,15 +92,19 @@ public String getComponentType() {
10892
@Override
10993
protected void onInit() {
11094
super.onInit();
111-
initStreamOperations();
95+
if (this.evaluationContext == null && getBeanFactory() != null) {
96+
this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory());
97+
}
98+
ReactiveRedisTemplate template = new ReactiveRedisTemplate<>(this.connectionFactory,
99+
this.serializationContext);
100+
this.reactiveStreamOperations = this.hashMapper == null ? template.opsForStream() :
101+
template.opsForStream(this.hashMapper);
112102
}
113103

114104
@Override
115105
protected Mono<Void> handleMessageInternal(Message<?> message) {
116106

117-
if (!StringUtils.hasText(this.streamKey)) {
118-
this.streamKey = this.streamKeyExpression.getValue(this.evaluationContext, message, String.class);
119-
}
107+
String streamKey = this.streamKeyExpression.getValue(this.evaluationContext, message, String.class);
120108

121109
Object value = message;
122110
if (this.extractPayload) {
@@ -125,18 +113,9 @@ protected Mono<Void> handleMessageInternal(Message<?> message) {
125113

126114
ObjectRecord record = StreamRecords
127115
.objectBacked(value)
128-
.withStreamKey(this.streamKey);
116+
.withStreamKey(streamKey);
129117

130118
return this.reactiveStreamOperations.add(record);
131119
}
132120

133-
private void initStreamOperations() {
134-
if (this.evaluationContext == null && getBeanFactory() != null) {
135-
this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory());
136-
}
137-
ReactiveRedisTemplate template = new ReactiveRedisTemplate<>(this.connectionFactory,
138-
this.serializationContext);
139-
this.reactiveStreamOperations = this.hashMapper == null ? template.opsForStream() : template.opsForStream(this.hashMapper);
140-
}
141-
142121
}

spring-integration-redis/src/test/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTests-context.xml

Lines changed: 0 additions & 25 deletions
This file was deleted.

spring-integration-redis/src/test/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTests.java

Lines changed: 35 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,7 @@
1717
package org.springframework.integration.redis.outbound;
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
20-
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
2120

22-
import java.lang.reflect.Method;
2321
import java.util.Arrays;
2422
import java.util.List;
2523

@@ -39,7 +37,8 @@
3937
import org.springframework.integration.handler.ReactiveMessageHandlerAdapter;
4038
import org.springframework.integration.redis.rules.RedisAvailable;
4139
import org.springframework.integration.redis.rules.RedisAvailableTests;
42-
import org.springframework.integration.redis.store.RedisMessageStoreTests;
40+
import org.springframework.integration.redis.util.Address;
41+
import org.springframework.integration.redis.util.Person;
4342
import org.springframework.messaging.Message;
4443
import org.springframework.messaging.MessageChannel;
4544
import org.springframework.messaging.support.GenericMessage;
@@ -52,14 +51,14 @@
5251
*
5352
* @since 5.4
5453
*/
55-
@ContextConfiguration
54+
@ContextConfiguration(classes = ReactiveRedisStreamMessageHandlerTestsContext.class)
5655
@RunWith(SpringRunner.class)
5756
@DirtiesContext
5857
@SuppressWarnings({"unchecked", "rawtypes"})
5958
public class ReactiveRedisStreamMessageHandlerTests extends RedisAvailableTests {
6059

6160
@Autowired
62-
@Qualifier("forRedisStreamChannel")
61+
@Qualifier("streamChannel")
6362
private MessageChannel messageChannel;
6463

6564
@Autowired
@@ -68,115 +67,80 @@ public class ReactiveRedisStreamMessageHandlerTests extends RedisAvailableTests
6867
@Autowired
6968
private ReactiveMessageHandlerAdapter handlerAdapter;
7069

71-
@Test
72-
@RedisAvailable
73-
public void emptyStreamKeyTest() {
74-
assertThatIllegalArgumentException()
75-
.isThrownBy(() -> new ReactiveRedisStreamMessageHandler("", null));
76-
}
77-
78-
@Test
79-
@RedisAvailable
80-
public void nullConnectionFactoryTest() {
81-
assertThatIllegalArgumentException()
82-
.isThrownBy(() -> new ReactiveRedisStreamMessageHandler("stream", null));
83-
}
84-
85-
86-
@Test
87-
@RedisAvailable
88-
public void simpleStringInsertionTest() {
89-
String streamKey = "myStream";
90-
String messagePayload = "Bonjour à tous les confinés";
91-
92-
handlerAdapter.handleMessage(new GenericMessage<>(messagePayload));
93-
94-
RedisSerializationContext<String, String> serializationContext = redisStringOrJsonSerializationContext(true, null);
95-
96-
ReactiveRedisTemplate<String, String> template = new ReactiveRedisTemplate(redisConnectionFactory, serializationContext);
97-
98-
ObjectRecord<String, String> record = template.opsForStream().read(String.class, StreamOffset.fromStart(streamKey)).blockFirst();
99-
assertThat(record.getStream()).isEqualTo(streamKey);
100-
assertThat(record.getValue()).isEqualTo(messagePayload);
101-
template.delete(streamKey).block();
102-
}
70+
@Autowired
71+
private ReactiveRedisStreamMessageHandler streamMessageHandler;
10372

10473
@Test
10574
@RedisAvailable
10675
public void integrationStreamOutboundTest() {
107-
String streamKey = "myStream";
108-
String messagePayload = "Bonjour à tous les confinés";
76+
String messagePayload = "Hello stream message";
10977

11078
messageChannel.send(new GenericMessage<>(messagePayload));
11179

112-
RedisSerializationContext<String, String> serializationContext = redisStringOrJsonSerializationContext(true, null);
80+
RedisSerializationContext<String, String> serializationContext = redisStringOrJsonSerializationContext(null);
11381

11482
ReactiveRedisTemplate<String, String> template = new ReactiveRedisTemplate(redisConnectionFactory, serializationContext);
11583

116-
ObjectRecord<String, String> record = template.opsForStream().read(String.class, StreamOffset.fromStart(streamKey)).blockFirst();
117-
assertThat(record.getStream()).isEqualTo(streamKey);
84+
ObjectRecord<String, String> record = template.opsForStream().read(String.class, StreamOffset.fromStart(ReactiveRedisStreamMessageHandlerTestsContext.STREAM_KEY)).blockFirst();
85+
86+
assertThat(record.getStream()).isEqualTo(ReactiveRedisStreamMessageHandlerTestsContext.STREAM_KEY);
11887
assertThat(record.getValue()).isEqualTo(messagePayload);
119-
template.delete(streamKey).block();
88+
89+
template.delete(ReactiveRedisStreamMessageHandlerTestsContext.STREAM_KEY).block();
12090
}
12191

12292
//TODO Find why the deserialization fail does not work
12393
/*@Test
12494
@RedisAvailable*/
12595
public void explicitJsonSerializationContextTest() {
126-
String streamKey = "myStream";
127-
List<String> messagePayload = Arrays.asList("Bonjour", "à", "tous", "les", "confinés");
96+
List<String> messagePayload = Arrays.asList("Hello", "stream", "message");
12897

129-
RedisSerializationContext<String, Object> jsonSerializationContext = redisStringOrJsonSerializationContext(false, List.class);
98+
RedisSerializationContext<String, Object> jsonSerializationContext = redisStringOrJsonSerializationContext(List.class);
13099

131-
ReactiveRedisStreamMessageHandler streamMessageHandler = new ReactiveRedisStreamMessageHandler(streamKey, redisConnectionFactory);
132100
streamMessageHandler.setSerializationContext(jsonSerializationContext);
133-
//initializes reactiveRedisStreamOperations
134-
invokeOnInitMethod(streamMessageHandler);
135101

136-
ReactiveMessageHandlerAdapter handlerAdapter = new ReactiveMessageHandlerAdapter(streamMessageHandler);
137102
handlerAdapter.handleMessage(new GenericMessage<>(messagePayload));
138103

139-
ReactiveRedisTemplate<String, String> template = new ReactiveRedisTemplate(redisConnectionFactory,
104+
ReactiveRedisTemplate<String, List> template = new ReactiveRedisTemplate(redisConnectionFactory,
140105
jsonSerializationContext);
141-
ObjectRecord<String, List> record = template.opsForStream().read(List.class, StreamOffset.fromStart(streamKey))
106+
107+
ObjectRecord<String, List> record = template.opsForStream().read(List.class, StreamOffset.fromStart(ReactiveRedisStreamMessageHandlerTestsContext.STREAM_KEY))
142108
.blockFirst();
143109

144-
assertThat(record.getStream()).isEqualTo(streamKey);
145-
assertThat(record.getValue()).isEqualTo("[\"Bonjour\", \"à\", \"tous\", \"les\", \"confinés\"]");
146-
template.delete(streamKey).block();
110+
assertThat(record.getStream()).isEqualTo(ReactiveRedisStreamMessageHandlerTestsContext.STREAM_KEY);
111+
assertThat(record.getValue()).isEqualTo("[\"Hello\", \"stream\", \"message\"]");
112+
113+
template.delete(ReactiveRedisStreamMessageHandlerTestsContext.STREAM_KEY).block();
147114
}
148115

149116
//TODO Find why the deserialization does not work
150117
/*@Test
151118
@RedisAvailable*/
152119
public void explicitJsonSerializationContextWithModelTest() {
153-
String streamKey = "myStream";
154-
155-
RedisMessageStoreTests.Address address = new RedisMessageStoreTests.Address().withAddress("Rennes, France");
156-
RedisMessageStoreTests.Person person = new RedisMessageStoreTests.Person(address, "Attoumane");
120+
Address address = new Address().withAddress("Rennes, France");
121+
Person person = new Person(address, "Attoumane");
157122

158123
Message message = new GenericMessage(person);
159124

160-
RedisSerializationContext<String, Object> jsonSerializationContext = redisStringOrJsonSerializationContext(false, RedisMessageStoreTests.Person.class);
125+
RedisSerializationContext<String, Object> jsonSerializationContext = redisStringOrJsonSerializationContext(Person.class);
161126

162-
ReactiveRedisStreamMessageHandler streamMessageHandler = new ReactiveRedisStreamMessageHandler(streamKey, redisConnectionFactory);
163127
streamMessageHandler.setSerializationContext(jsonSerializationContext);
164-
//initializes reactiveRedisStreamOperations
165-
invokeOnInitMethod(streamMessageHandler);
166128

167-
ReactiveMessageHandlerAdapter handlerAdapter = new ReactiveMessageHandlerAdapter(streamMessageHandler);
168129
handlerAdapter.handleMessage(message);
169130

170-
ReactiveRedisTemplate<String, String> template = new ReactiveRedisTemplate(redisConnectionFactory, jsonSerializationContext);
171-
ObjectRecord<String, RedisMessageStoreTests.Person> record = template.opsForStream().read(RedisMessageStoreTests.Person.class, StreamOffset.fromStart(streamKey)).blockFirst();
172-
assertThat(record.getStream()).isEqualTo(streamKey);
131+
ReactiveRedisTemplate<String, Person> template = new ReactiveRedisTemplate(redisConnectionFactory, jsonSerializationContext);
132+
133+
ObjectRecord<String, Person> record = template.opsForStream().read(Person.class, StreamOffset.fromStart(ReactiveRedisStreamMessageHandlerTestsContext.STREAM_KEY)).blockFirst();
134+
135+
assertThat(record.getStream()).isEqualTo(ReactiveRedisStreamMessageHandlerTestsContext.STREAM_KEY);
173136
assertThat(record.getValue().getName()).isEqualTo("Attoumane");
174137
assertThat(record.getValue().getAddress().getAddress()).isEqualTo("Rennes, France");
175-
template.delete(streamKey).block();
138+
139+
template.delete(ReactiveRedisStreamMessageHandlerTestsContext.STREAM_KEY).block();
176140
}
177141

178142

179-
private RedisSerializationContext redisStringOrJsonSerializationContext(boolean string, Class jsonTargetType) {
143+
private RedisSerializationContext redisStringOrJsonSerializationContext(Class<?> jsonTargetType) {
180144

181145
RedisSerializationContext redisSerializationContext;
182146
RedisSerializer jsonSerializer = null;
@@ -188,7 +152,7 @@ private RedisSerializationContext redisStringOrJsonSerializationContext(boolean
188152
RedisSerializationContext.SerializationPair stringSerializerPair = RedisSerializationContext.SerializationPair
189153
.fromSerializer(stringSerializer);
190154

191-
if (string) {
155+
if (jsonTargetType == null) {
192156
redisSerializationContext = RedisSerializationContext
193157
.newSerializationContext()
194158
.key(stringSerializerPair)
@@ -200,7 +164,7 @@ private RedisSerializationContext redisStringOrJsonSerializationContext(boolean
200164
else {
201165
redisSerializationContext = RedisSerializationContext
202166
.newSerializationContext()
203-
.key(stringSerializerPair)
167+
.key(jsonSerializer)
204168
.value(jsonSerializer)
205169
.hashKey(jsonSerializer)
206170
.hashValue(jsonSerializer)
@@ -210,14 +174,4 @@ private RedisSerializationContext redisStringOrJsonSerializationContext(boolean
210174
return redisSerializationContext;
211175
}
212176

213-
private void invokeOnInitMethod(ReactiveRedisStreamMessageHandler streamMessageHandler) {
214-
try {
215-
Method onInit = ReactiveRedisStreamMessageHandler.class.getDeclaredMethod("onInit");
216-
onInit.setAccessible(true);
217-
onInit.invoke(streamMessageHandler);
218-
}
219-
catch (Exception e) {
220-
e.printStackTrace();
221-
}
222-
}
223177
}

0 commit comments

Comments
 (0)