Skip to content

Commit c334bad

Browse files
committed
GH-663: Fix a race in IoSession creation
IoSessions are created asynchronously through an IoConnector. When an IoConnector is closed, all sessions created through it are also to be closed. Because session creation is asynchronous, it was possible that a newly created session registered on an already closed IoConnector, and then would never be closed. Prevent this by forcibly closing any newly created IoSession if the IoConnector is already closed when the new session tries to register with the connector. Add a test that verifies that the connect future the client code sees does not provide a session but is fulfilled either by an exception or by having been cancelled.
1 parent c497904 commit c334bad

9 files changed

Lines changed: 241 additions & 18 deletions

File tree

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
## Bug Fixes
3030

3131
* [GH-650](https://github.com/apache/mina-sshd/issues/650) Use the correct key from a user certificate in server-side pubkey auth
32+
* [GH-663](https://github.com/apache/mina-sshd/issues/663) Fix racy `IoSession` creation
3233
* [GH-664](https://github.com/apache/mina-sshd/issues/664) Skip MAC negotiation if an AEAD cipher was negotiated
3334

3435
## New Features

sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Connector.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -215,8 +215,10 @@ protected void onCompleted(Void result, Object attachment) {
215215

216216
handler.sessionCreated(session);
217217
sessionId = session.getId();
218-
sessions.put(sessionId, session);
219-
future.setSession(session);
218+
IoSession registered = mapSession(session);
219+
if (registered == session) {
220+
future.setSession(session);
221+
}
220222
if (session != future.getSession()) {
221223
session.close(true);
222224
throw new CancellationException();

sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Service.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ public abstract class Nio2Service extends AbstractInnerCloseable implements IoSe
7272
private final AsynchronousChannelGroup group;
7373
private final ExecutorService executor;
7474
private IoServiceEventListener eventListener;
75+
private boolean noMoreSessions;
7576

7677
protected Nio2Service(PropertyResolver propertyResolver, IoHandler handler, AsynchronousChannelGroup group,
7778
ExecutorService resumeTasks) {
@@ -127,7 +128,7 @@ public void dispose() {
127128
@Override
128129
protected Closeable getInnerCloseable() {
129130
return builder()
130-
.parallel(toString(), sessions.values())
131+
.parallel(toString(), snapshot())
131132
.build();
132133
}
133134

@@ -140,6 +141,23 @@ public void sessionClosed(Nio2Session session) {
140141
unmapSession(session.getId());
141142
}
142143

144+
private Collection<IoSession> snapshot() {
145+
synchronized (this) {
146+
noMoreSessions = true;
147+
}
148+
return sessions.values();
149+
}
150+
151+
protected IoSession mapSession(IoSession session) {
152+
synchronized (this) {
153+
if (noMoreSessions) {
154+
return null;
155+
}
156+
sessions.put(session.getId(), session);
157+
return session;
158+
}
159+
}
160+
143161
protected void unmapSession(Long sessionId) {
144162
if (sessionId != null) {
145163
IoSession ioSession = sessions.remove(sessionId);
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.sshd.common.io;
20+
21+
import java.io.IOException;
22+
import java.net.InetAddress;
23+
import java.net.InetSocketAddress;
24+
import java.util.concurrent.CountDownLatch;
25+
import java.util.concurrent.TimeUnit;
26+
import java.util.concurrent.atomic.AtomicBoolean;
27+
import java.util.concurrent.atomic.AtomicReference;
28+
29+
import org.apache.mina.core.buffer.IoBuffer;
30+
import org.apache.mina.core.service.IoHandlerAdapter;
31+
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
32+
import org.apache.sshd.client.SshClient;
33+
import org.apache.sshd.common.future.SshFutureListener;
34+
import org.apache.sshd.common.util.Readable;
35+
import org.apache.sshd.util.test.BaseTestSupport;
36+
import org.junit.jupiter.api.Test;
37+
38+
/**
39+
* Tests for low-level connections.
40+
*/
41+
class IoConnectionTest extends BaseTestSupport {
42+
43+
@Test
44+
void connectorRace() throws Exception {
45+
CountDownLatch connectionMade = new CountDownLatch(1);
46+
CountDownLatch connectorClosing = new CountDownLatch(1);
47+
CountDownLatch futureTriggered = new CountDownLatch(1);
48+
CountDownLatch ioSessionClosed = new CountDownLatch(1);
49+
AtomicReference<IoSession> session = new AtomicReference<>();
50+
AtomicReference<Exception> exceptionOnClose = new AtomicReference<>();
51+
AtomicReference<Exception> exceptionInSession = new AtomicReference<>();
52+
AtomicReference<Exception> exceptionWhileClosing = new AtomicReference<>();
53+
AtomicBoolean connectionWasMade = new AtomicBoolean();
54+
AtomicBoolean connectorIsClosing = new AtomicBoolean();
55+
AtomicBoolean sessionWaited = new AtomicBoolean();
56+
57+
SshClient client = setupTestClient();
58+
IoServiceFactory serviceFactory = DefaultIoServiceFactoryFactory.getDefaultIoServiceFactoryFactoryInstance()
59+
.create(client);
60+
IoConnector connector = serviceFactory.createConnector(new IoHandler() {
61+
62+
@Override
63+
public void sessionCreated(org.apache.sshd.common.io.IoSession session) throws Exception {
64+
connectionMade.countDown();
65+
try {
66+
sessionWaited.set(connectorClosing.await(10, TimeUnit.SECONDS));
67+
} catch (Exception e) {
68+
exceptionInSession.set(e);
69+
throw e;
70+
}
71+
}
72+
73+
@Override
74+
public void sessionClosed(org.apache.sshd.common.io.IoSession session) throws Exception {
75+
ioSessionClosed.countDown();
76+
}
77+
78+
@Override
79+
public void exceptionCaught(org.apache.sshd.common.io.IoSession session, Throwable cause) throws Exception {
80+
// Nothing
81+
}
82+
83+
@Override
84+
public void messageReceived(org.apache.sshd.common.io.IoSession session, Readable message) throws Exception {
85+
// Nothing; we're not actually sending or receiving data.
86+
}
87+
});
88+
NioSocketAcceptor acceptor = startEchoServer();
89+
Thread closer = new Thread(() -> {
90+
try {
91+
connector.close();
92+
} catch (Exception e) {
93+
exceptionWhileClosing.set(e);
94+
}
95+
});
96+
Thread t = new Thread(() -> {
97+
try {
98+
connectionWasMade.set(connectionMade.await(10, TimeUnit.SECONDS));
99+
closer.start();
100+
int time = 0;
101+
while (time < 10000 && connector.isOpen()) {
102+
Thread.sleep(500);
103+
time += 500;
104+
}
105+
if (connector.isOpen()) {
106+
throw new RuntimeException("Connector still open after 10 seconds");
107+
}
108+
connectorClosing.countDown();
109+
} catch (RuntimeException | InterruptedException e) {
110+
exceptionOnClose.set(e);
111+
}
112+
});
113+
try {
114+
InetSocketAddress serverAddress = acceptor.getLocalAddress();
115+
InetSocketAddress connectAddress = new InetSocketAddress(InetAddress.getByName(TEST_LOCALHOST),
116+
serverAddress.getPort());
117+
t.start();
118+
IoConnectFuture future = connector.connect(connectAddress, null, null);
119+
future.addListener(new SshFutureListener<IoConnectFuture>() {
120+
121+
@Override
122+
public void operationComplete(IoConnectFuture future) {
123+
session.set(future.getSession());
124+
connectorIsClosing.set(!connector.isOpen());
125+
futureTriggered.countDown();
126+
}
127+
128+
});
129+
t.join(10000);
130+
Throwable error = future.getException();
131+
if (error != null) {
132+
error.printStackTrace();
133+
} else if (future.isCanceled()) {
134+
System.err.println("Connect future was canceled");
135+
}
136+
assertTrue(connectionWasMade.get());
137+
assertTrue(futureTriggered.await(10, TimeUnit.SECONDS));
138+
assertNull(exceptionWhileClosing.get());
139+
assertNull(exceptionInSession.get());
140+
assertNull(exceptionOnClose.get());
141+
assertTrue(future.isDone());
142+
assertTrue(sessionWaited.get());
143+
assertNull(session.get());
144+
assertTrue(connectorIsClosing.get());
145+
assertTrue(ioSessionClosed.await(10, TimeUnit.SECONDS));
146+
} finally {
147+
if (t.isAlive()) {
148+
t.interrupt();
149+
}
150+
if (closer.isAlive()) {
151+
closer.interrupt();
152+
}
153+
acceptor.dispose(false);
154+
}
155+
}
156+
157+
private NioSocketAcceptor startEchoServer() throws IOException {
158+
NioSocketAcceptor acceptor = new NioSocketAcceptor();
159+
acceptor.setHandler(new IoHandlerAdapter() {
160+
@Override
161+
public void messageReceived(org.apache.mina.core.session.IoSession session, Object message) throws Exception {
162+
IoBuffer recv = (IoBuffer) message;
163+
IoBuffer sent = IoBuffer.allocate(recv.remaining());
164+
sent.put(recv);
165+
sent.flip();
166+
session.write(sent);
167+
}
168+
});
169+
acceptor.setReuseAddress(true);
170+
acceptor.bind(new InetSocketAddress(0));
171+
return acceptor;
172+
}
173+
}

sshd-mina/src/main/java/org/apache/sshd/mina/MinaConnector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ public void setSession(org.apache.sshd.common.io.IoSession session) {
187187
Throwable t = cf.getException();
188188
if (t != null) {
189189
future.setException(t);
190-
} else if (cf.isCanceled()) {
190+
} else if (cf.isCanceled() || !isOpen()) {
191191
IoSession ioSession = createdSession.getAndSet(null);
192192
CancelFuture cancellation = future.cancel();
193193
if (ioSession != null) {

sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoAcceptor.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,10 @@
3737
import io.netty.channel.ChannelInitializer;
3838
import io.netty.channel.ChannelOption;
3939
import io.netty.channel.ChannelPipeline;
40-
import io.netty.channel.group.DefaultChannelGroup;
4140
import io.netty.channel.socket.SocketChannel;
4241
import io.netty.channel.socket.nio.NioServerSocketChannel;
4342
import io.netty.handler.logging.LogLevel;
4443
import io.netty.handler.logging.LoggingHandler;
45-
import io.netty.util.concurrent.GlobalEventExecutor;
4644
import org.apache.sshd.common.future.CloseFuture;
4745
import org.apache.sshd.common.io.IoAcceptor;
4846
import org.apache.sshd.common.io.IoHandler;
@@ -64,10 +62,9 @@ public class NettyIoAcceptor extends NettyIoService implements IoAcceptor {
6462
protected final Map<SocketAddress, Channel> boundAddresses = new ConcurrentHashMap<>();
6563

6664
public NettyIoAcceptor(NettyIoServiceFactory factory, IoHandler handler) {
67-
super(factory, handler);
65+
super(factory, handler, "sshd-acceptor-channels");
6866

6967
Boolean reuseaddr = CoreModuleProperties.SOCKET_REUSEADDR.getRequired(factory.manager);
70-
channelGroup = new DefaultChannelGroup("sshd-acceptor-channels", GlobalEventExecutor.INSTANCE);
7168
bootstrap.group(factory.eventLoopGroup)
7269
.channel(NioServerSocketChannel.class)
7370
.option(ChannelOption.SO_BACKLOG, CoreModuleProperties.SOCKET_BACKLOG.getRequired(factory.manager))

sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoConnector.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,10 @@
2626
import io.netty.channel.ChannelInitializer;
2727
import io.netty.channel.ChannelOption;
2828
import io.netty.channel.ChannelPipeline;
29-
import io.netty.channel.group.DefaultChannelGroup;
3029
import io.netty.channel.socket.SocketChannel;
3130
import io.netty.channel.socket.nio.NioSocketChannel;
3231
import io.netty.handler.logging.LogLevel;
3332
import io.netty.handler.logging.LoggingHandler;
34-
import io.netty.util.concurrent.GlobalEventExecutor;
3533
import org.apache.sshd.common.AttributeRepository;
3634
import org.apache.sshd.common.future.CancelFuture;
3735
import org.apache.sshd.common.io.DefaultIoConnectFuture;
@@ -51,8 +49,7 @@ public class NettyIoConnector extends NettyIoService implements IoConnector {
5149
private static final LoggingHandler LOGGING_TRACE = new LoggingHandler(NettyIoConnector.class, LogLevel.TRACE);
5250

5351
public NettyIoConnector(NettyIoServiceFactory factory, IoHandler handler) {
54-
super(factory, handler);
55-
channelGroup = new DefaultChannelGroup("sshd-connector-channels", GlobalEventExecutor.INSTANCE);
52+
super(factory, handler, "sshd-connector-channels");
5653
}
5754

5855
@Override

sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoService.java

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,18 @@
1919

2020
package org.apache.sshd.netty;
2121

22+
import java.util.Collections;
2223
import java.util.Map;
2324
import java.util.Objects;
25+
import java.util.concurrent.CancellationException;
2426
import java.util.concurrent.ConcurrentHashMap;
2527
import java.util.concurrent.atomic.AtomicLong;
2628

29+
import io.netty.channel.Channel;
2730
import io.netty.channel.group.ChannelGroup;
31+
import io.netty.channel.group.DefaultChannelGroup;
2832
import io.netty.util.AttributeKey;
33+
import io.netty.util.concurrent.GlobalEventExecutor;
2934
import org.apache.sshd.common.io.IoConnectFuture;
3035
import org.apache.sshd.common.io.IoHandler;
3136
import org.apache.sshd.common.io.IoService;
@@ -44,16 +49,46 @@ public abstract class NettyIoService extends AbstractCloseable implements IoServ
4449

4550
protected final AtomicLong sessionSeq = new AtomicLong();
4651
protected final Map<Long, IoSession> sessions = new ConcurrentHashMap<>();
47-
protected ChannelGroup channelGroup;
52+
protected final ChannelGroup channelGroup;
4853
protected final NettyIoServiceFactory factory;
4954
protected final IoHandler handler;
55+
private boolean noMoreSessions;
5056

5157
private IoServiceEventListener eventListener;
5258

53-
protected NettyIoService(NettyIoServiceFactory factory, IoHandler handler) {
59+
protected NettyIoService(NettyIoServiceFactory factory, IoHandler handler, String channelGroupName) {
5460
this.factory = Objects.requireNonNull(factory, "No factory instance provided");
5561
this.handler = Objects.requireNonNull(handler, "No I/O handler provied");
5662
this.eventListener = factory.getIoServiceEventListener();
63+
this.channelGroup = new DefaultChannelGroup(Objects.requireNonNull(channelGroupName, "No channel group name"),
64+
GlobalEventExecutor.INSTANCE);
65+
}
66+
67+
@Override
68+
protected void doCloseImmediately() {
69+
synchronized (this) {
70+
noMoreSessions = true;
71+
}
72+
channelGroup.close();
73+
super.doCloseImmediately();
74+
}
75+
76+
protected void registerChannel(Channel channel) throws CancellationException {
77+
synchronized (this) {
78+
if (noMoreSessions) {
79+
throw new CancellationException("NettyIoService closed");
80+
}
81+
channelGroup.add(channel);
82+
}
83+
}
84+
85+
protected void mapSession(IoSession session) throws CancellationException {
86+
synchronized (this) {
87+
if (noMoreSessions) {
88+
throw new CancellationException("NettyIoService closed; cannot register new session");
89+
}
90+
sessions.put(session.getId(), session);
91+
}
5792
}
5893

5994
@Override
@@ -68,6 +103,6 @@ public void setIoServiceEventListener(IoServiceEventListener listener) {
68103

69104
@Override
70105
public Map<Long, IoSession> getManagedSessions() {
71-
return sessions;
106+
return Collections.unmodifiableMap(sessions);
72107
}
73108
}

sshd-netty/src/main/java/org/apache/sshd/netty/NettyIoSession.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -241,8 +241,6 @@ protected void doCloseImmediately() {
241241
protected void channelActive(ChannelHandlerContext ctx) throws Exception {
242242
context = ctx;
243243
Channel channel = ctx.channel();
244-
service.channelGroup.add(channel);
245-
service.sessions.put(id, NettyIoSession.this);
246244
prev = context.newPromise().setSuccess();
247245
remoteAddr = channel.remoteAddress();
248246
// If handler.sessionCreated() propagates an exception, we'll have a NettyIoSession without SSH session. We'll
@@ -254,15 +252,17 @@ protected void channelActive(ChannelHandlerContext ctx) throws Exception {
254252
Attribute<IoConnectFuture> connectFuture = channel.attr(NettyIoService.CONNECT_FUTURE_KEY);
255253
IoConnectFuture future = connectFuture.get();
256254
try {
255+
service.registerChannel(channel);
257256
handler.sessionCreated(NettyIoSession.this);
257+
service.mapSession(this);
258258
if (future != null) {
259259
future.setSession(NettyIoSession.this);
260260
if (future.getSession() != NettyIoSession.this) {
261261
close(true);
262262
}
263263
}
264264
} catch (Throwable e) {
265-
log.warn("channelActive(session={}): could not create SSH session ({}); closing", this, e.getClass().getName(), e);
265+
warn("channelActive(session={}): could not create SSH session ({}); closing", this, e.getClass().getName(), e);
266266
try {
267267
if (future != null) {
268268
future.setException(e);

0 commit comments

Comments
 (0)