Skip to content

Commit d85d6ee

Browse files
artembilangaryrussell
authored andcommitted
INT-3073: Add FluxAggregatorMessageHandler
JIRA: https://jira.spring.io/browse/INT-3073 Add `FluxAggregatorMessageHandlerTests` * Add `FluxAggregatorMessageHandlerTests` * Fix `FluxAggregatorMessageHandler` for the default `messageForWindowFlux` to rely on the `publish().autoConnect()` to build a target `Flux` for window and also copy headers from the first `Message` in the window. Looks like `switchOnFirst()` doesn't work somehow with windows or I just don't use it properly, although it isn't clear how to continue chain but get the whole `Flux` as a single entry for message payload * Add more tests for `FluxAggregatorMessageHandlerTests` * Fix `AbstractMessageSplitter` to cast properly a primitive array * Implement `Lifecycle` in the `FluxAggregatorMessageHandler` * Add JavaDocs into the `FluxAggregatorMessageHandler` Javadoc polishing
1 parent 45fe5be commit d85d6ee

File tree

4 files changed

+605
-1
lines changed

4 files changed

+605
-1
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,273 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.aggregator;
18+
19+
import java.time.Duration;
20+
import java.util.concurrent.atomic.AtomicBoolean;
21+
import java.util.function.Function;
22+
import java.util.function.Predicate;
23+
24+
import org.springframework.context.Lifecycle;
25+
import org.springframework.integration.IntegrationMessageHeaderAccessor;
26+
import org.springframework.integration.channel.ReactiveStreamsSubscribableChannel;
27+
import org.springframework.integration.handler.AbstractMessageProducingHandler;
28+
import org.springframework.messaging.Message;
29+
import org.springframework.messaging.MessageChannel;
30+
import org.springframework.util.Assert;
31+
32+
import reactor.core.Disposable;
33+
import reactor.core.publisher.Flux;
34+
import reactor.core.publisher.FluxSink;
35+
import reactor.core.publisher.Mono;
36+
37+
/**
38+
* The {@link AbstractMessageProducingHandler} implementation for aggregation logic based
39+
* on Reactor's {@link Flux#groupBy} and {@link Flux#window} operators.
40+
* <p>
41+
* The incoming messages are emitted into a {@link FluxSink} provided by the
42+
* {@link Flux#create} initialized in the constructor.
43+
* <p>
44+
* The resulting windows for groups are wrapped into {@link Message}s for downstream
45+
* consumption.
46+
* <p>
47+
* If the {@link #getOutputChannel()} is not a {@link ReactiveStreamsSubscribableChannel}
48+
* instance, a subscription for the whole aggregating {@link Flux} is performed in the
49+
* {@link #start()} method.
50+
*
51+
* @author Artem Bilan
52+
*
53+
* @since 5.2
54+
*/
55+
public class FluxAggregatorMessageHandler extends AbstractMessageProducingHandler implements Lifecycle {
56+
57+
private final AtomicBoolean subscribed = new AtomicBoolean();
58+
59+
private final Flux<Message<?>> aggregatorFlux;
60+
61+
private CorrelationStrategy correlationStrategy =
62+
new HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID);
63+
64+
private Predicate<Message<?>> boundaryTrigger;
65+
66+
private Function<Message<?>, Integer> windowSizeFunction = FluxAggregatorMessageHandler::sequenceSizeHeader;
67+
68+
private Function<Flux<Message<?>>, Flux<Flux<Message<?>>>> windowConfigurer;
69+
70+
private Duration windowTimespan;
71+
72+
private Function<Flux<Message<?>>, Mono<Message<?>>> combineFunction = this::messageForWindowFlux;
73+
74+
private FluxSink<Message<?>> sink;
75+
76+
private volatile Disposable subscription;
77+
78+
/**
79+
* Create an instance with a {@link Flux#create} and apply {@link Flux#groupBy} and {@link Flux#window}
80+
* transformation into it.
81+
*/
82+
public FluxAggregatorMessageHandler() {
83+
this.aggregatorFlux =
84+
Flux.<Message<?>>create(emitter -> this.sink = emitter, FluxSink.OverflowStrategy.BUFFER)
85+
.groupBy(this::groupBy)
86+
.flatMap((group) -> group.transform(this::releaseBy))
87+
.publish()
88+
.autoConnect();
89+
}
90+
91+
private Object groupBy(Message<?> message) {
92+
return this.correlationStrategy.getCorrelationKey(message);
93+
}
94+
95+
private Flux<Message<?>> releaseBy(Flux<Message<?>> groupFlux) {
96+
return groupFlux
97+
.transform(this.windowConfigurer != null ? this.windowConfigurer : this::applyWindowOptions)
98+
.flatMap((windowFlux) -> windowFlux.transform(this.combineFunction));
99+
}
100+
101+
private Flux<Flux<Message<?>>> applyWindowOptions(Flux<Message<?>> groupFlux) {
102+
if (this.boundaryTrigger != null) {
103+
return groupFlux.windowUntil(this.boundaryTrigger);
104+
}
105+
return groupFlux
106+
.switchOnFirst((signal, group) -> {
107+
if (signal.hasValue()) {
108+
Integer maxSize = this.windowSizeFunction.apply(signal.get());
109+
if (maxSize != null) {
110+
if (this.windowTimespan != null) {
111+
return group.windowTimeout(maxSize, this.windowTimespan);
112+
}
113+
else {
114+
return group.window(maxSize);
115+
}
116+
}
117+
else {
118+
if (this.windowTimespan != null) {
119+
return group.window(this.windowTimespan);
120+
}
121+
else {
122+
return Flux.error(
123+
new IllegalStateException(
124+
"One of the 'boundaryTrigger', 'windowSizeFunction' or "
125+
+ "'windowTimespan' options must be configured or " +
126+
"'sequenceSize' header must be supplied in the messages " +
127+
"to aggregate."));
128+
}
129+
}
130+
}
131+
else {
132+
return Flux.just(group);
133+
}
134+
});
135+
}
136+
137+
/**
138+
* Configure a {@link CorrelationStrategy} to determine a group key from the incoming messages.
139+
* By default a {@link HeaderAttributeCorrelationStrategy} is used against a
140+
* {@link IntegrationMessageHeaderAccessor#CORRELATION_ID} header value.
141+
* @param correlationStrategy the {@link CorrelationStrategy} to use.
142+
*/
143+
public void setCorrelationStrategy(CorrelationStrategy correlationStrategy) {
144+
Assert.notNull(correlationStrategy, "'correlationStrategy' must not be null");
145+
this.correlationStrategy = correlationStrategy;
146+
}
147+
148+
/**
149+
* Configure a transformation {@link Function} to apply for a {@link Flux} window to emit.
150+
* Requires a {@link Mono} result with a {@link Message} as value as a combination result
151+
* of the incoming {@link Flux} for window.
152+
* By default a {@link Flux} for window is fully wrapped into a message with headers copied
153+
* from the first message in window. Such a {@link Flux} in the payload has to be subscribed
154+
* and consumed downstream.
155+
* @param combineFunction the {@link Function} to use for result windows transformation.
156+
*/
157+
public void setCombineFunction(Function<Flux<Message<?>>, Mono<Message<?>>> combineFunction) {
158+
Assert.notNull(combineFunction, "'combineFunction' must not be null");
159+
this.combineFunction = combineFunction;
160+
}
161+
162+
/**
163+
* Configure a {@link Predicate} for messages to determine a window boundary in the
164+
* {@link Flux#windowUntil} operator.
165+
* Has a precedence over any other window configuration options.
166+
* @param boundaryTrigger the {@link Predicate} to use for window boundary.
167+
* @see Flux#windowUntil(Predicate)
168+
*/
169+
public void setBoundaryTrigger(Predicate<Message<?>> boundaryTrigger) {
170+
this.boundaryTrigger = boundaryTrigger;
171+
}
172+
173+
/**
174+
* Specify a size for windows to close.
175+
* Can be combined with the {@link #setWindowTimespan(Duration)}.
176+
* @param windowSize the size for window to use.
177+
* @see Flux#window(int)
178+
* @see Flux#windowTimeout(int, Duration)
179+
*/
180+
public void setWindowSize(int windowSize) {
181+
setWindowSizeFunction((message) -> windowSize);
182+
}
183+
184+
/**
185+
* Specify a {@link Function} to determine a size for windows to close against the first message in group.
186+
* Tne result of the function can be combined with the {@link #setWindowTimespan(Duration)}.
187+
* By default an {@link IntegrationMessageHeaderAccessor#SEQUENCE_SIZE} header is consulted.
188+
* @param windowSizeFunction the {@link Function} to use to determine a window size
189+
* against a first message in the group.
190+
* @see Flux#window(int)
191+
* @see Flux#windowTimeout(int, Duration)
192+
*/
193+
public void setWindowSizeFunction(Function<Message<?>, Integer> windowSizeFunction) {
194+
Assert.notNull(windowSizeFunction, "'windowSizeFunction' must not be null");
195+
this.windowSizeFunction = windowSizeFunction;
196+
}
197+
198+
/**
199+
* Configure a {@link Duration} for closing windows periodically.
200+
* Can be combined with the {@link #setWindowSize(int)} or {@link #setWindowSizeFunction(Function)}.
201+
* @param windowTimespan the {@link Duration} to use for windows to close periodically.
202+
* @see Flux#window(Duration)
203+
* @see Flux#windowTimeout(int, Duration)
204+
*/
205+
public void setWindowTimespan(Duration windowTimespan) {
206+
this.windowTimespan = windowTimespan;
207+
}
208+
209+
/**
210+
* Configure a {@link Function} to apply a transformation into the grouping {@link Flux}
211+
* for any arbitrary {@link Flux#window} options not covered by the simple options.
212+
* Has a precedence over any other window configuration options.
213+
* @param windowConfigurer the {@link Function} to apply any custom window transformation.
214+
*/
215+
public void setWindowConfigurer(Function<Flux<Message<?>>, Flux<Flux<Message<?>>>> windowConfigurer) {
216+
this.windowConfigurer = windowConfigurer;
217+
}
218+
219+
@Override
220+
public void start() {
221+
if (this.subscribed.compareAndSet(false, true)) {
222+
MessageChannel outputChannel = getOutputChannel();
223+
if (outputChannel instanceof ReactiveStreamsSubscribableChannel) {
224+
((ReactiveStreamsSubscribableChannel) outputChannel).subscribeTo(this.aggregatorFlux);
225+
}
226+
else {
227+
this.subscription =
228+
this.aggregatorFlux.subscribe((messageToSend) -> produceOutput(messageToSend, messageToSend));
229+
}
230+
}
231+
}
232+
233+
@Override
234+
public void stop() {
235+
if (this.subscribed.compareAndSet(true, false) && this.subscription != null) {
236+
this.subscription.dispose();
237+
}
238+
}
239+
240+
@Override
241+
public boolean isRunning() {
242+
return this.subscribed.get();
243+
}
244+
245+
@Override
246+
protected void handleMessageInternal(Message<?> message) {
247+
Assert.state(isRunning(),
248+
"The 'FluxAggregatorMessageHandler' has not been started to accept incoming messages");
249+
250+
this.sink.next(message);
251+
}
252+
253+
@Override
254+
protected boolean shouldCopyRequestHeaders() {
255+
return false;
256+
}
257+
258+
private Mono<Message<?>> messageForWindowFlux(Flux<Message<?>> messageFlux) {
259+
Flux<Message<?>> window = messageFlux.publish().autoConnect();
260+
return window
261+
.next()
262+
.map((first) ->
263+
getMessageBuilderFactory()
264+
.withPayload(Flux.concat(Mono.just(first), window))
265+
.copyHeaders(first.getHeaders())
266+
.build());
267+
}
268+
269+
private static Integer sequenceSizeHeader(Message<?> message) {
270+
return message.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE, Integer.class);
271+
}
272+
273+
}

spring-integration-core/src/main/java/org/springframework/integration/splitter/AbstractMessageSplitter.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.springframework.messaging.Message;
3838
import org.springframework.messaging.MessageChannel;
3939
import org.springframework.util.Assert;
40+
import org.springframework.util.ObjectUtils;
4041

4142
import com.fasterxml.jackson.core.TreeNode;
4243
import reactor.core.publisher.Flux;
@@ -140,7 +141,7 @@ protected final Object handleRequestMessage(Message<?> message) {
140141
}
141142
}
142143
else if (result.getClass().isArray()) {
143-
Object[] items = (Object[]) result;
144+
Object[] items = ObjectUtils.toObjectArray(result);
144145
sequenceSize = items.length;
145146
if (reactive) {
146147
flux = Flux.fromArray(items);

0 commit comments

Comments
 (0)