From 39dea2c8efb4701cd5a48c2d7538d04f93d09570 Mon Sep 17 00:00:00 2001 From: David Turanski Date: Mon, 12 Aug 2019 13:00:47 -0400 Subject: [PATCH 1/2] Implement enhancements to RotatingServerAdvice --- .../aop/AbstractStandardRotationPolicy.java | 131 +++++++++++++++ .../file/remote/aop/KeyDirectory.java | 51 ++++++ .../file/remote/aop/RotatingServerAdvice.java | 157 +----------------- .../file/remote/aop/RotationPolicy.java | 54 ++++++ .../ftp/inbound/RotatingServersTests.java | 2 +- 5 files changed, 244 insertions(+), 151 deletions(-) create mode 100644 spring-integration-file/src/main/java/org/springframework/integration/file/remote/aop/AbstractStandardRotationPolicy.java create mode 100644 spring-integration-file/src/main/java/org/springframework/integration/file/remote/aop/KeyDirectory.java create mode 100644 spring-integration-file/src/main/java/org/springframework/integration/file/remote/aop/RotationPolicy.java diff --git a/spring-integration-file/src/main/java/org/springframework/integration/file/remote/aop/AbstractStandardRotationPolicy.java b/spring-integration-file/src/main/java/org/springframework/integration/file/remote/aop/AbstractStandardRotationPolicy.java new file mode 100644 index 00000000000..704b83bd156 --- /dev/null +++ b/spring-integration-file/src/main/java/org/springframework/integration/file/remote/aop/AbstractStandardRotationPolicy.java @@ -0,0 +1,131 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.integration.file.remote.aop; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.springframework.integration.core.MessageSource; +import org.springframework.integration.file.remote.session.DelegatingSessionFactory; +import org.springframework.util.Assert; + +/** + * Standard rotation policy; iterates over key/directory pairs; when the end + * is reached, starts again at the beginning. If the fair option is true + * the rotation occurs on every poll, regardless of result. Otherwise rotation + * occurs when the current pair returns no message. + * + * Subclasses implement {@code onRotation(MessageSource source)} to configure the {@link MessageSource} on each rotation. + * + * @author Gary Russell + * @author Michael Forstner + * @author Artem Bilan + * @author David Turanski + * + * @since 5.2.0 + */ +public abstract class AbstractStandardRotationPolicy implements RotationPolicy { + protected final Log logger = LogFactory.getLog(getClass()); // NOSONAR final + + private final DelegatingSessionFactory factory; // NOSONAR final + + private final List keyDirectories = new ArrayList<>(); + + private final boolean fair; + + private volatile Iterator iterator; + + private volatile KeyDirectory current; + + private volatile boolean initialized; + + protected AbstractStandardRotationPolicy(DelegatingSessionFactory factory, List keyDirectories, + boolean fair) { + + Assert.notNull(factory, "factory cannot be null"); + Assert.notNull(keyDirectories, "keyDirectories cannot be null"); + Assert.isTrue(keyDirectories.size() > 0, "At least one KeyDirectory is required"); + this.factory = factory; + this.keyDirectories.addAll(keyDirectories); + this.fair = fair; + this.iterator = this.keyDirectories.iterator(); + } + + @Override + public void beforeReceive(MessageSource source) { + if (this.fair || !this.initialized) { + configureSource(source); + this.initialized = true; + } + if (this.logger.isTraceEnabled()) { + this.logger.trace("Next poll is for " + this.current); + } + this.factory.setThreadKey(this.current.getKey()); + } + + @Override + public void afterReceive(boolean messageReceived, MessageSource source) { + if (this.logger.isTraceEnabled()) { + this.logger.trace("Poll produced " + + (messageReceived ? "a" : "no") + + " message"); + } + this.factory.clearThreadKey(); + if (!this.fair && !messageReceived) { + configureSource(source); + } + } + + @Override + public KeyDirectory getCurrent() { + return current; + } + + + protected DelegatingSessionFactory getFactory() { + return factory; + } + + protected List getKeyDirectories() { + return keyDirectories; + } + + protected boolean isFair() { + return fair; + } + + protected Iterator getIterator() { + return iterator; + } + + protected boolean isInitialized() { + return initialized; + } + + protected void configureSource(MessageSource source) { + + if (!this.iterator.hasNext()) { + this.iterator = this.keyDirectories.iterator(); + } + this.current = this.iterator.next(); + + onRotation(source); + } + + protected abstract void onRotation(MessageSource source); +} diff --git a/spring-integration-file/src/main/java/org/springframework/integration/file/remote/aop/KeyDirectory.java b/spring-integration-file/src/main/java/org/springframework/integration/file/remote/aop/KeyDirectory.java new file mode 100644 index 00000000000..1ff79854840 --- /dev/null +++ b/spring-integration-file/src/main/java/org/springframework/integration/file/remote/aop/KeyDirectory.java @@ -0,0 +1,51 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.file.remote.aop; + +import org.springframework.integration.file.remote.session.DelegatingSessionFactory; +import org.springframework.util.Assert; + +/** + * A {@link DelegatingSessionFactory} key/directory pair. + */ +public class KeyDirectory { + + private final Object key; + + private final String directory; + + public KeyDirectory(Object key, String directory) { + Assert.notNull(key, "key cannot be null"); + Assert.notNull(directory, "directory cannot be null"); + this.key = key; + this.directory = directory; + } + + public Object getKey() { + return this.key; + } + + public String getDirectory() { + return this.directory; + } + + @Override + public String toString() { + return "KeyDirectory [key=" + this.key.toString() + ", directory=" + this.directory + "]"; + } + +} diff --git a/spring-integration-file/src/main/java/org/springframework/integration/file/remote/aop/RotatingServerAdvice.java b/spring-integration-file/src/main/java/org/springframework/integration/file/remote/aop/RotatingServerAdvice.java index bff21192005..642a5022da7 100644 --- a/spring-integration-file/src/main/java/org/springframework/integration/file/remote/aop/RotatingServerAdvice.java +++ b/spring-integration-file/src/main/java/org/springframework/integration/file/remote/aop/RotatingServerAdvice.java @@ -16,13 +16,8 @@ package org.springframework.integration.file.remote.aop; -import java.util.ArrayList; -import java.util.Iterator; import java.util.List; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - import org.springframework.integration.aop.AbstractMessageSourceAdvice; import org.springframework.integration.core.MessageSource; import org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource; @@ -37,6 +32,7 @@ * @author Gary Russell * @author Michael Forstner * @author Artem Bilan + * @author David Turanski * * @since 5.0.7 * @@ -88,167 +84,28 @@ public Message afterReceive(Message result, MessageSource source) { return result; } - /** - * Implementations can reconfigure the message source before and/or after - * a poll. - */ - public interface RotationPolicy { - - /** - * Invoked before the message source receive() method. - * @param source the message source. - */ - void beforeReceive(MessageSource source); - - /** - * Invoked after the message source receive() method. - * @param messageReceived true if a message was received. - * @param source the message source. - */ - void afterReceive(boolean messageReceived, MessageSource source); - - } - - /** - * Standard rotation policy; iterates over key/directory pairs; when the end - * is reached, starts again at the beginning. If the fair option is true - * the rotation occurs on every poll, regardless of result. Otherwise rotation - * occurs when the current pair returns no message. - */ - public static class StandardRotationPolicy implements RotationPolicy { - - protected final Log logger = LogFactory.getLog(getClass()); // NOSONAR final - - protected final DelegatingSessionFactory factory; // NOSONAR final - - private final List keyDirectories = new ArrayList<>(); - - private final boolean fair; - - private volatile Iterator iterator; + public static class StandardRotationPolicy extends AbstractStandardRotationPolicy { - private volatile KeyDirectory current; - - private volatile boolean initialized; public StandardRotationPolicy(DelegatingSessionFactory factory, List keyDirectories, boolean fair) { - - Assert.notNull(factory, "factory cannot be null"); - Assert.notNull(keyDirectories, "keyDirectories cannot be null"); - Assert.isTrue(keyDirectories.size() > 0, "At least one KeyDirectory is required"); - this.factory = factory; - this.keyDirectories.addAll(keyDirectories); - this.fair = fair; - this.iterator = this.keyDirectories.iterator(); - } - - protected Iterator getIterator() { - return this.iterator; - } - - protected void setIterator(Iterator iterator) { - this.iterator = iterator; - } - - protected boolean isInitialized() { - return this.initialized; + super(factory, keyDirectories, fair); } - protected void setInitialized(boolean initialized) { - this.initialized = initialized; - } - - protected DelegatingSessionFactory getFactory() { - return this.factory; - } - - protected List getKeyDirectories() { - return this.keyDirectories; - } - - protected boolean isFair() { - return this.fair; - } - - protected KeyDirectory getCurrent() { - return this.current; - } - - @Override - public void beforeReceive(MessageSource source) { - if (this.fair || !this.initialized) { - configureSource(source); - this.initialized = true; - } - if (this.logger.isTraceEnabled()) { - this.logger.trace("Next poll is for " + this.current); - } - this.factory.setThreadKey(this.current.getKey()); - } - - @Override - public void afterReceive(boolean messageReceived, MessageSource source) { - if (this.logger.isTraceEnabled()) { - this.logger.trace("Poll produced " - + (messageReceived ? "a" : "no") - + " message"); - } - this.factory.clearThreadKey(); - if (!this.fair && !messageReceived) { - configureSource(source); - } - } - - protected void configureSource(MessageSource source) { + protected void onRotation(MessageSource source) { Assert.isTrue(source instanceof AbstractInboundFileSynchronizingMessageSource || source instanceof AbstractRemoteFileStreamingMessageSource, "source must be an AbstractInboundFileSynchronizingMessageSource or a " + "AbstractRemoteFileStreamingMessageSource"); - if (!this.iterator.hasNext()) { - this.iterator = this.keyDirectories.iterator(); - } - this.current = this.iterator.next(); + if (source instanceof AbstractRemoteFileStreamingMessageSource) { - ((AbstractRemoteFileStreamingMessageSource) source).setRemoteDirectory(this.current.getDirectory()); + ((AbstractRemoteFileStreamingMessageSource) source).setRemoteDirectory(this.getCurrent().getDirectory()); } else { ((AbstractInboundFileSynchronizingMessageSource) source).getSynchronizer() - .setRemoteDirectory(this.current.getDirectory()); + .setRemoteDirectory(this.getCurrent().getDirectory()); } } } - - /** - * A {@link DelegatingSessionFactory} key/directory pair. - */ - public static class KeyDirectory { - - private final Object key; - - private final String directory; - - public KeyDirectory(Object key, String directory) { - Assert.notNull(key, "key cannot be null"); - Assert.notNull(directory, "directory cannot be null"); - this.key = key; - this.directory = directory; - } - - public Object getKey() { - return this.key; - } - - public String getDirectory() { - return this.directory; - } - - @Override - public String toString() { - return "KeyDirectory [key=" + this.key.toString() + ", directory=" + this.directory + "]"; - } - - } - } diff --git a/spring-integration-file/src/main/java/org/springframework/integration/file/remote/aop/RotationPolicy.java b/spring-integration-file/src/main/java/org/springframework/integration/file/remote/aop/RotationPolicy.java new file mode 100644 index 00000000000..9c66d4c5f62 --- /dev/null +++ b/spring-integration-file/src/main/java/org/springframework/integration/file/remote/aop/RotationPolicy.java @@ -0,0 +1,54 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.file.remote.aop; + +import org.springframework.integration.core.MessageSource; + +/** + * Implementations can reconfigure the message source before and/or after + * a poll. + * + * @author Gary Russell + * @author Michael Forstner + * @author Artem Bilan + * @author David Turanski + * + * @since 5.0.7 + */ +public interface RotationPolicy { + + /** + * Invoked before the message source receive() method. + * @param source the message source. + */ + void beforeReceive(MessageSource source); + + /** + * Invoked after the message source receive() method. + * @param messageReceived true if a message was received. + * @param source the message source. + */ + void afterReceive(boolean messageReceived, MessageSource source); + + + /** + * + * @return the current {@link KeyDirectory} + */ + KeyDirectory getCurrent(); + +} \ No newline at end of file diff --git a/spring-integration-ftp/src/test/java/org/springframework/integration/ftp/inbound/RotatingServersTests.java b/spring-integration-ftp/src/test/java/org/springframework/integration/ftp/inbound/RotatingServersTests.java index b3a94805f6d..e75ab4a28fa 100644 --- a/spring-integration-ftp/src/test/java/org/springframework/integration/ftp/inbound/RotatingServersTests.java +++ b/spring-integration-ftp/src/test/java/org/springframework/integration/ftp/inbound/RotatingServersTests.java @@ -43,8 +43,8 @@ import org.springframework.integration.dsl.MessageChannels; import org.springframework.integration.dsl.Pollers; import org.springframework.integration.dsl.StandardIntegrationFlow; +import org.springframework.integration.file.remote.aop.KeyDirectory; import org.springframework.integration.file.remote.aop.RotatingServerAdvice; -import org.springframework.integration.file.remote.aop.RotatingServerAdvice.KeyDirectory; import org.springframework.integration.file.remote.session.CachingSessionFactory; import org.springframework.integration.file.remote.session.DefaultSessionFactoryLocator; import org.springframework.integration.file.remote.session.DelegatingSessionFactory; From a8148c96e196e051e3e92513e5e205f6fc7f728b Mon Sep 17 00:00:00 2001 From: David Turanski Date: Tue, 13 Aug 2019 13:46:18 -0400 Subject: [PATCH 2/2] Fix checkstyle errors --- .../aop/AbstractStandardRotationPolicy.java | 18 ++++++++++-------- .../file/remote/aop/RotationPolicy.java | 2 +- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/spring-integration-file/src/main/java/org/springframework/integration/file/remote/aop/AbstractStandardRotationPolicy.java b/spring-integration-file/src/main/java/org/springframework/integration/file/remote/aop/AbstractStandardRotationPolicy.java index 704b83bd156..a95e0b2e682 100644 --- a/spring-integration-file/src/main/java/org/springframework/integration/file/remote/aop/AbstractStandardRotationPolicy.java +++ b/spring-integration-file/src/main/java/org/springframework/integration/file/remote/aop/AbstractStandardRotationPolicy.java @@ -13,13 +13,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.springframework.integration.file.remote.aop; import java.util.ArrayList; import java.util.Iterator; import java.util.List; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; + import org.springframework.integration.core.MessageSource; import org.springframework.integration.file.remote.session.DelegatingSessionFactory; import org.springframework.util.Assert; @@ -54,8 +57,7 @@ public abstract class AbstractStandardRotationPolicy implements RotationPolicy { private volatile boolean initialized; - protected AbstractStandardRotationPolicy(DelegatingSessionFactory factory, List keyDirectories, - boolean fair) { + protected AbstractStandardRotationPolicy(DelegatingSessionFactory factory, List keyDirectories, boolean fair) { Assert.notNull(factory, "factory cannot be null"); Assert.notNull(keyDirectories, "keyDirectories cannot be null"); @@ -93,28 +95,28 @@ public void afterReceive(boolean messageReceived, MessageSource source) { @Override public KeyDirectory getCurrent() { - return current; + return this.current; } protected DelegatingSessionFactory getFactory() { - return factory; + return this.factory; } protected List getKeyDirectories() { - return keyDirectories; + return this.keyDirectories; } protected boolean isFair() { - return fair; + return this.fair; } protected Iterator getIterator() { - return iterator; + return this.iterator; } protected boolean isInitialized() { - return initialized; + return this.initialized; } protected void configureSource(MessageSource source) { diff --git a/spring-integration-file/src/main/java/org/springframework/integration/file/remote/aop/RotationPolicy.java b/spring-integration-file/src/main/java/org/springframework/integration/file/remote/aop/RotationPolicy.java index 9c66d4c5f62..ab21233ce6c 100644 --- a/spring-integration-file/src/main/java/org/springframework/integration/file/remote/aop/RotationPolicy.java +++ b/spring-integration-file/src/main/java/org/springframework/integration/file/remote/aop/RotationPolicy.java @@ -51,4 +51,4 @@ public interface RotationPolicy { */ KeyDirectory getCurrent(); -} \ No newline at end of file +}