Skip to content

Commit a0cdfd9

Browse files
committed
Change adaptPollableChannelToPublisher to Mono
We can use a `Mono.fromCallable()` in combination with `repeat()` instead of possible race condition deal in the `Flux.create()`
1 parent 1ff69d4 commit a0cdfd9

File tree

1 file changed

+8
-28
lines changed

1 file changed

+8
-28
lines changed

spring-integration-core/src/main/java/org/springframework/integration/channel/MessageChannelReactiveUtils.java

+8-28
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,9 @@
1616

1717
package org.springframework.integration.channel;
1818

19+
import java.time.Duration;
20+
1921
import org.reactivestreams.Publisher;
20-
import org.reactivestreams.Subscriber;
2122

2223
import org.springframework.messaging.Message;
2324
import org.springframework.messaging.MessageChannel;
@@ -27,6 +28,7 @@
2728

2829
import reactor.core.publisher.EmitterProcessor;
2930
import reactor.core.publisher.Flux;
31+
import reactor.core.publisher.Mono;
3032
import reactor.core.scheduler.Schedulers;
3133

3234
/**
@@ -69,34 +71,12 @@ private static <T> Publisher<Message<T>> adaptSubscribableChannelToPublisher(Sub
6971
});
7072
}
7173

74+
@SuppressWarnings("unchecked")
7275
private static <T> Publisher<Message<T>> adaptPollableChannelToPublisher(PollableChannel inputChannel) {
73-
return new PollableChannelPublisherAdapter<>(inputChannel);
74-
}
75-
76-
private static final class PollableChannelPublisherAdapter<T> implements Publisher<Message<T>> {
77-
78-
private final PollableChannel channel;
79-
80-
PollableChannelPublisherAdapter(final PollableChannel channel) {
81-
this.channel = channel;
82-
}
83-
84-
@Override
85-
@SuppressWarnings("unchecked")
86-
public void subscribe(Subscriber<? super Message<T>> subscriber) {
87-
Flux
88-
.<Message<T>>create(sink ->
89-
sink.onRequest(n -> {
90-
Message<?> m;
91-
while (!sink.isCancelled() && n-- > 0
92-
&& (m = this.channel.receive()) != null) { // NOSONAR
93-
sink.next((Message<T>) m);
94-
}
95-
}))
96-
.subscribeOn(Schedulers.elastic())
97-
.subscribe(subscriber);
98-
}
99-
76+
return Mono.fromCallable(() -> (Message<T>) inputChannel.receive())
77+
.subscribeOn(Schedulers.boundedElastic())
78+
.repeatWhenEmpty(it -> it.delayElements(Duration.ofMillis(10)))
79+
.repeat();
10080
}
10181

10282
}

0 commit comments

Comments
 (0)