Skip to content

Commit 33e5248

Browse files
artembilangaryrussell
authored andcommitted
More fixes for IMAP channel adapter and its tests
https://build.spring.io/browse/INT-MASTERSPRING40-553 The `ImapMailReceiverTests` fails sporadically according some race condition or wrong logic. * Fix `ImapIdleChannelAdapter` to check for folder not null before performing logic in the `IdleTask` * Polishing for logs which are based on constant strings * Remove `volatile` from configuration properties in the `ImapIdleChannelAdapter` and `AbstractMailReceiver` * Refactor some smells into the `protected` getters instead of direct access to the property * Stop channel adapters in the `ImapMailReceiverTests` * Also destroy task schedulers in the `ImapMailReceiverTests`
1 parent ca2e70f commit 33e5248

File tree

3 files changed

+73
-66
lines changed

3 files changed

+73
-66
lines changed

spring-integration-mail/src/main/java/org/springframework/integration/mail/AbstractMailReceiver.java

+31-29
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
* @author Iwein Fuld
6161
* @author Oleg Zhurakousky
6262
* @author Gary Russell
63+
* @author Artem Bilan
6364
*/
6465
public abstract class AbstractMailReceiver extends IntegrationObjectSupport implements MailReceiver, DisposableBean {
6566

@@ -69,43 +70,41 @@ public abstract class AbstractMailReceiver extends IntegrationObjectSupport impl
6970
*/
7071
public final static String DEFAULT_SI_USER_FLAG = "spring-integration-mail-adapter";
7172

72-
protected final Log logger = LogFactory.getLog(getClass());
73+
protected final Log logger = LogFactory.getLog(getClass()); // NOSONAR safe to use final
7374

7475
private final URLName url;
7576

7677
private final Object folderMonitor = new Object();
7778

78-
private volatile String protocol;
79+
private String protocol;
7980

80-
private volatile int maxFetchSize = -1;
81+
private int maxFetchSize = -1;
8182

82-
private volatile Session session;
83+
private Session session;
8384

84-
private volatile Store store;
85-
86-
private volatile Folder folder;
85+
private boolean shouldDeleteMessages;
8786

88-
private volatile boolean shouldDeleteMessages;
87+
private int folderOpenMode = Folder.READ_ONLY;
8988

90-
protected volatile int folderOpenMode = Folder.READ_ONLY;
89+
private Properties javaMailProperties = new Properties();
9190

92-
private volatile Properties javaMailProperties = new Properties();
91+
private Authenticator javaMailAuthenticator;
9392

94-
private volatile Authenticator javaMailAuthenticator;
93+
private StandardEvaluationContext evaluationContext;
9594

96-
private volatile StandardEvaluationContext evaluationContext;
95+
private Expression selectorExpression;
9796

98-
private volatile Expression selectorExpression;
97+
private HeaderMapper<MimeMessage> headerMapper;
9998

100-
private volatile HeaderMapper<MimeMessage> headerMapper;
99+
private String userFlag = DEFAULT_SI_USER_FLAG;
101100

102-
protected volatile boolean initialized;
101+
private boolean embeddedPartsAsBytes = true;
103102

104-
private volatile String userFlag = DEFAULT_SI_USER_FLAG;
103+
private boolean simpleContent;
105104

106-
private volatile boolean embeddedPartsAsBytes = true;
105+
private volatile Store store;
107106

108-
private volatile boolean simpleContent;
107+
private volatile Folder folder;
109108

110109
public AbstractMailReceiver() {
111110
this.url = null;
@@ -283,6 +282,10 @@ protected Folder getFolder() {
283282
return this.folder;
284283
}
285284

285+
protected int getFolderOpenMode() {
286+
return this.folderOpenMode;
287+
}
288+
286289
/**
287290
* Subclasses must implement this method to return new mail messages.
288291
*
@@ -291,7 +294,7 @@ protected Folder getFolder() {
291294
*/
292295
protected abstract Message[] searchForNewMessages() throws MessagingException;
293296

294-
private void openSession() throws MessagingException {
297+
private void openSession() {
295298
if (this.session == null) {
296299
if (this.javaMailAuthenticator != null) {
297300
this.session = Session.getInstance(this.javaMailProperties, this.javaMailAuthenticator);
@@ -316,7 +319,7 @@ else if (this.protocol != null) {
316319
}
317320
if (!this.store.isConnected()) {
318321
if (this.logger.isDebugEnabled()) {
319-
this.logger.debug("connecting to store [" + MailTransportUtils.toPasswordProtectedString(this.url) + "]");
322+
this.logger.debug("connecting to store [" + this.store.getURLName() + "]");
320323
}
321324
this.store.connect();
322325
}
@@ -338,7 +341,7 @@ protected void openFolder() throws MessagingException {
338341
return;
339342
}
340343
if (this.logger.isDebugEnabled()) {
341-
this.logger.debug("opening folder [" + MailTransportUtils.toPasswordProtectedString(this.url) + "]");
344+
this.logger.debug("opening folder [" + this.folder.getURLName() + "]");
342345
}
343346
this.folder.open(this.folderOpenMode);
344347
}
@@ -353,7 +356,7 @@ public Object[] receive() throws javax.mail.MessagingException {
353356
try {
354357
this.openFolder();
355358
if (this.logger.isInfoEnabled()) {
356-
this.logger.info("attempting to receive mail from folder [" + this.getFolder().getFullName() + "]");
359+
this.logger.info("attempting to receive mail from folder [" + getFolder().getFullName() + "]");
357360
}
358361
Message[] messages = searchForNewMessages();
359362
if (this.maxFetchSize > 0 && messages.length > this.maxFetchSize) {
@@ -496,25 +499,26 @@ private void setMessageFlags(Message[] filteredMessages) throws MessagingExcepti
496499
* will be filtered out and remain on the server as never touched.
497500
*/
498501
private MimeMessage[] filterMessagesThruSelector(Message[] messages) throws MessagingException {
499-
List<MimeMessage> filteredMessages = new LinkedList<MimeMessage>();
502+
List<MimeMessage> filteredMessages = new LinkedList<>();
500503
for (int i = 0; i < messages.length; i++) {
501504
MimeMessage message = (MimeMessage) messages[i];
502505
if (this.selectorExpression != null) {
503-
if (this.selectorExpression.getValue(this.evaluationContext, message, Boolean.class)) {
506+
if (Boolean.TRUE.equals(
507+
this.selectorExpression.getValue(this.evaluationContext, message, Boolean.class))) {
504508
filteredMessages.add(message);
505509
}
506510
else {
507511
if (this.logger.isDebugEnabled()) {
508-
this.logger.debug("Fetched email with subject '" + message.getSubject() + "' will be discarded by the matching filter" +
509-
" and will not be flagged as SEEN.");
512+
this.logger.debug("Fetched email with subject '" + message.getSubject()
513+
+ "' will be discarded by the matching filter and will not be flagged as SEEN.");
510514
}
511515
}
512516
}
513517
else {
514518
filteredMessages.add(message);
515519
}
516520
}
517-
return filteredMessages.toArray(new MimeMessage[filteredMessages.size()]);
521+
return filteredMessages.toArray(new MimeMessage[0]);
518522
}
519523

520524
/**
@@ -562,7 +566,6 @@ public void destroy() {
562566
MailTransportUtils.closeService(this.store);
563567
this.folder = null;
564568
this.store = null;
565-
this.initialized = false;
566569
}
567570
}
568571

@@ -571,7 +574,6 @@ protected void onInit() throws Exception {
571574
super.onInit();
572575
this.folderOpenMode = Folder.READ_WRITE;
573576
this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory());
574-
this.initialized = true;
575577
}
576578

577579
@Override

spring-integration-mail/src/main/java/org/springframework/integration/mail/ImapIdleChannelAdapter.java

+24-30
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -23,6 +23,7 @@
2323
import java.util.concurrent.Executors;
2424
import java.util.concurrent.ScheduledFuture;
2525

26+
import javax.mail.Folder;
2627
import javax.mail.FolderClosedException;
2728
import javax.mail.Message;
2829
import javax.mail.MessagingException;
@@ -64,29 +65,29 @@ public class ImapIdleChannelAdapter extends MessageProducerSupport implements Be
6465

6566
private static final int DEFAULT_RECONNECT_DELAY = 10000;
6667

68+
private final ExceptionAwarePeriodicTrigger receivingTaskTrigger = new ExceptionAwarePeriodicTrigger();
69+
6770
private final IdleTask idleTask = new IdleTask();
6871

69-
private volatile Executor sendingTaskExecutor;
72+
private final ImapMailReceiver mailReceiver;
7073

71-
private volatile boolean sendingTaskExecutorSet;
74+
private TransactionSynchronizationFactory transactionSynchronizationFactory;
7275

73-
private volatile boolean shouldReconnectAutomatically = true;
76+
private ClassLoader classLoader;
7477

75-
private volatile ClassLoader classLoader;
78+
private ApplicationEventPublisher applicationEventPublisher;
7679

77-
private volatile List<Advice> adviceChain;
80+
private boolean shouldReconnectAutomatically = true;
7881

79-
private final ImapMailReceiver mailReceiver;
82+
private Executor sendingTaskExecutor;
8083

81-
private volatile long reconnectDelay = DEFAULT_RECONNECT_DELAY; // milliseconds
84+
private boolean sendingTaskExecutorSet;
8285

83-
private volatile ScheduledFuture<?> receivingTask;
84-
85-
private final ExceptionAwarePeriodicTrigger receivingTaskTrigger = new ExceptionAwarePeriodicTrigger();
86+
private List<Advice> adviceChain;
8687

87-
private volatile TransactionSynchronizationFactory transactionSynchronizationFactory;
88+
private long reconnectDelay = DEFAULT_RECONNECT_DELAY; // milliseconds
8889

89-
private volatile ApplicationEventPublisher applicationEventPublisher;
90+
private volatile ScheduledFuture<?> receivingTask;
9091

9192
public ImapIdleChannelAdapter(ImapMailReceiver mailReceiver) {
9293
Assert.notNull(mailReceiver, "'mailReceiver' must not be null");
@@ -187,7 +188,7 @@ private Runnable createMessageSendingTask(final Object mailMessage) {
187188
@SuppressWarnings("unchecked")
188189
org.springframework.messaging.Message<?> message =
189190
mailMessage instanceof Message
190-
? ImapIdleChannelAdapter.this.getMessageBuilderFactory().withPayload(mailMessage).build()
191+
? getMessageBuilderFactory().withPayload(mailMessage).build()
191192
: (org.springframework.messaging.Message<Object>) mailMessage;
192193

193194
if (TransactionSynchronizationManager.isActualTransactionActive()) {
@@ -251,9 +252,7 @@ private class ReceivingTask implements Runnable {
251252
public void run() {
252253
try {
253254
ImapIdleChannelAdapter.this.idleTask.run();
254-
if (logger.isDebugEnabled()) {
255-
logger.debug("Task completed successfully. Re-scheduling it again right away.");
256-
}
255+
logger.debug("Task completed successfully. Re-scheduling it again right away.");
257256
}
258257
catch (Exception e) { //run again after a delay
259258
if (logger.isWarnEnabled()) {
@@ -281,33 +280,28 @@ public void run() {
281280
* The following shouldn't be necessary because doStart() will have ensured we have
282281
* one. But, just in case...
283282
*/
284-
Assert.state(ImapIdleChannelAdapter.this.sendingTaskExecutor != null, "'sendingTaskExecutor' must not be null");
283+
Assert.state(ImapIdleChannelAdapter.this.sendingTaskExecutor != null,
284+
"'sendingTaskExecutor' must not be null");
285285

286286
try {
287-
if (logger.isDebugEnabled()) {
288-
logger.debug("waiting for mail");
289-
}
287+
logger.debug("waiting for mail");
290288
ImapIdleChannelAdapter.this.mailReceiver.waitForNewMessages();
291-
if (ImapIdleChannelAdapter.this.mailReceiver.getFolder().isOpen()) {
289+
Folder folder = ImapIdleChannelAdapter.this.mailReceiver.getFolder();
290+
if (folder != null && folder.isOpen()) {
292291
Object[] mailMessages = ImapIdleChannelAdapter.this.mailReceiver.receive();
293292
if (logger.isDebugEnabled()) {
294293
logger.debug("received " + mailMessages.length + " mail messages");
295294
}
296-
for (final Object mailMessage : mailMessages) {
297-
295+
for (Object mailMessage : mailMessages) {
298296
Runnable messageSendingTask = createMessageSendingTask(mailMessage);
299-
300297
ImapIdleChannelAdapter.this.sendingTaskExecutor.execute(messageSendingTask);
301298
}
302299
}
303300
}
304301
catch (MessagingException e) {
305-
if (logger.isWarnEnabled()) {
306-
logger.warn("error occurred in idle task", e);
307-
}
302+
logger.warn("error occurred in idle task", e);
308303
if (ImapIdleChannelAdapter.this.shouldReconnectAutomatically) {
309-
throw new IllegalStateException(
310-
"Failure in 'idle' task. Will resubmit.", e);
304+
throw new IllegalStateException("Failure in 'idle' task. Will resubmit.", e);
311305
}
312306
else {
313307
throw new org.springframework.messaging.MessagingException(

spring-integration-mail/src/test/java/org/springframework/integration/mail/ImapMailReceiverTests.java

+18-7
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@
6464
import javax.mail.search.AndTerm;
6565
import javax.mail.search.FlagTerm;
6666
import javax.mail.search.FromTerm;
67-
import javax.mail.search.SearchTerm;
6867

6968
import org.apache.commons.logging.Log;
7069
import org.junit.AfterClass;
@@ -254,6 +253,8 @@ public void testIdleWithServerGuts(ImapMailReceiver receiver, boolean mapped, bo
254253
assertNotNull(channel.receive(10000)); // new message after idle
255254
assertNull(channel.receive(100)); // no new message after second and third idle
256255
verify(logger).debug("Canceling IDLE");
256+
257+
adapter.stop();
257258
taskScheduler.shutdown();
258259
assertTrue(imapIdleServer.assertReceived("storeUserFlag"));
259260
}
@@ -470,6 +471,7 @@ public void receiveAndIgnoreMarkAsReadDontDelete() throws Exception {
470471
@Ignore
471472
public void testMessageHistory() throws Exception {
472473
ImapIdleChannelAdapter adapter = this.context.getBean("simpleAdapter", ImapIdleChannelAdapter.class);
474+
adapter.setReconnectDelay(1);
473475

474476
AbstractMailReceiver receiver = new ImapMailReceiver();
475477
receiver = spy(receiver);
@@ -526,6 +528,7 @@ protected Object handleRequestMessage(org.springframework.messaging.Message<?> r
526528
adapter.setOutputChannel(channel);
527529
QueueChannel errorChannel = new QueueChannel();
528530
adapter.setErrorChannel(errorChannel);
531+
adapter.setReconnectDelay(1);
529532

530533
AbstractMailReceiver receiver = new ImapMailReceiver();
531534
receiver = spy(receiver);
@@ -568,6 +571,7 @@ public void testNoInitialIdleDelayWhenRecentNotSupported() throws Exception {
568571

569572
QueueChannel channel = new QueueChannel();
570573
adapter.setOutputChannel(channel);
574+
adapter.setReconnectDelay(1);
571575

572576
ImapMailReceiver receiver = new ImapMailReceiver("imap:foo");
573577
receiver = spy(receiver);
@@ -706,10 +710,14 @@ public void testConnectionException() throws Exception {
706710
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
707711
taskScheduler.initialize();
708712
adapter.setTaskScheduler(taskScheduler);
713+
adapter.setReconnectDelay(1);
709714
adapter.start();
710715
assertTrue(latch.await(10, TimeUnit.SECONDS));
711716
assertThat(theEvent.get().toString(),
712717
endsWith("cause=java.lang.IllegalStateException: Failure in 'idle' task. Will resubmit.]"));
718+
719+
adapter.stop();
720+
taskScheduler.destroy();
713721
}
714722

715723
@Test // see INT-1801
@@ -777,9 +785,9 @@ public void testAttachmentsWithMappingMultiAsBytes() throws Exception {
777785
org.springframework.messaging.Message<?> received = messages[0];
778786
Object content = received.getPayload();
779787
assertThat(content, instanceOf(byte[].class));
780-
assertThat((String) received.getHeaders().get(MailHeaders.CONTENT_TYPE),
788+
assertThat(received.getHeaders().get(MailHeaders.CONTENT_TYPE),
781789
equalTo("multipart/mixed;\r\n boundary=\"------------040903000701040401040200\""));
782-
assertThat((String) received.getHeaders().get(MessageHeaders.CONTENT_TYPE),
790+
assertThat(received.getHeaders().get(MessageHeaders.CONTENT_TYPE),
783791
equalTo("application/octet-stream"));
784792
}
785793

@@ -789,8 +797,8 @@ public void testAttachmentsWithMapping() throws Exception {
789797
receiver.setHeaderMapper(new DefaultMailHeaderMapper());
790798
receiver.setEmbeddedPartsAsBytes(false);
791799
testAttachmentsGuts(receiver);
792-
org.springframework.messaging.Message<?>[] messages = (org.springframework.messaging.Message<?>[]) receiver
793-
.receive();
800+
org.springframework.messaging.Message<?>[] messages =
801+
(org.springframework.messaging.Message<?>[]) receiver.receive();
794802
Object content = messages[0].getPayload();
795803
assertThat(content, instanceOf(Multipart.class));
796804
assertEquals("bar", ((Multipart) content).getBodyPart(0).getContent().toString().trim());
@@ -804,7 +812,7 @@ private Folder testAttachmentsGuts(final ImapMailReceiver receiver) throws Messa
804812
given(folder.isOpen()).willReturn(true);
805813

806814
Message message = new MimeMessage(null, new ClassPathResource("test.mail").getInputStream());
807-
given(folder.search((SearchTerm) Mockito.any())).willReturn(new Message[] { message });
815+
given(folder.search(Mockito.any())).willReturn(new Message[] { message });
808816
given(store.getFolder(Mockito.any(URLName.class))).willReturn(folder);
809817
given(folder.getPermanentFlags()).willReturn(new Flags(Flags.Flag.USER));
810818
DirectFieldAccessor df = new DirectFieldAccessor(receiver);
@@ -821,6 +829,7 @@ public void testExecShutdown() {
821829
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
822830
taskScheduler.initialize();
823831
adapter.setTaskScheduler(taskScheduler);
832+
adapter.setReconnectDelay(1);
824833
adapter.start();
825834
ExecutorService exec = TestUtils.getPropertyValue(adapter, "sendingTaskExecutor", ExecutorService.class);
826835
adapter.stop();
@@ -908,7 +917,7 @@ public void testIdleReconnects() throws Exception {
908917
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
909918
taskScheduler.initialize();
910919
adapter.setTaskScheduler(taskScheduler);
911-
adapter.setReconnectDelay(50);
920+
adapter.setReconnectDelay(1);
912921
adapter.afterPropertiesSet();
913922
final CountDownLatch latch = new CountDownLatch(3);
914923
adapter.setApplicationEventPublisher(e -> {
@@ -917,6 +926,8 @@ public void testIdleReconnects() throws Exception {
917926
adapter.start();
918927
assertTrue(latch.await(60, TimeUnit.SECONDS));
919928
verify(store, atLeast(3)).connect();
929+
930+
adapter.stop();
920931
taskScheduler.shutdown();
921932
}
922933

0 commit comments

Comments
 (0)