|
1 | 1 | /*
|
2 |
| - * Copyright 2002-2020 the original author or authors. |
| 2 | + * Copyright 2002-2021 the original author or authors. |
3 | 3 | *
|
4 | 4 | * Licensed under the Apache License, Version 2.0 (the "License");
|
5 | 5 | * you may not use this file except in compliance with the License.
|
@@ -188,12 +188,12 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
|
188 | 188 |
|
189 | 189 | private int maxMessagesPerTask = Integer.MIN_VALUE;
|
190 | 190 |
|
191 |
| - private int idleReceivesPerTaskLimit = Integer.MIN_VALUE; |
192 |
| - |
193 | 191 | private int idleConsumerLimit = 1;
|
194 | 192 |
|
195 | 193 | private int idleTaskExecutionLimit = 1;
|
196 | 194 |
|
| 195 | + private int idleReceivesPerTaskLimit = Integer.MIN_VALUE; |
| 196 | + |
197 | 197 | private final Set<AsyncMessageListenerInvoker> scheduledInvokers = new HashSet<>();
|
198 | 198 |
|
199 | 199 | private int activeInvokerCount = 0;
|
@@ -441,44 +441,6 @@ public final int getMaxMessagesPerTask() {
|
441 | 441 | }
|
442 | 442 | }
|
443 | 443 |
|
444 |
| - /** |
445 |
| - * Return the maximum number of subsequent idle (or null) messages to receive in a single task. |
446 |
| - */ |
447 |
| - public int getIdleReceivesPerTaskLimit() { |
448 |
| - synchronized (this.lifecycleMonitor) { |
449 |
| - return idleReceivesPerTaskLimit; |
450 |
| - } |
451 |
| - } |
452 |
| - |
453 |
| - /** |
454 |
| - * Marks the consumer as 'idle' after the specified number of idle receives |
455 |
| - * have been reached. An idle receive is counted from the moment a null message |
456 |
| - * is returned by the receiver after the potential {@link #setReceiveTimeout(long)} |
457 |
| - * elapsed. This gives the opportunity to check if the idle task count exceeds |
458 |
| - * {@link #setIdleTaskExecutionLimit(int)} and based on that decide if the task needs |
459 |
| - * to be re-scheduled or not, saving resources that would otherwise be held. |
460 |
| - * <p> This setting differs from {@link #setMaxMessagesPerTask(int)} where the task |
461 |
| - * is released and re-scheduled after this limit is reached, no matter if the the received |
462 |
| - * messages were null or non-null messages. This setting alone can be inflexible if one |
463 |
| - * desires to have a large enough batch for each task but requires a quick(er) release |
464 |
| - * from the moment there are no more messages to process. <p> This setting differs from |
465 |
| - * {@link #setIdleTaskExecutionLimit(int)} where this limit decides after how many iterations |
466 |
| - * of being marked as idle, a task is released. <p> For example; if |
467 |
| - * {@link #setMaxMessagesPerTask(int)} is set to '500' and |
468 |
| - * {@link #setIdleReceivesPerTaskLimit(int)} is set to '60' and {@link #setReceiveTimeout(long)} |
469 |
| - * is set to '1000' and {@link #setIdleTaskExecutionLimit(int)} is set to '1', then 500 messages |
470 |
| - * per task would be processed unless there is a subsequent number of 60 idle messages received, |
471 |
| - * the task would be marked as idle and released. This also means that after the last message was |
472 |
| - * processed, the task would be released after 60seconds as long as no new messages appear. |
473 |
| - * @param idleReceivesPerTaskLimit {@link Integer#MIN_VALUE} to disable, any value > 0 to enable releasing the |
474 |
| - */ |
475 |
| - public void setIdleReceivesPerTaskLimit(int idleReceivesPerTaskLimit) { |
476 |
| - Assert.isTrue(idleReceivesPerTaskLimit != 0, "'idleReceivesPerTaskLimit' must not be 0)"); |
477 |
| - synchronized (this.lifecycleMonitor) { |
478 |
| - this.idleReceivesPerTaskLimit = idleReceivesPerTaskLimit; |
479 |
| - } |
480 |
| - } |
481 |
| - |
482 | 444 | /**
|
483 | 445 | * Specify the limit for the number of consumers that are allowed to be idle
|
484 | 446 | * at any given time.
|
@@ -548,6 +510,49 @@ public final int getIdleTaskExecutionLimit() {
|
548 | 510 | }
|
549 | 511 | }
|
550 | 512 |
|
| 513 | + /** |
| 514 | + * Marks the consumer as 'idle' after the specified number of idle receives |
| 515 | + * have been reached. An idle receive is counted from the moment a null message |
| 516 | + * is returned by the receiver after the potential {@link #setReceiveTimeout} |
| 517 | + * elapsed. This gives the opportunity to check if the idle task count exceeds |
| 518 | + * {@link #setIdleTaskExecutionLimit} and based on that decide if the task needs |
| 519 | + * to be re-scheduled or not, saving resources that would otherwise be held. |
| 520 | + * <p>This setting differs from {@link #setMaxMessagesPerTask} where the task is |
| 521 | + * released and re-scheduled after this limit is reached, no matter if the received |
| 522 | + * messages were null or non-null messages. This setting alone can be inflexible |
| 523 | + * if one desires to have a large enough batch for each task but requires a |
| 524 | + * quick(er) release from the moment there are no more messages to process. |
| 525 | + * <p>This setting differs from {@link #setIdleTaskExecutionLimit} where this limit |
| 526 | + * decides after how many iterations of being marked as idle, a task is released. |
| 527 | + * <p>For example: If {@link #setMaxMessagesPerTask} is set to '500' and |
| 528 | + * {@code #setIdleReceivesPerTaskLimit} is set to '60' and {@link #setReceiveTimeout} |
| 529 | + * is set to '1000' and {@link #setIdleTaskExecutionLimit} is set to '1', then 500 |
| 530 | + * messages per task would be processed unless there is a subsequent number of 60 |
| 531 | + * idle messages received, the task would be marked as idle and released. This also |
| 532 | + * means that after the last message was processed, the task would be released after |
| 533 | + * 60 seconds as long as no new messages appear. |
| 534 | + * @since 5.3.5 |
| 535 | + * @see #setMaxMessagesPerTask |
| 536 | + * @see #setReceiveTimeout |
| 537 | + */ |
| 538 | + public void setIdleReceivesPerTaskLimit(int idleReceivesPerTaskLimit) { |
| 539 | + Assert.isTrue(idleReceivesPerTaskLimit != 0, "'idleReceivesPerTaskLimit' must not be 0)"); |
| 540 | + synchronized (this.lifecycleMonitor) { |
| 541 | + this.idleReceivesPerTaskLimit = idleReceivesPerTaskLimit; |
| 542 | + } |
| 543 | + } |
| 544 | + |
| 545 | + /** |
| 546 | + * Return the maximum number of subsequent null messages to receive in a single task |
| 547 | + * before marking the consumer as 'idle'. |
| 548 | + * @since 5.3.5 |
| 549 | + */ |
| 550 | + public int getIdleReceivesPerTaskLimit() { |
| 551 | + synchronized (this.lifecycleMonitor) { |
| 552 | + return this.idleReceivesPerTaskLimit; |
| 553 | + } |
| 554 | + } |
| 555 | + |
551 | 556 |
|
552 | 557 | //-------------------------------------------------------------------------
|
553 | 558 | // Implementation of AbstractMessageListenerContainer's template methods
|
@@ -1112,17 +1117,20 @@ public void run() {
|
1112 | 1117 | }
|
1113 | 1118 | boolean messageReceived = false;
|
1114 | 1119 | try {
|
1115 |
| - if (maxMessagesPerTask < 0 && idleReceivesPerTaskLimit < 0) { |
| 1120 | + int messageLimit = maxMessagesPerTask; |
| 1121 | + int idleLimit = idleReceivesPerTaskLimit; |
| 1122 | + if (messageLimit < 0 && idleLimit < 0) { |
1116 | 1123 | messageReceived = executeOngoingLoop();
|
1117 | 1124 | }
|
1118 | 1125 | else {
|
1119 |
| - int idleMessagesReceived = 0; |
1120 | 1126 | int messageCount = 0;
|
1121 |
| - while (isRunning() && (messageCount < maxMessagesPerTask) && (idleMessagesReceived < idleReceivesPerTaskLimit)) { |
1122 |
| - boolean messageReceivedThisInvocation = invokeListener(); |
1123 |
| - idleMessagesReceived = messageReceivedThisInvocation ? 0 : idleMessagesReceived + 1; |
1124 |
| - messageReceived |= messageReceivedThisInvocation; |
| 1127 | + int idleCount = 0; |
| 1128 | + while (isRunning() && (messageLimit < 0 || messageCount < messageLimit) && |
| 1129 | + (idleLimit < 0 || idleCount < idleLimit)) { |
| 1130 | + boolean currentReceived = invokeListener(); |
| 1131 | + messageReceived |= currentReceived; |
1125 | 1132 | messageCount++;
|
| 1133 | + idleCount = (currentReceived ? 0 : idleCount + 1); |
1126 | 1134 | }
|
1127 | 1135 | }
|
1128 | 1136 | }
|
|
0 commit comments