Skip to content

Implement enhancements to RotatingServerAdvice #3029

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.file.remote.aop;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.integration.core.MessageSource;
import org.springframework.integration.file.remote.session.DelegatingSessionFactory;
import org.springframework.util.Assert;

/**
* Standard rotation policy; iterates over key/directory pairs; when the end
* is reached, starts again at the beginning. If the fair option is true
* the rotation occurs on every poll, regardless of result. Otherwise rotation
* occurs when the current pair returns no message.
*
* Subclasses implement {@code onRotation(MessageSource<?> source)} to configure the {@link MessageSource} on each rotation.
*
* @author Gary Russell
* @author Michael Forstner
* @author Artem Bilan
* @author David Turanski
*
* @since 5.2.0
*/
public abstract class AbstractStandardRotationPolicy implements RotationPolicy {
protected final Log logger = LogFactory.getLog(getClass()); // NOSONAR final

private final DelegatingSessionFactory<?> factory; // NOSONAR final

private final List<KeyDirectory> keyDirectories = new ArrayList<>();

private final boolean fair;

private volatile Iterator<KeyDirectory> iterator;

private volatile KeyDirectory current;

private volatile boolean initialized;

protected AbstractStandardRotationPolicy(DelegatingSessionFactory<?> factory, List<KeyDirectory> keyDirectories, boolean fair) {

Assert.notNull(factory, "factory cannot be null");
Assert.notNull(keyDirectories, "keyDirectories cannot be null");
Assert.isTrue(keyDirectories.size() > 0, "At least one KeyDirectory is required");
this.factory = factory;
this.keyDirectories.addAll(keyDirectories);
this.fair = fair;
this.iterator = this.keyDirectories.iterator();
}

@Override
public void beforeReceive(MessageSource<?> source) {
if (this.fair || !this.initialized) {
configureSource(source);
this.initialized = true;
}
if (this.logger.isTraceEnabled()) {
this.logger.trace("Next poll is for " + this.current);
}
this.factory.setThreadKey(this.current.getKey());
}

@Override
public void afterReceive(boolean messageReceived, MessageSource<?> source) {
if (this.logger.isTraceEnabled()) {
this.logger.trace("Poll produced "
+ (messageReceived ? "a" : "no")
+ " message");
}
this.factory.clearThreadKey();
if (!this.fair && !messageReceived) {
configureSource(source);
}
}

@Override
public KeyDirectory getCurrent() {
return this.current;
}


protected DelegatingSessionFactory<?> getFactory() {
return this.factory;
}

protected List<KeyDirectory> getKeyDirectories() {
return this.keyDirectories;
}

protected boolean isFair() {
return this.fair;
}

protected Iterator<KeyDirectory> getIterator() {
return this.iterator;
}

protected boolean isInitialized() {
return this.initialized;
}

protected void configureSource(MessageSource<?> source) {

if (!this.iterator.hasNext()) {
this.iterator = this.keyDirectories.iterator();
}
this.current = this.iterator.next();

onRotation(source);
}

protected abstract void onRotation(MessageSource<?> source);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.file.remote.aop;

import org.springframework.integration.file.remote.session.DelegatingSessionFactory;
import org.springframework.util.Assert;

/**
* A {@link DelegatingSessionFactory} key/directory pair.
*/
public class KeyDirectory {

private final Object key;

private final String directory;

public KeyDirectory(Object key, String directory) {
Assert.notNull(key, "key cannot be null");
Assert.notNull(directory, "directory cannot be null");
this.key = key;
this.directory = directory;
}

public Object getKey() {
return this.key;
}

public String getDirectory() {
return this.directory;
}

@Override
public String toString() {
return "KeyDirectory [key=" + this.key.toString() + ", directory=" + this.directory + "]";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,8 @@

package org.springframework.integration.file.remote.aop;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.integration.aop.AbstractMessageSourceAdvice;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource;
Expand All @@ -37,6 +32,7 @@
* @author Gary Russell
* @author Michael Forstner
* @author Artem Bilan
* @author David Turanski
*
* @since 5.0.7
*
Expand Down Expand Up @@ -88,167 +84,28 @@ public Message<?> afterReceive(Message<?> result, MessageSource<?> source) {
return result;
}

/**
* Implementations can reconfigure the message source before and/or after
* a poll.
*/
public interface RotationPolicy {

/**
* Invoked before the message source receive() method.
* @param source the message source.
*/
void beforeReceive(MessageSource<?> source);

/**
* Invoked after the message source receive() method.
* @param messageReceived true if a message was received.
* @param source the message source.
*/
void afterReceive(boolean messageReceived, MessageSource<?> source);

}

/**
* Standard rotation policy; iterates over key/directory pairs; when the end
* is reached, starts again at the beginning. If the fair option is true
* the rotation occurs on every poll, regardless of result. Otherwise rotation
* occurs when the current pair returns no message.
*/
public static class StandardRotationPolicy implements RotationPolicy {

protected final Log logger = LogFactory.getLog(getClass()); // NOSONAR final

protected final DelegatingSessionFactory<?> factory; // NOSONAR final

private final List<KeyDirectory> keyDirectories = new ArrayList<>();

private final boolean fair;

private volatile Iterator<KeyDirectory> iterator;
public static class StandardRotationPolicy extends AbstractStandardRotationPolicy {

private volatile KeyDirectory current;

private volatile boolean initialized;

public StandardRotationPolicy(DelegatingSessionFactory<?> factory, List<KeyDirectory> keyDirectories,
boolean fair) {

Assert.notNull(factory, "factory cannot be null");
Assert.notNull(keyDirectories, "keyDirectories cannot be null");
Assert.isTrue(keyDirectories.size() > 0, "At least one KeyDirectory is required");
this.factory = factory;
this.keyDirectories.addAll(keyDirectories);
this.fair = fair;
this.iterator = this.keyDirectories.iterator();
}

protected Iterator<KeyDirectory> getIterator() {
return this.iterator;
}

protected void setIterator(Iterator<KeyDirectory> iterator) {
this.iterator = iterator;
}

protected boolean isInitialized() {
return this.initialized;
super(factory, keyDirectories, fair);
}

protected void setInitialized(boolean initialized) {
this.initialized = initialized;
}

protected DelegatingSessionFactory<?> getFactory() {
return this.factory;
}

protected List<KeyDirectory> getKeyDirectories() {
return this.keyDirectories;
}

protected boolean isFair() {
return this.fair;
}

protected KeyDirectory getCurrent() {
return this.current;
}

@Override
public void beforeReceive(MessageSource<?> source) {
if (this.fair || !this.initialized) {
configureSource(source);
this.initialized = true;
}
if (this.logger.isTraceEnabled()) {
this.logger.trace("Next poll is for " + this.current);
}
this.factory.setThreadKey(this.current.getKey());
}

@Override
public void afterReceive(boolean messageReceived, MessageSource<?> source) {
if (this.logger.isTraceEnabled()) {
this.logger.trace("Poll produced "
+ (messageReceived ? "a" : "no")
+ " message");
}
this.factory.clearThreadKey();
if (!this.fair && !messageReceived) {
configureSource(source);
}
}

protected void configureSource(MessageSource<?> source) {
protected void onRotation(MessageSource<?> source) {
Assert.isTrue(source instanceof AbstractInboundFileSynchronizingMessageSource
|| source instanceof AbstractRemoteFileStreamingMessageSource,
"source must be an AbstractInboundFileSynchronizingMessageSource or a "
+ "AbstractRemoteFileStreamingMessageSource");
if (!this.iterator.hasNext()) {
this.iterator = this.keyDirectories.iterator();
}
this.current = this.iterator.next();

if (source instanceof AbstractRemoteFileStreamingMessageSource) {
((AbstractRemoteFileStreamingMessageSource<?>) source).setRemoteDirectory(this.current.getDirectory());
((AbstractRemoteFileStreamingMessageSource<?>) source).setRemoteDirectory(this.getCurrent().getDirectory());
}
else {
((AbstractInboundFileSynchronizingMessageSource<?>) source).getSynchronizer()
.setRemoteDirectory(this.current.getDirectory());
.setRemoteDirectory(this.getCurrent().getDirectory());
}
}

}

/**
* A {@link DelegatingSessionFactory} key/directory pair.
*/
public static class KeyDirectory {

private final Object key;

private final String directory;

public KeyDirectory(Object key, String directory) {
Assert.notNull(key, "key cannot be null");
Assert.notNull(directory, "directory cannot be null");
this.key = key;
this.directory = directory;
}

public Object getKey() {
return this.key;
}

public String getDirectory() {
return this.directory;
}

@Override
public String toString() {
return "KeyDirectory [key=" + this.key.toString() + ", directory=" + this.directory + "]";
}

}

}
Loading