RFC: Rewrite PipeStream and SshCommand stream handling #144

Closed
wants to merge 3 commits into from
141 changes: 141 additions & 0 deletions src/Renci.SshNet/Common/LinkedListQueue.cs
@@ -0,0 +1,141 @@
namespace Renci.SshNet.Common
{
using System;
using System.Threading;

/// <summary>
/// Fast concurrent generic linked list queue.
/// </summary>
internal class LinkedListQueue<T> : IDisposable
{
sealed class Entry<E>
{
public E Item;
public Entry<E> Next;
}

private readonly object _lock = new object();

private Entry<T> _first;
private Entry<T> _last;

private bool _isAddingCompleted;

/// <summary>
/// Gets whether this <see cref="T:Renci.SshNet.Common.LinkedListQueue`1"/> has been marked as complete for adding and is empty.
/// </summary>
/// <value>Whether this queue has been marked as complete for adding and is empty.</value>
public bool IsCompleted
{
get { return _isAddingCompleted && _first == null && _last == null; }
}

/// <summary>
/// Gets whether this <see cref="T:Renci.SshNet.Common.LinkedListQueue`1"/> has been marked as complete for adding.
/// </summary>
/// <value>Whether this queue has been marked as complete for adding.</value>
public bool IsAddingCompleted
{
get { return _isAddingCompleted; }
set
{
lock (_lock)
{
_isAddingCompleted = value;
}
}
}

/// <summary>
/// Adds the item to <see cref="T:Renci.SshNet.Common.LinkedListQueue`1"/>.
/// </summary>
/// <param name="item">The item to be added to the queue. The value can be a null reference.</param>
public void Add(T item)
{
lock (_lock)
{
if (_isAddingCompleted)
return;

var entry = new Entry<T>();
entry.Item = item;

if (_last != null)
{
_last.Next = entry;
}

_last = entry;

if (_first == null)
{
_first = entry;
}

Monitor.PulseAll(_lock);
}
}

/// <summary>
/// Marks the <see cref="T:Renci.SshNet.Common.LinkedListQueue`1"/> instance as not accepting any more additions.
/// </summary>
public void CompleteAdding()
{
lock (_lock)
{
IsAddingCompleted = true;
Monitor.PulseAll(_lock);
}
}

/// <summary>
/// Tries to remove an item from the <see cref="T:Renci.SshNet.Common.LinkedListQueue`1"/>.
/// </summary>
/// <returns><c>true</c>, if an item could be removed; otherwise <c>false</c>.</returns>
/// <param name="item">The item to be removed from the queue.</param>
/// <param name="wait">Wait for data or fail immediately if empty.</param>
public bool TryTake(out T item, bool wait)
{
lock (_lock)
{
if (_first == null && !wait)
{
item = default(T);
return false;
}

while (_first == null && !_isAddingCompleted)
Monitor.Wait(_lock);

if (_first == null && _isAddingCompleted)
{
item = default(T);
return false;
}

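item = _first.Item;
_first = _first.Next;

// Clear the tail when the last entry is removed; otherwise _last keeps the
// removed entry alive and IsCompleted can never become true.
if (_first == null)
_last = null;

return true;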
}
}

/// <summary>
/// Releases all resources used by the <see cref="T:Renci.SshNet.Common.LinkedListQueue`1"/> object.
/// </summary>
/// <remarks>Call <see cref="Dispose"/> when you are finished using the
/// <see cref="T:Renci.SshNet.Common.LinkedListQueue`1"/>. The <see cref="Dispose"/> method leaves the
/// <see cref="T:Renci.SshNet.Common.LinkedListQueue`1"/> in an unusable state. After calling
/// <see cref="Dispose"/>, you must release all references to the
/// <see cref="T:Renci.SshNet.Common.LinkedListQueue`1"/> so the garbage collector can reclaim the memory that
/// the <see cref="T:Renci.SshNet.Common.LinkedListQueue`1"/> was occupying.</remarks>
public void Dispose()
{
lock (_lock)
{
_first = null;
_last = null;
_isAddingCompleted = true;

// Wake any reader blocked in TryTake so that it can observe completion.
Monitor.PulseAll(_lock);
}
}
}
}
47 changes: 47 additions & 0 deletions src/Renci.SshNet/Common/Pipe.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
namespace Renci.SshNet.Common
{
using System;
using System.IO;

/// <summary>
/// A generic pipe to pass through data.
/// </summary>
internal class Pipe : IDisposable
{
Member

I think I still prefer to eliminate this, and use the PipeOutputStream to initialize the PipeInputStream.
The PipeOutputStream can hold the data (the queue), and pass this on to the PipeInputStream upon creation.
Hold off on modifying this until I've had some more time to think about it.

Contributor Author

Yeah, I kept it for clarity's sake for now. Java's Pipe also works somewhat like this.

Member

In Java, the PipedInputStream takes a PipedOutputStream as an argument.
There's no Pipe class in java.io.
For example:
http://tutorials.jenkov.com/java-io/pipes.html

Contributor Author

Member (@drieseng, Jan 7, 2017)

As long as we don't leak this publicly, I don't mind. However, I still think the actual data belongs in the PipeOutputStream, and this PipeOutputStream should be the only thing that is passed to the PipeInputStream ctor.

If - big IF - we do not want to dispose the Pipe by itself (meaning we may want to be able to dispose the writer without disposing the reader), then Pipe becomes an empty box.

Contributor Author (@hifi, Jan 7, 2017)

What if a result object is returned at the end of execution instead of the properties? In general, it doesn't sound like a good idea to expose them by any means other than the streams until execution has ended.

One way could be that Execute becomes a shortcut that reads both pipes to EOS and returns a result object containing both, whereas the BeginExecute and EndExecute path exposes only the streams?

Member

Personally, I prefer to remove both properties. However, I want to limit the number of breaking changes. We could start by marking them as obsolete in the next release (after 2016.1.0).

For now, I guess both properties must be able to return the incomplete output (up until that moment in time).

This means we cannot use StreamReader.ReadToEnd() as that would block until the command has finished (the writer has completed).

Guess we'll have to do something like this:

var buffer = new byte[4096];
while (_outputReadStream.DataAvailable)
{
    var bytesRead = _outputReadStream.Read(buffer, 0, buffer.Length);
    _result.Append(_encoding.GetString(buffer, 0, bytesRead));
}
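
One caveat with decoding chunk by chunk as in the loop above: a multi-byte character can be split across two reads, and calling `Encoding.GetString` on each chunk separately would then produce replacement characters. A minimal sketch of one way around this, assuming a stateful `System.Text.Decoder` is acceptable here; `DecoderSketch` and `DecodeChunks` are hypothetical names for illustration and not part of SSH.NET:

```csharp
using System;
using System.Text;

static class DecoderSketch
{
    // Hypothetical helper: decodes a sequence of byte chunks as one logical stream.
    // The Decoder keeps incomplete trailing bytes of a chunk and combines them
    // with the start of the next chunk.
    public static string DecodeChunks(Encoding encoding, params byte[][] chunks)
    {
        var decoder = encoding.GetDecoder();
        var result = new StringBuilder();

        foreach (var chunk in chunks)
        {
            // GetMaxCharCount gives a worst-case size for the output buffer.
            var chars = new char[encoding.GetMaxCharCount(chunk.Length)];
            var charCount = decoder.GetChars(chunk, 0, chunk.Length, chars, 0);
            result.Append(chars, 0, charCount);
        }

        return result.ToString();
    }
}
```

For example, passing the UTF-8 bytes of "aéb" split as `{ 0x61, 0xC3 }` and `{ 0xA9, 0x62 }` should yield the intact string, because the dangling `0xC3` is carried over to the second call.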

Contributor Author

Then we should never block a read just because there isn't enough data to fill the whole buffer; we should return with the amount we do have, and block only if there is none. As agreed, I'll refactor the reading to include a while loop again.

Though, what's the verdict on the final design of the stream? One thing I did realize is that if you pass the output stream to the input stream constructor, they would need some shared private API to pass the data, which shouldn't be exposed even internally to any other part of SSH.NET. The current implementation avoids that by having a shared data structure: the queue.

This would all be much easier to discuss in a closed room with a whiteboard and beverages.

Member (@drieseng, Jan 7, 2017)

I indeed agree that we should only block until at least some data is available or the writer has signaled EOS.

Was offline for a few hours to install outdoor speakers. Why do this in the summer if you can do it while it's raining and freezing? :p

The PipeOutputStream can share the queue with the PipeInputStream.
The details can be discussed as we proceed.
Since we only expose Stream publicly, we can easily modify our internal design.
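
To make the two points above concrete (the writer owns the queue and hands it to the reader, and a read blocks only until some data is available or the writer has signaled EOS), here is a rough sketch. It is not the PR's implementation: `SketchPipeWriter` and `SketchPipeReader` are hypothetical names, they are plain classes rather than `Stream` subclasses to keep the example short, and `BlockingCollection<byte[]>` is used only as a stand-in for `LinkedListQueue<byte[]>`.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical writer side: owns the chunk queue.
sealed class SketchPipeWriter : IDisposable
{
    // Shared with the reader only; internal so that it is not part of any public API.
    internal readonly BlockingCollection<byte[]> Queue = new BlockingCollection<byte[]>();

    public void Write(byte[] buffer, int offset, int count)
    {
        if (count == 0)
            return;

        var chunk = new byte[count];
        Buffer.BlockCopy(buffer, offset, chunk, 0, count);
        Queue.Add(chunk);
    }

    // Disposing the writer signals EOS to the reader.
    public void Dispose()
    {
        Queue.CompleteAdding();
    }
}

// Hypothetical reader side: constructed from the writer, as in the java.io model.
sealed class SketchPipeReader
{
    private readonly BlockingCollection<byte[]> _queue;
    private byte[] _current;
    private int _position;

    public SketchPipeReader(SketchPipeWriter writer)
    {
        _queue = writer.Queue;
    }

    public int Read(byte[] buffer, int offset, int count)
    {
        var bytesRead = 0;

        while (bytesRead < count)
        {
            if (_current == null || _position == _current.Length)
            {
                // Wait indefinitely only while nothing has been copied yet;
                // once we have some data, take further chunks only if they are ready.
                var timeout = bytesRead == 0 ? Timeout.Infinite : 0;
                if (!_queue.TryTake(out _current, timeout))
                {
                    _current = null;
                    break; // no more data right now, or EOS
                }

                _position = 0;
            }

            var toCopy = Math.Min(_current.Length - _position, count - bytesRead);
            Buffer.BlockCopy(_current, _position, buffer, offset + bytesRead, toCopy);
            _position += toCopy;
            bytesRead += toCopy;
        }

        return bytesRead;
    }
}
```

With this shape, a `Read` into an 8-byte buffer after a 3-byte `Write` returns 3 rather than waiting for 5 more bytes, and `Read` returns 0 once the writer has been disposed and the queue is drained.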

Member

I have plenty of other stuff I'd like to discuss with you.
The good news is that we've got loads of great beers in Belgium :p

private readonly LinkedListQueue<byte[]> _queue;

/// <summary>
/// Gets the input stream.
/// </summary>
/// <value>The input stream.</value>
public Stream InputStream { get; private set; }

/// <summary>
/// Gets the output stream.
/// </summary>
/// <value>The output stream.</value>
public Stream OutputStream { get; private set; }

public Pipe()
{
_queue = new LinkedListQueue<byte[]>();
InputStream = new PipeInputStream(_queue);
Member

I think we should at least lazily initialize the inputstream.
In that case, we can leave it up to the consumer to dispose it.
In that case, what should PipeInputStream.Read(...) do when the output stream is disposed?
Should it just return 0 signaling EOS?

Contributor Author

What do you mean by initialize in this case? Disposing has been left to "anyone who cares", and multiple disposes aren't an error in this case.

Currently PipeInputStream.Read() will return 0 if the output stream is disposed as it will also finalize writing. Do you think it should be an error condition that the output stream is disposed before input stream has finished reading?

Member

The questions I had were:

  • Should we create the PipeInputStream immediately, or postpone this until a client actually requests one (e.g. until a client invokes SshCommand.OutputStream)?
  • If we postpone creating the PipeInputStream until a client requests it (the client could be SSH.NET itself), shouldn't that client be responsible for disposing the stream (e.g. the stream returned by SshCommand.OutputStream)?
  • Should we have a way to signal write errors to the reader? E.g. do we just let PipeInputStream.Read() return 0 when the SSH session has been closed unexpectedly? One such error could also be that the writer was disposed before it signaled EOS.

To me, multiple disposes should never be an error. However, in this case, that's not what I was trying to say.
See questions above.
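
The first question (creating the reader only on demand) could look roughly like the following. This is only a sketch under assumptions: it relies on `Lazy<T>` being available on the target framework, `LazyReaderHolder` is a hypothetical name, and the factory delegate stands in for whatever would actually construct the PipeInputStream.

```csharp
using System;
using System.IO;

// Hypothetical holder: the reader stream is created on first access only.
sealed class LazyReaderHolder
{
    private readonly Lazy<Stream> _reader;

    public LazyReaderHolder(Func<Stream> readerFactory)
    {
        // The default Lazy<T> mode is thread-safe: the factory runs at most once.
        _reader = new Lazy<Stream>(readerFactory);
    }

    // True once some client has requested the reader.
    public bool IsReaderCreated
    {
        get { return _reader.IsValueCreated; }
    }

    // The client that requests the stream would also be the one to dispose it.
    public Stream Reader
    {
        get { return _reader.Value; }
    }
}
```

Because the holder never creates the stream on its own, it has nothing to dispose when no client ever asked for the reader, which is the ownership split the second question describes.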

Contributor Author

Creating the pipe makes sense, as the Result and Error properties will both read from it when used. Also, if you don't create the streams, where would the incoming data from the server be buffered when the executed SSH command produces output or errors?

I think in a normal POSIX setup the streams will just be cut off and the process error code is used to determine if they were complete or not. Correct me if not.

Member

We should create the PipeOutputStream, and of course start writing to it.
I don't see why we should immediately create the PipeInputStream (when we don't know yet if anyone will be reading from it), unless a client requests it (note that this client may be us).

Contributor Author

If you like the non-NIO Java model better, I can refactor this into that and make input stream creation on-demand.

OutputStream = new PipeOutputStream(_queue);
}

/// <summary>
/// Releases all resources used by the <see cref="T:Renci.SshNet.Common.Pipe"/> object.
/// </summary>
/// <remarks>Call <see cref="Dispose"/> when you are finished using the <see cref="T:Renci.SshNet.Common.Pipe"/>. The
/// <see cref="Dispose"/> method leaves the <see cref="T:Renci.SshNet.Common.Pipe"/> in an unusable state. After
/// calling <see cref="Dispose"/>, you must release all references to the
/// <see cref="T:Renci.SshNet.Common.Pipe"/> so the garbage collector can reclaim the memory that the
/// <see cref="T:Renci.SshNet.Common.Pipe"/> was occupying.</remarks>
public void Dispose()
{
OutputStream.Dispose();
InputStream.Dispose();
Member

We may not want to implement IDisposable on Pipe. See above.

_queue.Dispose();
}
}
}
122 changes: 122 additions & 0 deletions src/Renci.SshNet/Common/PipeInputStream.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
namespace Renci.SshNet.Common
{
using System;
using System.IO;

internal class PipeInputStream : Stream
{
private LinkedListQueue<byte[]> _queue;
private byte[] _current;
private int _currentPosition;
private bool _isDisposed;

public PipeInputStream(LinkedListQueue<byte[]> queue)
{
_queue = queue;
}

public override void Flush()
{
}

public override long Seek(long offset, SeekOrigin origin)
{
throw new NotSupportedException();
Member

This would need to be re-implemented.

}

public override void SetLength(long value)
{
throw new NotSupportedException();
}

public override int Read(byte[] buffer, int offset, int count)
{
if (buffer == null)
throw new ArgumentNullException("buffer");
// Check signs first, then compare without computing offset + count, which could overflow.
if (offset < 0 || count < 0)
throw new ArgumentOutOfRangeException("offset", "offset or count is negative.");
if (count > buffer.Length - offset)
throw new ArgumentException("The sum of offset and count is greater than the buffer length.");
if (_isDisposed)
throw CreateObjectDisposedException();

var bytesRead = 0;

while (bytesRead < count)
{
if (_current == null || _currentPosition == _current.Length)
{
if (!_queue.TryTake(out _current, (bytesRead == 0)))
{
_current = null;
return bytesRead;
}

_currentPosition = 0;
}

var toRead = _current.Length - _currentPosition;
if (toRead > count - bytesRead)
toRead = count - bytesRead;

Buffer.BlockCopy(_current, _currentPosition, buffer, offset + bytesRead, toRead);

_currentPosition += toRead;
bytesRead += toRead;
}

return bytesRead;
}

public override void Write(byte[] buffer, int offset, int count)
{
throw new NotSupportedException();
}

public override bool CanRead
{
get { return !_isDisposed; }
}

public override bool CanSeek
{
get { return false; }
Member

This needs to return true if not disposed.

Contributor Author

Correct, I didn't notice it's in the Stream spec to return false if closed/disposed.

}

public override bool CanWrite
{
get { return false; }
}

public override long Length
{
get
{
throw new NotSupportedException();
Member

This would need to be re-implemented.

}
}

public override long Position
{
get
{
throw new NotSupportedException();
Member

This would need to be re-implemented.

}
set
{
throw new NotSupportedException();
Member

This would need to be re-implemented.

}
}

protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
_isDisposed = true;
}

private ObjectDisposedException CreateObjectDisposedException()
{
return new ObjectDisposedException(GetType().FullName);
}
}
}