Skip to content

Hide toReactivePublisher() from lambdas #3065

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
*
* @since 5.0
*/
public class Channels {
public final class Channels {

static final Channels INSTANCE = new Channels();

public DirectChannelSpec direct() {
return MessageChannels.direct();
Expand Down Expand Up @@ -129,7 +131,7 @@ public FluxMessageChannelSpec flux(String id) {
return MessageChannels.flux(id);
}

Channels() {
private Channels() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Best to leave super() to avoid empty block complaint from Sonar.

super();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@

package org.springframework.integration.dsl;

import org.reactivestreams.Publisher;

import org.springframework.messaging.Message;

/**
* @author Artem Bilan
*
Expand All @@ -32,4 +36,9 @@ public StandardIntegrationFlow get() { // NOSONAR - not useless, increases visib
return super.get();
}

@Override
public <T> Publisher<Message<T>> toReactivePublisher() { // NOSONAR - not useless, increases visibility
return super.toReactivePublisher();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ public B channel(MessageChannel messageChannel) {
*/
public B channel(Function<Channels, MessageChannelSpec<?, ?>> channels) {
Assert.notNull(channels, "'channels' must not be null");
return channel(channels.apply(new Channels()));
return channel(channels.apply(Channels.INSTANCE));
}

/**
Expand Down Expand Up @@ -529,6 +529,7 @@ public B transform(Object service, String methodName) {
*/
public B transform(Object service, String methodName,
Consumer<GenericEndpointSpec<MessageTransformingHandler>> endpointConfigurer) {

MethodInvokingTransformer transformer;
if (StringUtils.hasText(methodName)) {
transformer = new MethodInvokingTransformer(service, methodName);
Expand Down Expand Up @@ -588,6 +589,7 @@ public B transform(MessageProcessorSpec<?> messageProcessorSpec) {
*/
public B transform(MessageProcessorSpec<?> messageProcessorSpec,
Consumer<GenericEndpointSpec<MessageTransformingHandler>> endpointConfigurer) {

Assert.notNull(messageProcessorSpec, MESSAGE_PROCESSOR_SPEC_MUST_NOT_BE_NULL);
MessageProcessor<?> processor = messageProcessorSpec.get();
return addComponent(processor)
Expand All @@ -614,8 +616,6 @@ public <P> B convert(Class<P> payloadType) {
* Populate the {@link MessageTransformingHandler} instance for the provided
* {@link GenericTransformer} for the specific {@code payloadType} to convert at
* runtime.
* Use {@link #transform(Class, GenericTransformer)} if you need access to the
* entire message.
* @param payloadType the {@link Class} for expected payload type. It can also be
* {@code Message.class} if you wish to access the entire message in the transformer.
* Conversion to this type will be attempted, if necessary.
Expand Down Expand Up @@ -690,6 +690,7 @@ public <P> B convert(Class<P> payloadType,
*/
public <P, T> B transform(Class<P> payloadType, GenericTransformer<P, T> genericTransformer,
Consumer<GenericEndpointSpec<MessageTransformingHandler>> endpointConfigurer) {

Assert.notNull(genericTransformer, "'genericTransformer' must not be null");
Transformer transformer = genericTransformer instanceof Transformer ? (Transformer) genericTransformer :
(ClassUtils.isLambda(genericTransformer.getClass())
Expand Down Expand Up @@ -759,13 +760,10 @@ public B filter(Object service, String methodName) {
* @see MethodInvokingSelector
*/
public B filter(Object service, String methodName, Consumer<FilterEndpointSpec> endpointConfigurer) {
MethodInvokingSelector selector;
if (StringUtils.hasText(methodName)) {
selector = new MethodInvokingSelector(service, methodName);
}
else {
selector = new MethodInvokingSelector(service);
}
MethodInvokingSelector selector =
StringUtils.hasText(methodName)
? new MethodInvokingSelector(service, methodName)
: new MethodInvokingSelector(service);
return filter(selector, endpointConfigurer);
}

Expand Down Expand Up @@ -891,6 +889,7 @@ public <P> B filter(GenericSelector<P> genericSelector, Consumer<FilterEndpointS
*/
public <P> B filter(Class<P> payloadType, GenericSelector<P> genericSelector,
Consumer<FilterEndpointSpec> endpointConfigurer) {

Assert.notNull(genericSelector, "'genericSelector' must not be null");
MessageSelector selector = genericSelector instanceof MessageSelector ? (MessageSelector) genericSelector :
(ClassUtils.isLambda(genericSelector.getClass())
Expand Down Expand Up @@ -957,7 +956,7 @@ public B handle(String beanName, String methodName) {
*/
public B handle(String beanName, String methodName,
Consumer<GenericEndpointSpec<ServiceActivatingHandler>> endpointConfigurer) {
return handle(new ServiceActivatingHandler(new BeanNameMessageProcessor<Object>(beanName, methodName)),
return handle(new ServiceActivatingHandler(new BeanNameMessageProcessor<>(beanName, methodName)),
endpointConfigurer);
}

Expand Down Expand Up @@ -997,6 +996,7 @@ public B handle(Object service, String methodName) {
*/
public B handle(Object service, String methodName,
Consumer<GenericEndpointSpec<ServiceActivatingHandler>> endpointConfigurer) {

ServiceActivatingHandler handler;
if (StringUtils.hasText(methodName)) {
handler = new ServiceActivatingHandler(service, methodName);
Expand Down Expand Up @@ -1050,6 +1050,7 @@ public <P> B handle(GenericHandler<P> handler) {
*/
public <P> B handle(GenericHandler<P> handler,
Consumer<GenericEndpointSpec<ServiceActivatingHandler>> endpointConfigurer) {

return handle(null, handler, endpointConfigurer);
}

Expand Down Expand Up @@ -1097,6 +1098,7 @@ public <P> B handle(Class<P> payloadType, GenericHandler<P> handler) {
*/
public <P> B handle(Class<P> payloadType, GenericHandler<P> handler,
Consumer<GenericEndpointSpec<ServiceActivatingHandler>> endpointConfigurer) {

ServiceActivatingHandler serviceActivatingHandler;
if (ClassUtils.isLambda(handler.getClass())) {
serviceActivatingHandler = new ServiceActivatingHandler(new LambdaMessageProcessor(handler, payloadType));
Expand Down Expand Up @@ -1139,6 +1141,7 @@ public B handle(MessageProcessorSpec<?> messageProcessorSpec) {
*/
public B handle(MessageProcessorSpec<?> messageProcessorSpec,
Consumer<GenericEndpointSpec<ServiceActivatingHandler>> endpointConfigurer) {

Assert.notNull(messageProcessorSpec, MESSAGE_PROCESSOR_SPEC_MUST_NOT_BE_NULL);
MessageProcessor<?> processor = messageProcessorSpec.get();
return addComponent(processor)
Expand All @@ -1164,6 +1167,7 @@ public B handle(MessageProcessorSpec<?> messageProcessorSpec,
*/
public <H extends MessageHandler> B handle(MessageHandlerSpec<?, H> messageHandlerSpec,
Consumer<GenericEndpointSpec<H>> endpointConfigurer) {

Assert.notNull(messageHandlerSpec, "'messageHandlerSpec' must not be null");
if (messageHandlerSpec instanceof ComponentsRegistration) {
addComponents(((ComponentsRegistration) messageHandlerSpec).getComponentsToRegister());
Expand All @@ -1188,7 +1192,7 @@ public <H extends MessageHandler> B handle(MessageHandlerSpec<?, H> messageHandl
*/
public <H extends MessageHandler> B handle(H messageHandler, Consumer<GenericEndpointSpec<H>> endpointConfigurer) {
Assert.notNull(messageHandler, "'messageHandler' must not be null");
return this.register(new GenericEndpointSpec<>(messageHandler), endpointConfigurer);
return register(new GenericEndpointSpec<>(messageHandler), endpointConfigurer);
}

/**
Expand Down Expand Up @@ -1306,6 +1310,7 @@ public B enrichHeaders(MapBuilder<?, String, Object> headers) {
*/
public B enrichHeaders(MapBuilder<?, String, Object> headers,
Consumer<GenericEndpointSpec<MessageTransformingHandler>> endpointConfigurer) {

return enrichHeaders(headers.get(), endpointConfigurer);
}

Expand All @@ -1331,8 +1336,9 @@ public B enrichHeaders(Map<String, Object> headers) {
* @return the current {@link IntegrationFlowDefinition}.
* @see GenericEndpointSpec
*/
public B enrichHeaders(final Map<String, Object> headers,
public B enrichHeaders(Map<String, Object> headers,
Consumer<GenericEndpointSpec<MessageTransformingHandler>> endpointConfigurer) {

HeaderEnricherSpec headerEnricherSpec = new HeaderEnricherSpec();
headerEnricherSpec.headers(headers);
Tuple2<ConsumerEndpointFactoryBean, MessageTransformingHandler> tuple2 = headerEnricherSpec.get();
Expand Down Expand Up @@ -1450,6 +1456,7 @@ public B split(Object service, String methodName) {
*/
public B split(Object service, String methodName,
Consumer<SplitterEndpointSpec<MethodInvokingSplitter>> endpointConfigurer) {

MethodInvokingSplitter splitter;
if (StringUtils.hasText(methodName)) {
splitter = new MethodInvokingSplitter(service, methodName);
Expand Down Expand Up @@ -1484,7 +1491,8 @@ public B split(String beanName, String methodName) {
*/
public B split(String beanName, String methodName,
Consumer<SplitterEndpointSpec<MethodInvokingSplitter>> endpointConfigurer) {
return split(new MethodInvokingSplitter(new BeanNameMessageProcessor<Object>(beanName, methodName)),

return split(new MethodInvokingSplitter(new BeanNameMessageProcessor<>(beanName, methodName)),
endpointConfigurer);
}

Expand Down Expand Up @@ -1524,6 +1532,7 @@ public B split(MessageProcessorSpec<?> messageProcessorSpec) {
*/
public B split(MessageProcessorSpec<?> messageProcessorSpec,
Consumer<SplitterEndpointSpec<MethodInvokingSplitter>> endpointConfigurer) {

Assert.notNull(messageProcessorSpec, MESSAGE_PROCESSOR_SPEC_MUST_NOT_BE_NULL);
MessageProcessor<?> processor = messageProcessorSpec.get();
return addComponent(processor)
Expand Down Expand Up @@ -1581,6 +1590,7 @@ public <P> B split(Class<P> payloadType, Function<P, ?> splitter) {
*/
public <P> B split(Function<P, ?> splitter,
Consumer<SplitterEndpointSpec<MethodInvokingSplitter>> endpointConfigurer) {

return split(null, splitter, endpointConfigurer);
}

Expand Down Expand Up @@ -1669,8 +1679,9 @@ public B split(AbstractMessageSplitter splitter) {
*/
public <S extends AbstractMessageSplitter> B split(S splitter,
Consumer<SplitterEndpointSpec<S>> endpointConfigurer) {

Assert.notNull(splitter, "'splitter' must not be null");
return this.register(new SplitterEndpointSpec<>(splitter), endpointConfigurer);
return register(new SplitterEndpointSpec<>(splitter), endpointConfigurer);
}

/**
Expand All @@ -1680,7 +1691,7 @@ public <S extends AbstractMessageSplitter> B split(S splitter,
* @return this {@link IntegrationFlowDefinition}.
*/
public B headerFilter(String... headersToRemove) {
return this.headerFilter(new HeaderFilter(headersToRemove), null);
return headerFilter(new HeaderFilter(headersToRemove), null);
}

/**
Expand All @@ -1694,7 +1705,7 @@ public B headerFilter(String... headersToRemove) {
public B headerFilter(String headersToRemove, boolean patternMatch) {
HeaderFilter headerFilter = new HeaderFilter(StringUtils.delimitedListToStringArray(headersToRemove, ",", " "));
headerFilter.setPatternMatch(patternMatch);
return this.headerFilter(headerFilter, null);
return headerFilter(headerFilter, null);
}

/**
Expand All @@ -1707,6 +1718,7 @@ public B headerFilter(String headersToRemove, boolean patternMatch) {
*/
public B headerFilter(HeaderFilter headerFilter,
Consumer<GenericEndpointSpec<MessageTransformingHandler>> endpointConfigurer) {

return transform(headerFilter, endpointConfigurer);
}

Expand All @@ -1731,6 +1743,7 @@ public B claimCheckIn(MessageStore messageStore) {
*/
public B claimCheckIn(MessageStore messageStore,
Consumer<GenericEndpointSpec<MessageTransformingHandler>> endpointConfigurer) {

return transform(new ClaimCheckInTransformer(messageStore), endpointConfigurer);
}

Expand All @@ -1742,7 +1755,7 @@ public B claimCheckIn(MessageStore messageStore,
* @return the current {@link IntegrationFlowDefinition}.
*/
public B claimCheckOut(MessageStore messageStore) {
return this.claimCheckOut(messageStore, false);
return claimCheckOut(messageStore, false);
}

/**
Expand All @@ -1754,7 +1767,7 @@ public B claimCheckOut(MessageStore messageStore) {
* @see ClaimCheckOutTransformer#setRemoveMessage(boolean)
*/
public B claimCheckOut(MessageStore messageStore, boolean removeMessage) {
return this.claimCheckOut(messageStore, removeMessage, null);
return claimCheckOut(messageStore, removeMessage, null);
}

/**
Expand All @@ -1770,6 +1783,7 @@ public B claimCheckOut(MessageStore messageStore, boolean removeMessage) {
*/
public B claimCheckOut(MessageStore messageStore, boolean removeMessage,
Consumer<GenericEndpointSpec<MessageTransformingHandler>> endpointConfigurer) {

ClaimCheckOutTransformer claimCheckOutTransformer = new ClaimCheckOutTransformer(messageStore);
claimCheckOutTransformer.setRemoveMessage(removeMessage);
return transform(claimCheckOutTransformer, endpointConfigurer);
Expand Down Expand Up @@ -1855,6 +1869,7 @@ public B route(String beanName, String method) {
*/
public B route(String beanName, String method, Consumer<RouterSpec<Object,
MethodInvokingRouter>> routerConfigurer) {

MethodInvokingRouter methodInvokingRouter =
new MethodInvokingRouter(new BeanNameMessageProcessor<>(beanName, method));
return route(new RouterSpec<>(methodInvokingRouter), routerConfigurer);
Expand Down Expand Up @@ -1894,6 +1909,7 @@ public B route(Object service, String methodName) {
*/
public B route(Object service, String methodName,
Consumer<RouterSpec<Object, MethodInvokingRouter>> routerConfigurer) {

MethodInvokingRouter router;
if (StringUtils.hasText(methodName)) {
router = new MethodInvokingRouter(service, methodName);
Expand Down Expand Up @@ -2062,6 +2078,7 @@ public B route(MessageProcessorSpec<?> messageProcessorSpec) {
*/
public B route(MessageProcessorSpec<?> messageProcessorSpec,
Consumer<RouterSpec<Object, MethodInvokingRouter>> routerConfigurer) {

Assert.notNull(messageProcessorSpec, MESSAGE_PROCESSOR_SPEC_MUST_NOT_BE_NULL);
MessageProcessor<?> processor = messageProcessorSpec.get();
addComponent(processor);
Expand Down Expand Up @@ -2272,7 +2289,6 @@ public B gateway(IntegrationFlow flow) {
*/
public B gateway(IntegrationFlow flow, Consumer<GatewayEndpointSpec> endpointConfigurer) {
MessageChannel requestChannel = obtainInputChannelFromFlow(flow);

return gateway(requestChannel, endpointConfigurer);
}

Expand Down Expand Up @@ -2826,11 +2842,11 @@ public B scatterGather(MessageChannel scatterChannel, Consumer<AggregatorSpec> g
*/
public B scatterGather(MessageChannel scatterChannel, Consumer<AggregatorSpec> gatherer,
Consumer<ScatterGatherSpec> scatterGather) {

AggregatorSpec aggregatorSpec = new AggregatorSpec();
if (gatherer != null) {
gatherer.accept(aggregatorSpec);
}

AggregatingMessageHandler aggregatingMessageHandler = aggregatorSpec.get().getT2();
addComponent(aggregatingMessageHandler);
ScatterGatherHandler messageHandler = new ScatterGatherHandler(scatterChannel, aggregatingMessageHandler);
Expand Down Expand Up @@ -2882,7 +2898,6 @@ public B scatterGather(Consumer<RecipientListRouterSpec> scatterer, @Nullable Co
if (gatherer != null) {
gatherer.accept(aggregatorSpec);
}

RecipientListRouter recipientListRouter = recipientListRouterSpec.get().getT2();
addComponent(recipientListRouter)
.addComponents(recipientListRouterSpec.getComponentsToRegister());
Expand Down Expand Up @@ -2933,6 +2948,7 @@ public B trigger(String triggerActionId) {
*/
public B trigger(String triggerActionId,
Consumer<GenericEndpointSpec<ServiceActivatingHandler>> endpointConfigurer) {

MessageProcessor<Void> trigger = new BeanNameMessageProcessor<>(triggerActionId, "trigger");
return handle(new ServiceActivatingHandler(trigger), endpointConfigurer);
}
Expand All @@ -2955,6 +2971,7 @@ public B trigger(MessageTriggerAction triggerAction) {
*/
public B trigger(MessageTriggerAction triggerAction,
Consumer<GenericEndpointSpec<ServiceActivatingHandler>> endpointConfigurer) {

return handle(new ServiceActivatingHandler(triggerAction, "trigger"), endpointConfigurer);
}

Expand Down Expand Up @@ -2991,7 +3008,7 @@ public <I, O> B fluxTransform(Function<? super Flux<Message<I>>, ? extends Publi
* @return the Reactive Streams {@link Publisher}
*/
@SuppressWarnings(UNCHECKED)
public <T> Publisher<Message<T>> toReactivePublisher() {
protected <T> Publisher<Message<T>> toReactivePublisher() {
MessageChannel channelForPublisher = this.currentMessageChannel;
Publisher<Message<T>> publisher;
if (channelForPublisher instanceof Publisher) {
Expand Down