diff --git a/.gitignore b/.gitignore index b42864eedc..9b6ed3ef31 100644 --- a/.gitignore +++ b/.gitignore @@ -19,7 +19,6 @@ TestResult.xml /NuGet .vscode/ *.lock.json -api/ test.sh *.VisualState.xml diff --git a/projects/RabbitMQ.Client/client/api/IConnection.cs b/projects/RabbitMQ.Client/client/api/IConnection.cs index 5da59a801f..ea36e9bab6 100644 --- a/projects/RabbitMQ.Client/client/api/IConnection.cs +++ b/projects/RabbitMQ.Client/client/api/IConnection.cs @@ -31,12 +31,8 @@ using System; using System.Collections.Generic; -using System.IO; -using System.Threading; - using RabbitMQ.Client.Events; using RabbitMQ.Client.Exceptions; -using RabbitMQ.Client.Impl; namespace RabbitMQ.Client { @@ -214,126 +210,15 @@ public interface IConnection : INetworkConnection, IDisposable /// The reason for the secret update. void UpdateSecret(string newSecret, string reason); - /// - /// Abort this connection and all its channels. - /// - /// - /// Note that all active channels, sessions, and models will be closed if this method is called. - /// In comparison to normal method, will not throw - /// during closing connection. - ///This method waits infinitely for the in-progress close operation to complete. - /// - void Abort(); - - /// - /// Abort this connection and all its channels. - /// - /// - /// The method behaves in the same way as , with the only - /// difference that the connection is closed with the given connection close code and message. - /// - /// The close code (See under "Reply Codes" in the AMQP 0-9-1 specification) - /// - /// - /// A message indicating the reason for closing the connection - /// - /// - void Abort(ushort reasonCode, string reasonText); - - /// - /// Abort this connection and all its channels and wait with a - /// timeout for all the in-progress close operations to complete. - /// - /// - /// This method, behaves in a similar way as method with the - /// only difference that it explictly specifies a timeout given - /// for all the in-progress close operations to complete. - /// If timeout is reached and the close operations haven't finished, then socket is forced to close. - /// - /// To wait infinitely for the close operations to complete use . - /// - /// - void Abort(TimeSpan timeout); - - /// - /// Abort this connection and all its channels and wait with a - /// timeout for all the in-progress close operations to complete. - /// - /// - /// The method behaves in the same way as , with the only - /// difference that the connection is closed with the given connection close code and message. - /// - /// The close code (See under "Reply Codes" in the AMQP 0-9-1 specification). - /// - /// - /// A message indicating the reason for closing the connection. - /// - /// - void Abort(ushort reasonCode, string reasonText, TimeSpan timeout); - - /// - /// Close this connection and all its channels. - /// - /// - /// Note that all active channels, sessions, and models will be - /// closed if this method is called. It will wait for the in-progress - /// close operation to complete. This method will not return to the caller - /// until the shutdown is complete. If the connection is already closed - /// (or closing), then this method will do nothing. - /// It can also throw when socket was closed unexpectedly. - /// - void Close(); - - /// - /// Close this connection and all its channels. - /// - /// - /// The method behaves in the same way as , with the only - /// difference that the connection is closed with the given connection close code and message. - /// - /// The close code (See under "Reply Codes" in the AMQP specification). - /// - /// - /// A message indicating the reason for closing the connection. - /// - /// - void Close(ushort reasonCode, string reasonText); - /// /// Close this connection and all its channels /// and wait with a timeout for all the in-progress close operations to complete. /// - /// - /// Note that all active channels, sessions, and models will be - /// closed if this method is called. It will wait for the in-progress - /// close operation to complete with a timeout. If the connection is - /// already closed (or closing), then this method will do nothing. - /// It can also throw when socket was closed unexpectedly. - /// If timeout is reached and the close operations haven't finished, then socket is forced to close. - /// - /// To wait infinitely for the close operations to complete use . - /// - /// - void Close(TimeSpan timeout); - - /// - /// Close this connection and all its channels - /// and wait with a timeout for all the in-progress close operations to complete. - /// - /// - /// The method behaves in the same way as , with the only - /// difference that the connection is closed with the given connection close code and message. - /// - /// The close code (See under "Reply Codes" in the AMQP 0-9-1 specification). - /// - /// - /// A message indicating the reason for closing the connection. - /// - /// - /// Operation timeout. - /// - /// - void Close(ushort reasonCode, string reasonText, TimeSpan timeout); + /// The close code (See under "Reply Codes" in the AMQP 0-9-1 specification). + /// A message indicating the reason for closing the connection. + /// Operation timeout. + /// Whether or not this close is an abort (ignores certain exceptions). + void Close(ushort reasonCode, string reasonText, TimeSpan timeout, bool abort); /// /// Create and return a fresh channel, session, and model. diff --git a/projects/RabbitMQ.Client/client/api/IConnectionExtensions.cs b/projects/RabbitMQ.Client/client/api/IConnectionExtensions.cs new file mode 100644 index 0000000000..85589c9353 --- /dev/null +++ b/projects/RabbitMQ.Client/client/api/IConnectionExtensions.cs @@ -0,0 +1,154 @@ +using System; +using System.IO; +using System.Threading; + +namespace RabbitMQ.Client +{ + public static class IConnectionExtensions + { + /// + /// Close this connection and all its channels. + /// + /// + /// Note that all active channels, sessions, and models will be + /// closed if this method is called. It will wait for the in-progress + /// close operation to complete. This method will not return to the caller + /// until the shutdown is complete. If the connection is already closed + /// (or closing), then this method will do nothing. + /// It can also throw when socket was closed unexpectedly. + /// + public static void Close(this IConnection connection) + { + connection.Close(Constants.ReplySuccess, "Goodbye", Timeout.InfiniteTimeSpan, false); + } + + /// + /// Close this connection and all its channels. + /// + /// + /// The method behaves in the same way as , with the only + /// difference that the connection is closed with the given connection close code and message. + /// + /// The close code (See under "Reply Codes" in the AMQP specification). + /// + /// + /// A message indicating the reason for closing the connection. + /// + /// + public static void Close(this IConnection connection, ushort reasonCode, string reasonText) + { + connection.Close(reasonCode, reasonText, Timeout.InfiniteTimeSpan, false); + } + + /// + /// Close this connection and all its channels + /// and wait with a timeout for all the in-progress close operations to complete. + /// + /// + /// Note that all active channels, sessions, and models will be + /// closed if this method is called. It will wait for the in-progress + /// close operation to complete with a timeout. If the connection is + /// already closed (or closing), then this method will do nothing. + /// It can also throw when socket was closed unexpectedly. + /// If timeout is reached and the close operations haven't finished, then socket is forced to close. + /// + /// To wait infinitely for the close operations to complete use . + /// + /// + public static void Close(this IConnection connection, TimeSpan timeout) + { + connection.Close(Constants.ReplySuccess, "Goodbye", timeout, false); + } + + /// + /// Close this connection and all its channels + /// and wait with a timeout for all the in-progress close operations to complete. + /// + /// + /// The method behaves in the same way as , with the only + /// difference that the connection is closed with the given connection close code and message. + /// + /// The close code (See under "Reply Codes" in the AMQP 0-9-1 specification). + /// + /// + /// A message indicating the reason for closing the connection. + /// + /// + /// Operation timeout. + /// + /// + public static void Close(this IConnection connection, ushort reasonCode, string reasonText, TimeSpan timeout) + { + connection.Close(reasonCode, reasonText, timeout, false); + } + + /// + /// Abort this connection and all its channels. + /// + /// + /// Note that all active channels, sessions, and models will be closed if this method is called. + /// In comparison to normal method, will not throw + /// during closing connection. + ///This method waits infinitely for the in-progress close operation to complete. + /// + public static void Abort(this IConnection connection) + { + connection.Close(Constants.ReplySuccess, "Connection close forced", Timeout.InfiniteTimeSpan, true); + } + + /// + /// Abort this connection and all its channels. + /// + /// + /// The method behaves in the same way as , with the only + /// difference that the connection is closed with the given connection close code and message. + /// + /// The close code (See under "Reply Codes" in the AMQP 0-9-1 specification) + /// + /// + /// A message indicating the reason for closing the connection + /// + /// + public static void Abort(this IConnection connection, ushort reasonCode, string reasonText) + { + connection.Close(reasonCode, reasonText, Timeout.InfiniteTimeSpan, true); + } + + /// + /// Abort this connection and all its channels and wait with a + /// timeout for all the in-progress close operations to complete. + /// + /// + /// This method, behaves in a similar way as method with the + /// only difference that it explictly specifies a timeout given + /// for all the in-progress close operations to complete. + /// If timeout is reached and the close operations haven't finished, then socket is forced to close. + /// + /// To wait infinitely for the close operations to complete use . + /// + /// + public static void Abort(this IConnection connection, TimeSpan timeout) + { + connection.Close(Constants.ReplySuccess, "Connection close forced", timeout, true); + } + + /// + /// Abort this connection and all its channels and wait with a + /// timeout for all the in-progress close operations to complete. + /// + /// + /// The method behaves in the same way as , with the only + /// difference that the connection is closed with the given connection close code and message. + /// + /// The close code (See under "Reply Codes" in the AMQP 0-9-1 specification). + /// + /// + /// A message indicating the reason for closing the connection. + /// + /// + public static void Abort(this IConnection connection, ushort reasonCode, string reasonText, TimeSpan timeout) + { + connection.Close(reasonCode, reasonText, timeout, true); + } + } +} diff --git a/projects/RabbitMQ.Client/client/api/IModel.cs b/projects/RabbitMQ.Client/client/api/IModel.cs index 5ba036b649..85cf7f552d 100644 --- a/projects/RabbitMQ.Client/client/api/IModel.cs +++ b/projects/RabbitMQ.Client/client/api/IModel.cs @@ -143,34 +143,6 @@ public interface IModel : IDisposable /// event EventHandler ModelShutdown; - /// - /// Abort this session. - /// - /// - /// If the session is already closed (or closing), then this - /// method does nothing but wait for the in-progress close - /// operation to complete. This method will not return to the - /// caller until the shutdown is complete. - /// In comparison to normal method, will not throw - /// or or any other during closing model. - /// - void Abort(); - - /// - /// Abort this session. - /// - /// - /// The method behaves in the same way as , with the only - /// difference that the model is closed with the given model close code and message. - /// - /// The close code (See under "Reply Codes" in the AMQP specification) - /// - /// - /// A message indicating the reason for closing the model - /// - /// - void Abort(ushort replyCode, string replyText); - /// /// Acknowledge one or more delivered message(s). /// @@ -247,27 +219,10 @@ string BasicConsume( void BasicReject(ulong deliveryTag, bool requeue); /// Close this session. - /// - /// If the session is already closed (or closing), then this - /// method does nothing but wait for the in-progress close - /// operation to complete. This method will not return to the - /// caller until the shutdown is complete. - /// - void Close(); - - /// Close this session. - /// - /// The method behaves in the same way as Close(), with the only - /// difference that the model is closed with the given model - /// close code and message. - /// - /// The close code (See under "Reply Codes" in the AMQP specification) - /// - /// - /// A message indicating the reason for closing the model - /// - /// - void Close(ushort replyCode, string replyText); + /// The reply code to send for closing (See under "Reply Codes" in the AMQP specification). + /// The reply text to send for closing. + /// Whether or not the close is an abort (ignoring certain exceptions). + void Close(ushort replyCode, string replyText, bool abort); /// /// Enable publisher acknowledgements. diff --git a/projects/RabbitMQ.Client/client/api/IModelExtensions.cs b/projects/RabbitMQ.Client/client/api/IModelExtensions.cs index 886c8aeefb..d003ead6a1 100644 --- a/projects/RabbitMQ.Client/client/api/IModelExtensions.cs +++ b/projects/RabbitMQ.Client/client/api/IModelExtensions.cs @@ -216,5 +216,68 @@ public static void QueueUnbind(this IModel model, string queue, string exchange, { model.QueueUnbind(queue, exchange, routingKey, arguments); } + + /// + /// Abort this session. + /// + /// + /// If the session is already closed (or closing), then this + /// method does nothing but wait for the in-progress close + /// operation to complete. This method will not return to the + /// caller until the shutdown is complete. + /// In comparison to normal method, will not throw + /// or or any other during closing model. + /// + public static void Abort(this IModel model) + { + model.Close(Constants.ReplySuccess, "Goodbye", true); + } + + /// + /// Abort this session. + /// + /// + /// The method behaves in the same way as , with the only + /// difference that the model is closed with the given model close code and message. + /// + /// The close code (See under "Reply Codes" in the AMQP specification) + /// + /// + /// A message indicating the reason for closing the model + /// + /// + public static void Abort(this IModel model, ushort replyCode, string replyText) + { + model.Close(replyCode, replyText, true); + } + + /// Close this session. + /// + /// If the session is already closed (or closing), then this + /// method does nothing but wait for the in-progress close + /// operation to complete. This method will not return to the + /// caller until the shutdown is complete. + /// + public static void Close(this IModel model) + { + model.Close(Constants.ReplySuccess, "Goodbye", false); + } + + /// Close this session. + /// + /// The method behaves in the same way as Close(), with the only + /// difference that the model is closed with the given model + /// close code and message. + /// + /// The close code (See under "Reply Codes" in the AMQP specification) + /// + /// + /// A message indicating the reason for closing the model + /// + /// + public static void Close(this IModel model, ushort replyCode, string replyText) + { + model.Close(replyCode, replyText, false); + } } } diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs index 5b36058850..e391a3f81b 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs @@ -150,8 +150,6 @@ public AmqpTcpEndpoint[] KnownHosts IProtocol IConnection.Protocol => Endpoint.Protocol; - public void Close(ShutdownEventArgs reason) => Delegate.Close(reason); - public RecoveryAwareModel CreateNonRecoveringModel() { ISession session = Delegate.CreateSession(); @@ -182,91 +180,14 @@ public void UpdateSecret(string newSecret, string reason) _factory.Password = newSecret; } - ///API-side invocation of connection abort. - public void Abort() - { - ThrowIfDisposed(); - StopRecoveryLoop(); - if (_delegate.IsOpen) - { - _delegate.Abort(); - } - } - - ///API-side invocation of connection abort. - public void Abort(ushort reasonCode, string reasonText) - { - ThrowIfDisposed(); - StopRecoveryLoop(); - if (_delegate.IsOpen) - { - _delegate.Abort(reasonCode, reasonText); - } - } - - ///API-side invocation of connection abort with timeout. - public void Abort(TimeSpan timeout) - { - ThrowIfDisposed(); - StopRecoveryLoop(); - if (_delegate.IsOpen) - { - _delegate.Abort(timeout); - } - } - - ///API-side invocation of connection abort with timeout. - public void Abort(ushort reasonCode, string reasonText, TimeSpan timeout) - { - ThrowIfDisposed(); - StopRecoveryLoop(); - if (_delegate.IsOpen) - { - _delegate.Abort(reasonCode, reasonText, timeout); - } - } - - ///API-side invocation of connection.close. - public void Close() - { - ThrowIfDisposed(); - StopRecoveryLoop(); - if (_delegate.IsOpen) - { - _delegate.Close(); - } - } - - ///API-side invocation of connection.close. - public void Close(ushort reasonCode, string reasonText) - { - ThrowIfDisposed(); - StopRecoveryLoop(); - if (_delegate.IsOpen) - { - _delegate.Close(reasonCode, reasonText); - } - } - - ///API-side invocation of connection.close with timeout. - public void Close(TimeSpan timeout) - { - ThrowIfDisposed(); - StopRecoveryLoop(); - if (_delegate.IsOpen) - { - _delegate.Close(timeout); - } - } - ///API-side invocation of connection.close with timeout. - public void Close(ushort reasonCode, string reasonText, TimeSpan timeout) + public void Close(ushort reasonCode, string reasonText, TimeSpan timeout, bool abort) { ThrowIfDisposed(); StopRecoveryLoop(); if (_delegate.IsOpen) { - _delegate.Close(reasonCode, reasonText, timeout); + _delegate.Close(reasonCode, reasonText, timeout, abort); } } @@ -295,7 +216,7 @@ private void Dispose(bool disposing) { try { - Abort(); + this.Abort(); } catch (Exception) { diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringModel.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringModel.cs index 08221d8cdf..5bac12a238 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringModel.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringModel.cs @@ -161,19 +161,6 @@ public void Close(ushort replyCode, string replyText, bool abort) } } - public void Close(ShutdownEventArgs reason, bool abort) - { - ThrowIfDisposed(); - try - { - _delegate.Close(reason, abort).GetAwaiter().GetResult();; - } - finally - { - _connection.DeleteRecordedChannel(this); - } - } - public override string ToString() => Delegate.ToString(); void IDisposable.Dispose() => Dispose(true); @@ -187,7 +174,7 @@ private void Dispose(bool disposing) if (disposing) { - Abort(); + this.Abort(); _connection = null; _delegate = null; @@ -559,32 +546,6 @@ public void _Private_QueueDeclare(string queue, public uint _Private_QueuePurge(string queue, bool nowait) => RecoveryAwareDelegate._Private_QueuePurge(queue, nowait); - public void Abort() - { - ThrowIfDisposed(); - try - { - _delegate.Abort(); - } - finally - { - _connection.DeleteRecordedChannel(this); - } - } - - public void Abort(ushort replyCode, string replyText) - { - ThrowIfDisposed(); - try - { - _delegate.Abort(replyCode, replyText); - } - finally - { - _connection.DeleteRecordedChannel(this); - } - } - public void BasicAck(ulong deliveryTag, bool multiple) => Delegate.BasicAck(deliveryTag, multiple); diff --git a/projects/RabbitMQ.Client/client/impl/Connection.cs b/projects/RabbitMQ.Client/client/impl/Connection.cs index 9e00748c4e..675063e994 100644 --- a/projects/RabbitMQ.Client/client/impl/Connection.cs +++ b/projects/RabbitMQ.Client/client/impl/Connection.cs @@ -247,16 +247,6 @@ public static IDictionary DefaultClientProperties() return table; } - public void Abort(ushort reasonCode, string reasonText, ShutdownInitiator initiator, TimeSpan timeout) - { - Close(new ShutdownEventArgs(initiator, reasonCode, reasonText), true, timeout); - } - - public void Close(ShutdownEventArgs reason) - { - Close(reason, false, Timeout.InfiniteTimeSpan); - } - ///Try to close connection in a graceful way /// /// @@ -272,7 +262,7 @@ public void Close(ShutdownEventArgs reason) ///to complete. System.Threading.Timeout.InfiniteTimeSpan value means infinity. /// /// - public void Close(ShutdownEventArgs reason, bool abort, TimeSpan timeout) + internal void Close(ShutdownEventArgs reason, bool abort, TimeSpan timeout) { if (!SetCloseReason(reason)) { @@ -326,16 +316,14 @@ public void Close(ShutdownEventArgs reason, bool abort, TimeSpan timeout) } } - bool closeFrameHandler; try { - closeFrameHandler = !_mainLoopTask.Wait(timeout); + if (!_mainLoopTask.Wait(timeout)) + { + _frameHandler.Close(); + } } catch (AggregateException) - { - closeFrameHandler = true; - } - if (closeFrameHandler) { _frameHandler.Close(); } @@ -799,7 +787,7 @@ public void TerminateMainloop() public override string ToString() { - return string.Format("Connection({0},{1})", _id, Endpoint); + return $"Connection({_id},{Endpoint})"; } public void Write(ReadOnlyMemory memory) @@ -812,52 +800,10 @@ public void UpdateSecret(string newSecret, string reason) _model0.UpdateSecret(newSecret, reason); } - ///API-side invocation of connection abort. - public void Abort() - { - Abort(Timeout.InfiniteTimeSpan); - } - - ///API-side invocation of connection abort. - public void Abort(ushort reasonCode, string reasonText) - { - Abort(reasonCode, reasonText, Timeout.InfiniteTimeSpan); - } - - ///API-side invocation of connection abort with timeout. - public void Abort(TimeSpan timeout) - { - Abort(Constants.ReplySuccess, "Connection close forced", timeout); - } - - ///API-side invocation of connection abort with timeout. - public void Abort(ushort reasonCode, string reasonText, TimeSpan timeout) - { - Abort(reasonCode, reasonText, ShutdownInitiator.Application, timeout); - } - - ///API-side invocation of connection.close. - public void Close() - { - Close(Constants.ReplySuccess, "Goodbye", Timeout.InfiniteTimeSpan); - } - - ///API-side invocation of connection.close. - public void Close(ushort reasonCode, string reasonText) - { - Close(reasonCode, reasonText, Timeout.InfiniteTimeSpan); - } - - ///API-side invocation of connection.close with timeout. - public void Close(TimeSpan timeout) - { - Close(Constants.ReplySuccess, "Goodbye", timeout); - } - ///API-side invocation of connection.close with timeout. - public void Close(ushort reasonCode, string reasonText, TimeSpan timeout) + public void Close(ushort reasonCode, string reasonText, TimeSpan timeout, bool abort) { - Close(new ShutdownEventArgs(ShutdownInitiator.Application, reasonCode, reasonText), false, timeout); + Close(new ShutdownEventArgs(ShutdownInitiator.Application, reasonCode, reasonText), abort, timeout); } public IModel CreateModel() @@ -903,7 +849,7 @@ private void Dispose(bool disposing) // dispose managed resources try { - Abort(); + this.Abort(); _mainLoopTask.Wait(); } catch (OperationInterruptedException) diff --git a/projects/RabbitMQ.Client/client/impl/ModelBase.cs b/projects/RabbitMQ.Client/client/impl/ModelBase.cs index 4c39e360b8..aa4f9aa67f 100644 --- a/projects/RabbitMQ.Client/client/impl/ModelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ModelBase.cs @@ -199,14 +199,12 @@ protected void TakeOver(ModelBase other) _recoveryWrapper.Takeover(other._recoveryWrapper); } - public Task Close(ushort replyCode, string replyText, bool abort) + public void Close(ushort replyCode, string replyText, bool abort) { - return Close(new ShutdownEventArgs(ShutdownInitiator.Application, - replyCode, replyText), - abort); + _ = CloseAsync(new ShutdownEventArgs(ShutdownInitiator.Application, replyCode, replyText), abort); } - public async Task Close(ShutdownEventArgs reason, bool abort) + private async Task CloseAsync(ShutdownEventArgs reason, bool abort) { var k = new ShutdownContinuation(); ModelShutdown += k.OnConnectionShutdown; @@ -464,7 +462,7 @@ protected virtual void Dispose(bool disposing) if (disposing) { // dispose managed resources - Abort(); + this.Abort(); } // dispose unmanaged resources @@ -784,11 +782,8 @@ public void HandleConnectionStart(byte versionMajor, { if (m_connectionStartCell is null) { - var reason = - new ShutdownEventArgs(ShutdownInitiator.Library, - Constants.CommandInvalid, - "Unexpected Connection.Start"); - ((Connection)Session.Connection).Close(reason); + var reason = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.CommandInvalid, "Unexpected Connection.Start"); + Session.Connection.Close(reason, false, Timeout.InfiniteTimeSpan); } var details = new ConnectionStartDetails { @@ -943,16 +938,6 @@ public abstract uint _Private_QueueDelete(string queue, public abstract uint _Private_QueuePurge(string queue, bool nowait); - public void Abort() - { - Abort(Constants.ReplySuccess, "Goodbye"); - } - - public void Abort(ushort replyCode, string replyText) - { - Close(replyCode, replyText, true); - } - public abstract void BasicAck(ulong deliveryTag, bool multiple); public void BasicCancel(string consumerTag) @@ -1123,16 +1108,6 @@ public void BasicRecover(bool requeue) public abstract void BasicReject(ulong deliveryTag, bool requeue); - public void Close() - { - Close(Constants.ReplySuccess, "Goodbye"); - } - - public void Close(ushort replyCode, string replyText) - { - Close(replyCode, replyText, false); - } - public void ConfirmSelect() { if (NextPublishSeqNo == 0UL) @@ -1365,14 +1340,14 @@ public async Task WaitForConfirmsOrDieAsync(CancellationToken token = default) return; } - await Close( + await CloseAsync( new ShutdownEventArgs(ShutdownInitiator.Application, Constants.ReplySuccess, "Nacks Received", new IOException("nack received")), false).ConfigureAwait(false); } catch (TaskCanceledException exception) { - await Close(new ShutdownEventArgs(ShutdownInitiator.Application, + await CloseAsync(new ShutdownEventArgs(ShutdownInitiator.Application, Constants.ReplySuccess, "Timed out waiting for acks", exception), diff --git a/projects/Unit/APIApproval.Approve.verified.txt b/projects/Unit/APIApproval.Approve.verified.txt index 2576fdb3ff..a732329e4f 100644 --- a/projects/Unit/APIApproval.Approve.verified.txt +++ b/projects/Unit/APIApproval.Approve.verified.txt @@ -325,19 +325,23 @@ namespace RabbitMQ.Client event System.EventHandler ConsumerTagChangeAfterRecovery; event System.EventHandler QueueNameChangeAfterRecovery; event System.EventHandler RecoverySucceeded; - void Abort(); - void Abort(System.TimeSpan timeout); - void Abort(ushort reasonCode, string reasonText); - void Abort(ushort reasonCode, string reasonText, System.TimeSpan timeout); - void Close(); - void Close(System.TimeSpan timeout); - void Close(ushort reasonCode, string reasonText); - void Close(ushort reasonCode, string reasonText, System.TimeSpan timeout); + void Close(ushort reasonCode, string reasonText, System.TimeSpan timeout, bool abort); RabbitMQ.Client.IModel CreateModel(); void HandleConnectionBlocked(string reason); void HandleConnectionUnblocked(); void UpdateSecret(string newSecret, string reason); } + public static class IConnectionExtensions + { + public static void Abort(this RabbitMQ.Client.IConnection connection) { } + public static void Abort(this RabbitMQ.Client.IConnection connection, System.TimeSpan timeout) { } + public static void Abort(this RabbitMQ.Client.IConnection connection, ushort reasonCode, string reasonText) { } + public static void Abort(this RabbitMQ.Client.IConnection connection, ushort reasonCode, string reasonText, System.TimeSpan timeout) { } + public static void Close(this RabbitMQ.Client.IConnection connection) { } + public static void Close(this RabbitMQ.Client.IConnection connection, System.TimeSpan timeout) { } + public static void Close(this RabbitMQ.Client.IConnection connection, ushort reasonCode, string reasonText) { } + public static void Close(this RabbitMQ.Client.IConnection connection, ushort reasonCode, string reasonText, System.TimeSpan timeout) { } + } public interface IConnectionFactory { System.Collections.Generic.IDictionary ClientProperties { get; set; } @@ -386,8 +390,6 @@ namespace RabbitMQ.Client event System.EventHandler CallbackException; event System.EventHandler FlowControl; event System.EventHandler ModelShutdown; - void Abort(); - void Abort(ushort replyCode, string replyText); void BasicAck(ulong deliveryTag, bool multiple); void BasicCancel(string consumerTag); void BasicCancelNoWait(string consumerTag); @@ -400,8 +402,7 @@ namespace RabbitMQ.Client void BasicRecover(bool requeue); void BasicRecoverAsync(bool requeue); void BasicReject(ulong deliveryTag, bool requeue); - void Close(); - void Close(ushort replyCode, string replyText); + void Close(ushort replyCode, string replyText, bool abort); void ConfirmSelect(); uint ConsumerCount(string queue); RabbitMQ.Client.IBasicProperties CreateBasicProperties(); @@ -434,6 +435,8 @@ namespace RabbitMQ.Client } public static class IModelExtensions { + public static void Abort(this RabbitMQ.Client.IModel model) { } + public static void Abort(this RabbitMQ.Client.IModel model, ushort replyCode, string replyText) { } public static string BasicConsume(this RabbitMQ.Client.IModel model, string queue, bool autoAck, RabbitMQ.Client.IBasicConsumer consumer) { } public static string BasicConsume(this RabbitMQ.Client.IModel model, string queue, bool autoAck, string consumerTag, RabbitMQ.Client.IBasicConsumer consumer) { } public static string BasicConsume(this RabbitMQ.Client.IModel model, string queue, bool autoAck, string consumerTag, System.Collections.Generic.IDictionary arguments, RabbitMQ.Client.IBasicConsumer consumer) { } @@ -442,6 +445,8 @@ namespace RabbitMQ.Client public static void BasicPublish(this RabbitMQ.Client.IModel model, RabbitMQ.Client.CachedString exchange, RabbitMQ.Client.CachedString routingKey, RabbitMQ.Client.IBasicProperties basicProperties, System.ReadOnlyMemory body) { } public static void BasicPublish(this RabbitMQ.Client.IModel model, string exchange, string routingKey, RabbitMQ.Client.IBasicProperties basicProperties, System.ReadOnlyMemory body) { } public static void BasicPublish(this RabbitMQ.Client.IModel model, string exchange, string routingKey, bool mandatory = false, RabbitMQ.Client.IBasicProperties basicProperties = null, System.ReadOnlyMemory body = default) { } + public static void Close(this RabbitMQ.Client.IModel model) { } + public static void Close(this RabbitMQ.Client.IModel model, ushort replyCode, string replyText) { } public static void ExchangeBind(this RabbitMQ.Client.IModel model, string destination, string source, string routingKey, System.Collections.Generic.IDictionary arguments = null) { } public static void ExchangeBindNoWait(this RabbitMQ.Client.IModel model, string destination, string source, string routingKey, System.Collections.Generic.IDictionary arguments = null) { } public static void ExchangeDeclare(this RabbitMQ.Client.IModel model, string exchange, string type, bool durable = false, bool autoDelete = false, System.Collections.Generic.IDictionary arguments = null) { }