Skip to content

Commit be302f9

Browse files
authored
feat: restore queue model and consumer on disconnection (#428)
Co-authored-by: t.sarmis <[email protected]>
1 parent 23540d1 commit be302f9

File tree

7 files changed

+83
-19
lines changed

7 files changed

+83
-19
lines changed

bus/EasyCaching.Bus.RabbitMQ/DefaultRabbitMQBus.cs

Lines changed: 44 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -150,20 +150,51 @@ public override void BaseSubscribe(string topic, Action<EasyCachingMessage> acti
150150
queueName = _options.QueueName;
151151
}
152152

153-
Task.Factory.StartNew(() =>
153+
Task.Factory.StartNew(
154+
() => StartConsumer(queueName, topic),
155+
TaskCreationOptions.LongRunning);
156+
}
157+
158+
159+
private void StartConsumer(string queueName, string topic)
160+
{
161+
var model = _subConnection.CreateModel();
162+
163+
model.ExchangeDeclare(_options.TopicExchangeName, ExchangeType.Topic, true, false, null);
164+
model.QueueDeclare(queueName, false, false, true, null);
165+
// bind the queue with the exchange.
166+
model.QueueBind(queueName, _options.TopicExchangeName, topic);
167+
var consumer = new EventingBasicConsumer(model);
168+
consumer.Received += OnMessage;
169+
consumer.Shutdown += (sender, e) =>
154170
{
155-
var model = _subConnection.CreateModel();
156-
model.ExchangeDeclare(_options.TopicExchangeName, ExchangeType.Topic, true, false, null);
157-
model.QueueDeclare(queueName, false, false, true, null);
158-
// bind the queue with the exchange.
159-
model.QueueBind(queueName, _options.TopicExchangeName, topic);
160-
var consumer = new EventingBasicConsumer(model);
161-
consumer.Received += OnMessage;
162-
consumer.Shutdown += OnConsumerShutdown;
163-
164-
model.BasicConsume(queueName, true, consumer);
165-
166-
}, TaskCreationOptions.LongRunning);
171+
OnConsumerShutdown(sender, e);
172+
OnConsumerError(queueName, topic, model);
173+
};
174+
175+
consumer.ConsumerCancelled += (s, e) =>
176+
{
177+
OnConsumerError(queueName, topic, model);
178+
};
179+
180+
model.BasicConsume(queueName, true, consumer);
181+
}
182+
183+
private void OnConsumerError(string queueName, string topic, IModel model)
184+
{
185+
StartConsumer(queueName, topic);
186+
BaseOnReconnect();
187+
try
188+
{
189+
if (model?.IsOpen == true)
190+
{
191+
model?.Dispose();
192+
}
193+
}
194+
catch
195+
{
196+
// nothing to do
197+
}
167198
}
168199

169200
/// <summary>

src/EasyCaching.Core/Bus/IEasyCachingSubscriber.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ public interface IEasyCachingSubscriber
1212
/// </summary>
1313
/// <param name="topic">Topic.</param>
1414
/// <param name="action">Action.</param>
15-
void Subscribe(string topic, Action<EasyCachingMessage> action);
15+
/// <param name="reconnectAction"> Reconnect Action.</param>
16+
void Subscribe(string topic, Action<EasyCachingMessage> action, Action reconnectAction = null);
1617
}
1718
}

src/EasyCaching.Core/Bus/NullEasyCachingBus.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ public class NullEasyCachingBus : IEasyCachingBus
2626
/// <see cref="T:EasyCaching.Core.Bus.NullEasyCachingBus"/> so the garbage collector can reclaim the memory that
2727
/// the <see cref="T:EasyCaching.Core.Bus.NullEasyCachingBus"/> was occupying.</remarks>
2828
public void Dispose() { }
29-
29+
3030
/// <summary>
3131
/// Publish the specified topic and message.
3232
/// </summary>
@@ -54,7 +54,8 @@ public void Publish(string topic, EasyCachingMessage message)
5454
/// </summary>
5555
/// <param name="topic">Topic.</param>
5656
/// <param name="action">Action.</param>
57-
public void Subscribe(string topic, Action<EasyCachingMessage> action)
57+
/// <param name="reconnectAction">Reconnect Action.</param>
58+
public void Subscribe(string topic, Action<EasyCachingMessage> action, Action reconnectAction = null)
5859
{
5960

6061
}

src/EasyCaching.Core/EasyCachingAbstractBus.cs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ public abstract class EasyCachingAbstractBus : IEasyCachingBus
1818

1919
protected Action<EasyCachingMessage> _handler;
2020

21+
protected Action _reconnectHandler;
22+
2123
protected string BusName { get; set; }
2224

2325
public string Name => this.BusName;
@@ -74,9 +76,10 @@ public void Publish(string topic, EasyCachingMessage message)
7476
}
7577
}
7678

77-
public void Subscribe(string topic, Action<EasyCachingMessage> action)
79+
public void Subscribe(string topic, Action<EasyCachingMessage> action, Action reconnectAction)
7880
{
7981
_handler = action;
82+
_reconnectHandler = reconnectAction;
8083
BaseSubscribe(topic, action);
8184
}
8285

@@ -105,5 +108,10 @@ public virtual void BaseOnMessage(EasyCachingMessage message)
105108
}
106109
}
107110
}
111+
112+
public virtual void BaseOnReconnect()
113+
{
114+
_reconnectHandler?.Invoke();
115+
}
108116
}
109117
}

src/EasyCaching.HybridCache/Configurations/HybridCachingOptions.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,5 +43,13 @@ public class HybridCachingOptions
4343
/// When sending message failed, we will retry some times, default is 3 times.
4444
/// </remarks>
4545
public int BusRetryCount { get; set; } = 3;
46+
47+
/// <summary>
48+
/// Flush the local cache on bus disconnection/reconnection
49+
/// </summary>
50+
/// <remarks>
51+
/// Flushing the local cache will avoid using stale data but may cause app jitters until the local cache get's re-populated.
52+
/// </remarks>
53+
public bool FlushLocalCacheOnBusReconnection { get; set; } = false;
4654
}
4755
}

src/EasyCaching.HybridCache/HybridCachingProvider.cs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ string name
9393
else this._distributedCache = distributed;
9494

9595
this._bus = bus ?? NullEasyCachingBus.Instance;
96-
this._bus.Subscribe(_options.TopicName, OnMessage);
96+
this._bus.Subscribe(_options.TopicName, OnMessage, OnReconnect);
9797

9898
this._cacheId = Guid.NewGuid().ToString("N");
9999

@@ -159,6 +159,21 @@ private void OnMessage(EasyCachingMessage message)
159159
}
160160
}
161161

162+
/// <summary>
163+
/// On reconnect (flushes local memory as it could be stale).
164+
/// </summary>
165+
166+
private void OnReconnect()
167+
{
168+
if (!_options.FlushLocalCacheOnBusReconnection)
169+
{
170+
return;
171+
}
172+
173+
LogMessage("Flushing local cache due to bus reconnection");
174+
_localCache.Flush();
175+
}
176+
162177
/// <summary>
163178
/// Exists the specified cacheKey.
164179
/// </summary>

test/EasyCaching.UnitTests/Fake/FakeBus.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ public void Publish(string topic, EasyCachingMessage message)
1919
return Task.CompletedTask;
2020
}
2121

22-
public void Subscribe(string topic, Action<EasyCachingMessage> action)
22+
public void Subscribe(string topic, Action<EasyCachingMessage> action, Action reconnectAction = null)
2323
{
2424

2525
}

0 commit comments

Comments
 (0)