Skip to content

Commit 6876730

Browse files
committed
Introduce Reactive Message producer
1 parent 24e6be6 commit 6876730

File tree

2 files changed

+142
-10
lines changed

2 files changed

+142
-10
lines changed

spring-integration-core/src/main/java/org/springframework/integration/endpoint/MessageProducerSupport.java

Lines changed: 59 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,14 @@
1616

1717
package org.springframework.integration.endpoint;
1818

19+
import org.reactivestreams.Publisher;
20+
1921
import org.springframework.beans.factory.BeanFactory;
2022
import org.springframework.beans.factory.SmartInitializingSingleton;
2123
import org.springframework.core.AttributeAccessor;
2224
import org.springframework.integration.IntegrationPattern;
2325
import org.springframework.integration.IntegrationPatternType;
26+
import org.springframework.integration.channel.ReactiveStreamsSubscribableChannel;
2427
import org.springframework.integration.core.MessageProducer;
2528
import org.springframework.integration.core.MessagingTemplate;
2629
import org.springframework.integration.history.MessageHistory;
@@ -36,6 +39,10 @@
3639
import org.springframework.util.Assert;
3740
import org.springframework.util.StringUtils;
3841

42+
import reactor.core.Disposable;
43+
import reactor.core.Disposables;
44+
import reactor.core.publisher.Flux;
45+
3946
/**
4047
* A support class for producer endpoints that provides a setter for the
4148
* output channel and a convenience method for sending Messages.
@@ -47,6 +54,8 @@
4754
public abstract class MessageProducerSupport extends AbstractEndpoint implements MessageProducer, TrackableComponent,
4855
SmartInitializingSingleton, IntegrationPattern {
4956

57+
private final Disposable.Composite reactiveSubscriptions = Disposables.composite();
58+
5059
private final MessagingTemplate messagingTemplate = new MessagingTemplate();
5160

5261
private ErrorMessageStrategy errorMessageStrategy = new DefaultErrorMessageStrategy();
@@ -196,13 +205,10 @@ protected void sendMessage(Message<?> messageArg) {
196205
if (message == null) {
197206
throw new MessagingException("cannot send a null message");
198207
}
199-
if (this.shouldTrack) {
200-
message = MessageHistory.write(message, this, getMessageBuilderFactory());
201-
}
208+
message = trackMessageIfAny(message);
202209
try {
203-
MessageChannel messageChannel = getOutputChannel();
204-
Assert.state(messageChannel != null, "The 'outputChannel' or `outputChannelName` must be configured");
205-
this.messagingTemplate.send(messageChannel, message);
210+
MessageChannel outputChannel = getRequiredOutputChannel();
211+
this.messagingTemplate.send(outputChannel, message);
206212
}
207213
catch (RuntimeException ex) {
208214
if (!sendErrorMessageIfNecessary(message, ex)) {
@@ -211,14 +217,37 @@ protected void sendMessage(Message<?> messageArg) {
211217
}
212218
}
213219

220+
protected void subscribeToPublisher(Publisher<? extends Message<?>> publisher) {
221+
MessageChannel outputChannel = getRequiredOutputChannel();
222+
Flux<Message<?>> messageFlux =
223+
Flux.from(publisher)
224+
.map(this::trackMessageIfAny);
225+
if (outputChannel instanceof ReactiveStreamsSubscribableChannel) {
226+
((ReactiveStreamsSubscribableChannel) outputChannel).subscribeTo(messageFlux);
227+
}
228+
else {
229+
this.reactiveSubscriptions.add(
230+
messageFlux
231+
.doOnNext((message) -> {
232+
try {
233+
sendMessage(message);
234+
}
235+
catch (Exception ex) {
236+
logger.error("Error sending a message: " + message, ex);
237+
}
238+
})
239+
.subscribe());
240+
}
241+
}
242+
214243
/**
215244
* Send an error message based on the exception and message.
216245
* @param message the message.
217246
* @param exception the exception.
218247
* @return true if the error channel is available and message sent.
219248
* @since 4.3.10
220249
*/
221-
protected final boolean sendErrorMessageIfNecessary(Message<?> message, RuntimeException exception) {
250+
protected final boolean sendErrorMessageIfNecessary(Message<?> message, Exception exception) {
222251
MessageChannel channel = getErrorChannel();
223252
if (channel != null) {
224253
this.messagingTemplate.send(channel, buildErrorMessage(message, exception));
@@ -235,9 +264,8 @@ protected final boolean sendErrorMessageIfNecessary(Message<?> message, RuntimeE
235264
* @return the error message.
236265
* @since 4.3.10
237266
*/
238-
protected final ErrorMessage buildErrorMessage(Message<?> message, RuntimeException exception) {
239-
return this.errorMessageStrategy.buildErrorMessage(exception,
240-
getErrorMessageAttributes(message));
267+
protected final ErrorMessage buildErrorMessage(Message<?> message, Exception exception) {
268+
return this.errorMessageStrategy.buildErrorMessage(exception, getErrorMessageAttributes(message));
241269
}
242270

243271
/**
@@ -252,4 +280,25 @@ protected AttributeAccessor getErrorMessageAttributes(Message<?> message) {
252280
return ErrorMessageUtils.getAttributeAccessor(message, null);
253281
}
254282

283+
private MessageChannel getRequiredOutputChannel() {
284+
MessageChannel messageChannel = getOutputChannel();
285+
Assert.state(messageChannel != null, "The 'outputChannel' or `outputChannelName` must be configured");
286+
return messageChannel;
287+
}
288+
289+
private Message<?> trackMessageIfAny(Message<?> message) {
290+
if (this.shouldTrack) {
291+
return MessageHistory.write(message, this, getMessageBuilderFactory());
292+
}
293+
else {
294+
return message;
295+
}
296+
}
297+
298+
@Override
299+
public void destroy() {
300+
this.reactiveSubscriptions.dispose();
301+
super.destroy();
302+
}
303+
255304
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.endpoint;
18+
19+
import org.junit.jupiter.api.Test;
20+
21+
import org.springframework.beans.factory.annotation.Autowired;
22+
import org.springframework.context.annotation.Bean;
23+
import org.springframework.context.annotation.Configuration;
24+
import org.springframework.integration.channel.FluxMessageChannel;
25+
import org.springframework.integration.config.EnableIntegration;
26+
import org.springframework.messaging.Message;
27+
import org.springframework.messaging.support.GenericMessage;
28+
import org.springframework.test.annotation.DirtiesContext;
29+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
30+
31+
import reactor.core.publisher.Flux;
32+
import reactor.test.StepVerifier;
33+
34+
/**
35+
* @author Artem Bilan
36+
*
37+
* @since 5.3
38+
*/
39+
@SpringJUnitConfig
40+
@DirtiesContext
41+
public class ReactiveMessageProducerTests {
42+
43+
@Autowired
44+
public FluxMessageChannel fluxMessageChannel;
45+
46+
@Test
47+
public void test() {
48+
StepVerifier.create(
49+
Flux.from(this.fluxMessageChannel)
50+
.map(Message::getPayload)
51+
.cast(String.class))
52+
.expectNext("test1", "test2")
53+
.thenCancel()
54+
.verify();
55+
}
56+
57+
@Configuration
58+
@EnableIntegration
59+
public static class Config {
60+
61+
@Bean
62+
public FluxMessageChannel fluxMessageChannel() {
63+
return new FluxMessageChannel();
64+
}
65+
66+
@Bean
67+
public MessageProducerSupport producer() {
68+
MessageProducerSupport producer =
69+
new MessageProducerSupport() {
70+
71+
@Override
72+
protected void doStart() {
73+
subscribeToPublisher(Flux.just("test1", "test2").map(GenericMessage::new));
74+
}
75+
76+
};
77+
producer.setOutputChannel(fluxMessageChannel());
78+
return producer;
79+
}
80+
81+
}
82+
83+
}

0 commit comments

Comments
 (0)