Skip to content

Commit 93cd669

Browse files
garyrussellartembilan
authored andcommitted
GH-2974: Fix race in TcpNetConnection.getPayload()
Fixes #2974 There is a race in that we could get a `SocketException` in `inputStream`. Since this is between payloads, it needs to be thrown as a `SoftEndOfStreamException`. Also fix unnecessary `this.` in `MessageHistoryConfigurer.java`. **cherry-pick to 5.0.x, 4.3.x** * * Add javadocs to SoftEndOfStreamException # Conflicts: # spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpNetConnection.java
1 parent 8a4e733 commit 93cd669

File tree

3 files changed

+62
-1
lines changed

3 files changed

+62
-1
lines changed

spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpNetConnection.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,15 @@ public synchronized void send(Message<?> message) throws Exception {
120120

121121
@Override
122122
public Object getPayload() throws Exception {
123-
return this.getDeserializer().deserialize(inputStream());
123+
InputStream inputStream;
124+
try {
125+
inputStream = inputStream();
126+
}
127+
catch (IOException e1) {
128+
throw new SoftEndOfStreamException("Socket closed when getting input stream", e1);
129+
}
130+
return getDeserializer()
131+
.deserialize(inputStream);
124132
}
125133

126134
@Override

spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/serializer/SoftEndOfStreamException.java

+17
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,29 @@ public class SoftEndOfStreamException extends IOException {
3030

3131
private static final long serialVersionUID = 7309907445617226978L;
3232

33+
/**
34+
* Default constructor.
35+
*/
3336
public SoftEndOfStreamException() {
3437
super();
3538
}
3639

40+
/**
41+
* Construct an instance with the message.
42+
* @param message the message.
43+
*/
3744
public SoftEndOfStreamException(String message) {
3845
super(message);
3946
}
4047

48+
/**
49+
* Construct an instance with the message and cause.
50+
* @param message the message.
51+
* @param cause the cause.
52+
* @since 4.3.21.
53+
*/
54+
public SoftEndOfStreamException(String message, Throwable cause) {
55+
super(message, cause);
56+
}
57+
4158
}

spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpNetConnectionTests.java

+36
Original file line numberDiff line numberDiff line change
@@ -16,29 +16,39 @@
1616

1717
package org.springframework.integration.ip.tcp.connection;
1818

19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
1921
import static org.junit.Assert.assertEquals;
2022
import static org.junit.Assert.assertNotNull;
2123
import static org.mockito.Mockito.doAnswer;
2224
import static org.mockito.Mockito.mock;
2325
import static org.mockito.Mockito.when;
2426

2527
import java.io.ByteArrayOutputStream;
28+
import java.io.IOException;
2629
import java.io.InputStream;
2730
import java.io.PipedInputStream;
2831
import java.io.PipedOutputStream;
2932
import java.net.Socket;
3033
import java.nio.ByteBuffer;
3134
import java.nio.channels.SocketChannel;
35+
import java.util.concurrent.CountDownLatch;
36+
import java.util.concurrent.TimeUnit;
37+
import java.util.concurrent.atomic.AtomicInteger;
3238
import java.util.concurrent.atomic.AtomicReference;
3339

40+
import javax.net.SocketFactory;
41+
3442
import org.apache.commons.logging.Log;
3543
import org.junit.Test;
3644
import org.mockito.Mockito;
3745

3846
import org.springframework.beans.DirectFieldAccessor;
47+
import org.springframework.context.ApplicationEventPublisher;
3948
import org.springframework.integration.ip.tcp.connection.TcpNioConnection.ChannelInputStream;
4049
import org.springframework.integration.ip.tcp.serializer.ByteArrayStxEtxSerializer;
4150
import org.springframework.integration.ip.tcp.serializer.MapJsonSerializer;
51+
import org.springframework.integration.ip.tcp.serializer.SoftEndOfStreamException;
4252
import org.springframework.integration.support.MessageBuilder;
4353
import org.springframework.integration.support.converter.MapMessageConverter;
4454
import org.springframework.integration.test.util.TestUtils;
@@ -137,4 +147,30 @@ public void transferHeaders() throws Exception {
137147
assertEquals("baz", inboundMessage.get().getHeaders().get("bar"));
138148
}
139149

150+
@Test
151+
public void socketClosedNextRead() throws InterruptedException, IOException {
152+
TcpNetServerConnectionFactory server = new TcpNetServerConnectionFactory(0);
153+
AtomicInteger port = new AtomicInteger();
154+
CountDownLatch latch = new CountDownLatch(1);
155+
ApplicationEventPublisher publisher = ev -> {
156+
if (ev instanceof TcpConnectionServerListeningEvent) {
157+
port.set(((TcpConnectionServerListeningEvent) ev).getPort());
158+
latch.countDown();
159+
}
160+
};
161+
server.setApplicationEventPublisher(publisher);
162+
server.registerListener(message -> {
163+
return false;
164+
});
165+
server.afterPropertiesSet();
166+
server.start();
167+
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
168+
Socket socket = SocketFactory.getDefault().createSocket("localhost", port.get());
169+
TcpNetConnection connection = new TcpNetConnection(socket, false, false, publisher, "socketClosedNextRead");
170+
socket.close();
171+
assertThatThrownBy(() -> connection.getPayload())
172+
.isInstanceOf(SoftEndOfStreamException.class);
173+
server.stop();
174+
}
175+
140176
}

0 commit comments

Comments
 (0)