Skip to content

Commit 94e0816

Browse files
artembilangaryrussell
authored andcommitted
Use EmitterProcessor in the FluxMessageChannel (#3104)
* Use EmitterProcessor in the FluxMessageChannel The `EmitterProcessor` has a good logic to block upstream producer when its downstream subscriber cannot keep up with overproducing. * Rework `FluxMessageChannel` logic to rely on the `EmitterProcessor` instead of `Flux.create()` * Cancel `FluxMessageChannel` internal subscriptions in the `destroy()` * Fix `ReactiveStreamsTests.testFluxTransform()` for the splitter's delimiter * Ensure in the `FluxMessageChannelTests.testFluxMessageChannel` that we can have several concurrent subscribers to the `FluxMessageChannel` * * Use `flux.onComplete()` instead of iteration over subscribers * Change `subscribers` list into just `AtomicInteger` count marker * fix `DefaultSplitterTests` according a new logic in the `FluxMessageChannel` * GH-3107: Add errorOnTimeout for TcpInboundGateway Fixes #3107 The `MessagingGatewaySupport` has an `errorOnTimeout` option to throw a `MessageTimeoutException` when downstream reply doesn't come back in time for configured reply timeout * Expose an `errorOnTimeout` option as a `TcpInboundGateway` ctor property * Add new factory methods into a `Tcp` factory for Java DSL * Ensure a property works as expected in the `IpIntegrationTests` * Document a new option * * Use `delaySubscription()` for subscribing publishers in the `FluxMessageChannel` to wait until this one subscribed. * Use an `EmitterProcessor` to catch subscriptions and pass them as a signal to delayed upstream publishers * Fix `FluxMessageChannelTests.testFluxMessageChannelCleanUp` to verify an actual property instead of removed. * Fix `RSocketOutboundGatewayIntegrationTests` for the proper subscription into a `FluxMessageChannel` before actual interaction with an RSocket gateway. This should help us also to avoid some race conditions in the future * Revert "GH-3107: Add errorOnTimeout for TcpInboundGateway" This reverts commit fa6119d. * * Refactor `FluxMessageChannel` to use `ReplayProcessor` for `subscribedSignal`. This one is used `delaySubscription` for the upstream publishers * Use a `AtomicBoolean` for subscription state since `doOnSubscribe()` is called before `EmitterProcessor` adds subscribers for its `downstreams` * Use `publishOn(Schedulers.boundedElastic())` for upstream publishers to avoid blocking over there when our `EmitterProcessor` doesn't have enough demand * Refactor reactive tests to have a subscription into the `FluxMessageChannel` earlier than emission happens for it * * Use `Flux.subscribe(Consumer)` instead of `doOnNext(Consumer).subscribe()` * * Emit `subscribedSignal` value after `.subscribe(subscriber)` instead of `doOnSubscribe` * Check for `this.processor.hasDownstreams()` before emitting such an event * * Use `this.processor.hasDownstreams()` as a value to emit for `subscribedSignal`. This way we are less vulnerable race conditions when subscribers are changed actively
1 parent 762f839 commit 94e0816

File tree

2 files changed

+57
-50
lines changed

2 files changed

+57
-50
lines changed

spring-integration-core/src/main/java/org/springframework/integration/channel/FluxMessageChannel.java

+31-34
Original file line numberDiff line numberDiff line change
@@ -16,82 +16,79 @@
1616

1717
package org.springframework.integration.channel;
1818

19-
import java.util.ArrayList;
20-
import java.util.List;
21-
import java.util.Map;
22-
import java.util.concurrent.ConcurrentHashMap;
23-
2419
import org.reactivestreams.Publisher;
2520
import org.reactivestreams.Subscriber;
2621

2722
import org.springframework.messaging.Message;
2823
import org.springframework.util.Assert;
2924

30-
import reactor.core.publisher.ConnectableFlux;
25+
import reactor.core.publisher.EmitterProcessor;
3126
import reactor.core.publisher.Flux;
3227
import reactor.core.publisher.FluxSink;
28+
import reactor.core.publisher.ReplayProcessor;
29+
import reactor.core.scheduler.Schedulers;
3330

3431
/**
3532
* The {@link AbstractMessageChannel} implementation for the
3633
* Reactive Streams {@link Publisher} based on the Project Reactor {@link Flux}.
3734
*
3835
* @author Artem Bilan
3936
* @author Gary Russell
37+
* @author Sergei Egorov
4038
*
4139
* @since 5.0
4240
*/
4341
public class FluxMessageChannel extends AbstractMessageChannel
4442
implements Publisher<Message<?>>, ReactiveStreamsSubscribableChannel {
4543

46-
private final List<Subscriber<? super Message<?>>> subscribers = new ArrayList<>();
47-
48-
private final Map<Publisher<? extends Message<?>>, ConnectableFlux<?>> publishers = new ConcurrentHashMap<>();
44+
private final EmitterProcessor<Message<?>> processor;
4945

50-
private final Flux<Message<?>> flux;
46+
private final FluxSink<Message<?>> sink;
5147

52-
private FluxSink<Message<?>> sink;
48+
private final ReplayProcessor<Boolean> subscribedSignal = ReplayProcessor.create(1);
5349

5450
public FluxMessageChannel() {
55-
this.flux =
56-
Flux.<Message<?>>create(emitter -> this.sink = emitter, FluxSink.OverflowStrategy.IGNORE)
57-
.publish()
58-
.autoConnect();
51+
this.processor = EmitterProcessor.create(1, false);
52+
this.sink = this.processor.sink(FluxSink.OverflowStrategy.BUFFER);
5953
}
6054

6155
@Override
6256
protected boolean doSend(Message<?> message, long timeout) {
63-
Assert.state(this.subscribers.size() > 0,
57+
Assert.state(this.processor.hasDownstreams(),
6458
() -> "The [" + this + "] doesn't have subscribers to accept messages");
6559
this.sink.next(message);
6660
return true;
6761
}
6862

6963
@Override
7064
public void subscribe(Subscriber<? super Message<?>> subscriber) {
71-
this.subscribers.add(subscriber);
72-
73-
this.flux.doOnCancel(() -> this.subscribers.remove(subscriber))
74-
.retry()
65+
this.processor
66+
.doFinally((s) -> this.subscribedSignal.onNext(this.processor.hasDownstreams()))
7567
.subscribe(subscriber);
76-
77-
this.publishers.values().forEach(ConnectableFlux::connect);
68+
this.subscribedSignal.onNext(this.processor.hasDownstreams());
7869
}
7970

8071
@Override
8172
public void subscribeTo(Publisher<? extends Message<?>> publisher) {
82-
ConnectableFlux<?> connectableFlux =
83-
Flux.from(publisher)
84-
.handle((message, sink) -> sink.next(send(message)))
85-
.onErrorContinue((throwable, event) ->
86-
logger.warn("Error during processing event: " + event, throwable))
87-
.doOnComplete(() -> this.publishers.remove(publisher))
88-
.publish();
89-
90-
this.publishers.put(publisher, connectableFlux);
73+
Flux.from(publisher)
74+
.delaySubscription(this.subscribedSignal.filter(Boolean::booleanValue).next())
75+
.publishOn(Schedulers.boundedElastic())
76+
.doOnNext((message) -> {
77+
try {
78+
send(message);
79+
}
80+
catch (Exception e) {
81+
logger.warn("Error during processing event: " + message, e);
82+
}
83+
})
84+
.subscribe();
85+
}
9186

92-
if (!this.subscribers.isEmpty()) {
93-
connectableFlux.connect();
94-
}
87+
@Override
88+
public void destroy() {
89+
this.subscribedSignal.onNext(false);
90+
this.processor.onComplete();
91+
super.destroy();
9592
}
9693

9794
}

spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/FluxMessageChannelTests.java

+26-16
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,17 @@
2020

2121
import java.util.ArrayList;
2222
import java.util.List;
23-
import java.util.Map;
2423
import java.util.concurrent.CountDownLatch;
2524
import java.util.concurrent.TimeUnit;
25+
import java.util.stream.Collectors;
2626
import java.util.stream.IntStream;
2727

28-
import org.junit.Test;
29-
import org.junit.runner.RunWith;
28+
import org.junit.jupiter.api.Test;
3029

3130
import org.springframework.beans.factory.annotation.Autowired;
3231
import org.springframework.context.annotation.Bean;
3332
import org.springframework.context.annotation.Configuration;
33+
import org.springframework.integration.annotation.BridgeFrom;
3434
import org.springframework.integration.annotation.ServiceActivator;
3535
import org.springframework.integration.channel.FluxMessageChannel;
3636
import org.springframework.integration.channel.MessageChannelReactiveUtils;
@@ -47,24 +47,26 @@
4747
import org.springframework.messaging.support.GenericMessage;
4848
import org.springframework.messaging.support.MessageBuilder;
4949
import org.springframework.test.annotation.DirtiesContext;
50-
import org.springframework.test.context.junit4.SpringRunner;
50+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
5151

52+
import reactor.core.Disposable;
53+
import reactor.core.publisher.EmitterProcessor;
5254
import reactor.core.publisher.Flux;
5355

5456
/**
5557
* @author Artem Bilan
5658
*
5759
* @since 5.0
5860
*/
59-
@RunWith(SpringRunner.class)
61+
@SpringJUnitConfig
6062
@DirtiesContext
6163
public class FluxMessageChannelTests {
6264

6365
@Autowired
6466
private MessageChannel fluxMessageChannel;
6567

6668
@Autowired
67-
private MessageChannel queueChannel;
69+
private QueueChannel queueChannel;
6870

6971
@Autowired
7072
private PollableChannel errorChannel;
@@ -73,7 +75,7 @@ public class FluxMessageChannelTests {
7375
private IntegrationFlowContext integrationFlowContext;
7476

7577
@Test
76-
public void testFluxMessageChannel() {
78+
void testFluxMessageChannel() {
7779
QueueChannel replyChannel = new QueueChannel();
7880

7981
for (int i = 0; i < 10; i++) {
@@ -90,28 +92,35 @@ public void testFluxMessageChannel() {
9092
Message<?> error = this.errorChannel.receive(0);
9193
assertThat(error).isNotNull();
9294
assertThat(((MessagingException) error.getPayload()).getFailedMessage().getPayload()).isEqualTo(5);
95+
96+
List<Message<?>> messages = this.queueChannel.clear();
97+
assertThat(messages).extracting((message) -> (Integer) message.getPayload())
98+
.containsAll(IntStream.range(0, 10).boxed().collect(Collectors.toList()));
9399
}
94100

95101
@Test
96-
public void testMessageChannelReactiveAdaptation() throws InterruptedException {
102+
void testMessageChannelReactiveAdaptation() throws InterruptedException {
97103
CountDownLatch done = new CountDownLatch(2);
98104
List<String> results = new ArrayList<>();
99105

100-
Flux.from(MessageChannelReactiveUtils.<String>toPublisher(this.queueChannel))
101-
.map(Message::getPayload)
102-
.map(String::toUpperCase)
103-
.doOnNext(results::add)
104-
.subscribe(v -> done.countDown());
106+
Disposable disposable =
107+
Flux.from(MessageChannelReactiveUtils.<String>toPublisher(this.queueChannel))
108+
.map(Message::getPayload)
109+
.map(String::toUpperCase)
110+
.doOnNext(results::add)
111+
.subscribe(v -> done.countDown());
105112

106113
this.queueChannel.send(new GenericMessage<>("foo"));
107114
this.queueChannel.send(new GenericMessage<>("bar"));
108115

109116
assertThat(done.await(10, TimeUnit.SECONDS)).isTrue();
110117
assertThat(results).containsExactly("FOO", "BAR");
118+
119+
disposable.dispose();
111120
}
112121

113122
@Test
114-
public void testFluxMessageChannelCleanUp() throws InterruptedException {
123+
void testFluxMessageChannelCleanUp() throws InterruptedException {
115124
FluxMessageChannel flux = MessageChannels.flux().get();
116125

117126
CountDownLatch finishLatch = new CountDownLatch(1);
@@ -130,9 +139,9 @@ public void testFluxMessageChannelCleanUp() throws InterruptedException {
130139

131140
assertThat(finishLatch.await(10, TimeUnit.SECONDS)).isTrue();
132141

133-
assertThat(TestUtils.getPropertyValue(flux, "publishers", Map.class).isEmpty()).isTrue();
134-
135142
flowRegistration.destroy();
143+
144+
assertThat(TestUtils.getPropertyValue(flux, "processor", EmitterProcessor.class).isTerminated()).isTrue();
136145
}
137146

138147
@Configuration
@@ -158,6 +167,7 @@ public String handle(int payload) {
158167
}
159168

160169
@Bean
170+
@BridgeFrom("fluxMessageChannel")
161171
public MessageChannel queueChannel() {
162172
return new QueueChannel();
163173
}

0 commit comments

Comments
 (0)