Skip to content

Commit 34febd4

Browse files
garyrussellartembilan
authored andcommitted
GH-2776: Fix Streaming Remote File MessageSource
Fixes #2776 Also see #2777 - reset the filter for the current file if the fetch fails - implement `Lifecycle` and clear the `toBeReceived` queue and corresponding filter entries * Polishing - PR Comments **cherry-pick to all supported** * Polishing # Conflicts: # spring-integration-file/src/test/java/org/springframework/integration/file/remote/StreamingInboundTests.java # spring-integration-ftp/src/test/java/org/springframework/integration/ftp/inbound/FtpStreamingMessageSourceTests.java # spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/StoredProcJavaConfigTests.java # Conflicts: # spring-integration-file/src/main/java/org/springframework/integration/file/remote/AbstractRemoteFileStreamingMessageSource.java # spring-integration-file/src/test/java/org/springframework/integration/file/remote/StreamingInboundTests.java
1 parent 8f1a1c1 commit 34febd4

File tree

4 files changed

+202
-94
lines changed

4 files changed

+202
-94
lines changed

spring-integration-file/src/main/java/org/springframework/integration/file/remote/AbstractRemoteFileStreamingMessageSource.java

+61-17
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2018 the original author or authors.
2+
* Copyright 2016-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,6 +18,7 @@
1818

1919
import java.io.IOException;
2020
import java.io.InputStream;
21+
import java.io.UncheckedIOException;
2122
import java.util.ArrayList;
2223
import java.util.Arrays;
2324
import java.util.Collection;
@@ -26,19 +27,20 @@
2627
import java.util.List;
2728
import java.util.concurrent.BlockingQueue;
2829
import java.util.concurrent.LinkedBlockingQueue;
30+
import java.util.concurrent.atomic.AtomicBoolean;
2931

30-
import org.springframework.beans.factory.BeanFactoryAware;
3132
import org.springframework.beans.factory.InitializingBean;
33+
import org.springframework.context.Lifecycle;
3234
import org.springframework.expression.Expression;
3335
import org.springframework.expression.common.LiteralExpression;
3436
import org.springframework.integration.IntegrationMessageHeaderAccessor;
3537
import org.springframework.integration.endpoint.AbstractFetchLimitingMessageSource;
3638
import org.springframework.integration.file.FileHeaders;
3739
import org.springframework.integration.file.filters.FileListFilter;
40+
import org.springframework.integration.file.filters.ResettableFileListFilter;
3841
import org.springframework.integration.file.filters.ReversibleFileListFilter;
3942
import org.springframework.integration.file.remote.session.Session;
4043
import org.springframework.integration.file.support.FileUtils;
41-
import org.springframework.messaging.MessagingException;
4244
import org.springframework.util.Assert;
4345
import org.springframework.util.ObjectUtils;
4446

@@ -53,14 +55,16 @@
5355
*
5456
*/
5557
public abstract class AbstractRemoteFileStreamingMessageSource<F>
56-
extends AbstractFetchLimitingMessageSource<InputStream> implements BeanFactoryAware, InitializingBean {
58+
extends AbstractFetchLimitingMessageSource<InputStream> implements Lifecycle {
5759

5860
private final RemoteFileTemplate<F> remoteFileTemplate;
5961

6062
private final BlockingQueue<AbstractFileInfo<F>> toBeReceived = new LinkedBlockingQueue<AbstractFileInfo<F>>();
6163

6264
private final Comparator<AbstractFileInfo<F>> comparator;
6365

66+
private final AtomicBoolean running = new AtomicBoolean();
67+
6468
private boolean fileInfoJson = true;
6569

6670
/**
@@ -117,8 +121,8 @@ public void setFilter(FileListFilter<F> filter) {
117121
doSetFilter(filter);
118122
}
119123

120-
protected final void doSetFilter(FileListFilter<F> filter) {
121-
this.filter = filter;
124+
protected final void doSetFilter(FileListFilter<F> filterToSet) {
125+
this.filter = filterToSet;
122126
}
123127

124128
/**
@@ -149,23 +153,53 @@ public final void afterPropertiesSet() {
149153
protected void doInit() {
150154
}
151155

156+
157+
@Override
158+
public void start() {
159+
this.running.set(true);
160+
}
161+
162+
@Override
163+
public void stop() {
164+
if (this.running.compareAndSet(true, false)) {
165+
// remove unprocessed files from the queue (and filter)
166+
AbstractFileInfo<F> file = this.toBeReceived.poll();
167+
while (file != null) {
168+
resetFilterIfNecessary(file);
169+
file = this.toBeReceived.poll();
170+
}
171+
}
172+
}
173+
174+
@Override
175+
public boolean isRunning() {
176+
return this.running.get();
177+
}
178+
152179
@Override
153180
protected Object doReceive() {
181+
Assert.state(this.running.get(), () -> getComponentName() + " is not running");
154182
AbstractFileInfo<F> file = poll();
155183
if (file != null) {
156-
String remotePath = remotePath(file);
157-
Session<?> session = this.remoteFileTemplate.getSession();
158184
try {
159-
return getMessageBuilderFactory()
160-
.withPayload(session.readRaw(remotePath))
161-
.setHeader(IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE, session)
162-
.setHeader(FileHeaders.REMOTE_DIRECTORY, file.getRemoteDirectory())
163-
.setHeader(FileHeaders.REMOTE_FILE, file.getFilename())
164-
.setHeader(FileHeaders.REMOTE_FILE_INFO,
165-
this.fileInfoJson ? file.toJson() : file);
185+
String remotePath = remotePath(file);
186+
Session<?> session = this.remoteFileTemplate.getSession();
187+
try {
188+
return getMessageBuilderFactory()
189+
.withPayload(session.readRaw(remotePath))
190+
.setHeader(IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE, session)
191+
.setHeader(FileHeaders.REMOTE_DIRECTORY, file.getRemoteDirectory())
192+
.setHeader(FileHeaders.REMOTE_FILE, file.getFilename())
193+
.setHeader(FileHeaders.REMOTE_FILE_INFO,
194+
this.fileInfoJson ? file.toJson() : file);
195+
}
196+
catch (IOException e) {
197+
throw new UncheckedIOException("IOException when retrieving " + remotePath, e);
198+
}
166199
}
167-
catch (IOException e) {
168-
throw new MessagingException("IOException when retrieving " + remotePath, e);
200+
catch (RuntimeException e) {
201+
resetFilterIfNecessary(file);
202+
throw e;
169203
}
170204
}
171205
return null;
@@ -176,6 +210,16 @@ protected Object doReceive(int maxFetchSize) {
176210
return doReceive();
177211
}
178212

213+
private void resetFilterIfNecessary(AbstractFileInfo<F> file) {
214+
if (this.filter instanceof ResettableFileListFilter) {
215+
if (this.logger.isInfoEnabled()) {
216+
this.logger.info("Removing the remote file '" + file +
217+
"' from the filter for a subsequent transfer attempt");
218+
}
219+
((ResettableFileListFilter<F>) this.filter).remove(file.getFileInfo());
220+
}
221+
}
222+
179223
protected AbstractFileInfo<F> poll() {
180224
if (this.toBeReceived.size() == 0) {
181225
listFiles();

0 commit comments

Comments
 (0)