Skip to content

Commit f7032c7

Browse files
committed
Revert "GH-3027: RotatingServerAdvice enhancements"
This reverts commit 03b6316.
1 parent 03b6316 commit f7032c7

File tree

5 files changed

+150
-248
lines changed

5 files changed

+150
-248
lines changed

spring-integration-file/src/main/java/org/springframework/integration/file/remote/aop/AbstractStandardRotationPolicy.java

-135
This file was deleted.

spring-integration-file/src/main/java/org/springframework/integration/file/remote/aop/KeyDirectory.java

-51
This file was deleted.

spring-integration-file/src/main/java/org/springframework/integration/file/remote/aop/RotatingServerAdvice.java

+149-7
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,13 @@
1616

1717
package org.springframework.integration.file.remote.aop;
1818

19+
import java.util.ArrayList;
20+
import java.util.Iterator;
1921
import java.util.List;
2022

23+
import org.apache.commons.logging.Log;
24+
import org.apache.commons.logging.LogFactory;
25+
2126
import org.springframework.integration.aop.AbstractMessageSourceAdvice;
2227
import org.springframework.integration.core.MessageSource;
2328
import org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource;
@@ -32,7 +37,6 @@
3237
* @author Gary Russell
3338
* @author Michael Forstner
3439
* @author Artem Bilan
35-
* @author David Turanski
3640
*
3741
* @since 5.0.7
3842
*
@@ -84,29 +88,167 @@ public Message<?> afterReceive(Message<?> result, MessageSource<?> source) {
8488
return result;
8589
}
8690

87-
public static class StandardRotationPolicy extends AbstractStandardRotationPolicy {
91+
/**
92+
* Implementations can reconfigure the message source before and/or after
93+
* a poll.
94+
*/
95+
public interface RotationPolicy {
96+
97+
/**
98+
* Invoked before the message source receive() method.
99+
* @param source the message source.
100+
*/
101+
void beforeReceive(MessageSource<?> source);
102+
103+
/**
104+
* Invoked after the message source receive() method.
105+
* @param messageReceived true if a message was received.
106+
* @param source the message source.
107+
*/
108+
void afterReceive(boolean messageReceived, MessageSource<?> source);
109+
110+
}
111+
112+
/**
113+
* Standard rotation policy; iterates over key/directory pairs; when the end
114+
* is reached, starts again at the beginning. If the fair option is true
115+
* the rotation occurs on every poll, regardless of result. Otherwise rotation
116+
* occurs when the current pair returns no message.
117+
*/
118+
public static class StandardRotationPolicy implements RotationPolicy {
119+
120+
protected final Log logger = LogFactory.getLog(getClass());
121+
122+
protected final DelegatingSessionFactory<?> factory;
123+
124+
private final List<KeyDirectory> keyDirectories = new ArrayList<>();
125+
126+
private final boolean fair;
127+
128+
private volatile Iterator<KeyDirectory> iterator;
129+
130+
private volatile KeyDirectory current;
88131

132+
private volatile boolean initialized;
89133

90134
public StandardRotationPolicy(DelegatingSessionFactory<?> factory, List<KeyDirectory> keyDirectories,
91135
boolean fair) {
92-
super(factory, keyDirectories, fair);
136+
137+
Assert.notNull(factory, "factory cannot be null");
138+
Assert.notNull(keyDirectories, "keyDirectories cannot be null");
139+
Assert.isTrue(keyDirectories.size() > 0, "At least one KeyDirectory is required");
140+
this.factory = factory;
141+
this.keyDirectories.addAll(keyDirectories);
142+
this.fair = fair;
143+
this.iterator = this.keyDirectories.iterator();
144+
}
145+
146+
protected Iterator<KeyDirectory> getIterator() {
147+
return this.iterator;
148+
}
149+
150+
protected void setIterator(Iterator<KeyDirectory> iterator) {
151+
this.iterator = iterator;
152+
}
153+
154+
protected boolean isInitialized() {
155+
return this.initialized;
156+
}
157+
158+
protected void setInitialized(boolean initialized) {
159+
this.initialized = initialized;
160+
}
161+
162+
protected DelegatingSessionFactory<?> getFactory() {
163+
return this.factory;
164+
}
165+
166+
protected List<KeyDirectory> getKeyDirectories() {
167+
return this.keyDirectories;
168+
}
169+
170+
protected boolean isFair() {
171+
return this.fair;
172+
}
173+
174+
protected KeyDirectory getCurrent() {
175+
return this.current;
176+
}
177+
178+
@Override
179+
public void beforeReceive(MessageSource<?> source) {
180+
if (this.fair || !this.initialized) {
181+
configureSource(source);
182+
this.initialized = true;
183+
}
184+
if (this.logger.isTraceEnabled()) {
185+
this.logger.trace("Next poll is for " + this.current);
186+
}
187+
this.factory.setThreadKey(this.current.getKey());
93188
}
94189

95190
@Override
96-
protected void onRotation(MessageSource<?> source) {
191+
public void afterReceive(boolean messageReceived, MessageSource<?> source) {
192+
if (this.logger.isTraceEnabled()) {
193+
this.logger.trace("Poll produced "
194+
+ (messageReceived ? "a" : "no")
195+
+ " message");
196+
}
197+
this.factory.clearThreadKey();
198+
if (!this.fair && !messageReceived) {
199+
configureSource(source);
200+
}
201+
}
202+
203+
protected void configureSource(MessageSource<?> source) {
97204
Assert.isTrue(source instanceof AbstractInboundFileSynchronizingMessageSource
98205
|| source instanceof AbstractRemoteFileStreamingMessageSource,
99206
"source must be an AbstractInboundFileSynchronizingMessageSource or a "
100207
+ "AbstractRemoteFileStreamingMessageSource");
101-
208+
if (!this.iterator.hasNext()) {
209+
this.iterator = this.keyDirectories.iterator();
210+
}
211+
this.current = this.iterator.next();
102212
if (source instanceof AbstractRemoteFileStreamingMessageSource) {
103-
((AbstractRemoteFileStreamingMessageSource<?>) source).setRemoteDirectory(getCurrent().getDirectory());
213+
((AbstractRemoteFileStreamingMessageSource<?>) source).setRemoteDirectory(this.current.getDirectory());
104214
}
105215
else {
106216
((AbstractInboundFileSynchronizingMessageSource<?>) source).getSynchronizer()
107-
.setRemoteDirectory(getCurrent().getDirectory());
217+
.setRemoteDirectory(this.current.getDirectory());
108218
}
109219
}
110220

111221
}
222+
223+
/**
224+
* A {@link DelegatingSessionFactory} key/directory pair.
225+
*/
226+
public static class KeyDirectory {
227+
228+
private final Object key;
229+
230+
private final String directory;
231+
232+
public KeyDirectory(Object key, String directory) {
233+
Assert.notNull(key, "key cannot be null");
234+
Assert.notNull(directory, "directory cannot be null");
235+
this.key = key;
236+
this.directory = directory;
237+
}
238+
239+
public Object getKey() {
240+
return this.key;
241+
}
242+
243+
public String getDirectory() {
244+
return this.directory;
245+
}
246+
247+
@Override
248+
public String toString() {
249+
return "KeyDirectory [key=" + this.key.toString() + ", directory=" + this.directory + "]";
250+
}
251+
252+
}
253+
112254
}

0 commit comments

Comments
 (0)