From 94300e0c16c711a91f153c92491c5cfccc014359 Mon Sep 17 00:00:00 2001 From: Toni Spets Date: Fri, 6 Jan 2017 19:44:53 +0200 Subject: [PATCH 1/3] New pipe implementation for SshCommand --- src/Renci.SshNet/Common/LinkedListQueue.cs | 134 ++++++++++++++++++++ src/Renci.SshNet/Common/Pipe.cs | 47 +++++++ src/Renci.SshNet/Common/PipeInputStream.cs | 112 ++++++++++++++++ src/Renci.SshNet/Common/PipeOutputStream.cs | 111 ++++++++++++++++ src/Renci.SshNet/Renci.SshNet.csproj | 4 + src/Renci.SshNet/SshCommand.cs | 72 +++++------ 6 files changed, 443 insertions(+), 37 deletions(-) create mode 100644 src/Renci.SshNet/Common/LinkedListQueue.cs create mode 100644 src/Renci.SshNet/Common/Pipe.cs create mode 100644 src/Renci.SshNet/Common/PipeInputStream.cs create mode 100644 src/Renci.SshNet/Common/PipeOutputStream.cs diff --git a/src/Renci.SshNet/Common/LinkedListQueue.cs b/src/Renci.SshNet/Common/LinkedListQueue.cs new file mode 100644 index 000000000..d8f164a71 --- /dev/null +++ b/src/Renci.SshNet/Common/LinkedListQueue.cs @@ -0,0 +1,134 @@ +namespace Renci.SshNet.Common +{ + using System; + using System.Threading; + + /// + /// Fast concurrent generic linked list queue. + /// + public class LinkedListQueue : IDisposable + { + sealed class Entry + { + public E Item; + public Entry Next; + } + + private readonly object _lock = new object(); + + private Entry _first; + private Entry _last; + + private bool _isAddingCompleted; + + /// + /// Gets whether this has been marked as complete for adding and is empty. + /// + /// Whether this queue has been marked as complete for adding and is empty. + public bool IsCompleted + { + get { return _isAddingCompleted && _first == null && _last == null; } + } + + /// + /// Gets whether this has been marked as complete for adding. + /// + /// Whether this queue has been marked as complete for adding. + public bool IsAddingCompleted + { + get { return _isAddingCompleted; } + set + { + lock (_lock) + { + _isAddingCompleted = value; + } + } + } + + /// + /// Adds the item to . + /// + /// The item to be added to the queue. The value can be a null reference. + public void Add(T item) + { + lock (_lock) + { + if (_isAddingCompleted) + return; + + var entry = new Entry(); + entry.Item = item; + + if (_last != null) + { + _last.Next = entry; + } + + _last = entry; + + if (_first == null) + { + _first = entry; + } + + Monitor.PulseAll(_lock); + } + } + + /// + /// Marks the instances as not accepting any more additions. + /// + public void CompleteAdding() + { + lock (_lock) + { + IsAddingCompleted = true; + Monitor.PulseAll(_lock); + } + } + + /// + /// Tries to remove an item from the . + /// + /// true, if an item could be removed; otherwise false. + /// The item to be removed from the queue. + public bool TryTake(out T item) + { + lock (_lock) + { + while (_first == null && !_isAddingCompleted) + Monitor.Wait(_lock); + + if (_first == null && _isAddingCompleted) + { + item = default(T); + return false; + } + + item = _first.Item; + _first = _first.Next; + return true; + } + } + + /// + /// Releases all resource used by the object. + /// + /// Call when you are finished using the + /// . The method leaves the + /// in an unusable state. After calling + /// , you must release all references to the + /// so the garbage collector can reclaim the memory that + /// the was occupying. + public void Dispose() + { + lock (_lock) + { + _first = null; + _last = null; + _isAddingCompleted = true; + } + } + } +} diff --git a/src/Renci.SshNet/Common/Pipe.cs b/src/Renci.SshNet/Common/Pipe.cs new file mode 100644 index 000000000..2ef08d49b --- /dev/null +++ b/src/Renci.SshNet/Common/Pipe.cs @@ -0,0 +1,47 @@ +namespace Renci.SshNet.Common +{ + using System; + using System.IO; + + /// + /// A generic pipe to pass through data. + /// + internal class Pipe : IDisposable + { + private readonly LinkedListQueue _queue; + + /// + /// Gets the input stream. + /// + /// The input stream. + public Stream InputStream { get; private set; } + + /// + /// Gets the output stream. + /// + /// The output stream. + public Stream OutputStream { get; private set; } + + public Pipe() + { + _queue = new LinkedListQueue(); + InputStream = new PipeInputStream(_queue); + OutputStream = new PipeOutputStream(_queue); + } + + /// + /// Releases all resource used by the object. + /// + /// Call when you are finished using the . The + /// method leaves the in an unusable state. After + /// calling , you must release all references to the + /// so the garbage collector can reclaim the memory that the + /// was occupying. + public void Dispose() + { + OutputStream.Dispose(); + InputStream.Dispose(); + _queue.Dispose(); + } + } +} diff --git a/src/Renci.SshNet/Common/PipeInputStream.cs b/src/Renci.SshNet/Common/PipeInputStream.cs new file mode 100644 index 000000000..d18192655 --- /dev/null +++ b/src/Renci.SshNet/Common/PipeInputStream.cs @@ -0,0 +1,112 @@ +namespace Renci.SshNet.Common +{ + using System; + using System.IO; + + internal class PipeInputStream : Stream + { + private LinkedListQueue _queue; + private byte[] _current; + private int _currentPosition; + private bool _isDisposed; + + public PipeInputStream(LinkedListQueue queue) + { + _queue = queue; + } + + public override void Flush() + { + } + + public override long Seek(long offset, SeekOrigin origin) + { + throw new NotSupportedException(); + } + + public override void SetLength(long value) + { + throw new NotSupportedException(); + } + + public override int Read(byte[] buffer, int offset, int count) + { + if (buffer == null) + throw new ArgumentNullException("buffer"); + if (offset + count > buffer.Length) + throw new ArgumentException("The sum of offset and count is greater than the buffer length."); + if (offset < 0 || count < 0) + throw new ArgumentOutOfRangeException("offset", "offset or count is negative."); + if (_isDisposed) + throw CreateObjectDisposedException(); + + if (_current == null || _currentPosition == _current.Length) + { + if (_queue.IsCompleted || !_queue.TryTake(out _current)) + return 0; + + _currentPosition = 0; + } + + var avail = _current.Length - _currentPosition; + if (count > avail) + count = avail; + + Buffer.BlockCopy(_current, _currentPosition, buffer, offset, count); + + _currentPosition += count; + return count; + } + + public override void Write(byte[] buffer, int offset, int count) + { + throw new NotSupportedException(); + } + + public override bool CanRead + { + get { return true; } + } + + public override bool CanSeek + { + get { return false; } + } + + public override bool CanWrite + { + get { return false; } + } + + public override long Length + { + get + { + throw new NotSupportedException(); + } + } + + public override long Position + { + get + { + throw new NotSupportedException(); + } + set + { + throw new NotSupportedException(); + } + } + + protected override void Dispose(bool disposing) + { + base.Dispose(disposing); + _isDisposed = true; + } + + private ObjectDisposedException CreateObjectDisposedException() + { + return new ObjectDisposedException(GetType().FullName); + } + } +} diff --git a/src/Renci.SshNet/Common/PipeOutputStream.cs b/src/Renci.SshNet/Common/PipeOutputStream.cs new file mode 100644 index 000000000..8cfd1d98c --- /dev/null +++ b/src/Renci.SshNet/Common/PipeOutputStream.cs @@ -0,0 +1,111 @@ +namespace Renci.SshNet.Common +{ + using System; + using System.IO; + + internal class PipeOutputStream : Stream + { + private LinkedListQueue _queue; + private bool _isDisposed; + + public PipeOutputStream(LinkedListQueue queue) + { + _queue = queue; + } + + public override void Flush() + { + } + + public override long Seek(long offset, SeekOrigin origin) + { + throw new NotSupportedException(); + } + + public override void SetLength(long value) + { + throw new NotSupportedException(); + } + + public override int Read(byte[] buffer, int offset, int count) + { + throw new NotSupportedException(); + } + + public override void Write(byte[] buffer, int offset, int count) + { + if (buffer == null) + throw new ArgumentNullException("buffer"); + if (offset + count > buffer.Length) + throw new ArgumentException("The sum of offset and count is greater than the buffer length."); + if (offset < 0 || count < 0) + throw new ArgumentOutOfRangeException("offset", "offset or count is negative."); + if (_isDisposed) + throw CreateObjectDisposedException(); + if (_queue.IsAddingCompleted) + return; + + byte[] tmp = new byte[count]; + Buffer.BlockCopy(buffer, offset, tmp, 0, count); + _queue.Add(tmp); + } + + public override bool CanRead + { + get { return false; } + } + + public override bool CanSeek + { + get { return false; } + } + + public override bool CanWrite + { + get { return true; } + } + + public override long Length + { + get + { + throw new NotSupportedException(); + } + } + + public override long Position + { + get + { + throw new NotSupportedException(); + } + set + { + throw new NotSupportedException(); + } + } + + public override void Close() + { + if (!_queue.IsAddingCompleted) + _queue.CompleteAdding(); + } + + protected override void Dispose(bool disposing) + { + base.Dispose(disposing); + + if (!_isDisposed) + { + if (!_queue.IsAddingCompleted) + _queue.CompleteAdding(); + _isDisposed = true; + } + } + + private ObjectDisposedException CreateObjectDisposedException() + { + return new ObjectDisposedException(GetType().FullName); + } + } +} diff --git a/src/Renci.SshNet/Renci.SshNet.csproj b/src/Renci.SshNet/Renci.SshNet.csproj index fd278e17b..e5fe85017 100644 --- a/src/Renci.SshNet/Renci.SshNet.csproj +++ b/src/Renci.SshNet/Renci.SshNet.csproj @@ -441,6 +441,10 @@ + + + + diff --git a/src/Renci.SshNet/SshCommand.cs b/src/Renci.SshNet/SshCommand.cs index 37e91da08..4a2a9bb27 100644 --- a/src/Renci.SshNet/SshCommand.cs +++ b/src/Renci.SshNet/SshCommand.cs @@ -25,6 +25,8 @@ public class SshCommand : IDisposable private Exception _exception; private bool _hasError; private readonly object _endExecuteLock = new object(); + private Pipe _stdoutPipe; + private Pipe _stderrPipe; /// /// Gets the command text. @@ -56,7 +58,10 @@ public class SshCommand : IDisposable /// /// /// - public Stream OutputStream { get; private set; } + public Stream OutputStream + { + get { return _stdoutPipe.InputStream; } + } /// /// Gets the extended output stream. @@ -64,7 +69,10 @@ public class SshCommand : IDisposable /// /// /// - public Stream ExtendedOutputStream { get; private set; } + public Stream ExtendedOutputStream + { + get { return _stderrPipe.InputStream; } + } private StringBuilder _result; /// @@ -82,10 +90,10 @@ public string Result _result = new StringBuilder(); } - if (OutputStream != null && OutputStream.Length > 0) + if (_stdoutPipe != null) { // do not dispose the StreamReader, as it would also dispose the stream - var sr = new StreamReader(OutputStream, _encoding); + var sr = new StreamReader(_stdoutPipe.InputStream, _encoding); _result.Append(sr.ReadToEnd()); } @@ -111,10 +119,10 @@ public string Error _error = new StringBuilder(); } - if (ExtendedOutputStream != null && ExtendedOutputStream.Length > 0) + if (_stderrPipe != null) { // do not dispose the StreamReader, as it would also dispose the stream - var sr = new StreamReader(ExtendedOutputStream, _encoding); + var sr = new StreamReader(_stderrPipe.InputStream, _encoding); _error.Append(sr.ReadToEnd()); } @@ -230,23 +238,19 @@ public IAsyncResult BeginExecute(AsyncCallback callback, object state) if (string.IsNullOrEmpty(CommandText)) throw new ArgumentException("CommandText property is empty."); - var outputStream = OutputStream; - if (outputStream != null) + if (_stdoutPipe != null) { - outputStream.Dispose(); - OutputStream = null; + _stdoutPipe.Dispose(); } - var extendedOutputStream = ExtendedOutputStream; - if (extendedOutputStream != null) + if (_stderrPipe != null) { - extendedOutputStream.Dispose(); - ExtendedOutputStream = null; + _stderrPipe.Dispose(); } - // Initialize output streams - OutputStream = new PipeStream(); - ExtendedOutputStream = new PipeStream(); + // Initialize pipes + _stdoutPipe = new Pipe(); + _stderrPipe = new Pipe(); _result = null; _error = null; @@ -395,16 +399,14 @@ private void Session_ErrorOccured(object sender, ExceptionEventArgs e) private void Channel_Closed(object sender, ChannelEventArgs e) { - var outputStream = OutputStream; - if (outputStream != null) + if (_stdoutPipe != null) { - outputStream.Flush(); + _stdoutPipe.OutputStream.Close(); } - var extendedOutputStream = ExtendedOutputStream; - if (extendedOutputStream != null) + if (_stderrPipe != null) { - extendedOutputStream.Flush(); + _stderrPipe.OutputStream.Close(); } _asyncResult.IsCompleted = true; @@ -442,10 +444,9 @@ private void Channel_RequestReceived(object sender, ChannelRequestEventArgs e) private void Channel_ExtendedDataReceived(object sender, ChannelExtendedDataEventArgs e) { - if (ExtendedOutputStream != null) + if (_stderrPipe != null) { - ExtendedOutputStream.Write(e.Data, 0, e.Data.Length); - ExtendedOutputStream.Flush(); + _stderrPipe.OutputStream.Write(e.Data, 0, e.Data.Length); } if (e.DataTypeCode == 1) @@ -456,10 +457,9 @@ private void Channel_ExtendedDataReceived(object sender, ChannelExtendedDataEven private void Channel_DataReceived(object sender, ChannelDataEventArgs e) { - if (OutputStream != null) + if (_stdoutPipe != null) { - OutputStream.Write(e.Data, 0, e.Data.Length); - OutputStream.Flush(); + _stdoutPipe.OutputStream.Write(e.Data, 0, e.Data.Length); } if (_asyncResult != null) @@ -557,18 +557,16 @@ protected virtual void Dispose(bool disposing) _channel = null; } - var outputStream = OutputStream; - if (outputStream != null) + if (_stdoutPipe != null) { - outputStream.Dispose(); - OutputStream = null; + _stdoutPipe.Dispose(); + _stdoutPipe = null; } - var extendedOutputStream = ExtendedOutputStream; - if (extendedOutputStream != null) + if (_stderrPipe != null) { - extendedOutputStream.Dispose(); - ExtendedOutputStream = null; + _stderrPipe.Dispose(); + _stderrPipe = null; } var sessionErrorOccuredWaitHandle = _sessionErrorOccuredWaitHandle; From 54fb6fb7cc7b67fb6aa16f63743f7a2eea2f42b6 Mon Sep 17 00:00:00 2001 From: Toni Spets Date: Sat, 7 Jan 2017 06:53:03 +0200 Subject: [PATCH 2/3] Small fixes --- src/Renci.SshNet/Common/LinkedListQueue.cs | 2 +- src/Renci.SshNet/Common/PipeInputStream.cs | 2 +- src/Renci.SshNet/Common/PipeOutputStream.cs | 6 ++---- 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/src/Renci.SshNet/Common/LinkedListQueue.cs b/src/Renci.SshNet/Common/LinkedListQueue.cs index d8f164a71..b07f64846 100644 --- a/src/Renci.SshNet/Common/LinkedListQueue.cs +++ b/src/Renci.SshNet/Common/LinkedListQueue.cs @@ -6,7 +6,7 @@ /// /// Fast concurrent generic linked list queue. /// - public class LinkedListQueue : IDisposable + internal class LinkedListQueue : IDisposable { sealed class Entry { diff --git a/src/Renci.SshNet/Common/PipeInputStream.cs b/src/Renci.SshNet/Common/PipeInputStream.cs index d18192655..bc37d52fb 100644 --- a/src/Renci.SshNet/Common/PipeInputStream.cs +++ b/src/Renci.SshNet/Common/PipeInputStream.cs @@ -65,7 +65,7 @@ public override void Write(byte[] buffer, int offset, int count) public override bool CanRead { - get { return true; } + get { return !_isDisposed; } } public override bool CanSeek diff --git a/src/Renci.SshNet/Common/PipeOutputStream.cs b/src/Renci.SshNet/Common/PipeOutputStream.cs index 8cfd1d98c..42905608f 100644 --- a/src/Renci.SshNet/Common/PipeOutputStream.cs +++ b/src/Renci.SshNet/Common/PipeOutputStream.cs @@ -40,10 +40,8 @@ public override void Write(byte[] buffer, int offset, int count) throw new ArgumentException("The sum of offset and count is greater than the buffer length."); if (offset < 0 || count < 0) throw new ArgumentOutOfRangeException("offset", "offset or count is negative."); - if (_isDisposed) + if (_isDisposed || _queue.IsAddingCompleted) throw CreateObjectDisposedException(); - if (_queue.IsAddingCompleted) - return; byte[] tmp = new byte[count]; Buffer.BlockCopy(buffer, offset, tmp, 0, count); @@ -62,7 +60,7 @@ public override bool CanSeek public override bool CanWrite { - get { return true; } + get { return !_isDisposed; } } public override long Length From ba73c34f2e11eb7a658065e44085492071cd42ec Mon Sep 17 00:00:00 2001 From: Toni Spets Date: Sat, 7 Jan 2017 16:09:16 +0200 Subject: [PATCH 3/3] PipeInputStream will read as much as it can without blocking --- src/Renci.SshNet/Common/LinkedListQueue.cs | 9 +++++- src/Renci.SshNet/Common/PipeInputStream.cs | 32 ++++++++++++++-------- 2 files changed, 29 insertions(+), 12 deletions(-) diff --git a/src/Renci.SshNet/Common/LinkedListQueue.cs b/src/Renci.SshNet/Common/LinkedListQueue.cs index b07f64846..57dd99d4b 100644 --- a/src/Renci.SshNet/Common/LinkedListQueue.cs +++ b/src/Renci.SshNet/Common/LinkedListQueue.cs @@ -93,10 +93,17 @@ public void CompleteAdding() /// /// true, if an item could be removed; otherwise false. /// The item to be removed from the queue. - public bool TryTake(out T item) + /// Wait for data or fail immediately if empty. + public bool TryTake(out T item, bool wait) { lock (_lock) { + if (_first == null && !wait) + { + item = default(T); + return false; + } + while (_first == null && !_isAddingCompleted) Monitor.Wait(_lock); diff --git a/src/Renci.SshNet/Common/PipeInputStream.cs b/src/Renci.SshNet/Common/PipeInputStream.cs index bc37d52fb..d12fe9aa6 100644 --- a/src/Renci.SshNet/Common/PipeInputStream.cs +++ b/src/Renci.SshNet/Common/PipeInputStream.cs @@ -40,22 +40,32 @@ public override int Read(byte[] buffer, int offset, int count) if (_isDisposed) throw CreateObjectDisposedException(); - if (_current == null || _currentPosition == _current.Length) + var bytesRead = 0; + + while (bytesRead < count) { - if (_queue.IsCompleted || !_queue.TryTake(out _current)) - return 0; + if (_current == null || _currentPosition == _current.Length) + { + if (!_queue.TryTake(out _current, (bytesRead == 0))) + { + _current = null; + return bytesRead; + } - _currentPosition = 0; - } + _currentPosition = 0; + } - var avail = _current.Length - _currentPosition; - if (count > avail) - count = avail; + var toRead = _current.Length - _currentPosition; + if (toRead > count - bytesRead) + toRead = count - bytesRead; - Buffer.BlockCopy(_current, _currentPosition, buffer, offset, count); + Buffer.BlockCopy(_current, _currentPosition, buffer, offset + bytesRead, toRead); + + _currentPosition += toRead; + bytesRead += toRead; + } - _currentPosition += count; - return count; + return bytesRead; } public override void Write(byte[] buffer, int offset, int count)