Skip to content

Commit 69401c2

Browse files
bsideupartembilan
authored andcommitted
Back-pressure tests for SubscribableChPubAdapter
* Add back-pressure tests for SubscribableChannelPublisherAdapter
1 parent e2ba60f commit 69401c2

File tree

1 file changed

+101
-0
lines changed

1 file changed

+101
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Copyright 2002-2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.channel;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
21+
import java.time.Duration;
22+
23+
import org.junit.Test;
24+
25+
import org.springframework.messaging.SubscribableChannel;
26+
import org.springframework.messaging.support.GenericMessage;
27+
28+
import reactor.core.Disposable;
29+
import reactor.core.Disposables;
30+
import reactor.core.scheduler.Schedulers;
31+
import reactor.test.StepVerifier;
32+
import reactor.util.concurrent.Queues;
33+
34+
public class MessageChannelReactiveUtilsTest {
35+
36+
@Test
37+
public void testBackpressureWithSubscribableChannel() {
38+
Disposable.Composite compositeDisposable = Disposables.composite();
39+
try {
40+
DirectChannel channel = new DirectChannel();
41+
assertThat(channel).isInstanceOf(SubscribableChannel.class);
42+
int initialRequest = 10;
43+
StepVerifier.create(MessageChannelReactiveUtils.toPublisher(channel), initialRequest)
44+
.expectSubscription()
45+
.then(() -> {
46+
compositeDisposable.add(
47+
Schedulers.boundedElastic().schedule(() -> {
48+
while (true) {
49+
if (channel.getSubscriberCount() > 0) {
50+
channel.send(new GenericMessage<>("foo"));
51+
}
52+
}
53+
})
54+
);
55+
})
56+
.expectNextCount(initialRequest)
57+
.expectNoEvent(Duration.ofMillis(100))
58+
.thenCancel()
59+
.verify(Duration.ofSeconds(1));
60+
}
61+
finally {
62+
compositeDisposable.dispose();
63+
}
64+
}
65+
66+
@Test
67+
public void testOverproducingWithSubscribableChannel() {
68+
DirectChannel channel = new DirectChannel();
69+
channel.setCountsEnabled(true);
70+
assertThat(channel).isInstanceOf(SubscribableChannel.class);
71+
72+
Disposable.Composite compositeDisposable = Disposables.composite();
73+
try {
74+
int initialRequest = 10;
75+
StepVerifier.create(MessageChannelReactiveUtils.toPublisher(channel), initialRequest)
76+
.expectSubscription()
77+
.then(() -> {
78+
compositeDisposable.add(
79+
Schedulers.boundedElastic().schedule(() -> {
80+
while (true) {
81+
if (channel.getSubscriberCount() > 0) {
82+
channel.send(new GenericMessage<>("foo"));
83+
}
84+
}
85+
})
86+
);
87+
})
88+
.expectNextCount(initialRequest)
89+
.thenAwait(Duration.ofMillis(100))
90+
.thenCancel()
91+
.verify(Duration.ofSeconds(1));
92+
}
93+
finally {
94+
compositeDisposable.dispose();
95+
}
96+
97+
assertThat(channel.getMetrics().getSendCountLong())
98+
.as("produced")
99+
.isLessThanOrEqualTo(Queues.SMALL_BUFFER_SIZE);
100+
}
101+
}

0 commit comments

Comments
 (0)