Skip to content

Commit d7a8d7d

Browse files
committed
GH-626: Enable Streaming.Async for ChannelDirectTcpip
ChannelDirectTcpip gives user code the ability to request a direct-tcpip forward explicitly, without listener on a local port. (Full local port forwarding with a listener on the local port is not implemented in ChannelDirectTcpip but in TcpipClientChannel.) Complete the implementation to also support the asynchronous asyncIn and asyncOut streams. Factor out the stream setup from ChannelSession into a new intermediary class, and derive ChannelDirectTcpip from that new class. (Basing it on ChannelSession directly would be wrong; the channel would end up with the wrong type and also inherit the setEnv() method.)
1 parent 0114a5f commit d7a8d7d

5 files changed

Lines changed: 297 additions & 216 deletions

File tree

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
## Bug Fixes
4242

4343
* [GH-618](https://github.com/apache/mina-sshd/issues/618) Fix reading an `OpenSshCertificate` from a `Buffer`
44+
* [GH-626](https://github.com/apache/mina-sshd/issues/626) Enable `Streaming.Async` for `ChannelDirectTcpip`
4445
* [GH-628](https://github.com/apache/mina-sshd/issues/628) SFTP: fix reading directories with trailing blanks in the name
4546

4647
## New Features
Lines changed: 262 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,262 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.sshd.client.channel;
20+
21+
import java.io.IOException;
22+
import java.io.InputStream;
23+
import java.io.OutputStream;
24+
import java.util.concurrent.Future;
25+
import java.util.function.Supplier;
26+
27+
import org.apache.sshd.common.Closeable;
28+
import org.apache.sshd.common.SshConstants;
29+
import org.apache.sshd.common.channel.ChannelAsyncInputStream;
30+
import org.apache.sshd.common.channel.ChannelAsyncOutputStream;
31+
import org.apache.sshd.common.channel.ChannelOutputStream;
32+
import org.apache.sshd.common.channel.ChannelPipedInputStream;
33+
import org.apache.sshd.common.channel.ChannelPipedOutputStream;
34+
import org.apache.sshd.common.channel.LocalWindow;
35+
import org.apache.sshd.common.channel.RemoteWindow;
36+
import org.apache.sshd.common.future.CloseFuture;
37+
import org.apache.sshd.common.future.DefaultCloseFuture;
38+
import org.apache.sshd.common.io.IoInputStream;
39+
import org.apache.sshd.common.io.IoWriteFuture;
40+
import org.apache.sshd.common.session.Session;
41+
import org.apache.sshd.common.util.ValidateUtils;
42+
import org.apache.sshd.common.util.threads.CloseableExecutorService;
43+
import org.apache.sshd.common.util.threads.ThreadUtils;
44+
import org.apache.sshd.core.CoreModuleProperties;
45+
46+
/**
47+
* Channel that sets up asynchronous streams, and that pumps an input stream if set via {@link #setIn(InputStream)}.
48+
*
49+
* @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
50+
*/
51+
public abstract class AsyncCapableClientChannel extends AbstractClientChannel {
52+
53+
protected final boolean withErrorStream;
54+
55+
protected CloseableExecutorService pumperService;
56+
protected Future<?> pumper;
57+
58+
protected AsyncCapableClientChannel(String type, boolean withErrorStream) {
59+
super(type);
60+
this.withErrorStream = withErrorStream;
61+
}
62+
63+
private <T> T unsupported(Supplier<? extends T> err) {
64+
if (!withErrorStream) {
65+
throw new UnsupportedOperationException(getChannelType() + " has no error stream");
66+
}
67+
return err.get();
68+
}
69+
70+
@Override
71+
public OutputStream getErr() {
72+
return unsupported(super::getErr);
73+
}
74+
75+
@Override
76+
public IoInputStream getAsyncErr() {
77+
return unsupported(super::getAsyncErr);
78+
}
79+
80+
@Override
81+
public InputStream getInvertedErr() {
82+
return unsupported(super::getInvertedErr);
83+
}
84+
85+
@Override
86+
public void setErr(OutputStream err) {
87+
if (!withErrorStream) {
88+
throw new UnsupportedOperationException(getChannelType() + " does not support an error stream");
89+
}
90+
super.setErr(err);
91+
}
92+
93+
@Override
94+
protected void doWriteExtendedData(byte[] data, int off, long len) throws IOException {
95+
if (!withErrorStream) {
96+
throw new UnsupportedOperationException(getChannelType() + " does not support extended data");
97+
}
98+
super.doWriteExtendedData(data, off, len);
99+
}
100+
101+
@Override
102+
protected void doOpen() throws IOException {
103+
if (Streaming.Async.equals(streaming)) {
104+
asyncIn = new ChannelAsyncOutputStream(this, SshConstants.SSH_MSG_CHANNEL_DATA) {
105+
@SuppressWarnings("synthetic-access")
106+
@Override
107+
protected CloseFuture doCloseGracefully() {
108+
DefaultCloseFuture result = new DefaultCloseFuture(getChannelId(), futureLock);
109+
CloseFuture packetsWritten = super.doCloseGracefully();
110+
packetsWritten.addListener(p -> {
111+
try {
112+
// The channel writes EOF directly through the SSH session
113+
IoWriteFuture eofSent = sendEof();
114+
if (eofSent != null) {
115+
eofSent.addListener(f -> result.setClosed());
116+
return;
117+
}
118+
} catch (Exception e) {
119+
getSession().exceptionCaught(e);
120+
}
121+
result.setClosed();
122+
});
123+
return result;
124+
}
125+
};
126+
asyncOut = new ChannelAsyncInputStream(this);
127+
if (withErrorStream) {
128+
if (redirectErrorStream) {
129+
asyncErr = asyncOut;
130+
} else {
131+
asyncErr = new ChannelAsyncInputStream(this);
132+
}
133+
}
134+
} else {
135+
invertedIn = new ChannelOutputStream(this, getRemoteWindow(), log, SshConstants.SSH_MSG_CHANNEL_DATA, true);
136+
137+
LocalWindow wLocal = getLocalWindow();
138+
if (out == null) {
139+
ChannelPipedInputStream pis = new ChannelPipedInputStream(this, wLocal);
140+
ChannelPipedOutputStream pos = new ChannelPipedOutputStream(pis);
141+
out = pos;
142+
invertedOut = pis;
143+
}
144+
145+
if (err == null && withErrorStream) {
146+
if (redirectErrorStream) {
147+
err = out;
148+
invertedErr = invertedOut;
149+
} else {
150+
ChannelPipedInputStream pis = new ChannelPipedInputStream(this, wLocal);
151+
ChannelPipedOutputStream pos = new ChannelPipedOutputStream(pis);
152+
err = pos;
153+
invertedErr = pis;
154+
}
155+
}
156+
157+
if (in != null) {
158+
// allocate a temporary executor service if none provided
159+
CloseableExecutorService service = getExecutorService();
160+
if (service == null) {
161+
pumperService = ThreadUtils
162+
.newSingleThreadExecutor("ClientInputStreamPump[" + Math.abs(System.nanoTime() & 0xFFFF) + "]");
163+
} else {
164+
pumperService = ThreadUtils.noClose(service);
165+
}
166+
167+
// Interrupt does not really work and the thread will only exit when
168+
// the call to read() will return. So ensure this thread is a daemon
169+
// to avoid blocking the whole app
170+
pumper = pumperService.submit(this::pumpInputStream);
171+
}
172+
}
173+
}
174+
175+
@Override
176+
protected Closeable getInnerCloseable() {
177+
return builder()
178+
.close(super.getInnerCloseable())
179+
.run(toString(), this::closeImmediately0)
180+
.build();
181+
}
182+
183+
protected void closeImmediately0() {
184+
if ((pumper != null) && (pumperService != null) && (!pumperService.isShutdown())) {
185+
try {
186+
if (!pumper.isDone()) {
187+
pumper.cancel(true);
188+
}
189+
190+
pumperService.shutdownNow();
191+
} catch (Exception e) {
192+
// we log it as WARN since it is relatively harmless
193+
warn("doCloseImmediately({}) failed {} to shutdown stream pumper: {}", this, e.getClass().getSimpleName(),
194+
e.getMessage(), e);
195+
} finally {
196+
pumper = null;
197+
pumperService = null;
198+
}
199+
}
200+
}
201+
202+
protected void pumpInputStream() {
203+
boolean debugEnabled = log.isDebugEnabled();
204+
try {
205+
Session session = getSession();
206+
RemoteWindow wRemote = getRemoteWindow();
207+
long packetSize = wRemote.getPacketSize();
208+
ValidateUtils.checkTrue((packetSize > 0) && (packetSize < Integer.MAX_VALUE),
209+
"Invalid remote packet size int boundary: %d", packetSize);
210+
byte[] buffer = new byte[(int) packetSize];
211+
int maxChunkSize = CoreModuleProperties.INPUT_STREAM_PUMP_CHUNK_SIZE.getRequired(session);
212+
maxChunkSize = Math.max(maxChunkSize, CoreModuleProperties.INPUT_STREAM_PUMP_CHUNK_SIZE.getRequiredDefault());
213+
214+
while (!closeFuture.isClosed()) {
215+
int len = securedRead(in, maxChunkSize, buffer, 0, buffer.length);
216+
if (len < 0) {
217+
if (debugEnabled) {
218+
log.debug("pumpInputStream({}) EOF signalled", this);
219+
}
220+
sendEof();
221+
return;
222+
}
223+
224+
session.resetIdleTimeout();
225+
if (len > 0) {
226+
invertedIn.write(buffer, 0, len);
227+
invertedIn.flush();
228+
}
229+
}
230+
231+
if (debugEnabled) {
232+
log.debug("pumpInputStream({}) close future closed", this);
233+
}
234+
} catch (Exception e) {
235+
if (!isClosing()) {
236+
error("pumpInputStream({}) Caught {} : {}", this, e.getClass().getSimpleName(), e.getMessage(), e);
237+
close(false);
238+
}
239+
}
240+
}
241+
242+
protected int securedRead(InputStream in, int maxChunkSize, byte[] buf, int off, int len) throws IOException {
243+
for (int n = 0;;) {
244+
int nread = in.read(buf, off + n, Math.min(maxChunkSize, len - n));
245+
if (nread <= 0) {
246+
return (n == 0) ? nread : n;
247+
}
248+
249+
n += nread;
250+
if (n >= len) {
251+
return n;
252+
}
253+
254+
// if not closed but no bytes available, return
255+
int availLen = in.available();
256+
if (availLen <= 0) {
257+
return n;
258+
}
259+
}
260+
}
261+
262+
}

sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelDirectTcpip.java

Lines changed: 2 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,8 @@
2626
import org.apache.sshd.client.future.OpenFuture;
2727
import org.apache.sshd.common.SshConstants;
2828
import org.apache.sshd.common.SshException;
29-
import org.apache.sshd.common.channel.ChannelAsyncInputStream;
30-
import org.apache.sshd.common.channel.ChannelAsyncOutputStream;
31-
import org.apache.sshd.common.channel.ChannelOutputStream;
32-
import org.apache.sshd.common.channel.ChannelPipedInputStream;
33-
import org.apache.sshd.common.channel.ChannelPipedOutputStream;
3429
import org.apache.sshd.common.channel.LocalWindow;
3530
import org.apache.sshd.common.session.Session;
36-
import org.apache.sshd.common.util.ValidateUtils;
3731
import org.apache.sshd.common.util.buffer.Buffer;
3832
import org.apache.sshd.common.util.net.SshdSocketAddress;
3933

@@ -42,14 +36,13 @@
4236
*
4337
* @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
4438
*/
45-
public class ChannelDirectTcpip extends AbstractClientChannel {
39+
public class ChannelDirectTcpip extends AsyncCapableClientChannel {
4640

4741
private final SshdSocketAddress local;
4842
private final SshdSocketAddress remote;
49-
private ChannelPipedOutputStream pipe;
5043

5144
public ChannelDirectTcpip(SshdSocketAddress local, SshdSocketAddress remote) {
52-
super("direct-tcpip");
45+
super("direct-tcpip", false);
5346
if (local == null) {
5447
try {
5548
InetAddress localHost = InetAddress.getLocalHost();
@@ -95,34 +88,6 @@ public synchronized OpenFuture open() throws IOException {
9588
return openFuture;
9689
}
9790

98-
@Override
99-
protected void doOpen() throws IOException {
100-
if (streaming == Streaming.Async) {
101-
asyncIn = new ChannelAsyncOutputStream(this, SshConstants.SSH_MSG_CHANNEL_DATA);
102-
asyncOut = new ChannelAsyncInputStream(this);
103-
} else {
104-
out = new ChannelOutputStream(
105-
this, getRemoteWindow(), log, SshConstants.SSH_MSG_CHANNEL_DATA, true);
106-
invertedIn = out;
107-
108-
ChannelPipedInputStream pis = new ChannelPipedInputStream(this, getLocalWindow());
109-
pipe = new ChannelPipedOutputStream(pis);
110-
in = pis;
111-
invertedOut = in;
112-
}
113-
}
114-
115-
@Override
116-
protected void doWriteData(byte[] data, int off, long len) throws IOException {
117-
ValidateUtils.checkTrue(len <= Integer.MAX_VALUE,
118-
"Data length exceeds int boundaries: %d", len);
119-
pipe.write(data, off, (int) len);
120-
pipe.flush();
121-
122-
LocalWindow wLocal = getLocalWindow();
123-
wLocal.release(len);
124-
}
125-
12691
public SshdSocketAddress getLocalSocketAddress() {
12792
return this.local;
12893
}

0 commit comments

Comments
 (0)