1
1
/*
2
- * Copyright 2018-2022 the original author or authors.
2
+ * Copyright 2018-2023 the original author or authors.
3
3
*
4
4
* Licensed under the Apache License, Version 2.0 (the "License");
5
5
* you may not use this file except in compliance with the License.
30
30
import java .util .TreeSet ;
31
31
import java .util .concurrent .ConcurrentHashMap ;
32
32
import java .util .concurrent .atomic .AtomicInteger ;
33
+ import java .util .concurrent .locks .Lock ;
34
+ import java .util .concurrent .locks .ReentrantLock ;
33
35
import java .util .regex .Pattern ;
34
36
import java .util .stream .Collectors ;
35
37
95
97
* @author Mark Norkin
96
98
* @author Artem Bilan
97
99
* @author Anshul Mehra
100
+ * @author Christian Tzolov
98
101
*
99
102
* @since 5.4
100
103
*
@@ -111,11 +114,13 @@ public class KafkaMessageSource<K, V> extends AbstractMessageSource<Object> impl
111
114
*/
112
115
public static final String REMAINING_RECORDS = KafkaHeaders .PREFIX + "remainingRecords" ;
113
116
117
+ private final Lock lock = new ReentrantLock ();
118
+
114
119
private final ConsumerFactory <K , V > consumerFactory ;
115
120
116
121
private final KafkaAckCallbackFactory <K , V > ackCallbackFactory ;
117
122
118
- private final Object consumerMonitor = new Object ();
123
+ private final Lock consumerMonitor = new ReentrantLock ();
119
124
120
125
private final Map <TopicPartition , Set <KafkaAckInfo <K , V >>> inflightRecords = new ConcurrentHashMap <>();
121
126
@@ -385,31 +390,61 @@ private boolean maxPollStringGtr1(Object maxPoll) {
385
390
}
386
391
387
392
@ Override
388
- public synchronized boolean isRunning () {
389
- return this .running ;
393
+ public boolean isRunning () {
394
+ this .lock .lock ();
395
+ try {
396
+ return this .running ;
397
+ }
398
+ finally {
399
+ this .lock .unlock ();
400
+ }
390
401
}
391
402
392
403
@ Override
393
- public synchronized void start () {
394
- this .running = true ;
395
- this .stopped = false ;
404
+ public void start () {
405
+ this .lock .lock ();
406
+ try {
407
+ this .running = true ;
408
+ this .stopped = false ;
409
+ }
410
+ finally {
411
+ this .lock .unlock ();
412
+ }
396
413
}
397
414
398
415
@ Override
399
- public synchronized void stop () {
400
- stopConsumer ();
401
- this .running = false ;
402
- this .stopped = true ;
416
+ public void stop () {
417
+ this .lock .lock ();
418
+ try {
419
+ stopConsumer ();
420
+ this .running = false ;
421
+ this .stopped = true ;
422
+ }
423
+ finally {
424
+ this .lock .unlock ();
425
+ }
403
426
}
404
427
405
428
@ Override
406
- public synchronized void pause () {
407
- this .pausing = true ;
429
+ public void pause () {
430
+ this .lock .lock ();
431
+ try {
432
+ this .pausing = true ;
433
+ }
434
+ finally {
435
+ this .lock .unlock ();
436
+ }
408
437
}
409
438
410
439
@ Override
411
- public synchronized void resume () {
412
- this .pausing = false ;
440
+ public void resume () {
441
+ this .lock .lock ();
442
+ try {
443
+ this .pausing = false ;
444
+ }
445
+ finally {
446
+ this .lock .unlock ();
447
+ }
413
448
}
414
449
415
450
@ Override
@@ -418,35 +453,43 @@ public boolean isPaused() {
418
453
}
419
454
420
455
@ Override // NOSONAR - not so complex
421
- protected synchronized Object doReceive () {
422
- if (this .stopped ) {
423
- this .logger .debug ("Message source is stopped; no records will be returned" );
424
- return null ;
425
- }
426
- if (this .consumer == null ) {
427
- createConsumer ();
428
- this .running = true ;
429
- }
430
- if (this .pausing && !this .paused && this .assignedPartitions .size () > 0 ) {
431
- this .consumer .pause (this .assignedPartitions );
432
- this .paused = true ;
433
- }
434
- else if (this .paused && !this .pausing ) {
435
- this .consumer .resume (this .assignedPartitions );
436
- this .paused = false ;
456
+ protected Object doReceive () {
457
+ this .lock .lock ();
458
+ try {
459
+
460
+ if (this .stopped ) {
461
+ this .logger .debug ("Message source is stopped; no records will be returned" );
462
+ return null ;
463
+ }
464
+ if (this .consumer == null ) {
465
+ createConsumer ();
466
+ this .running = true ;
467
+ }
468
+ if (this .pausing && !this .paused && !this .assignedPartitions .isEmpty ()) {
469
+ this .consumer .pause (this .assignedPartitions );
470
+ this .paused = true ;
471
+ }
472
+ else if (this .paused && !this .pausing ) {
473
+ this .consumer .resume (this .assignedPartitions );
474
+ this .paused = false ;
475
+ }
476
+ if (this .paused && this .recordsIterator == null ) {
477
+ this .logger .debug ("Consumer is paused; no records will be returned" );
478
+ }
479
+ ConsumerRecord <K , V > record = pollRecord ();
480
+
481
+ return record != null
482
+ ? recordToMessage (record )
483
+ : null ;
437
484
}
438
- if ( this . paused && this . recordsIterator == null ) {
439
- this .logger . debug ( "Consumer is paused; no records will be returned" );
485
+ finally {
486
+ this .lock . unlock ( );
440
487
}
441
- ConsumerRecord <K , V > record = pollRecord ();
442
-
443
- return record != null
444
- ? recordToMessage (record )
445
- : null ;
446
488
}
447
489
448
490
protected void createConsumer () {
449
- synchronized (this .consumerMonitor ) {
491
+ this .consumerMonitor .lock ();
492
+ try {
450
493
this .consumer = this .consumerFactory .createConsumer (this .consumerProperties .getGroupId (),
451
494
this .consumerProperties .getClientId (), null , this .consumerProperties .getKafkaConsumerProperties ());
452
495
@@ -466,6 +509,9 @@ else if (partitions != null) {
466
509
rebalanceCallback );
467
510
}
468
511
}
512
+ finally {
513
+ this .consumerMonitor .unlock ();
514
+ }
469
515
}
470
516
471
517
private void assignAndSeekPartitions (TopicPartitionOffset [] partitions ) {
@@ -522,7 +568,8 @@ private ConsumerRecord<K, V> pollRecord() {
522
568
return nextRecord ();
523
569
}
524
570
else {
525
- synchronized (this .consumerMonitor ) {
571
+ this .consumerMonitor .lock ();
572
+ try {
526
573
try {
527
574
ConsumerRecords <K , V > records = this .consumer
528
575
.poll (this .assignedPartitions .isEmpty () ? this .assignTimeout : this .pollTimeout );
@@ -545,6 +592,9 @@ private ConsumerRecord<K, V> pollRecord() {
545
592
return null ;
546
593
}
547
594
}
595
+ finally {
596
+ this .consumerMonitor .unlock ();
597
+ }
548
598
}
549
599
}
550
600
@@ -590,18 +640,28 @@ private Object recordToMessage(ConsumerRecord<K, V> record) {
590
640
}
591
641
592
642
@ Override
593
- public synchronized void destroy () {
594
- stopConsumer ();
643
+ public void destroy () {
644
+ this .lock .lock ();
645
+ try {
646
+ stopConsumer ();
647
+ }
648
+ finally {
649
+ this .lock .unlock ();
650
+ }
595
651
}
596
652
597
653
private void stopConsumer () {
598
- synchronized (this .consumerMonitor ) {
654
+ this .consumerMonitor .lock ();
655
+ try {
599
656
if (this .consumer != null ) {
600
657
this .consumer .close (this .closeTimeout );
601
658
this .consumer = null ;
602
659
this .assignedPartitions .clear ();
603
660
}
604
661
}
662
+ finally {
663
+ this .consumerMonitor .unlock ();
664
+ }
605
665
}
606
666
607
667
private class IntegrationConsumerRebalanceListener implements ConsumerRebalanceListener {
0 commit comments