Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 78 additions & 61 deletions projects/Unit/TestAsyncConsumerExceptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,89 @@ namespace RabbitMQ.Client.Unit
[TestFixture]
public class TestAsyncConsumerExceptions : IntegrationFixture
{
protected void TestExceptionHandlingWith(IBasicConsumer consumer,
Action<IModel, string, IBasicConsumer, string> action)
{
object o = new object();
bool notified = false;
string q = _model.QueueDeclare();


_model.CallbackException += (m, evt) =>
{
notified = true;
Monitor.PulseAll(o);
};

string tag = _model.BasicConsume(q, true, consumer);
action(_model, q, consumer, tag);
WaitOn(o);

Assert.IsTrue(notified);
}

[SetUp]
public override void Init()
{
_connFactory = new ConnectionFactory
{
DispatchConsumersAsync = true
};

_conn = _connFactory.CreateConnection();
_model = _conn.CreateModel();
}

[Test]
public void TestCancelNotificationExceptionHandling()
{
IBasicConsumer consumer = new ConsumerFailingOnCancel(_model);
TestExceptionHandlingWith(consumer, (m, q, c, ct) => m.QueueDelete(q));
}

[Test]
public void TestConsumerCancelOkExceptionHandling()
{
IBasicConsumer consumer = new ConsumerFailingOnCancelOk(_model);
TestExceptionHandlingWith(consumer, (m, q, c, ct) => m.BasicCancel(ct));
}

[Test]
public void TestConsumerConsumeOkExceptionHandling()
{
IBasicConsumer consumer = new ConsumerFailingOnConsumeOk(_model);
TestExceptionHandlingWith(consumer, (m, q, c, ct) => { });
}

[Test]
public void TestConsumerShutdownExceptionHandling()
{
IBasicConsumer consumer = new ConsumerFailingOnShutdown(_model);
TestExceptionHandlingWith(consumer, (m, q, c, ct) => m.Close());
}

[Test]
public void TestDeliveryExceptionHandling()
{
IBasicConsumer consumer = new ConsumerFailingOnDelivery(_model);
TestExceptionHandlingWith(consumer, (m, q, c, ct) => m.BasicPublish("", q, null, _encoding.GetBytes("msg")));
}

private class ConsumerFailingOnDelivery : AsyncEventingBasicConsumer
{
public ConsumerFailingOnDelivery(IModel model) : base(model)
{
}

public override Task HandleBasicDeliver(string consumerTag,
public override async Task HandleBasicDeliver(string consumerTag,
ulong deliveryTag,
bool redelivered,
string exchange,
string routingKey,
IBasicProperties properties,
ReadOnlyMemory<byte> body)
{
await Task.Delay(0);
throw new Exception("oops");
}
}
Expand All @@ -64,8 +133,9 @@ public ConsumerFailingOnCancel(IModel model) : base(model)
{
}

public override Task HandleBasicCancel(string consumerTag)
public override async Task HandleBasicCancel(string consumerTag)
{
await Task.Delay(0);
throw new Exception("oops");
}
}
Expand All @@ -76,8 +146,9 @@ public ConsumerFailingOnShutdown(IModel model) : base(model)
{
}

public override Task HandleModelShutdown(object model, ShutdownEventArgs reason)
public override async Task HandleModelShutdown(object model, ShutdownEventArgs reason)
{
await Task.Delay(0);
throw new Exception("oops");
}
}
Expand All @@ -88,8 +159,9 @@ public ConsumerFailingOnConsumeOk(IModel model) : base(model)
{
}

public override Task HandleBasicConsumeOk(string consumerTag)
public override async Task HandleBasicConsumeOk(string consumerTag)
{
await Task.Delay(0);
throw new Exception("oops");
}
}
Expand All @@ -100,66 +172,11 @@ public ConsumerFailingOnCancelOk(IModel model) : base(model)
{
}

public override Task HandleBasicCancelOk(string consumerTag)
public override async Task HandleBasicCancelOk(string consumerTag)
{
await Task.Delay(0);
throw new Exception("oops");
}
}

protected void TestExceptionHandlingWith(IBasicConsumer consumer,
Action<IModel, string, IBasicConsumer, string> action)
{
object o = new object();
bool notified = false;
string q = _model.QueueDeclare();


_model.CallbackException += (m, evt) =>
{
notified = true;
Monitor.PulseAll(o);
};

string tag = _model.BasicConsume(q, true, consumer);
action(_model, q, consumer, tag);
WaitOn(o);

Assert.IsTrue(notified);
}

[Test]
public void TestCancelNotificationExceptionHandling()
{
IBasicConsumer consumer = new ConsumerFailingOnCancel(_model);
TestExceptionHandlingWith(consumer, (m, q, c, ct) => m.QueueDelete(q));
}

[Test]
public void TestConsumerCancelOkExceptionHandling()
{
IBasicConsumer consumer = new ConsumerFailingOnCancelOk(_model);
TestExceptionHandlingWith(consumer, (m, q, c, ct) => m.BasicCancel(ct));
}

[Test]
public void TestConsumerConsumeOkExceptionHandling()
{
IBasicConsumer consumer = new ConsumerFailingOnConsumeOk(_model);
TestExceptionHandlingWith(consumer, (m, q, c, ct) => { });
}

[Test]
public void TestConsumerShutdownExceptionHandling()
{
IBasicConsumer consumer = new ConsumerFailingOnShutdown(_model);
TestExceptionHandlingWith(consumer, (m, q, c, ct) => m.Close());
}

[Test]
public void TestDeliveryExceptionHandling()
{
IBasicConsumer consumer = new ConsumerFailingOnDelivery(_model);
TestExceptionHandlingWith(consumer, (m, q, c, ct) => m.BasicPublish("", q, null, _encoding.GetBytes("msg")));
}
}
}