Skip to content

Commit 3bfcc40

Browse files
committed
* Add FluxAggregatorMessageHandlerTests
* Fix `FluxAggregatorMessageHandler` for the default `messageForWindowFlux` to rely on the `publish().autoConnect()` to build a target `Flux` for window and also copy headers from the first `Message` in the window. Looks like `switchOnFirst()` doesn't work somehow with windows or I just don't use it properly, although it isn't clear how to continue chain but get the whole `Flux` as a single entry for message payload
1 parent 3ec0b3a commit 3bfcc40

File tree

2 files changed

+180
-25
lines changed

2 files changed

+180
-25
lines changed

spring-integration-core/src/main/java/org/springframework/integration/aggregator/FluxAggregatorMessageHandler.java

+8-14
Original file line numberDiff line numberDiff line change
@@ -165,20 +165,14 @@ protected void handleMessageInternal(Message<?> message) {
165165
}
166166

167167
private Mono<Message<?>> messageForWindowFlux(Flux<Message<?>> messageFlux) {
168-
return messageFlux
169-
.switchOnFirst((signal, window) -> {
170-
if (signal.hasValue()) {
171-
return Flux.<Message<?>>just(
172-
getMessageBuilderFactory()
173-
.withPayload(window)
174-
.copyHeaders(signal.get().getHeaders())
175-
.build());
176-
}
177-
else {
178-
return window.thenMany(Flux.empty());
179-
}
180-
})
181-
.next();
168+
Flux<Message<?>> window = messageFlux.publish().autoConnect();
169+
return window
170+
.next()
171+
.map((first) ->
172+
getMessageBuilderFactory()
173+
.withPayload(Flux.concat(Mono.just(first), window))
174+
.copyHeaders(first.getHeaders())
175+
.build());
182176
}
183177

184178
private static Integer sequenceSizeHeader(Message<?> message) {

spring-integration-core/src/test/java/org/springframework/integration/aggregator/FluxAggregatorMessageHandlerTests.java

+172-11
Original file line numberDiff line numberDiff line change
@@ -18,23 +18,31 @@
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
2020

21+
import java.time.Duration;
22+
import java.util.List;
23+
import java.util.Objects;
24+
import java.util.concurrent.Executors;
25+
import java.util.stream.Collectors;
26+
import java.util.stream.IntStream;
27+
2128
import org.junit.jupiter.api.Test;
2229

2330
import org.springframework.integration.IntegrationMessageHeaderAccessor;
2431
import org.springframework.integration.channel.QueueChannel;
2532
import org.springframework.integration.support.MessageBuilder;
2633
import org.springframework.messaging.Message;
2734
import org.springframework.messaging.MessageHeaders;
35+
import org.springframework.messaging.support.GenericMessage;
2836

2937
import reactor.core.publisher.Flux;
3038
import reactor.test.StepVerifier;
31-
import reactor.util.Loggers;
3239

3340
/**
3441
* @author Artem Bilan
3542
*
3643
* @since 5.2
3744
*/
45+
@SuppressWarnings("unchecked")
3846
class FluxAggregatorMessageHandlerTests {
3947

4048
@Test
@@ -62,23 +70,176 @@ void testDefaultAggregation() {
6270
Object payload = result.getPayload();
6371
assertThat(payload).isInstanceOf(Flux.class);
6472

65-
Loggers.useVerboseConsoleLoggers();
66-
67-
@SuppressWarnings("unchecked")
6873
Flux<Message<?>> window = (Flux<Message<?>>) payload;
6974

7075
StepVerifier.create(
7176
window.map(Message::getPayload)
72-
.cast(String.class)
73-
.log())
74-
.expectNext("0", "2", "4", "6", "8", "10", "12", "14", "16", "18", "20")
75-
/*.expectNextSequence(
77+
.cast(String.class))
78+
.expectNextSequence(
7679
IntStream.iterate(0, i -> i + 2)
7780
.limit(10)
7881
.mapToObj(Objects::toString)
79-
.collect(Collectors.toList()))*/
80-
.thenCancel()
81-
.verify();
82+
.collect(Collectors.toList()))
83+
.verifyComplete();
84+
85+
result = resultChannel.receive(10_000);
86+
assertThat(result).isNotNull()
87+
.extracting(Message::getHeaders)
88+
.satisfies((headers) ->
89+
assertThat((MessageHeaders) headers)
90+
.containsEntry(IntegrationMessageHeaderAccessor.CORRELATION_ID, 1));
91+
92+
payload = result.getPayload();
93+
window = (Flux<Message<?>>) payload;
94+
95+
StepVerifier.create(
96+
window.map(Message::getPayload)
97+
.cast(String.class))
98+
.expectNextSequence(
99+
IntStream.iterate(1, i -> i + 2)
100+
.limit(10)
101+
.mapToObj(Objects::toString)
102+
.collect(Collectors.toList()))
103+
.verifyComplete();
104+
}
105+
106+
@Test
107+
void testCustomCombineFunction() {
108+
QueueChannel resultChannel = new QueueChannel();
109+
FluxAggregatorMessageHandler fluxAggregatorMessageHandler = new FluxAggregatorMessageHandler();
110+
fluxAggregatorMessageHandler.setOutputChannel(resultChannel);
111+
fluxAggregatorMessageHandler.setWindowSize(10);
112+
fluxAggregatorMessageHandler.setCombineFunction(
113+
(messageFlux) ->
114+
messageFlux
115+
.map(Message::getPayload)
116+
.collectList()
117+
.map(GenericMessage::new));
118+
119+
for (int i = 0; i < 20; i++) {
120+
Message<?> messageToAggregate =
121+
MessageBuilder.withPayload(i)
122+
.setCorrelationId(i % 2)
123+
.build();
124+
fluxAggregatorMessageHandler.handleMessage(messageToAggregate);
125+
}
126+
127+
Message<?> result = resultChannel.receive(10_000);
128+
assertThat(result).isNotNull();
129+
130+
Object payload = result.getPayload();
131+
assertThat(payload)
132+
.isInstanceOf(List.class)
133+
.asList()
134+
.containsExactly(
135+
IntStream.iterate(0, i -> i + 2)
136+
.limit(10)
137+
.boxed()
138+
.toArray());
139+
140+
result = resultChannel.receive(10_000);
141+
assertThat(result).isNotNull();
142+
143+
payload = result.getPayload();
144+
assertThat(payload)
145+
.isInstanceOf(List.class)
146+
.asList()
147+
.containsExactly(
148+
IntStream.iterate(1, i -> i + 2)
149+
.limit(10)
150+
.boxed()
151+
.toArray());
152+
}
153+
154+
@Test
155+
void testWindowTimespan() {
156+
QueueChannel resultChannel = new QueueChannel();
157+
FluxAggregatorMessageHandler fluxAggregatorMessageHandler = new FluxAggregatorMessageHandler();
158+
fluxAggregatorMessageHandler.setOutputChannel(resultChannel);
159+
fluxAggregatorMessageHandler.setWindowTimespan(Duration.ofMillis(100));
160+
161+
Executors.newSingleThreadExecutor()
162+
.submit(() -> {
163+
for (int i = 0; i < 10; i++) {
164+
Message<?> messageToAggregate =
165+
MessageBuilder.withPayload(i)
166+
.setCorrelationId("1")
167+
.build();
168+
fluxAggregatorMessageHandler.handleMessage(messageToAggregate);
169+
Thread.sleep(20);
170+
}
171+
return null;
172+
});
173+
174+
Message<?> result = resultChannel.receive(10_000);
175+
assertThat(result).isNotNull();
176+
177+
Flux<Message<?>> window = (Flux<Message<?>>) result.getPayload();
178+
179+
List<Integer> messageList =
180+
window.map(Message::getPayload)
181+
.cast(Integer.class)
182+
.collectList()
183+
.block(Duration.ofSeconds(10));
184+
185+
assertThat(messageList)
186+
.isNotEmpty()
187+
.hasSizeLessThan(10)
188+
.contains(0, 1);
189+
190+
result = resultChannel.receive(10_000);
191+
assertThat(result).isNotNull();
192+
193+
window = (Flux<Message<?>>) result.getPayload();
194+
195+
messageList =
196+
window.map(Message::getPayload)
197+
.cast(Integer.class)
198+
.collectList()
199+
.block(Duration.ofSeconds(10));
200+
201+
assertThat(messageList)
202+
.isNotEmpty()
203+
.hasSizeLessThan(10)
204+
.doesNotContain(0, 1);
205+
}
206+
207+
@Test
208+
void testBoundaryTrigger() {
209+
QueueChannel resultChannel = new QueueChannel();
210+
FluxAggregatorMessageHandler fluxAggregatorMessageHandler = new FluxAggregatorMessageHandler();
211+
fluxAggregatorMessageHandler.setOutputChannel(resultChannel);
212+
fluxAggregatorMessageHandler.setBoundaryTrigger((message -> "terminate".equals(message.getPayload())));
213+
214+
for (int i = 0; i < 3; i++) {
215+
Message<?> messageToAggregate =
216+
MessageBuilder.withPayload("" + i)
217+
.setCorrelationId("1")
218+
.build();
219+
fluxAggregatorMessageHandler.handleMessage(messageToAggregate);
220+
}
221+
222+
fluxAggregatorMessageHandler.handleMessage(
223+
MessageBuilder.withPayload("terminate")
224+
.setCorrelationId("1")
225+
.build());
226+
227+
fluxAggregatorMessageHandler.handleMessage(
228+
MessageBuilder.withPayload("next")
229+
.setCorrelationId("1")
230+
.build());
231+
232+
Message<?> result = resultChannel.receive(10_000);
233+
assertThat(result).isNotNull();
234+
235+
Flux<Message<?>> window = (Flux<Message<?>>) result.getPayload();
236+
237+
StepVerifier.create(
238+
window.map(Message::getPayload)
239+
.cast(String.class))
240+
.expectNext("0", "1", "2")
241+
.expectNext("terminate")
242+
.verifyComplete();
82243
}
83244

84245
}

0 commit comments

Comments
 (0)