Skip to content

Fix mail lock race condition & Sonar smells #2721

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jan 28, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import javax.mail.Authenticator;
Expand Down Expand Up @@ -72,7 +71,7 @@ public abstract class AbstractMailReceiver extends IntegrationObjectSupport impl

private final URLName url;

private final ReadWriteLock folderLock = new ReentrantReadWriteLock();
private final ReentrantReadWriteLock folderLock = new ReentrantReadWriteLock();

private final Lock folderReadLock = this.folderLock.readLock();

Expand Down Expand Up @@ -340,7 +339,7 @@ private Folder obtainFolderInstance() throws MessagingException {

@Override
public Object[] receive() throws javax.mail.MessagingException {
this.folderReadLock.lock();
this.folderReadLock.lock(); // NOSONAR - guarded with the getReadHoldCount()
try {
try {
Folder folderToCheck = getFolder();
Expand All @@ -362,7 +361,9 @@ public Object[] receive() throws javax.mail.MessagingException {
}
}
finally {
this.folderReadLock.unlock();
if (this.folderLock.getReadHoldCount() > 0) {
this.folderReadLock.unlock();
}
}
}

Expand Down Expand Up @@ -416,22 +417,17 @@ private Object[] convertMessagesIfNecessary(MimeMessage[] filteredMessages) {
private Object extractContent(MimeMessage message, Map<String, Object> headers) {
Object content;
try {
MimeMessage theMessage;
MimeMessage theMessage = message;
if (this.simpleContent) {
theMessage = new IntegrationMimeMessage(message);
}
else {
theMessage = message;
}
content = theMessage.getContent();
if (content instanceof String) {
headers.put(MessageHeaders.CONTENT_TYPE, "text/plain");
String mailContentType = (String) headers.get(MailHeaders.CONTENT_TYPE);
if (mailContentType != null && mailContentType.toLowerCase().startsWith("text")) {
headers.put(MessageHeaders.CONTENT_TYPE, mailContentType);
}
else {
headers.put(MessageHeaders.CONTENT_TYPE, "text/plain");
}
}
else if (content instanceof InputStream) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -154,7 +154,7 @@ public void setApplicationEventPublisher(ApplicationEventPublisher applicationEv

@Override // guarded by super#lifecycleLock
protected void doStart() {
final TaskScheduler scheduler = this.getTaskScheduler();
TaskScheduler scheduler = getTaskScheduler();
Assert.notNull(scheduler, "'taskScheduler' must not be null");
if (this.sendingTaskExecutor == null) {
this.sendingTaskExecutor = Executors.newFixedThreadPool(1);
Expand Down Expand Up @@ -182,39 +182,8 @@ protected void doStop() {
}
}

private Runnable createMessageSendingTask(final Object mailMessage) {
Runnable sendingTask = () -> {
@SuppressWarnings("unchecked")
org.springframework.messaging.Message<?> message =
mailMessage instanceof Message
? getMessageBuilderFactory().withPayload(mailMessage).build()
: (org.springframework.messaging.Message<Object>) mailMessage;

if (TransactionSynchronizationManager.isActualTransactionActive()) {
if (ImapIdleChannelAdapter.this.transactionSynchronizationFactory != null) {
TransactionSynchronization synchronization =
ImapIdleChannelAdapter.this.transactionSynchronizationFactory
.create(ImapIdleChannelAdapter.this);
if (synchronization != null) {
TransactionSynchronizationManager.registerSynchronization(synchronization);

if (synchronization instanceof IntegrationResourceHolderSynchronization
&& !TransactionSynchronizationManager.hasResource(ImapIdleChannelAdapter.this)) {

TransactionSynchronizationManager.bindResource(ImapIdleChannelAdapter.this,
((IntegrationResourceHolderSynchronization) synchronization).getResourceHolder());
}

Object resourceHolder =
TransactionSynchronizationManager.getResource(ImapIdleChannelAdapter.this);
if (resourceHolder instanceof IntegrationResourceHolder) {
((IntegrationResourceHolder) resourceHolder).setMessage(message);
}
}
}
}
sendMessage(message);
};
private Runnable createMessageSendingTask(Object mailMessage) {
Runnable sendingTask = prepareSendingTask(mailMessage);

// wrap in the TX proxy if necessary
if (!CollectionUtils.isEmpty(this.adviceChain)) {
Expand All @@ -229,6 +198,38 @@ private Runnable createMessageSendingTask(final Object mailMessage) {
return sendingTask;
}

private Runnable prepareSendingTask(Object mailMessage) {
return () -> {
@SuppressWarnings("unchecked")
org.springframework.messaging.Message<?> message =
mailMessage instanceof Message
? getMessageBuilderFactory().withPayload(mailMessage).build()
: (org.springframework.messaging.Message<Object>) mailMessage;

if (TransactionSynchronizationManager.isActualTransactionActive()
&& this.transactionSynchronizationFactory != null) {

TransactionSynchronization synchronization = this.transactionSynchronizationFactory.create(this);
if (synchronization != null) {
TransactionSynchronizationManager.registerSynchronization(synchronization);

if (synchronization instanceof IntegrationResourceHolderSynchronization
&& !TransactionSynchronizationManager.hasResource(this)) {

TransactionSynchronizationManager.bindResource(this,
((IntegrationResourceHolderSynchronization) synchronization).getResourceHolder());
}

Object resourceHolder = TransactionSynchronizationManager.getResource(this);
if (resourceHolder instanceof IntegrationResourceHolder) {
((IntegrationResourceHolder) resourceHolder).setMessage(message);
}
}
}
sendMessage(message);
};
}

private void publishException(Exception e) {
if (this.applicationEventPublisher != null) {
this.applicationEventPublisher.publishEvent(new ImapIdleExceptionEvent(e));
Expand Down Expand Up @@ -262,6 +263,7 @@ public void run() {
publishException(e);
}
}

}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ public class ImapMailReceiver extends AbstractMailReceiver {

private static final int DEFAULT_CANCEL_IDLE_INTERVAL = 120000;

private static final String PROTOCOL = "imap";

private final MessageCountListener messageCountListener = new SimpleMessageCountListener();

private final IdleCanceler idleCanceler = new IdleCanceler();
Expand All @@ -77,17 +79,17 @@ public class ImapMailReceiver extends AbstractMailReceiver {

public ImapMailReceiver() {
super();
this.setProtocol("imap");
setProtocol(PROTOCOL);
}

public ImapMailReceiver(String url) {
super(url);
if (url != null) {
Assert.isTrue(url.toLowerCase().startsWith("imap"),
Assert.isTrue(url.toLowerCase().startsWith(PROTOCOL),
"URL must start with 'imap' for the IMAP Mail receiver.");
}
else {
this.setProtocol("imap");
setProtocol(PROTOCOL);
}
}

Expand Down Expand Up @@ -126,7 +128,7 @@ public void setShouldMarkMessagesAsRead(Boolean shouldMarkMessagesAsRead) {
* @since 3.0.5
*/
public void setCancelIdleInterval(long cancelIdleInterval) {
this.cancelIdleInterval = cancelIdleInterval * 1000;
this.cancelIdleInterval = cancelIdleInterval * 1000; // NOSONAR - convert seconds to milliseconds
}

@Override
Expand All @@ -140,7 +142,7 @@ protected void onInit() {
this.isInternalScheduler = true;
}
Properties javaMailProperties = getJavaMailProperties();
for (String name : new String[]{"imap", "imaps"}) {
for (String name : new String[] { PROTOCOL, "imaps" }) {
String peek = "mail." + name + ".peek";
if (javaMailProperties.getProperty(peek) == null) {
javaMailProperties.setProperty(peek, "true");
Expand All @@ -154,6 +156,9 @@ public void destroy() {
if (this.isInternalScheduler) {
((ThreadPoolTaskScheduler) this.scheduler).shutdown();
}
if (this.pingTask != null) {
this.pingTask.cancel(true);
}
}

/**
Expand All @@ -162,18 +167,16 @@ public void destroy() {
* @throws MessagingException Any MessagingException.
*/
public void waitForNewMessages() throws MessagingException {
this.openFolder();
Folder folder = this.getFolder();
openFolder();
Folder folder = getFolder();
Assert.state(folder instanceof IMAPFolder,
"folder is not an instance of [" + IMAPFolder.class.getName() + "]");
() -> "folder is not an instance of [" + IMAPFolder.class.getName() + "]");
IMAPFolder imapFolder = (IMAPFolder) folder;
if (imapFolder.hasNewMessages()) {
return;
}
else if (!folder.getPermanentFlags().contains(Flags.Flag.RECENT)) {
if (searchForNewMessages().length > 0) {
return;
}
else if (!folder.getPermanentFlags().contains(Flags.Flag.RECENT) && searchForNewMessages().length > 0) {
return;
}
imapFolder.addMessageCountListener(this.messageCountListener);
try {
Expand Down Expand Up @@ -203,11 +206,11 @@ else if (!folder.getPermanentFlags().contains(Flags.Flag.RECENT)) {
*/
@Override
protected Message[] searchForNewMessages() throws MessagingException {
Flags supportedFlags = this.getFolder().getPermanentFlags();
SearchTerm searchTerm = this.compileSearchTerms(supportedFlags);
Folder folder = this.getFolder();
if (folder.isOpen()) {
return nullSafeMessages(searchTerm != null ? folder.search(searchTerm) : folder.getMessages());
Folder folderToUse = getFolder();
Flags supportedFlags = folderToUse.getPermanentFlags();
SearchTerm searchTerm = compileSearchTerms(supportedFlags);
if (folderToUse.isOpen()) {
return nullSafeMessages(searchTerm != null ? folderToUse.search(searchTerm) : folderToUse.getMessages());
}
throw new MessagingException("Folder is closed");
}
Expand Down Expand Up @@ -257,9 +260,11 @@ public void run() {
folder.isOpen(); // resets idle state
}
}
catch (Exception ignore) {
catch (Exception ex) {
logger.error("Error during resetting idle state.", ex);
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I presume this was to avoid log noise. Why change? If your intention is to alert the user of the problem, it would be better to log rather than the task executor printing the stack trace.


}

/**
Expand Down Expand Up @@ -327,28 +332,35 @@ public SearchTerm generateSearchTerm(Flags supportedFlags, Folder folder) {
}

if (!recentFlagSupported) {
NotTerm notFlagged = null;
if (folder.getPermanentFlags().contains(Flags.Flag.USER)) {
searchTerm = applyTermsWhenNoRecentFlag(folder, searchTerm);
}
return searchTerm;
}

private SearchTerm applyTermsWhenNoRecentFlag(Folder folder, SearchTerm searchTerm) {
NotTerm notFlagged = null;
if (folder.getPermanentFlags().contains(Flag.USER)) {
if (logger.isDebugEnabled()) {
logger.debug("This email server does not support RECENT flag, but it does support " +
"USER flags which will be used to prevent duplicates during email fetch." +
" This receiver instance uses flag: " + getUserFlag());
Flags siFlags = new Flags();
siFlags.add(getUserFlag());
notFlagged = new NotTerm(new FlagTerm(siFlags, true));
}
else {
logger.debug("This email server does not support RECENT or USER flags. " +
"System flag 'Flag.FLAGGED' will be used to prevent duplicates during email fetch.");
notFlagged = new NotTerm(new FlagTerm(new Flags(Flags.Flag.FLAGGED), true));
}
if (searchTerm == null) {
searchTerm = notFlagged;
}
else {
searchTerm = new AndTerm(searchTerm, notFlagged);
}
Flags siFlags = new Flags();
siFlags.add(getUserFlag());
notFlagged = new NotTerm(new FlagTerm(siFlags, true));
}
else {
logger.debug("This email server does not support RECENT or USER flags. " +
"System flag 'Flag.FLAGGED' will be used to prevent duplicates during email fetch.");
notFlagged = new NotTerm(new FlagTerm(new Flags(Flag.FLAGGED), true));
}

if (searchTerm == null) {
return notFlagged;
}
else {
return new AndTerm(searchTerm, notFlagged);
}
return searchTerm;
}

}
Expand Down
Loading