Skip to content

Commit 469e97a

Browse files
committed
INT-3073: Add FluxAggregatorMessageHandler
JIRA: https://jira.spring.io/browse/INT-3073
1 parent cd07572 commit 469e97a

File tree

1 file changed

+190
-0
lines changed

1 file changed

+190
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.aggregator;
18+
19+
import java.time.Duration;
20+
import java.util.concurrent.atomic.AtomicBoolean;
21+
import java.util.function.Function;
22+
import java.util.function.Predicate;
23+
24+
import org.springframework.integration.IntegrationMessageHeaderAccessor;
25+
import org.springframework.integration.channel.ReactiveStreamsSubscribableChannel;
26+
import org.springframework.integration.handler.AbstractMessageProducingHandler;
27+
import org.springframework.messaging.Message;
28+
import org.springframework.messaging.MessageChannel;
29+
import org.springframework.util.Assert;
30+
31+
import reactor.core.publisher.Flux;
32+
import reactor.core.publisher.FluxSink;
33+
import reactor.core.publisher.Mono;
34+
35+
/**
36+
* @author Artem Bilan
37+
*
38+
* @since 5.2
39+
*/
40+
public class FluxAggregatorMessageHandler extends AbstractMessageProducingHandler {
41+
42+
private final AtomicBoolean subscribed = new AtomicBoolean();
43+
44+
private final Flux<Message<?>> aggregatorFlux;
45+
46+
private CorrelationStrategy correlationStrategy =
47+
new HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID);
48+
49+
private Predicate<Message<?>> boundaryTrigger;
50+
51+
private Function<Message<?>, Integer> windowSizeFunction = FluxAggregatorMessageHandler::sequenceSizeHeader;
52+
53+
private Function<Flux<Message<?>>, Flux<Flux<Message<?>>>> windowConfigurer;
54+
55+
private Duration windowTimespan;
56+
57+
private Function<Flux<Message<?>>, Mono<Message<?>>> combineFunction = this::messageForWindowFlux;
58+
59+
private FluxSink<Message<?>> sink;
60+
61+
public FluxAggregatorMessageHandler() {
62+
this.aggregatorFlux =
63+
Flux.<Message<?>>create(emitter -> this.sink = emitter, FluxSink.OverflowStrategy.BUFFER)
64+
.groupBy(this::groupBy)
65+
.flatMap((group) -> group.transform(this::releaseBy))
66+
.publish()
67+
.autoConnect();
68+
}
69+
70+
private Object groupBy(Message<?> message) {
71+
return this.correlationStrategy.getCorrelationKey(message);
72+
}
73+
74+
private Flux<Message<?>> releaseBy(Flux<Message<?>> groupFlux) {
75+
return groupFlux
76+
.transform(this.windowConfigurer != null ? this.windowConfigurer : this::applyWindowOptions)
77+
.flatMap((windowFlux) -> windowFlux.transform(this.combineFunction));
78+
}
79+
80+
private Flux<Flux<Message<?>>> applyWindowOptions(Flux<Message<?>> groupFlux) {
81+
if (this.boundaryTrigger != null) {
82+
return groupFlux.windowUntil(this.boundaryTrigger);
83+
}
84+
return groupFlux
85+
.switchOnFirst((signal, group) -> {
86+
if (signal.hasValue()) {
87+
Integer maxSize = this.windowSizeFunction.apply(signal.get());
88+
if (maxSize != null) {
89+
if (this.windowTimespan != null) {
90+
return group.windowTimeout(maxSize, this.windowTimespan);
91+
}
92+
else {
93+
return group.window(maxSize);
94+
}
95+
}
96+
else {
97+
if (this.windowTimespan != null) {
98+
return group.window(this.windowTimespan);
99+
}
100+
else {
101+
return Flux.error(
102+
new IllegalStateException(
103+
"One of the 'boundaryTrigger', 'windowSizeFunction' or "
104+
+ "'windowTimespan' options must be configured or " +
105+
"'sequenceSize' header must be supplied in the messages " +
106+
"to aggregate."));
107+
}
108+
}
109+
}
110+
else {
111+
return Flux.just(group);
112+
}
113+
});
114+
}
115+
116+
public void setCorrelationStrategy(CorrelationStrategy correlationStrategy) {
117+
Assert.notNull(correlationStrategy, "'correlationStrategy' must not be null");
118+
this.correlationStrategy = correlationStrategy;
119+
}
120+
121+
public void setCombineFunction(Function<Flux<Message<?>>, Mono<Message<?>>> combineFunction) {
122+
Assert.notNull(combineFunction, "'combineFunction' must not be null");
123+
this.combineFunction = combineFunction;
124+
}
125+
126+
public void setWindowConfigurer(Function<Flux<Message<?>>, Flux<Flux<Message<?>>>> windowConfigurer) {
127+
this.windowConfigurer = windowConfigurer;
128+
}
129+
130+
public void setBoundaryTrigger(Predicate<Message<?>> boundaryTrigger) {
131+
this.boundaryTrigger = boundaryTrigger;
132+
}
133+
134+
public void setWindowSize(int windowSize) {
135+
setWindowSizeFunction((message) -> windowSize);
136+
}
137+
138+
public void setWindowSizeFunction(Function<Message<?>, Integer> windowSizeFunction) {
139+
Assert.notNull(windowSizeFunction, "'windowSizeFunction' must not be null");
140+
this.windowSizeFunction = windowSizeFunction;
141+
}
142+
143+
public void setWindowTimespan(Duration windowTimespan) {
144+
this.windowTimespan = windowTimespan;
145+
}
146+
147+
@Override
148+
protected boolean shouldCopyRequestHeaders() {
149+
return false;
150+
}
151+
152+
@Override
153+
protected void handleMessageInternal(Message<?> message) {
154+
if (this.subscribed.compareAndSet(false, true)) {
155+
MessageChannel outputChannel = getOutputChannel();
156+
if (outputChannel instanceof ReactiveStreamsSubscribableChannel) {
157+
((ReactiveStreamsSubscribableChannel) outputChannel).subscribeTo(this.aggregatorFlux);
158+
}
159+
else {
160+
this.aggregatorFlux
161+
.doOnNext((messageToSend) -> produceOutput(messageToSend, messageToSend))
162+
.subscribe();
163+
}
164+
}
165+
166+
this.sink.next(message);
167+
}
168+
169+
private Mono<Message<?>> messageForWindowFlux(Flux<Message<?>> messageFlux) {
170+
return messageFlux
171+
.switchOnFirst((signal, window) -> {
172+
if (signal.hasValue()) {
173+
return Flux.<Message<?>>just(
174+
getMessageBuilderFactory()
175+
.withPayload(window)
176+
.copyHeaders(signal.get().getHeaders())
177+
.build());
178+
}
179+
else {
180+
return window.thenMany(Flux.empty());
181+
}
182+
})
183+
.next();
184+
}
185+
186+
private static Integer sequenceSizeHeader(Message<?> message) {
187+
return message.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE, Integer.class);
188+
}
189+
190+
}

0 commit comments

Comments
 (0)