Skip to content

Commit cfac0b4

Browse files
committed
spring-projectsGH-3107: Add errorOnTimeout for TcpInboundGateway
Fixes spring-projects#3107 The `MessagingGatewaySupport` has an `errorOnTimeout` option to throw a `MessageTimeoutException` when downstream reply doesn't come back in time for configured reply timeout * Expose an `errorOnTimeout` option as a `TcpInboundGateway` ctor property * Add new factory methods into a `Tcp` factory for Java DSL * Ensure a property works as expected in the `IpIntegrationTests` * Document a new option
1 parent 0ec9859 commit cfac0b4

File tree

5 files changed

+54
-8
lines changed

5 files changed

+54
-8
lines changed

spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/Tcp.java

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,16 +95,44 @@ public static TcpClientConnectionFactorySpec netClient(String host, int port) {
9595
* @return the spec.
9696
*/
9797
public static TcpInboundGatewaySpec inboundGateway(AbstractConnectionFactory connectionFactory) {
98-
return new TcpInboundGatewaySpec(connectionFactory);
98+
return inboundGateway(connectionFactory, false);
9999
}
100100

101+
/**
102+
* Create an inbound gateway using the supplied connection factory.
103+
* @param connectionFactory the connection factory - must be an existing bean - it
104+
* will not be initialized.
105+
* @param errorOnTimeout true to create the error message on reply timeout.
106+
* @return the spec.
107+
* @since 5.2.2
108+
*/
109+
public static TcpInboundGatewaySpec inboundGateway(AbstractConnectionFactory connectionFactory,
110+
boolean errorOnTimeout) {
111+
112+
return new TcpInboundGatewaySpec(connectionFactory, errorOnTimeout);
113+
}
114+
115+
101116
/**
102117
* Create an inbound gateway using the supplied connection factory.
103118
* @param connectionFactorySpec the connection factory spec.
104119
* @return the spec.
105120
*/
106121
public static TcpInboundGatewaySpec inboundGateway(AbstractConnectionFactorySpec<?, ?> connectionFactorySpec) {
107-
return new TcpInboundGatewaySpec(connectionFactorySpec);
122+
return inboundGateway(connectionFactorySpec, false);
123+
}
124+
125+
/**
126+
* Create an inbound gateway using the supplied connection factory.
127+
* @param connectionFactorySpec the connection factory spec.
128+
* @param errorOnTimeout true to create the error message on reply timeout.
129+
* @return the spec.
130+
* @since 5.2.2
131+
*/
132+
public static TcpInboundGatewaySpec inboundGateway(AbstractConnectionFactorySpec<?, ?> connectionFactorySpec,
133+
boolean errorOnTimeout) {
134+
135+
return new TcpInboundGatewaySpec(connectionFactorySpec, errorOnTimeout);
108136
}
109137

110138
/**

spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpInboundGatewaySpec.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,19 +42,21 @@ public class TcpInboundGatewaySpec extends MessagingGatewaySpec<TcpInboundGatewa
4242
/**
4343
* Construct an instance using an existing spring-managed connection factory.
4444
* @param connectionFactoryBean the spring-managed bean.
45+
* @param errorOnTimeout true to create the error message on reply timeout.
4546
*/
46-
TcpInboundGatewaySpec(AbstractConnectionFactory connectionFactoryBean) {
47-
super(new TcpInboundGateway());
47+
TcpInboundGatewaySpec(AbstractConnectionFactory connectionFactoryBean, boolean errorOnTimeout) {
48+
super(new TcpInboundGateway(errorOnTimeout));
4849
this.connectionFactory = null;
4950
this.target.setConnectionFactory(connectionFactoryBean);
5051
}
5152

5253
/**
5354
* Construct an instance using a connection factory spec.
5455
* @param connectionFactorySpec the spec.
56+
* @param errorOnTimeout true to create the error message on reply timeout.
5557
*/
56-
TcpInboundGatewaySpec(AbstractConnectionFactorySpec<?, ?> connectionFactorySpec) {
57-
super(new TcpInboundGateway());
58+
TcpInboundGatewaySpec(AbstractConnectionFactorySpec<?, ?> connectionFactorySpec, boolean errorOnTimeout) {
59+
super(new TcpInboundGateway(errorOnTimeout));
5860
this.connectionFactory = connectionFactorySpec.get();
5961
this.target.setConnectionFactory(this.connectionFactory);
6062
}

spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/TcpInboundGateway.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,21 @@ public class TcpInboundGateway extends MessagingGatewaySupport implements
8383

8484
private volatile boolean shuttingDown;
8585

86+
public TcpInboundGateway() {
87+
this(false);
88+
}
89+
90+
/**
91+
* Instantiate based on the provided flag to indicate if an {@link ErrorMessage}
92+
* with a {@link org.springframework.integration.MessageTimeoutException} as a payload
93+
* will be sent into an error channel if a reply is expected but none is received.
94+
* @param errorOnTimeout true to create the error message on reply timeout.
95+
* @since 5.2.2
96+
*/
97+
public TcpInboundGateway(boolean errorOnTimeout) {
98+
super(errorOnTimeout);
99+
}
100+
86101
@Override
87102
public boolean onMessage(Message<?> message) {
88103
boolean isErrorMessage = message instanceof ErrorMessage;

spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/IpIntegrationTests.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -229,9 +229,8 @@ public AbstractServerConnectionFactory server1() {
229229
@Bean
230230
public IntegrationFlow inTcpGateway() {
231231
return IntegrationFlows.from(
232-
Tcp.inboundGateway(server1())
232+
Tcp.inboundGateway(server1(), true)
233233
.replyTimeout(1)
234-
.errorOnTimeout(true)
235234
.errorChannel("inTcpGatewayErrorFlow.input"))
236235
.transform(Transformers.objectToString())
237236
.<String>filter((payload) -> !"junk".equals(payload))

src/reference/asciidoc/ip.adoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1895,6 +1895,8 @@ Usually, replies arrive on a temporary reply channel added to the inbound messag
18951895
|
18961896
| The time in milliseconds for which the gateway waits for a reply.
18971897
Default: 1000 (1 second).
1898+
Starting with version `5.2.2`, the `TcpInboundGateway` can be created with an `errorOnTimeout` boolean flag to raise a `MessageTimeoutException` when downstream reply doesn't come back during reply timeout.
1899+
Such an exception can be handled on the `errorChannel`, e.g. producing a compensation reply for client.
18981900

18991901
| `error-channel`
19001902
|

0 commit comments

Comments
 (0)