Skip to content

GH-3155: Add support for Java DSL extensions #3167

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Feb 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2019 the original author or authors.
* Copyright 2016-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -34,7 +34,7 @@ public class AbstractRouterSpec<S extends AbstractRouterSpec<S, R>, R extends Ab

private boolean defaultToParentFlow;

AbstractRouterSpec(R router) {
protected AbstractRouterSpec(R router) {
super(router);
}

Expand Down Expand Up @@ -100,7 +100,7 @@ public S defaultOutputToParentFlow() {
return _this();
}

boolean isDefaultToParentFlow() {
protected boolean isDefaultToParentFlow() {
return this.defaultToParentFlow;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2019 the original author or authors.
* Copyright 2016-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -39,7 +39,7 @@ public class AggregatorSpec extends CorrelationHandlerSpec<AggregatorSpec, Aggre

private Function<MessageGroup, Map<String, Object>> headersFunction;

AggregatorSpec() {
protected AggregatorSpec() {
super(new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor()));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2019 the original author or authors.
* Copyright 2016-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -52,7 +52,7 @@ public class BarrierSpec extends ConsumerEndpointSpec<BarrierSpec, BarrierMessag

private boolean async;

BarrierSpec(long timeout) {
protected BarrierSpec(long timeout) {
super(null);
this.timeout = timeout;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019 the original author or authors.
* Copyright 2019-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -121,10 +121,10 @@ public abstract class BaseIntegrationFlowDefinition<B extends BaseIntegrationFlo

private static final String MESSAGE_PROCESSOR_SPEC_MUST_NOT_BE_NULL = "'messageProcessorSpec' must not be null";

private static final SpelExpressionParser PARSER = new SpelExpressionParser();

private static final Set<MessageProducer> REFERENCED_REPLY_PRODUCERS = new HashSet<>();

protected static final SpelExpressionParser PARSER = new SpelExpressionParser(); //NOSONAR - final

protected final Map<Object, String> integrationComponents = new LinkedHashMap<>(); //NOSONAR - final

private MessageChannel currentMessageChannel;
Expand Down Expand Up @@ -380,7 +380,7 @@ public B wireTap(IntegrationFlow flow, Consumer<WireTapSpec> wireTapConfigurer)
return wireTap(wireTapChannel, wireTapConfigurer);
}

private MessageChannel obtainInputChannelFromFlow(IntegrationFlow flow) {
protected MessageChannel obtainInputChannelFromFlow(IntegrationFlow flow) {
Assert.notNull(flow, "'flow' must not be null");
MessageChannel messageChannel = flow.getInputChannel();
if (messageChannel == null) {
Expand Down Expand Up @@ -1222,6 +1222,18 @@ public B enrichHeaders(MapBuilder<?, String, Object> headers,
return enrichHeaders(headers.get(), endpointConfigurer);
}

/**
* Accept a {@link Map} of values to be used for the
* {@link Message} header enrichment.
* {@code values} can apply an {@link Expression}
* to be evaluated against a request {@link Message}.
* @param headers the Map of headers to enrich.
* @return the current {@link IntegrationFlowDefinition}.
*/
public B enrichHeaders(Map<String, Object> headers) {
return enrichHeaders(headers, null);
}

/**
* Accept a {@link Map} of values to be used for the
* {@link Message} header enrichment.
Expand Down Expand Up @@ -1908,7 +1920,7 @@ public B route(MessageProcessorSpec<?> messageProcessorSpec,
return route(new RouterSpec<>(new MethodInvokingRouter(processor)), routerConfigurer);
}

private <R extends AbstractMessageRouter, S extends AbstractRouterSpec<S, R>> B route(S routerSpec,
protected <R extends AbstractMessageRouter, S extends AbstractRouterSpec<? super S, R>> B route(S routerSpec,
Consumer<S> routerConfigurer) {

if (routerConfigurer != null) {
Expand Down Expand Up @@ -2825,6 +2837,17 @@ public <I, O> B fluxTransform(Function<? super Flux<Message<I>>, ? extends Publi
.addComponent(downstream);
}

/**
* Add a {@value IntegrationContextUtils#NULL_CHANNEL_BEAN_NAME} bean into this flow
* definition as a terminal operator.
* @return The {@link IntegrationFlow} instance based on this definition.
* @since 5.1
*/
public IntegrationFlow nullChannel() {
return channel(IntegrationContextUtils.NULL_CHANNEL_BEAN_NAME)
.get();
}

/**
* Represent an Integration Flow as a Reactive Streams {@link Publisher} bean.
* @param <T> the expected {@code payload} type
Expand Down Expand Up @@ -2858,19 +2881,7 @@ protected <T> Publisher<Message<T>> toReactivePublisher() {
return new PublisherIntegrationFlow<>(components, publisher);
}

/**
* Add a {@value IntegrationContextUtils#NULL_CHANNEL_BEAN_NAME} bean into this flow
* definition as a terminal operator.
* @return The {@link IntegrationFlow} instance based on this definition.
* @since 5.1
*/
public IntegrationFlow nullChannel() {
return channel(IntegrationContextUtils.NULL_CHANNEL_BEAN_NAME)
.get();
}

@SuppressWarnings(UNCHECKED)
private <S extends ConsumerEndpointSpec<S, ? extends MessageHandler>> B register(S endpointSpec,
protected <S extends ConsumerEndpointSpec<? super S, ? extends MessageHandler>> B register(S endpointSpec,
Consumer<S> endpointConfigurer) {

if (endpointConfigurer != null) {
Expand Down Expand Up @@ -2906,7 +2917,7 @@ public IntegrationFlow nullChannel() {
return addComponent(endpointSpec).currentComponent(factoryBeanTuple2.getT2());
}

private B registerOutputChannelIfCan(MessageChannel outputChannel) {
protected B registerOutputChannelIfCan(MessageChannel outputChannel) {
if (!(outputChannel instanceof FixedSubscriberChannelPrototype)) {
addComponent(outputChannel, null);
Object currComponent = getCurrentComponent();
Expand Down Expand Up @@ -2947,7 +2958,7 @@ else if (currComponent instanceof SourcePollingChannelAdapterSpec) {
return _this();
}

private boolean isOutputChannelRequired() {
protected boolean isOutputChannelRequired() {
Object currentElement = getCurrentComponent();
if (currentElement != null) {
if (AopUtils.isAopProxy(currentElement)) {
Expand Down Expand Up @@ -3006,27 +3017,15 @@ else if (currentChannel != null) {
return this.integrationFlow;
}

private void checkReuse(MessageProducer replyHandler) {
protected void checkReuse(MessageProducer replyHandler) {
Assert.isTrue(!REFERENCED_REPLY_PRODUCERS.contains(replyHandler),
"A reply MessageProducer may only be referenced once ("
+ replyHandler
+ ") - use @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) on @Bean definition.");
REFERENCED_REPLY_PRODUCERS.add(replyHandler);
}

/**
* Accept a {@link Map} of values to be used for the
* {@link Message} header enrichment.
* {@code values} can apply an {@link Expression}
* to be evaluated against a request {@link Message}.
* @param headers the Map of headers to enrich.
* @return the current {@link IntegrationFlowDefinition}.
*/
public B enrichHeaders(Map<String, Object> headers) {
return enrichHeaders(headers, null);
}

private static Object extractProxyTarget(Object target) {
protected static Object extractProxyTarget(Object target) {
if (!(target instanceof Advised)) {
return target;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2019 the original author or authors.
* Copyright 2016-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -42,11 +42,11 @@
*
* @since 5.0
*/
public final class DelayerEndpointSpec extends ConsumerEndpointSpec<DelayerEndpointSpec, DelayHandler> {
public class DelayerEndpointSpec extends ConsumerEndpointSpec<DelayerEndpointSpec, DelayHandler> {

private final List<Advice> delayedAdvice = new LinkedList<>();

DelayerEndpointSpec(DelayHandler delayHandler) {
protected DelayerEndpointSpec(DelayHandler delayHandler) {
super(delayHandler);
Assert.notNull(delayHandler, "'delayHandler' must not be null.");
this.handler.setDelayedAdviceChain(this.delayedAdvice);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2019 the original author or authors.
* Copyright 2016-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -37,7 +37,4 @@ protected DirectChannel doGet() {
return super.doGet();
}

DirectChannelSpec() {
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2019 the original author or authors.
* Copyright 2016-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -47,11 +47,11 @@
*/
public class EnricherSpec extends ConsumerEndpointSpec<EnricherSpec, ContentEnricher> {

private final Map<String, Expression> propertyExpressions = new HashMap<>();
protected final Map<String, Expression> propertyExpressions = new HashMap<>(); // NOSONAR - final

private final Map<String, HeaderValueMessageProcessor<?>> headerExpressions = new HashMap<>();
protected final Map<String, HeaderValueMessageProcessor<?>> headerExpressions = new HashMap<>(); // NOSONAR - final

EnricherSpec() {
protected EnricherSpec() {
super(new ContentEnricher());
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2019 the original author or authors.
* Copyright 2016-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -29,7 +29,7 @@ public class ExecutorChannelSpec extends LoadBalancingChannelSpec<ExecutorChanne

private final Executor executor;

ExecutorChannelSpec(Executor executor) {
protected ExecutorChannelSpec(Executor executor) {
this.executor = executor;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2019 the original author or authors.
* Copyright 2016-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,9 +26,9 @@
*
* @since 5.0
*/
public final class FilterEndpointSpec extends ConsumerEndpointSpec<FilterEndpointSpec, MessageFilter> {
public class FilterEndpointSpec extends ConsumerEndpointSpec<FilterEndpointSpec, MessageFilter> {

FilterEndpointSpec(MessageFilter messageFilter) {
protected FilterEndpointSpec(MessageFilter messageFilter) {
super(messageFilter);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2019 the original author or authors.
* Copyright 2017-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,7 +26,7 @@
*/
public class FluxMessageChannelSpec extends MessageChannelSpec<FluxMessageChannelSpec, FluxMessageChannel> {

FluxMessageChannelSpec() {
protected FluxMessageChannelSpec() {
this.channel = new FluxMessageChannel();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2019 the original author or authors.
* Copyright 2016-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,43 +26,73 @@
*
* @since 5.0
*/
public final class GatewayEndpointSpec extends ConsumerEndpointSpec<GatewayEndpointSpec, GatewayMessageHandler> {
public class GatewayEndpointSpec extends ConsumerEndpointSpec<GatewayEndpointSpec, GatewayMessageHandler> {

GatewayEndpointSpec(MessageChannel requestChannel) {
protected GatewayEndpointSpec(MessageChannel requestChannel) {
super(new GatewayMessageHandler());
this.handler.setRequestChannel(requestChannel);
}

GatewayEndpointSpec(String requestChannel) {
protected GatewayEndpointSpec(String requestChannel) {
super(new GatewayMessageHandler());
this.handler.setRequestChannelName(requestChannel);
}

/**
* Set a reply channel.
* @param replyChannel the reply channel
* @return the spec
*/
public GatewayEndpointSpec replyChannel(MessageChannel replyChannel) {
this.handler.setReplyChannel(replyChannel);
return this;
}

/**
* Set a reply channel.
* @param replyChannel the reply channel
* @return the spec
*/
public GatewayEndpointSpec replyChannel(String replyChannel) {
this.handler.setReplyChannelName(replyChannel);
return this;
}

/**
* Set an error channel.
* @param errorChannel the error channel
* @return the spec
*/
public GatewayEndpointSpec errorChannel(MessageChannel errorChannel) {
this.handler.setErrorChannel(errorChannel);
return this;
}

/**
* Set an error channel.
* @param errorChannel the error channel
* @return the spec
*/
public GatewayEndpointSpec errorChannel(String errorChannel) {
this.handler.setErrorChannelName(errorChannel);
return this;
}

/**
* Set a request timeout.
* @param requestTimeout the request timeout
* @return the spec
*/
public GatewayEndpointSpec requestTimeout(Long requestTimeout) {
this.handler.setRequestTimeout(requestTimeout);
return this;
}

/**
* Set a reply timeout.
* @param replyTimeout the reply timeout
* @return the spec
*/
public GatewayEndpointSpec replyTimeout(Long replyTimeout) {
this.handler.setReplyTimeout(replyTimeout);
return this;
Expand Down
Loading