Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 5 additions & 120 deletions projects/RabbitMQ.Client/client/api/IConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,8 @@

using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;

using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
using RabbitMQ.Client.Impl;

namespace RabbitMQ.Client
{
Expand Down Expand Up @@ -214,126 +210,15 @@ public interface IConnection : INetworkConnection, IDisposable
/// <param name="reason">The reason for the secret update.</param>
void UpdateSecret(string newSecret, string reason);

/// <summary>
/// Abort this connection and all its channels.
/// </summary>
/// <remarks>
/// Note that all active channels, sessions, and models will be closed if this method is called.
/// In comparison to normal <see cref="Close()"/> method, <see cref="Abort()"/> will not throw
/// <see cref="IOException"/> during closing connection.
///This method waits infinitely for the in-progress close operation to complete.
/// </remarks>
void Abort();

/// <summary>
/// Abort this connection and all its channels.
/// </summary>
/// <remarks>
/// The method behaves in the same way as <see cref="Abort()"/>, with the only
/// difference that the connection is closed with the given connection close code and message.
/// <para>
/// The close code (See under "Reply Codes" in the AMQP 0-9-1 specification)
/// </para>
/// <para>
/// A message indicating the reason for closing the connection
/// </para>
/// </remarks>
void Abort(ushort reasonCode, string reasonText);

/// <summary>
/// Abort this connection and all its channels and wait with a
/// timeout for all the in-progress close operations to complete.
/// </summary>
/// <remarks>
/// This method, behaves in a similar way as method <see cref="Abort()"/> with the
/// only difference that it explictly specifies a timeout given
/// for all the in-progress close operations to complete.
/// If timeout is reached and the close operations haven't finished, then socket is forced to close.
/// <para>
/// To wait infinitely for the close operations to complete use <see cref="Timeout.Infinite"/>.
/// </para>
/// </remarks>
void Abort(TimeSpan timeout);

/// <summary>
/// Abort this connection and all its channels and wait with a
/// timeout for all the in-progress close operations to complete.
/// </summary>
/// <remarks>
/// The method behaves in the same way as <see cref="Abort(TimeSpan)"/>, with the only
/// difference that the connection is closed with the given connection close code and message.
/// <para>
/// The close code (See under "Reply Codes" in the AMQP 0-9-1 specification).
/// </para>
/// <para>
/// A message indicating the reason for closing the connection.
/// </para>
/// </remarks>
void Abort(ushort reasonCode, string reasonText, TimeSpan timeout);

/// <summary>
/// Close this connection and all its channels.
/// </summary>
/// <remarks>
/// Note that all active channels, sessions, and models will be
/// closed if this method is called. It will wait for the in-progress
/// close operation to complete. This method will not return to the caller
/// until the shutdown is complete. If the connection is already closed
/// (or closing), then this method will do nothing.
/// It can also throw <see cref="IOException"/> when socket was closed unexpectedly.
/// </remarks>
void Close();

/// <summary>
/// Close this connection and all its channels.
/// </summary>
/// <remarks>
/// The method behaves in the same way as <see cref="Close()"/>, with the only
/// difference that the connection is closed with the given connection close code and message.
/// <para>
/// The close code (See under "Reply Codes" in the AMQP specification).
/// </para>
/// <para>
/// A message indicating the reason for closing the connection.
/// </para>
/// </remarks>
void Close(ushort reasonCode, string reasonText);

/// <summary>
/// Close this connection and all its channels
/// and wait with a timeout for all the in-progress close operations to complete.
/// </summary>
/// <remarks>
/// Note that all active channels, sessions, and models will be
/// closed if this method is called. It will wait for the in-progress
/// close operation to complete with a timeout. If the connection is
/// already closed (or closing), then this method will do nothing.
/// It can also throw <see cref="IOException"/> when socket was closed unexpectedly.
/// If timeout is reached and the close operations haven't finished, then socket is forced to close.
/// <para>
/// To wait infinitely for the close operations to complete use <see cref="System.Threading.Timeout.InfiniteTimeSpan"/>.
/// </para>
/// </remarks>
void Close(TimeSpan timeout);

/// <summary>
/// Close this connection and all its channels
/// and wait with a timeout for all the in-progress close operations to complete.
/// </summary>
/// <remarks>
/// The method behaves in the same way as <see cref="Close(TimeSpan)"/>, with the only
/// difference that the connection is closed with the given connection close code and message.
/// <para>
/// The close code (See under "Reply Codes" in the AMQP 0-9-1 specification).
/// </para>
/// <para>
/// A message indicating the reason for closing the connection.
/// </para>
/// <para>
/// Operation timeout.
/// </para>
/// </remarks>
void Close(ushort reasonCode, string reasonText, TimeSpan timeout);
/// <param name="reasonCode">The close code (See under "Reply Codes" in the AMQP 0-9-1 specification).</param>
/// <param name="reasonText">A message indicating the reason for closing the connection.</param>
/// <param name="timeout">Operation timeout.</param>
/// <param name="abort">Whether or not this close is an abort (ignores certain exceptions).</param>
void Close(ushort reasonCode, string reasonText, TimeSpan timeout, bool abort);

/// <summary>
/// Create and return a fresh channel, session, and model.
Expand Down
53 changes: 4 additions & 49 deletions projects/RabbitMQ.Client/client/api/IModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -143,34 +143,6 @@ public interface IModel : IDisposable
/// </remarks>
event EventHandler<ShutdownEventArgs> ModelShutdown;

/// <summary>
/// Abort this session.
/// </summary>
/// <remarks>
/// If the session is already closed (or closing), then this
/// method does nothing but wait for the in-progress close
/// operation to complete. This method will not return to the
/// caller until the shutdown is complete.
/// In comparison to normal <see cref="Close()"/> method, <see cref="Abort()"/> will not throw
/// <see cref="Exceptions.AlreadyClosedException"/> or <see cref="System.IO.IOException"/> or any other <see cref="Exception"/> during closing model.
/// </remarks>
void Abort();

/// <summary>
/// Abort this session.
/// </summary>
/// <remarks>
/// The method behaves in the same way as <see cref="Abort()"/>, with the only
/// difference that the model is closed with the given model close code and message.
/// <para>
/// The close code (See under "Reply Codes" in the AMQP specification)
/// </para>
/// <para>
/// A message indicating the reason for closing the model
/// </para>
/// </remarks>
void Abort(ushort replyCode, string replyText);

/// <summary>
/// Acknowledge one or more delivered message(s).
/// </summary>
Expand Down Expand Up @@ -247,27 +219,10 @@ string BasicConsume(
void BasicReject(ulong deliveryTag, bool requeue);

/// <summary>Close this session.</summary>
/// <remarks>
/// If the session is already closed (or closing), then this
/// method does nothing but wait for the in-progress close
/// operation to complete. This method will not return to the
/// caller until the shutdown is complete.
/// </remarks>
void Close();

/// <summary>Close this session.</summary>
/// <remarks>
/// The method behaves in the same way as Close(), with the only
/// difference that the model is closed with the given model
/// close code and message.
/// <para>
/// The close code (See under "Reply Codes" in the AMQP specification)
/// </para>
/// <para>
/// A message indicating the reason for closing the model
/// </para>
/// </remarks>
void Close(ushort replyCode, string replyText);
/// <param name="replyCode">The reply code to send for closing (See under "Reply Codes" in the AMQP specification).</param>
/// <param name="replyText">The reply text to send for closing.</param>
/// <param name="abort">Whether or not the close is an abort (ignoring certain exceptions).</param>
void Close(ushort replyCode, string replyText, bool abort);

/// <summary>
/// Enable publisher acknowledgements.
Expand Down
63 changes: 63 additions & 0 deletions projects/RabbitMQ.Client/client/api/IModelExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -216,5 +216,68 @@ public static void QueueUnbind(this IModel model, string queue, string exchange,
{
model.QueueUnbind(queue, exchange, routingKey, arguments);
}

/// <summary>
/// Abort this session.
/// </summary>
/// <remarks>
/// If the session is already closed (or closing), then this
/// method does nothing but wait for the in-progress close
/// operation to complete. This method will not return to the
/// caller until the shutdown is complete.
/// In comparison to normal <see cref="Close(IModel)"/> method, <see cref="Abort(IModel)"/> will not throw
/// <see cref="Exceptions.AlreadyClosedException"/> or <see cref="System.IO.IOException"/> or any other <see cref="Exception"/> during closing model.
/// </remarks>
public static void Abort(this IModel model)
{
model.Close(Constants.ReplySuccess, "Goodbye", true);
}

/// <summary>
/// Abort this session.
/// </summary>
/// <remarks>
/// The method behaves in the same way as <see cref="Abort(IModel)"/>, with the only
/// difference that the model is closed with the given model close code and message.
/// <para>
/// The close code (See under "Reply Codes" in the AMQP specification)
/// </para>
/// <para>
/// A message indicating the reason for closing the model
/// </para>
/// </remarks>
public static void Abort(this IModel model, ushort replyCode, string replyText)
{
model.Close(replyCode, replyText, true);
}

/// <summary>Close this session.</summary>
/// <remarks>
/// If the session is already closed (or closing), then this
/// method does nothing but wait for the in-progress close
/// operation to complete. This method will not return to the
/// caller until the shutdown is complete.
/// </remarks>
public static void Close(this IModel model)
{
model.Close(Constants.ReplySuccess, "Goodbye", false);
}

/// <summary>Close this session.</summary>
/// <remarks>
/// The method behaves in the same way as Close(), with the only
/// difference that the model is closed with the given model
/// close code and message.
/// <para>
/// The close code (See under "Reply Codes" in the AMQP specification)
/// </para>
/// <para>
/// A message indicating the reason for closing the model
/// </para>
/// </remarks>
public static void Close(this IModel model, ushort replyCode, string replyText)
{
model.Close(replyCode, replyText, false);
}
}
}
85 changes: 3 additions & 82 deletions projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,6 @@ public AmqpTcpEndpoint[] KnownHosts

IProtocol IConnection.Protocol => Endpoint.Protocol;

public void Close(ShutdownEventArgs reason) => Delegate.Close(reason);

public RecoveryAwareModel CreateNonRecoveringModel()
{
ISession session = Delegate.CreateSession();
Expand Down Expand Up @@ -182,91 +180,14 @@ public void UpdateSecret(string newSecret, string reason)
_factory.Password = newSecret;
}

///<summary>API-side invocation of connection abort.</summary>
public void Abort()
{
ThrowIfDisposed();
StopRecoveryLoop();
if (_delegate.IsOpen)
{
_delegate.Abort();
}
}

///<summary>API-side invocation of connection abort.</summary>
public void Abort(ushort reasonCode, string reasonText)
{
ThrowIfDisposed();
StopRecoveryLoop();
if (_delegate.IsOpen)
{
_delegate.Abort(reasonCode, reasonText);
}
}

///<summary>API-side invocation of connection abort with timeout.</summary>
public void Abort(TimeSpan timeout)
{
ThrowIfDisposed();
StopRecoveryLoop();
if (_delegate.IsOpen)
{
_delegate.Abort(timeout);
}
}

///<summary>API-side invocation of connection abort with timeout.</summary>
public void Abort(ushort reasonCode, string reasonText, TimeSpan timeout)
{
ThrowIfDisposed();
StopRecoveryLoop();
if (_delegate.IsOpen)
{
_delegate.Abort(reasonCode, reasonText, timeout);
}
}

///<summary>API-side invocation of connection.close.</summary>
public void Close()
{
ThrowIfDisposed();
StopRecoveryLoop();
if (_delegate.IsOpen)
{
_delegate.Close();
}
}

///<summary>API-side invocation of connection.close.</summary>
public void Close(ushort reasonCode, string reasonText)
{
ThrowIfDisposed();
StopRecoveryLoop();
if (_delegate.IsOpen)
{
_delegate.Close(reasonCode, reasonText);
}
}

///<summary>API-side invocation of connection.close with timeout.</summary>
public void Close(TimeSpan timeout)
{
ThrowIfDisposed();
StopRecoveryLoop();
if (_delegate.IsOpen)
{
_delegate.Close(timeout);
}
}

///<summary>API-side invocation of connection.close with timeout.</summary>
public void Close(ushort reasonCode, string reasonText, TimeSpan timeout)
public void Close(ushort reasonCode, string reasonText, TimeSpan timeout, bool abort)
{
ThrowIfDisposed();
StopRecoveryLoop();
if (_delegate.IsOpen)
{
_delegate.Close(reasonCode, reasonText, timeout);
_delegate.Close(reasonCode, reasonText, timeout, abort);
}
}

Expand Down Expand Up @@ -295,7 +216,7 @@ private void Dispose(bool disposing)
{
try
{
Abort();
this.Abort();
}
catch (Exception)
{
Expand Down
Loading