Skip to content

Commit a167290

Browse files
garyrussellartembilan
authored andcommitted
GH-2776: Fix Streaming Remote File MessageSource
Fixes #2776 Also see #2777 - reset the filter for the current file if the fetch fails - implement `Lifecycle` and clear the `toBeReceived` queue and corresponding filter entries * Polishing - PR Comments **cherry-pick to all supported** * Polishing
1 parent 23843c7 commit a167290

File tree

5 files changed

+150
-35
lines changed

5 files changed

+150
-35
lines changed

spring-integration-file/src/main/java/org/springframework/integration/file/remote/AbstractRemoteFileStreamingMessageSource.java

+61-17
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2018 the original author or authors.
2+
* Copyright 2016-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,27 +18,29 @@
1818

1919
import java.io.IOException;
2020
import java.io.InputStream;
21+
import java.io.UncheckedIOException;
2122
import java.util.ArrayList;
2223
import java.util.Arrays;
2324
import java.util.Collection;
2425
import java.util.Comparator;
2526
import java.util.List;
2627
import java.util.concurrent.BlockingQueue;
2728
import java.util.concurrent.LinkedBlockingQueue;
29+
import java.util.concurrent.atomic.AtomicBoolean;
2830

29-
import org.springframework.beans.factory.BeanFactoryAware;
3031
import org.springframework.beans.factory.InitializingBean;
32+
import org.springframework.context.Lifecycle;
3133
import org.springframework.expression.Expression;
3234
import org.springframework.expression.common.LiteralExpression;
3335
import org.springframework.integration.IntegrationMessageHeaderAccessor;
3436
import org.springframework.integration.endpoint.AbstractFetchLimitingMessageSource;
3537
import org.springframework.integration.file.FileHeaders;
3638
import org.springframework.integration.file.filters.FileListFilter;
39+
import org.springframework.integration.file.filters.ResettableFileListFilter;
3740
import org.springframework.integration.file.filters.ReversibleFileListFilter;
3841
import org.springframework.integration.file.remote.session.Session;
3942
import org.springframework.integration.file.support.FileUtils;
4043
import org.springframework.lang.Nullable;
41-
import org.springframework.messaging.MessagingException;
4244
import org.springframework.util.Assert;
4345
import org.springframework.util.ObjectUtils;
4446

@@ -53,14 +55,16 @@
5355
*
5456
*/
5557
public abstract class AbstractRemoteFileStreamingMessageSource<F>
56-
extends AbstractFetchLimitingMessageSource<InputStream> implements BeanFactoryAware, InitializingBean {
58+
extends AbstractFetchLimitingMessageSource<InputStream> implements Lifecycle {
5759

5860
private final RemoteFileTemplate<F> remoteFileTemplate;
5961

6062
private final BlockingQueue<AbstractFileInfo<F>> toBeReceived = new LinkedBlockingQueue<>();
6163

6264
private final Comparator<F> comparator;
6365

66+
private final AtomicBoolean running = new AtomicBoolean();
67+
6468
private boolean fileInfoJson = true;
6569

6670
/**
@@ -117,8 +121,8 @@ public void setFilter(FileListFilter<F> filter) {
117121
doSetFilter(filter);
118122
}
119123

120-
protected final void doSetFilter(FileListFilter<F> filter) {
121-
this.filter = filter;
124+
protected final void doSetFilter(FileListFilter<F> filterToSet) {
125+
this.filter = filterToSet;
122126
}
123127

124128
/**
@@ -149,23 +153,53 @@ public final void onInit() {
149153
protected void doInit() {
150154
}
151155

156+
157+
@Override
158+
public void start() {
159+
this.running.set(true);
160+
}
161+
162+
@Override
163+
public void stop() {
164+
if (this.running.compareAndSet(true, false)) {
165+
// remove unprocessed files from the queue (and filter)
166+
AbstractFileInfo<F> file = this.toBeReceived.poll();
167+
while (file != null) {
168+
resetFilterIfNecessary(file);
169+
file = this.toBeReceived.poll();
170+
}
171+
}
172+
}
173+
174+
@Override
175+
public boolean isRunning() {
176+
return this.running.get();
177+
}
178+
152179
@Override
153180
protected Object doReceive() {
181+
Assert.state(this.running.get(), () -> getComponentName() + " is not running");
154182
AbstractFileInfo<F> file = poll();
155183
if (file != null) {
156-
String remotePath = remotePath(file);
157-
Session<?> session = this.remoteFileTemplate.getSession();
158184
try {
159-
return getMessageBuilderFactory()
160-
.withPayload(session.readRaw(remotePath))
161-
.setHeader(IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE, session)
162-
.setHeader(FileHeaders.REMOTE_DIRECTORY, file.getRemoteDirectory())
163-
.setHeader(FileHeaders.REMOTE_FILE, file.getFilename())
164-
.setHeader(FileHeaders.REMOTE_FILE_INFO,
165-
this.fileInfoJson ? file.toJson() : file);
185+
String remotePath = remotePath(file);
186+
Session<?> session = this.remoteFileTemplate.getSession();
187+
try {
188+
return getMessageBuilderFactory()
189+
.withPayload(session.readRaw(remotePath))
190+
.setHeader(IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE, session)
191+
.setHeader(FileHeaders.REMOTE_DIRECTORY, file.getRemoteDirectory())
192+
.setHeader(FileHeaders.REMOTE_FILE, file.getFilename())
193+
.setHeader(FileHeaders.REMOTE_FILE_INFO,
194+
this.fileInfoJson ? file.toJson() : file);
195+
}
196+
catch (IOException e) {
197+
throw new UncheckedIOException("IOException when retrieving " + remotePath, e);
198+
}
166199
}
167-
catch (IOException e) {
168-
throw new MessagingException("IOException when retrieving " + remotePath, e);
200+
catch (RuntimeException e) {
201+
resetFilterIfNecessary(file);
202+
throw e;
169203
}
170204
}
171205
return null;
@@ -176,6 +210,16 @@ protected Object doReceive(int maxFetchSize) {
176210
return doReceive();
177211
}
178212

213+
private void resetFilterIfNecessary(AbstractFileInfo<F> file) {
214+
if (this.filter instanceof ResettableFileListFilter) {
215+
if (this.logger.isInfoEnabled()) {
216+
this.logger.info("Removing the remote file '" + file +
217+
"' from the filter for a subsequent transfer attempt");
218+
}
219+
((ResettableFileListFilter<F>) this.filter).remove(file.getFileInfo());
220+
}
221+
}
222+
179223
protected AbstractFileInfo<F> poll() {
180224
if (this.toBeReceived.size() == 0) {
181225
listFiles();

spring-integration-file/src/test/java/org/springframework/integration/file/remote/StreamingInboundTests.java

+72-15
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,13 @@
2828
import java.io.ByteArrayInputStream;
2929
import java.io.IOException;
3030
import java.io.InputStream;
31+
import java.io.UncheckedIOException;
3132
import java.util.ArrayList;
3233
import java.util.Collection;
3334
import java.util.Comparator;
3435
import java.util.List;
36+
import java.util.concurrent.BlockingQueue;
37+
import java.util.concurrent.ConcurrentHashMap;
3538

3639
import org.junit.Test;
3740

@@ -40,13 +43,16 @@
4043
import org.springframework.integration.StaticMessageHeaderAccessor;
4144
import org.springframework.integration.channel.QueueChannel;
4245
import org.springframework.integration.file.FileHeaders;
46+
import org.springframework.integration.file.filters.AbstractPersistentAcceptOnceFileListFilter;
4347
import org.springframework.integration.file.filters.AcceptOnceFileListFilter;
4448
import org.springframework.integration.file.remote.session.Session;
4549
import org.springframework.integration.file.remote.session.SessionFactory;
4650
import org.springframework.integration.file.splitter.FileSplitter;
51+
import org.springframework.integration.metadata.ConcurrentMetadataStore;
52+
import org.springframework.integration.metadata.SimpleMetadataStore;
53+
import org.springframework.integration.test.util.TestUtils;
4754
import org.springframework.integration.transformer.StreamTransformer;
4855
import org.springframework.messaging.Message;
49-
import org.springframework.messaging.MessagingException;
5056

5157
/**
5258
* @author Gary Russell
@@ -67,6 +73,7 @@ public void testAllData() throws Exception {
6773
streamer.setBeanFactory(mock(BeanFactory.class));
6874
streamer.setRemoteDirectory("/foo");
6975
streamer.afterPropertiesSet();
76+
streamer.start();
7077
Message<byte[]> received = (Message<byte[]>) this.transformer.transform(streamer.receive());
7178
assertThat(received.getPayload()).isEqualTo("foo\nbar".getBytes());
7279
assertThat(received.getHeaders().get(FileHeaders.REMOTE_DIRECTORY)).isEqualTo("/foo");
@@ -112,6 +119,7 @@ public void testAllDataMaxFetch() throws Exception {
112119
streamer.setMaxFetchSize(1);
113120
streamer.setFilter(new AcceptOnceFileListFilter<>());
114121
streamer.afterPropertiesSet();
122+
streamer.start();
115123
Message<byte[]> received = (Message<byte[]>) this.transformer.transform(streamer.receive());
116124
assertThat(received.getPayload()).isEqualTo("foo\nbar".getBytes());
117125
assertThat(received.getHeaders().get(FileHeaders.REMOTE_DIRECTORY)).isEqualTo("/foo");
@@ -138,17 +146,18 @@ public void testExceptionOnFetch() {
138146
streamer.setBeanFactory(mock(BeanFactory.class));
139147
streamer.setRemoteDirectory("/bad");
140148
streamer.afterPropertiesSet();
141-
assertThatExceptionOfType(MessagingException.class)
149+
streamer.start();
150+
assertThatExceptionOfType(UncheckedIOException.class)
142151
.isThrownBy(streamer::receive);
143152
}
144153

145-
@SuppressWarnings("unchecked")
146154
@Test
147155
public void testLineByLine() throws Exception {
148156
Streamer streamer = new Streamer(new StringRemoteFileTemplate(new StringSessionFactory()), null);
149157
streamer.setBeanFactory(mock(BeanFactory.class));
150158
streamer.setRemoteDirectory("/foo");
151159
streamer.afterPropertiesSet();
160+
streamer.start();
152161
QueueChannel out = new QueueChannel();
153162
FileSplitter splitter = new FileSplitter();
154163
splitter.setBeanFactory(mock(BeanFactory.class));
@@ -160,7 +169,7 @@ public void testLineByLine() throws Exception {
160169
assertThat(received.getPayload()).isEqualTo("foo");
161170
assertThat(received.getHeaders().get(FileHeaders.REMOTE_DIRECTORY)).isEqualTo("/foo");
162171
assertThat(received.getHeaders().get(FileHeaders.REMOTE_FILE)).isEqualTo("foo");
163-
received = (Message<byte[]>) out.receive(0);
172+
received = out.receive(0);
164173
assertThat(received.getPayload()).isEqualTo("bar");
165174
assertThat(received.getHeaders().get(FileHeaders.REMOTE_DIRECTORY)).isEqualTo("/foo");
166175
assertThat(received.getHeaders().get(FileHeaders.REMOTE_FILE)).isEqualTo("foo");
@@ -185,10 +194,43 @@ public void testLineByLine() throws Exception {
185194
verify(new IntegrationMessageHeaderAccessor(receivedStream).getCloseableResource(), times(5)).close();
186195
}
187196

197+
@SuppressWarnings("unchecked")
198+
@Test
199+
public void testStopAdapterRemovesUnprocessed() {
200+
Streamer streamer = new Streamer(new StringRemoteFileTemplate(new StringSessionFactory()), null);
201+
streamer.setBeanFactory(mock(BeanFactory.class));
202+
streamer.setRemoteDirectory("/foo");
203+
streamer.afterPropertiesSet();
204+
streamer.start();
205+
assertThat(streamer.receive()).isNotNull();
206+
assertThat(TestUtils.getPropertyValue(streamer, "toBeReceived", BlockingQueue.class)).hasSize(1);
207+
assertThat(streamer.metadataMap).hasSize(2);
208+
streamer.stop();
209+
assertThat(TestUtils.getPropertyValue(streamer, "toBeReceived", BlockingQueue.class)).hasSize(0);
210+
assertThat(streamer.metadataMap).hasSize(1);
211+
}
212+
213+
@SuppressWarnings("unchecked")
214+
@Test
215+
public void testFilterReversedOnBadFetch() {
216+
Streamer streamer = new Streamer(new StringRemoteFileTemplate(new StringSessionFactory()), null);
217+
streamer.setBeanFactory(mock(BeanFactory.class));
218+
streamer.setRemoteDirectory("/bad");
219+
streamer.afterPropertiesSet();
220+
streamer.start();
221+
assertThatExceptionOfType(UncheckedIOException.class)
222+
.isThrownBy(streamer::receive);
223+
assertThat(TestUtils.getPropertyValue(streamer, "toBeReceived", BlockingQueue.class)).hasSize(1);
224+
assertThat(streamer.metadataMap).hasSize(1);
225+
}
226+
188227
public static class Streamer extends AbstractRemoteFileStreamingMessageSource<String> {
189228

229+
ConcurrentHashMap<String, String> metadataMap = new ConcurrentHashMap<>();
230+
190231
protected Streamer(RemoteFileTemplate<String> template, Comparator<String> comparator) {
191232
super(template, comparator);
233+
doSetFilter(new StringPersistentFileListFilter(new SimpleMetadataStore(this.metadataMap), "streamer"));
192234
}
193235

194236
@Override
@@ -252,11 +294,7 @@ public String getPermissions() {
252294

253295
@Override
254296
public String getFileInfo() {
255-
return asString();
256-
}
257-
258-
private String asString() {
259-
return "StringFileInfo [name=" + this.name + "]";
297+
return name;
260298
}
261299

262300
}
@@ -271,13 +309,13 @@ public StringRemoteFileTemplate(SessionFactory<String> sessionFactory) {
271309

272310
public static class StringSessionFactory implements SessionFactory<String> {
273311

274-
private Session<String> session;
312+
private Session<String> singletonSession;
275313

276314
@SuppressWarnings("unchecked")
277315
@Override
278316
public Session<String> getSession() {
279-
if (this.session != null) {
280-
return this.session;
317+
if (this.singletonSession != null) {
318+
return this.singletonSession;
281319
}
282320
try {
283321
Session<String> session = mock(Session.class);
@@ -293,12 +331,13 @@ public Session<String> getSession() {
293331
willReturn(foo2).given(session).readRaw("/bar/foo");
294332
willReturn(bar2).given(session).readRaw("/bar/bar");
295333

296-
willReturn(new String[] { "/bad/file" }).given(session).list("/bad");
297-
willThrow(new IOException("No file")).given(session).readRaw("/bad/file");
334+
willReturn(new String[] { "/bad/file1", "/bad/file2" }).given(session).list("/bad");
335+
willThrow(new IOException("No file")).given(session).readRaw("/bad/file1");
336+
willThrow(new IOException("No file")).given(session).readRaw("/bad/file2");
298337

299338
given(session.finalizeRaw()).willReturn(true);
300339

301-
this.session = session;
340+
this.singletonSession = session;
302341

303342
return session;
304343
}
@@ -309,4 +348,22 @@ public Session<String> getSession() {
309348

310349
}
311350

351+
public static class StringPersistentFileListFilter extends AbstractPersistentAcceptOnceFileListFilter<String> {
352+
353+
public StringPersistentFileListFilter(ConcurrentMetadataStore store, String prefix) {
354+
super(store, prefix);
355+
}
356+
357+
@Override
358+
protected long modified(String file) {
359+
return 0;
360+
}
361+
362+
@Override
363+
protected String fileName(String file) {
364+
return file;
365+
}
366+
367+
}
368+
312369
}

0 commit comments

Comments
 (0)