Skip to content

Commit 1511dd8

Browse files
authored
GH-3204: Add DSL intercept() operator
Fixes #3204 * Add an `intercept(ChannelInterceptor...)` method into `BaseIntegrationFlowDefinition` to register one or more channel interceptors at the current flow position. * refactor to reuse `InterceptableChannel` creation from `wireTap` * document the new operator
1 parent c84d264 commit 1511dd8

File tree

4 files changed

+133
-8
lines changed

4 files changed

+133
-8
lines changed

spring-integration-core/src/main/java/org/springframework/integration/dsl/BaseIntegrationFlowDefinition.java

+42-7
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@
9090
import org.springframework.messaging.Message;
9191
import org.springframework.messaging.MessageChannel;
9292
import org.springframework.messaging.MessageHandler;
93+
import org.springframework.messaging.support.ChannelInterceptor;
9394
import org.springframework.messaging.support.InterceptableChannel;
9495
import org.springframework.util.Assert;
9596
import org.springframework.util.CollectionUtils;
@@ -109,6 +110,7 @@
109110
* @author Artem Bilan
110111
* @author Gary Russell
111112
* @author Gabriele Del Prete
113+
* @author Tim Feuerbach
112114
*
113115
* @since 5.2.1
114116
*
@@ -179,6 +181,24 @@ protected MessageChannel getCurrentMessageChannel() {
179181
return this.currentMessageChannel;
180182
}
181183

184+
/**
185+
* Return the current channel if it is an {@link InterceptableChannel}, otherwise register a new implicit
186+
* {@link DirectChannel} in the flow and return that one.
187+
* @return the current channel after the operation
188+
*/
189+
protected InterceptableChannel currentInterceptableChannel() {
190+
MessageChannel currentChannel = getCurrentMessageChannel();
191+
if (currentChannel instanceof InterceptableChannel) {
192+
return (InterceptableChannel) currentChannel;
193+
}
194+
else {
195+
DirectChannel newCurrentChannel = new DirectChannel();
196+
channel(newCurrentChannel);
197+
setImplicitChannel(true);
198+
return newCurrentChannel;
199+
}
200+
}
201+
182202
protected void setImplicitChannel(boolean implicitChannel) {
183203
this.implicitChannel = implicitChannel;
184204
}
@@ -488,14 +508,9 @@ public B wireTap(MessageChannel wireTapChannel, Consumer<WireTapSpec> wireTapCon
488508
*/
489509
public B wireTap(WireTapSpec wireTapSpec) {
490510
WireTap interceptor = wireTapSpec.get();
491-
MessageChannel currentChannel = getCurrentMessageChannel();
492-
if (!(currentChannel instanceof InterceptableChannel)) {
493-
currentChannel = new DirectChannel();
494-
channel(currentChannel);
495-
setImplicitChannel(true);
496-
}
511+
InterceptableChannel currentChannel = currentInterceptableChannel();
497512
addComponent(wireTapSpec);
498-
((InterceptableChannel) currentChannel).addInterceptor(interceptor);
513+
currentChannel.addInterceptor(interceptor);
499514
return _this();
500515
}
501516

@@ -2829,6 +2844,26 @@ public B trigger(MessageTriggerAction triggerAction,
28292844
return handle(new ServiceActivatingHandler(triggerAction, "trigger"), endpointConfigurer);
28302845
}
28312846

2847+
/**
2848+
* Add one or more {@link ChannelInterceptor} implementations
2849+
* to the current {@link #currentMessageChannel}, in the given order, after any interceptors already registered.
2850+
* @param interceptorArray one or more {@link ChannelInterceptor}s.
2851+
* @return the current {@link BaseIntegrationFlowDefinition}.
2852+
* @throws IllegalArgumentException if one or more null arguments are provided
2853+
* @since 5.3
2854+
*/
2855+
public B intercept(ChannelInterceptor... interceptorArray) {
2856+
Assert.notNull(interceptorArray, "'interceptorArray' must not be null");
2857+
Assert.noNullElements(interceptorArray, "'interceptorArray' must not contain null elements");
2858+
2859+
InterceptableChannel currentChannel = currentInterceptableChannel();
2860+
for (ChannelInterceptor interceptor : interceptorArray) {
2861+
currentChannel.addInterceptor(interceptor);
2862+
}
2863+
2864+
return _this();
2865+
}
2866+
28322867
/**
28332868
* Populate a {@link FluxMessageChannel} to start a reactive processing for upstream data,
28342869
* wrap it to a {@link Flux}, apply provided {@link Function} via {@link Flux#transform(Function)}

spring-integration-core/src/test/java/org/springframework/integration/dsl/flows/IntegrationFlowTests.java

+71
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
2121

2222
import java.io.Serializable;
23+
import java.util.ArrayList;
24+
import java.util.List;
2325
import java.util.concurrent.CountDownLatch;
2426
import java.util.concurrent.Executor;
2527
import java.util.concurrent.TimeUnit;
@@ -32,6 +34,7 @@
3234
import org.aopalliance.aop.Advice;
3335
import org.aopalliance.intercept.MethodInterceptor;
3436
import org.aopalliance.intercept.MethodInvocation;
37+
import org.junit.After;
3538
import org.junit.jupiter.api.Test;
3639

3740
import org.springframework.beans.factory.BeanCreationException;
@@ -83,6 +86,7 @@
8386
import org.springframework.messaging.MessagingException;
8487
import org.springframework.messaging.PollableChannel;
8588
import org.springframework.messaging.SubscribableChannel;
89+
import org.springframework.messaging.support.ChannelInterceptor;
8690
import org.springframework.messaging.support.GenericMessage;
8791
import org.springframework.scheduling.TaskScheduler;
8892
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@@ -100,6 +104,7 @@
100104
* @author Tim Ysewyn
101105
* @author Gary Russell
102106
* @author Oleg Zhurakousky
107+
* @author Tim Feuerbach
103108
*
104109
* @since 5.0
105110
*/
@@ -498,6 +503,32 @@ public void testGlobalErrorChannelResolutionFlow() throws InterruptedException {
498503
this.errorChannel.unsubscribe(errorMessageHandler);
499504
}
500505

506+
@Autowired
507+
@Qualifier("interceptorChannelIn")
508+
private MessageChannel interceptorChannelIn;
509+
510+
@Autowired
511+
private List<String> outputStringList;
512+
513+
@Test
514+
public void testInterceptorFlow() {
515+
this.interceptorChannelIn.send(MessageBuilder.withPayload("foo").build());
516+
517+
assertThat(outputStringList).containsExactly(
518+
"Pre send transform: foo",
519+
"Pre send handle: FOO",
520+
"Handle: FOO",
521+
"Post send handle: FOO",
522+
"Post send transform: foo"
523+
);
524+
}
525+
526+
@After
527+
public void cleanUpList() {
528+
outputStringList.clear();
529+
}
530+
531+
501532
@MessagingGateway
502533
public interface ControlBusGateway {
503534

@@ -909,6 +940,46 @@ public IntegrationFlow globalErrorChannelResolutionFlow(@Qualifier("taskSchedule
909940

910941
}
911942

943+
@Configuration
944+
public static class InterceptorContextConfiguration {
945+
946+
@Bean
947+
public List<String> outputStringList() {
948+
return new ArrayList<>();
949+
}
950+
951+
@Bean
952+
public IntegrationFlow interceptorFlow(List<String> outputStringList) {
953+
return IntegrationFlows.from("interceptorChannelIn")
954+
.intercept(new ChannelInterceptor() {
955+
@Override
956+
public Message<?> preSend(Message<?> message, MessageChannel channel) {
957+
outputStringList.add("Pre send transform: " + message.getPayload());
958+
return message;
959+
}
960+
961+
@Override
962+
public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
963+
outputStringList.add("Post send transform: " + message.getPayload());
964+
}
965+
})
966+
.transform((String s) -> s.toUpperCase())
967+
.intercept(new ChannelInterceptor() {
968+
@Override
969+
public Message<?> preSend(Message<?> message, MessageChannel channel) {
970+
outputStringList.add("Pre send handle: " + message.getPayload());
971+
return message;
972+
}
973+
974+
@Override
975+
public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
976+
outputStringList.add("Post send handle: " + message.getPayload());
977+
}
978+
})
979+
.handle(m -> outputStringList.add("Handle: " + m.getPayload())).get();
980+
}
981+
}
982+
912983
@Service
913984
public static class GreetingService extends AbstractReplyProducingMessageHandler {
914985

src/reference/asciidoc/dsl.adoc

+17-1
Original file line numberDiff line numberDiff line change
@@ -594,6 +594,22 @@ When this operator is used at the end of a flow, it is a one-way handler and the
594594
To make it as a reply-producing flow, you can either use a simple `bridge()` after the `log()` or, starting with version 5.1, you can use a `logAndReply()` operator instead.
595595
`logAndReply` can only be used at the end of a flow.
596596

597+
[[java-dsl-intercept]]
598+
=== Operator intercept()
599+
600+
Starting with version 5.3, the `intercept()` operator allows to register one or more `ChannelInterceptor` instances at the current `MessageChannel` in the flow.
601+
This is an alternative to creating an explicit `MessageChannel` via the `MessageChannels` API.
602+
The following example uses a `MessageSelectingInterceptor` to reject certain messages with an exception:
603+
604+
====
605+
[source,java]
606+
----
607+
.transform(...)
608+
.intercept(new MessageSelectingInterceptor(m -> m.getPayload().isValid()))
609+
.handle(...)
610+
----
611+
====
612+
597613
[[java-dsl-wiretap]]
598614
=== `MessageChannelSpec.wireTap()`
599615

@@ -618,7 +634,7 @@ public IntegrationFlow loggingFlow() {
618634

619635
[IMPORTANT]
620636
====
621-
If the `MessageChannel` is an instance of `InterceptableChannel`, the `log()` or `wireTap()` operators are applied to the current `MessageChannel`.
637+
If the `MessageChannel` is an instance of `InterceptableChannel`, the `log()`, `wireTap()` or `intercept()` operators are applied to the current `MessageChannel`.
622638
Otherwise, an intermediate `DirectChannel` is injected into the flow for the currently configured endpoint.
623639
In the following example, the `WireTap` interceptor is added to `myChannel` directly, because `DirectChannel` implements `InterceptableChannel`:
624640

src/reference/asciidoc/whats-new.adoc

+3
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,9 @@ Transactional support in Spring Integration now also includes options to configu
6666
See `TransactionInterceptorBuilder` for more information.
6767
See also <<./transactions.adoc#reactive-transactions,Reactive Transactions>>.
6868

69+
A new `intercept()` operator to register `ChannelInterceptor` instances without creating explicit channels was added into Java DSL.
70+
See <<./dsl.adoc#java-dsl-intercept,Operator intercept()>> for more information.
71+
6972
[[x5.3-amqp]]
7073
=== AMQP Changes
7174

0 commit comments

Comments
 (0)