diff --git a/spring-integration-file/src/main/java/org/springframework/integration/file/FileReadingMessageSource.java b/spring-integration-file/src/main/java/org/springframework/integration/file/FileReadingMessageSource.java index 18f5aad27da..a7a63e5a315 100644 --- a/spring-integration-file/src/main/java/org/springframework/integration/file/FileReadingMessageSource.java +++ b/spring-integration-file/src/main/java/org/springframework/integration/file/FileReadingMessageSource.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -47,6 +47,7 @@ import org.springframework.integration.file.filters.FileListFilter; import org.springframework.integration.file.filters.ResettableFileListFilter; import org.springframework.integration.support.AbstractIntegrationMessageBuilder; +import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.util.Assert; @@ -103,15 +104,15 @@ public class FileReadingMessageSource extends AbstractMessageSource */ private final Queue toBeReceived; - private volatile File directory; + private File directory; - private volatile DirectoryScanner scanner = new DefaultDirectoryScanner(); + private DirectoryScanner scanner = new DefaultDirectoryScanner(); - private volatile boolean scannerExplicitlySet; + private boolean scannerExplicitlySet; - private volatile boolean autoCreateDirectory = true; + private boolean autoCreateDirectory = true; - private volatile boolean scanEachPoll = false; + private boolean scanEachPoll = false; private FileListFilter filter; @@ -119,7 +120,7 @@ public class FileReadingMessageSource extends AbstractMessageSource private boolean useWatchService; - private WatchEventType[] watchEvents = new WatchEventType[] { WatchEventType.CREATE }; + private WatchEventType[] watchEvents = { WatchEventType.CREATE }; /** * Creates a FileReadingMessageSource with a naturally ordered queue of unbounded capacity. @@ -132,7 +133,6 @@ public FileReadingMessageSource() { * Creates a FileReadingMessageSource with a bounded queue of the given * capacity. This can be used to reduce the memory footprint of this * component when reading from a large directory. - * * @param internalQueueCapacity * the size of the queue used to cache files to be received * internally. This queue can be made larger to optimize the @@ -151,26 +151,22 @@ public FileReadingMessageSource(int internalQueueCapacity) { /** * Creates a FileReadingMessageSource with a {@link PriorityBlockingQueue} - * ordered with the passed in {@link Comparator} - *

- * The size of the queue used should be large enough to hold all the files + * ordered with the passed in {@link Comparator}. + *

The size of the queue used should be large enough to hold all the files * in the input directory in order to sort all of them, so restricting the * size of the queue is mutually exclusive with ordering. No guarantees * about file delivery order can be made under concurrent access. - *

- * * @param receptionOrderComparator * the comparator to be used to order the files in the internal * queue */ - public FileReadingMessageSource(Comparator receptionOrderComparator) { + public FileReadingMessageSource(@Nullable Comparator receptionOrderComparator) { this.toBeReceived = new PriorityBlockingQueue<>(DEFAULT_INTERNAL_QUEUE_CAPACITY, receptionOrderComparator); } /** * Specify the input directory. - * * @param directory to monitor */ public void setDirectory(File directory) { @@ -181,7 +177,6 @@ public void setDirectory(File directory) { /** * Optionally specify a custom scanner, for example the * {@link WatchServiceDirectoryScanner} - * * @param scanner scanner implementation */ public void setScanner(DirectoryScanner scanner) { @@ -207,7 +202,6 @@ public DirectoryScanner getScanner() { * true. If set to false and the * source directory does not exist, an Exception will be thrown upon * initialization. - * * @param autoCreateDirectory * should the directory to be monitored be created when this * component starts up? @@ -224,8 +218,7 @@ public void setAutoCreateDirectory(boolean autoCreateDirectory) { * If multiple filters are required a * {@link org.springframework.integration.file.filters.CompositeFileListFilter} * can be used to group them together. - *

- * The supplied filter must be thread safe.. + *

The supplied filter must be thread safe.. * @param filter a filter */ public void setFilter(FileListFilter filter) { @@ -256,7 +249,6 @@ public void setLocker(FileLocker locker) { * will more likely be out of sync with the file system if this flag is set * to false, but it will change more often (causing expensive * reordering) if it is set to true. - * * @param scanEachPoll * whether or not the component should re-scan (as opposed to not * rescanning until the entire backlog has been delivered) @@ -304,15 +296,15 @@ public String getComponentType() { @Override public void start() { if (!this.running.getAndSet(true)) { - if (!this.directory.exists() && this.autoCreateDirectory) { - this.directory.mkdirs(); + if (!this.directory.exists() && this.autoCreateDirectory && !this.directory.mkdirs()) { + throw new IllegalStateException("Cannot create directory or ita parents: " + this.directory); } Assert.isTrue(this.directory.exists(), - "Source directory [" + this.directory + "] does not exist."); + () -> "Source directory [" + this.directory + "] does not exist."); Assert.isTrue(this.directory.isDirectory(), - "Source path [" + this.directory + "] does not point to a directory."); + () -> "Source path [" + this.directory + "] does not point to a directory."); Assert.isTrue(this.directory.canRead(), - "Source directory [" + this.directory + "] is not readable."); + () -> "Source directory [" + this.directory + "] is not readable."); if (this.scanner instanceof Lifecycle) { ((Lifecycle) this.scanner).start(); } @@ -336,7 +328,7 @@ protected void onInit() { Assert.notNull(this.directory, "'directory' must not be null"); Assert.state(!(this.scannerExplicitlySet && this.useWatchService), - "The 'scanner' and 'useWatchService' options are mutually exclusive: " + this.scanner); + () -> "The 'scanner' and 'useWatchService' options are mutually exclusive: " + this.scanner); if (this.useWatchService) { this.scanner = new WatchServiceDirectoryScanner(); @@ -345,8 +337,8 @@ protected void onInit() { // Check that the filter and locker options are _NOT_ set if an external scanner has been set. // The external scanner is responsible for the filter and locker options in that case. Assert.state(!(this.scannerExplicitlySet && (this.filter != null || this.locker != null)), - "When using an external scanner the 'filter' and 'locker' options should not be used. Instead, set these options on the external DirectoryScanner: " - + this.scanner); + () -> "When using an external scanner the 'filter' and 'locker' options should not be used. " + + "Instead, set these options on the external DirectoryScanner: " + this.scanner); if (this.filter != null) { this.scanner.setFilter(this.filter); } @@ -492,7 +484,7 @@ protected File[] listEligibleFiles(File directory) { files.addAll(filesFromEvents()); - return files.toArray(new File[files.size()]); + return files.toArray(new File[0]); } private Set filesFromEvents() { @@ -504,57 +496,11 @@ private Set filesFromEvents() { if (event.kind() == StandardWatchEventKinds.ENTRY_CREATE || event.kind() == StandardWatchEventKinds.ENTRY_MODIFY || event.kind() == StandardWatchEventKinds.ENTRY_DELETE) { - Path item = (Path) event.context(); - File file = new File(parentDir, item.toFile().getName()); - if (logger.isDebugEnabled()) { - logger.debug("Watch event [" + event.kind() + "] for file [" + file + "]"); - } - if (event.kind() == StandardWatchEventKinds.ENTRY_DELETE) { - if (getFilter() instanceof ResettableFileListFilter) { - ((ResettableFileListFilter) getFilter()).remove(file); - } - boolean fileRemoved = files.remove(file); - if (fileRemoved && logger.isDebugEnabled()) { - logger.debug("The file [" + file + - "] has been removed from the queue because of DELETE event."); - } - } - else { - if (file.exists()) { - if (file.isDirectory()) { - files.addAll(walkDirectory(file.toPath(), event.kind())); - } - else { - files.remove(file); - files.add(file); - } - } - else { - if (logger.isDebugEnabled()) { - logger.debug("A file [" + file + "] for the event [" + event.kind() + - "] doesn't exist. Ignored."); - } - } - } + processFilesFromNormalEvent(files, parentDir, event); } else if (event.kind() == StandardWatchEventKinds.OVERFLOW) { - if (logger.isDebugEnabled()) { - logger.debug("Watch event [" + StandardWatchEventKinds.OVERFLOW + - "] with context [" + event.context() + "]"); - } - - for (WatchKey watchKey : this.pathKeys.values()) { - watchKey.cancel(); - } - this.pathKeys.clear(); - - if (event.context() != null && event.context() instanceof Path) { - files.addAll(walkDirectory((Path) event.context(), event.kind())); - } - else { - files.addAll(walkDirectory(FileReadingMessageSource.this.directory.toPath(), event.kind())); - } + processFilesFromOverflowEvent(files, event); } } key.reset(); @@ -563,6 +509,61 @@ else if (event.kind() == StandardWatchEventKinds.OVERFLOW) { return files; } + private void processFilesFromNormalEvent(Set files, File parentDir, WatchEvent event) { + Path item = (Path) event.context(); + File file = new File(parentDir, item.toFile().getName()); + if (logger.isDebugEnabled()) { + logger.debug("Watch event [" + event.kind() + "] for file [" + file + "]"); + } + + if (event.kind() == StandardWatchEventKinds.ENTRY_DELETE) { + if (getFilter() instanceof ResettableFileListFilter) { + ((ResettableFileListFilter) getFilter()).remove(file); + } + boolean fileRemoved = files.remove(file); + if (fileRemoved && logger.isDebugEnabled()) { + logger.debug("The file [" + file + + "] has been removed from the queue because of DELETE event."); + } + } + else { + if (file.exists()) { + if (file.isDirectory()) { + files.addAll(walkDirectory(file.toPath(), event.kind())); + } + else { + files.remove(file); + files.add(file); + } + } + else { + if (logger.isDebugEnabled()) { + logger.debug("A file [" + file + "] for the event [" + event.kind() + + "] doesn't exist. Ignored."); + } + } + } + } + + private void processFilesFromOverflowEvent(Set files, WatchEvent event) { + if (logger.isDebugEnabled()) { + logger.debug("Watch event [" + StandardWatchEventKinds.OVERFLOW + + "] with context [" + event.context() + "]"); + } + + for (WatchKey watchKey : this.pathKeys.values()) { + watchKey.cancel(); + } + this.pathKeys.clear(); + + if (event.context() != null && event.context() instanceof Path) { + files.addAll(walkDirectory((Path) event.context(), event.kind())); + } + else { + files.addAll(walkDirectory(FileReadingMessageSource.this.directory.toPath(), event.kind())); + } + } + private Set walkDirectory(Path directory, final WatchEvent.Kind kind) { final Set walkedFiles = new LinkedHashSet<>(); try { diff --git a/spring-integration-file/src/main/java/org/springframework/integration/file/FileWritingMessageHandler.java b/spring-integration-file/src/main/java/org/springframework/integration/file/FileWritingMessageHandler.java index e570a69806d..5a63456925c 100644 --- a/spring-integration-file/src/main/java/org/springframework/integration/file/FileWritingMessageHandler.java +++ b/spring-integration-file/src/main/java/org/springframework/integration/file/FileWritingMessageHandler.java @@ -25,6 +25,7 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.io.OutputStreamWriter; import java.nio.charset.Charset; import java.nio.file.Files; @@ -32,7 +33,6 @@ import java.nio.file.attribute.PosixFilePermission; import java.util.BitSet; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; @@ -42,9 +42,7 @@ import java.util.function.BiConsumer; import java.util.regex.Matcher; import java.util.regex.Pattern; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; +import java.util.stream.Collectors; import org.springframework.beans.factory.BeanFactoryAware; import org.springframework.context.Lifecycle; @@ -111,13 +109,24 @@ public class FileWritingMessageHandler extends AbstractReplyProducingMessageHandler implements Lifecycle, MessageTriggerAction { - private final Log logger = LogFactory.getLog(this.getClass()); - private static final int DEFAULT_BUFFER_SIZE = 8192; private static final long DEFAULT_FLUSH_INTERVAL = 30000L; - private final Map fileStates = new HashMap(); + private static final PosixFilePermission[] POSIX_FILE_PERMISSIONS = + { + PosixFilePermission.OTHERS_EXECUTE, + PosixFilePermission.OTHERS_WRITE, + PosixFilePermission.OTHERS_READ, + PosixFilePermission.GROUP_EXECUTE, + PosixFilePermission.GROUP_WRITE, + PosixFilePermission.GROUP_READ, + PosixFilePermission.OWNER_EXECUTE, + PosixFilePermission.OWNER_WRITE, + PosixFilePermission.OWNER_READ + }; + + private final Map fileStates = new HashMap<>(); private final Expression destinationDirectoryExpression; @@ -332,8 +341,12 @@ public void setFlushWhenIdle(boolean flushWhenIdle) { this.flushWhenIdle = flushWhenIdle; } + /** + * Configure a {@link TaskScheduler} for flush operations. + * @param taskScheduler the {@link TaskScheduler} to use. + */ @Override - public void setTaskScheduler(TaskScheduler taskScheduler) { + public void setTaskScheduler(TaskScheduler taskScheduler) { // NOSONAR super.setTaskScheduler(taskScheduler); } @@ -370,7 +383,7 @@ public void setPreserveTimestamp(boolean preserveTimestamp) { */ public void setChmodOctal(String chmod) { Assert.notNull(chmod, "'chmod' cannot be null"); - setChmod(Integer.parseInt(chmod, 8)); + setChmod(Integer.parseInt(chmod, 8)); // NOSONAR 8-bit } /** @@ -382,53 +395,17 @@ public void setChmodOctal(String chmod) { * @since 5.0 */ public void setChmod(int chmod) { - Assert.isTrue(chmod >= 0 && chmod <= 0777, "'chmod' must be between 0 and 0777 (octal)"); + Assert.isTrue(chmod >= 0 && chmod <= 0777, // NOSONAR permissions octal + "'chmod' must be between 0 and 0777 (octal)"); if (!FileUtils.IS_POSIX) { this.logger.error("'chmod' setting ignored - the file system does not support Posix attributes"); return; } - /* - * Bitset.valueOf(byte[]) takes a little-endian array of bytes to create a BitSet. - * Since we are interested in 9 bits, we construct an array with the low-order byte - * (bits 0-7) followed by the second order byte (bit 8). - * BitSet.stream() returns a stream of ints representing those bits that are set. - * We use that stream with a switch to create the set of PosixFilePermissions - * representing the bits that were set in the chmod value. - */ - BitSet bits = BitSet.valueOf(new byte[] { (byte) chmod, (byte) (chmod >> 8) }); - final Set posixPermissions = new HashSet<>(); - bits.stream().forEach(b -> { - switch (b) { - case 0: - posixPermissions.add(PosixFilePermission.OTHERS_EXECUTE); - break; - case 1: - posixPermissions.add(PosixFilePermission.OTHERS_WRITE); - break; - case 2: - posixPermissions.add(PosixFilePermission.OTHERS_READ); - break; - case 3: - posixPermissions.add(PosixFilePermission.GROUP_EXECUTE); - break; - case 4: - posixPermissions.add(PosixFilePermission.GROUP_WRITE); - break; - case 5: - posixPermissions.add(PosixFilePermission.GROUP_READ); - break; - case 6: - posixPermissions.add(PosixFilePermission.OWNER_EXECUTE); - break; - case 7: - posixPermissions.add(PosixFilePermission.OWNER_WRITE); - break; - case 8: - posixPermissions.add(PosixFilePermission.OWNER_READ); - break; - } - }); - this.permissions = posixPermissions; + BitSet bits = BitSet.valueOf(new byte[] { (byte) chmod, (byte) (chmod >> 8) }); // NOSONAR + this.permissions = bits.stream() + .boxed() + .map((b) -> POSIX_FILE_PERMISSIONS[b]) + .collect(Collectors.toSet()); } /** @@ -449,7 +426,7 @@ protected void doInit() { if (this.destinationDirectoryExpression instanceof LiteralExpression) { final File directory = ExpressionUtils.expressionToFile(this.destinationDirectoryExpression, - this.evaluationContext, null, "destinationDirectoryExpression"); + this.evaluationContext, null, "destinationDirectoryExpression"); validateDestinationDirectory(directory, this.autoCreateDirectory); } @@ -469,7 +446,7 @@ public void start() { if (this.flushTask == null && FileExistsMode.APPEND_NO_FLUSH.equals(this.fileExistsMode)) { TaskScheduler taskScheduler = getTaskScheduler(); Assert.state(taskScheduler != null, "'taskScheduler' is required for FileExistsMode.APPEND_NO_FLUSH"); - this.flushTask = taskScheduler.scheduleAtFixedRate(new Flusher(), this.flushInterval / 3); + this.flushTask = taskScheduler.scheduleAtFixedRate(new Flusher(), this.flushInterval / 3); // NOSONAR } } @@ -485,7 +462,7 @@ public void stop() { flusher.run(); boolean needInterrupt = this.fileStates.size() > 0; int n = 0; - while (n++ < 10 && this.fileStates.size() > 0) { + while (n++ < 10 && this.fileStates.size() > 0) { // NOSONAR try { Thread.sleep(1); } @@ -520,15 +497,13 @@ private void validateDestinationDirectory(File destinationDirectory, boolean aut () -> "Destination directory [" + destinationDirectory + "] is not writable."); } - @Override + @Override // NOSONAR protected Object handleRequestMessage(Message requestMessage) { - Assert.notNull(requestMessage, "message must not be null"); Object payload = requestMessage.getPayload(); - Assert.notNull(payload, "message payload must not be null"); String generatedFileName = this.fileNameGenerator.generateFileName(requestMessage); File originalFileFromHeader = retrieveOriginalFileFromHeader(requestMessage); - final File destinationDirectoryToUse = evaluateDestinationDirectoryExpression(requestMessage); + File destinationDirectoryToUse = evaluateDestinationDirectoryExpression(requestMessage); File tempFile = new File(destinationDirectoryToUse, generatedFileName + this.temporaryFileSuffix); File resultFile = new File(destinationDirectoryToUse, generatedFileName); @@ -543,7 +518,7 @@ protected Object handleRequestMessage(Message requestMessage) { if (payload instanceof File) { timestamp = ((File) payload).lastModified(); } - boolean ignore = (FileExistsMode.IGNORE.equals(this.fileExistsMode) + boolean ignore = (FileExistsMode.IGNORE.equals(this.fileExistsMode) // NOSONAR && (exists || (StringUtils.hasText(this.temporaryFileSuffix) && tempFile.exists()))) || ((exists && FileExistsMode.REPLACE_IF_MODIFIED.equals(this.fileExistsMode)) && (timestamp instanceof Number @@ -556,56 +531,66 @@ protected Object handleRequestMessage(Message requestMessage) { resultFile.getParentFile().mkdirs(); //NOSONAR - will fail on the writing below } - if (payload instanceof File) { - resultFile = handleFileMessage((File) payload, tempFile, resultFile, requestMessage); - } - else if (payload instanceof InputStream) { - resultFile = handleInputStreamMessage((InputStream) payload, originalFileFromHeader, tempFile, - resultFile, requestMessage); - } - else if (payload instanceof byte[]) { - resultFile = this.handleByteArrayMessage( - (byte[]) payload, originalFileFromHeader, tempFile, resultFile, requestMessage); - } - else if (payload instanceof String) { - resultFile = this.handleStringMessage( - (String) payload, originalFileFromHeader, tempFile, resultFile, requestMessage); - } - else { - throw new IllegalArgumentException( - "unsupported Message payload type [" + payload.getClass().getName() + "]"); - } - if (this.preserveTimestamp) { - if (timestamp instanceof Number) { - resultFile.setLastModified(((Number) timestamp).longValue()); - } - else { - if (this.logger.isWarnEnabled()) { - this.logger.warn("Could not set lastModified, header " + FileHeaders.SET_MODIFIED - + " must be a Number, not " + (timestamp == null ? "null" : timestamp.getClass())); - } - } - } + resultFile = writeMessageToFile(requestMessage, originalFileFromHeader, tempFile, resultFile, + timestamp); } catch (Exception e) { throw new MessageHandlingException(requestMessage, "failed to write Message payload to file", e); } - } if (!this.expectReply) { return null; } - if (resultFile != null) { - if (originalFileFromHeader == null && payload instanceof File) { - return this.getMessageBuilderFactory().withPayload(resultFile) - .setHeader(FileHeaders.ORIGINAL_FILE, payload); - } + if (resultFile != null && originalFileFromHeader == null && payload instanceof File) { + return getMessageBuilderFactory() + .withPayload(resultFile) + .setHeader(FileHeaders.ORIGINAL_FILE, payload); } return resultFile; } + private File writeMessageToFile(Message requestMessage, File originalFileFromHeader, File tempFile, + File resultFile, Object timestamp) throws IOException { + + File fileToReturn = null; + Object payload = requestMessage.getPayload(); + if (payload instanceof File) { + fileToReturn = handleFileMessage((File) payload, tempFile, resultFile, requestMessage); + } + else if (payload instanceof InputStream) { + fileToReturn = handleInputStreamMessage((InputStream) payload, originalFileFromHeader, tempFile, + resultFile, requestMessage); + } + else if (payload instanceof byte[]) { + fileToReturn = handleByteArrayMessage((byte[]) payload, originalFileFromHeader, tempFile, resultFile, + requestMessage); + } + else if (payload instanceof String) { + fileToReturn = handleStringMessage((String) payload, originalFileFromHeader, tempFile, resultFile, + requestMessage); + } + else { + throw new IllegalArgumentException( + "Unsupported Message payload type [" + payload.getClass().getName() + "]"); + } + + if (this.preserveTimestamp) { + if (timestamp instanceof Number) { + if (!fileToReturn.setLastModified(((Number) timestamp).longValue())) { + throw new IllegalStateException("Could not set last modified '" + timestamp + + "' timestamp on file: " + fileToReturn); + } + } + else if (this.logger.isWarnEnabled()) { + this.logger.warn("Could not set lastModified, header " + FileHeaders.SET_MODIFIED + + " must be a Number, not " + (timestamp == null ? "null" : timestamp.getClass())); + } + } + return fileToReturn; + } + /** * Retrieves the File instance from the {@link FileHeaders#ORIGINAL_FILE} * header if available. If the value is not a File instance or a String @@ -654,39 +639,7 @@ protected void whileLocked() throws IOException { FileWritingMessageHandler.this.newFileCallback.accept(fileToWriteTo, requestMessage); } - FileState state = getFileState(fileToWriteTo, false); - BufferedOutputStream bos = null; - try { - bos = state != null ? state.stream : createOutputStream(fileToWriteTo, true); - byte[] buffer = new byte[StreamUtils.BUFFER_SIZE]; - int bytesRead = -1; - while ((bytesRead = sourceFileInputStream.read(buffer)) != -1) { - bos.write(buffer, 0, bytesRead); - } - if (FileWritingMessageHandler.this.appendNewLine) { - bos.write(System.lineSeparator().getBytes()); - } - } - finally { - try { - sourceFileInputStream.close(); - } - catch (IOException ex) { - } - try { - if (state == null || FileWritingMessageHandler.this.flushTask == null) { - if (bos != null) { - bos.close(); - } - clearState(fileToWriteTo, state); - } - else { - state.lastWrite = System.currentTimeMillis(); - } - } - catch (IOException ex) { - } - } + appendStreamToFile(fileToWriteTo, sourceFileInputStream); } }; @@ -696,35 +649,53 @@ protected void whileLocked() throws IOException { } else { - BufferedOutputStream bos = null; - try { - bos = new BufferedOutputStream(new FileOutputStream(tempFile), this.bufferSize); + try (InputStream inputStream = sourceFileInputStream; + OutputStream outputStream = + new BufferedOutputStream(new FileOutputStream(tempFile), this.bufferSize)) { + byte[] buffer = new byte[StreamUtils.BUFFER_SIZE]; - int bytesRead = -1; - while ((bytesRead = sourceFileInputStream.read(buffer)) != -1) { - bos.write(buffer, 0, bytesRead); + int bytesRead; + while ((bytesRead = inputStream.read(buffer)) != -1) { + outputStream.write(buffer, 0, bytesRead); } if (this.appendNewLine) { - bos.write(System.lineSeparator().getBytes()); + outputStream.write(System.lineSeparator().getBytes()); } - bos.flush(); + outputStream.flush(); } - finally { - try { - sourceFileInputStream.close(); - } - catch (IOException ex) { - } - try { + cleanUpAfterCopy(tempFile, resultFile, originalFile); + return resultFile; + } + } + + private void appendStreamToFile(File fileToWriteTo, InputStream sourceFileInputStream) throws IOException { + FileState state = getFileState(fileToWriteTo, false); + BufferedOutputStream bos = null; + try (InputStream inputStream = sourceFileInputStream) { + bos = state != null ? state.stream : createOutputStream(fileToWriteTo, true); + byte[] buffer = new byte[StreamUtils.BUFFER_SIZE]; + int bytesRead = -1; + while ((bytesRead = inputStream.read(buffer)) != -1) { + bos.write(buffer, 0, bytesRead); + } + if (FileWritingMessageHandler.this.appendNewLine) { + bos.write(System.lineSeparator().getBytes()); + } + } + finally { + try { + if (state == null || FileWritingMessageHandler.this.flushTask == null) { if (bos != null) { bos.close(); } + clearState(fileToWriteTo, state); } - catch (IOException ex) { + else { + state.lastWrite = System.currentTimeMillis(); } } - cleanUpAfterCopy(tempFile, resultFile, originalFile); - return resultFile; + catch (IOException ex) { + } } } @@ -745,38 +716,42 @@ protected void whileLocked() throws IOException { FileWritingMessageHandler.this.newFileCallback.accept(fileToWriteTo, requestMessage); } - FileState state = getFileState(fileToWriteTo, false); - BufferedOutputStream bos = null; - try { - bos = state != null ? state.stream : createOutputStream(fileToWriteTo, append); - bos.write(bytes); - if (FileWritingMessageHandler.this.appendNewLine) { - bos.write(System.lineSeparator().getBytes()); - } - } - finally { - try { - if (state == null || FileWritingMessageHandler.this.flushTask == null) { - if (bos != null) { - bos.close(); - } - clearState(fileToWriteTo, state); - } - else { - state.lastWrite = System.currentTimeMillis(); - } - } - catch (IOException ex) { - } - } + writeBytesToFile(fileToWriteTo, append, bytes); } }; whileLockedProcessor.doWhileLocked(); - this.cleanUpAfterCopy(fileToWriteTo, resultFile, originalFile); + cleanUpAfterCopy(fileToWriteTo, resultFile, originalFile); return resultFile; } + private void writeBytesToFile(File fileToWriteTo, boolean append, byte[] bytes) throws IOException { + FileState state = getFileState(fileToWriteTo, false); + BufferedOutputStream bos = null; + try { + bos = state != null ? state.stream : createOutputStream(fileToWriteTo, append); + bos.write(bytes); + if (this.appendNewLine) { + bos.write(System.lineSeparator().getBytes()); + } + } + finally { + try { + if (state == null || this.flushTask == null) { + if (bos != null) { + bos.close(); + } + clearState(fileToWriteTo, state); + } + else { + state.lastWrite = System.currentTimeMillis(); + } + } + catch (IOException ex) { + } + } + } + private File handleStringMessage(String content, File originalFile, File tempFile, File resultFile, Message requestMessage) throws IOException { @@ -794,40 +769,43 @@ protected void whileLocked() throws IOException { FileWritingMessageHandler.this.newFileCallback.accept(fileToWriteTo, requestMessage); } - FileState state = getFileState(fileToWriteTo, true); - BufferedWriter writer = null; - try { - writer = state != null ? state.writer : createWriter(fileToWriteTo, append); - writer.write(content); - if (FileWritingMessageHandler.this.appendNewLine) { - writer.newLine(); - } - } - finally { - try { - if (state == null || FileWritingMessageHandler.this.flushTask == null) { - if (writer != null) { - writer.close(); - } - clearState(fileToWriteTo, state); - } - else { - state.lastWrite = System.currentTimeMillis(); - } - } - catch (IOException ex) { - } - } - + writeStringToFile(fileToWriteTo, append, content); } }; whileLockedProcessor.doWhileLocked(); - this.cleanUpAfterCopy(fileToWriteTo, resultFile, originalFile); + cleanUpAfterCopy(fileToWriteTo, resultFile, originalFile); return resultFile; } + private void writeStringToFile(File fileToWriteTo, boolean append, String content) throws IOException { + FileState state = getFileState(fileToWriteTo, true); + BufferedWriter writer = null; + try { + writer = state != null ? state.writer : createWriter(fileToWriteTo, append); + writer.write(content); + if (FileWritingMessageHandler.this.appendNewLine) { + writer.newLine(); + } + } + finally { + try { + if (state == null || FileWritingMessageHandler.this.flushTask == null) { + if (writer != null) { + writer.close(); + } + clearState(fileToWriteTo, state); + } + else { + state.lastWrite = System.currentTimeMillis(); + } + } + catch (IOException ex) { + } + } + } + private File determineFileToWrite(File resultFile, File tempFile) { final File fileToWriteTo; @@ -856,8 +834,8 @@ private void cleanUpAfterCopy(File fileToWriteTo, File resultFile, File original this.renameTo(fileToWriteTo, resultFile); } - if (this.deleteSourceFiles && originalFile != null) { - originalFile.delete(); + if (this.deleteSourceFiles && originalFile != null && !originalFile.delete()) { + throw new IllegalStateException("Could not delete original file: " + originalFile); } setPermissions(resultFile); @@ -909,7 +887,8 @@ private synchronized FileState getFileState(File fileToWriteTo, boolean isString if (appendNoFlush) { String absolutePath = fileToWriteTo.getAbsolutePath(); state = this.fileStates.get(absolutePath); - if (state != null && ((isString && state.stream != null) || (!isString && state.writer != null))) { + if (state != null // NOSONAR + && ((isString && state.stream != null) || (!isString && state.writer != null))) { state.close(); state = null; this.fileStates.remove(absolutePath); @@ -1101,6 +1080,7 @@ private boolean close() { this.lock.unlock(); } } + } private final class Flusher implements Runnable { diff --git a/spring-integration-file/src/main/java/org/springframework/integration/file/config/AbstractRemoteFileOutboundGatewayParser.java b/spring-integration-file/src/main/java/org/springframework/integration/file/config/AbstractRemoteFileOutboundGatewayParser.java index 3a1807316b6..40afc9a8013 100644 --- a/spring-integration-file/src/main/java/org/springframework/integration/file/config/AbstractRemoteFileOutboundGatewayParser.java +++ b/spring-integration-file/src/main/java/org/springframework/integration/file/config/AbstractRemoteFileOutboundGatewayParser.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2017 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -46,7 +46,6 @@ protected String getInputChannelAttributeName() { @Override protected BeanDefinitionBuilder parseHandler(Element element, ParserContext parserContext) { - BeanDefinition templateDefinition = FileParserUtils.parseRemoteFileTemplate(element, parserContext, false, getTemplateClass()); @@ -66,8 +65,9 @@ protected BeanDefinitionBuilder parseHandler(Element element, ParserContext pars IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "command-options", "options"); IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "reply-timeout", "sendTimeout"); IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "reply-channel", "outputChannel"); - this.configureFilter(builder, element, parserContext, "filter", "filename", "filter"); - this.configureFilter(builder, element, parserContext, "mput-filter", "mput", "mputFilter"); + configureFilter(builder, element, parserContext, FileParserUtils.FILTER_ATTRIBUTE, "filename", + FileParserUtils.FILTER_ATTRIBUTE); + configureFilter(builder, element, parserContext, "mput-filter", "mput", "mputFilter"); BeanDefinition localDirExpressionDef = IntegrationNamespaceUtils .createExpressionDefinitionFromValueOrExpression("local-directory", "local-directory-expression", @@ -92,6 +92,7 @@ protected void postProcessBuilder(BeanDefinitionBuilder builder, Element element protected void configureFilter(BeanDefinitionBuilder builder, Element element, ParserContext parserContext, String filterAttribute, String patternPrefix, String propertyName) { + String filter = element.getAttribute(filterAttribute); String filterExpression = element.getAttribute(filterAttribute + "-expression"); String fileNamePattern = element.getAttribute(patternPrefix + "-pattern"); @@ -107,38 +108,57 @@ protected void configureFilter(BeanDefinitionBuilder builder, Element element, P if (count > 1) { parserContext.getReaderContext() .error("at most one of '" + patternPrefix + "-pattern', " + - "'" + patternPrefix + "-regex', '" + filterAttribute + - "' or '" + filterAttribute + "-expression' is allowed on a remote file outbound gateway", + "'" + patternPrefix + "-regex', '" + filterAttribute + + "' or '" + filterAttribute + "-expression' is allowed on a remote file outbound " + + "gateway", element); } else if (hasFilter) { builder.addPropertyReference(propertyName, filter); } else if (hasFilterExpression) { - BeanDefinition expressionFilterBeanDefinition = - BeanDefinitionBuilder.genericBeanDefinition(ExpressionFileListFilter.class) - .addConstructorArgValue(filterExpression) - .getBeanDefinition(); - builder.addPropertyValue(propertyName, expressionFilterBeanDefinition); + registerExpressionFilter(builder, propertyName, filterExpression); } else if (hasFileNamePattern) { - BeanDefinitionBuilder filterBuilder = BeanDefinitionBuilder.genericBeanDefinition( - "filter".equals(filterAttribute) ? - this.getSimplePatternFileListFilterClassName() : - SimplePatternFileListFilter.class.getName()); - filterBuilder.addConstructorArgValue(fileNamePattern); - builder.addPropertyValue(propertyName, filterBuilder.getBeanDefinition()); + registerPatternFilter(builder, filterAttribute, propertyName, fileNamePattern); } else if (hasFileNameRegex) { - BeanDefinitionBuilder filterBuilder = BeanDefinitionBuilder.genericBeanDefinition( - "filter".equals(filterAttribute) ? - this.getRegexPatternFileListFilterClassName() : - RegexPatternFileListFilter.class.getName()); - filterBuilder.addConstructorArgValue(fileNameRegex); - builder.addPropertyValue(propertyName, filterBuilder.getBeanDefinition()); + registerRegexFilter(builder, filterAttribute, propertyName, fileNameRegex); } } + private void registerRegexFilter(BeanDefinitionBuilder builder, String filterAttribute, String propertyName, + String fileNameRegex) { + + BeanDefinitionBuilder filterBuilder = BeanDefinitionBuilder.genericBeanDefinition( + FileParserUtils.FILTER_ATTRIBUTE.equals(filterAttribute) ? + getRegexPatternFileListFilterClassName() : + RegexPatternFileListFilter.class.getName()); + filterBuilder.addConstructorArgValue(fileNameRegex); + builder.addPropertyValue(propertyName, filterBuilder.getBeanDefinition()); + } + + private void registerPatternFilter(BeanDefinitionBuilder builder, String filterAttribute, String propertyName, + String fileNamePattern) { + + BeanDefinitionBuilder filterBuilder = BeanDefinitionBuilder.genericBeanDefinition( + FileParserUtils.FILTER_ATTRIBUTE.equals(filterAttribute) ? + getSimplePatternFileListFilterClassName() : + SimplePatternFileListFilter.class.getName()); + filterBuilder.addConstructorArgValue(fileNamePattern); + builder.addPropertyValue(propertyName, filterBuilder.getBeanDefinition()); + } + + private void registerExpressionFilter(BeanDefinitionBuilder builder, String propertyName, + String filterExpression) { + + BeanDefinition expressionFilterBeanDefinition = + BeanDefinitionBuilder.genericBeanDefinition(ExpressionFileListFilter.class) + .addConstructorArgValue(filterExpression) + .getBeanDefinition(); + builder.addPropertyValue(propertyName, expressionFilterBeanDefinition); + } + protected abstract String getRegexPatternFileListFilterClassName(); protected abstract String getSimplePatternFileListFilterClassName(); diff --git a/spring-integration-file/src/main/java/org/springframework/integration/file/config/FileInboundChannelAdapterParser.java b/spring-integration-file/src/main/java/org/springframework/integration/file/config/FileInboundChannelAdapterParser.java index 0e29e6a2478..6bc69291a79 100644 --- a/spring-integration-file/src/main/java/org/springframework/integration/file/config/FileInboundChannelAdapterParser.java +++ b/spring-integration-file/src/main/java/org/springframework/integration/file/config/FileInboundChannelAdapterParser.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -56,7 +56,7 @@ protected BeanMetadataElement parseSource(Element element, ParserContext parserC String filterBeanName = this.registerFilter(element, parserContext); String lockerBeanName = registerLocker(element, parserContext); if (filterBeanName != null) { - builder.addPropertyReference("filter", filterBeanName); + builder.addPropertyReference(FileParserUtils.FILTER_ATTRIBUTE, filterBeanName); } if (lockerBeanName != null) { builder.addPropertyReference("locker", lockerBeanName); @@ -83,14 +83,14 @@ private String registerLocker(Element element, ParserContext parserContext) { return lockerBeanName; } - private String registerFilter(Element element, ParserContext parserContext) { + private String registerFilter(Element element, ParserContext parserContext) { // NOSONAR String filenamePattern = element.getAttribute("filename-pattern"); String filenameRegex = element.getAttribute("filename-regex"); String preventDuplicates = element.getAttribute("prevent-duplicates"); String ignoreHidden = element.getAttribute("ignore-hidden"); - String filter = element.getAttribute("filter"); + String filter = element.getAttribute(FileParserUtils.FILTER_ATTRIBUTE); String filterExpression = element.getAttribute("filter-expression"); - if (!StringUtils.hasText(filter) + if (!StringUtils.hasText(filter) // NOSONAR && !StringUtils.hasText(filenamePattern) && !StringUtils.hasText(filenameRegex) && !StringUtils.hasText(preventDuplicates) @@ -102,7 +102,7 @@ private String registerFilter(Element element, ParserContext parserContext) { BeanDefinitionBuilder.genericBeanDefinition(FileListFilterFactoryBean.class); factoryBeanBuilder.setRole(BeanDefinition.ROLE_SUPPORT); if (StringUtils.hasText(filter)) { - factoryBeanBuilder.addPropertyReference("filter", filter); + factoryBeanBuilder.addPropertyReference(FileParserUtils.FILTER_ATTRIBUTE, filter); } if (StringUtils.hasText(filterExpression)) { if (StringUtils.hasText(filter)) { @@ -113,7 +113,7 @@ private String registerFilter(Element element, ParserContext parserContext) { BeanDefinitionBuilder.genericBeanDefinition(ExpressionFileListFilter.class) .addConstructorArgValue(filterExpression) .getBeanDefinition(); - factoryBeanBuilder.addPropertyValue("filter", expressionFilterBeanDefinition); + factoryBeanBuilder.addPropertyValue(FileParserUtils.FILTER_ATTRIBUTE, expressionFilterBeanDefinition); } if (StringUtils.hasText(filenamePattern)) { if (StringUtils.hasText(filter)) { diff --git a/spring-integration-file/src/main/java/org/springframework/integration/file/config/FileParserUtils.java b/spring-integration-file/src/main/java/org/springframework/integration/file/config/FileParserUtils.java index ef605b5c5ed..894a10c3f16 100644 --- a/spring-integration-file/src/main/java/org/springframework/integration/file/config/FileParserUtils.java +++ b/spring-integration-file/src/main/java/org/springframework/integration/file/config/FileParserUtils.java @@ -1,5 +1,5 @@ /* - * Copyright 2013-2017 the original author or authors. + * Copyright 2013-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -44,6 +44,8 @@ */ public final class FileParserUtils { + public static final String FILTER_ATTRIBUTE = "filter"; + private FileParserUtils() { } @@ -97,10 +99,12 @@ public static BeanDefinition parseRemoteFileTemplate(Element element, ParserCont return templateBuilder.getBeanDefinition(); } - static void configureFilter(BeanDefinitionBuilder synchronizerBuilder, Element element, ParserContext parserContext, - Class> patternClass, Class> regexClass, + static void configureFilter(BeanDefinitionBuilder synchronizerBuilder, Element element, // NOSONAR + ParserContext parserContext, Class> patternClass, + Class> regexClass, Class> persistentAcceptOnceFileListFilterClass) { - String filter = element.getAttribute("filter"); + + String filter = element.getAttribute(FILTER_ATTRIBUTE); String filterExpression = element.getAttribute("filter-expression"); String fileNamePattern = element.getAttribute("filename-pattern"); String fileNameRegex = element.getAttribute("filename-regex"); @@ -128,14 +132,14 @@ static void configureFilter(BeanDefinitionBuilder synchronizerBuilder, Element e element); } if (hasFilter) { - synchronizerBuilder.addPropertyReference("filter", filter); + synchronizerBuilder.addPropertyReference(FILTER_ATTRIBUTE, filter); } else if (hasFilterExpression) { BeanDefinition expressionFilterBeanDefinition = BeanDefinitionBuilder.genericBeanDefinition(ExpressionFileListFilter.class) .addConstructorArgValue(filterExpression) .getBeanDefinition(); - synchronizerBuilder.addPropertyValue("filter", expressionFilterBeanDefinition); + synchronizerBuilder.addPropertyValue(FILTER_ATTRIBUTE, expressionFilterBeanDefinition); } else if (hasFileNamePattern) { BeanDefinition patternFilter = @@ -175,7 +179,7 @@ private static void composeFilters(BeanDefinitionBuilder synchronizerBuilder, BeanDefinitionBuilder.genericBeanDefinition(CompositeFileListFilter.class) .addConstructorArgValue(filters) .getBeanDefinition(); - synchronizerBuilder.addPropertyValue("filter", compositeFilterDefinition); + synchronizerBuilder.addPropertyValue(FILTER_ATTRIBUTE, compositeFilterDefinition); } } diff --git a/spring-integration-file/src/main/java/org/springframework/integration/file/config/FileReadingMessageSourceFactoryBean.java b/spring-integration-file/src/main/java/org/springframework/integration/file/config/FileReadingMessageSourceFactoryBean.java index 885f32ffb2b..7fa7e5803c2 100644 --- a/spring-integration-file/src/main/java/org/springframework/integration/file/config/FileReadingMessageSourceFactoryBean.java +++ b/spring-integration-file/src/main/java/org/springframework/integration/file/config/FileReadingMessageSourceFactoryBean.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2016 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,12 +19,7 @@ import java.io.File; import java.util.Comparator; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import org.springframework.beans.factory.BeanFactory; -import org.springframework.beans.factory.BeanFactoryAware; -import org.springframework.beans.factory.FactoryBean; +import org.springframework.beans.factory.config.AbstractFactoryBean; import org.springframework.integration.file.DirectoryScanner; import org.springframework.integration.file.FileReadingMessageSource; import org.springframework.integration.file.filters.CompositeFileListFilter; @@ -39,36 +34,29 @@ * * @since 1.0.3 */ -public class FileReadingMessageSourceFactoryBean implements FactoryBean, - BeanFactoryAware { - - private static Log logger = LogFactory.getLog(FileReadingMessageSourceFactoryBean.class); +public class FileReadingMessageSourceFactoryBean extends AbstractFactoryBean { - private volatile FileReadingMessageSource source; + private FileReadingMessageSource source; - private volatile File directory; + private File directory; - private volatile FileListFilter filter; + private FileListFilter filter; - private volatile AbstractFileLockerFilter locker; + private AbstractFileLockerFilter locker; - private volatile Comparator comparator; + private Comparator comparator; - private volatile DirectoryScanner scanner; + private DirectoryScanner scanner; private boolean useWatchService; private FileReadingMessageSource.WatchEventType[] watchEvents; - private volatile Boolean scanEachPoll; - - private volatile Boolean autoCreateDirectory; + private Boolean scanEachPoll; - private volatile Integer queueSize; + private Boolean autoCreateDirectory; - private volatile BeanFactory beanFactory; - - private final Object initializationMonitor = new Object(); + private Integer queueSize; public void setDirectory(File directory) { this.directory = directory; @@ -114,87 +102,83 @@ public void setLocker(AbstractFileLockerFilter locker) { } @Override - public void setBeanFactory(BeanFactory beanFactory) { - this.beanFactory = beanFactory; + public Class getObjectType() { + return FileReadingMessageSource.class; } @Override - public FileReadingMessageSource getObject() throws Exception { + protected FileReadingMessageSource createInstance() { if (this.source == null) { initSource(); } return this.source; } - @Override - public Class getObjectType() { - return FileReadingMessageSource.class; - } - - @Override - public boolean isSingleton() { - return true; - } - - private void initSource() throws Exception { - synchronized (this.initializationMonitor) { - if (this.source != null) { - return; - } - boolean comparatorSet = this.comparator != null; - boolean queueSizeSet = this.queueSize != null; - if (comparatorSet) { - if (queueSizeSet) { - logger.warn("'comparator' and 'queueSize' are mutually exclusive. Ignoring 'queueSize'"); - } - this.source = new FileReadingMessageSource(this.comparator); - } - else if (queueSizeSet) { - this.source = new FileReadingMessageSource(this.queueSize); + private void initSource() { // NOSONAR + if (this.source != null) { + return; + } + boolean comparatorSet = this.comparator != null; + boolean queueSizeSet = this.queueSize != null; + if (comparatorSet) { + if (queueSizeSet) { + logger.warn("'comparator' and 'queueSize' are mutually exclusive. Ignoring 'queueSize'"); } - else { - this.source = new FileReadingMessageSource(); + this.source = new FileReadingMessageSource(this.comparator); + } + else if (queueSizeSet) { + this.source = new FileReadingMessageSource(this.queueSize); + } + else { + this.source = new FileReadingMessageSource(); + } + this.source.setDirectory(this.directory); + if (this.scanner != null) { + this.source.setScanner(this.scanner); + } + else { + this.source.setUseWatchService(this.useWatchService); + if (this.watchEvents != null) { + this.source.setWatchEvents(this.watchEvents); } - this.source.setDirectory(this.directory); - if (this.scanner != null) { - this.source.setScanner(this.scanner); + } + if (this.filter != null) { + if (this.locker == null) { + this.source.setFilter(this.filter); } else { - this.source.setUseWatchService(this.useWatchService); - if (this.watchEvents != null) { - this.source.setWatchEvents(this.watchEvents); - } - } - if (this.filter != null) { - if (this.locker == null) { - this.source.setFilter(this.filter); - } - else { - CompositeFileListFilter compositeFileListFilter = new CompositeFileListFilter(); - compositeFileListFilter.addFilter(this.filter); - compositeFileListFilter.addFilter(this.locker); - this.source.setFilter(compositeFileListFilter); - this.source.setLocker(this.locker); - } - } - else if (this.locker != null) { - CompositeFileListFilter compositeFileListFilter = new CompositeFileListFilter(); - compositeFileListFilter.addFilter(new FileListFilterFactoryBean().getObject()); + CompositeFileListFilter compositeFileListFilter = new CompositeFileListFilter<>(); + compositeFileListFilter.addFilter(this.filter); compositeFileListFilter.addFilter(this.locker); this.source.setFilter(compositeFileListFilter); this.source.setLocker(this.locker); } - if (this.scanEachPoll != null) { - this.source.setScanEachPoll(this.scanEachPoll); - } - if (this.autoCreateDirectory != null) { - this.source.setAutoCreateDirectory(this.autoCreateDirectory); + } + else if (this.locker != null) { + CompositeFileListFilter compositeFileListFilter = new CompositeFileListFilter<>(); + try { + compositeFileListFilter.addFilter(new FileListFilterFactoryBean().getObject()); } - if (this.beanFactory != null) { - this.source.setBeanFactory(this.beanFactory); + catch (Exception e) { + throw new IllegalStateException(e); } + compositeFileListFilter.addFilter(this.locker); + this.source.setFilter(compositeFileListFilter); + this.source.setLocker(this.locker); + } + if (this.scanEachPoll != null) { + this.source.setScanEachPoll(this.scanEachPoll); + } + if (this.autoCreateDirectory != null) { + this.source.setAutoCreateDirectory(this.autoCreateDirectory); + } + this.source.setBeanFactory(getBeanFactory()); + try { this.source.afterPropertiesSet(); } + catch (Exception e) { + throw new IllegalStateException(e); + } } } diff --git a/spring-integration-file/src/main/java/org/springframework/integration/file/filters/AbstractPersistentAcceptOnceFileListFilter.java b/spring-integration-file/src/main/java/org/springframework/integration/file/filters/AbstractPersistentAcceptOnceFileListFilter.java index a3027ef83a6..3048c817f9a 100644 --- a/spring-integration-file/src/main/java/org/springframework/integration/file/filters/AbstractPersistentAcceptOnceFileListFilter.java +++ b/spring-integration-file/src/main/java/org/springframework/integration/file/filters/AbstractPersistentAcceptOnceFileListFilter.java @@ -1,5 +1,5 @@ /* - * Copyright 2013-2016 the original author or authors. + * Copyright 2013-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,6 +22,7 @@ import java.util.List; import org.springframework.integration.metadata.ConcurrentMetadataStore; +import org.springframework.lang.Nullable; import org.springframework.util.Assert; /** @@ -31,19 +32,22 @@ * same modified time as the current file. * * @author Gary Russell + * @author Artem Bilan + * * @since 3.0 * */ public abstract class AbstractPersistentAcceptOnceFileListFilter extends AbstractFileListFilter implements ReversibleFileListFilter, ResettableFileListFilter, Closeable { - protected final ConcurrentMetadataStore store; + protected final ConcurrentMetadataStore store; // NOSONAR - protected final Flushable flushableStore; + @Nullable + protected final Flushable flushableStore; // NOSONAR - protected final String prefix; + protected final String prefix; // NOSONAR - protected volatile boolean flushOnUpdate; + protected volatile boolean flushOnUpdate; // NOSONAR private final Object monitor = new Object(); @@ -136,7 +140,7 @@ public void close() throws IOException { * @return The value to store for the file. */ private String value(F file) { - return Long.toString(this.modified(file)); + return Long.toString(modified(file)); } /** @@ -147,7 +151,7 @@ private String value(F file) { * @return true if equal. */ protected boolean isEqual(F file, String value) { - return Long.valueOf(value) == this.modified(file); + return Long.valueOf(value) == modified(file); } /** @@ -156,7 +160,7 @@ protected boolean isEqual(F file, String value) { * @return The key. */ protected String buildKey(F file) { - return this.prefix + this.fileName(file); + return this.prefix + fileName(file); } /** diff --git a/spring-integration-file/src/main/java/org/springframework/integration/file/filters/AbstractRegexPatternFileListFilter.java b/spring-integration-file/src/main/java/org/springframework/integration/file/filters/AbstractRegexPatternFileListFilter.java index 644cc8fbf43..84773146fa5 100644 --- a/spring-integration-file/src/main/java/org/springframework/integration/file/filters/AbstractRegexPatternFileListFilter.java +++ b/spring-integration-file/src/main/java/org/springframework/integration/file/filters/AbstractRegexPatternFileListFilter.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2017 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,37 +25,47 @@ * Filters a listing of files by qualifying their 'name' * against a regular expression (an instance of {@link java.util.regex.Pattern}) * + * @param the type of file entry + * * @author Iwein Fuld * @author Josh Long - * @param the type of file entry + * @author Artem Bilan + * * @since 2.0 */ public abstract class AbstractRegexPatternFileListFilter extends AbstractDirectoryAwareFileListFilter - implements InitializingBean { + implements InitializingBean { // TODO Remove in the next version - private volatile Pattern pattern; + private Pattern pattern; public AbstractRegexPatternFileListFilter(String pattern) { - this.pattern = Pattern.compile(pattern); + this(Pattern.compile(pattern)); } public AbstractRegexPatternFileListFilter(Pattern pattern) { + Assert.notNull(pattern, "'pattern' must not be null!"); this.pattern = pattern; } - public void setPattern(Pattern pattern) { - this.pattern = pattern; + public void setPattern(String pattern) { + Assert.notNull(pattern, "'pattern' must not be null!"); + setPattern(Pattern.compile(pattern)); } - public void setPattern(String pattern) { - this.pattern = Pattern.compile(pattern); + public void setPattern(Pattern pattern) { + Assert.notNull(pattern, "'pattern' must not be null!"); + this.pattern = pattern; } + /** + * @deprecated since 5.1.3. Will be removed in the next 5.2 version. + */ @Override - public void afterPropertiesSet() throws Exception { - Assert.notNull(this.pattern, "'pattern' must not be null!"); + @Deprecated + public void afterPropertiesSet() { + } @Override diff --git a/spring-integration-file/src/main/java/org/springframework/integration/file/filters/AcceptOnceFileListFilter.java b/spring-integration-file/src/main/java/org/springframework/integration/file/filters/AcceptOnceFileListFilter.java index 97fcd679798..ddc9fca8ee8 100644 --- a/spring-integration-file/src/main/java/org/springframework/integration/file/filters/AcceptOnceFileListFilter.java +++ b/spring-integration-file/src/main/java/org/springframework/integration/file/filters/AcceptOnceFileListFilter.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2015 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,6 +22,8 @@ import java.util.Set; import java.util.concurrent.LinkedBlockingQueue; +import org.springframework.lang.Nullable; + /** * {@link FileListFilter} that passes files only one time. This can * conveniently be used to prevent duplication of files, as is done in @@ -32,11 +34,12 @@ * @author Iwein Fuld * @author Josh Long * @author Gary Russell - * @since 1.0.0 + * @author Artem Bilan */ public class AcceptOnceFileListFilter extends AbstractFileListFilter implements ReversibleFileListFilter, ResettableFileListFilter { + @Nullable private final Queue seen; private final Set seenSet = new HashSet(); @@ -48,7 +51,6 @@ public class AcceptOnceFileListFilter extends AbstractFileListFilter imple * Creates an AcceptOnceFileListFilter that is based on a bounded queue. If the queue overflows, * files that fall out will be passed through this filter again if passed to the * {@link #filterFiles(Object[])} - * * @param maxCapacity the maximum number of Files to maintain in the 'seen' queue. */ public AcceptOnceFileListFilter(int maxCapacity) { @@ -69,22 +71,16 @@ public boolean accept(F file) { if (this.seenSet.contains(file)) { return false; } - if (this.seen != null) { - if (!this.seen.offer(file)) { - F removed = this.seen.poll(); - this.seenSet.remove(removed); - this.seen.add(file); - } + if (this.seen != null && !this.seen.offer(file)) { + F removed = this.seen.poll(); + this.seenSet.remove(removed); + this.seen.add(file); } this.seenSet.add(file); return true; } } - /** - * {@inheritDoc} - * @since 4.0.4 - */ @Override public void rollback(F file, List files) { synchronized (this.monitor) { diff --git a/spring-integration-file/src/main/java/org/springframework/integration/file/filters/CompositeFileListFilter.java b/spring-integration-file/src/main/java/org/springframework/integration/file/filters/CompositeFileListFilter.java index 339759fb45a..b96efc355c7 100644 --- a/spring-integration-file/src/main/java/org/springframework/integration/file/filters/CompositeFileListFilter.java +++ b/spring-integration-file/src/main/java/org/springframework/integration/file/filters/CompositeFileListFilter.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -104,7 +104,7 @@ public CompositeFileListFilter addFilters(Collection discardCallback) { @Override public List filterFiles(F[] files) { Assert.notNull(files, "'files' should not be null"); - List results = new ArrayList(Arrays.asList(files)); + List results = new ArrayList<>(Arrays.asList(files)); for (FileListFilter fileFilter : this.fileFilters) { List currentResults = fileFilter.filterFiles(files); results.retainAll(currentResults); diff --git a/spring-integration-file/src/main/java/org/springframework/integration/file/filters/DiscardAwareFileListFilter.java b/spring-integration-file/src/main/java/org/springframework/integration/file/filters/DiscardAwareFileListFilter.java index 86334faae76..58deef8ac98 100644 --- a/spring-integration-file/src/main/java/org/springframework/integration/file/filters/DiscardAwareFileListFilter.java +++ b/spring-integration-file/src/main/java/org/springframework/integration/file/filters/DiscardAwareFileListFilter.java @@ -1,5 +1,5 @@ /* - * Copyright 2018 the original author or authors. + * Copyright 2018-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,8 @@ import java.util.function.Consumer; +import org.springframework.lang.Nullable; + /** * The {@link FileListFilter} modification which can accept a {@link Consumer} * which can be called when filter discards the file. @@ -28,6 +30,6 @@ */ public interface DiscardAwareFileListFilter extends FileListFilter { - void addDiscardCallback(Consumer discardCallback); + void addDiscardCallback(@Nullable Consumer discardCallback); } diff --git a/spring-integration-file/src/main/java/org/springframework/integration/file/filters/LastModifiedFileListFilter.java b/spring-integration-file/src/main/java/org/springframework/integration/file/filters/LastModifiedFileListFilter.java index a0d5116ef1d..111493562c8 100644 --- a/spring-integration-file/src/main/java/org/springframework/integration/file/filters/LastModifiedFileListFilter.java +++ b/spring-integration-file/src/main/java/org/springframework/integration/file/filters/LastModifiedFileListFilter.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2018 the original author or authors. + * Copyright 2015-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,11 +17,14 @@ package org.springframework.integration.file.filters; import java.io.File; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import org.springframework.lang.Nullable; + /** * The {@link FileListFilter} implementation to filter those files which * {@link File#lastModified()} is less than the {@link #age} in comparison @@ -40,10 +43,13 @@ */ public class LastModifiedFileListFilter implements DiscardAwareFileListFilter { + private static final long ONE_SECOND = 1000; + private static final long DEFAULT_AGE = 60; private volatile long age = DEFAULT_AGE; + @Nullable private Consumer discardCallback; public LastModifiedFileListFilter() { @@ -65,13 +71,22 @@ public LastModifiedFileListFilter(long age) { * is filtered. The resolution is seconds. * Defaults to 60 seconds. * @param age the age + * @param unit the timeUnit. */ - public void setAge(long age) { - setAge(age, TimeUnit.SECONDS); + public void setAge(long age, TimeUnit unit) { + this.age = unit.toSeconds(age); } - public long getAge() { - return this.age; + /** + * Set the age that files have to be before being passed by this filter. + * If {@link File#lastModified()} plus age is greater than the current time, the file + * is filtered. The resolution is seconds. + * Defaults to 60 seconds. + * @param age the age + * @since 5.1.3 + */ + public void setAge(Duration age) { + setAge(age.getSeconds()); } /** @@ -80,23 +95,26 @@ public long getAge() { * is filtered. The resolution is seconds. * Defaults to 60 seconds. * @param age the age - * @param unit the timeUnit. */ - public void setAge(long age, TimeUnit unit) { - this.age = unit.toSeconds(age); + public void setAge(long age) { + setAge(age, TimeUnit.SECONDS); + } + + public long getAge() { + return this.age; } @Override - public void addDiscardCallback(Consumer discardCallback) { + public void addDiscardCallback(@Nullable Consumer discardCallback) { this.discardCallback = discardCallback; } @Override public List filterFiles(File[] files) { List list = new ArrayList<>(); - long now = System.currentTimeMillis() / 1000; + long now = System.currentTimeMillis() / ONE_SECOND; for (File file : files) { - if (file.lastModified() / 1000 + this.age <= now) { + if (file.lastModified() / ONE_SECOND + this.age <= now) { list.add(file); } else if (this.discardCallback != null) { diff --git a/spring-integration-file/src/main/java/org/springframework/integration/file/remote/RemoteFileTemplate.java b/spring-integration-file/src/main/java/org/springframework/integration/file/remote/RemoteFileTemplate.java index 9906c75e399..d98373ef964 100644 --- a/spring-integration-file/src/main/java/org/springframework/integration/file/remote/RemoteFileTemplate.java +++ b/spring-integration-file/src/main/java/org/springframework/integration/file/remote/RemoteFileTemplate.java @@ -67,7 +67,7 @@ public class RemoteFileTemplate implements RemoteFileOperations, Initializ /** * the {@link SessionFactory} for acquiring remote file Sessions. */ - protected final SessionFactory sessionFactory; + protected final SessionFactory sessionFactory; // NOSONAR /* * Not static as normal since we want this TL to be scoped within the template instance. @@ -295,48 +295,11 @@ private String send(final Message message, final String subDirectory, final F final StreamHolder inputStreamHolder = payloadToInputStream(message); if (inputStreamHolder != null) { try { - return this.execute(session -> { - String fileName = inputStreamHolder.getName(); - try { - String remoteDirectory = RemoteFileTemplate.this.directoryExpressionProcessor - .processMessage(message); - remoteDirectory = RemoteFileTemplate.this.normalizeDirectoryPath(remoteDirectory); - if (StringUtils.hasText(subDirectory)) { - if (subDirectory.startsWith(RemoteFileTemplate.this.remoteFileSeparator)) { - remoteDirectory += subDirectory.substring(1); - } - else { - remoteDirectory += RemoteFileTemplate.this.normalizeDirectoryPath(subDirectory); - } - } - String temporaryRemoteDirectory = remoteDirectory; - if (RemoteFileTemplate.this.temporaryDirectoryExpressionProcessor != null) { - temporaryRemoteDirectory = RemoteFileTemplate.this.temporaryDirectoryExpressionProcessor - .processMessage(message); - } - fileName = RemoteFileTemplate.this.fileNameGenerator.generateFileName(message); - RemoteFileTemplate.this.sendFileToRemoteDirectory(inputStreamHolder.getStream(), - temporaryRemoteDirectory, remoteDirectory, fileName, session, mode); - return remoteDirectory + fileName; - } - catch (FileNotFoundException e) { - throw new MessageDeliveryException(message, "File [" + inputStreamHolder.getName() - + "] not found in local working directory; it was moved or deleted unexpectedly.", e); - } - catch (IOException e) { - throw new MessageDeliveryException(message, "Failed to transfer file [" - + inputStreamHolder.getName() + " -> " + fileName - + "] from local directory to remote directory.", e); - } - catch (Exception e) { - throw new MessageDeliveryException(message, "Error handling message for file [" - + inputStreamHolder.getName() + " -> " + fileName + "]", e); - } - }); + return execute(session -> doSend(message, subDirectory, mode, inputStreamHolder, session)); } finally { try { - inputStreamHolder.getStream().close(); + inputStreamHolder.stream.close(); } catch (IOException e) { } @@ -351,6 +314,45 @@ private String send(final Message message, final String subDirectory, final F } } + private String doSend(Message message, String subDirectory, FileExistsMode mode, + StreamHolder inputStreamHolder, Session session) { + + String fileName = inputStreamHolder.name; + try { + String remoteDirectory = this.directoryExpressionProcessor.processMessage(message); + remoteDirectory = normalizeDirectoryPath(remoteDirectory); + if (StringUtils.hasText(subDirectory)) { + if (subDirectory.startsWith(this.remoteFileSeparator)) { + remoteDirectory += subDirectory.substring(1); + } + else { + remoteDirectory += normalizeDirectoryPath(subDirectory); + } + } + String temporaryRemoteDirectory = remoteDirectory; + if (this.temporaryDirectoryExpressionProcessor != null) { + temporaryRemoteDirectory = this.temporaryDirectoryExpressionProcessor.processMessage(message); + } + fileName = this.fileNameGenerator.generateFileName(message); + sendFileToRemoteDirectory(inputStreamHolder.stream, temporaryRemoteDirectory, remoteDirectory, fileName, + session, mode); + return remoteDirectory + fileName; + } + catch (FileNotFoundException e) { + throw new MessageDeliveryException(message, "File [" + inputStreamHolder.name + + "] not found in local working directory; it was moved or deleted unexpectedly.", e); + } + catch (IOException e) { + throw new MessageDeliveryException(message, "Failed to transfer file [" + + inputStreamHolder.name + " -> " + fileName + + "] from local directory to remote directory.", e); + } + catch (Exception e) { + throw new MessageDeliveryException(message, "Error handling message for file [" + + inputStreamHolder.name + " -> " + fileName + "]", e); + } + } + @Override public boolean exists(final String path) { return execute(session -> session.exists(path)); @@ -391,17 +393,10 @@ public boolean get(Message message, InputStreamCallback callback) { public boolean get(final String remotePath, final InputStreamCallback callback) { Assert.notNull(remotePath, "'remotePath' cannot be null"); return execute(session -> { - InputStream inputStream = null; - try { - inputStream = session.readRaw(remotePath); + try (InputStream inputStream = session.readRaw(remotePath)) { callback.doWithInputStream(inputStream); return session.finalizeRaw(); } - finally { - if (inputStream != null) { - inputStream.close(); - } - } }); } @@ -456,9 +451,7 @@ public T execute(SessionCallback callback) { session.close(); } catch (Exception ignored) { - if (this.logger.isDebugEnabled()) { - this.logger.debug("failed to close Session", ignored); - } + this.logger.debug("failed to close Session", ignored); } } } @@ -557,13 +550,13 @@ private void sendFileToRemoteDirectory(InputStream inputStream, String temporary } } - try { + try (InputStream stream = inputStream) { boolean rename = this.useTemporaryFileName; if (FileExistsMode.REPLACE.equals(mode)) { - session.write(inputStream, tempFilePath); + session.write(stream, tempFilePath); } else if (FileExistsMode.APPEND.equals(mode)) { - session.append(inputStream, tempFilePath); + session.append(stream, tempFilePath); } else { if (exists(remoteFilePath)) { @@ -579,7 +572,7 @@ else if (FileExistsMode.APPEND.equals(mode)) { rename = false; } else { - session.write(inputStream, tempFilePath); + session.write(stream, tempFilePath); } } // then rename it to its final name if necessary @@ -590,9 +583,6 @@ else if (FileExistsMode.APPEND.equals(mode)) { catch (Exception e) { throw new MessagingException("Failed to write to '" + tempFilePath + "' while uploading the file", e); } - finally { - inputStream.close(); - } } private String normalizeDirectoryPath(String directoryPath) { @@ -613,19 +603,11 @@ private static final class StreamHolder { private final String name; - private StreamHolder(InputStream stream, String name) { + StreamHolder(InputStream stream, String name) { this.stream = stream; this.name = name; } - public InputStream getStream() { - return this.stream; - } - - public String getName() { - return this.name; - } - } } diff --git a/spring-integration-file/src/main/java/org/springframework/integration/file/remote/handler/FileTransferringMessageHandler.java b/spring-integration-file/src/main/java/org/springframework/integration/file/remote/handler/FileTransferringMessageHandler.java index b58395a1f66..d3a263cf328 100644 --- a/spring-integration-file/src/main/java/org/springframework/integration/file/remote/handler/FileTransferringMessageHandler.java +++ b/spring-integration-file/src/main/java/org/springframework/integration/file/remote/handler/FileTransferringMessageHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -34,12 +34,13 @@ * @author Oleg Zhurakousky * @author David Turanski * @author Gary Russell + * @author Artem Bilan * * @since 2.0 */ public class FileTransferringMessageHandler extends AbstractMessageHandler { - protected final RemoteFileTemplate remoteFileTemplate; + protected final RemoteFileTemplate remoteFileTemplate; // NOSONAR private final FileExistsMode mode; @@ -175,7 +176,7 @@ public void setTemporaryFileSuffix(String temporaryFileSuffix) { */ public void setChmodOctal(String chmod) { Assert.notNull(chmod, "'chmod' cannot be null"); - setChmod(Integer.parseInt(chmod, 8)); + setChmod(Integer.parseInt(chmod, 8)); // NOSONAR } /** @@ -200,7 +201,7 @@ protected void onInit() { } @Override - protected void handleMessageInternal(Message message) throws Exception { + protected void handleMessageInternal(Message message) { String path = this.remoteFileTemplate.send(message, this.mode); if (this.chmod != null && isChmodCapable()) { doChmod(this.remoteFileTemplate, path, this.chmod); diff --git a/spring-integration-file/src/main/java/org/springframework/integration/file/remote/synchronizer/AbstractInboundFileSynchronizer.java b/spring-integration-file/src/main/java/org/springframework/integration/file/remote/synchronizer/AbstractInboundFileSynchronizer.java index 4994e0db5b2..f2f2d235c4e 100644 --- a/spring-integration-file/src/main/java/org/springframework/integration/file/remote/synchronizer/AbstractInboundFileSynchronizer.java +++ b/spring-integration-file/src/main/java/org/springframework/integration/file/remote/synchronizer/AbstractInboundFileSynchronizer.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -48,6 +48,7 @@ import org.springframework.integration.file.remote.session.Session; import org.springframework.integration.file.remote.session.SessionFactory; import org.springframework.integration.file.support.FileUtils; +import org.springframework.lang.Nullable; import org.springframework.messaging.MessagingException; import org.springframework.util.Assert; import org.springframework.util.ObjectUtils; @@ -75,7 +76,7 @@ public abstract class AbstractInboundFileSynchronizer protected static final ExpressionParser EXPRESSION_PARSER = new SpelExpressionParser(); - protected final Log logger = LogFactory.getLog(this.getClass()); + protected final Log logger = LogFactory.getLog(this.getClass()); // NOSONAR private final RemoteFileTemplate remoteFileTemplate; @@ -93,7 +94,7 @@ public abstract class AbstractInboundFileSynchronizer /** * the path on the remote mount as a String. */ - private volatile Expression remoteDirectoryExpression; + private Expression remoteDirectoryExpression; /** * The current evaluation of the expression. @@ -103,6 +104,7 @@ public abstract class AbstractInboundFileSynchronizer /** * An {@link FileListFilter} that runs against the remote file system view. */ + @Nullable private FileListFilter filter; /** @@ -119,6 +121,7 @@ public abstract class AbstractInboundFileSynchronizer private BeanFactory beanFactory; + @Nullable private Comparator comparator; /** @@ -131,7 +134,7 @@ public AbstractInboundFileSynchronizer(SessionFactory sessionFactory) { this.remoteFileTemplate = new RemoteFileTemplate(sessionFactory); } - + @Nullable protected Comparator getComparator() { return this.comparator; } @@ -142,7 +145,7 @@ protected Comparator getComparator() { * @param comparator the comparator. * @since 5.1 */ - public void setComparator(Comparator comparator) { + public void setComparator(@Nullable Comparator comparator) { this.comparator = comparator; } @@ -222,11 +225,11 @@ protected final void doSetRemoteDirectoryExpression(Expression remoteDirectoryEx * Set the filter to be applied to the remote files before transferring. * @param filter the file list filter. */ - public void setFilter(FileListFilter filter) { + public void setFilter(@Nullable FileListFilter filter) { doSetFilter(filter); } - protected final void doSetFilter(FileListFilter filter) { + protected final void doSetFilter(@Nullable FileListFilter filter) { this.filter = filter; } @@ -302,46 +305,8 @@ public void synchronizeToLocalDirectory(final File localDirectory, final int max this.logger.trace("Synchronizing " + this.evaluatedRemoteDirectory + " to " + localDirectory); } try { - int transferred = this.remoteFileTemplate.execute(session -> { - F[] files = session.list(this.evaluatedRemoteDirectory); - if (!ObjectUtils.isEmpty(files)) { - files = FileUtils.purgeUnwantedElements(files, e -> !isFile(e), this.comparator); - } - if (!ObjectUtils.isEmpty(files)) { - List filteredFiles = filterFiles(files); - if (maxFetchSize >= 0 && filteredFiles.size() > maxFetchSize) { - rollbackFromFileToListEnd(filteredFiles, filteredFiles.get(maxFetchSize)); - List newList = new ArrayList<>(maxFetchSize); - for (int i = 0; i < maxFetchSize; i++) { - newList.add(filteredFiles.get(i)); - } - filteredFiles = newList; - } - - int copied = filteredFiles.size(); - - for (F file : filteredFiles) { - try { - if (file != null && !copyFileToLocalDirectory(this.evaluatedRemoteDirectory, file, - localDirectory, session)) { - copied--; - } - } - catch (RuntimeException e1) { - rollbackFromFileToListEnd(filteredFiles, file); - throw e1; - } - catch (IOException e1) { - rollbackFromFileToListEnd(filteredFiles, file); - throw e1; - } - } - return copied; - } - else { - return 0; - } - }); + int transferred = this.remoteFileTemplate.execute(session -> + transferFilesFromRemoteToLocal(localDirectory, maxFetchSize, session)); if (this.logger.isDebugEnabled()) { this.logger.debug(transferred + " files transferred from '" + this.evaluatedRemoteDirectory + "'"); } @@ -352,6 +317,45 @@ public void synchronizeToLocalDirectory(final File localDirectory, final int max } } + private Integer transferFilesFromRemoteToLocal(File localDirectory, int maxFetchSize, Session session) + throws IOException { + + F[] files = session.list(this.evaluatedRemoteDirectory); + if (!ObjectUtils.isEmpty(files)) { + files = FileUtils.purgeUnwantedElements(files, e -> !isFile(e), this.comparator); + } + if (!ObjectUtils.isEmpty(files)) { + List filteredFiles = filterFiles(files); + if (maxFetchSize >= 0 && filteredFiles.size() > maxFetchSize) { + rollbackFromFileToListEnd(filteredFiles, filteredFiles.get(maxFetchSize)); + List newList = new ArrayList<>(maxFetchSize); + for (int i = 0; i < maxFetchSize; i++) { + newList.add(filteredFiles.get(i)); + } + filteredFiles = newList; + } + + int copied = filteredFiles.size(); + + for (F file : filteredFiles) { + try { + if (file != null && !copyFileToLocalDirectory(this.evaluatedRemoteDirectory, file, + localDirectory, session)) { + copied--; + } + } + catch (RuntimeException | IOException e1) { + rollbackFromFileToListEnd(filteredFiles, file); + throw e1; + } + } + return copied; + } + else { + return 0; + } + } + protected void rollbackFromFileToListEnd(List filteredFiles, F file) { if (this.filter instanceof ReversibleFileListFilter) { ((ReversibleFileListFilter) this.filter) @@ -359,8 +363,9 @@ protected void rollbackFromFileToListEnd(List filteredFiles, F file) { } } - protected boolean copyFileToLocalDirectory(String remoteDirectoryPath, F remoteFile, File localDirectory, - Session session) throws IOException { + protected boolean copyFileToLocalDirectory(String remoteDirectoryPath, F remoteFile, // NOSONAR + File localDirectory, Session session) throws IOException { + String remoteFileName = getFilename(remoteFile); String localFileName = generateLocalFileName(remoteFileName); String remoteFilePath = remoteDirectoryPath != null @@ -398,47 +403,7 @@ protected boolean copyFileToLocalDirectory(String remoteDirectoryPath, F remoteF boolean renamed = false; if (transfer) { - String tempFileName = localFile.getAbsolutePath() + this.temporaryFileSuffix; - File tempFile = new File(tempFileName); - - OutputStream outputStream = new BufferedOutputStream(new FileOutputStream(tempFile)); - try { - session.read(remoteFilePath, outputStream); - } - catch (Exception e) { - if (e instanceof RuntimeException) { - throw (RuntimeException) e; - } - else { - throw new MessagingException("Failure occurred while copying '" + remoteFilePath - + "' from the remote to the local directory", e); - } - } - finally { - try { - outputStream.close(); - } - catch (Exception ignored2) { - } - } - - renamed = tempFile.renameTo(localFile); - - if (!renamed) { - if (localFile.delete()) { - renamed = tempFile.renameTo(localFile); - if (!renamed && this.logger.isInfoEnabled()) { - this.logger.info("Cannot rename '" - + tempFileName - + "' to local file '" + localFile + "' after deleting. " + - "The local file may be busy in some other process."); - } - } - else if (this.logger.isInfoEnabled()) { - this.logger.info("Cannot delete local file '" + localFile + - "'. The local file may be busy in some other process."); - } - } + renamed = copyRemoteContentToLocalFile(session, remoteFilePath, localFile); } if (renamed) { @@ -448,8 +413,8 @@ else if (this.logger.isInfoEnabled()) { this.logger.debug("deleted remote file: " + remoteFilePath); } } - if (this.preserveTimestamp) { - localFile.setLastModified(modified); + if (this.preserveTimestamp && !localFile.setLastModified(modified)) { + throw new IllegalStateException("Could not sent last modified on file: " + localFile); } return true; } @@ -469,6 +434,44 @@ else if (this.logger.isWarnEnabled()) { return false; } + private boolean copyRemoteContentToLocalFile(Session session, String remoteFilePath, File localFile) { + boolean renamed; + String tempFileName = localFile.getAbsolutePath() + this.temporaryFileSuffix; + File tempFile = new File(tempFileName); + + try (OutputStream outputStream = new BufferedOutputStream(new FileOutputStream(tempFile))) { + session.read(remoteFilePath, outputStream); + } + catch (Exception e) { + if (e instanceof RuntimeException) { + throw (RuntimeException) e; + } + else { + throw new MessagingException("Failure occurred while copying '" + remoteFilePath + + "' from the remote to the local directory", e); + } + } + + renamed = tempFile.renameTo(localFile); + + if (!renamed) { + if (localFile.delete()) { + renamed = tempFile.renameTo(localFile); + if (!renamed && this.logger.isInfoEnabled()) { + this.logger.info("Cannot rename '" + + tempFileName + + "' to local file '" + localFile + "' after deleting. " + + "The local file may be busy in some other process."); + } + } + else if (this.logger.isInfoEnabled()) { + this.logger.info("Cannot delete local file '" + localFile + + "'. The local file may be busy in some other process."); + } + } + return renamed; + } + private String generateLocalFileName(String remoteFileName) { if (this.localFilenameGeneratorExpression != null) { return this.localFilenameGeneratorExpression.getValue(this.evaluationContext, remoteFileName, diff --git a/spring-integration-file/src/main/java/org/springframework/integration/file/splitter/FileSplitter.java b/spring-integration-file/src/main/java/org/springframework/integration/file/splitter/FileSplitter.java index 05e7fd90232..a60158e78c3 100644 --- a/spring-integration-file/src/main/java/org/springframework/integration/file/splitter/FileSplitter.java +++ b/spring-integration-file/src/main/java/org/springframework/integration/file/splitter/FileSplitter.java @@ -75,7 +75,7 @@ */ public class FileSplitter extends AbstractMessageSplitter { - private static final JsonObjectMapper objectMapper = + private static final JsonObjectMapper OBJECT_MAPPER = JsonObjectMapperProvider.jsonAvailable() ? JsonObjectMapperProvider.newInstance() : null; private final boolean returnIterator; @@ -140,7 +140,7 @@ public FileSplitter(boolean iterator, boolean markers, boolean markersJson) { if (markers) { setApplySequence(false); if (markersJson) { - Assert.notNull(objectMapper, "'markersJson' requires an object mapper"); + Assert.notNull(OBJECT_MAPPER, "'markersJson' requires an object mapper"); } } this.markersJson = markersJson; @@ -170,9 +170,9 @@ public void setFirstLineAsHeader(String firstLineHeaderName) { protected Object splitMessage(final Message message) { Object payload = message.getPayload(); - Reader reader = null; + Reader reader; - final String filePath; + String filePath; if (payload instanceof String) { try { @@ -214,24 +214,24 @@ else if (payload instanceof Reader) { return message; } - final BufferedReader bufferedReader = new BufferedReader(reader) { + Iterator iterator = messageToFileIterator(message, reader, filePath); - @Override - public void close() throws IOException { - try { - super.close(); - } - finally { - Closeable closeableResource = StaticMessageHeaderAccessor.getCloseableResource(message); - if (closeableResource != null) { - closeableResource.close(); - } - } + if (this.returnIterator) { + return iterator; + } + else { + List lines = new ArrayList<>(); + while (iterator.hasNext()) { + lines.add(iterator.next()); } + return lines; + } + } - }; + private Iterator messageToFileIterator(Message message, Reader reader, String filePath) { + BufferedReader bufferedReader = wrapToBufferedReader(message, reader); - String firstLineAsHeader; + String firstLineAsHeader = null; if (this.firstLineHeaderName != null) { try { @@ -241,134 +241,27 @@ public void close() throws IOException { throw new MessageHandlingException(message, "IOException while reading first line", e); } } - else { - firstLineAsHeader = null; - } - - Iterator iterator = new CloseableIterator() { - - boolean markers = FileSplitter.this.markers; - - boolean sof = this.markers; - - boolean eof; - - boolean done; - String line; - - long lineCount; + return new FileIterator(message, bufferedReader, firstLineAsHeader, filePath); + } - boolean hasNextCalled; + private BufferedReader wrapToBufferedReader(Message message, Reader reader) { + return new BufferedReader(reader) { @Override - public boolean hasNext() { - this.hasNextCalled = true; + public void close() throws IOException { try { - if (!this.done && this.line == null) { - this.line = bufferedReader.readLine(); - } - boolean ready = !this.done && this.line != null; - if (!ready) { - if (this.markers) { - this.eof = true; - if (this.sof) { - this.done = true; - } - } - bufferedReader.close(); - } - return this.sof || ready || this.eof; - } - catch (IOException e) { - try { - this.done = true; - bufferedReader.close(); - } - catch (IOException e1) { - // ignored - } - throw new MessageHandlingException(message, "IOException while iterating", e); - } - } - - @Override - public Object next() { - if (!this.hasNextCalled) { - hasNext(); - } - this.hasNextCalled = false; - if (this.sof) { - this.sof = false; - return markerToReturn(new FileMarker(filePath, Mark.START, 0)); - } - if (this.eof) { - this.eof = false; - this.markers = false; - this.done = true; - return markerToReturn(new FileMarker(filePath, Mark.END, this.lineCount)); - } - if (this.line != null) { - String line = this.line; - this.line = null; - this.lineCount++; - - AbstractIntegrationMessageBuilder messageBuilder = - getMessageBuilderFactory() - .withPayload(line); - - if (firstLineAsHeader != null) { - messageBuilder.setHeader(FileSplitter.this.firstLineHeaderName, firstLineAsHeader); - } - - return messageBuilder; - } - else { - this.done = true; - throw new NoSuchElementException(filePath + " has been consumed"); + super.close(); } - } - - private AbstractIntegrationMessageBuilder markerToReturn(FileMarker fileMarker) { - Object payload; - if (FileSplitter.this.markersJson) { - try { - payload = objectMapper.toJson(fileMarker); - } - catch (Exception e) { - throw new MessageHandlingException(message, "Failed to convert marker to JSON", e); + finally { + Closeable closeableResource = StaticMessageHeaderAccessor.getCloseableResource(message); + if (closeableResource != null) { + closeableResource.close(); } } - else { - payload = fileMarker; - } - return getMessageBuilderFactory().withPayload(payload) - .setHeader(FileHeaders.MARKER, fileMarker.mark.name()); - } - - @Override - public void close() { - try { - this.done = true; - bufferedReader.close(); - } - catch (IOException e) { - // ignored - } } }; - - if (this.returnIterator) { - return iterator; - } - else { - List lines = new ArrayList(); - while (iterator.hasNext()) { - lines.add(iterator.next()); - } - return lines; - } } @Override @@ -407,6 +300,141 @@ private String buildPathFromMessage(Message message, String defaultPath) { } } + private class FileIterator implements CloseableIterator { + + private final Message message; + + private final BufferedReader bufferedReader; + + private final String firstLineAsHeader; + + private final String filePath; + + private boolean markers = FileSplitter.this.markers; + + private boolean sof = this.markers; + + private boolean eof; + + private boolean done; + + private String line; + + private long lineCount; + + private boolean hasNextCalled; + + FileIterator(Message message, BufferedReader bufferedReader, String firstLineAsHeader, + String filePath) { + + this.message = message; + this.bufferedReader = bufferedReader; + this.firstLineAsHeader = firstLineAsHeader; + this.filePath = filePath; + } + + @Override + public boolean hasNext() { + this.hasNextCalled = true; + try { + return hasNextLine(); + } + catch (IOException e) { + try { + this.done = true; + this.bufferedReader.close(); + } + catch (IOException e1) { + // ignored + } + throw new MessageHandlingException(this.message, "IOException while iterating", e); + } + } + + private boolean hasNextLine() throws IOException { + if (!this.done && this.line == null) { + this.line = this.bufferedReader.readLine(); + } + boolean ready = !this.done && this.line != null; + if (!ready) { + if (this.markers) { + this.eof = true; + if (this.sof) { + this.done = true; + } + } + this.bufferedReader.close(); + } + return this.sof || ready || this.eof; + } + + @Override + public Object next() { + if (!this.hasNextCalled) { + hasNext(); + } + this.hasNextCalled = false; + if (this.sof) { + this.sof = false; + return markerToReturn(new FileMarker(this.filePath, Mark.START, 0)); + } + if (this.eof) { + this.eof = false; + this.markers = false; + this.done = true; + return markerToReturn(new FileMarker(this.filePath, Mark.END, this.lineCount)); + } + if (this.line != null) { + String payload = this.line; + this.line = null; + this.lineCount++; + + AbstractIntegrationMessageBuilder messageBuilder = + getMessageBuilderFactory() + .withPayload(payload); + + if (this.firstLineAsHeader != null) { + messageBuilder.setHeader(FileSplitter.this.firstLineHeaderName, this.firstLineAsHeader); + } + + return messageBuilder; + } + else { + this.done = true; + throw new NoSuchElementException(this.filePath + " has been consumed"); + } + } + + private AbstractIntegrationMessageBuilder markerToReturn(FileMarker fileMarker) { + Object payload; + if (FileSplitter.this.markersJson) { + try { + payload = OBJECT_MAPPER.toJson(fileMarker); + } + catch (Exception e) { + throw new MessageHandlingException(this.message, "Failed to convert marker to JSON", e); + } + } + else { + payload = fileMarker; + } + return getMessageBuilderFactory().withPayload(payload) + .setHeader(FileHeaders.MARKER, fileMarker.mark.name()); + } + + @Override + public void close() { + try { + this.done = true; + this.bufferedReader.close(); + } + catch (IOException e) { + // ignored + } + } + + } + public static class FileMarker implements Serializable { private static final long serialVersionUID = 8514605438145748406L; @@ -451,11 +479,12 @@ public long getLineCount() { @Override public String toString() { - if (this.mark.equals(Mark.START)) { + if (Mark.START.equals(this.mark)) { return "FileMarker [filePath=" + this.filePath + ", mark=" + this.mark + "]"; } else { - return "FileMarker [filePath=" + this.filePath + ", mark=" + this.mark + ", lineCount=" + this.lineCount + "]"; + return "FileMarker [filePath=" + this.filePath + ", mark=" + this.mark + + ", lineCount=" + this.lineCount + "]"; } } diff --git a/spring-integration-ftp/src/test/java/org/springframework/integration/ftp/session/FtpRemoteFileTemplateTests.java b/spring-integration-ftp/src/test/java/org/springframework/integration/ftp/session/FtpRemoteFileTemplateTests.java index 76c50bb6bc3..9fd20d28726 100644 --- a/spring-integration-ftp/src/test/java/org/springframework/integration/ftp/session/FtpRemoteFileTemplateTests.java +++ b/spring-integration-ftp/src/test/java/org/springframework/integration/ftp/session/FtpRemoteFileTemplateTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2016 the original author or authors. + * Copyright 2014-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -33,6 +33,7 @@ import org.junit.Test; import org.junit.runner.RunWith; +import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -49,6 +50,8 @@ /** * @author Gary Russell + * @author Artem Bilan + * * @since 4.1 * */ @@ -63,16 +66,19 @@ public class FtpRemoteFileTemplateTests extends FtpTestSupport { public void testINT3412AppendStatRmdir() throws IOException { FtpRemoteFileTemplate template = new FtpRemoteFileTemplate(sessionFactory); DefaultFileNameGenerator fileNameGenerator = new DefaultFileNameGenerator(); + fileNameGenerator.setBeanFactory(mock(BeanFactory.class)); fileNameGenerator.setExpression("'foobar.txt'"); template.setFileNameGenerator(fileNameGenerator); template.setRemoteDirectoryExpression(new LiteralExpression("foo/")); template.setUseTemporaryFileName(false); + template.setBeanFactory(mock(BeanFactory.class)); + template.afterPropertiesSet(); template.execute(session -> { session.mkdir("foo/"); return session.mkdir("foo/bar/"); }); - template.append(new GenericMessage("foo")); - template.append(new GenericMessage("bar")); + template.append(new GenericMessage<>("foo")); + template.append(new GenericMessage<>("bar")); assertTrue(template.exists("foo/foobar.txt")); template.executeWithClient((ClientCallbackWithoutResult) client -> { try { @@ -106,7 +112,7 @@ public void testFileCloseOnBadConnect() throws Exception { fileOutputStream.write("foo".getBytes()); fileOutputStream.close(); try { - template.send(new GenericMessage(file)); + template.send(new GenericMessage<>(file)); fail("exception expected"); } catch (MessagingException e) {