Skip to content

Commit c4aceb6

Browse files
artembilangaryrussell
authored andcommitted
Hide toReactivePublisher() from lambdas (#3065)
* Hide `toReactivePublisher()` from lambdas When an `IntegrationFlow` is declared via lambda, it is impossible to materialize it as a `Publisher` since we lose a Lambda context and finish with a `NoSuchBeanException` when we would like to inject that `Publisher` * Make `IntegrationFlowDefinition.toReactivePublisher()` as `protected` and expose it on the `IntegrationFlowBuilder` level * Use `Channels` as a singleton: no reason in overhead for its instances * Some code style polishing for `IntegrationFlowDefinition` * Restore `Channels.super()` call
1 parent 582574c commit c4aceb6

File tree

3 files changed

+53
-25
lines changed

3 files changed

+53
-25
lines changed

spring-integration-core/src/main/java/org/springframework/integration/dsl/Channels.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@
2929
*
3030
* @since 5.0
3131
*/
32-
public class Channels {
32+
public final class Channels {
33+
34+
static final Channels INSTANCE = new Channels();
3335

3436
public DirectChannelSpec direct() {
3537
return MessageChannels.direct();
@@ -129,7 +131,7 @@ public FluxMessageChannelSpec flux(String id) {
129131
return MessageChannels.flux(id);
130132
}
131133

132-
Channels() {
134+
private Channels() {
133135
super();
134136
}
135137

spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowBuilder.java

+9
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@
1616

1717
package org.springframework.integration.dsl;
1818

19+
import org.reactivestreams.Publisher;
20+
21+
import org.springframework.messaging.Message;
22+
1923
/**
2024
* @author Artem Bilan
2125
*
@@ -32,4 +36,9 @@ public StandardIntegrationFlow get() { // NOSONAR - not useless, increases visib
3236
return super.get();
3337
}
3438

39+
@Override
40+
public <T> Publisher<Message<T>> toReactivePublisher() { // NOSONAR - not useless, increases visibility
41+
return super.toReactivePublisher();
42+
}
43+
3544
}

spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowDefinition.java

+40-23
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ public B channel(MessageChannel messageChannel) {
231231
*/
232232
public B channel(Function<Channels, MessageChannelSpec<?, ?>> channels) {
233233
Assert.notNull(channels, "'channels' must not be null");
234-
return channel(channels.apply(new Channels()));
234+
return channel(channels.apply(Channels.INSTANCE));
235235
}
236236

237237
/**
@@ -529,6 +529,7 @@ public B transform(Object service, String methodName) {
529529
*/
530530
public B transform(Object service, String methodName,
531531
Consumer<GenericEndpointSpec<MessageTransformingHandler>> endpointConfigurer) {
532+
532533
MethodInvokingTransformer transformer;
533534
if (StringUtils.hasText(methodName)) {
534535
transformer = new MethodInvokingTransformer(service, methodName);
@@ -588,6 +589,7 @@ public B transform(MessageProcessorSpec<?> messageProcessorSpec) {
588589
*/
589590
public B transform(MessageProcessorSpec<?> messageProcessorSpec,
590591
Consumer<GenericEndpointSpec<MessageTransformingHandler>> endpointConfigurer) {
592+
591593
Assert.notNull(messageProcessorSpec, MESSAGE_PROCESSOR_SPEC_MUST_NOT_BE_NULL);
592594
MessageProcessor<?> processor = messageProcessorSpec.get();
593595
return addComponent(processor)
@@ -614,8 +616,6 @@ public <P> B convert(Class<P> payloadType) {
614616
* Populate the {@link MessageTransformingHandler} instance for the provided
615617
* {@link GenericTransformer} for the specific {@code payloadType} to convert at
616618
* runtime.
617-
* Use {@link #transform(Class, GenericTransformer)} if you need access to the
618-
* entire message.
619619
* @param payloadType the {@link Class} for expected payload type. It can also be
620620
* {@code Message.class} if you wish to access the entire message in the transformer.
621621
* Conversion to this type will be attempted, if necessary.
@@ -690,6 +690,7 @@ public <P> B convert(Class<P> payloadType,
690690
*/
691691
public <P, T> B transform(Class<P> payloadType, GenericTransformer<P, T> genericTransformer,
692692
Consumer<GenericEndpointSpec<MessageTransformingHandler>> endpointConfigurer) {
693+
693694
Assert.notNull(genericTransformer, "'genericTransformer' must not be null");
694695
Transformer transformer = genericTransformer instanceof Transformer ? (Transformer) genericTransformer :
695696
(ClassUtils.isLambda(genericTransformer.getClass())
@@ -759,13 +760,10 @@ public B filter(Object service, String methodName) {
759760
* @see MethodInvokingSelector
760761
*/
761762
public B filter(Object service, String methodName, Consumer<FilterEndpointSpec> endpointConfigurer) {
762-
MethodInvokingSelector selector;
763-
if (StringUtils.hasText(methodName)) {
764-
selector = new MethodInvokingSelector(service, methodName);
765-
}
766-
else {
767-
selector = new MethodInvokingSelector(service);
768-
}
763+
MethodInvokingSelector selector =
764+
StringUtils.hasText(methodName)
765+
? new MethodInvokingSelector(service, methodName)
766+
: new MethodInvokingSelector(service);
769767
return filter(selector, endpointConfigurer);
770768
}
771769

@@ -891,6 +889,7 @@ public <P> B filter(GenericSelector<P> genericSelector, Consumer<FilterEndpointS
891889
*/
892890
public <P> B filter(Class<P> payloadType, GenericSelector<P> genericSelector,
893891
Consumer<FilterEndpointSpec> endpointConfigurer) {
892+
894893
Assert.notNull(genericSelector, "'genericSelector' must not be null");
895894
MessageSelector selector = genericSelector instanceof MessageSelector ? (MessageSelector) genericSelector :
896895
(ClassUtils.isLambda(genericSelector.getClass())
@@ -957,7 +956,7 @@ public B handle(String beanName, String methodName) {
957956
*/
958957
public B handle(String beanName, String methodName,
959958
Consumer<GenericEndpointSpec<ServiceActivatingHandler>> endpointConfigurer) {
960-
return handle(new ServiceActivatingHandler(new BeanNameMessageProcessor<Object>(beanName, methodName)),
959+
return handle(new ServiceActivatingHandler(new BeanNameMessageProcessor<>(beanName, methodName)),
961960
endpointConfigurer);
962961
}
963962

@@ -997,6 +996,7 @@ public B handle(Object service, String methodName) {
997996
*/
998997
public B handle(Object service, String methodName,
999998
Consumer<GenericEndpointSpec<ServiceActivatingHandler>> endpointConfigurer) {
999+
10001000
ServiceActivatingHandler handler;
10011001
if (StringUtils.hasText(methodName)) {
10021002
handler = new ServiceActivatingHandler(service, methodName);
@@ -1050,6 +1050,7 @@ public <P> B handle(GenericHandler<P> handler) {
10501050
*/
10511051
public <P> B handle(GenericHandler<P> handler,
10521052
Consumer<GenericEndpointSpec<ServiceActivatingHandler>> endpointConfigurer) {
1053+
10531054
return handle(null, handler, endpointConfigurer);
10541055
}
10551056

@@ -1097,6 +1098,7 @@ public <P> B handle(Class<P> payloadType, GenericHandler<P> handler) {
10971098
*/
10981099
public <P> B handle(Class<P> payloadType, GenericHandler<P> handler,
10991100
Consumer<GenericEndpointSpec<ServiceActivatingHandler>> endpointConfigurer) {
1101+
11001102
ServiceActivatingHandler serviceActivatingHandler;
11011103
if (ClassUtils.isLambda(handler.getClass())) {
11021104
serviceActivatingHandler = new ServiceActivatingHandler(new LambdaMessageProcessor(handler, payloadType));
@@ -1139,6 +1141,7 @@ public B handle(MessageProcessorSpec<?> messageProcessorSpec) {
11391141
*/
11401142
public B handle(MessageProcessorSpec<?> messageProcessorSpec,
11411143
Consumer<GenericEndpointSpec<ServiceActivatingHandler>> endpointConfigurer) {
1144+
11421145
Assert.notNull(messageProcessorSpec, MESSAGE_PROCESSOR_SPEC_MUST_NOT_BE_NULL);
11431146
MessageProcessor<?> processor = messageProcessorSpec.get();
11441147
return addComponent(processor)
@@ -1164,6 +1167,7 @@ public B handle(MessageProcessorSpec<?> messageProcessorSpec,
11641167
*/
11651168
public <H extends MessageHandler> B handle(MessageHandlerSpec<?, H> messageHandlerSpec,
11661169
Consumer<GenericEndpointSpec<H>> endpointConfigurer) {
1170+
11671171
Assert.notNull(messageHandlerSpec, "'messageHandlerSpec' must not be null");
11681172
if (messageHandlerSpec instanceof ComponentsRegistration) {
11691173
addComponents(((ComponentsRegistration) messageHandlerSpec).getComponentsToRegister());
@@ -1188,7 +1192,7 @@ public <H extends MessageHandler> B handle(MessageHandlerSpec<?, H> messageHandl
11881192
*/
11891193
public <H extends MessageHandler> B handle(H messageHandler, Consumer<GenericEndpointSpec<H>> endpointConfigurer) {
11901194
Assert.notNull(messageHandler, "'messageHandler' must not be null");
1191-
return this.register(new GenericEndpointSpec<>(messageHandler), endpointConfigurer);
1195+
return register(new GenericEndpointSpec<>(messageHandler), endpointConfigurer);
11921196
}
11931197

11941198
/**
@@ -1306,6 +1310,7 @@ public B enrichHeaders(MapBuilder<?, String, Object> headers) {
13061310
*/
13071311
public B enrichHeaders(MapBuilder<?, String, Object> headers,
13081312
Consumer<GenericEndpointSpec<MessageTransformingHandler>> endpointConfigurer) {
1313+
13091314
return enrichHeaders(headers.get(), endpointConfigurer);
13101315
}
13111316

@@ -1331,8 +1336,9 @@ public B enrichHeaders(Map<String, Object> headers) {
13311336
* @return the current {@link IntegrationFlowDefinition}.
13321337
* @see GenericEndpointSpec
13331338
*/
1334-
public B enrichHeaders(final Map<String, Object> headers,
1339+
public B enrichHeaders(Map<String, Object> headers,
13351340
Consumer<GenericEndpointSpec<MessageTransformingHandler>> endpointConfigurer) {
1341+
13361342
HeaderEnricherSpec headerEnricherSpec = new HeaderEnricherSpec();
13371343
headerEnricherSpec.headers(headers);
13381344
Tuple2<ConsumerEndpointFactoryBean, MessageTransformingHandler> tuple2 = headerEnricherSpec.get();
@@ -1450,6 +1456,7 @@ public B split(Object service, String methodName) {
14501456
*/
14511457
public B split(Object service, String methodName,
14521458
Consumer<SplitterEndpointSpec<MethodInvokingSplitter>> endpointConfigurer) {
1459+
14531460
MethodInvokingSplitter splitter;
14541461
if (StringUtils.hasText(methodName)) {
14551462
splitter = new MethodInvokingSplitter(service, methodName);
@@ -1484,7 +1491,8 @@ public B split(String beanName, String methodName) {
14841491
*/
14851492
public B split(String beanName, String methodName,
14861493
Consumer<SplitterEndpointSpec<MethodInvokingSplitter>> endpointConfigurer) {
1487-
return split(new MethodInvokingSplitter(new BeanNameMessageProcessor<Object>(beanName, methodName)),
1494+
1495+
return split(new MethodInvokingSplitter(new BeanNameMessageProcessor<>(beanName, methodName)),
14881496
endpointConfigurer);
14891497
}
14901498

@@ -1524,6 +1532,7 @@ public B split(MessageProcessorSpec<?> messageProcessorSpec) {
15241532
*/
15251533
public B split(MessageProcessorSpec<?> messageProcessorSpec,
15261534
Consumer<SplitterEndpointSpec<MethodInvokingSplitter>> endpointConfigurer) {
1535+
15271536
Assert.notNull(messageProcessorSpec, MESSAGE_PROCESSOR_SPEC_MUST_NOT_BE_NULL);
15281537
MessageProcessor<?> processor = messageProcessorSpec.get();
15291538
return addComponent(processor)
@@ -1581,6 +1590,7 @@ public <P> B split(Class<P> payloadType, Function<P, ?> splitter) {
15811590
*/
15821591
public <P> B split(Function<P, ?> splitter,
15831592
Consumer<SplitterEndpointSpec<MethodInvokingSplitter>> endpointConfigurer) {
1593+
15841594
return split(null, splitter, endpointConfigurer);
15851595
}
15861596

@@ -1669,8 +1679,9 @@ public B split(AbstractMessageSplitter splitter) {
16691679
*/
16701680
public <S extends AbstractMessageSplitter> B split(S splitter,
16711681
Consumer<SplitterEndpointSpec<S>> endpointConfigurer) {
1682+
16721683
Assert.notNull(splitter, "'splitter' must not be null");
1673-
return this.register(new SplitterEndpointSpec<>(splitter), endpointConfigurer);
1684+
return register(new SplitterEndpointSpec<>(splitter), endpointConfigurer);
16741685
}
16751686

16761687
/**
@@ -1680,7 +1691,7 @@ public <S extends AbstractMessageSplitter> B split(S splitter,
16801691
* @return this {@link IntegrationFlowDefinition}.
16811692
*/
16821693
public B headerFilter(String... headersToRemove) {
1683-
return this.headerFilter(new HeaderFilter(headersToRemove), null);
1694+
return headerFilter(new HeaderFilter(headersToRemove), null);
16841695
}
16851696

16861697
/**
@@ -1694,7 +1705,7 @@ public B headerFilter(String... headersToRemove) {
16941705
public B headerFilter(String headersToRemove, boolean patternMatch) {
16951706
HeaderFilter headerFilter = new HeaderFilter(StringUtils.delimitedListToStringArray(headersToRemove, ",", " "));
16961707
headerFilter.setPatternMatch(patternMatch);
1697-
return this.headerFilter(headerFilter, null);
1708+
return headerFilter(headerFilter, null);
16981709
}
16991710

17001711
/**
@@ -1707,6 +1718,7 @@ public B headerFilter(String headersToRemove, boolean patternMatch) {
17071718
*/
17081719
public B headerFilter(HeaderFilter headerFilter,
17091720
Consumer<GenericEndpointSpec<MessageTransformingHandler>> endpointConfigurer) {
1721+
17101722
return transform(headerFilter, endpointConfigurer);
17111723
}
17121724

@@ -1731,6 +1743,7 @@ public B claimCheckIn(MessageStore messageStore) {
17311743
*/
17321744
public B claimCheckIn(MessageStore messageStore,
17331745
Consumer<GenericEndpointSpec<MessageTransformingHandler>> endpointConfigurer) {
1746+
17341747
return transform(new ClaimCheckInTransformer(messageStore), endpointConfigurer);
17351748
}
17361749

@@ -1742,7 +1755,7 @@ public B claimCheckIn(MessageStore messageStore,
17421755
* @return the current {@link IntegrationFlowDefinition}.
17431756
*/
17441757
public B claimCheckOut(MessageStore messageStore) {
1745-
return this.claimCheckOut(messageStore, false);
1758+
return claimCheckOut(messageStore, false);
17461759
}
17471760

17481761
/**
@@ -1754,7 +1767,7 @@ public B claimCheckOut(MessageStore messageStore) {
17541767
* @see ClaimCheckOutTransformer#setRemoveMessage(boolean)
17551768
*/
17561769
public B claimCheckOut(MessageStore messageStore, boolean removeMessage) {
1757-
return this.claimCheckOut(messageStore, removeMessage, null);
1770+
return claimCheckOut(messageStore, removeMessage, null);
17581771
}
17591772

17601773
/**
@@ -1770,6 +1783,7 @@ public B claimCheckOut(MessageStore messageStore, boolean removeMessage) {
17701783
*/
17711784
public B claimCheckOut(MessageStore messageStore, boolean removeMessage,
17721785
Consumer<GenericEndpointSpec<MessageTransformingHandler>> endpointConfigurer) {
1786+
17731787
ClaimCheckOutTransformer claimCheckOutTransformer = new ClaimCheckOutTransformer(messageStore);
17741788
claimCheckOutTransformer.setRemoveMessage(removeMessage);
17751789
return transform(claimCheckOutTransformer, endpointConfigurer);
@@ -1855,6 +1869,7 @@ public B route(String beanName, String method) {
18551869
*/
18561870
public B route(String beanName, String method, Consumer<RouterSpec<Object,
18571871
MethodInvokingRouter>> routerConfigurer) {
1872+
18581873
MethodInvokingRouter methodInvokingRouter =
18591874
new MethodInvokingRouter(new BeanNameMessageProcessor<>(beanName, method));
18601875
return route(new RouterSpec<>(methodInvokingRouter), routerConfigurer);
@@ -1894,6 +1909,7 @@ public B route(Object service, String methodName) {
18941909
*/
18951910
public B route(Object service, String methodName,
18961911
Consumer<RouterSpec<Object, MethodInvokingRouter>> routerConfigurer) {
1912+
18971913
MethodInvokingRouter router;
18981914
if (StringUtils.hasText(methodName)) {
18991915
router = new MethodInvokingRouter(service, methodName);
@@ -2062,6 +2078,7 @@ public B route(MessageProcessorSpec<?> messageProcessorSpec) {
20622078
*/
20632079
public B route(MessageProcessorSpec<?> messageProcessorSpec,
20642080
Consumer<RouterSpec<Object, MethodInvokingRouter>> routerConfigurer) {
2081+
20652082
Assert.notNull(messageProcessorSpec, MESSAGE_PROCESSOR_SPEC_MUST_NOT_BE_NULL);
20662083
MessageProcessor<?> processor = messageProcessorSpec.get();
20672084
addComponent(processor);
@@ -2272,7 +2289,6 @@ public B gateway(IntegrationFlow flow) {
22722289
*/
22732290
public B gateway(IntegrationFlow flow, Consumer<GatewayEndpointSpec> endpointConfigurer) {
22742291
MessageChannel requestChannel = obtainInputChannelFromFlow(flow);
2275-
22762292
return gateway(requestChannel, endpointConfigurer);
22772293
}
22782294

@@ -2826,11 +2842,11 @@ public B scatterGather(MessageChannel scatterChannel, Consumer<AggregatorSpec> g
28262842
*/
28272843
public B scatterGather(MessageChannel scatterChannel, Consumer<AggregatorSpec> gatherer,
28282844
Consumer<ScatterGatherSpec> scatterGather) {
2845+
28292846
AggregatorSpec aggregatorSpec = new AggregatorSpec();
28302847
if (gatherer != null) {
28312848
gatherer.accept(aggregatorSpec);
28322849
}
2833-
28342850
AggregatingMessageHandler aggregatingMessageHandler = aggregatorSpec.get().getT2();
28352851
addComponent(aggregatingMessageHandler);
28362852
ScatterGatherHandler messageHandler = new ScatterGatherHandler(scatterChannel, aggregatingMessageHandler);
@@ -2882,7 +2898,6 @@ public B scatterGather(Consumer<RecipientListRouterSpec> scatterer, @Nullable Co
28822898
if (gatherer != null) {
28832899
gatherer.accept(aggregatorSpec);
28842900
}
2885-
28862901
RecipientListRouter recipientListRouter = recipientListRouterSpec.get().getT2();
28872902
addComponent(recipientListRouter)
28882903
.addComponents(recipientListRouterSpec.getComponentsToRegister());
@@ -2933,6 +2948,7 @@ public B trigger(String triggerActionId) {
29332948
*/
29342949
public B trigger(String triggerActionId,
29352950
Consumer<GenericEndpointSpec<ServiceActivatingHandler>> endpointConfigurer) {
2951+
29362952
MessageProcessor<Void> trigger = new BeanNameMessageProcessor<>(triggerActionId, "trigger");
29372953
return handle(new ServiceActivatingHandler(trigger), endpointConfigurer);
29382954
}
@@ -2955,6 +2971,7 @@ public B trigger(MessageTriggerAction triggerAction) {
29552971
*/
29562972
public B trigger(MessageTriggerAction triggerAction,
29572973
Consumer<GenericEndpointSpec<ServiceActivatingHandler>> endpointConfigurer) {
2974+
29582975
return handle(new ServiceActivatingHandler(triggerAction, "trigger"), endpointConfigurer);
29592976
}
29602977

@@ -2991,7 +3008,7 @@ public <I, O> B fluxTransform(Function<? super Flux<Message<I>>, ? extends Publi
29913008
* @return the Reactive Streams {@link Publisher}
29923009
*/
29933010
@SuppressWarnings(UNCHECKED)
2994-
public <T> Publisher<Message<T>> toReactivePublisher() {
3011+
protected <T> Publisher<Message<T>> toReactivePublisher() {
29953012
MessageChannel channelForPublisher = this.currentMessageChannel;
29963013
Publisher<Message<T>> publisher;
29973014
if (channelForPublisher instanceof Publisher) {

0 commit comments

Comments
 (0)