Skip to content

Commit a756e63

Browse files
artembilangaryrussell
authored andcommitted
GH-3043: Add FileHeaders.REMOTE_HOST header (#3044)
* GH-3043: Add FileHeaders.REMOTE_HOST header Fixes #3043 * Populate a `FileHeaders.REMOTE_HOST` from the `AbstractRemoteFileStreamingMessageSource` and "get"-based commands in the `AbstractRemoteFileOutboundGateway` * Extract the value from the a `Session.getHost()` contract * The `AbstractInboundFileSynchronizingMessageSource` cannot be addressed with this because the real message is already based on the locally stored file * Adjust some affected tests according our code style requirements * * Add remote file info support into `AbstractInboundFileSynchronizingMessageSource` * Introduce a `MetadataStore` functionality into the `AbstractInboundFileSynchronizer` to gather a remote file info an save it in the URI style against local file * Retrieve such an info in the `AbstractInboundFileSynchronizingMessageSource` during local file polling * Introduce `protocol()` contract for the `AbstractInboundFileSynchronizer` to build a proper URI in the metadata for external readers to distinguish remote files properly * Document the feature * * Fix some typos in Docs * * Rename property and header constant to the `HOST_PORT` pair * Fix typos in Docs * Add `remote-file-metadata-store` and `metadata-store-prefix` into XSD of (S)FTP Inbound Channel Adapters * Add `remoteFileMetadataStore` and `metadataStorePrefix` options into `RemoteFileInboundChannelAdapterSpec` for Java DSL
1 parent ff15d52 commit a756e63

File tree

33 files changed

+605
-297
lines changed

33 files changed

+605
-297
lines changed

spring-integration-file/src/main/java/org/springframework/integration/file/FileHeaders.java

+5
Original file line numberDiff line numberDiff line change
@@ -52,4 +52,9 @@ public abstract class FileHeaders {
5252
*/
5353
public static final String REMOTE_FILE_INFO = PREFIX + "remoteFileInfo";
5454

55+
/**
56+
* A remote host/port the file has been polled from
57+
*/
58+
public static final String REMOTE_HOST_PORT = PREFIX + "remoteHostPort";
59+
5560
}

spring-integration-file/src/main/java/org/springframework/integration/file/config/AbstractRemoteFileInboundChannelAdapterParser.java

+10-5
Original file line numberDiff line numberDiff line change
@@ -45,14 +45,15 @@ public abstract class AbstractRemoteFileInboundChannelAdapterParser extends Abst
4545

4646
@Override
4747
protected final BeanMetadataElement parseSource(Element element, ParserContext parserContext) {
48-
BeanDefinitionBuilder synchronizerBuilder = BeanDefinitionBuilder.genericBeanDefinition(
49-
this.getInboundFileSynchronizerClass());
48+
BeanDefinitionBuilder synchronizerBuilder =
49+
BeanDefinitionBuilder.genericBeanDefinition(getInboundFileSynchronizerClass());
5050

5151
synchronizerBuilder.addConstructorArgReference(element.getAttribute("session-factory"));
5252

5353
// configure the InboundFileSynchronizer properties
54-
BeanDefinition expressionDef = IntegrationNamespaceUtils.createExpressionDefinitionFromValueOrExpression(
55-
"remote-directory", "remote-directory-expression", parserContext, element, false);
54+
BeanDefinition expressionDef =
55+
IntegrationNamespaceUtils.createExpressionDefinitionFromValueOrExpression(
56+
"remote-directory", "remote-directory-expression", parserContext, element, false);
5657
if (expressionDef != null) {
5758
synchronizerBuilder.addPropertyValue("remoteDirectoryExpression", expressionDef);
5859
}
@@ -62,6 +63,9 @@ protected final BeanMetadataElement parseSource(Element element, ParserContext p
6263
String remoteFileSeparator = element.getAttribute("remote-file-separator");
6364
synchronizerBuilder.addPropertyValue("remoteFileSeparator", remoteFileSeparator);
6465
IntegrationNamespaceUtils.setValueIfAttributeDefined(synchronizerBuilder, element, "temporary-file-suffix");
66+
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(synchronizerBuilder, element,
67+
"remote-file-metadata-store");
68+
IntegrationNamespaceUtils.setValueIfAttributeDefined(synchronizerBuilder, element, "metadata-store-prefix");
6569

6670
FileParserUtils.configureFilter(synchronizerBuilder, element, parserContext,
6771
getSimplePatternFileListFilterClass(), getRegexPatternFileListFilterClass(),
@@ -100,6 +104,7 @@ protected final BeanMetadataElement parseSource(Element element, ParserContext p
100104

101105
protected abstract Class<? extends FileListFilter<?>> getRegexPatternFileListFilterClass();
102106

103-
protected abstract Class<? extends AbstractPersistentAcceptOnceFileListFilter<?>> getPersistentAcceptOnceFileListFilterClass();
107+
protected abstract Class<? extends AbstractPersistentAcceptOnceFileListFilter<?>>
108+
getPersistentAcceptOnceFileListFilterClass();
104109

105110
}

spring-integration-file/src/main/java/org/springframework/integration/file/dsl/RemoteFileInboundChannelAdapterSpec.java

+25-2
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.springframework.integration.file.filters.FileListFilter;
3030
import org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer;
3131
import org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource;
32+
import org.springframework.integration.metadata.MetadataStore;
3233

3334
/**
3435
* A {@link MessageSourceSpec} for an {@link AbstractInboundFileSynchronizingMessageSource}.
@@ -245,15 +246,37 @@ public S maxFetchSize(int maxFetchSize) {
245246
return _this();
246247
}
247248

249+
/**
250+
* Configure a {@link MetadataStore} for remote files metadata.
251+
* @param remoteFileMetadataStore the {@link MetadataStore} to use.
252+
* @return the spec.
253+
* @since 5.2
254+
* @see AbstractInboundFileSynchronizer#setRemoteFileMetadataStore(MetadataStore)
255+
*/
256+
public S remoteFileMetadataStore(MetadataStore remoteFileMetadataStore) {
257+
this.synchronizer.setRemoteFileMetadataStore(remoteFileMetadataStore);
258+
return _this();
259+
}
260+
261+
/**
262+
* Configure a prefix for remote files metadata keys.
263+
* @param metadataStorePrefix the metadata key prefix to use.
264+
* @return the spec.
265+
* @since 5.2
266+
* @see #remoteFileMetadataStore
267+
*/
268+
public S metadataStorePrefix(String metadataStorePrefix) {
269+
this.synchronizer.setMetadataStorePrefix(metadataStorePrefix);
270+
return _this();
271+
}
272+
248273
@Override
249274
public Map<Object, String> getComponentsToRegister() {
250275
Map<Object, String> componentsToRegister = new LinkedHashMap<>();
251276
componentsToRegister.put(this.synchronizer, null);
252-
253277
if (this.expressionFileListFilter != null) {
254278
componentsToRegister.put(this.expressionFileListFilter, null);
255279
}
256-
257280
return componentsToRegister;
258281
}
259282

spring-integration-file/src/main/java/org/springframework/integration/file/filters/AbstractPersistentAcceptOnceFileListFilter.java

+16-20
Original file line numberDiff line numberDiff line change
@@ -38,18 +38,16 @@
3838
*
3939
*/
4040
public abstract class AbstractPersistentAcceptOnceFileListFilter<F> extends AbstractFileListFilter<F>
41-
implements ReversibleFileListFilter<F>, ResettableFileListFilter<F>, Closeable {
41+
implements ReversibleFileListFilter<F>, ResettableFileListFilter<F>, Closeable {
4242

4343
protected final ConcurrentMetadataStore store; // NOSONAR
4444

45-
@Nullable
46-
protected final Flushable flushableStore; // NOSONAR
47-
4845
protected final String prefix; // NOSONAR
4946

50-
protected volatile boolean flushOnUpdate; // NOSONAR
47+
@Nullable
48+
protected final Flushable flushableStore; // NOSONAR
5149

52-
private final Object monitor = new Object();
50+
protected boolean flushOnUpdate; // NOSONAR
5351

5452
public AbstractPersistentAcceptOnceFileListFilter(ConcurrentMetadataStore store, String prefix) {
5553
Assert.notNull(store, "'store' cannot be null");
@@ -76,20 +74,18 @@ public void setFlushOnUpdate(boolean flushOnUpdate) {
7674
@Override
7775
public boolean accept(F file) {
7876
String key = buildKey(file);
79-
synchronized (this.monitor) {
80-
String newValue = value(file);
81-
String oldValue = this.store.putIfAbsent(key, newValue);
82-
if (oldValue == null) { // not in store
83-
flushIfNeeded();
84-
return fileStillExists(file);
85-
}
86-
// same value in store
87-
if (!isEqual(file, oldValue) && this.store.replace(key, oldValue, newValue)) {
88-
flushIfNeeded();
89-
return fileStillExists(file);
90-
}
91-
return false;
77+
String newValue = value(file);
78+
String oldValue = this.store.putIfAbsent(key, newValue);
79+
if (oldValue == null) { // not in store
80+
flushIfNeeded();
81+
return fileStillExists(file);
82+
}
83+
// same value in store
84+
if (!isEqual(file, oldValue) && this.store.replace(key, oldValue, newValue)) {
85+
flushIfNeeded();
86+
return fileStillExists(file);
9287
}
88+
return false;
9389
}
9490

9591
/**
@@ -151,7 +147,7 @@ private String value(F file) {
151147
* @return true if equal.
152148
*/
153149
protected boolean isEqual(F file, String value) {
154-
return Long.valueOf(value) == modified(file);
150+
return Long.parseLong(value) == modified(file);
155151
}
156152

157153
/**

spring-integration-file/src/main/java/org/springframework/integration/file/remote/AbstractRemoteFileStreamingMessageSource.java

+1
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,7 @@ protected Object doReceive() {
205205
.setHeader(IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE, session)
206206
.setHeader(FileHeaders.REMOTE_DIRECTORY, file.getRemoteDirectory())
207207
.setHeader(FileHeaders.REMOTE_FILE, file.getFilename())
208+
.setHeader(FileHeaders.REMOTE_HOST_PORT, session.getHostPort())
208209
.setHeader(FileHeaders.REMOTE_FILE_INFO,
209210
this.fileInfoJson ? file.toJson() : file);
210211
}

spring-integration-file/src/main/java/org/springframework/integration/file/remote/gateway/AbstractRemoteFileOutboundGateway.java

+61-43
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,6 @@
7272
*/
7373
public abstract class AbstractRemoteFileOutboundGateway<F> extends AbstractReplyProducingMessageHandler {
7474

75-
7675
private final RemoteFileTemplate<F> remoteFileTemplate;
7776

7877
private final Command command;
@@ -118,7 +117,7 @@ public abstract class AbstractRemoteFileOutboundGateway<F> extends AbstractReply
118117
public AbstractRemoteFileOutboundGateway(SessionFactory<F> sessionFactory,
119118
MessageSessionCallback<F, ?> messageSessionCallback) {
120119

121-
this(new RemoteFileTemplate<F>(sessionFactory), messageSessionCallback);
120+
this(new RemoteFileTemplate<>(sessionFactory), messageSessionCallback);
122121
}
123122

124123
/**
@@ -161,7 +160,7 @@ public AbstractRemoteFileOutboundGateway(SessionFactory<F> sessionFactory, Strin
161160
public AbstractRemoteFileOutboundGateway(SessionFactory<F> sessionFactory, Command command,
162161
@Nullable String expression) {
163162

164-
this(new RemoteFileTemplate<F>(sessionFactory), command, expression);
163+
this(new RemoteFileTemplate<>(sessionFactory), command, expression);
165164
}
166165

167166
/**
@@ -317,7 +316,7 @@ public void setMputFilter(FileListFilter<File> filter) {
317316
* @since 4.3
318317
*/
319318
public void setRenameExpression(Expression renameExpression) {
320-
this.renameProcessor = new ExpressionEvaluatingMessageProcessor<String>(renameExpression);
319+
this.renameProcessor = new ExpressionEvaluatingMessageProcessor<>(renameExpression);
321320
}
322321

323322
/**
@@ -490,10 +489,14 @@ private Object doLs(Message<?> requestMessage) {
490489
dir += this.remoteFileTemplate.getRemoteFileSeparator();
491490
}
492491
final String fullDir = dir;
493-
List<?> payload = this.remoteFileTemplate.execute(session -> ls(requestMessage, session, fullDir));
494-
return getMessageBuilderFactory()
495-
.withPayload(payload)
496-
.setHeader(FileHeaders.REMOTE_DIRECTORY, dir);
492+
return this.remoteFileTemplate.execute(session -> {
493+
List<?> payload = ls(requestMessage, session, fullDir);
494+
return getMessageBuilderFactory()
495+
.withPayload(payload)
496+
.setHeader(FileHeaders.REMOTE_DIRECTORY, fullDir)
497+
.setHeader(FileHeaders.REMOTE_HOST_PORT, session.getHostPort());
498+
});
499+
497500
}
498501

499502
private Object doNlst(Message<?> requestMessage) {
@@ -504,11 +507,13 @@ private Object doNlst(Message<?> requestMessage) {
504507
dir += this.remoteFileTemplate.getRemoteFileSeparator();
505508
}
506509
final String fullDir = dir;
507-
List<?> payload = this.remoteFileTemplate.execute(session -> nlst(requestMessage, session, fullDir));
508-
509-
return getMessageBuilderFactory()
510-
.withPayload(payload)
511-
.setHeader(FileHeaders.REMOTE_DIRECTORY, dir);
510+
return this.remoteFileTemplate.execute(session -> {
511+
List<?> payload = nlst(requestMessage, session, fullDir);
512+
return getMessageBuilderFactory()
513+
.withPayload(payload)
514+
.setHeader(FileHeaders.REMOTE_DIRECTORY, fullDir)
515+
.setHeader(FileHeaders.REMOTE_HOST_PORT, session.getHostPort());
516+
});
512517
}
513518

514519
/**
@@ -541,6 +546,12 @@ private Object doGet(final Message<?> requestMessage) {
541546
session = this.remoteFileTemplate.getSessionFactory().getSession();
542547
try {
543548
payload = session.readRaw(remoteFilePath);
549+
return getMessageBuilderFactory()
550+
.withPayload(payload)
551+
.setHeader(FileHeaders.REMOTE_DIRECTORY, remoteDir)
552+
.setHeader(FileHeaders.REMOTE_FILE, remoteFilename)
553+
.setHeader(FileHeaders.REMOTE_HOST_PORT, session.getHostPort())
554+
.setHeader(IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE, session);
544555
}
545556
catch (IOException e) {
546557
throw new MessageHandlingException(requestMessage,
@@ -549,39 +560,45 @@ private Object doGet(final Message<?> requestMessage) {
549560
}
550561
}
551562
else {
552-
payload = this.remoteFileTemplate.execute(session1 ->
553-
get(requestMessage, session1, remoteDir, remoteFilePath, remoteFilename, null));
563+
return this.remoteFileTemplate.execute(session1 -> {
564+
Object getPayload = get(requestMessage, session1, remoteDir, remoteFilePath, remoteFilename, null);
565+
return getMessageBuilderFactory()
566+
.withPayload(getPayload)
567+
.setHeader(FileHeaders.REMOTE_DIRECTORY, remoteDir)
568+
.setHeader(FileHeaders.REMOTE_FILE, remoteFilename)
569+
.setHeader(FileHeaders.REMOTE_HOST_PORT, session1.getHostPort());
570+
});
554571
}
555-
return getMessageBuilderFactory()
556-
.withPayload(payload)
557-
.setHeader(FileHeaders.REMOTE_DIRECTORY, remoteDir)
558-
.setHeader(FileHeaders.REMOTE_FILE, remoteFilename)
559-
.setHeader(IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE, session);
560572
}
561573

562574
private Object doMget(final Message<?> requestMessage) {
563575
String remoteFilePath = obtainRemoteFilePath(requestMessage);
564-
final String remoteFilename = getRemoteFilename(remoteFilePath);
565-
final String remoteDir = getRemoteDirectory(remoteFilePath, remoteFilename);
566-
List<File> payload = this.remoteFileTemplate.execute(session ->
567-
mGet(requestMessage, session, remoteDir, remoteFilename));
568-
return getMessageBuilderFactory()
569-
.withPayload(payload)
570-
.setHeader(FileHeaders.REMOTE_DIRECTORY, remoteDir)
571-
.setHeader(FileHeaders.REMOTE_FILE, remoteFilename);
576+
String remoteFilename = getRemoteFilename(remoteFilePath);
577+
String remoteDir = getRemoteDirectory(remoteFilePath, remoteFilename);
578+
return this.remoteFileTemplate.execute(session -> {
579+
List<File> payload = mGet(requestMessage, session, remoteDir, remoteFilename);
580+
return getMessageBuilderFactory()
581+
.withPayload(payload)
582+
.setHeader(FileHeaders.REMOTE_DIRECTORY, remoteDir)
583+
.setHeader(FileHeaders.REMOTE_FILE, remoteFilename)
584+
.setHeader(FileHeaders.REMOTE_HOST_PORT, session.getHostPort());
585+
}
586+
);
572587
}
573588

574589
private Object doRm(Message<?> requestMessage) {
575590
String remoteFilePath = obtainRemoteFilePath(requestMessage);
576591
String remoteFilename = getRemoteFilename(remoteFilePath);
577592
String remoteDir = getRemoteDirectory(remoteFilePath, remoteFilename);
578593

579-
boolean payload = this.remoteFileTemplate.execute(session -> rm(requestMessage, session, remoteFilePath));
580-
581-
return getMessageBuilderFactory()
582-
.withPayload(payload)
583-
.setHeader(FileHeaders.REMOTE_DIRECTORY, remoteDir)
584-
.setHeader(FileHeaders.REMOTE_FILE, remoteFilename);
594+
return this.remoteFileTemplate.execute(session -> {
595+
boolean payload = rm(requestMessage, session, remoteFilePath);
596+
return getMessageBuilderFactory()
597+
.withPayload(payload)
598+
.setHeader(FileHeaders.REMOTE_DIRECTORY, remoteDir)
599+
.setHeader(FileHeaders.REMOTE_FILE, remoteFilename)
600+
.setHeader(FileHeaders.REMOTE_HOST_PORT, session.getHostPort());
601+
});
585602
}
586603

587604
/**
@@ -606,15 +623,16 @@ private Object doMv(Message<?> requestMessage) {
606623
String remoteFileNewPath = this.renameProcessor.processMessage(requestMessage);
607624
Assert.hasLength(remoteFileNewPath, "New filename cannot be empty");
608625

609-
Boolean result =
610-
this.remoteFileTemplate.execute(session ->
611-
mv(requestMessage, session, remoteFilePath, remoteFileNewPath));
612-
613-
return getMessageBuilderFactory()
614-
.withPayload(result)
615-
.setHeader(FileHeaders.REMOTE_DIRECTORY, remoteDir)
616-
.setHeader(FileHeaders.REMOTE_FILE, remoteFilename)
617-
.setHeader(FileHeaders.RENAME_TO, remoteFileNewPath);
626+
return this.remoteFileTemplate.execute(session -> {
627+
Boolean result = mv(requestMessage, session, remoteFilePath, remoteFileNewPath);
628+
return getMessageBuilderFactory()
629+
.withPayload(result)
630+
.setHeader(FileHeaders.REMOTE_DIRECTORY, remoteDir)
631+
.setHeader(FileHeaders.REMOTE_FILE, remoteFilename)
632+
.setHeader(FileHeaders.RENAME_TO, remoteFileNewPath)
633+
.setHeader(FileHeaders.REMOTE_HOST_PORT, session.getHostPort());
634+
}
635+
);
618636
}
619637

620638
private String obtainRemoteFilePath(Message<?> requestMessage) {

0 commit comments

Comments
 (0)