diff --git a/spring-integration-redis/src/main/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandler.java b/spring-integration-redis/src/main/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandler.java new file mode 100644 index 00000000000..4ce04b10a5b --- /dev/null +++ b/spring-integration-redis/src/main/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandler.java @@ -0,0 +1,134 @@ +/* + * Copyright 2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.redis.outbound; + +import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory; +import org.springframework.data.redis.connection.stream.ObjectRecord; +import org.springframework.data.redis.connection.stream.StreamRecords; +import org.springframework.data.redis.core.ReactiveRedisTemplate; +import org.springframework.data.redis.core.ReactiveStreamOperations; +import org.springframework.data.redis.hash.HashMapper; +import org.springframework.data.redis.serializer.RedisSerializationContext; +import org.springframework.expression.EvaluationContext; +import org.springframework.expression.Expression; +import org.springframework.expression.common.LiteralExpression; +import org.springframework.integration.expression.ExpressionUtils; +import org.springframework.integration.handler.AbstractReactiveMessageHandler; +import org.springframework.lang.Nullable; +import org.springframework.messaging.Message; +import org.springframework.util.Assert; + +import reactor.core.publisher.Mono; + +/** + * Implementation of {@link org.springframework.messaging.ReactiveMessageHandler} which writes + * Message payload or Message itself(see {@link #extractPayload}) into a Redis stream using Reactive Stream operations. + * + * @author Attoumane Ahamadi + * + * @since 5.4 + */ +@SuppressWarnings({"rawtypes", "unchecked"}) +public class ReactiveRedisStreamMessageHandler extends AbstractReactiveMessageHandler { + + private final Expression streamKeyExpression; + + private final ReactiveRedisConnectionFactory connectionFactory; + + private EvaluationContext evaluationContext; + + private boolean extractPayload = true; + + private ReactiveStreamOperations reactiveStreamOperations; + + private RedisSerializationContext serializationContext = RedisSerializationContext.string(); + + @Nullable + private HashMapper hashMapper; + + public ReactiveRedisStreamMessageHandler(ReactiveRedisConnectionFactory connectionFactory, String streamKey) { + this(connectionFactory, new LiteralExpression(streamKey)); + } + + public ReactiveRedisStreamMessageHandler(ReactiveRedisConnectionFactory connectionFactory, + Expression streamKeyExpression) { + Assert.notNull(streamKeyExpression, "'streamKeyExpression' must not be null"); + Assert.notNull(connectionFactory, "'connectionFactory' must not be null"); + this.streamKeyExpression = streamKeyExpression; + this.connectionFactory = connectionFactory; + } + + public void setSerializationContext(RedisSerializationContext serializationContext) { + Assert.notNull(serializationContext, "'serializationContext' must not be null"); + this.serializationContext = serializationContext; + } + + /** + * (Optional) Set the {@link HashMapper} used to create {@link #reactiveStreamOperations}. + * The default {@link HashMapper} is defined from the provided {@link RedisSerializationContext} + * @param hashMapper the wanted hashMapper + * */ + public void setHashMapper(@Nullable HashMapper hashMapper) { + this.hashMapper = hashMapper; + } + + /** + * Set to {@code true} to extract the payload; otherwise + * the entire message is sent. Default {@code true}. + * @param extractPayload false to not extract. + */ + public void setExtractPayload(boolean extractPayload) { + this.extractPayload = extractPayload; + } + + @Override + public String getComponentType() { + return "redis:stream-outbound-channel-adapter"; + } + + @Override + protected void onInit() { + super.onInit(); + + this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory()); + + ReactiveRedisTemplate template = new ReactiveRedisTemplate<>(this.connectionFactory, + this.serializationContext); + this.reactiveStreamOperations = this.hashMapper == null ? template.opsForStream() : + template.opsForStream(this.hashMapper); + } + + @Override + protected Mono handleMessageInternal(Message message) { + + String streamKey = this.streamKeyExpression.getValue(this.evaluationContext, message, String.class); + + Assert.notNull(streamKey, "'streamKey' must not be null"); + + Object value = message; + if (this.extractPayload) { + value = message.getPayload(); + } + + ObjectRecord record = StreamRecords + .objectBacked(value) + .withStreamKey(streamKey); + + return this.reactiveStreamOperations.add(record); + } + +} diff --git a/spring-integration-redis/src/test/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTests.java b/spring-integration-redis/src/test/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTests.java new file mode 100644 index 00000000000..298a9c55ce3 --- /dev/null +++ b/spring-integration-redis/src/test/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandlerTests.java @@ -0,0 +1,171 @@ +/* + * Copyright 2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.redis.outbound; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory; +import org.springframework.data.redis.connection.stream.ObjectRecord; +import org.springframework.data.redis.connection.stream.StreamOffset; +import org.springframework.data.redis.core.ReactiveRedisTemplate; +import org.springframework.data.redis.serializer.RedisSerializationContext; +import org.springframework.data.redis.serializer.RedisSerializer; +import org.springframework.data.redis.serializer.StringRedisSerializer; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.handler.ReactiveMessageHandlerAdapter; +import org.springframework.integration.redis.rules.RedisAvailable; +import org.springframework.integration.redis.rules.RedisAvailableRule; +import org.springframework.integration.redis.rules.RedisAvailableTests; +import org.springframework.integration.redis.util.Address; +import org.springframework.integration.redis.util.Person; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.support.GenericMessage; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit4.SpringRunner; + +/** + * @author Attoumane Ahamadi + * + * @since 5.4 + */ +@RunWith(SpringRunner.class) +@DirtiesContext +@SuppressWarnings({"unchecked", "rawtypes"}) +public class ReactiveRedisStreamMessageHandlerTests extends RedisAvailableTests { + + @Autowired + @Qualifier("streamChannel") + private MessageChannel messageChannel; + + @Autowired + private ReactiveRedisConnectionFactory redisConnectionFactory; + + @Autowired + private ReactiveMessageHandlerAdapter handlerAdapter; + + @Autowired + private ReactiveRedisStreamMessageHandler streamMessageHandler; + + @Before + public void deleteStreamKey() { + ReactiveRedisTemplate template = new ReactiveRedisTemplate<>(this.redisConnectionFactory, + RedisSerializationContext.string()); + template.delete(ReactiveRedisStreamMessageHandlerTestsContext.STREAM_KEY).block(); + } + + + @Test + @RedisAvailable + public void integrationStreamOutboundTest() { + String messagePayload = "Hello stream message"; + + messageChannel.send(new GenericMessage<>(messagePayload)); + + RedisSerializationContext serializationContext = redisSerializationContext(); + + ReactiveRedisTemplate template = new ReactiveRedisTemplate(redisConnectionFactory, serializationContext); + + ObjectRecord record = template.opsForStream().read(String.class, StreamOffset.fromStart(ReactiveRedisStreamMessageHandlerTestsContext.STREAM_KEY)).blockFirst(); + + assertThat(record.getStream()).isEqualTo(ReactiveRedisStreamMessageHandlerTestsContext.STREAM_KEY); + + assertThat(record.getValue()).isEqualTo(messagePayload); + } + + @Test + @RedisAvailable + public void explicitSerializationContextWithModelTest() { + Address address = new Address("Rennes, France"); + Person person = new Person(address, "Attoumane"); + + Message message = new GenericMessage(person); + + RedisSerializationContext serializationContext = redisSerializationContext(); + + streamMessageHandler.setSerializationContext(serializationContext); + streamMessageHandler.afterPropertiesSet(); + + handlerAdapter.handleMessage(message); + + ReactiveRedisTemplate template = new ReactiveRedisTemplate(redisConnectionFactory, serializationContext); + + ObjectRecord record = template.opsForStream().read(Person.class, StreamOffset.fromStart(ReactiveRedisStreamMessageHandlerTestsContext.STREAM_KEY)).blockFirst(); + + assertThat(record.getStream()).isEqualTo(ReactiveRedisStreamMessageHandlerTestsContext.STREAM_KEY); + assertThat(record.getValue().getName()).isEqualTo("Attoumane"); + assertThat(record.getValue().getAddress().getAddress()).isEqualTo("Rennes, France"); + } + + + private RedisSerializationContext redisSerializationContext() { + + RedisSerializer stringSerializer = StringRedisSerializer.UTF_8; + + RedisSerializationContext.SerializationPair stringSerializerPair = RedisSerializationContext + .SerializationPair + .fromSerializer(stringSerializer); + + return RedisSerializationContext + .newSerializationContext() + .key(stringSerializerPair) + .value(stringSerializer) + .hashKey(stringSerializer) + .hashValue(stringSerializer) + .build(); + } + + + @Configuration + public static class ReactiveRedisStreamMessageHandlerTestsContext { + + public static final String STREAM_KEY = "myStream"; + + @Bean + public MessageChannel streamChannel(ReactiveMessageHandlerAdapter messageHandlerAdapter) { + DirectChannel directChannel = new DirectChannel(); + directChannel.subscribe(messageHandlerAdapter); + directChannel.setMaxSubscribers(1); + return directChannel; + } + + + @Bean + public ReactiveRedisStreamMessageHandler streamMessageHandler(ReactiveRedisConnectionFactory connectionFactory) { + return new ReactiveRedisStreamMessageHandler(connectionFactory, STREAM_KEY); + } + + @Bean + public ReactiveMessageHandlerAdapter reactiveMessageHandlerAdapter(ReactiveRedisStreamMessageHandler streamMessageHandler) { + return new ReactiveMessageHandlerAdapter(streamMessageHandler); + } + + @Bean + public ReactiveRedisConnectionFactory reactiveRedisConnectionFactory() { + return RedisAvailableRule.connectionFactory; + } + } + +} diff --git a/spring-integration-redis/src/test/java/org/springframework/integration/redis/store/RedisMessageStoreTests.java b/spring-integration-redis/src/test/java/org/springframework/integration/redis/store/RedisMessageStoreTests.java index c30298f1175..1ab4255a638 100644 --- a/spring-integration-redis/src/test/java/org/springframework/integration/redis/store/RedisMessageStoreTests.java +++ b/spring-integration-redis/src/test/java/org/springframework/integration/redis/store/RedisMessageStoreTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2007-2019 the original author or authors. + * Copyright 2007-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -77,7 +77,7 @@ public void testAddStringMessage() { RedisConnectionFactory jcf = this.getConnectionFactoryForTest(); RedisMessageStore store = new RedisMessageStore(jcf); Message stringMessage = new GenericMessage("Hello Redis"); - Message storedMessage = store.addMessage(stringMessage); + Message storedMessage = store.addMessage(stringMessage); assertThat(storedMessage).isNotSameAs(stringMessage); assertThat(storedMessage.getPayload()).isEqualTo("Hello Redis"); } @@ -92,7 +92,7 @@ public void testAddSerializableObjectMessage() { Person person = new Person(address, "Barak Obama"); Message objectMessage = new GenericMessage(person); - Message storedMessage = store.addMessage(objectMessage); + Message storedMessage = store.addMessage(objectMessage); assertThat(storedMessage).isNotSameAs(objectMessage); assertThat(storedMessage.getPayload().getName()).isEqualTo("Barak Obama"); } @@ -206,6 +206,9 @@ public Person(Address address, String name) { this.name = name; } + public Person() { + } + public Address getAddress() { return address; } @@ -237,6 +240,13 @@ public void setAddress(String address) { this.address = address; } + public Address() { + } + + public Address withAddress(String address) { + this.setAddress(address); + return this; + } } public static class Foo { diff --git a/spring-integration-redis/src/test/java/org/springframework/integration/redis/util/Address.java b/spring-integration-redis/src/test/java/org/springframework/integration/redis/util/Address.java new file mode 100644 index 00000000000..65514771396 --- /dev/null +++ b/spring-integration-redis/src/test/java/org/springframework/integration/redis/util/Address.java @@ -0,0 +1,40 @@ +/* + * Copyright 2013-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.redis.util; + +import java.io.Serializable; + +@SuppressWarnings("serial") +public class Address implements Serializable { + + private String address; + + public String getAddress() { + return address; + } + + public void setAddress(String address) { + this.address = address; + } + + public Address() { + } + + public Address(String address) { + this.address = address; + } +} diff --git a/spring-integration-redis/src/test/java/org/springframework/integration/redis/util/Person.java b/spring-integration-redis/src/test/java/org/springframework/integration/redis/util/Person.java new file mode 100644 index 00000000000..982aac5c4e7 --- /dev/null +++ b/spring-integration-redis/src/test/java/org/springframework/integration/redis/util/Person.java @@ -0,0 +1,52 @@ +/* + * Copyright 2013-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.redis.util; + +import java.io.Serializable; + + +@SuppressWarnings("serial") +public class Person implements Serializable { + + private Address address; + + private String name; + + public Person(Address address, String name) { + this.address = address; + this.name = name; + } + + public Person() { + } + + public Address getAddress() { + return address; + } + + public void setAddress(Address address) { + this.address = address; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } +}