diff --git a/spring-integration-mail/src/main/java/org/springframework/integration/mail/AbstractMailReceiver.java b/spring-integration-mail/src/main/java/org/springframework/integration/mail/AbstractMailReceiver.java index 763c80930c8..d89b3d9d575 100755 --- a/spring-integration-mail/src/main/java/org/springframework/integration/mail/AbstractMailReceiver.java +++ b/spring-integration-mail/src/main/java/org/springframework/integration/mail/AbstractMailReceiver.java @@ -390,7 +390,7 @@ public Object[] receive() throws javax.mail.MessagingException { } } - private void closeFolder() { + protected void closeFolder() { this.folderReadLock.lock(); try { MailTransportUtils.closeFolder(this.folder, this.shouldDeleteMessages); diff --git a/spring-integration-mail/src/main/java/org/springframework/integration/mail/ImapIdleChannelAdapter.java b/spring-integration-mail/src/main/java/org/springframework/integration/mail/ImapIdleChannelAdapter.java index f8ca262f2e3..cc9867e5eaf 100755 --- a/spring-integration-mail/src/main/java/org/springframework/integration/mail/ImapIdleChannelAdapter.java +++ b/spring-integration-mail/src/main/java/org/springframework/integration/mail/ImapIdleChannelAdapter.java @@ -78,7 +78,7 @@ public class ImapIdleChannelAdapter extends MessageProducerSupport implements Be private boolean shouldReconnectAutomatically = true; - private Executor sendingTaskExecutor; + private Executor sendingTaskExecutor = Executors.newFixedThreadPool(1); private boolean sendingTaskExecutorSet; @@ -95,6 +95,7 @@ public ImapIdleChannelAdapter(ImapMailReceiver mailReceiver) { public void setTransactionSynchronizationFactory( TransactionSynchronizationFactory transactionSynchronizationFactory) { + this.transactionSynchronizationFactory = transactionSynchronizationFactory; } @@ -102,7 +103,6 @@ public void setAdviceChain(List adviceChain) { this.adviceChain = adviceChain; } - /** * Specify an {@link Executor} used to send messages received by the * adapter. @@ -156,9 +156,6 @@ public void setApplicationEventPublisher(ApplicationEventPublisher applicationEv protected void doStart() { TaskScheduler scheduler = getTaskScheduler(); Assert.notNull(scheduler, "'taskScheduler' must not be null"); - if (this.sendingTaskExecutor == null) { - this.sendingTaskExecutor = Executors.newFixedThreadPool(1); - } this.receivingTask = scheduler.schedule(new ReceivingTask(), this.receivingTaskTrigger); } @@ -166,19 +163,16 @@ protected void doStart() { // guarded by super#lifecycleLock protected void doStop() { this.receivingTask.cancel(true); - try { - this.mailReceiver.destroy(); - } - catch (Exception e) { - throw new IllegalStateException( - "Failure during the destruction of Mail receiver: " + this.mailReceiver, e); - } - /* - * If we're running with the default executor, shut it down. - */ + this.mailReceiver.cancelPing(); + } + + @Override + public void destroy() { + super.destroy(); + this.mailReceiver.destroy(); + // If we're running with the default executor, shut it down. if (!this.sendingTaskExecutorSet && this.sendingTaskExecutor != null) { ((ExecutorService) this.sendingTaskExecutor).shutdown(); - this.sendingTaskExecutor = null; } } @@ -250,17 +244,19 @@ private class ReceivingTask implements Runnable { @Override public void run() { - try { - ImapIdleChannelAdapter.this.idleTask.run(); - logger.debug("Task completed successfully. Re-scheduling it again right away."); - } - catch (Exception e) { //run again after a delay - if (logger.isWarnEnabled()) { - logger.warn("Failed to execute IDLE task. Will attempt to resubmit in " - + ImapIdleChannelAdapter.this.reconnectDelay + " milliseconds.", e); + if (isRunning()) { + try { + ImapIdleChannelAdapter.this.idleTask.run(); + logger.debug("Task completed successfully. Re-scheduling it again right away."); + } + catch (Exception e) { //run again after a delay + if (logger.isWarnEnabled()) { + logger.warn("Failed to execute IDLE task. Will attempt to resubmit in " + + ImapIdleChannelAdapter.this.reconnectDelay + " milliseconds.", e); + } + ImapIdleChannelAdapter.this.receivingTaskTrigger.delayNextExecution(); + publishException(e); } - ImapIdleChannelAdapter.this.receivingTaskTrigger.delayNextExecution(); - publishException(e); } } @@ -275,38 +271,33 @@ private class IdleTask implements Runnable { @Override public void run() { - final TaskScheduler scheduler = getTaskScheduler(); - Assert.notNull(scheduler, "'taskScheduler' must not be null"); - /* - * The following shouldn't be necessary because doStart() will have ensured we have - * one. But, just in case... - */ - Assert.state(ImapIdleChannelAdapter.this.sendingTaskExecutor != null, - "'sendingTaskExecutor' must not be null"); - - try { - logger.debug("waiting for mail"); - ImapIdleChannelAdapter.this.mailReceiver.waitForNewMessages(); - Folder folder = ImapIdleChannelAdapter.this.mailReceiver.getFolder(); - if (folder != null && folder.isOpen()) { - Object[] mailMessages = ImapIdleChannelAdapter.this.mailReceiver.receive(); - if (logger.isDebugEnabled()) { - logger.debug("received " + mailMessages.length + " mail messages"); - } - for (Object mailMessage : mailMessages) { - Runnable messageSendingTask = createMessageSendingTask(mailMessage); - ImapIdleChannelAdapter.this.sendingTaskExecutor.execute(messageSendingTask); + if (isRunning()) { + try { + logger.debug("waiting for mail"); + ImapIdleChannelAdapter.this.mailReceiver.waitForNewMessages(); + Folder folder = ImapIdleChannelAdapter.this.mailReceiver.getFolder(); + if (folder != null && folder.isOpen() && isRunning()) { + Object[] mailMessages = ImapIdleChannelAdapter.this.mailReceiver.receive(); + if (logger.isDebugEnabled()) { + logger.debug("received " + mailMessages.length + " mail messages"); + } + for (Object mailMessage : mailMessages) { + Runnable messageSendingTask = createMessageSendingTask(mailMessage); + if (isRunning()) { + ImapIdleChannelAdapter.this.sendingTaskExecutor.execute(messageSendingTask); + } + } } } - } - catch (MessagingException e) { - logger.warn("error occurred in idle task", e); - if (ImapIdleChannelAdapter.this.shouldReconnectAutomatically) { - throw new IllegalStateException("Failure in 'idle' task. Will resubmit.", e); - } - else { - throw new org.springframework.messaging.MessagingException( - "Failure in 'idle' task. Will NOT resubmit.", e); + catch (MessagingException e) { + logger.warn("error occurred in idle task", e); + if (ImapIdleChannelAdapter.this.shouldReconnectAutomatically) { + throw new IllegalStateException("Failure in 'idle' task. Will resubmit.", e); + } + else { + throw new org.springframework.messaging.MessagingException( + "Failure in 'idle' task. Will NOT resubmit.", e); + } } } } diff --git a/spring-integration-mail/src/main/java/org/springframework/integration/mail/ImapMailReceiver.java b/spring-integration-mail/src/main/java/org/springframework/integration/mail/ImapMailReceiver.java index 51135a3bcd5..7820f85fbfb 100755 --- a/spring-integration-mail/src/main/java/org/springframework/integration/mail/ImapMailReceiver.java +++ b/spring-integration-mail/src/main/java/org/springframework/integration/mail/ImapMailReceiver.java @@ -156,9 +156,20 @@ public void destroy() { if (this.isInternalScheduler) { ((ThreadPoolTaskScheduler) this.scheduler).shutdown(); } + cancelPing(); + } + + /** + * The hook to be called when we need to cancel the current ping task and close the mail folder. + * In other words: when IMAP idle should be stopped for some reason. + * The next {@link #waitForNewMessages()} call will re-open the folder and start a new ping task. + * @since 5.2 + */ + public void cancelPing() { if (this.pingTask != null) { this.pingTask.cancel(true); } + closeFolder(); } /** diff --git a/spring-integration-mail/src/test/java/org/springframework/integration/mail/ImapMailReceiverTests.java b/spring-integration-mail/src/test/java/org/springframework/integration/mail/ImapMailReceiverTests.java index 1db1e47ddad..0daf7dde0ed 100644 --- a/spring-integration-mail/src/test/java/org/springframework/integration/mail/ImapMailReceiverTests.java +++ b/spring-integration-mail/src/test/java/org/springframework/integration/mail/ImapMailReceiverTests.java @@ -34,7 +34,6 @@ import java.util.Arrays; import java.util.Properties; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -808,25 +807,6 @@ private Folder testAttachmentsGuts(final ImapMailReceiver receiver) throws Messa return folder; } - @Test - public void testExecShutdown() { - ImapIdleChannelAdapter adapter = new ImapIdleChannelAdapter(new ImapMailReceiver()); - ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); - taskScheduler.initialize(); - adapter.setTaskScheduler(taskScheduler); - adapter.setReconnectDelay(1); - adapter.start(); - ExecutorService exec = TestUtils.getPropertyValue(adapter, "sendingTaskExecutor", ExecutorService.class); - adapter.stop(); - assertThat(exec.isShutdown()).isTrue(); - adapter.start(); - exec = TestUtils.getPropertyValue(adapter, "sendingTaskExecutor", ExecutorService.class); - adapter.stop(); - assertThat(exec.isShutdown()).isTrue(); - - taskScheduler.shutdown(); - } - @Test public void testNullMessages() throws Exception { Message message1 = mock(Message.class);