Skip to content

Commit 845a396

Browse files
committed
Revert testReactiveStreamsConsumerPollableChannel
https://build.spring.io/browse/INT-MASTER-1776/ Looks like there is a race condition between subscribers
1 parent 36c9f72 commit 845a396

File tree

1 file changed

+22
-22
lines changed

1 file changed

+22
-22
lines changed

spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/ReactiveStreamsConsumerTests.java

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@
1919
import static org.assertj.core.api.Assertions.assertThat;
2020
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
2121
import static org.mockito.ArgumentMatchers.any;
22+
import static org.mockito.BDDMockito.willAnswer;
2223
import static org.mockito.Mockito.mock;
2324
import static org.mockito.Mockito.never;
25+
import static org.mockito.Mockito.times;
2426
import static org.mockito.Mockito.verify;
2527

2628
import java.util.LinkedList;
@@ -31,6 +33,7 @@
3133
import java.util.concurrent.TimeUnit;
3234

3335
import org.junit.Test;
36+
import org.mockito.ArgumentCaptor;
3437
import org.mockito.Mockito;
3538
import org.reactivestreams.Subscriber;
3639
import org.reactivestreams.Subscription;
@@ -160,31 +163,17 @@ public void onComplete() {
160163
public void testReactiveStreamsConsumerPollableChannel() throws InterruptedException {
161164
QueueChannel testChannel = new QueueChannel();
162165

163-
BlockingQueue<Message<?>> messages = new LinkedBlockingQueue<>();
164-
165-
Subscriber<Message<?>> testSubscriber = Mockito.spy(new Subscriber<Message<?>>() {
166-
167-
@Override
168-
public void onSubscribe(Subscription subscription) {
169-
subscription.request(2);
170-
}
171-
172-
@Override
173-
public void onNext(Message<?> message) {
174-
messages.offer(message);
175-
}
176-
177-
@Override
178-
public void onError(Throwable t) {
166+
Subscriber<Message<?>> testSubscriber = (Subscriber<Message<?>>) Mockito.mock(Subscriber.class);
179167

180-
}
168+
BlockingQueue<Message<?>> messages = new LinkedBlockingQueue<>();
181169

182-
@Override
183-
public void onComplete() {
170+
willAnswer(i -> {
171+
messages.put(i.getArgument(0));
172+
return null;
173+
})
174+
.given(testSubscriber)
175+
.onNext(any(Message.class));
184176

185-
}
186-
187-
});
188177
ReactiveStreamsConsumer reactiveConsumer = new ReactiveStreamsConsumer(testChannel, testSubscriber);
189178
reactiveConsumer.setBeanFactory(mock(BeanFactory.class));
190179
reactiveConsumer.afterPropertiesSet();
@@ -193,6 +182,12 @@ public void onComplete() {
193182
Message<?> testMessage = new GenericMessage<>("test");
194183
testChannel.send(testMessage);
195184

185+
ArgumentCaptor<Subscription> subscriptionArgumentCaptor = ArgumentCaptor.forClass(Subscription.class);
186+
verify(testSubscriber).onSubscribe(subscriptionArgumentCaptor.capture());
187+
Subscription subscription = subscriptionArgumentCaptor.getValue();
188+
189+
subscription.request(1);
190+
196191
Message<?> message = messages.poll(10, TimeUnit.SECONDS);
197192
assertThat(message).isSameAs(testMessage);
198193

@@ -203,6 +198,11 @@ public void onComplete() {
203198

204199
reactiveConsumer.start();
205200

201+
verify(testSubscriber, times(2)).onSubscribe(subscriptionArgumentCaptor.capture());
202+
subscription = subscriptionArgumentCaptor.getValue();
203+
204+
subscription.request(2);
205+
206206
Message<?> testMessage2 = new GenericMessage<>("test2");
207207

208208
testChannel.send(testMessage2);

0 commit comments

Comments
 (0)