|
25 | 25 | import java.util.concurrent.TimeUnit;
|
26 | 26 | import java.util.concurrent.atomic.AtomicBoolean;
|
27 | 27 | import java.util.concurrent.atomic.AtomicReference;
|
| 28 | +import java.util.function.Consumer; |
28 | 29 | import java.util.function.Function;
|
29 | 30 | import java.util.function.Supplier;
|
30 | 31 |
|
|
45 | 46 | import org.springframework.context.annotation.ComponentScan;
|
46 | 47 | import org.springframework.context.annotation.Configuration;
|
47 | 48 | import org.springframework.context.annotation.Scope;
|
| 49 | +import org.springframework.core.task.TaskExecutor; |
48 | 50 | import org.springframework.integration.MessageDispatchingException;
|
49 | 51 | import org.springframework.integration.annotation.MessageEndpoint;
|
50 | 52 | import org.springframework.integration.annotation.MessagingGateway;
|
@@ -493,6 +495,26 @@ public void testPrototypeIsNotOverridden() {
|
493 | 495 | .isNotSameAs(this.flow1WithPrototypeHandlerConsumer.getHandler());
|
494 | 496 | }
|
495 | 497 |
|
| 498 | + @Autowired |
| 499 | + @Qualifier("globalErrorChannelResolutionFunction") |
| 500 | + private Consumer<String> globalErrorChannelResolutionGateway; |
| 501 | + |
| 502 | + @Autowired |
| 503 | + SubscribableChannel errorChannel; |
| 504 | + |
| 505 | + @Test |
| 506 | + public void testGlobalErrorChannelResolutionFlow() throws InterruptedException { |
| 507 | + CountDownLatch errorMessageLatch = new CountDownLatch(1); |
| 508 | + MessageHandler errorMessageHandler = m -> errorMessageLatch.countDown(); |
| 509 | + this.errorChannel.subscribe(errorMessageHandler); |
| 510 | + |
| 511 | + this.globalErrorChannelResolutionGateway.accept("foo"); |
| 512 | + |
| 513 | + assertThat(errorMessageLatch.await(10, TimeUnit.SECONDS)).isTrue(); |
| 514 | + |
| 515 | + this.errorChannel.unsubscribe(errorMessageHandler); |
| 516 | + } |
| 517 | + |
496 | 518 | @MessagingGateway
|
497 | 519 | public interface ControlBusGateway {
|
498 | 520 |
|
@@ -882,6 +904,16 @@ public IntegrationFlow flow2WithPrototypeHandler(
|
882 | 904 | return f -> f.handle(handler, e -> e.id("flow2WithPrototypeHandlerConsumer"));
|
883 | 905 | }
|
884 | 906 |
|
| 907 | + @Bean |
| 908 | + public IntegrationFlow globalErrorChannelResolutionFlow(@Qualifier("taskScheduler") TaskExecutor taskExecutor) { |
| 909 | + return IntegrationFlows.from(Consumer.class, "globalErrorChannelResolutionFunction") |
| 910 | + .channel(c -> c.executor(taskExecutor)) |
| 911 | + .handle((GenericHandler<?>) (p, h) -> { |
| 912 | + throw new RuntimeException("intentional"); |
| 913 | + }) |
| 914 | + .get(); |
| 915 | + } |
| 916 | + |
885 | 917 | }
|
886 | 918 |
|
887 | 919 | @Service
|
|
0 commit comments