From cca9ac11dadfda8e39f82a9cc9192dc6c02ff9b2 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Thu, 3 May 2018 14:49:37 -0400 Subject: [PATCH 1/2] Reactive message-driven adapter adapter PoC - adapt SI message-driven adapters to send one initial message with a flux payload during startup and then send messages via the flux as they arrive. The test case shows 2 services, one that updates the flux and a second one that subscribes to the modified flux. --- .../endpoint/MessageProducerSupport.java | 51 +++++++- .../ReactiveMessageProducerTests.java | 111 ++++++++++++++++++ 2 files changed, 157 insertions(+), 5 deletions(-) create mode 100644 spring-integration-core/src/test/java/org/springframework/integration/endpoint/ReactiveMessageProducerTests.java diff --git a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/MessageProducerSupport.java b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/MessageProducerSupport.java index d920a31c14b..4b7f1e234b1 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/MessageProducerSupport.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/MessageProducerSupport.java @@ -16,6 +16,9 @@ package org.springframework.integration.endpoint; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + import org.springframework.beans.factory.BeanInitializationException; import org.springframework.beans.factory.SmartInitializingSingleton; import org.springframework.core.AttributeAccessor; @@ -30,9 +33,12 @@ import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessagingException; import org.springframework.messaging.support.ErrorMessage; +import org.springframework.messaging.support.GenericMessage; import org.springframework.util.Assert; import org.springframework.util.StringUtils; +import reactor.core.publisher.Flux; + /** * A support class for producer endpoints that provides a setter for the * output channel and a convenience method for sending Messages. @@ -46,6 +52,8 @@ public abstract class MessageProducerSupport extends AbstractEndpoint implements private final MessagingTemplate messagingTemplate = new MessagingTemplate(); + private final BlockingQueue> sinkQueue = new LinkedBlockingQueue<>(); + private ErrorMessageStrategy errorMessageStrategy = new DefaultErrorMessageStrategy(); private volatile MessageChannel outputChannel; @@ -58,6 +66,10 @@ public abstract class MessageProducerSupport extends AbstractEndpoint implements private volatile boolean shouldTrack = false; + private boolean reactive; + + private Flux> flux; + protected MessageProducerSupport() { this.setPhase(Integer.MAX_VALUE / 2); } @@ -151,6 +163,14 @@ public final void setErrorMessageStrategy(ErrorMessageStrategy errorMessageStrat this.errorMessageStrategy = errorMessageStrategy; } + public boolean isReactive() { + return this.reactive; + } + + public void setReactive(boolean reactive) { + this.reactive = reactive; + } + protected MessagingTemplate getMessagingTemplate() { return this.messagingTemplate; } @@ -182,6 +202,17 @@ protected void onInit() { */ @Override protected void doStart() { + if (this.reactive && this.flux == null) { + this.flux = Flux.generate(sink -> { + try { + sink.next(this.sinkQueue.take()); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + this.messagingTemplate.send(getOutputChannel(), new GenericMessage<>(this.flux)); + } } /** @@ -199,12 +230,22 @@ protected void sendMessage(Message message) { if (this.shouldTrack) { message = MessageHistory.write(message, this, this.getMessageBuilderFactory()); } - try { - this.messagingTemplate.send(getOutputChannel(), message); + if (this.flux != null) { + try { + this.sinkQueue.put(message); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } } - catch (RuntimeException e) { - if (!sendErrorMessageIfNecessary(message, e)) { - throw e; + else { + try { + this.messagingTemplate.send(getOutputChannel(), message); + } + catch (RuntimeException e) { + if (!sendErrorMessageIfNecessary(message, e)) { + throw e; + } } } } diff --git a/spring-integration-core/src/test/java/org/springframework/integration/endpoint/ReactiveMessageProducerTests.java b/spring-integration-core/src/test/java/org/springframework/integration/endpoint/ReactiveMessageProducerTests.java new file mode 100644 index 00000000000..fa957bbe698 --- /dev/null +++ b/spring-integration-core/src/test/java/org/springframework/integration/endpoint/ReactiveMessageProducerTests.java @@ -0,0 +1,111 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.endpoint; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.Test; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.config.EnableIntegration; +import org.springframework.integration.support.MessageBuilder; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.GenericMessage; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +import reactor.core.publisher.Flux; + +/** + * @author Gary Russell + * @since 5.1 + * + */ +@SpringJUnitConfig +@DirtiesContext +public class ReactiveMessageProducerTests { + + @Autowired + public Config config; + + @Test + public void test() throws Exception { + for (int i = 0; i < 5; i++) { + this.config.producer().produce(); + } + assertThat(this.config.latch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(this.config.received.get(0).getPayload()).isEqualTo("FOO"); + } + + @Configuration + @EnableIntegration + public static class Config { + + private final List> received = new ArrayList<>(); + + private final CountDownLatch latch = new CountDownLatch(5); + + @Bean + public MyProducer producer() { + MyProducer producer = new MyProducer(); + producer.setReactive(true); + producer.setOutputChannelName("in"); + return producer; + } + + @ServiceActivator(inputChannel = "in", outputChannel = "out") + public Flux> handle1(Flux> flux) { + return flux.map(m -> MessageBuilder.withPayload(((String) m.getPayload()).toUpperCase()).build()); + } + + @ServiceActivator(inputChannel = "out") + public void handle2(final Flux> flux) { + Executors.newSingleThreadExecutor().execute(() -> { + flux.map(m -> { + this.received.add(m); + System.out.println(m); + latch.countDown(); + return m; + }) + .subscribe(); + }); + } + + } + + private static class MyProducer extends MessageProducerSupport { + + MyProducer() { + super(); + } + + void produce() { + sendMessage(new GenericMessage<>("foo")); + } + + } + +} From b8eb602c30c0c93d5c0822634dfb720332682e98 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Thu, 3 May 2018 16:34:21 -0400 Subject: [PATCH 2/2] Use Flux.create() like in the FluxMessageChannel and push instead of pull. --- .../endpoint/MessageProducerSupport.java | 27 ++++++------------- .../ReactiveMessageProducerTests.java | 17 +++++------- 2 files changed, 15 insertions(+), 29 deletions(-) diff --git a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/MessageProducerSupport.java b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/MessageProducerSupport.java index 4b7f1e234b1..b5da2bd2b30 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/MessageProducerSupport.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/MessageProducerSupport.java @@ -16,9 +16,6 @@ package org.springframework.integration.endpoint; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; - import org.springframework.beans.factory.BeanInitializationException; import org.springframework.beans.factory.SmartInitializingSingleton; import org.springframework.core.AttributeAccessor; @@ -38,6 +35,7 @@ import org.springframework.util.StringUtils; import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; /** * A support class for producer endpoints that provides a setter for the @@ -52,8 +50,6 @@ public abstract class MessageProducerSupport extends AbstractEndpoint implements private final MessagingTemplate messagingTemplate = new MessagingTemplate(); - private final BlockingQueue> sinkQueue = new LinkedBlockingQueue<>(); - private ErrorMessageStrategy errorMessageStrategy = new DefaultErrorMessageStrategy(); private volatile MessageChannel outputChannel; @@ -70,6 +66,8 @@ public abstract class MessageProducerSupport extends AbstractEndpoint implements private Flux> flux; + private volatile FluxSink> sink; + protected MessageProducerSupport() { this.setPhase(Integer.MAX_VALUE / 2); } @@ -203,14 +201,10 @@ protected void onInit() { @Override protected void doStart() { if (this.reactive && this.flux == null) { - this.flux = Flux.generate(sink -> { - try { - sink.next(this.sinkQueue.take()); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - }); + this.flux = + Flux.>create(emitter -> this.sink = emitter, FluxSink.OverflowStrategy.IGNORE) + .publish() + .autoConnect(); this.messagingTemplate.send(getOutputChannel(), new GenericMessage<>(this.flux)); } } @@ -231,12 +225,7 @@ protected void sendMessage(Message message) { message = MessageHistory.write(message, this, this.getMessageBuilderFactory()); } if (this.flux != null) { - try { - this.sinkQueue.put(message); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } + this.sink.next(message); } else { try { diff --git a/spring-integration-core/src/test/java/org/springframework/integration/endpoint/ReactiveMessageProducerTests.java b/spring-integration-core/src/test/java/org/springframework/integration/endpoint/ReactiveMessageProducerTests.java index fa957bbe698..cb527d4c953 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/endpoint/ReactiveMessageProducerTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/endpoint/ReactiveMessageProducerTests.java @@ -21,7 +21,6 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.Test; @@ -83,15 +82,13 @@ public Flux> handle1(Flux> flux) { @ServiceActivator(inputChannel = "out") public void handle2(final Flux> flux) { - Executors.newSingleThreadExecutor().execute(() -> { - flux.map(m -> { - this.received.add(m); - System.out.println(m); - latch.countDown(); - return m; - }) - .subscribe(); - }); + flux.map(m -> { + this.received.add(m); + System.out.println(m); + latch.countDown(); + return m; + }) + .subscribe(); } }