Skip to content

Some file module Sonar fixes #2718

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Jan 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -47,6 +47,7 @@
import org.springframework.integration.file.filters.FileListFilter;
import org.springframework.integration.file.filters.ResettableFileListFilter;
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;

Expand Down Expand Up @@ -103,23 +104,23 @@ public class FileReadingMessageSource extends AbstractMessageSource<File>
*/
private final Queue<File> toBeReceived;

private volatile File directory;
private File directory;

private volatile DirectoryScanner scanner = new DefaultDirectoryScanner();
private DirectoryScanner scanner = new DefaultDirectoryScanner();

private volatile boolean scannerExplicitlySet;
private boolean scannerExplicitlySet;

private volatile boolean autoCreateDirectory = true;
private boolean autoCreateDirectory = true;

private volatile boolean scanEachPoll = false;
private boolean scanEachPoll = false;

private FileListFilter<File> filter;

private FileLocker locker;

private boolean useWatchService;

private WatchEventType[] watchEvents = new WatchEventType[] { WatchEventType.CREATE };
private WatchEventType[] watchEvents = { WatchEventType.CREATE };

/**
* Creates a FileReadingMessageSource with a naturally ordered queue of unbounded capacity.
Expand All @@ -132,7 +133,6 @@ public FileReadingMessageSource() {
* Creates a FileReadingMessageSource with a bounded queue of the given
* capacity. This can be used to reduce the memory footprint of this
* component when reading from a large directory.
*
* @param internalQueueCapacity
* the size of the queue used to cache files to be received
* internally. This queue can be made larger to optimize the
Expand All @@ -151,26 +151,22 @@ public FileReadingMessageSource(int internalQueueCapacity) {

/**
* Creates a FileReadingMessageSource with a {@link PriorityBlockingQueue}
* ordered with the passed in {@link Comparator}
* <p>
* The size of the queue used should be large enough to hold all the files
* ordered with the passed in {@link Comparator}.
* <p> The size of the queue used should be large enough to hold all the files
* in the input directory in order to sort all of them, so restricting the
* size of the queue is mutually exclusive with ordering. No guarantees
* about file delivery order can be made under concurrent access.
* <p>
*
* @param receptionOrderComparator
* the comparator to be used to order the files in the internal
* queue
*/
public FileReadingMessageSource(Comparator<File> receptionOrderComparator) {
public FileReadingMessageSource(@Nullable Comparator<File> receptionOrderComparator) {
this.toBeReceived = new PriorityBlockingQueue<>(DEFAULT_INTERNAL_QUEUE_CAPACITY, receptionOrderComparator);
}


/**
* Specify the input directory.
*
* @param directory to monitor
*/
public void setDirectory(File directory) {
Expand All @@ -181,7 +177,6 @@ public void setDirectory(File directory) {
/**
* Optionally specify a custom scanner, for example the
* {@link WatchServiceDirectoryScanner}
*
* @param scanner scanner implementation
*/
public void setScanner(DirectoryScanner scanner) {
Expand All @@ -207,7 +202,6 @@ public DirectoryScanner getScanner() {
* <em>true</em>. If set to <em>false</em> and the
* source directory does not exist, an Exception will be thrown upon
* initialization.
*
* @param autoCreateDirectory
* should the directory to be monitored be created when this
* component starts up?
Expand All @@ -224,8 +218,7 @@ public void setAutoCreateDirectory(boolean autoCreateDirectory) {
* If multiple filters are required a
* {@link org.springframework.integration.file.filters.CompositeFileListFilter}
* can be used to group them together.
* <p>
* <b>The supplied filter must be thread safe.</b>.
* <p> <b>The supplied filter must be thread safe.</b>.
* @param filter a filter
*/
public void setFilter(FileListFilter<File> filter) {
Expand Down Expand Up @@ -256,7 +249,6 @@ public void setLocker(FileLocker locker) {
* will more likely be out of sync with the file system if this flag is set
* to <code>false</code>, but it will change more often (causing expensive
* reordering) if it is set to <code>true</code>.
*
* @param scanEachPoll
* whether or not the component should re-scan (as opposed to not
* rescanning until the entire backlog has been delivered)
Expand Down Expand Up @@ -304,15 +296,15 @@ public String getComponentType() {
@Override
public void start() {
if (!this.running.getAndSet(true)) {
if (!this.directory.exists() && this.autoCreateDirectory) {
this.directory.mkdirs();
if (!this.directory.exists() && this.autoCreateDirectory && !this.directory.mkdirs()) {
throw new IllegalStateException("Cannot create directory or ita parents: " + this.directory);
}
Assert.isTrue(this.directory.exists(),
"Source directory [" + this.directory + "] does not exist.");
() -> "Source directory [" + this.directory + "] does not exist.");
Assert.isTrue(this.directory.isDirectory(),
"Source path [" + this.directory + "] does not point to a directory.");
() -> "Source path [" + this.directory + "] does not point to a directory.");
Assert.isTrue(this.directory.canRead(),
"Source directory [" + this.directory + "] is not readable.");
() -> "Source directory [" + this.directory + "] is not readable.");
if (this.scanner instanceof Lifecycle) {
((Lifecycle) this.scanner).start();
}
Expand All @@ -336,7 +328,7 @@ protected void onInit() {
Assert.notNull(this.directory, "'directory' must not be null");

Assert.state(!(this.scannerExplicitlySet && this.useWatchService),
"The 'scanner' and 'useWatchService' options are mutually exclusive: " + this.scanner);
() -> "The 'scanner' and 'useWatchService' options are mutually exclusive: " + this.scanner);

if (this.useWatchService) {
this.scanner = new WatchServiceDirectoryScanner();
Expand All @@ -345,8 +337,8 @@ protected void onInit() {
// Check that the filter and locker options are _NOT_ set if an external scanner has been set.
// The external scanner is responsible for the filter and locker options in that case.
Assert.state(!(this.scannerExplicitlySet && (this.filter != null || this.locker != null)),
"When using an external scanner the 'filter' and 'locker' options should not be used. Instead, set these options on the external DirectoryScanner: "
+ this.scanner);
() -> "When using an external scanner the 'filter' and 'locker' options should not be used. " +
"Instead, set these options on the external DirectoryScanner: " + this.scanner);
if (this.filter != null) {
this.scanner.setFilter(this.filter);
}
Expand Down Expand Up @@ -492,7 +484,7 @@ protected File[] listEligibleFiles(File directory) {

files.addAll(filesFromEvents());

return files.toArray(new File[files.size()]);
return files.toArray(new File[0]);
}

private Set<File> filesFromEvents() {
Expand All @@ -504,57 +496,11 @@ private Set<File> filesFromEvents() {
if (event.kind() == StandardWatchEventKinds.ENTRY_CREATE ||
event.kind() == StandardWatchEventKinds.ENTRY_MODIFY ||
event.kind() == StandardWatchEventKinds.ENTRY_DELETE) {
Path item = (Path) event.context();
File file = new File(parentDir, item.toFile().getName());
if (logger.isDebugEnabled()) {
logger.debug("Watch event [" + event.kind() + "] for file [" + file + "]");
}

if (event.kind() == StandardWatchEventKinds.ENTRY_DELETE) {
if (getFilter() instanceof ResettableFileListFilter) {
((ResettableFileListFilter<File>) getFilter()).remove(file);
}
boolean fileRemoved = files.remove(file);
if (fileRemoved && logger.isDebugEnabled()) {
logger.debug("The file [" + file +
"] has been removed from the queue because of DELETE event.");
}
}
else {
if (file.exists()) {
if (file.isDirectory()) {
files.addAll(walkDirectory(file.toPath(), event.kind()));
}
else {
files.remove(file);
files.add(file);
}
}
else {
if (logger.isDebugEnabled()) {
logger.debug("A file [" + file + "] for the event [" + event.kind() +
"] doesn't exist. Ignored.");
}
}
}
processFilesFromNormalEvent(files, parentDir, event);
}
else if (event.kind() == StandardWatchEventKinds.OVERFLOW) {
if (logger.isDebugEnabled()) {
logger.debug("Watch event [" + StandardWatchEventKinds.OVERFLOW +
"] with context [" + event.context() + "]");
}

for (WatchKey watchKey : this.pathKeys.values()) {
watchKey.cancel();
}
this.pathKeys.clear();

if (event.context() != null && event.context() instanceof Path) {
files.addAll(walkDirectory((Path) event.context(), event.kind()));
}
else {
files.addAll(walkDirectory(FileReadingMessageSource.this.directory.toPath(), event.kind()));
}
processFilesFromOverflowEvent(files, event);
}
}
key.reset();
Expand All @@ -563,6 +509,61 @@ else if (event.kind() == StandardWatchEventKinds.OVERFLOW) {
return files;
}

private void processFilesFromNormalEvent(Set<File> files, File parentDir, WatchEvent<?> event) {
Path item = (Path) event.context();
File file = new File(parentDir, item.toFile().getName());
if (logger.isDebugEnabled()) {
logger.debug("Watch event [" + event.kind() + "] for file [" + file + "]");
}

if (event.kind() == StandardWatchEventKinds.ENTRY_DELETE) {
if (getFilter() instanceof ResettableFileListFilter) {
((ResettableFileListFilter<File>) getFilter()).remove(file);
}
boolean fileRemoved = files.remove(file);
if (fileRemoved && logger.isDebugEnabled()) {
logger.debug("The file [" + file +
"] has been removed from the queue because of DELETE event.");
}
}
else {
if (file.exists()) {
if (file.isDirectory()) {
files.addAll(walkDirectory(file.toPath(), event.kind()));
}
else {
files.remove(file);
files.add(file);
}
}
else {
if (logger.isDebugEnabled()) {
logger.debug("A file [" + file + "] for the event [" + event.kind() +
"] doesn't exist. Ignored.");
}
}
}
}

private void processFilesFromOverflowEvent(Set<File> files, WatchEvent<?> event) {
if (logger.isDebugEnabled()) {
logger.debug("Watch event [" + StandardWatchEventKinds.OVERFLOW +
"] with context [" + event.context() + "]");
}

for (WatchKey watchKey : this.pathKeys.values()) {
watchKey.cancel();
}
this.pathKeys.clear();

if (event.context() != null && event.context() instanceof Path) {
files.addAll(walkDirectory((Path) event.context(), event.kind()));
}
else {
files.addAll(walkDirectory(FileReadingMessageSource.this.directory.toPath(), event.kind()));
}
}

private Set<File> walkDirectory(Path directory, final WatchEvent.Kind<?> kind) {
final Set<File> walkedFiles = new LinkedHashSet<>();
try {
Expand Down
Loading