Skip to content

Commit 931df86

Browse files
garyrussellartembilan
authored andcommitted
GH-2777: Remote File Filter Improvements
Resolves #2777 If the filter supports it, defer filtering until the last possible moment. Then, the worst case scenario after a catastrophic failure (e.g. power loss), would be that at most one file will be incorrectly filtered on restart. Polishing and add more tests. Polishing Javadocs More Polishing Final polishing More polishing. Polishing and docPolishing and docs. * Fix typos in Docs
1 parent bb62cb8 commit 931df86

File tree

21 files changed

+447
-110
lines changed

21 files changed

+447
-110
lines changed

spring-integration-file/src/main/java/org/springframework/integration/file/filters/AbstractFileListFilter.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -26,6 +26,7 @@
2626
*
2727
* @author Mark Fisher
2828
* @author Iwein Fuld
29+
* @author Gary Russell
2930
*/
3031
public abstract class AbstractFileListFilter<F> implements FileListFilter<F> {
3132

@@ -42,11 +43,17 @@ public final List<F> filterFiles(F[] files) {
4243
return accepted;
4344
}
4445

46+
@Override
47+
public boolean supportsSingleFileFiltering() {
48+
return true;
49+
}
50+
4551
/**
4652
* Subclasses must implement this method.
4753
* @param file The file.
4854
* @return true if the file passes the filter.
4955
*/
56+
@Override
5057
public abstract boolean accept(F file);
5158

5259
}

spring-integration-file/src/main/java/org/springframework/integration/file/filters/AbstractMarkerFilePresentFileListFilter.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017 the original author or authors.
2+
* Copyright 2017-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -31,6 +31,9 @@
3131
* A FileListFilter that only passes files matched by one or more {@link FileListFilter}
3232
* if a corresponding marker file is also present to indicate a file transfer is complete.
3333
*
34+
* Since they look at multiple files, they cannot be used for late filtering in the
35+
* streaming message source.
36+
*
3437
* @author Gary Russell
3538
* @since 5.0
3639
*

spring-integration-file/src/main/java/org/springframework/integration/file/filters/AbstractPersistentAcceptOnceFileListFilter.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ protected void flushIfNeeded() {
173173
try {
174174
this.flushableStore.flush();
175175
}
176-
catch (IOException e) {
176+
catch (@SuppressWarnings("unused") IOException e) {
177177
// store's responsibility to log
178178
}
179179
}

spring-integration-file/src/main/java/org/springframework/integration/file/filters/ChainFileListFilter.java

+12-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017 the original author or authors.
2+
* Copyright 2017-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -62,4 +62,15 @@ public List<F> filterFiles(F[] files) {
6262
return leftOver;
6363
}
6464

65+
@Override
66+
public boolean accept(F file) {
67+
// we can't use stream().allMatch() because there is no guarantee of early exit
68+
for (FileListFilter<F> filter : this.fileFilters) {
69+
if (!filter.accept(file)) {
70+
return false;
71+
}
72+
}
73+
return true;
74+
}
75+
6576
}

spring-integration-file/src/main/java/org/springframework/integration/file/filters/CompositeFileListFilter.java

+27-6
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.LinkedHashSet;
2626
import java.util.List;
2727
import java.util.Set;
28+
import java.util.concurrent.atomic.AtomicBoolean;
2829
import java.util.function.Consumer;
2930

3031
import org.springframework.beans.factory.InitializingBean;
@@ -53,13 +54,16 @@ public class CompositeFileListFilter<F>
5354

5455
private Consumer<F> discardCallback;
5556

57+
private boolean allSupportAccept = true;
58+
5659

5760
public CompositeFileListFilter() {
5861
this.fileFilters = new LinkedHashSet<>();
5962
}
6063

6164
public CompositeFileListFilter(Collection<? extends FileListFilter<F>> fileFilters) {
6265
this.fileFilters = new LinkedHashSet<>(fileFilters);
66+
this.allSupportAccept = fileFilters.stream().allMatch(FileListFilter<F>::supportsSingleFileFiltering);
6367
}
6468

6569

@@ -73,6 +77,7 @@ public void close() throws IOException {
7377
}
7478

7579
public CompositeFileListFilter<F> addFilter(FileListFilter<F> filter) {
80+
this.allSupportAccept &= filter.supportsSingleFileFiltering();
7681
return addFilters(Collections.singletonList(filter));
7782
}
7883

@@ -81,9 +86,11 @@ public CompositeFileListFilter<F> addFilter(FileListFilter<F> filter) {
8186
* @return this CompositeFileFilter instance with the added filters
8287
* @see #addFilters(Collection)
8388
*/
84-
@SuppressWarnings("unchecked")
85-
public CompositeFileListFilter<F> addFilters(FileListFilter<F>... filters) {
86-
return addFilters(Arrays.asList(filters));
89+
@SafeVarargs
90+
@SuppressWarnings("varargs")
91+
public final CompositeFileListFilter<F> addFilters(FileListFilter<F>... filters) {
92+
List<FileListFilter<F>> asList = Arrays.asList(filters);
93+
return addFilters(asList);
8794
}
8895

8996
/**
@@ -109,18 +116,19 @@ public CompositeFileListFilter<F> addFilters(Collection<? extends FileListFilter
109116
}
110117
}
111118
this.fileFilters.addAll(filtersToAdd);
119+
this.allSupportAccept &= filtersToAdd.stream().allMatch(FileListFilter<F>::supportsSingleFileFiltering);
112120
return this;
113121
}
114122

115123
@Override
116-
public void addDiscardCallback(Consumer<F> discardCallback) {
117-
this.discardCallback = discardCallback;
124+
public void addDiscardCallback(Consumer<F> discardCallbackToSet) {
125+
this.discardCallback = discardCallbackToSet;
118126
if (this.discardCallback != null) {
119127
this.fileFilters
120128
.stream()
121129
.filter(DiscardAwareFileListFilter.class::isInstance)
122130
.map(f -> (DiscardAwareFileListFilter<F>) f)
123-
.forEach(f -> f.addDiscardCallback(discardCallback));
131+
.forEach(f -> f.addDiscardCallback(discardCallbackToSet));
124132
}
125133
}
126134

@@ -135,6 +143,19 @@ public List<F> filterFiles(F[] files) {
135143
return results;
136144
}
137145

146+
@Override
147+
public boolean accept(F file) {
148+
AtomicBoolean allAccept = new AtomicBoolean(true);
149+
// we can't use stream().allMatch() because we have to call all filters for this filter's contract
150+
this.fileFilters.forEach(f -> allAccept.compareAndSet(true, f.accept(file)));
151+
return allAccept.get();
152+
}
153+
154+
@Override
155+
public boolean supportsSingleFileFiltering() {
156+
return this.allSupportAccept;
157+
}
158+
138159
@Override
139160
public void rollback(F file, List<F> files) {
140161
for (FileListFilter<F> fileFilter : this.fileFilters) {

spring-integration-file/src/main/java/org/springframework/integration/file/filters/DiscardAwareFileListFilter.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,10 @@
2222

2323
/**
2424
* The {@link FileListFilter} modification which can accept a {@link Consumer}
25-
* which can be called when filter discards the file.
25+
* which can be called when the filter discards the file.
2626
*
2727
* @author Artem Bilan
28+
* @author Gary Russell
2829
*
2930
* @since 5.0.5
3031
*/

spring-integration-file/src/main/java/org/springframework/integration/file/filters/FileListFilter.java

+27-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -24,6 +24,7 @@
2424
*
2525
* @author Iwein Fuld
2626
* @author Josh Long
27+
* @author Gary Russell
2728
*
2829
* @since 1.0.0
2930
*/
@@ -38,4 +39,29 @@ public interface FileListFilter<F> {
3839
*/
3940
List<F> filterFiles(F[] files);
4041

42+
/**
43+
* Filter a single file; only called externally if {@link #supportsSingleFileFiltering()}
44+
* returns true.
45+
* @param file the file.
46+
* @return true if the file passes the filter, false to filter.
47+
* @since 5.2
48+
* @see #supportsSingleFileFiltering()
49+
*/
50+
default boolean accept(F file) {
51+
throw new UnsupportedOperationException(
52+
"Filters that return true in supportsSingleFileFiltering() must implement this method");
53+
}
54+
55+
/**
56+
* Indicates that this filter supports filtering a single file.
57+
* Filters that return true <b>must</b> override {@link #accept(Object)}.
58+
* Default false.
59+
* @return true to allow external calls to {@link #accept(Object)}.
60+
* @since 5.2
61+
* @see #accept(Object)
62+
*/
63+
default boolean supportsSingleFileFiltering() {
64+
return false;
65+
}
66+
4167
}

spring-integration-file/src/main/java/org/springframework/integration/file/filters/LastModifiedFileListFilter.java

+24-3
Original file line numberDiff line numberDiff line change
@@ -105,16 +105,16 @@ public long getAge() {
105105
}
106106

107107
@Override
108-
public void addDiscardCallback(@Nullable Consumer<File> discardCallback) {
109-
this.discardCallback = discardCallback;
108+
public void addDiscardCallback(@Nullable Consumer<File> discardCallbackToSet) {
109+
this.discardCallback = discardCallbackToSet;
110110
}
111111

112112
@Override
113113
public List<File> filterFiles(File[] files) {
114114
List<File> list = new ArrayList<>();
115115
long now = System.currentTimeMillis() / ONE_SECOND;
116116
for (File file : files) {
117-
if (file.lastModified() / ONE_SECOND + this.age <= now) {
117+
if (fileIsAged(file, now)) {
118118
list.add(file);
119119
}
120120
else if (this.discardCallback != null) {
@@ -124,4 +124,25 @@ else if (this.discardCallback != null) {
124124
return list;
125125
}
126126

127+
@Override
128+
public boolean accept(File file) {
129+
if (fileIsAged(file, System.currentTimeMillis() / ONE_SECOND)) {
130+
return true;
131+
}
132+
else if (this.discardCallback != null) {
133+
this.discardCallback.accept(file);
134+
}
135+
return false;
136+
}
137+
138+
private boolean fileIsAged(File file, long now) {
139+
return file.lastModified() / ONE_SECOND + this.age <= now;
140+
}
141+
142+
@Override
143+
public boolean supportsSingleFileFiltering() {
144+
return true;
145+
}
146+
147+
127148
}

0 commit comments

Comments
 (0)