From 0bd13dbf94c1d79ba63584c420150393fb9cb2dd Mon Sep 17 00:00:00 2001 From: Attoumane AHAMADI Date: Thu, 23 Apr 2020 19:27:38 +0200 Subject: [PATCH 1/6] Redis stream message handler support. This is the outbound part publishing message to the actual stream using ReactiveStreamOperations --- .../ReactiveRedisStreamMessageHandler.java | 128 ++++++++++++++++++ ...RedisStreamMessageHandlerTests-context.xml | 21 +++ ...eactiveRedisStreamMessageHandlerTests.java | 35 +++++ 3 files changed, 184 insertions(+) create mode 100644 spring-integration-redis/src/main/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandler.java create mode 100644 spring-integration-redis/src/test/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTests-context.xml create mode 100644 spring-integration-redis/src/test/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTests.java diff --git a/spring-integration-redis/src/main/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandler.java b/spring-integration-redis/src/main/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandler.java new file mode 100644 index 00000000000..78cb587ca78 --- /dev/null +++ b/spring-integration-redis/src/main/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandler.java @@ -0,0 +1,128 @@ +/* + * Copyright 2020-2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 ( the "License" ); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.redis.outbound; + +import reactor.core.publisher.Mono; + +import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory; +import org.springframework.data.redis.connection.stream.ObjectRecord; +import org.springframework.data.redis.connection.stream.StreamRecords; +import org.springframework.data.redis.core.ReactiveRedisTemplate; +import org.springframework.data.redis.core.ReactiveStreamOperations; +import org.springframework.data.redis.serializer.RedisSerializationContext; +import org.springframework.expression.EvaluationContext; +import org.springframework.expression.Expression; +import org.springframework.expression.common.LiteralExpression; +import org.springframework.integration.expression.ExpressionUtils; +import org.springframework.integration.handler.AbstractReactiveMessageHandler; +import org.springframework.messaging.Message; +import org.springframework.util.Assert; +import org.springframework.util.StringUtils; + +/** + * Implementation of {@link org.springframework.messaging.ReactiveMessageHandler} which writes + * Message payload into a Redis stream , using reactive stream operation. + * + * @author Attoumane AHAMADI + * + * @since 5.3 + */ +public class ReactiveRedisStreamMessageHandler extends AbstractReactiveMessageHandler { + + private final Expression streamKeyExpression; + + private volatile EvaluationContext evaluationContext; + + private boolean extractPayload = true; + + private ReactiveStreamOperations reactiveStreamOperations; + + private String streamKey; + + private RedisSerializationContext serializationContext = RedisSerializationContext.string(); + + private ReactiveRedisConnectionFactory connectionFactory; + + public ReactiveRedisStreamMessageHandler(Expression streamKeyExpression, + ReactiveRedisConnectionFactory connectionFactory) { + Assert.notNull(streamKeyExpression, "'streamKeyExpression' must not be null"); + Assert.notNull(connectionFactory, "'connectionFactory' must not be null"); + this.streamKeyExpression = streamKeyExpression; + this.connectionFactory = connectionFactory; + } + + public ReactiveRedisStreamMessageHandler(String streamKey, ReactiveRedisConnectionFactory connectionFactory) { + this.setStreamKey(streamKey); + Assert.notNull(connectionFactory, "'connectionFactory' must not be null"); + this.connectionFactory = connectionFactory; + this.streamKeyExpression = new LiteralExpression(streamKey); + } + + public void setStreamKey(String streamKey) { + Assert.hasText(streamKey, "'streamKey' must not be an empty string."); + this.streamKey = streamKey; + } + + public void setSerializationContext(RedisSerializationContext serializationContext) { + Assert.notNull(serializationContext, "'serializationContext' must not be null"); + this.serializationContext = serializationContext; + } + + public void setExtractPayload(boolean extractPayload) { + this.extractPayload = extractPayload; + } + + void setEvaluationContext(EvaluationContext evaluationContext) { + Assert.notNull(evaluationContext, "'evaluationContext' must not be null"); + this.evaluationContext = evaluationContext; + } + + @Override + public String getComponentType() { + return "redis:stream-outbound-channel-adapter"; + } + + @Override + protected Mono handleMessageInternal(Message message) { + initStreamOperations(); + + if (!StringUtils.hasText(this.streamKey)) { + this.streamKey = this.streamKeyExpression.getValue(this.evaluationContext, message, String.class); + } + + Object value = message; + if (this.extractPayload) { + value = message.getPayload(); + } + + ObjectRecord record = StreamRecords + .objectBacked(value) + .withStreamKey(this.streamKey); + + return this.reactiveStreamOperations.add(record); + } + + private void initStreamOperations() { + if (this.evaluationContext == null) { + this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory()); + } + ReactiveRedisTemplate template = new ReactiveRedisTemplate<>(this.connectionFactory, + this.serializationContext); + this.reactiveStreamOperations = template.opsForStream(); + } + +} diff --git a/spring-integration-redis/src/test/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTests-context.xml b/spring-integration-redis/src/test/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTests-context.xml new file mode 100644 index 00000000000..8a210da4ec2 --- /dev/null +++ b/spring-integration-redis/src/test/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTests-context.xml @@ -0,0 +1,21 @@ + + + + + + + + + + + + + + + diff --git a/spring-integration-redis/src/test/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTests.java b/spring-integration-redis/src/test/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTests.java new file mode 100644 index 00000000000..e9c35e39c9c --- /dev/null +++ b/spring-integration-redis/src/test/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTests.java @@ -0,0 +1,35 @@ +package org.springframework.integration.redis.outbound; + +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory; +import org.springframework.integration.redis.rules.RedisAvailable; +import org.springframework.integration.redis.rules.RedisAvailableTests; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit4.SpringRunner; + +import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; + +/** + * @author Attoumane AHAMADI + */ +@RunWith(SpringRunner.class) +@DirtiesContext +public class ReactiveRedisStreamMessageHandlerTests extends RedisAvailableTests { + + @Test + @RedisAvailable + public void emptyStreamKeyTest() { + assertThatIllegalArgumentException() + .isThrownBy(() -> new ReactiveRedisStreamMessageHandler("", null)); + } + + @Test + @RedisAvailable + public void nullConnectionFactoryTest() { + assertThatIllegalArgumentException() + .isThrownBy(() -> new ReactiveRedisStreamMessageHandler("stream", null)); + } + +} From 242375bf9cf0ef36fef95ae799c40d473f5eec15 Mon Sep 17 00:00:00 2001 From: Attoumane AHAMADI Date: Sat, 9 May 2020 23:09:32 +0200 Subject: [PATCH 2/6] Addition of more test cases with one using MessageChannel. Some tests with explicit RedisSerializationContext fail. --- .../ReactiveRedisStreamMessageHandler.java | 36 +++- ...RedisStreamMessageHandlerTests-context.xml | 18 +- ...eactiveRedisStreamMessageHandlerTests.java | 196 +++++++++++++++++- .../redis/store/RedisMessageStoreTests.java | 16 +- 4 files changed, 241 insertions(+), 25 deletions(-) diff --git a/spring-integration-redis/src/main/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandler.java b/spring-integration-redis/src/main/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandler.java index 78cb587ca78..d7324214029 100644 --- a/spring-integration-redis/src/main/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandler.java +++ b/spring-integration-redis/src/main/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandler.java @@ -1,7 +1,7 @@ /* - * Copyright 2020-2021 the original author or authors. + * Copyright 2020 the original author or authors. * - * Licensed under the Apache License, Version 2.0 ( the "License" ); + * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * @@ -16,13 +16,12 @@ package org.springframework.integration.redis.outbound; -import reactor.core.publisher.Mono; - import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory; import org.springframework.data.redis.connection.stream.ObjectRecord; import org.springframework.data.redis.connection.stream.StreamRecords; import org.springframework.data.redis.core.ReactiveRedisTemplate; import org.springframework.data.redis.core.ReactiveStreamOperations; +import org.springframework.data.redis.hash.HashMapper; import org.springframework.data.redis.serializer.RedisSerializationContext; import org.springframework.expression.EvaluationContext; import org.springframework.expression.Expression; @@ -33,14 +32,17 @@ import org.springframework.util.Assert; import org.springframework.util.StringUtils; +import reactor.core.publisher.Mono; + /** * Implementation of {@link org.springframework.messaging.ReactiveMessageHandler} which writes * Message payload into a Redis stream , using reactive stream operation. * - * @author Attoumane AHAMADI + * @author Attoumane Ahamadi * - * @since 5.3 + * @since 5.4 */ +@SuppressWarnings({"rawtypes", "unchecked"}) public class ReactiveRedisStreamMessageHandler extends AbstractReactiveMessageHandler { private final Expression streamKeyExpression; @@ -57,6 +59,8 @@ public class ReactiveRedisStreamMessageHandler extends AbstractReactiveMessageHa private ReactiveRedisConnectionFactory connectionFactory; + private HashMapper hashMapper; + public ReactiveRedisStreamMessageHandler(Expression streamKeyExpression, ReactiveRedisConnectionFactory connectionFactory) { Assert.notNull(streamKeyExpression, "'streamKeyExpression' must not be null"); @@ -82,6 +86,11 @@ public void setSerializationContext(RedisSerializationContext serializationConte this.serializationContext = serializationContext; } + public void setHashMapper(HashMapper hashMapper) { + Assert.notNull(hashMapper, "'hashMapper' must not be null"); + this.hashMapper = hashMapper; + } + public void setExtractPayload(boolean extractPayload) { this.extractPayload = extractPayload; } @@ -97,8 +106,13 @@ public String getComponentType() { } @Override - protected Mono handleMessageInternal(Message message) { + protected void onInit() { + super.onInit(); initStreamOperations(); + } + + @Override + protected Mono handleMessageInternal(Message message) { if (!StringUtils.hasText(this.streamKey)) { this.streamKey = this.streamKeyExpression.getValue(this.evaluationContext, message, String.class); @@ -109,20 +123,20 @@ protected Mono handleMessageInternal(Message message) { value = message.getPayload(); } - ObjectRecord record = StreamRecords - .objectBacked(value) + ObjectRecord record = StreamRecords + .objectBacked(value) .withStreamKey(this.streamKey); return this.reactiveStreamOperations.add(record); } private void initStreamOperations() { - if (this.evaluationContext == null) { + if (this.evaluationContext == null && getBeanFactory() != null) { this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory()); } ReactiveRedisTemplate template = new ReactiveRedisTemplate<>(this.connectionFactory, this.serializationContext); - this.reactiveStreamOperations = template.opsForStream(); + this.reactiveStreamOperations = this.hashMapper == null ? template.opsForStream() : template.opsForStream(this.hashMapper); } } diff --git a/spring-integration-redis/src/test/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTests-context.xml b/spring-integration-redis/src/test/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTests-context.xml index 8a210da4ec2..8cf5f8de5af 100644 --- a/spring-integration-redis/src/test/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTests-context.xml +++ b/spring-integration-redis/src/test/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTests-context.xml @@ -6,15 +6,19 @@ xsi:schemaLocation="http://www.springframework.org/schema/integration https://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/util https://www.springframework.org/schema/util/spring-util.xsd"> - + + + + + - - - - - - + + + + + diff --git a/spring-integration-redis/src/test/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTests.java b/spring-integration-redis/src/test/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTests.java index e9c35e39c9c..b4dcc66df90 100644 --- a/spring-integration-redis/src/test/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTests.java +++ b/spring-integration-redis/src/test/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTests.java @@ -1,23 +1,73 @@ +/* + * Copyright 2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.springframework.integration.redis.outbound; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; + +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.List; + import org.junit.Test; import org.junit.runner.RunWith; -import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory; +import org.springframework.data.redis.connection.stream.ObjectRecord; +import org.springframework.data.redis.connection.stream.StreamOffset; +import org.springframework.data.redis.core.ReactiveRedisTemplate; +import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; +import org.springframework.data.redis.serializer.RedisSerializationContext; +import org.springframework.data.redis.serializer.RedisSerializer; +import org.springframework.data.redis.serializer.StringRedisSerializer; +import org.springframework.integration.handler.ReactiveMessageHandlerAdapter; import org.springframework.integration.redis.rules.RedisAvailable; import org.springframework.integration.redis.rules.RedisAvailableTests; +import org.springframework.integration.redis.store.RedisMessageStoreTests; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.support.GenericMessage; import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringRunner; -import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; - /** - * @author Attoumane AHAMADI + * @author Attoumane Ahamadi + * + * @since 5.4 */ +@ContextConfiguration @RunWith(SpringRunner.class) @DirtiesContext +@SuppressWarnings({"unchecked", "rawtypes"}) public class ReactiveRedisStreamMessageHandlerTests extends RedisAvailableTests { + @Autowired + @Qualifier("forRedisStreamChannel") + private MessageChannel messageChannel; + + @Autowired + private ReactiveRedisConnectionFactory redisConnectionFactory; + + @Autowired + private ReactiveMessageHandlerAdapter handlerAdapter; + @Test @RedisAvailable public void emptyStreamKeyTest() { @@ -32,4 +82,142 @@ public void nullConnectionFactoryTest() { .isThrownBy(() -> new ReactiveRedisStreamMessageHandler("stream", null)); } + + @Test + @RedisAvailable + public void simpleStringInsertionTest() { + String streamKey = "myStream"; + String messagePayload = "Bonjour à tous les confinés"; + + handlerAdapter.handleMessage(new GenericMessage<>(messagePayload)); + + RedisSerializationContext serializationContext = redisStringOrJsonSerializationContext(true, null); + + ReactiveRedisTemplate template = new ReactiveRedisTemplate(redisConnectionFactory, serializationContext); + + ObjectRecord record = template.opsForStream().read(String.class, StreamOffset.fromStart(streamKey)).blockFirst(); + assertThat(record.getStream()).isEqualTo(streamKey); + assertThat(record.getValue()).isEqualTo(messagePayload); + template.delete(streamKey).block(); + } + + @Test + @RedisAvailable + public void integrationStreamOutboundTest() { + String streamKey = "myStream"; + String messagePayload = "Bonjour à tous les confinés"; + + messageChannel.send(new GenericMessage<>(messagePayload)); + + RedisSerializationContext serializationContext = redisStringOrJsonSerializationContext(true, null); + + ReactiveRedisTemplate template = new ReactiveRedisTemplate(redisConnectionFactory, serializationContext); + + ObjectRecord record = template.opsForStream().read(String.class, StreamOffset.fromStart(streamKey)).blockFirst(); + assertThat(record.getStream()).isEqualTo(streamKey); + assertThat(record.getValue()).isEqualTo(messagePayload); + template.delete(streamKey).block(); + } + + //TODO Find why the deserialization fail does not work + /*@Test + @RedisAvailable*/ + public void explicitJsonSerializationContextTest() { + String streamKey = "myStream"; + List messagePayload = Arrays.asList("Bonjour", "à", "tous", "les", "confinés"); + + RedisSerializationContext jsonSerializationContext = redisStringOrJsonSerializationContext(false, List.class); + + ReactiveRedisStreamMessageHandler streamMessageHandler = new ReactiveRedisStreamMessageHandler(streamKey, redisConnectionFactory); + streamMessageHandler.setSerializationContext(jsonSerializationContext); + //initializes reactiveRedisStreamOperations + invokeOnInitMethod(streamMessageHandler); + + ReactiveMessageHandlerAdapter handlerAdapter = new ReactiveMessageHandlerAdapter(streamMessageHandler); + handlerAdapter.handleMessage(new GenericMessage<>(messagePayload)); + + ReactiveRedisTemplate template = new ReactiveRedisTemplate(redisConnectionFactory, + jsonSerializationContext); + ObjectRecord record = template.opsForStream().read(List.class, StreamOffset.fromStart(streamKey)) + .blockFirst(); + + assertThat(record.getStream()).isEqualTo(streamKey); + assertThat(record.getValue()).isEqualTo("[\"Bonjour\", \"à\", \"tous\", \"les\", \"confinés\"]"); + template.delete(streamKey).block(); + } + + //TODO Find why the deserialization does not work + /*@Test + @RedisAvailable*/ + public void explicitJsonSerializationContextWithModelTest() { + String streamKey = "myStream"; + + RedisMessageStoreTests.Address address = new RedisMessageStoreTests.Address().withAddress("Rennes, France"); + RedisMessageStoreTests.Person person = new RedisMessageStoreTests.Person(address, "Attoumane"); + + Message message = new GenericMessage(person); + + RedisSerializationContext jsonSerializationContext = redisStringOrJsonSerializationContext(false, RedisMessageStoreTests.Person.class); + + ReactiveRedisStreamMessageHandler streamMessageHandler = new ReactiveRedisStreamMessageHandler(streamKey, redisConnectionFactory); + streamMessageHandler.setSerializationContext(jsonSerializationContext); + //initializes reactiveRedisStreamOperations + invokeOnInitMethod(streamMessageHandler); + + ReactiveMessageHandlerAdapter handlerAdapter = new ReactiveMessageHandlerAdapter(streamMessageHandler); + handlerAdapter.handleMessage(message); + + ReactiveRedisTemplate template = new ReactiveRedisTemplate(redisConnectionFactory, jsonSerializationContext); + ObjectRecord record = template.opsForStream().read(RedisMessageStoreTests.Person.class, StreamOffset.fromStart(streamKey)).blockFirst(); + assertThat(record.getStream()).isEqualTo(streamKey); + assertThat(record.getValue().getName()).isEqualTo("Attoumane"); + assertThat(record.getValue().getAddress().getAddress()).isEqualTo("Rennes, France"); + template.delete(streamKey).block(); + } + + + private RedisSerializationContext redisStringOrJsonSerializationContext(boolean string, Class jsonTargetType) { + + RedisSerializationContext redisSerializationContext; + RedisSerializer jsonSerializer = null; + + if (jsonTargetType != null) { + jsonSerializer = new Jackson2JsonRedisSerializer(jsonTargetType); + } + RedisSerializer stringSerializer = StringRedisSerializer.UTF_8; + RedisSerializationContext.SerializationPair stringSerializerPair = RedisSerializationContext.SerializationPair + .fromSerializer(stringSerializer); + + if (string) { + redisSerializationContext = RedisSerializationContext + .newSerializationContext() + .key(stringSerializerPair) + .value(stringSerializer) + .hashKey(stringSerializer) + .hashValue(stringSerializer) + .build(); + } + else { + redisSerializationContext = RedisSerializationContext + .newSerializationContext() + .key(stringSerializerPair) + .value(jsonSerializer) + .hashKey(jsonSerializer) + .hashValue(jsonSerializer) + .build(); + } + + return redisSerializationContext; + } + + private void invokeOnInitMethod(ReactiveRedisStreamMessageHandler streamMessageHandler) { + try { + Method onInit = ReactiveRedisStreamMessageHandler.class.getDeclaredMethod("onInit"); + onInit.setAccessible(true); + onInit.invoke(streamMessageHandler); + } + catch (Exception e) { + e.printStackTrace(); + } + } } diff --git a/spring-integration-redis/src/test/java/org/springframework/integration/redis/store/RedisMessageStoreTests.java b/spring-integration-redis/src/test/java/org/springframework/integration/redis/store/RedisMessageStoreTests.java index c30298f1175..1ab4255a638 100644 --- a/spring-integration-redis/src/test/java/org/springframework/integration/redis/store/RedisMessageStoreTests.java +++ b/spring-integration-redis/src/test/java/org/springframework/integration/redis/store/RedisMessageStoreTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2007-2019 the original author or authors. + * Copyright 2007-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -77,7 +77,7 @@ public void testAddStringMessage() { RedisConnectionFactory jcf = this.getConnectionFactoryForTest(); RedisMessageStore store = new RedisMessageStore(jcf); Message stringMessage = new GenericMessage("Hello Redis"); - Message storedMessage = store.addMessage(stringMessage); + Message storedMessage = store.addMessage(stringMessage); assertThat(storedMessage).isNotSameAs(stringMessage); assertThat(storedMessage.getPayload()).isEqualTo("Hello Redis"); } @@ -92,7 +92,7 @@ public void testAddSerializableObjectMessage() { Person person = new Person(address, "Barak Obama"); Message objectMessage = new GenericMessage(person); - Message storedMessage = store.addMessage(objectMessage); + Message storedMessage = store.addMessage(objectMessage); assertThat(storedMessage).isNotSameAs(objectMessage); assertThat(storedMessage.getPayload().getName()).isEqualTo("Barak Obama"); } @@ -206,6 +206,9 @@ public Person(Address address, String name) { this.name = name; } + public Person() { + } + public Address getAddress() { return address; } @@ -237,6 +240,13 @@ public void setAddress(String address) { this.address = address; } + public Address() { + } + + public Address withAddress(String address) { + this.setAddress(address); + return this; + } } public static class Foo { From f10f7d40ebe81af6233cd69ee949eaf8c9f21298 Mon Sep 17 00:00:00 2001 From: Attoumane AHAMADI Date: Fri, 5 Jun 2020 19:11:35 +0200 Subject: [PATCH 3/6] Improvements after PR review. --- .../ReactiveRedisStreamMessageHandler.java | 53 +++----- ...RedisStreamMessageHandlerTests-context.xml | 25 ---- ...eactiveRedisStreamMessageHandlerTests.java | 116 ++++++------------ ...RedisStreamMessageHandlerTestsContext.java | 58 +++++++++ .../integration/redis/util/Address.java | 41 +++++++ .../integration/redis/util/Person.java | 52 ++++++++ 6 files changed, 202 insertions(+), 143 deletions(-) delete mode 100644 spring-integration-redis/src/test/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTests-context.xml create mode 100644 spring-integration-redis/src/test/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTestsContext.java create mode 100644 spring-integration-redis/src/test/java/org/springframework/integration/redis/util/Address.java create mode 100644 spring-integration-redis/src/test/java/org/springframework/integration/redis/util/Person.java diff --git a/spring-integration-redis/src/main/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandler.java b/spring-integration-redis/src/main/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandler.java index d7324214029..d30348f5073 100644 --- a/spring-integration-redis/src/main/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandler.java +++ b/spring-integration-redis/src/main/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandler.java @@ -28,9 +28,9 @@ import org.springframework.expression.common.LiteralExpression; import org.springframework.integration.expression.ExpressionUtils; import org.springframework.integration.handler.AbstractReactiveMessageHandler; +import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.util.Assert; -import org.springframework.util.StringUtils; import reactor.core.publisher.Mono; @@ -53,32 +53,22 @@ public class ReactiveRedisStreamMessageHandler extends AbstractReactiveMessageHa private ReactiveStreamOperations reactiveStreamOperations; - private String streamKey; - private RedisSerializationContext serializationContext = RedisSerializationContext.string(); - private ReactiveRedisConnectionFactory connectionFactory; + private final ReactiveRedisConnectionFactory connectionFactory; + @Nullable private HashMapper hashMapper; - public ReactiveRedisStreamMessageHandler(Expression streamKeyExpression, - ReactiveRedisConnectionFactory connectionFactory) { + public ReactiveRedisStreamMessageHandler(ReactiveRedisConnectionFactory connectionFactory, Expression streamKeyExpression) { Assert.notNull(streamKeyExpression, "'streamKeyExpression' must not be null"); Assert.notNull(connectionFactory, "'connectionFactory' must not be null"); this.streamKeyExpression = streamKeyExpression; this.connectionFactory = connectionFactory; } - public ReactiveRedisStreamMessageHandler(String streamKey, ReactiveRedisConnectionFactory connectionFactory) { - this.setStreamKey(streamKey); - Assert.notNull(connectionFactory, "'connectionFactory' must not be null"); - this.connectionFactory = connectionFactory; - this.streamKeyExpression = new LiteralExpression(streamKey); - } - - public void setStreamKey(String streamKey) { - Assert.hasText(streamKey, "'streamKey' must not be an empty string."); - this.streamKey = streamKey; + public ReactiveRedisStreamMessageHandler(ReactiveRedisConnectionFactory connectionFactory, String streamKey) { + this(connectionFactory, new LiteralExpression(streamKey)); } public void setSerializationContext(RedisSerializationContext serializationContext) { @@ -86,8 +76,7 @@ public void setSerializationContext(RedisSerializationContext serializationConte this.serializationContext = serializationContext; } - public void setHashMapper(HashMapper hashMapper) { - Assert.notNull(hashMapper, "'hashMapper' must not be null"); + public void setHashMapper(@Nullable HashMapper hashMapper) { this.hashMapper = hashMapper; } @@ -95,11 +84,6 @@ public void setExtractPayload(boolean extractPayload) { this.extractPayload = extractPayload; } - void setEvaluationContext(EvaluationContext evaluationContext) { - Assert.notNull(evaluationContext, "'evaluationContext' must not be null"); - this.evaluationContext = evaluationContext; - } - @Override public String getComponentType() { return "redis:stream-outbound-channel-adapter"; @@ -108,15 +92,19 @@ public String getComponentType() { @Override protected void onInit() { super.onInit(); - initStreamOperations(); + if (this.evaluationContext == null && getBeanFactory() != null) { + this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory()); + } + ReactiveRedisTemplate template = new ReactiveRedisTemplate<>(this.connectionFactory, + this.serializationContext); + this.reactiveStreamOperations = this.hashMapper == null ? template.opsForStream() : + template.opsForStream(this.hashMapper); } @Override protected Mono handleMessageInternal(Message message) { - if (!StringUtils.hasText(this.streamKey)) { - this.streamKey = this.streamKeyExpression.getValue(this.evaluationContext, message, String.class); - } + String streamKey = this.streamKeyExpression.getValue(this.evaluationContext, message, String.class); Object value = message; if (this.extractPayload) { @@ -125,18 +113,9 @@ protected Mono handleMessageInternal(Message message) { ObjectRecord record = StreamRecords .objectBacked(value) - .withStreamKey(this.streamKey); + .withStreamKey(streamKey); return this.reactiveStreamOperations.add(record); } - private void initStreamOperations() { - if (this.evaluationContext == null && getBeanFactory() != null) { - this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory()); - } - ReactiveRedisTemplate template = new ReactiveRedisTemplate<>(this.connectionFactory, - this.serializationContext); - this.reactiveStreamOperations = this.hashMapper == null ? template.opsForStream() : template.opsForStream(this.hashMapper); - } - } diff --git a/spring-integration-redis/src/test/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTests-context.xml b/spring-integration-redis/src/test/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTests-context.xml deleted file mode 100644 index 8cf5f8de5af..00000000000 --- a/spring-integration-redis/src/test/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTests-context.xml +++ /dev/null @@ -1,25 +0,0 @@ - - - - - - - - - - - - - - - - - - diff --git a/spring-integration-redis/src/test/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTests.java b/spring-integration-redis/src/test/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTests.java index b4dcc66df90..07fb9a5e23a 100644 --- a/spring-integration-redis/src/test/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTests.java +++ b/spring-integration-redis/src/test/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTests.java @@ -17,9 +17,7 @@ package org.springframework.integration.redis.outbound; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; -import java.lang.reflect.Method; import java.util.Arrays; import java.util.List; @@ -39,7 +37,8 @@ import org.springframework.integration.handler.ReactiveMessageHandlerAdapter; import org.springframework.integration.redis.rules.RedisAvailable; import org.springframework.integration.redis.rules.RedisAvailableTests; -import org.springframework.integration.redis.store.RedisMessageStoreTests; +import org.springframework.integration.redis.util.Address; +import org.springframework.integration.redis.util.Person; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.support.GenericMessage; @@ -52,14 +51,14 @@ * * @since 5.4 */ -@ContextConfiguration +@ContextConfiguration(classes = ReactiveRedisStreamMessageHandlerTestsContext.class) @RunWith(SpringRunner.class) @DirtiesContext @SuppressWarnings({"unchecked", "rawtypes"}) public class ReactiveRedisStreamMessageHandlerTests extends RedisAvailableTests { @Autowired - @Qualifier("forRedisStreamChannel") + @Qualifier("streamChannel") private MessageChannel messageChannel; @Autowired @@ -68,115 +67,80 @@ public class ReactiveRedisStreamMessageHandlerTests extends RedisAvailableTests @Autowired private ReactiveMessageHandlerAdapter handlerAdapter; - @Test - @RedisAvailable - public void emptyStreamKeyTest() { - assertThatIllegalArgumentException() - .isThrownBy(() -> new ReactiveRedisStreamMessageHandler("", null)); - } - - @Test - @RedisAvailable - public void nullConnectionFactoryTest() { - assertThatIllegalArgumentException() - .isThrownBy(() -> new ReactiveRedisStreamMessageHandler("stream", null)); - } - - - @Test - @RedisAvailable - public void simpleStringInsertionTest() { - String streamKey = "myStream"; - String messagePayload = "Bonjour à tous les confinés"; - - handlerAdapter.handleMessage(new GenericMessage<>(messagePayload)); - - RedisSerializationContext serializationContext = redisStringOrJsonSerializationContext(true, null); - - ReactiveRedisTemplate template = new ReactiveRedisTemplate(redisConnectionFactory, serializationContext); - - ObjectRecord record = template.opsForStream().read(String.class, StreamOffset.fromStart(streamKey)).blockFirst(); - assertThat(record.getStream()).isEqualTo(streamKey); - assertThat(record.getValue()).isEqualTo(messagePayload); - template.delete(streamKey).block(); - } + @Autowired + private ReactiveRedisStreamMessageHandler streamMessageHandler; @Test @RedisAvailable public void integrationStreamOutboundTest() { - String streamKey = "myStream"; - String messagePayload = "Bonjour à tous les confinés"; + String messagePayload = "Hello stream message"; messageChannel.send(new GenericMessage<>(messagePayload)); - RedisSerializationContext serializationContext = redisStringOrJsonSerializationContext(true, null); + RedisSerializationContext serializationContext = redisStringOrJsonSerializationContext(null); ReactiveRedisTemplate template = new ReactiveRedisTemplate(redisConnectionFactory, serializationContext); - ObjectRecord record = template.opsForStream().read(String.class, StreamOffset.fromStart(streamKey)).blockFirst(); - assertThat(record.getStream()).isEqualTo(streamKey); + ObjectRecord record = template.opsForStream().read(String.class, StreamOffset.fromStart(ReactiveRedisStreamMessageHandlerTestsContext.STREAM_KEY)).blockFirst(); + + assertThat(record.getStream()).isEqualTo(ReactiveRedisStreamMessageHandlerTestsContext.STREAM_KEY); assertThat(record.getValue()).isEqualTo(messagePayload); - template.delete(streamKey).block(); + + template.delete(ReactiveRedisStreamMessageHandlerTestsContext.STREAM_KEY).block(); } //TODO Find why the deserialization fail does not work /*@Test @RedisAvailable*/ public void explicitJsonSerializationContextTest() { - String streamKey = "myStream"; - List messagePayload = Arrays.asList("Bonjour", "à", "tous", "les", "confinés"); + List messagePayload = Arrays.asList("Hello", "stream", "message"); - RedisSerializationContext jsonSerializationContext = redisStringOrJsonSerializationContext(false, List.class); + RedisSerializationContext jsonSerializationContext = redisStringOrJsonSerializationContext(List.class); - ReactiveRedisStreamMessageHandler streamMessageHandler = new ReactiveRedisStreamMessageHandler(streamKey, redisConnectionFactory); streamMessageHandler.setSerializationContext(jsonSerializationContext); - //initializes reactiveRedisStreamOperations - invokeOnInitMethod(streamMessageHandler); - ReactiveMessageHandlerAdapter handlerAdapter = new ReactiveMessageHandlerAdapter(streamMessageHandler); handlerAdapter.handleMessage(new GenericMessage<>(messagePayload)); - ReactiveRedisTemplate template = new ReactiveRedisTemplate(redisConnectionFactory, + ReactiveRedisTemplate template = new ReactiveRedisTemplate(redisConnectionFactory, jsonSerializationContext); - ObjectRecord record = template.opsForStream().read(List.class, StreamOffset.fromStart(streamKey)) + + ObjectRecord record = template.opsForStream().read(List.class, StreamOffset.fromStart(ReactiveRedisStreamMessageHandlerTestsContext.STREAM_KEY)) .blockFirst(); - assertThat(record.getStream()).isEqualTo(streamKey); - assertThat(record.getValue()).isEqualTo("[\"Bonjour\", \"à\", \"tous\", \"les\", \"confinés\"]"); - template.delete(streamKey).block(); + assertThat(record.getStream()).isEqualTo(ReactiveRedisStreamMessageHandlerTestsContext.STREAM_KEY); + assertThat(record.getValue()).isEqualTo("[\"Hello\", \"stream\", \"message\"]"); + + template.delete(ReactiveRedisStreamMessageHandlerTestsContext.STREAM_KEY).block(); } //TODO Find why the deserialization does not work /*@Test @RedisAvailable*/ public void explicitJsonSerializationContextWithModelTest() { - String streamKey = "myStream"; - - RedisMessageStoreTests.Address address = new RedisMessageStoreTests.Address().withAddress("Rennes, France"); - RedisMessageStoreTests.Person person = new RedisMessageStoreTests.Person(address, "Attoumane"); + Address address = new Address().withAddress("Rennes, France"); + Person person = new Person(address, "Attoumane"); Message message = new GenericMessage(person); - RedisSerializationContext jsonSerializationContext = redisStringOrJsonSerializationContext(false, RedisMessageStoreTests.Person.class); + RedisSerializationContext jsonSerializationContext = redisStringOrJsonSerializationContext(Person.class); - ReactiveRedisStreamMessageHandler streamMessageHandler = new ReactiveRedisStreamMessageHandler(streamKey, redisConnectionFactory); streamMessageHandler.setSerializationContext(jsonSerializationContext); - //initializes reactiveRedisStreamOperations - invokeOnInitMethod(streamMessageHandler); - ReactiveMessageHandlerAdapter handlerAdapter = new ReactiveMessageHandlerAdapter(streamMessageHandler); handlerAdapter.handleMessage(message); - ReactiveRedisTemplate template = new ReactiveRedisTemplate(redisConnectionFactory, jsonSerializationContext); - ObjectRecord record = template.opsForStream().read(RedisMessageStoreTests.Person.class, StreamOffset.fromStart(streamKey)).blockFirst(); - assertThat(record.getStream()).isEqualTo(streamKey); + ReactiveRedisTemplate template = new ReactiveRedisTemplate(redisConnectionFactory, jsonSerializationContext); + + ObjectRecord record = template.opsForStream().read(Person.class, StreamOffset.fromStart(ReactiveRedisStreamMessageHandlerTestsContext.STREAM_KEY)).blockFirst(); + + assertThat(record.getStream()).isEqualTo(ReactiveRedisStreamMessageHandlerTestsContext.STREAM_KEY); assertThat(record.getValue().getName()).isEqualTo("Attoumane"); assertThat(record.getValue().getAddress().getAddress()).isEqualTo("Rennes, France"); - template.delete(streamKey).block(); + + template.delete(ReactiveRedisStreamMessageHandlerTestsContext.STREAM_KEY).block(); } - private RedisSerializationContext redisStringOrJsonSerializationContext(boolean string, Class jsonTargetType) { + private RedisSerializationContext redisStringOrJsonSerializationContext(Class jsonTargetType) { RedisSerializationContext redisSerializationContext; RedisSerializer jsonSerializer = null; @@ -188,7 +152,7 @@ private RedisSerializationContext redisStringOrJsonSerializationContext(boolean RedisSerializationContext.SerializationPair stringSerializerPair = RedisSerializationContext.SerializationPair .fromSerializer(stringSerializer); - if (string) { + if (jsonTargetType == null) { redisSerializationContext = RedisSerializationContext .newSerializationContext() .key(stringSerializerPair) @@ -200,7 +164,7 @@ private RedisSerializationContext redisStringOrJsonSerializationContext(boolean else { redisSerializationContext = RedisSerializationContext .newSerializationContext() - .key(stringSerializerPair) + .key(jsonSerializer) .value(jsonSerializer) .hashKey(jsonSerializer) .hashValue(jsonSerializer) @@ -210,14 +174,4 @@ private RedisSerializationContext redisStringOrJsonSerializationContext(boolean return redisSerializationContext; } - private void invokeOnInitMethod(ReactiveRedisStreamMessageHandler streamMessageHandler) { - try { - Method onInit = ReactiveRedisStreamMessageHandler.class.getDeclaredMethod("onInit"); - onInit.setAccessible(true); - onInit.invoke(streamMessageHandler); - } - catch (Exception e) { - e.printStackTrace(); - } - } } diff --git a/spring-integration-redis/src/test/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTestsContext.java b/spring-integration-redis/src/test/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTestsContext.java new file mode 100644 index 00000000000..e9c4b287b7c --- /dev/null +++ b/spring-integration-redis/src/test/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTestsContext.java @@ -0,0 +1,58 @@ +/* + * Copyright 2013-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.redis.outbound; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.handler.ReactiveMessageHandlerAdapter; +import org.springframework.integration.redis.rules.RedisAvailableRule; +import org.springframework.messaging.MessageChannel; + +/** + * @author Attoumane Ahamadi + * + * @since 5.4 + */ +@Configuration +public class ReactiveRedisStreamMessageHandlerTestsContext { + public static final String STREAM_KEY = "myStream"; + + @Bean + public MessageChannel streamChannel(ReactiveMessageHandlerAdapter messageHandlerAdapter) { + DirectChannel directChannel = new DirectChannel(); + directChannel.subscribe(messageHandlerAdapter); + return directChannel; + } + + + @Bean + public ReactiveRedisStreamMessageHandler streamMessageHandler(ReactiveRedisConnectionFactory connectionFactory) { + return new ReactiveRedisStreamMessageHandler(connectionFactory, STREAM_KEY); + } + + @Bean + public ReactiveMessageHandlerAdapter reactiveMessageHandlerAdapter(ReactiveRedisStreamMessageHandler streamMessageHandler) { + return new ReactiveMessageHandlerAdapter(streamMessageHandler); + } + + @Bean + public ReactiveRedisConnectionFactory reactiveRedisConnectionFactory() { + return RedisAvailableRule.connectionFactory; + } +} diff --git a/spring-integration-redis/src/test/java/org/springframework/integration/redis/util/Address.java b/spring-integration-redis/src/test/java/org/springframework/integration/redis/util/Address.java new file mode 100644 index 00000000000..9bdc8f50516 --- /dev/null +++ b/spring-integration-redis/src/test/java/org/springframework/integration/redis/util/Address.java @@ -0,0 +1,41 @@ +/* + * Copyright 2013-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.redis.util; + +import java.io.Serializable; + +@SuppressWarnings("serial") +public class Address implements Serializable { + + private String address; + + public String getAddress() { + return address; + } + + public void setAddress(String address) { + this.address = address; + } + + public Address() { + } + + public Address withAddress(String address) { + this.setAddress(address); + return this; + } +} diff --git a/spring-integration-redis/src/test/java/org/springframework/integration/redis/util/Person.java b/spring-integration-redis/src/test/java/org/springframework/integration/redis/util/Person.java new file mode 100644 index 00000000000..982aac5c4e7 --- /dev/null +++ b/spring-integration-redis/src/test/java/org/springframework/integration/redis/util/Person.java @@ -0,0 +1,52 @@ +/* + * Copyright 2013-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.redis.util; + +import java.io.Serializable; + + +@SuppressWarnings("serial") +public class Person implements Serializable { + + private Address address; + + private String name; + + public Person(Address address, String name) { + this.address = address; + this.name = name; + } + + public Person() { + } + + public Address getAddress() { + return address; + } + + public void setAddress(Address address) { + this.address = address; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } +} From e44930a8d4f054be44136a5e9b764a8dea33e484 Mon Sep 17 00:00:00 2001 From: Attoumane AHAMADI Date: Fri, 12 Jun 2020 16:51:08 +0200 Subject: [PATCH 4/6] Improvements after PR review. Also only one test does not pass, see ReactiveRedisStreamMessageHandlerTests#explicitSerializationContextTest --- .../ReactiveRedisStreamMessageHandler.java | 37 ++++-- ...eactiveRedisStreamMessageHandlerTests.java | 117 +++++++++++------- ...RedisStreamMessageHandlerTestsContext.java | 58 --------- 3 files changed, 95 insertions(+), 117 deletions(-) delete mode 100644 spring-integration-redis/src/test/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTestsContext.java diff --git a/spring-integration-redis/src/main/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandler.java b/spring-integration-redis/src/main/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandler.java index d30348f5073..4ce04b10a5b 100644 --- a/spring-integration-redis/src/main/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandler.java +++ b/spring-integration-redis/src/main/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandler.java @@ -36,7 +36,7 @@ /** * Implementation of {@link org.springframework.messaging.ReactiveMessageHandler} which writes - * Message payload into a Redis stream , using reactive stream operation. + * Message payload or Message itself(see {@link #extractPayload}) into a Redis stream using Reactive Stream operations. * * @author Attoumane Ahamadi * @@ -47,7 +47,9 @@ public class ReactiveRedisStreamMessageHandler extends AbstractReactiveMessageHa private final Expression streamKeyExpression; - private volatile EvaluationContext evaluationContext; + private final ReactiveRedisConnectionFactory connectionFactory; + + private EvaluationContext evaluationContext; private boolean extractPayload = true; @@ -55,31 +57,40 @@ public class ReactiveRedisStreamMessageHandler extends AbstractReactiveMessageHa private RedisSerializationContext serializationContext = RedisSerializationContext.string(); - private final ReactiveRedisConnectionFactory connectionFactory; - @Nullable private HashMapper hashMapper; - public ReactiveRedisStreamMessageHandler(ReactiveRedisConnectionFactory connectionFactory, Expression streamKeyExpression) { + public ReactiveRedisStreamMessageHandler(ReactiveRedisConnectionFactory connectionFactory, String streamKey) { + this(connectionFactory, new LiteralExpression(streamKey)); + } + + public ReactiveRedisStreamMessageHandler(ReactiveRedisConnectionFactory connectionFactory, + Expression streamKeyExpression) { Assert.notNull(streamKeyExpression, "'streamKeyExpression' must not be null"); Assert.notNull(connectionFactory, "'connectionFactory' must not be null"); this.streamKeyExpression = streamKeyExpression; this.connectionFactory = connectionFactory; } - public ReactiveRedisStreamMessageHandler(ReactiveRedisConnectionFactory connectionFactory, String streamKey) { - this(connectionFactory, new LiteralExpression(streamKey)); - } - public void setSerializationContext(RedisSerializationContext serializationContext) { Assert.notNull(serializationContext, "'serializationContext' must not be null"); this.serializationContext = serializationContext; } + /** + * (Optional) Set the {@link HashMapper} used to create {@link #reactiveStreamOperations}. + * The default {@link HashMapper} is defined from the provided {@link RedisSerializationContext} + * @param hashMapper the wanted hashMapper + * */ public void setHashMapper(@Nullable HashMapper hashMapper) { this.hashMapper = hashMapper; } + /** + * Set to {@code true} to extract the payload; otherwise + * the entire message is sent. Default {@code true}. + * @param extractPayload false to not extract. + */ public void setExtractPayload(boolean extractPayload) { this.extractPayload = extractPayload; } @@ -92,9 +103,9 @@ public String getComponentType() { @Override protected void onInit() { super.onInit(); - if (this.evaluationContext == null && getBeanFactory() != null) { - this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory()); - } + + this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory()); + ReactiveRedisTemplate template = new ReactiveRedisTemplate<>(this.connectionFactory, this.serializationContext); this.reactiveStreamOperations = this.hashMapper == null ? template.opsForStream() : @@ -106,6 +117,8 @@ protected Mono handleMessageInternal(Message message) { String streamKey = this.streamKeyExpression.getValue(this.evaluationContext, message, String.class); + Assert.notNull(streamKey, "'streamKey' must not be null"); + Object value = message; if (this.extractPayload) { value = message.getPayload(); diff --git a/spring-integration-redis/src/test/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTests.java b/spring-integration-redis/src/test/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTests.java index 07fb9a5e23a..1fc401df609 100644 --- a/spring-integration-redis/src/test/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTests.java +++ b/spring-integration-redis/src/test/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTests.java @@ -21,21 +21,25 @@ import java.util.Arrays; import java.util.List; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory; import org.springframework.data.redis.connection.stream.ObjectRecord; import org.springframework.data.redis.connection.stream.StreamOffset; import org.springframework.data.redis.core.ReactiveRedisTemplate; -import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.RedisSerializationContext; import org.springframework.data.redis.serializer.RedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; +import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.handler.ReactiveMessageHandlerAdapter; import org.springframework.integration.redis.rules.RedisAvailable; +import org.springframework.integration.redis.rules.RedisAvailableRule; import org.springframework.integration.redis.rules.RedisAvailableTests; import org.springframework.integration.redis.util.Address; import org.springframework.integration.redis.util.Person; @@ -43,7 +47,6 @@ import org.springframework.messaging.MessageChannel; import org.springframework.messaging.support.GenericMessage; import org.springframework.test.annotation.DirtiesContext; -import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringRunner; /** @@ -51,7 +54,6 @@ * * @since 5.4 */ -@ContextConfiguration(classes = ReactiveRedisStreamMessageHandlerTestsContext.class) @RunWith(SpringRunner.class) @DirtiesContext @SuppressWarnings({"unchecked", "rawtypes"}) @@ -70,6 +72,14 @@ public class ReactiveRedisStreamMessageHandlerTests extends RedisAvailableTests @Autowired private ReactiveRedisStreamMessageHandler streamMessageHandler; + @Before + public void deleteStreamKey() { + ReactiveRedisTemplate template = new ReactiveRedisTemplate<>(this.redisConnectionFactory, + RedisSerializationContext.string()); + template.delete(ReactiveRedisStreamMessageHandlerTestsContext.STREAM_KEY).block(); + } + + @Test @RedisAvailable public void integrationStreamOutboundTest() { @@ -77,101 +87,114 @@ public void integrationStreamOutboundTest() { messageChannel.send(new GenericMessage<>(messagePayload)); - RedisSerializationContext serializationContext = redisStringOrJsonSerializationContext(null); + RedisSerializationContext serializationContext = redisSerializationContext(); ReactiveRedisTemplate template = new ReactiveRedisTemplate(redisConnectionFactory, serializationContext); ObjectRecord record = template.opsForStream().read(String.class, StreamOffset.fromStart(ReactiveRedisStreamMessageHandlerTestsContext.STREAM_KEY)).blockFirst(); assertThat(record.getStream()).isEqualTo(ReactiveRedisStreamMessageHandlerTestsContext.STREAM_KEY); - assertThat(record.getValue()).isEqualTo(messagePayload); - template.delete(ReactiveRedisStreamMessageHandlerTestsContext.STREAM_KEY).block(); + assertThat(record.getValue()).isEqualTo(messagePayload); } - //TODO Find why the deserialization fail does not work + /*@Test @RedisAvailable*/ - public void explicitJsonSerializationContextTest() { + public void explicitSerializationContextTest() { List messagePayload = Arrays.asList("Hello", "stream", "message"); - RedisSerializationContext jsonSerializationContext = redisStringOrJsonSerializationContext(List.class); + RedisSerializationContext serializationContext = redisSerializationContext(); - streamMessageHandler.setSerializationContext(jsonSerializationContext); + streamMessageHandler.setSerializationContext(serializationContext); + streamMessageHandler.afterPropertiesSet(); handlerAdapter.handleMessage(new GenericMessage<>(messagePayload)); ReactiveRedisTemplate template = new ReactiveRedisTemplate(redisConnectionFactory, - jsonSerializationContext); + serializationContext); - ObjectRecord record = template.opsForStream().read(List.class, StreamOffset.fromStart(ReactiveRedisStreamMessageHandlerTestsContext.STREAM_KEY)) + ObjectRecord record = template.opsForStream().read(List.class, StreamOffset + .fromStart(ReactiveRedisStreamMessageHandlerTestsContext.STREAM_KEY)) .blockFirst(); assertThat(record.getStream()).isEqualTo(ReactiveRedisStreamMessageHandlerTestsContext.STREAM_KEY); - assertThat(record.getValue()).isEqualTo("[\"Hello\", \"stream\", \"message\"]"); - template.delete(ReactiveRedisStreamMessageHandlerTestsContext.STREAM_KEY).block(); + assertThat(record.getValue()).isEqualTo("[\"Hello\", \"stream\", \"message\"]"); } - //TODO Find why the deserialization does not work - /*@Test - @RedisAvailable*/ - public void explicitJsonSerializationContextWithModelTest() { + + @Test + @RedisAvailable + public void explicitSerializationContextWithModelTest() { Address address = new Address().withAddress("Rennes, France"); Person person = new Person(address, "Attoumane"); Message message = new GenericMessage(person); - RedisSerializationContext jsonSerializationContext = redisStringOrJsonSerializationContext(Person.class); + RedisSerializationContext serializationContext = redisSerializationContext(); - streamMessageHandler.setSerializationContext(jsonSerializationContext); + streamMessageHandler.setSerializationContext(serializationContext); + streamMessageHandler.afterPropertiesSet(); handlerAdapter.handleMessage(message); - ReactiveRedisTemplate template = new ReactiveRedisTemplate(redisConnectionFactory, jsonSerializationContext); + ReactiveRedisTemplate template = new ReactiveRedisTemplate(redisConnectionFactory, serializationContext); ObjectRecord record = template.opsForStream().read(Person.class, StreamOffset.fromStart(ReactiveRedisStreamMessageHandlerTestsContext.STREAM_KEY)).blockFirst(); assertThat(record.getStream()).isEqualTo(ReactiveRedisStreamMessageHandlerTestsContext.STREAM_KEY); assertThat(record.getValue().getName()).isEqualTo("Attoumane"); assertThat(record.getValue().getAddress().getAddress()).isEqualTo("Rennes, France"); + } - template.delete(ReactiveRedisStreamMessageHandlerTestsContext.STREAM_KEY).block(); + + private RedisSerializationContext redisSerializationContext() { + + RedisSerializer stringSerializer = StringRedisSerializer.UTF_8; + + RedisSerializationContext.SerializationPair stringSerializerPair = RedisSerializationContext + .SerializationPair + .fromSerializer(stringSerializer); + + return RedisSerializationContext + .newSerializationContext() + .key(stringSerializerPair) + .value(stringSerializer) + .hashKey(stringSerializer) + .hashValue(stringSerializer) + .build(); } - private RedisSerializationContext redisStringOrJsonSerializationContext(Class jsonTargetType) { + @Configuration + public static class ReactiveRedisStreamMessageHandlerTestsContext { - RedisSerializationContext redisSerializationContext; - RedisSerializer jsonSerializer = null; + public static final String STREAM_KEY = "myStream"; - if (jsonTargetType != null) { - jsonSerializer = new Jackson2JsonRedisSerializer(jsonTargetType); + @Bean + public MessageChannel streamChannel(ReactiveMessageHandlerAdapter messageHandlerAdapter) { + DirectChannel directChannel = new DirectChannel(); + directChannel.subscribe(messageHandlerAdapter); + directChannel.setMaxSubscribers(1); + return directChannel; } - RedisSerializer stringSerializer = StringRedisSerializer.UTF_8; - RedisSerializationContext.SerializationPair stringSerializerPair = RedisSerializationContext.SerializationPair - .fromSerializer(stringSerializer); - if (jsonTargetType == null) { - redisSerializationContext = RedisSerializationContext - .newSerializationContext() - .key(stringSerializerPair) - .value(stringSerializer) - .hashKey(stringSerializer) - .hashValue(stringSerializer) - .build(); + + @Bean + public ReactiveRedisStreamMessageHandler streamMessageHandler(ReactiveRedisConnectionFactory connectionFactory) { + return new ReactiveRedisStreamMessageHandler(connectionFactory, STREAM_KEY); } - else { - redisSerializationContext = RedisSerializationContext - .newSerializationContext() - .key(jsonSerializer) - .value(jsonSerializer) - .hashKey(jsonSerializer) - .hashValue(jsonSerializer) - .build(); + + @Bean + public ReactiveMessageHandlerAdapter reactiveMessageHandlerAdapter(ReactiveRedisStreamMessageHandler streamMessageHandler) { + return new ReactiveMessageHandlerAdapter(streamMessageHandler); } - return redisSerializationContext; + @Bean + public ReactiveRedisConnectionFactory reactiveRedisConnectionFactory() { + return RedisAvailableRule.connectionFactory; + } } } diff --git a/spring-integration-redis/src/test/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTestsContext.java b/spring-integration-redis/src/test/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTestsContext.java deleted file mode 100644 index e9c4b287b7c..00000000000 --- a/spring-integration-redis/src/test/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTestsContext.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Copyright 2013-2020 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.integration.redis.outbound; - -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory; -import org.springframework.integration.channel.DirectChannel; -import org.springframework.integration.handler.ReactiveMessageHandlerAdapter; -import org.springframework.integration.redis.rules.RedisAvailableRule; -import org.springframework.messaging.MessageChannel; - -/** - * @author Attoumane Ahamadi - * - * @since 5.4 - */ -@Configuration -public class ReactiveRedisStreamMessageHandlerTestsContext { - public static final String STREAM_KEY = "myStream"; - - @Bean - public MessageChannel streamChannel(ReactiveMessageHandlerAdapter messageHandlerAdapter) { - DirectChannel directChannel = new DirectChannel(); - directChannel.subscribe(messageHandlerAdapter); - return directChannel; - } - - - @Bean - public ReactiveRedisStreamMessageHandler streamMessageHandler(ReactiveRedisConnectionFactory connectionFactory) { - return new ReactiveRedisStreamMessageHandler(connectionFactory, STREAM_KEY); - } - - @Bean - public ReactiveMessageHandlerAdapter reactiveMessageHandlerAdapter(ReactiveRedisStreamMessageHandler streamMessageHandler) { - return new ReactiveMessageHandlerAdapter(streamMessageHandler); - } - - @Bean - public ReactiveRedisConnectionFactory reactiveRedisConnectionFactory() { - return RedisAvailableRule.connectionFactory; - } -} From a0f9770468ceffba77487a8bd049914ee7a9ee7d Mon Sep 17 00:00:00 2001 From: Attoumane AHAMADI Date: Wed, 24 Jun 2020 22:57:46 +0200 Subject: [PATCH 5/6] Still have to review one failing test --- .../outbound/ReactiveRedisStreamMessageHandlerTests.java | 4 ++-- .../org/springframework/integration/redis/util/Address.java | 5 ++--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/spring-integration-redis/src/test/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTests.java b/spring-integration-redis/src/test/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTests.java index 1fc401df609..1530733df34 100644 --- a/spring-integration-redis/src/test/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTests.java +++ b/spring-integration-redis/src/test/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTests.java @@ -120,14 +120,14 @@ public void explicitSerializationContextTest() { assertThat(record.getStream()).isEqualTo(ReactiveRedisStreamMessageHandlerTestsContext.STREAM_KEY); - assertThat(record.getValue()).isEqualTo("[\"Hello\", \"stream\", \"message\"]"); + assertThat(record.getValue()).containsExactlyInAnyOrder("stream", "message", "Hello"); } @Test @RedisAvailable public void explicitSerializationContextWithModelTest() { - Address address = new Address().withAddress("Rennes, France"); + Address address = new Address("Rennes, France"); Person person = new Person(address, "Attoumane"); Message message = new GenericMessage(person); diff --git a/spring-integration-redis/src/test/java/org/springframework/integration/redis/util/Address.java b/spring-integration-redis/src/test/java/org/springframework/integration/redis/util/Address.java index 9bdc8f50516..65514771396 100644 --- a/spring-integration-redis/src/test/java/org/springframework/integration/redis/util/Address.java +++ b/spring-integration-redis/src/test/java/org/springframework/integration/redis/util/Address.java @@ -34,8 +34,7 @@ public void setAddress(String address) { public Address() { } - public Address withAddress(String address) { - this.setAddress(address); - return this; + public Address(String address) { + this.address = address; } } From 739edc0a315be93c339a71edebe8d4532921d538 Mon Sep 17 00:00:00 2001 From: Attoumane AHAMADI Date: Thu, 25 Jun 2020 14:08:58 +0200 Subject: [PATCH 6/6] Removed failed test reading List from a Stream --- ...eactiveRedisStreamMessageHandlerTests.java | 29 ------------------- 1 file changed, 29 deletions(-) diff --git a/spring-integration-redis/src/test/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTests.java b/spring-integration-redis/src/test/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTests.java index 1530733df34..298a9c55ce3 100644 --- a/spring-integration-redis/src/test/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTests.java +++ b/spring-integration-redis/src/test/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTests.java @@ -18,9 +18,6 @@ import static org.assertj.core.api.Assertions.assertThat; -import java.util.Arrays; -import java.util.List; - import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -98,32 +95,6 @@ public void integrationStreamOutboundTest() { assertThat(record.getValue()).isEqualTo(messagePayload); } - - /*@Test - @RedisAvailable*/ - public void explicitSerializationContextTest() { - List messagePayload = Arrays.asList("Hello", "stream", "message"); - - RedisSerializationContext serializationContext = redisSerializationContext(); - - streamMessageHandler.setSerializationContext(serializationContext); - streamMessageHandler.afterPropertiesSet(); - - handlerAdapter.handleMessage(new GenericMessage<>(messagePayload)); - - ReactiveRedisTemplate template = new ReactiveRedisTemplate(redisConnectionFactory, - serializationContext); - - ObjectRecord record = template.opsForStream().read(List.class, StreamOffset - .fromStart(ReactiveRedisStreamMessageHandlerTestsContext.STREAM_KEY)) - .blockFirst(); - - assertThat(record.getStream()).isEqualTo(ReactiveRedisStreamMessageHandlerTestsContext.STREAM_KEY); - - assertThat(record.getValue()).containsExactlyInAnyOrder("stream", "message", "Hello"); - } - - @Test @RedisAvailable public void explicitSerializationContextWithModelTest() {