|
1 | 1 | /*
|
2 |
| - * Copyright 2019 the original author or authors. |
| 2 | + * Copyright 2019-2020 the original author or authors. |
3 | 3 | *
|
4 | 4 | * Licensed under the Apache License, Version 2.0 (the "License");
|
5 | 5 | * you may not use this file except in compliance with the License.
|
@@ -121,10 +121,10 @@ public abstract class BaseIntegrationFlowDefinition<B extends BaseIntegrationFlo
|
121 | 121 |
|
122 | 122 | private static final String MESSAGE_PROCESSOR_SPEC_MUST_NOT_BE_NULL = "'messageProcessorSpec' must not be null";
|
123 | 123 |
|
124 |
| - private static final SpelExpressionParser PARSER = new SpelExpressionParser(); |
125 |
| - |
126 | 124 | private static final Set<MessageProducer> REFERENCED_REPLY_PRODUCERS = new HashSet<>();
|
127 | 125 |
|
| 126 | + protected static final SpelExpressionParser PARSER = new SpelExpressionParser(); //NOSONAR - final |
| 127 | + |
128 | 128 | protected final Map<Object, String> integrationComponents = new LinkedHashMap<>(); //NOSONAR - final
|
129 | 129 |
|
130 | 130 | private MessageChannel currentMessageChannel;
|
@@ -380,7 +380,7 @@ public B wireTap(IntegrationFlow flow, Consumer<WireTapSpec> wireTapConfigurer)
|
380 | 380 | return wireTap(wireTapChannel, wireTapConfigurer);
|
381 | 381 | }
|
382 | 382 |
|
383 |
| - private MessageChannel obtainInputChannelFromFlow(IntegrationFlow flow) { |
| 383 | + protected MessageChannel obtainInputChannelFromFlow(IntegrationFlow flow) { |
384 | 384 | Assert.notNull(flow, "'flow' must not be null");
|
385 | 385 | MessageChannel messageChannel = flow.getInputChannel();
|
386 | 386 | if (messageChannel == null) {
|
@@ -1222,6 +1222,18 @@ public B enrichHeaders(MapBuilder<?, String, Object> headers,
|
1222 | 1222 | return enrichHeaders(headers.get(), endpointConfigurer);
|
1223 | 1223 | }
|
1224 | 1224 |
|
| 1225 | + /** |
| 1226 | + * Accept a {@link Map} of values to be used for the |
| 1227 | + * {@link Message} header enrichment. |
| 1228 | + * {@code values} can apply an {@link Expression} |
| 1229 | + * to be evaluated against a request {@link Message}. |
| 1230 | + * @param headers the Map of headers to enrich. |
| 1231 | + * @return the current {@link IntegrationFlowDefinition}. |
| 1232 | + */ |
| 1233 | + public B enrichHeaders(Map<String, Object> headers) { |
| 1234 | + return enrichHeaders(headers, null); |
| 1235 | + } |
| 1236 | + |
1225 | 1237 | /**
|
1226 | 1238 | * Accept a {@link Map} of values to be used for the
|
1227 | 1239 | * {@link Message} header enrichment.
|
@@ -1908,7 +1920,7 @@ public B route(MessageProcessorSpec<?> messageProcessorSpec,
|
1908 | 1920 | return route(new RouterSpec<>(new MethodInvokingRouter(processor)), routerConfigurer);
|
1909 | 1921 | }
|
1910 | 1922 |
|
1911 |
| - private <R extends AbstractMessageRouter, S extends AbstractRouterSpec<S, R>> B route(S routerSpec, |
| 1923 | + protected <R extends AbstractMessageRouter, S extends AbstractRouterSpec<? super S, R>> B route(S routerSpec, |
1912 | 1924 | Consumer<S> routerConfigurer) {
|
1913 | 1925 |
|
1914 | 1926 | if (routerConfigurer != null) {
|
@@ -2825,6 +2837,17 @@ public <I, O> B fluxTransform(Function<? super Flux<Message<I>>, ? extends Publi
|
2825 | 2837 | .addComponent(downstream);
|
2826 | 2838 | }
|
2827 | 2839 |
|
| 2840 | + /** |
| 2841 | + * Add a {@value IntegrationContextUtils#NULL_CHANNEL_BEAN_NAME} bean into this flow |
| 2842 | + * definition as a terminal operator. |
| 2843 | + * @return The {@link IntegrationFlow} instance based on this definition. |
| 2844 | + * @since 5.1 |
| 2845 | + */ |
| 2846 | + public IntegrationFlow nullChannel() { |
| 2847 | + return channel(IntegrationContextUtils.NULL_CHANNEL_BEAN_NAME) |
| 2848 | + .get(); |
| 2849 | + } |
| 2850 | + |
2828 | 2851 | /**
|
2829 | 2852 | * Represent an Integration Flow as a Reactive Streams {@link Publisher} bean.
|
2830 | 2853 | * @param <T> the expected {@code payload} type
|
@@ -2858,19 +2881,7 @@ protected <T> Publisher<Message<T>> toReactivePublisher() {
|
2858 | 2881 | return new PublisherIntegrationFlow<>(components, publisher);
|
2859 | 2882 | }
|
2860 | 2883 |
|
2861 |
| - /** |
2862 |
| - * Add a {@value IntegrationContextUtils#NULL_CHANNEL_BEAN_NAME} bean into this flow |
2863 |
| - * definition as a terminal operator. |
2864 |
| - * @return The {@link IntegrationFlow} instance based on this definition. |
2865 |
| - * @since 5.1 |
2866 |
| - */ |
2867 |
| - public IntegrationFlow nullChannel() { |
2868 |
| - return channel(IntegrationContextUtils.NULL_CHANNEL_BEAN_NAME) |
2869 |
| - .get(); |
2870 |
| - } |
2871 |
| - |
2872 |
| - @SuppressWarnings(UNCHECKED) |
2873 |
| - private <S extends ConsumerEndpointSpec<S, ? extends MessageHandler>> B register(S endpointSpec, |
| 2884 | + protected <S extends ConsumerEndpointSpec<? super S, ? extends MessageHandler>> B register(S endpointSpec, |
2874 | 2885 | Consumer<S> endpointConfigurer) {
|
2875 | 2886 |
|
2876 | 2887 | if (endpointConfigurer != null) {
|
@@ -2906,7 +2917,7 @@ public IntegrationFlow nullChannel() {
|
2906 | 2917 | return addComponent(endpointSpec).currentComponent(factoryBeanTuple2.getT2());
|
2907 | 2918 | }
|
2908 | 2919 |
|
2909 |
| - private B registerOutputChannelIfCan(MessageChannel outputChannel) { |
| 2920 | + protected B registerOutputChannelIfCan(MessageChannel outputChannel) { |
2910 | 2921 | if (!(outputChannel instanceof FixedSubscriberChannelPrototype)) {
|
2911 | 2922 | addComponent(outputChannel, null);
|
2912 | 2923 | Object currComponent = getCurrentComponent();
|
@@ -2947,7 +2958,7 @@ else if (currComponent instanceof SourcePollingChannelAdapterSpec) {
|
2947 | 2958 | return _this();
|
2948 | 2959 | }
|
2949 | 2960 |
|
2950 |
| - private boolean isOutputChannelRequired() { |
| 2961 | + protected boolean isOutputChannelRequired() { |
2951 | 2962 | Object currentElement = getCurrentComponent();
|
2952 | 2963 | if (currentElement != null) {
|
2953 | 2964 | if (AopUtils.isAopProxy(currentElement)) {
|
@@ -3006,27 +3017,15 @@ else if (currentChannel != null) {
|
3006 | 3017 | return this.integrationFlow;
|
3007 | 3018 | }
|
3008 | 3019 |
|
3009 |
| - private void checkReuse(MessageProducer replyHandler) { |
| 3020 | + protected void checkReuse(MessageProducer replyHandler) { |
3010 | 3021 | Assert.isTrue(!REFERENCED_REPLY_PRODUCERS.contains(replyHandler),
|
3011 | 3022 | "A reply MessageProducer may only be referenced once ("
|
3012 | 3023 | + replyHandler
|
3013 | 3024 | + ") - use @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) on @Bean definition.");
|
3014 | 3025 | REFERENCED_REPLY_PRODUCERS.add(replyHandler);
|
3015 | 3026 | }
|
3016 | 3027 |
|
3017 |
| - /** |
3018 |
| - * Accept a {@link Map} of values to be used for the |
3019 |
| - * {@link Message} header enrichment. |
3020 |
| - * {@code values} can apply an {@link Expression} |
3021 |
| - * to be evaluated against a request {@link Message}. |
3022 |
| - * @param headers the Map of headers to enrich. |
3023 |
| - * @return the current {@link IntegrationFlowDefinition}. |
3024 |
| - */ |
3025 |
| - public B enrichHeaders(Map<String, Object> headers) { |
3026 |
| - return enrichHeaders(headers, null); |
3027 |
| - } |
3028 |
| - |
3029 |
| - private static Object extractProxyTarget(Object target) { |
| 3028 | + protected static Object extractProxyTarget(Object target) { |
3030 | 3029 | if (!(target instanceof Advised)) {
|
3031 | 3030 | return target;
|
3032 | 3031 | }
|
|
0 commit comments