Skip to content

Commit d87cf4e

Browse files
akuma8ahamadi.attoumane
authored and
ahamadi.attoumane
committed
Improvements after PR review. Also only one test does not pass, see ReactiveRedisStreamMessageHandlerTests#explicitSerializationContextTest
1 parent d404aa8 commit d87cf4e

File tree

4 files changed

+96
-118
lines changed

4 files changed

+96
-118
lines changed

spring-integration-redis/src/main/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandler.java

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636

3737
/**
3838
* Implementation of {@link org.springframework.messaging.ReactiveMessageHandler} which writes
39-
* Message payload into a Redis stream , using reactive stream operation.
39+
* Message payload or Message itself(see {@link #extractPayload}) into a Redis stream using Reactive Stream operations.
4040
*
4141
* @author Attoumane Ahamadi
4242
*
@@ -47,39 +47,50 @@ public class ReactiveRedisStreamMessageHandler extends AbstractReactiveMessageHa
4747

4848
private final Expression streamKeyExpression;
4949

50-
private volatile EvaluationContext evaluationContext;
50+
private final ReactiveRedisConnectionFactory connectionFactory;
51+
52+
private EvaluationContext evaluationContext;
5153

5254
private boolean extractPayload = true;
5355

5456
private ReactiveStreamOperations reactiveStreamOperations;
5557

5658
private RedisSerializationContext serializationContext = RedisSerializationContext.string();
5759

58-
private final ReactiveRedisConnectionFactory connectionFactory;
59-
6060
@Nullable
6161
private HashMapper hashMapper;
6262

63-
public ReactiveRedisStreamMessageHandler(ReactiveRedisConnectionFactory connectionFactory, Expression streamKeyExpression) {
63+
public ReactiveRedisStreamMessageHandler(ReactiveRedisConnectionFactory connectionFactory, String streamKey) {
64+
this(connectionFactory, new LiteralExpression(streamKey));
65+
}
66+
67+
public ReactiveRedisStreamMessageHandler(ReactiveRedisConnectionFactory connectionFactory,
68+
Expression streamKeyExpression) {
6469
Assert.notNull(streamKeyExpression, "'streamKeyExpression' must not be null");
6570
Assert.notNull(connectionFactory, "'connectionFactory' must not be null");
6671
this.streamKeyExpression = streamKeyExpression;
6772
this.connectionFactory = connectionFactory;
6873
}
6974

70-
public ReactiveRedisStreamMessageHandler(ReactiveRedisConnectionFactory connectionFactory, String streamKey) {
71-
this(connectionFactory, new LiteralExpression(streamKey));
72-
}
73-
7475
public void setSerializationContext(RedisSerializationContext serializationContext) {
7576
Assert.notNull(serializationContext, "'serializationContext' must not be null");
7677
this.serializationContext = serializationContext;
7778
}
7879

80+
/**
81+
* (Optional) Set the {@link HashMapper} used to create {@link #reactiveStreamOperations}.
82+
* The default {@link HashMapper} is defined from the provided {@link RedisSerializationContext}
83+
* @param hashMapper the wanted hashMapper
84+
* */
7985
public void setHashMapper(@Nullable HashMapper hashMapper) {
8086
this.hashMapper = hashMapper;
8187
}
8288

89+
/**
90+
* Set to {@code true} to extract the payload; otherwise
91+
* the entire message is sent. Default {@code true}.
92+
* @param extractPayload false to not extract.
93+
*/
8394
public void setExtractPayload(boolean extractPayload) {
8495
this.extractPayload = extractPayload;
8596
}
@@ -92,9 +103,9 @@ public String getComponentType() {
92103
@Override
93104
protected void onInit() {
94105
super.onInit();
95-
if (this.evaluationContext == null && getBeanFactory() != null) {
96-
this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory());
97-
}
106+
107+
this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory());
108+
98109
ReactiveRedisTemplate template = new ReactiveRedisTemplate<>(this.connectionFactory,
99110
this.serializationContext);
100111
this.reactiveStreamOperations = this.hashMapper == null ? template.opsForStream() :
@@ -106,6 +117,8 @@ protected Mono<Void> handleMessageInternal(Message<?> message) {
106117

107118
String streamKey = this.streamKeyExpression.getValue(this.evaluationContext, message, String.class);
108119

120+
Assert.notNull(streamKey, "'streamKey' must not be null");
121+
109122
Object value = message;
110123
if (this.extractPayload) {
111124
value = message.getPayload();

spring-integration-redis/src/test/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTests.java

Lines changed: 70 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -21,37 +21,39 @@
2121
import java.util.Arrays;
2222
import java.util.List;
2323

24+
import org.junit.Before;
2425
import org.junit.Test;
2526
import org.junit.runner.RunWith;
2627

2728
import org.springframework.beans.factory.annotation.Autowired;
2829
import org.springframework.beans.factory.annotation.Qualifier;
30+
import org.springframework.context.annotation.Bean;
31+
import org.springframework.context.annotation.Configuration;
2932
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
3033
import org.springframework.data.redis.connection.stream.ObjectRecord;
3134
import org.springframework.data.redis.connection.stream.StreamOffset;
3235
import org.springframework.data.redis.core.ReactiveRedisTemplate;
33-
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
3436
import org.springframework.data.redis.serializer.RedisSerializationContext;
3537
import org.springframework.data.redis.serializer.RedisSerializer;
3638
import org.springframework.data.redis.serializer.StringRedisSerializer;
39+
import org.springframework.integration.channel.DirectChannel;
3740
import org.springframework.integration.handler.ReactiveMessageHandlerAdapter;
3841
import org.springframework.integration.redis.rules.RedisAvailable;
42+
import org.springframework.integration.redis.rules.RedisAvailableRule;
3943
import org.springframework.integration.redis.rules.RedisAvailableTests;
4044
import org.springframework.integration.redis.util.Address;
4145
import org.springframework.integration.redis.util.Person;
4246
import org.springframework.messaging.Message;
4347
import org.springframework.messaging.MessageChannel;
4448
import org.springframework.messaging.support.GenericMessage;
4549
import org.springframework.test.annotation.DirtiesContext;
46-
import org.springframework.test.context.ContextConfiguration;
4750
import org.springframework.test.context.junit4.SpringRunner;
4851

4952
/**
5053
* @author Attoumane Ahamadi
5154
*
5255
* @since 5.4
5356
*/
54-
@ContextConfiguration(classes = ReactiveRedisStreamMessageHandlerTestsContext.class)
5557
@RunWith(SpringRunner.class)
5658
@DirtiesContext
5759
@SuppressWarnings({"unchecked", "rawtypes"})
@@ -70,108 +72,129 @@ public class ReactiveRedisStreamMessageHandlerTests extends RedisAvailableTests
7072
@Autowired
7173
private ReactiveRedisStreamMessageHandler streamMessageHandler;
7274

75+
@Before
76+
public void deleteStreamKey() {
77+
ReactiveRedisTemplate<String, String> template = new ReactiveRedisTemplate<>(this.redisConnectionFactory,
78+
RedisSerializationContext.string());
79+
template.delete(ReactiveRedisStreamMessageHandlerTestsContext.STREAM_KEY).block();
80+
}
81+
82+
7383
@Test
7484
@RedisAvailable
7585
public void integrationStreamOutboundTest() {
7686
String messagePayload = "Hello stream message";
7787

7888
messageChannel.send(new GenericMessage<>(messagePayload));
7989

80-
RedisSerializationContext<String, String> serializationContext = redisStringOrJsonSerializationContext(null);
90+
RedisSerializationContext<String, String> serializationContext = redisSerializationContext();
8191

8292
ReactiveRedisTemplate<String, String> template = new ReactiveRedisTemplate(redisConnectionFactory, serializationContext);
8393

8494
ObjectRecord<String, String> record = template.opsForStream().read(String.class, StreamOffset.fromStart(ReactiveRedisStreamMessageHandlerTestsContext.STREAM_KEY)).blockFirst();
8595

8696
assertThat(record.getStream()).isEqualTo(ReactiveRedisStreamMessageHandlerTestsContext.STREAM_KEY);
87-
assertThat(record.getValue()).isEqualTo(messagePayload);
8897

89-
template.delete(ReactiveRedisStreamMessageHandlerTestsContext.STREAM_KEY).block();
98+
assertThat(record.getValue()).isEqualTo(messagePayload);
9099
}
91100

92-
//TODO Find why the deserialization fail does not work
101+
93102
/*@Test
94103
@RedisAvailable*/
95-
public void explicitJsonSerializationContextTest() {
104+
public void explicitSerializationContextTest() {
96105
List<String> messagePayload = Arrays.asList("Hello", "stream", "message");
97106

98-
RedisSerializationContext<String, Object> jsonSerializationContext = redisStringOrJsonSerializationContext(List.class);
107+
RedisSerializationContext<String, Object> serializationContext = redisSerializationContext();
99108

100-
streamMessageHandler.setSerializationContext(jsonSerializationContext);
109+
streamMessageHandler.setSerializationContext(serializationContext);
110+
streamMessageHandler.afterPropertiesSet();
101111

102112
handlerAdapter.handleMessage(new GenericMessage<>(messagePayload));
103113

104114
ReactiveRedisTemplate<String, List> template = new ReactiveRedisTemplate(redisConnectionFactory,
105-
jsonSerializationContext);
115+
serializationContext);
106116

107-
ObjectRecord<String, List> record = template.opsForStream().read(List.class, StreamOffset.fromStart(ReactiveRedisStreamMessageHandlerTestsContext.STREAM_KEY))
117+
ObjectRecord<String, List> record = template.opsForStream().read(List.class, StreamOffset
118+
.fromStart(ReactiveRedisStreamMessageHandlerTestsContext.STREAM_KEY))
108119
.blockFirst();
109120

110121
assertThat(record.getStream()).isEqualTo(ReactiveRedisStreamMessageHandlerTestsContext.STREAM_KEY);
111-
assertThat(record.getValue()).isEqualTo("[\"Hello\", \"stream\", \"message\"]");
112122

113-
template.delete(ReactiveRedisStreamMessageHandlerTestsContext.STREAM_KEY).block();
123+
assertThat(record.getValue()).isEqualTo("[\"Hello\", \"stream\", \"message\"]");
114124
}
115125

116-
//TODO Find why the deserialization does not work
117-
/*@Test
118-
@RedisAvailable*/
119-
public void explicitJsonSerializationContextWithModelTest() {
126+
127+
@Test
128+
@RedisAvailable
129+
public void explicitSerializationContextWithModelTest() {
120130
Address address = new Address().withAddress("Rennes, France");
121131
Person person = new Person(address, "Attoumane");
122132

123133
Message message = new GenericMessage(person);
124134

125-
RedisSerializationContext<String, Object> jsonSerializationContext = redisStringOrJsonSerializationContext(Person.class);
135+
RedisSerializationContext<String, Object> serializationContext = redisSerializationContext();
126136

127-
streamMessageHandler.setSerializationContext(jsonSerializationContext);
137+
streamMessageHandler.setSerializationContext(serializationContext);
138+
streamMessageHandler.afterPropertiesSet();
128139

129140
handlerAdapter.handleMessage(message);
130141

131-
ReactiveRedisTemplate<String, Person> template = new ReactiveRedisTemplate(redisConnectionFactory, jsonSerializationContext);
142+
ReactiveRedisTemplate<String, Person> template = new ReactiveRedisTemplate(redisConnectionFactory, serializationContext);
132143

133144
ObjectRecord<String, Person> record = template.opsForStream().read(Person.class, StreamOffset.fromStart(ReactiveRedisStreamMessageHandlerTestsContext.STREAM_KEY)).blockFirst();
134145

135146
assertThat(record.getStream()).isEqualTo(ReactiveRedisStreamMessageHandlerTestsContext.STREAM_KEY);
136147
assertThat(record.getValue().getName()).isEqualTo("Attoumane");
137148
assertThat(record.getValue().getAddress().getAddress()).isEqualTo("Rennes, France");
149+
}
138150

139-
template.delete(ReactiveRedisStreamMessageHandlerTestsContext.STREAM_KEY).block();
151+
152+
private RedisSerializationContext redisSerializationContext() {
153+
154+
RedisSerializer stringSerializer = StringRedisSerializer.UTF_8;
155+
156+
RedisSerializationContext.SerializationPair stringSerializerPair = RedisSerializationContext
157+
.SerializationPair
158+
.fromSerializer(stringSerializer);
159+
160+
return RedisSerializationContext
161+
.newSerializationContext()
162+
.key(stringSerializerPair)
163+
.value(stringSerializer)
164+
.hashKey(stringSerializer)
165+
.hashValue(stringSerializer)
166+
.build();
140167
}
141168

142169

143-
private RedisSerializationContext redisStringOrJsonSerializationContext(Class<?> jsonTargetType) {
170+
@Configuration
171+
public static class ReactiveRedisStreamMessageHandlerTestsContext {
144172

145-
RedisSerializationContext redisSerializationContext;
146-
RedisSerializer jsonSerializer = null;
173+
public static final String STREAM_KEY = "myStream";
147174

148-
if (jsonTargetType != null) {
149-
jsonSerializer = new Jackson2JsonRedisSerializer(jsonTargetType);
175+
@Bean
176+
public MessageChannel streamChannel(ReactiveMessageHandlerAdapter messageHandlerAdapter) {
177+
DirectChannel directChannel = new DirectChannel();
178+
directChannel.subscribe(messageHandlerAdapter);
179+
directChannel.setMaxSubscribers(1);
180+
return directChannel;
150181
}
151-
RedisSerializer stringSerializer = StringRedisSerializer.UTF_8;
152-
RedisSerializationContext.SerializationPair stringSerializerPair = RedisSerializationContext.SerializationPair
153-
.fromSerializer(stringSerializer);
154182

155-
if (jsonTargetType == null) {
156-
redisSerializationContext = RedisSerializationContext
157-
.newSerializationContext()
158-
.key(stringSerializerPair)
159-
.value(stringSerializer)
160-
.hashKey(stringSerializer)
161-
.hashValue(stringSerializer)
162-
.build();
183+
184+
@Bean
185+
public ReactiveRedisStreamMessageHandler streamMessageHandler(ReactiveRedisConnectionFactory connectionFactory) {
186+
return new ReactiveRedisStreamMessageHandler(connectionFactory, STREAM_KEY);
163187
}
164-
else {
165-
redisSerializationContext = RedisSerializationContext
166-
.newSerializationContext()
167-
.key(jsonSerializer)
168-
.value(jsonSerializer)
169-
.hashKey(jsonSerializer)
170-
.hashValue(jsonSerializer)
171-
.build();
188+
189+
@Bean
190+
public ReactiveMessageHandlerAdapter reactiveMessageHandlerAdapter(ReactiveRedisStreamMessageHandler streamMessageHandler) {
191+
return new ReactiveMessageHandlerAdapter(streamMessageHandler);
172192
}
173193

174-
return redisSerializationContext;
194+
@Bean
195+
public ReactiveRedisConnectionFactory reactiveRedisConnectionFactory() {
196+
return RedisAvailableRule.connectionFactory;
197+
}
175198
}
176199

177200
}

spring-integration-redis/src/test/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTestsContext.java

Lines changed: 0 additions & 58 deletions
This file was deleted.

spring-integration-redis/src/test/java/org/springframework/integration/redis/rules/RedisAvailableRule.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,11 +78,11 @@ public void evaluate() throws Throwable {
7878
if (connectionFactory != null) {
7979
try {
8080
connectionFactory.getConnection();
81-
base.evaluate();
8281
}
8382
catch (Exception e) {
8483
Assume.assumeTrue("Skipping test due to Redis not being available on port: " + REDIS_PORT + ": " + e, false);
8584
}
85+
base.evaluate();
8686
}
8787
}
8888
}

0 commit comments

Comments
 (0)