Skip to content

Commit 58f6c79

Browse files
committed
Implement no-flow-control extension (fixes #539)
1 parent e58d2f1 commit 58f6c79

16 files changed

Lines changed: 503 additions & 13 deletions

File tree

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
## New Features
4444

4545
* New utility methods `SftpClient.put(Path localFile, String remoteFileName)` and `SftpClient.put(InputStream in, String remoteFileName)` facilitate SFTP file uploading.
46+
* [GH-539](https://github.com/apache/mina-sshd/issues/539) Implement no-flow-control extension
4647

4748
## Potential compatibility issues
4849

sshd-cli/src/main/java/org/apache/sshd/cli/client/ScpCommandMain.java

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.Collection;
3333
import java.util.Collections;
3434
import java.util.EnumSet;
35+
import java.util.HashMap;
3536
import java.util.Iterator;
3637
import java.util.List;
3738
import java.util.Set;
@@ -42,12 +43,14 @@
4243
import org.apache.sshd.client.auth.AuthenticationIdentitiesProvider;
4344
import org.apache.sshd.client.config.hosts.HostConfigEntry;
4445
import org.apache.sshd.client.session.ClientSession;
46+
import org.apache.sshd.common.PropertyResolver;
4547
import org.apache.sshd.common.SshConstants;
4648
import org.apache.sshd.common.session.Session;
4749
import org.apache.sshd.common.util.GenericUtils;
4850
import org.apache.sshd.common.util.ReflectionUtils;
4951
import org.apache.sshd.common.util.io.input.NoCloseInputStream;
5052
import org.apache.sshd.common.util.threads.ThreadUtils;
53+
import org.apache.sshd.core.CoreModuleProperties;
5154
import org.apache.sshd.scp.client.ScpClient;
5255
import org.apache.sshd.scp.client.ScpClient.Option;
5356
import org.apache.sshd.scp.client.ScpClientCreator;
@@ -61,6 +64,8 @@
6164
import org.apache.sshd.scp.common.helpers.ScpTimestampCommandDetails;
6265
import org.slf4j.Logger;
6366

67+
import static org.apache.sshd.common.PropertyResolverUtils.toPropertyResolver;
68+
6469
/**
6570
* @see <A HREF="https://man7.org/linux/man-pages/man1/scp.1.html">SCP(1) - manual page</A>
6671
* @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
@@ -151,7 +156,7 @@ public static String[] normalizeCommandArguments(PrintStream stdout, PrintStream
151156
return null;
152157
}
153158

154-
return effective.toArray(new String[effective.size()]);
159+
return effective.toArray(new String[0]);
155160
}
156161

157162
/* -------------------------------------------------------------------------------- */
@@ -248,11 +253,11 @@ public static void showUsageMessage(PrintStream stderr) {
248253
public static void xferLocalToRemote(
249254
BufferedReader stdin, PrintStream stdout, PrintStream stderr, String[] args,
250255
ScpLocation source, ScpLocation target, Collection<Option> options,
251-
OutputStream logStream, Level level, boolean quiet)
256+
OutputStream logStream, Level level, boolean quiet, PropertyResolver defaultOptions)
252257
throws Exception {
253258
ScpClientCreator creator = resolveScpClientCreator(stderr, args);
254259
ClientSession session = ((logStream == null) || (creator == null) || GenericUtils.isEmpty(args))
255-
? null : setupClientSession(SCP_PORT_OPTION, stdin, level, stdout, stderr, args);
260+
? null : setupClientSession(SCP_PORT_OPTION, stdin, level, stdout, stderr, args, defaultOptions);
256261
if (session == null) {
257262
showUsageMessage(stderr);
258263
System.exit(-1);
@@ -330,10 +335,10 @@ private void logEvent(
330335
public static void xferRemoteToRemote(
331336
BufferedReader stdin, PrintStream stdout, PrintStream stderr, String[] args,
332337
ScpLocation source, ScpLocation target, Collection<Option> options,
333-
OutputStream logStream, Level level, boolean quiet)
338+
OutputStream logStream, Level level, boolean quiet, PropertyResolver defaultOptions)
334339
throws Exception {
335340
ClientSession srcSession = ((logStream == null) || GenericUtils.isEmpty(args))
336-
? null : setupClientSession(SCP_PORT_OPTION, stdin, level, stdout, stderr, args);
341+
? null : setupClientSession(SCP_PORT_OPTION, stdin, level, stdout, stderr, args, defaultOptions);
337342
if (srcSession == null) {
338343
showUsageMessage(stderr);
339344
System.exit(-1);
@@ -444,6 +449,10 @@ public static void main(String[] args) throws Exception {
444449
new InputStreamReader(new NoCloseInputStream(System.in), Charset.defaultCharset()))) {
445450
args = normalizeCommandArguments(stdout, stderr, args);
446451

452+
PropertyResolver defaultOptions = toPropertyResolver(new HashMap<>());
453+
CoreModuleProperties.NO_FLOW_CONTROL.set(defaultOptions, Boolean.TRUE);
454+
CoreModuleProperties.WINDOW_SIZE.set(defaultOptions, 1024L * 1024L * 1024L);
455+
447456
Level level = Level.SEVERE;
448457
int numArgs = GenericUtils.length(args);
449458
// see the way normalizeCommandArguments works...
@@ -472,9 +481,11 @@ public static void main(String[] args) throws Exception {
472481
}
473482

474483
if (threeWay) {
475-
xferRemoteToRemote(stdin, stdout, stderr, args, source, target, options, logStream, level, quiet);
484+
xferRemoteToRemote(stdin, stdout, stderr, args, source, target, options, logStream, level, quiet,
485+
defaultOptions);
476486
} else {
477-
xferLocalToRemote(stdin, stdout, stderr, args, source, target, options, logStream, level, quiet);
487+
xferLocalToRemote(stdin, stdout, stderr, args, source, target, options, logStream, level, quiet,
488+
defaultOptions);
478489
}
479490
} finally {
480491
if ((logStream != stdout) && (logStream != stderr)) {

sshd-cli/src/main/java/org/apache/sshd/cli/client/SftpCommandMain.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.nio.file.Paths;
3535
import java.util.Arrays;
3636
import java.util.Collections;
37+
import java.util.HashMap;
3738
import java.util.Map;
3839
import java.util.Objects;
3940
import java.util.ServiceLoader;
@@ -47,6 +48,7 @@
4748
import org.apache.sshd.client.ClientFactoryManager;
4849
import org.apache.sshd.client.session.ClientSession;
4950
import org.apache.sshd.common.NamedResource;
51+
import org.apache.sshd.common.PropertyResolver;
5052
import org.apache.sshd.common.ServiceFactory;
5153
import org.apache.sshd.common.channel.ChannelFactory;
5254
import org.apache.sshd.common.cipher.CipherFactory;
@@ -71,6 +73,7 @@
7173
import org.apache.sshd.common.util.io.output.LineLevelAppenderStream;
7274
import org.apache.sshd.common.util.io.output.NullOutputStream;
7375
import org.apache.sshd.common.util.threads.ThreadUtils;
76+
import org.apache.sshd.core.CoreModuleProperties;
7477
import org.apache.sshd.server.config.SshServerConfigFileReader;
7578
import org.apache.sshd.sftp.client.SftpClient;
7679
import org.apache.sshd.sftp.client.SftpClient.Attributes;
@@ -91,6 +94,8 @@
9194
import org.apache.sshd.sftp.common.extensions.openssh.StatVfsExtensionParser;
9295
import org.slf4j.Logger;
9396

97+
import static org.apache.sshd.common.PropertyResolverUtils.toPropertyResolver;
98+
9499
/**
95100
* TODO Add javadoc
96101
*
@@ -366,9 +371,13 @@ public static void main(String[] args) throws Exception {
366371
setupLogging(level, stdout, stderr, logStream);
367372
}
368373

374+
PropertyResolver defaultOptions = toPropertyResolver(new HashMap<>());
375+
CoreModuleProperties.NO_FLOW_CONTROL.set(defaultOptions, Boolean.TRUE);
376+
CoreModuleProperties.WINDOW_SIZE.set(defaultOptions, 1024L * 1024L * 1024L);
377+
369378
ClientSession session = (logStream == null)
370379
? null
371-
: setupClientSession(SFTP_PORT_OPTION, stdin, level, stdout, stderr, args);
380+
: setupClientSession(SFTP_PORT_OPTION, stdin, level, stdout, stderr, args, defaultOptions);
372381
if (session == null) {
373382
System.err.println("usage: sftp [-v[v][v]] [-E logoutput] [-i identity] [-io nio2|mina|netty]"
374383
+ " [-J proxyJump] [-l login] [" + SFTP_PORT_OPTION + " port] [-o option=value]"

sshd-cli/src/main/java/org/apache/sshd/cli/client/SshClientCliSupport.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,15 @@ public static boolean isArgumentedOption(String portOption, String argName) {
119119
// CHECKSTYLE:OFF
120120
public static ClientSession setupClientSession(
121121
String portOption, BufferedReader stdin, Level level,
122-
PrintStream stdout, PrintStream stderr, String... args)
122+
PrintStream stdout, PrintStream stderr, String[] args)
123+
throws Exception {
124+
return setupClientSession(portOption, stdin, level, stdout, stderr, args, null);
125+
}
126+
127+
public static ClientSession setupClientSession(
128+
String portOption, BufferedReader stdin, Level level,
129+
PrintStream stdout, PrintStream stderr, String[] args,
130+
PropertyResolver defaultOptions)
123131
throws Exception {
124132
int port = -1;
125133
String host = null;
@@ -240,7 +248,8 @@ public static ClientSession setupClientSession(
240248
return null;
241249
}
242250

243-
PropertyResolver resolver = PropertyResolverUtils.toPropertyResolver(options);
251+
PropertyResolver resolver = PropertyResolverUtils.toPropertyResolver(options,
252+
defaultOptions);
244253
SshClient client = setupClient(
245254
resolver, ciphers, macs, compressions, identities,
246255
stdin, stdout, stderr, level, args);

sshd-common/src/main/java/org/apache/sshd/common/kex/extension/parser/NoFlowControl.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@
3131
public class NoFlowControl extends AbstractKexExtensionParser<String> {
3232
public static final String NAME = "no-flow-control";
3333

34+
public static final String SUPPORTED = "s";
35+
public static final String PREFERRED = "p";
36+
3437
public static final NoFlowControl INSTANCE = new NoFlowControl();
3538

3639
public NoFlowControl() {

sshd-core/src/main/java/org/apache/sshd/common/channel/LocalWindow.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,11 @@ public void consume(long len) throws IOException {
7070
BufferUtils.validateUint32Value(len, "Invalid consumption length: %d");
7171
checkInitialized("consume");
7272

73+
if (noFlowControl) {
74+
// flow control is disabled, so just bail out
75+
return;
76+
}
77+
7378
long remainLen;
7479
synchronized (lock) {
7580
remainLen = getSize() - len;

sshd-core/src/main/java/org/apache/sshd/common/channel/RemoteWindow.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,11 @@ public void consume(long len) {
6767
BufferUtils.validateUint32Value(len, "Invalid consumption length: %d");
6868
checkInitialized("consume");
6969

70+
if (noFlowControl) {
71+
// flow control is disabled, so just bail out
72+
return;
73+
}
74+
7075
long remainLen;
7176
synchronized (lock) {
7277
remainLen = getSize() - len;

sshd-core/src/main/java/org/apache/sshd/common/channel/Window.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.function.Predicate;
2626

2727
import org.apache.sshd.common.PropertyResolver;
28+
import org.apache.sshd.common.session.Session;
2829
import org.apache.sshd.common.util.ValidateUtils;
2930
import org.apache.sshd.common.util.buffer.BufferUtils;
3031
import org.apache.sshd.common.util.logging.AbstractLoggingBean;
@@ -46,6 +47,8 @@ public abstract class Window extends AbstractLoggingBean implements ChannelHolde
4647

4748
protected final Object lock = new Object();
4849

50+
protected boolean noFlowControl;
51+
4952
private final AtomicBoolean closed = new AtomicBoolean(false);
5053
private final AtomicBoolean initialized = new AtomicBoolean(false);
5154
private final Channel channelInstance;
@@ -94,6 +97,8 @@ protected void init(long size, long packetSize, PropertyResolver resolver) {
9497
}
9598

9699
synchronized (lock) {
100+
Session session = channelInstance.getSession(); // this should only be null during tests
101+
this.noFlowControl = session != null && session.isNoFlowControl();
97102
this.maxSize = size;
98103
this.packetSize = packetSize;
99104
updateSize(size);

sshd-core/src/main/java/org/apache/sshd/common/kex/extension/DefaultClientKexExtensionHandler.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,18 +26,23 @@
2626
import java.util.LinkedHashMap;
2727
import java.util.List;
2828
import java.util.Map;
29+
import java.util.Optional;
2930
import java.util.Set;
3031
import java.util.TreeSet;
3132
import java.util.function.BiConsumer;
3233

3334
import org.apache.sshd.common.AttributeRepository.AttributeKey;
3435
import org.apache.sshd.common.NamedFactory;
3536
import org.apache.sshd.common.kex.extension.parser.HostBoundPubkeyAuthentication;
37+
import org.apache.sshd.common.kex.extension.parser.NoFlowControl;
3638
import org.apache.sshd.common.kex.extension.parser.ServerSignatureAlgorithms;
3739
import org.apache.sshd.common.session.Session;
40+
import org.apache.sshd.common.session.helpers.AbstractSession;
3841
import org.apache.sshd.common.signature.Signature;
42+
import org.apache.sshd.common.util.ValidateUtils;
3943
import org.apache.sshd.common.util.buffer.Buffer;
4044
import org.apache.sshd.common.util.logging.AbstractLoggingBean;
45+
import org.apache.sshd.core.CoreModuleProperties;
4146

4247
/**
4348
* Detects if the server sends a
@@ -92,6 +97,15 @@ public boolean handleKexExtensionRequest(
9297
} else {
9398
session.setAttribute(HOSTBOUND_AUTHENTICATION, version);
9499
}
100+
} else if (NoFlowControl.NAME.equals(name)) {
101+
String o = NoFlowControl.INSTANCE.parseExtension(data);
102+
Optional<Boolean> nfc = CoreModuleProperties.NO_FLOW_CONTROL.get(session);
103+
if (NoFlowControl.PREFERRED.equals(o) && nfc.orElse(Boolean.TRUE)
104+
|| NoFlowControl.SUPPORTED.equals(o) && nfc.orElse(Boolean.FALSE)) {
105+
AbstractSession abstractSession
106+
= ValidateUtils.checkInstanceOf(session, AbstractSession.class, "Not a supported session: %s", session);
107+
abstractSession.activateNoFlowControl();
108+
}
95109
}
96110
return true;
97111
}
@@ -157,13 +171,19 @@ public void sendKexExtensions(Session session, KexPhase phase) throws Exception
157171
* Collects extension info records, handing them off to the given {@code marshaller} for writing into an
158172
* {@link KexExtensions#SSH_MSG_EXT_INFO} message.
159173
* <p>
160-
* This default implementation does not marshal any extension.
174+
* This default implementation marshals a {@link NoFlowControl} extension}.
161175
* </p>
162176
*
163177
* @param session {@link Session} to send the KEX extension information for
164178
* @param phase {@link KexPhase} of the SSH protocol
165179
* @param marshaller {@link BiConsumer} writing the extensions into an SSH message
166180
*/
167181
public void collectExtensions(Session session, KexPhase phase, BiConsumer<String, Object> marshaller) {
182+
// no-flow-control
183+
Boolean nfc = CoreModuleProperties.NO_FLOW_CONTROL.get(session).orElse(null);
184+
if (nfc == null || nfc) {
185+
marshaller.accept(NoFlowControl.NAME,
186+
nfc != null ? NoFlowControl.PREFERRED : NoFlowControl.SUPPORTED);
187+
}
168188
}
169189
}

sshd-core/src/main/java/org/apache/sshd/common/kex/extension/DefaultServerKexExtensionHandler.java

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,23 +19,29 @@
1919

2020
package org.apache.sshd.common.kex.extension;
2121

22+
import java.io.IOException;
2223
import java.util.Arrays;
2324
import java.util.Collection;
2425
import java.util.LinkedHashMap;
2526
import java.util.Map;
27+
import java.util.Optional;
2628
import java.util.function.BiConsumer;
2729

2830
import org.apache.sshd.common.AttributeRepository.AttributeKey;
2931
import org.apache.sshd.common.kex.KexProposalOption;
32+
import org.apache.sshd.common.kex.extension.parser.NoFlowControl;
3033
import org.apache.sshd.common.kex.extension.parser.ServerSignatureAlgorithms;
3134
import org.apache.sshd.common.session.Session;
35+
import org.apache.sshd.common.session.helpers.AbstractSession;
3236
import org.apache.sshd.common.util.GenericUtils;
37+
import org.apache.sshd.common.util.ValidateUtils;
3338
import org.apache.sshd.common.util.buffer.Buffer;
3439
import org.apache.sshd.common.util.logging.AbstractLoggingBean;
40+
import org.apache.sshd.core.CoreModuleProperties;
3541

3642
/**
3743
* A basic default implementation of a server-side {@link KexExtensionHandler} handling the
38-
* {@link ServerSignatureAlgorithms} KEX extension.
44+
* {@link ServerSignatureAlgorithms} KEX extension along with the {@link NoFlowControl} extension.
3945
*
4046
* @see <a href="https://tools.ietf.org/html/rfc8308">RFC 8308</a>
4147
*/
@@ -130,6 +136,23 @@ public void sendKexExtensions(Session session, KexPhase phase) throws Exception
130136
}
131137
}
132138

139+
@Override
140+
public boolean handleKexExtensionRequest(
141+
Session session, int index, int count, String name, byte[] data)
142+
throws IOException {
143+
if (NoFlowControl.NAME.equals(name)) {
144+
String o = NoFlowControl.INSTANCE.parseExtension(data);
145+
Optional<Boolean> nfc = CoreModuleProperties.NO_FLOW_CONTROL.get(session);
146+
if (NoFlowControl.PREFERRED.equals(o) && nfc.orElse(Boolean.TRUE)
147+
|| NoFlowControl.SUPPORTED.equals(o) && nfc.orElse(Boolean.FALSE)) {
148+
AbstractSession abstractSession
149+
= ValidateUtils.checkInstanceOf(session, AbstractSession.class, "Not a supported session: %s", session);
150+
abstractSession.activateNoFlowControl();
151+
}
152+
}
153+
return true;
154+
}
155+
133156
/**
134157
* Collects extension info records, handing them off to the given {@code marshaller} for writing into an
135158
* {@link KexExtensions#SSH_MSG_EXT_INFO} message.
@@ -144,6 +167,7 @@ public void sendKexExtensions(Session session, KexPhase phase) throws Exception
144167
*/
145168
@SuppressWarnings("javadoc")
146169
public void collectExtensions(Session session, KexPhase phase, BiConsumer<String, Object> marshaller) {
170+
// server-sig-algs
147171
if (phase == KexPhase.NEWKEYS) {
148172
Collection<String> algorithms = session.getSignatureFactoriesNames();
149173
if (!GenericUtils.isEmpty(algorithms)) {
@@ -157,5 +181,11 @@ public void collectExtensions(Session session, KexPhase phase, BiConsumer<String
157181
ServerSignatureAlgorithms.NAME);
158182
}
159183
}
184+
// no-flow-control
185+
Boolean nfc = CoreModuleProperties.NO_FLOW_CONTROL.get(session).orElse(null);
186+
if (nfc != Boolean.FALSE) {
187+
marshaller.accept(NoFlowControl.NAME,
188+
nfc != null ? NoFlowControl.PREFERRED : NoFlowControl.SUPPORTED);
189+
}
160190
}
161191
}

0 commit comments

Comments
 (0)