From 0637c9988b4d8596a22df329decd39791acf5856 Mon Sep 17 00:00:00 2001 From: Igor Milavec Date: Tue, 22 Feb 2022 23:03:06 +0100 Subject: [PATCH 1/3] Fix ShellStream.Read returning 0 prematurely --- src/Renci.SshNet/ShellStream.cs | 30 ++++++++++++++++++++++++------ 1 file changed, 24 insertions(+), 6 deletions(-) diff --git a/src/Renci.SshNet/ShellStream.cs b/src/Renci.SshNet/ShellStream.cs index 3274fe19c..b7eefb0e9 100644 --- a/src/Renci.SshNet/ShellStream.cs +++ b/src/Renci.SshNet/ShellStream.cs @@ -215,17 +215,31 @@ public override long Position /// Methods were called after the stream was closed. public override int Read(byte[] buffer, int offset, int count) { - var i = 0; + if (_isDisposed) + { + throw new ObjectDisposedException("ShellStream"); + } - lock (_incoming) + var i = 0; + while (true) { - for (; i < count && _incoming.Count > 0; i++) + lock (_incoming) { - buffer[offset + i] = _incoming.Dequeue(); + for (; i < count && _incoming.Count > 0; i++) + { + buffer[offset + i] = _incoming.Dequeue(); + } } - } - return i; + if (i != 0) + return i; + + _dataReceived.WaitOne(); + if (_incoming.Count == 0) + { + return 0; // The session was closed + } + } } /// @@ -766,6 +780,10 @@ private void Session_ErrorOccured(object sender, ExceptionEventArgs e) private void Session_Disconnected(object sender, EventArgs e) { + // Release thread blocked in a ShellStream.Read() call + if (_dataReceived != null) + _dataReceived.Set(); + if (_channel != null) _channel.Dispose(); } From b4ffc604fbbda899e26ef7b2151a34d7983e63cc Mon Sep 17 00:00:00 2001 From: Igor Milavec Date: Wed, 23 Feb 2022 21:30:09 +0100 Subject: [PATCH 2/3] Fix ShellStream session close and Dispose() handling --- src/Renci.SshNet/ShellStream.cs | 321 +++++++++++++++----------------- 1 file changed, 152 insertions(+), 169 deletions(-) diff --git a/src/Renci.SshNet/ShellStream.cs b/src/Renci.SshNet/ShellStream.cs index b7eefb0e9..12e293ea7 100644 --- a/src/Renci.SshNet/ShellStream.cs +++ b/src/Renci.SshNet/ShellStream.cs @@ -16,6 +16,7 @@ namespace Renci.SshNet public class ShellStream : Stream { private const string CrLf = "\r\n"; + private static readonly TimeSpan InfiniteTimeSpan = TimeSpan.FromMilliseconds(-1); // ToDo: Replace with Timeout.InfiniteTimeSpan when we drop old targets private readonly ISession _session; private readonly Encoding _encoding; @@ -25,6 +26,7 @@ public class ShellStream : Stream private IChannelSession _channel; private AutoResetEvent _dataReceived = new AutoResetEvent(false); private bool _isDisposed; + private bool _isClosed; /// /// Occurs when data was received. @@ -220,8 +222,8 @@ public override int Read(byte[] buffer, int offset, int count) throw new ObjectDisposedException("ShellStream"); } - var i = 0; - while (true) + int i = 0; + do { lock (_incoming) { @@ -229,17 +231,14 @@ public override int Read(byte[] buffer, int offset, int count) { buffer[offset + i] = _incoming.Dequeue(); } + if (i > 0) + { + return i; + } } + } while (WaitForDataReceived(InfiniteTimeSpan)); - if (i != 0) - return i; - - _dataReceived.WaitOne(); - if (_incoming.Count == 0) - { - return 0; // The session was closed - } - } + return i; } /// @@ -284,6 +283,11 @@ public override void SetLength(long value) /// Methods were called after the stream was closed. public override void Write(byte[] buffer, int offset, int count) { + if (_isDisposed) + { + throw new ObjectDisposedException("ShellStream"); + } + foreach (var b in buffer.Take(offset, count)) { if (_outgoing.Count == _bufferSize) @@ -303,7 +307,7 @@ public override void Write(byte[] buffer, int offset, int count) /// The expected expressions and actions to perform. public void Expect(params ExpectAction[] expectActions) { - Expect(TimeSpan.Zero, expectActions); + Expect(InfiniteTimeSpan, expectActions); } /// @@ -313,8 +317,10 @@ public void Expect(params ExpectAction[] expectActions) /// The expected expressions and actions to perform, if the specified time elapsed and expected condition have not met, that method will exit without executing any action. public void Expect(TimeSpan timeout, params ExpectAction[] expectActions) { - var expectedFound = false; - var text = string.Empty; + if (_isDisposed) + { + throw new ObjectDisposedException("ShellStream"); + } do { @@ -322,48 +328,30 @@ public void Expect(TimeSpan timeout, params ExpectAction[] expectActions) { if (_incoming.Count > 0) { - text = _encoding.GetString(_incoming.ToArray(), 0, _incoming.Count); - } - - if (text.Length > 0) - { - foreach (var expectAction in expectActions) + var text = _encoding.GetString(_incoming.ToArray(), 0, _incoming.Count); + if (text.Length > 0) { - var match = expectAction.Expect.Match(text); - - if (match.Success) + foreach (var expectAction in expectActions) { - var result = text.Substring(0, match.Index + match.Length); - - for (var i = 0; i < match.Index + match.Length && _incoming.Count > 0; i++) + var match = expectAction.Expect.Match(text); + if (match.Success) { - // Remove processed items from the queue - _incoming.Dequeue(); - } + var result = text.Substring(0, match.Index + match.Length); - expectAction.Action(result); - expectedFound = true; - } - } - } - } + for (var i = 0; i < match.Index + match.Length && _incoming.Count > 0; i++) + { + // Remove processed items from the queue + _incoming.Dequeue(); + } - if (!expectedFound) - { - if (timeout.Ticks > 0) - { - if (!_dataReceived.WaitOne(timeout)) - { - return; + expectAction.Action(result); + return; + } + } } } - else - { - _dataReceived.WaitOne(); - } } - } - while (!expectedFound); + } while (WaitForDataReceived(timeout)); } /// @@ -375,7 +363,7 @@ public void Expect(TimeSpan timeout, params ExpectAction[] expectActions) /// public IAsyncResult BeginExpect(params ExpectAction[] expectActions) { - return BeginExpect(TimeSpan.Zero, null, null, expectActions); + return BeginExpect(InfiniteTimeSpan, null, null, expectActions); } /// @@ -388,7 +376,7 @@ public IAsyncResult BeginExpect(params ExpectAction[] expectActions) /// public IAsyncResult BeginExpect(AsyncCallback callback, params ExpectAction[] expectActions) { - return BeginExpect(TimeSpan.Zero, callback, null, expectActions); + return BeginExpect(InfiniteTimeSpan, callback, null, expectActions); } /// @@ -402,7 +390,7 @@ public IAsyncResult BeginExpect(AsyncCallback callback, params ExpectAction[] ex /// public IAsyncResult BeginExpect(AsyncCallback callback, object state, params ExpectAction[] expectActions) { - return BeginExpect(TimeSpan.Zero, callback, state, expectActions); + return BeginExpect(InfiniteTimeSpan, callback, state, expectActions); } /// @@ -417,7 +405,10 @@ public IAsyncResult BeginExpect(AsyncCallback callback, object state, params Exp /// public IAsyncResult BeginExpect(TimeSpan timeout, AsyncCallback callback, object state, params ExpectAction[] expectActions) { - var text = string.Empty; + if (_isDisposed) + { + throw new ObjectDisposedException("ShellStream"); + } // Create new AsyncResult object var asyncResult = new ExpectAsyncResult(callback, state); @@ -425,73 +416,47 @@ public IAsyncResult BeginExpect(TimeSpan timeout, AsyncCallback callback, object // Execute callback on different thread ThreadAbstraction.ExecuteThread(() => { - string expectActionResult = null; try { + string expectActionResult = null; do { lock (_incoming) { - if (_incoming.Count > 0) { - text = _encoding.GetString(_incoming.ToArray(), 0, _incoming.Count); - } - - if (text.Length > 0) - { - foreach (var expectAction in expectActions) + var text = _encoding.GetString(_incoming.ToArray(), 0, _incoming.Count); + if (text.Length > 0) { - var match = expectAction.Expect.Match(text); - - if (match.Success) + foreach (var expectAction in expectActions) { - var result = text.Substring(0, match.Index + match.Length); - - for (var i = 0; i < match.Index + match.Length && _incoming.Count > 0; i++) + var match = expectAction.Expect.Match(text); + if (match.Success) { - // Remove processed items from the queue - _incoming.Dequeue(); - } + var result = text.Substring(0, match.Index + match.Length); - expectAction.Action(result); + for (var i = 0; i < match.Index + match.Length && _incoming.Count > 0; i++) + { + // Remove processed items from the queue + _incoming.Dequeue(); + } - if (callback != null) - { - callback(asyncResult); + expectAction.Action(result); + expectActionResult = result; + break; } - expectActionResult = result; } } } } + } while (WaitForDataReceived(timeout)); - if (expectActionResult != null) - break; - - if (timeout.Ticks > 0) - { - if (!_dataReceived.WaitOne(timeout)) - { - if (callback != null) - { - callback(asyncResult); - } - break; - } - } - else - { - _dataReceived.WaitOne(); - } - } while (true); - - asyncResult.SetAsCompleted(expectActionResult, true); + asyncResult.SetAsCompleted(expectActionResult, false); } catch (Exception exp) { - asyncResult.SetAsCompleted(exp, true); + asyncResult.SetAsCompleted(exp, false); } }); @@ -548,7 +513,7 @@ public string Expect(string text, TimeSpan timeout) /// public string Expect(Regex regex) { - return Expect(regex, TimeSpan.Zero); + return Expect(regex, InfiniteTimeSpan); } /// @@ -562,7 +527,10 @@ public string Expect(Regex regex) /// public string Expect(Regex regex, TimeSpan timeout) { - var text = string.Empty; + if (_isDisposed) + { + throw new ObjectDisposedException("ShellStream"); + } while (true) { @@ -570,37 +538,28 @@ public string Expect(Regex regex, TimeSpan timeout) { if (_incoming.Count > 0) { - text = _encoding.GetString(_incoming.ToArray(), 0, _incoming.Count); - } - - var match = regex.Match(text); + var text = _encoding.GetString(_incoming.ToArray(), 0, _incoming.Count); - if (match.Success) - { - // Remove processed items from the queue - for (var i = 0; i < match.Index + match.Length && _incoming.Count > 0; i++) + var match = regex.Match(text); + if (match.Success) { - _incoming.Dequeue(); + // ToDo: We should use _encoding to count the number of bytes to remove + // Remove processed items from the queue + for (var i = 0; i < match.Index + match.Length && _incoming.Count > 0; i++) + { + _incoming.Dequeue(); + } + + return text; // ToDo: Fix for issue #833, we should return match.Value, breaking change again } - break; } } - if (timeout.Ticks > 0) + if (!WaitForDataReceived(timeout)) { - if (!_dataReceived.WaitOne(timeout)) - { - return null; - } - } - else - { - _dataReceived.WaitOne(); + return null; // The session was closed } - } - - return text; } /// @@ -611,7 +570,7 @@ public string Expect(Regex regex, TimeSpan timeout) /// public string ReadLine() { - return ReadLine(TimeSpan.Zero); + return ReadLine(InfiniteTimeSpan); } /// @@ -623,47 +582,37 @@ public string ReadLine() /// public string ReadLine(TimeSpan timeout) { - var text = string.Empty; + if (_isDisposed) + { + throw new ObjectDisposedException("ShellStream"); + } - while (true) + string text = null; + do { lock (_incoming) { if (_incoming.Count > 0) { text = _encoding.GetString(_incoming.ToArray(), 0, _incoming.Count); - } - - var index = text.IndexOf(CrLf, StringComparison.Ordinal); - if (index >= 0) - { - text = text.Substring(0, index); + var index = text.IndexOf(CrLf, StringComparison.Ordinal); + if (index >= 0) + { + text = text.Substring(0, index); - // determine how many bytes to remove from buffer - var bytesProcessed = _encoding.GetByteCount(text + CrLf); + // determine how many bytes to remove from buffer + var bytesProcessed = _encoding.GetByteCount(text + CrLf); - // remove processed bytes from the queue - for (var i = 0; i < bytesProcessed; i++) - _incoming.Dequeue(); + // remove processed bytes from the queue + for (var i = 0; i < bytesProcessed; i++) + _incoming.Dequeue(); - break; + return text; + } } } - - if (timeout.Ticks > 0) - { - if (!_dataReceived.WaitOne(timeout)) - { - return null; - } - } - else - { - _dataReceived.WaitOne(); - } - - } + } while (WaitForDataReceived(timeout)); return text; } @@ -676,6 +625,11 @@ public string ReadLine(TimeSpan timeout) /// public string Read() { + if (_isDisposed) + { + throw new ObjectDisposedException("ShellStream"); + } + string text; lock (_incoming) @@ -696,14 +650,14 @@ public string Read() /// public void Write(string text) { - if (text == null) - return; - - if (_channel == null) + if (_isDisposed) { throw new ObjectDisposedException("ShellStream"); } + if (text == null) + return; + var data = _encoding.GetBytes(text); _channel.SendData(data); } @@ -726,13 +680,21 @@ public void WriteLine(string line) /// true to release both managed and unmanaged resources; false to release only unmanaged resources. protected override void Dispose(bool disposing) { - base.Dispose(disposing); - if (_isDisposed) return; + base.Dispose(disposing); + if (disposing) { + _isDisposed = true; + + if (_dataReceived != null) + { + _dataReceived.Dispose(); + _dataReceived = null; + } + UnsubscribeFromSessionEvents(_session); if (_channel != null) @@ -742,19 +704,29 @@ protected override void Dispose(bool disposing) _channel.Dispose(); _channel = null; } + } + } - if (_dataReceived != null) + private bool WaitForDataReceived(TimeSpan timeout) + { + while (!_isClosed && _incoming.Count == 0) + { + if (timeout.Ticks > 0) { - _dataReceived.Dispose(); - _dataReceived = null; + if (!_dataReceived.WaitOne(timeout)) + { + // ToDo: We should throw TimeoutException here, but it is a breaking change + // ToDo: Not to mention that timeouts are "additive" for each byte received in ShellStream, which is again a (soft) breaking change + return false; + } + } + else + { + _dataReceived.WaitOne(); } - - _isDisposed = true; - } - else - { - UnsubscribeFromSessionEvents(_session); } + + return _incoming.Count > 0; } /// @@ -775,14 +747,14 @@ private void UnsubscribeFromSessionEvents(ISession session) private void Session_ErrorOccured(object sender, ExceptionEventArgs e) { + Stream_Closed(); + OnRaiseError(e); } private void Session_Disconnected(object sender, EventArgs e) { - // Release thread blocked in a ShellStream.Read() call - if (_dataReceived != null) - _dataReceived.Set(); + Stream_Closed(); if (_channel != null) _channel.Dispose(); @@ -790,12 +762,23 @@ private void Session_Disconnected(object sender, EventArgs e) private void Channel_Closed(object sender, ChannelEventArgs e) { - // TODO: Do we need to call dispose here ?? - Dispose(); + Stream_Closed(); + } + + private void Stream_Closed() + { + _isClosed = true; + + // Release thread blocked in a WaitForDataReceived() call + if (_dataReceived != null) + _dataReceived.Set(); } private void Channel_DataReceived(object sender, ChannelDataEventArgs e) { + if (_isDisposed) + return; + lock (_incoming) { foreach (var b in e.Data) From 4644de75c97af08f56ce67bea62d9f5f8a8c541b Mon Sep 17 00:00:00 2001 From: Igor Milavec Date: Tue, 22 Mar 2022 18:03:21 +0100 Subject: [PATCH 3/3] Fix regression in Expect and "fix" race condition --- src/Renci.SshNet/ShellStream.cs | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/src/Renci.SshNet/ShellStream.cs b/src/Renci.SshNet/ShellStream.cs index 12e293ea7..e75d270a3 100644 --- a/src/Renci.SshNet/ShellStream.cs +++ b/src/Renci.SshNet/ShellStream.cs @@ -709,7 +709,7 @@ protected override void Dispose(bool disposing) private bool WaitForDataReceived(TimeSpan timeout) { - while (!_isClosed && _incoming.Count == 0) + while (!_isClosed && !_isDisposed) { if (timeout.Ticks > 0) { @@ -724,6 +724,11 @@ private bool WaitForDataReceived(TimeSpan timeout) { _dataReceived.WaitOne(); } + + if (_incoming.Count > 0) + { + return true; + } } return _incoming.Count > 0; @@ -783,10 +788,10 @@ private void Channel_DataReceived(object sender, ChannelDataEventArgs e) { foreach (var b in e.Data) _incoming.Enqueue(b); - } - if (_dataReceived != null) - _dataReceived.Set(); + if (_dataReceived != null) + _dataReceived.Set(); + } OnDataReceived(e.Data); }