Skip to content

Commit 4264779

Browse files
committed
spring-projectsGH-3003: Fix pub/sub with dynamic DSL flows
Fixes spring-projects#3003 Statically defined flows with a publish/subscribe channel invoke the subscriptions in natural (declared) order. The components in the flow are started by the application context in phases (consumers, then producers) and bean declaration order within each phase. When a dynamically declared flow is started, the components are started by the `StandardIntegrationFlow` in reverse order (last to first) so that we don't start producing messages before the flow is fully wired. This has the side-effect that pub/sub subscribers are invoked in an unnatural (last to first) order. All subscription sub-flows start with a bridge from the pub/sub channel to the first component's input channel. The `BroadcastingDispatcher` honors the `Ordered` interface. Change the `PublishSubscribeSpec` to set the `order` property so that subscribers are always invoked in the natural order, regardless of whether the flow is statically or dynamically defined.
1 parent 016bb32 commit 4264779

File tree

2 files changed

+25
-1
lines changed

2 files changed

+25
-1
lines changed

spring-integration-core/src/main/java/org/springframework/integration/dsl/PublishSubscribeSpec.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ public class PublishSubscribeSpec extends PublishSubscribeChannelSpec<PublishSub
3232

3333
private final Map<Object, String> subscriberFlows = new LinkedHashMap<>();
3434

35+
private int order;
36+
3537
PublishSubscribeSpec() {
3638
super();
3739
}
@@ -50,7 +52,7 @@ public PublishSubscribeSpec subscribe(IntegrationFlow subFlow) {
5052

5153
IntegrationFlowBuilder flowBuilder =
5254
IntegrationFlows.from(this.channel)
53-
.bridge();
55+
.bridge(consumer -> consumer.order(this.order++));
5456

5557
MessageChannel subFlowInput = subFlow.getInputChannel();
5658

spring-integration-core/src/test/java/org/springframework/integration/dsl/publishsubscribe/PublishSubscribeTests.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
import org.springframework.context.annotation.Configuration;
3232
import org.springframework.integration.config.EnableIntegration;
3333
import org.springframework.integration.dsl.IntegrationFlow;
34+
import org.springframework.integration.dsl.context.IntegrationFlowContext;
35+
import org.springframework.integration.dsl.context.IntegrationFlowContext.IntegrationFlowRegistration;
3436
import org.springframework.messaging.MessageChannel;
3537
import org.springframework.messaging.MessageHandler;
3638
import org.springframework.messaging.support.GenericMessage;
@@ -52,12 +54,28 @@ public class PublishSubscribeTests {
5254
@Autowired
5355
private List<Integer> subscribersOrderedCall;
5456

57+
@Autowired
58+
private PubSubBugTestContext config;
59+
60+
@Autowired
61+
private IntegrationFlowContext context;
62+
5563
@Test
5664
public void executeFirstFlow() {
65+
this.subscribersOrderedCall.clear();
5766
this.inputChannel.send(new GenericMessage<>("Test"));
5867
assertThat(this.subscribersOrderedCall).containsExactly(0, 1, 2, 3, 4, 5);
5968
}
6069

70+
@Test
71+
public void dynamicFlow() {
72+
this.subscribersOrderedCall.clear();
73+
IntegrationFlowRegistration reg = this.context.registration(this.config.flow()).register();
74+
reg.getInputChannel().send(new GenericMessage<>("Test"));
75+
assertThat(this.subscribersOrderedCall).containsExactly(0, 1, 2, 3, 4, 5);
76+
this.context.remove(reg.getId());
77+
}
78+
6179
@Configuration
6280
@EnableIntegration
6381
static class PubSubBugTestContext {
@@ -79,6 +97,10 @@ public MessageHandler subscriberMessageHandlerBean() {
7997

8098
@Bean
8199
public IntegrationFlow pubSubFlow() {
100+
return flow();
101+
}
102+
103+
IntegrationFlow flow() {
82104
return f -> f
83105
.publishSubscribeChannel(c -> c
84106
.subscribe(sf -> sf

0 commit comments

Comments
 (0)