Skip to content

Commit 6a86f52

Browse files
committed
INT-4366: Fix MulticastSendingMH race condition
JIRA: https://jira.spring.io/browse/INT-4366 The `MulticastSendingMessageHandler.getSocket()` doesn't guard around `this.multicastSocket` property causing `NPE` and other inconsistency in the multi-threaded environment * Make the whole `MulticastSendingMessageHandler.getSocket()` as `synchronized` like it is with the super method * Reuse `closeSocketIfNeeded()` in the `UnicastSendingMessageHandler.handleMessageInternal()` * Fix type in the `UnicastSendingMessageHandler` logging message * Fix `UdpChannelAdapterTests` for missed `BeanFactory` for the SpEL and also `MulticastSendingMessageHandler.stop()` in one missed places **Cherry-pick to 4.3.x** # Conflicts: # spring-integration-ip/src/main/java/org/springframework/integration/ip/udp/UnicastSendingMessageHandler.java
1 parent c8705f4 commit 6a86f52

File tree

4 files changed

+51
-50
lines changed

4 files changed

+51
-50
lines changed

spring-integration-ip/src/main/java/org/springframework/integration/ip/udp/MulticastSendingMessageHandler.java

+30-36
Original file line numberDiff line numberDiff line change
@@ -100,56 +100,51 @@ public MulticastSendingMessageHandler(String address, int port,
100100
}
101101

102102
@Override
103-
protected DatagramSocket getSocket() throws IOException {
104-
if (this.getTheSocket() == null) {
105-
synchronized (this) {
106-
createSocket();
107-
}
103+
protected synchronized DatagramSocket getSocket() throws IOException {
104+
if (getTheSocket() == null) {
105+
createSocket();
108106
}
109-
return this.getTheSocket();
107+
return getTheSocket();
110108
}
111109

112110
private void createSocket() throws IOException {
113-
if (this.getTheSocket() == null) {
114-
MulticastSocket socket;
115-
if (this.isAcknowledge()) {
116-
int ackPort = this.getAckPort();
117-
if (this.localAddress == null) {
118-
socket = ackPort == 0 ? new MulticastSocket() : new MulticastSocket(ackPort);
119-
}
120-
else {
121-
InetAddress whichNic = InetAddress.getByName(this.localAddress);
122-
socket = new MulticastSocket(new InetSocketAddress(whichNic, ackPort));
123-
}
124-
if (getSoReceiveBufferSize() > 0) {
125-
socket.setReceiveBufferSize(this.getSoReceiveBufferSize());
126-
}
127-
if (logger.isDebugEnabled()) {
128-
logger.debug("Listening for acks on port: " + socket.getLocalPort());
129-
}
130-
setSocket(socket);
131-
updateAckAddress();
111+
MulticastSocket socket;
112+
if (this.isAcknowledge()) {
113+
int ackPort = this.getAckPort();
114+
if (this.localAddress == null) {
115+
socket = ackPort == 0 ? new MulticastSocket() : new MulticastSocket(ackPort);
132116
}
133117
else {
134-
socket = new MulticastSocket();
135-
setSocket(socket);
118+
InetAddress whichNic = InetAddress.getByName(this.localAddress);
119+
socket = new MulticastSocket(new InetSocketAddress(whichNic, ackPort));
136120
}
137-
if (this.timeToLive >= 0) {
138-
socket.setTimeToLive(this.timeToLive);
121+
if (getSoReceiveBufferSize() > 0) {
122+
socket.setReceiveBufferSize(this.getSoReceiveBufferSize());
139123
}
140-
setSocketAttributes(socket);
141-
if (this.localAddress != null) {
142-
InetAddress whichNic = InetAddress.getByName(this.localAddress);
143-
socket.setInterface(whichNic);
124+
if (logger.isDebugEnabled()) {
125+
logger.debug("Listening for acks on port: " + socket.getLocalPort());
144126
}
145-
this.multicastSocket = socket;
127+
setSocket(socket);
128+
updateAckAddress();
129+
}
130+
else {
131+
socket = new MulticastSocket();
132+
setSocket(socket);
133+
}
134+
if (this.timeToLive >= 0) {
135+
socket.setTimeToLive(this.timeToLive);
136+
}
137+
setSocketAttributes(socket);
138+
this.multicastSocket = socket;
139+
if (this.localAddress != null) {
140+
InetAddress whichNic = InetAddress.getByName(this.localAddress);
141+
socket.setInterface(whichNic);
146142
}
147143
}
148144

149145

150146
/**
151147
* If acknowledge = true; how many acks needed for success.
152-
*
153148
* @param minAcksForSuccess The minimum number of acks that will represent success.
154149
*/
155150
public void setMinAcksForSuccess(int minAcksForSuccess) {
@@ -158,7 +153,6 @@ public void setMinAcksForSuccess(int minAcksForSuccess) {
158153

159154
/**
160155
* Set the underlying {@link MulticastSocket} time to live property.
161-
*
162156
* @param timeToLive {@link MulticastSocket#setTimeToLive(int)}
163157
*/
164158
public void setTimeToLive(int timeToLive) {

spring-integration-ip/src/main/java/org/springframework/integration/ip/udp/UnicastSendingMessageHandler.java

+10-10
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2001-2017 the original author or authors.
2+
* Copyright 2001-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -83,8 +83,8 @@ public class UnicastSendingMessageHandler extends
8383

8484
private volatile int ackCounter = 1;
8585

86-
private volatile Map<String, CountDownLatch> ackControl = Collections
87-
.synchronizedMap(new HashMap<String, CountDownLatch>());
86+
private volatile Map<String, CountDownLatch> ackControl =
87+
Collections.synchronizedMap(new HashMap<String, CountDownLatch>());
8888

8989
private volatile int soReceiveBufferSize = -1;
9090

@@ -282,11 +282,7 @@ public void handleMessageInternal(Message<?> message) throws MessageHandlingExce
282282
throw e;
283283
}
284284
catch (Exception e) {
285-
try {
286-
this.socket.close();
287-
}
288-
catch (Exception e1) { }
289-
this.socket = null;
285+
closeSocketIfNeeded();
290286
throw new MessageHandlingException(message, "failed to send UDP packet", e);
291287
}
292288
finally {
@@ -511,7 +507,7 @@ public void run() {
511507
}
512508
catch (IOException e) {
513509
if (this.socket != null && !this.socket.isClosed()) {
514-
logger.error("Error on UDP Acknowledge thread:" + e.getMessage());
510+
logger.error("Error on UDP Acknowledge thread: " + e.getMessage());
515511
}
516512
}
517513
finally {
@@ -529,7 +525,11 @@ public void restartAckThread() {
529525

530526
private void closeSocketIfNeeded() {
531527
if (this.socket != null) {
532-
this.socket.close();
528+
try {
529+
this.socket.close();
530+
}
531+
catch (Exception e) {
532+
}
533533
this.socket = null;
534534
}
535535
}

spring-integration-ip/src/test/java/org/springframework/integration/ip/udp/DatagramPacketMulticastSendingHandlerTests.java

+8-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -26,7 +26,7 @@
2626
import java.net.InetSocketAddress;
2727
import java.net.MulticastSocket;
2828
import java.util.concurrent.CountDownLatch;
29-
import java.util.concurrent.Executor;
29+
import java.util.concurrent.ExecutorService;
3030
import java.util.concurrent.Executors;
3131
import java.util.concurrent.TimeUnit;
3232
import java.util.concurrent.atomic.AtomicInteger;
@@ -45,6 +45,8 @@
4545
/**
4646
* @author Mark Fisher
4747
* @author Gary Russell
48+
* @author Artem Bilan
49+
*
4850
* @since 2.0
4951
*/
5052
public class DatagramPacketMulticastSendingHandlerTests {
@@ -100,7 +102,7 @@ public void run() {
100102
}
101103
}
102104
};
103-
Executor executor = Executors.newFixedThreadPool(2);
105+
ExecutorService executor = Executors.newFixedThreadPool(2);
104106
executor.execute(catcher);
105107
executor.execute(catcher);
106108
assertTrue(listening.await(10000, TimeUnit.MILLISECONDS));
@@ -112,6 +114,7 @@ public void run() {
112114
assertTrue(received.await(10000, TimeUnit.MILLISECONDS));
113115
handler.stop();
114116
socket.close();
117+
executor.shutdownNow();
115118
}
116119

117120
@Test
@@ -177,7 +180,7 @@ public void run() {
177180
}
178181
}
179182
};
180-
Executor executor = Executors.newFixedThreadPool(2);
183+
ExecutorService executor = Executors.newFixedThreadPool(2);
181184
executor.execute(catcher);
182185
executor.execute(catcher);
183186
assertTrue(listening.await(10000, TimeUnit.MILLISECONDS));
@@ -195,6 +198,7 @@ public void run() {
195198
assertTrue(ackSent.await(10000, TimeUnit.MILLISECONDS));
196199
handler.stop();
197200
socket.close();
201+
executor.shutdownNow();
198202
}
199203

200204
public void waitAckListening(UnicastSendingMessageHandler handler) throws InterruptedException {

spring-integration-ip/src/test/java/org/springframework/integration/ip/udp/UdpChannelAdapterTests.java

+3
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,7 @@ public void testMulticastSender() throws Exception {
299299
assertNotNull(receivedMessage);
300300
assertEquals(new String(message.getPayload()), new String(receivedMessage.getPayload()));
301301
adapter.stop();
302+
handler.stop();
302303
}
303304

304305
@Test
@@ -309,6 +310,8 @@ public void testUnicastReceiverException() throws Exception {
309310
// SocketUtils.setLocalNicIfPossible(adapter);
310311
adapter.setOutputChannel(channel);
311312
ServiceActivatingHandler handler = new ServiceActivatingHandler(new FailingService());
313+
handler.setBeanFactory(mock(BeanFactory.class));
314+
handler.afterPropertiesSet();
312315
channel.subscribe(handler);
313316
QueueChannel errorChannel = new QueueChannel();
314317
adapter.setErrorChannel(errorChannel);

0 commit comments

Comments
 (0)