Skip to content

Commit 7372f18

Browse files
committed
Inbound:
- Changed the imperative `StreamMessageListenerContainer` listener in favor of the reactive one `StreamReceiver` - Added RedisHeaders values for Redis Stream Outbound: - Added a new constructor taking `Expression` as parameter - Changed the imperative `StreamOperations` in favor of `ReactiveStreamOperations` - Renaming class from `RedisStreamMessageHandler` to `ReactiveRedisStreamMessageHandler` Took into account other improvement remarks: copyright, setters position, methods names
1 parent 88c7dbe commit 7372f18

File tree

4 files changed

+270
-219
lines changed

4 files changed

+270
-219
lines changed
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,34 @@
1+
/*
2+
* Copyright 2020-2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 ( the "License" );
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
117
package org.springframework.integration.redis.inbound;
218

3-
import java.time.Duration;
419
import java.util.HashMap;
520
import java.util.Map;
621

7-
import org.springframework.core.task.SimpleAsyncTaskExecutor;
8-
import org.springframework.data.redis.connection.RedisConnectionFactory;
22+
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
923
import org.springframework.data.redis.connection.stream.Consumer;
1024
import org.springframework.data.redis.connection.stream.ReadOffset;
1125
import org.springframework.data.redis.connection.stream.Record;
1226
import org.springframework.data.redis.connection.stream.StreamOffset;
1327
import org.springframework.data.redis.core.RedisCallback;
1428
import org.springframework.data.redis.core.RedisTemplate;
15-
import org.springframework.data.redis.hash.ObjectHashMapper;
16-
import org.springframework.data.redis.serializer.StringRedisSerializer;
17-
import org.springframework.data.redis.stream.StreamListener;
18-
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
29+
import org.springframework.data.redis.stream.StreamReceiver;
1930
import org.springframework.integration.endpoint.MessageProducerSupport;
31+
import org.springframework.integration.redis.support.RedisHeaders;
2032
import org.springframework.lang.Nullable;
2133
import org.springframework.messaging.Message;
2234
import org.springframework.messaging.MessageHeaders;
@@ -30,126 +42,130 @@
3042
*/
3143
public class RedisStreamInboundChannelAdapter extends MessageProducerSupport {
3244

33-
private volatile StreamMessageListenerContainer container;
34-
35-
private volatile StreamMessageListenerContainer.StreamMessageListenerContainerOptions containerOptions = StreamMessageListenerContainer.StreamMessageListenerContainerOptions
36-
.builder()
37-
.serializer( StringRedisSerializer.UTF_8 )
38-
.pollTimeout( Duration.ZERO )
39-
.executor( new SimpleAsyncTaskExecutor() )
40-
.batchSize( 1 )
41-
.objectMapper( new ObjectHashMapper() )
42-
.build();
43-
44-
private final String streamKey;
45-
46-
private volatile @Nullable String consumerGroupName;
47-
48-
private volatile @Nullable String consumerName;
49-
50-
private volatile boolean createGroupeIfNotExist = false;
51-
52-
private volatile ReadOffset readOffset = ReadOffset.lastConsumed();
53-
54-
private final RedisTemplate<String, ?> template;
55-
56-
private final MessageConverter messageConverter = new SimpleMessageConverter();
57-
58-
private final RedisConnectionFactory connectionFactory;
59-
60-
public RedisStreamInboundChannelAdapter( String streamKey, RedisConnectionFactory connectionFactory ) {
61-
this.streamKey = streamKey;
62-
this.connectionFactory = connectionFactory;
63-
this.template = new RedisTemplate<>();
64-
this.template.setConnectionFactory( this.connectionFactory );
65-
this.template.setKeySerializer( new StringRedisSerializer() );
66-
this.template.afterPropertiesSet();
67-
}
68-
69-
@Override
70-
protected void onInit() {
71-
super.onInit();
72-
listenMessage();
73-
}
74-
75-
@Override
76-
protected void doStart() {
77-
super.doStart();
78-
createGroup();
79-
this.container.start();
80-
}
81-
82-
@Override
83-
protected void doStop() {
84-
super.doStop();
85-
this.container.stop();
86-
}
87-
88-
@Override
89-
public String getComponentType() {
90-
return "redis:stream-inbound-channel-adapter";
91-
}
92-
93-
private Message<?> convertMessage( Record<Object, Object> message ) {
94-
Map<String, Object> headers = new HashMap<>();
95-
headers.put( "streamKey", message.getStream() );
96-
headers.put( "Message-ID", message.getId() );
97-
return this.messageConverter.toMessage( message.getValue(), new MessageHeaders( headers ) );
98-
}
99-
100-
private void listenMessage() {
101-
this.container = StreamMessageListenerContainer.create( connectionFactory, this.containerOptions );
102-
103-
StreamListener<Object, Record<Object, Object>> streamListener = message -> sendMessage(
104-
convertMessage( message ) );
105-
106-
StreamOffset offset = StreamOffset.create( this.streamKey, this.readOffset );
107-
108-
if ( StringUtils.isEmpty( this.consumerGroupName ) ) {
109-
container.receive( offset, streamListener );
110-
} else {
111-
Assert.hasText( consumerName, "'consumerName' must be set" );
112-
Consumer consumer = Consumer.from( this.consumerGroupName, this.consumerName );
113-
container.receiveAutoAck( consumer, offset, streamListener );
114-
}
115-
}
116-
117-
// TODO : follow the resolution of this Spring Data Redis issue:
118-
// https://jira.spring.io/projects/DATAREDIS/issues/DATAREDIS-1119
119-
// And improve this method when it will be solved.
120-
/**
121-
* Create the Consumer Group if and only if it does not exist. During the
122-
* creation we can also create the stream ie {@code MKSTREAM}, that does not
123-
* have effect if the stream already exists.
124-
*/
125-
private void createGroup() {
126-
if ( createGroupeIfNotExist ) {
127-
Assert.hasText( consumerGroupName, "'consumerGroupName' must be set" );
128-
Assert.hasText( consumerName, "'consumerName' must be set" );
129-
try {
130-
template.execute( (RedisCallback<Object>) connection -> connection.execute( "XGROUP",
131-
"CREATE".getBytes(), streamKey.getBytes(), consumerGroupName.getBytes(),
132-
ReadOffset.latest().getOffset().getBytes(), "MKSTREAM".getBytes() ) );
133-
} catch ( Exception e ) {
134-
// An exception is thrown when the group already exists
135-
e.printStackTrace();
136-
}
137-
}
138-
}
139-
140-
public void setConsumerGroupName( @Nullable String consumerGroupName ) {
141-
this.consumerGroupName = consumerGroupName;
142-
}
143-
144-
public void setConsumerName( @Nullable String consumerName ) {
145-
this.consumerName = consumerName;
146-
}
147-
148-
public void setCreateGroupeIfNotExist( boolean createGroupeIfNotExist ) {
149-
this.createGroupeIfNotExist = createGroupeIfNotExist;
150-
}
151-
152-
public void setReadOffset( ReadOffset readOffset ) {
153-
this.readOffset = readOffset;
154-
}
45+
private final ReactiveRedisConnectionFactory reactiveConnectionFactory;
46+
47+
private final MessageConverter messageConverter = new SimpleMessageConverter();
48+
49+
private final String streamKey;
50+
51+
private StreamReceiver receiver;
52+
53+
private StreamReceiver.StreamReceiverOptions receiverOptions;
54+
55+
private volatile String consumerGroupName;
56+
57+
private volatile String consumerName;
58+
59+
private volatile boolean createGroupIfNotExist = false;
60+
61+
private ReadOffset readOffset = ReadOffset.lastConsumed();
62+
63+
private RedisTemplate template;
64+
65+
public RedisStreamInboundChannelAdapter( String streamKey,
66+
ReactiveRedisConnectionFactory reactiveConnectionFactory ) {
67+
this.streamKey = streamKey;
68+
this.reactiveConnectionFactory = reactiveConnectionFactory;
69+
}
70+
71+
public void setConsumerGroupName( @Nullable String consumerGroupName ) {
72+
this.consumerGroupName = consumerGroupName;
73+
}
74+
75+
public void setConsumerName( @Nullable String consumerName ) {
76+
this.consumerName = consumerName;
77+
}
78+
79+
public void setCreateGroupIfNotExist( boolean createGroupIfNotExist ) {
80+
this.createGroupIfNotExist = createGroupIfNotExist;
81+
}
82+
83+
public void setReceiver( StreamReceiver receiver ) {
84+
this.receiver = receiver;
85+
}
86+
87+
public void setReceiverOptions( StreamReceiver.StreamReceiverOptions receiverOptions ) {
88+
this.receiverOptions = receiverOptions;
89+
}
90+
91+
public void setReadOffset( ReadOffset readOffset ) {
92+
this.readOffset = readOffset;
93+
}
94+
95+
void setTemplate( RedisTemplate template ) {
96+
this.template = template;
97+
}
98+
99+
@Override
100+
protected void onInit() {
101+
super.onInit();
102+
registerListener();
103+
}
104+
105+
@Override
106+
protected void doStart() {
107+
super.doStart();
108+
createGroup();
109+
}
110+
111+
@Override
112+
public String getComponentType() {
113+
return "redis:stream-inbound-channel-adapter";
114+
}
115+
116+
private Message<?> convertMessage( Record<Object, Object> record ) {
117+
Map<String, Object> headers = new HashMap<>();
118+
headers.put( RedisHeaders.STREAM_KEY, record.getStream() );
119+
headers.put( RedisHeaders.STREAM_MESSAGE_ID, record.getId() );
120+
return this.messageConverter.toMessage( record.getValue(), new MessageHeaders( headers ) );
121+
}
122+
123+
private void registerListener() {
124+
if ( this.receiverOptions != null ) {
125+
this.receiver = StreamReceiver.create( reactiveConnectionFactory, this.receiverOptions );
126+
}
127+
else {
128+
this.receiver = StreamReceiver.create( reactiveConnectionFactory );
129+
}
130+
131+
StreamOffset offset = StreamOffset.create( this.streamKey, this.readOffset );
132+
133+
if ( StringUtils.isEmpty( this.consumerGroupName ) ) {
134+
this.receiver
135+
.receive( offset )
136+
.subscribe( record -> sendMessage( convertMessage( ( Record<Object, Object> ) record ) ) );
137+
}
138+
else {
139+
Assert.hasText( consumerName, "'consumerName' must be set" );
140+
Consumer consumer = Consumer.from( this.consumerGroupName, this.consumerName );
141+
this.receiver
142+
.receiveAutoAck( consumer, offset )
143+
.subscribe( record -> sendMessage( convertMessage( ( Record<Object, Object> ) record ) ) );
144+
}
145+
}
146+
147+
// TODO : follow the resolution of this Spring Data Redis issue:
148+
// https://jira.spring.io/projects/DATAREDIS/issues/DATAREDIS-1119
149+
// And improve this method when it will be solved.
150+
/**
151+
* Create the Consumer Group if and only if it does not exist. During the
152+
* creation we can also create the stream ie {@code MKSTREAM}, that does not
153+
* have effect if the stream already exists.
154+
*/
155+
private void createGroup() {
156+
if ( createGroupIfNotExist ) {
157+
Assert.notNull( template, "'template' should not be null" );
158+
Assert.hasText( consumerGroupName, "'consumerGroupName' must be set" );
159+
Assert.hasText( consumerName, "'consumerName' must be set" );
160+
try {
161+
this.template.execute( ( RedisCallback<Object> ) connection -> connection.execute( "XGROUP",
162+
"CREATE".getBytes(), streamKey.getBytes(), consumerGroupName.getBytes(),
163+
ReadOffset.latest().getOffset().getBytes(), "MKSTREAM".getBytes() ) );
164+
}
165+
catch ( Exception e ) {
166+
// An exception is thrown when the group already exists
167+
e.printStackTrace();
168+
}
169+
}
170+
}
155171
}

0 commit comments

Comments
 (0)