Skip to content

Outbound channel adapter for Redis Stream #3259

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.redis.outbound;

import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.core.ReactiveStreamOperations;
import org.springframework.data.redis.hash.HashMapper;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.integration.expression.ExpressionUtils;
import org.springframework.integration.handler.AbstractReactiveMessageHandler;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;

import reactor.core.publisher.Mono;

/**
* Implementation of {@link org.springframework.messaging.ReactiveMessageHandler} which writes
* Message payload or Message itself(see {@link #extractPayload}) into a Redis stream using Reactive Stream operations.
*
* @author Attoumane Ahamadi
*
* @since 5.4
*/
@SuppressWarnings({"rawtypes", "unchecked"})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May we kinda get rid of rawtypes?
Probably simple wildcard for genercis (<?>) should be enough for us...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing rawtypes causes compilation issue:

error: no suitable method found for opsForStream(HashMapper<CAP#1,CAP#2,CAP#3>)
				template.opsForStream(this.hashMapper);
				        ^
    method ReactiveRedisOperations.<HK#1,HV#1>opsForStream() is not applicable
      (cannot infer type-variable(s) HK#1,HV#1
        (actual and formal argument lists differ in length))

on

this.reactiveStreamOperations = this.hashMapper == null ? template.opsForStream() :
				template.opsForStream(this.hashMapper);

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. Let me play with that locally!
Then I'll merge it since we need something for today's release.

You feel free to go ahead with Inbound Channel Adapter implementation and some docs for both of this.

Thanks

public class ReactiveRedisStreamMessageHandler extends AbstractReactiveMessageHandler {

private final Expression streamKeyExpression;

private final ReactiveRedisConnectionFactory connectionFactory;

private EvaluationContext evaluationContext;

private boolean extractPayload = true;

private ReactiveStreamOperations reactiveStreamOperations;

private RedisSerializationContext serializationContext = RedisSerializationContext.string();

@Nullable
private HashMapper hashMapper;

public ReactiveRedisStreamMessageHandler(ReactiveRedisConnectionFactory connectionFactory, String streamKey) {
this(connectionFactory, new LiteralExpression(streamKey));
}

public ReactiveRedisStreamMessageHandler(ReactiveRedisConnectionFactory connectionFactory,
Expression streamKeyExpression) {
Assert.notNull(streamKeyExpression, "'streamKeyExpression' must not be null");
Assert.notNull(connectionFactory, "'connectionFactory' must not be null");
this.streamKeyExpression = streamKeyExpression;
this.connectionFactory = connectionFactory;
}

public void setSerializationContext(RedisSerializationContext serializationContext) {
Assert.notNull(serializationContext, "'serializationContext' must not be null");
this.serializationContext = serializationContext;
}

/**
* (Optional) Set the {@link HashMapper} used to create {@link #reactiveStreamOperations}.
* The default {@link HashMapper} is defined from the provided {@link RedisSerializationContext}
* @param hashMapper the wanted hashMapper
* */
public void setHashMapper(@Nullable HashMapper hashMapper) {
this.hashMapper = hashMapper;
}

/**
* Set to {@code true} to extract the payload; otherwise
* the entire message is sent. Default {@code true}.
* @param extractPayload false to not extract.
*/
public void setExtractPayload(boolean extractPayload) {
this.extractPayload = extractPayload;
}

@Override
public String getComponentType() {
return "redis:stream-outbound-channel-adapter";
}

@Override
protected void onInit() {
super.onInit();

this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory());

ReactiveRedisTemplate template = new ReactiveRedisTemplate<>(this.connectionFactory,
this.serializationContext);
this.reactiveStreamOperations = this.hashMapper == null ? template.opsForStream() :
template.opsForStream(this.hashMapper);
}

@Override
protected Mono<Void> handleMessageInternal(Message<?> message) {

String streamKey = this.streamKeyExpression.getValue(this.evaluationContext, message, String.class);

Assert.notNull(streamKey, "'streamKey' must not be null");

Object value = message;
if (this.extractPayload) {
value = message.getPayload();
}

ObjectRecord record = StreamRecords
.objectBacked(value)
.withStreamKey(streamKey);

return this.reactiveStreamOperations.add(record);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
/*
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.redis.outbound;

import static org.assertj.core.api.Assertions.assertThat;

import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.handler.ReactiveMessageHandlerAdapter;
import org.springframework.integration.redis.rules.RedisAvailable;
import org.springframework.integration.redis.rules.RedisAvailableRule;
import org.springframework.integration.redis.rules.RedisAvailableTests;
import org.springframework.integration.redis.util.Address;
import org.springframework.integration.redis.util.Person;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringRunner;

/**
* @author Attoumane Ahamadi
*
* @since 5.4
*/
@RunWith(SpringRunner.class)
@DirtiesContext
@SuppressWarnings({"unchecked", "rawtypes"})
public class ReactiveRedisStreamMessageHandlerTests extends RedisAvailableTests {

@Autowired
@Qualifier("streamChannel")
private MessageChannel messageChannel;

@Autowired
private ReactiveRedisConnectionFactory redisConnectionFactory;

@Autowired
private ReactiveMessageHandlerAdapter handlerAdapter;

@Autowired
private ReactiveRedisStreamMessageHandler streamMessageHandler;

@Before
public void deleteStreamKey() {
ReactiveRedisTemplate<String, String> template = new ReactiveRedisTemplate<>(this.redisConnectionFactory,
RedisSerializationContext.string());
template.delete(ReactiveRedisStreamMessageHandlerTestsContext.STREAM_KEY).block();
}


@Test
@RedisAvailable
public void integrationStreamOutboundTest() {
String messagePayload = "Hello stream message";

messageChannel.send(new GenericMessage<>(messagePayload));

RedisSerializationContext<String, String> serializationContext = redisSerializationContext();

ReactiveRedisTemplate<String, String> template = new ReactiveRedisTemplate(redisConnectionFactory, serializationContext);

ObjectRecord<String, String> record = template.opsForStream().read(String.class, StreamOffset.fromStart(ReactiveRedisStreamMessageHandlerTestsContext.STREAM_KEY)).blockFirst();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, consider to not exceed 120 line length: much easier to review.


assertThat(record.getStream()).isEqualTo(ReactiveRedisStreamMessageHandlerTestsContext.STREAM_KEY);

assertThat(record.getValue()).isEqualTo(messagePayload);
}

@Test
@RedisAvailable
public void explicitSerializationContextWithModelTest() {
Address address = new Address("Rennes, France");
Person person = new Person(address, "Attoumane");

Message message = new GenericMessage(person);

RedisSerializationContext<String, Object> serializationContext = redisSerializationContext();

streamMessageHandler.setSerializationContext(serializationContext);
streamMessageHandler.afterPropertiesSet();

handlerAdapter.handleMessage(message);

ReactiveRedisTemplate<String, Person> template = new ReactiveRedisTemplate(redisConnectionFactory, serializationContext);

ObjectRecord<String, Person> record = template.opsForStream().read(Person.class, StreamOffset.fromStart(ReactiveRedisStreamMessageHandlerTestsContext.STREAM_KEY)).blockFirst();

assertThat(record.getStream()).isEqualTo(ReactiveRedisStreamMessageHandlerTestsContext.STREAM_KEY);
assertThat(record.getValue().getName()).isEqualTo("Attoumane");
assertThat(record.getValue().getAddress().getAddress()).isEqualTo("Rennes, France");
}


private RedisSerializationContext redisSerializationContext() {

RedisSerializer stringSerializer = StringRedisSerializer.UTF_8;

RedisSerializationContext.SerializationPair stringSerializerPair = RedisSerializationContext
.SerializationPair
.fromSerializer(stringSerializer);

return RedisSerializationContext
.newSerializationContext()
.key(stringSerializerPair)
.value(stringSerializer)
.hashKey(stringSerializer)
.hashValue(stringSerializer)
.build();
}


@Configuration
public static class ReactiveRedisStreamMessageHandlerTestsContext {

public static final String STREAM_KEY = "myStream";

@Bean
public MessageChannel streamChannel(ReactiveMessageHandlerAdapter messageHandlerAdapter) {
DirectChannel directChannel = new DirectChannel();
directChannel.subscribe(messageHandlerAdapter);
directChannel.setMaxSubscribers(1);
return directChannel;
}


@Bean
public ReactiveRedisStreamMessageHandler streamMessageHandler(ReactiveRedisConnectionFactory connectionFactory) {
return new ReactiveRedisStreamMessageHandler(connectionFactory, STREAM_KEY);
}

@Bean
public ReactiveMessageHandlerAdapter reactiveMessageHandlerAdapter(ReactiveRedisStreamMessageHandler streamMessageHandler) {
return new ReactiveMessageHandlerAdapter(streamMessageHandler);
}

@Bean
public ReactiveRedisConnectionFactory reactiveRedisConnectionFactory() {
return RedisAvailableRule.connectionFactory;
}
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2007-2019 the original author or authors.
* Copyright 2007-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -77,7 +77,7 @@ public void testAddStringMessage() {
RedisConnectionFactory jcf = this.getConnectionFactoryForTest();
RedisMessageStore store = new RedisMessageStore(jcf);
Message<String> stringMessage = new GenericMessage<String>("Hello Redis");
Message<String> storedMessage = store.addMessage(stringMessage);
Message<String> storedMessage = store.addMessage(stringMessage);
assertThat(storedMessage).isNotSameAs(stringMessage);
assertThat(storedMessage.getPayload()).isEqualTo("Hello Redis");
}
Expand All @@ -92,7 +92,7 @@ public void testAddSerializableObjectMessage() {
Person person = new Person(address, "Barak Obama");

Message<Person> objectMessage = new GenericMessage<Person>(person);
Message<Person> storedMessage = store.addMessage(objectMessage);
Message<Person> storedMessage = store.addMessage(objectMessage);
assertThat(storedMessage).isNotSameAs(objectMessage);
assertThat(storedMessage.getPayload().getName()).isEqualTo("Barak Obama");
}
Expand Down Expand Up @@ -206,6 +206,9 @@ public Person(Address address, String name) {
this.name = name;
}

public Person() {
}

public Address getAddress() {
return address;
}
Expand Down Expand Up @@ -237,6 +240,13 @@ public void setAddress(String address) {
this.address = address;
}

public Address() {
}

public Address withAddress(String address) {
this.setAddress(address);
return this;
}
}

public static class Foo {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2013-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.redis.util;

import java.io.Serializable;

@SuppressWarnings("serial")
public class Address implements Serializable {

private String address;

public String getAddress() {
return address;
}

public void setAddress(String address) {
this.address = address;
}

public Address() {
}

public Address(String address) {
this.address = address;
}
}
Loading