Skip to content

Commit 1820c05

Browse files
lsgemeartembilan
lsgeme
authored andcommitted
GH-3105: Fix (S)FTP streaming single filter logic
Fixes #3105 The loop in the `AbstractRemoteFileStreamingMessageSource` doesn't check the next polled file for filtering. * Add `continue;` when we filter the current file and poll the next one. So, we go over into the `while()` beginning * Clean up the code style and cover more code path in test
1 parent ddccb5a commit 1820c05

File tree

2 files changed

+179
-36
lines changed

2 files changed

+179
-36
lines changed

spring-integration-file/src/main/java/org/springframework/integration/file/remote/AbstractRemoteFileStreamingMessageSource.java

Lines changed: 36 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
*
5050
* @author Gary Russell
5151
* @author Artem Bilan
52+
* @author Lukas Gemela
5253
*
5354
* @since 4.3
5455
*
@@ -180,51 +181,57 @@ public boolean isRunning() {
180181
return this.running.get();
181182
}
182183

184+
@Override
185+
protected Object doReceive(int maxFetchSize) {
186+
return doReceive();
187+
}
188+
183189
@Override
184190
protected Object doReceive() {
185191
Assert.state(this.running.get(), () -> getComponentName() + " is not running");
186192
AbstractFileInfo<F> file = poll();
187193
while (file != null) {
188194
if (this.filter != null && this.filter.supportsSingleFileFiltering()
189-
&& !this.filter.accept(file.getFileInfo())) {
195+
&& !this.filter.accept(file.getFileInfo())) {
190196

191-
if (this.toBeReceived.size() > 0) { // don't re-fetch already filtered files
192-
file = poll();
193-
}
194-
else {
195-
file = null;
196-
}
197+
if (this.toBeReceived.size() > 0) { // don't re-fetch already filtered files
198+
file = poll();
199+
continue;
200+
}
201+
else {
202+
break;
203+
}
197204
}
198-
if (file != null) {
205+
try {
206+
String remotePath = remotePath(file);
207+
Session<?> session = this.remoteFileTemplate.getSession();
199208
try {
200-
String remotePath = remotePath(file);
201-
Session<?> session = this.remoteFileTemplate.getSession();
202-
try {
203-
return getMessageBuilderFactory()
204-
.withPayload(session.readRaw(remotePath))
205-
.setHeader(IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE, session)
206-
.setHeader(FileHeaders.REMOTE_DIRECTORY, file.getRemoteDirectory())
207-
.setHeader(FileHeaders.REMOTE_FILE, file.getFilename())
208-
.setHeader(FileHeaders.REMOTE_HOST_PORT, session.getHostPort())
209-
.setHeader(FileHeaders.REMOTE_FILE_INFO,
210-
this.fileInfoJson ? file.toJson() : file);
211-
}
212-
catch (IOException e) {
213-
throw new UncheckedIOException("IOException when retrieving " + remotePath, e);
214-
}
209+
return getMessageBuilderFactory()
210+
.withPayload(session.readRaw(remotePath))
211+
.setHeader(IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE, session)
212+
.setHeader(FileHeaders.REMOTE_DIRECTORY, file.getRemoteDirectory())
213+
.setHeader(FileHeaders.REMOTE_FILE, file.getFilename())
214+
.setHeader(FileHeaders.REMOTE_HOST_PORT, session.getHostPort())
215+
.setHeader(FileHeaders.REMOTE_FILE_INFO,
216+
this.fileInfoJson ? file.toJson() : file);
215217
}
216-
catch (RuntimeException e) {
217-
resetFilterIfNecessary(file);
218-
throw e;
218+
catch (IOException e) {
219+
throw new UncheckedIOException("IOException when retrieving " + remotePath, e);
219220
}
220221
}
222+
catch (RuntimeException e) {
223+
resetFilterIfNecessary(file);
224+
throw e;
225+
}
221226
}
222227
return null;
223228
}
224229

225-
@Override
226-
protected Object doReceive(int maxFetchSize) {
227-
return doReceive();
230+
protected AbstractFileInfo<F> poll() {
231+
if (this.toBeReceived.size() == 0) {
232+
listFiles();
233+
}
234+
return this.toBeReceived.poll();
228235
}
229236

230237
private void resetFilterIfNecessary(AbstractFileInfo<F> file) {
@@ -237,13 +244,6 @@ private void resetFilterIfNecessary(AbstractFileInfo<F> file) {
237244
}
238245
}
239246

240-
protected AbstractFileInfo<F> poll() {
241-
if (this.toBeReceived.size() == 0) {
242-
listFiles();
243-
}
244-
return this.toBeReceived.poll();
245-
}
246-
247247
protected String remotePath(AbstractFileInfo<F> file) {
248248
return file.getRemoteDirectory().endsWith(this.remoteFileSeparator)
249249
? file.getRemoteDirectory() + file.getFilename()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
/*
2+
* Copyright 2015-2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.file.remote;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.mockito.ArgumentMatchers.anyString;
21+
import static org.mockito.Mockito.mock;
22+
import static org.mockito.Mockito.when;
23+
24+
import java.io.IOException;
25+
import java.io.InputStream;
26+
import java.util.Collection;
27+
import java.util.Comparator;
28+
import java.util.List;
29+
import java.util.stream.Collectors;
30+
31+
import org.junit.Test;
32+
33+
import org.springframework.beans.factory.BeanFactory;
34+
import org.springframework.integration.file.filters.FileListFilter;
35+
import org.springframework.integration.file.remote.session.Session;
36+
37+
/**
38+
* @author Lukas Gemela
39+
* @author Artem Bilan
40+
*
41+
* @since 5.2.2
42+
*
43+
*/
44+
public class RemoteFileStreamingMessageSourceTests {
45+
46+
@Test
47+
@SuppressWarnings("unchecked")
48+
public void filterOutFilesNotAcceptedByFilter() throws IOException {
49+
RemoteFileTemplate<String> remoteFileTemplate = mock(RemoteFileTemplate.class);
50+
when(remoteFileTemplate.list("remoteDirectory")).thenReturn(new String[] { "file1", "file2" });
51+
Session<String> session = mock(Session.class);
52+
when(session.readRaw(anyString())).thenReturn(mock(InputStream.class));
53+
when(remoteFileTemplate.getSession()).thenReturn(session);
54+
55+
FileListFilter<String> fileListFilter = mock(FileListFilter.class);
56+
when(fileListFilter.supportsSingleFileFiltering()).thenReturn(true);
57+
when(fileListFilter.accept("file1")).thenReturn(false);
58+
when(fileListFilter.accept("file2")).thenReturn(false);
59+
60+
Comparator<String> comparator = mock(Comparator.class);
61+
TestRemoteFileStreamingMessageSource testRemoteFileStreamingMessageSource =
62+
new TestRemoteFileStreamingMessageSource(remoteFileTemplate, comparator);
63+
64+
testRemoteFileStreamingMessageSource.setFilter(fileListFilter);
65+
testRemoteFileStreamingMessageSource.setRemoteDirectory("remoteDirectory");
66+
testRemoteFileStreamingMessageSource.setBeanFactory(mock(BeanFactory.class));
67+
testRemoteFileStreamingMessageSource.start();
68+
69+
assertThat(testRemoteFileStreamingMessageSource.doReceive()).isNull();
70+
}
71+
72+
static class TestRemoteFileStreamingMessageSource extends AbstractRemoteFileStreamingMessageSource<String> {
73+
74+
TestRemoteFileStreamingMessageSource(RemoteFileTemplate<String> template, Comparator<String> comparator) {
75+
super(template, comparator);
76+
}
77+
78+
@Override
79+
protected List<AbstractFileInfo<String>> asFileInfoList(Collection<String> files) {
80+
return files
81+
.stream()
82+
.map(TestFileInfo::new)
83+
.collect(Collectors.toList());
84+
}
85+
86+
@Override
87+
protected boolean isDirectory(String file) {
88+
return false;
89+
}
90+
91+
@Override
92+
public String getComponentType() {
93+
return null;
94+
}
95+
96+
}
97+
98+
static class TestFileInfo extends AbstractFileInfo<String> {
99+
100+
TestFileInfo(String fileName) {
101+
this.fileName = fileName;
102+
}
103+
104+
private final String fileName;
105+
106+
@Override
107+
public boolean isDirectory() {
108+
return false;
109+
}
110+
111+
@Override
112+
public boolean isLink() {
113+
return false;
114+
}
115+
116+
@Override
117+
public long getSize() {
118+
return 0;
119+
}
120+
121+
@Override
122+
public long getModified() {
123+
return 0;
124+
}
125+
126+
@Override
127+
public String getFilename() {
128+
return fileName;
129+
}
130+
131+
@Override
132+
public String getPermissions() {
133+
return null;
134+
}
135+
136+
@Override
137+
public String getFileInfo() {
138+
return null;
139+
}
140+
141+
}
142+
143+
}

0 commit comments

Comments
 (0)