You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Copy file name to clipboardExpand all lines: spring-integration-core/src/test/java/org/springframework/integration/dsl/correlation/CorrelationHandlerTests.java
+4-3
Original file line number
Diff line number
Diff line change
@@ -177,8 +177,9 @@ public void testSplitterDiscard() {
177
177
178
178
@Test
179
179
publicvoidtestFluxAggregator() {
180
-
IntegrationFlowtestFlow = (flow) ->
181
-
flow.split()
180
+
IntegrationFlowtestFlow =
181
+
(flow) -> flow
182
+
.split()
182
183
.channel(MessageChannels.flux())
183
184
.handle(newFluxAggregatorMessageHandler());
184
185
@@ -189,7 +190,7 @@ public void testFluxAggregator() {
Copy file name to clipboardExpand all lines: src/reference/asciidoc/aggregator.adoc
+63
Original file line number
Diff line number
Diff line change
@@ -865,3 +865,66 @@ For example, the `JdbcMessageStore` has a `region` property, and the `MongoDbMes
865
865
866
866
For more information about the `MessageStore` interface and its implementations, see <<./message-store.adoc#message-store,Message Store>>.
867
867
=====
868
+
869
+
[[flux-aggregator]]
870
+
==== Flux Aggregator
871
+
872
+
In version 5.2, the `FluxAggregatorMessageHandler` component has been introduced.
873
+
It is based on the Project Reactor `Flux.groupBy()` and `Flux.window()` operators.
874
+
The incoming messages are emitted into the `FluxSink` initiated by the `Flux.create()` in the constructor of this component.
875
+
If the `outputChannel` is not provided or it is not an instance of `ReactiveStreamsSubscribableChannel`, the subscription to the main `Flux` is done from the `Lifecycle.start()` implementation.
876
+
Otherwise it is postponed to the subscription done by the `ReactiveStreamsSubscribableChannel` implementation.
877
+
The messages are grouped by the `Flux.groupBy()` using a `CorrelationStrategy` for the group key.
878
+
By default, the `IntegrationMessageHeaderAccessor.CORRELATION_ID` header of the message is consulted.
879
+
880
+
By default every closed window is released as a `Flux` in payload of a message to produce.
881
+
This message contains all the headers from the first message in the window.
882
+
This `Flux` in the output message payload must be subscribed and processed downstream.
883
+
Such a logic can be customized (or superseded) by the `setCombineFunction(Function<Flux<Message<?>>, Mono<Message<?>>>)` configuration option of the `FluxAggregatorMessageHandler`.
884
+
For example, if we would like to have a `List` of payloads in the final message, we can configure a `Flux.collectList()` like this:
885
+
886
+
====
887
+
[source,java]
888
+
----
889
+
fluxAggregatorMessageHandler.setCombineFunction(
890
+
(messageFlux) ->
891
+
messageFlux
892
+
.map(Message::getPayload)
893
+
.collectList()
894
+
.map(GenericMessage::new));
895
+
----
896
+
====
897
+
898
+
There are several options in the `FluxAggregatorMessageHandler` to select an appropriate window strategy:
899
+
900
+
* `setBoundaryTrigger(Predicate<Message<?>>)` - is propagated to the `Flux.windowUntil()` operator.
901
+
See its JavaDocs for more information.
902
+
Has a precedence over all other window options.
903
+
* `setWindowSize(int)` and `setWindowSizeFunction(Function<Message<?>, Integer>)` - is propagated to the `Flux.window(int)` or `windowTimeout(int, Duration)`.
904
+
By default a window size is calculated from the first message in group and its `IntegrationMessageHeaderAccessor.SEQUENCE_SIZE` header.
905
+
* `setWindowTimespan(Duration)` - is propagated to the `Flux.window(Duration)` or `windowTimeout(int, Duration)` depending in the window size configuration.
906
+
* `setWindowConfigurer(Function<Flux<Message<?>>, Flux<Flux<Message<?>>>>)` - a function to apply a transformation into the grouped fluxes for any custom window operation not covered by the exposed options.
907
+
908
+
Since this component is a `MessageHandler` implementation it can simply be used as a `@Bean` definition together with a `@ServiceActivator` messaging annotation.
909
+
With Java DSL it can be used from the `.handle()` EIP-method.
910
+
The sample below demonstrates how we can register an `IntegrationFlow` at runtime and how a `FluxAggregatorMessageHandler` can be correlated with a splitter upstream:
0 commit comments