7
7
using System . Collections . Generic ;
8
8
using System . Diagnostics ;
9
9
using System . Runtime . CompilerServices ;
10
+ using System . Threading ;
10
11
using System . Threading . Channels ;
11
12
using System . Threading . Tasks ;
12
13
using Microsoft . Extensions . Logging ;
@@ -197,7 +198,8 @@ public async Task StoreOffset(ulong offset)
197
198
await _client . StoreOffset ( _config . Reference , _config . Stream , offset ) . ConfigureAwait ( false ) ;
198
199
}
199
200
200
- // It is needed to understand if the consumer is active or not
201
+ ////// *********************
202
+ // IsPromotedAsActive is needed to understand if the consumer is active or not
201
203
// by default is active
202
204
// in case of single active consumer can be not active
203
205
// it is important to skip the messages in the chunk that
@@ -206,6 +208,36 @@ public async Task StoreOffset(ulong offset)
206
208
// long task
207
209
private bool IsPromotedAsActive { get ; set ; }
208
210
211
+ // PromotionLock avoids race conditions when the consumer is promoted as active
212
+ // and the messages are dispatched in parallel.
213
+ // The consumer can be promoted as active with the function ConsumerUpdateListener
214
+ // It is needed when the consumer is single active consumer
215
+ private SemaphoreSlim PromotionLock { get ; } = new ( 1 ) ;
216
+
217
+ /// <summary>
218
+ /// MaybeLockDispatch locks the dispatch of the messages
219
+ /// it is needed only when the consumer is single active consumer
220
+ /// MaybeLockDispatch is an optimization to avoid to lock the dispatch
221
+ /// when the consumer is not single active consumer
222
+ /// </summary>
223
+
224
+ private async Task MaybeLockDispatch ( )
225
+ {
226
+ if ( _config . IsSingleActiveConsumer )
227
+ await PromotionLock . WaitAsync ( Token ) . ConfigureAwait ( false ) ;
228
+ }
229
+
230
+ /// <summary>
231
+ /// MaybeReleaseLock releases the lock on the dispatch of the messages
232
+ /// Following the MaybeLockDispatch method
233
+ /// </summary>
234
+ private void MaybeReleaseLock ( )
235
+ {
236
+ if ( _config . IsSingleActiveConsumer )
237
+ PromotionLock . Release ( ) ;
238
+ }
239
+
240
+ ////// *********************
209
241
public static async Task < IConsumer > Create (
210
242
ClientParameters clientParameters ,
211
243
RawConsumerConfig config ,
@@ -389,7 +421,15 @@ await _config.MessageHandler(this,
389
421
for ( ulong z = 0 ; z < subEntryChunk . NumRecordsInBatch ; z ++ )
390
422
{
391
423
var message = MessageFromSequence ( ref unCompressedData , ref compressOffset ) ;
392
- await DispatchMessage ( message , messageOffset ++ ) . ConfigureAwait ( false ) ;
424
+ await MaybeLockDispatch ( ) . ConfigureAwait ( false ) ;
425
+ try
426
+ {
427
+ await DispatchMessage ( message , messageOffset ++ ) . ConfigureAwait ( false ) ;
428
+ }
429
+ finally
430
+ {
431
+ MaybeReleaseLock ( ) ;
432
+ }
393
433
}
394
434
395
435
numRecords -= subEntryChunk . NumRecordsInBatch ;
@@ -590,7 +630,7 @@ private async Task Init()
590
630
{
591
631
// in this case the StoredOffsetSpec is overridden by the ConsumerUpdateListener
592
632
// since the user decided to override the default behavior
593
-
633
+ await MaybeLockDispatch ( ) . ConfigureAwait ( false ) ;
594
634
try
595
635
{
596
636
_config . StoredOffsetSpec = await _config . ConsumerUpdateListener (
@@ -606,6 +646,10 @@ private async Task Init()
606
646
// in this case the default behavior is to use the OffsetTypeNext
607
647
_config . StoredOffsetSpec = new OffsetTypeNext ( ) ;
608
648
}
649
+ finally
650
+ {
651
+ MaybeReleaseLock ( ) ;
652
+ }
609
653
}
610
654
611
655
// Here we set the promotion status
0 commit comments