Skip to content

Commit 119975c

Browse files
committed
Add Reactive mode for AbstractPollingEndpoint
* When `SourcePollingChannelAdapter.outputChannel` is a `ReactiveStreamsSubscribableChannel`, use `Flux.generate()` for polling * Refactor `AbstractPollingEndpoint` to remove redundant `Poller` class in favor of lambda * Extract `pollForMessage()` method to handle TX states instead of `Poller` class previously
1 parent 000f297 commit 119975c

File tree

8 files changed

+322
-145
lines changed

8 files changed

+322
-145
lines changed

spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractPollingEndpoint.java

Lines changed: 167 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,17 @@
1616

1717
package org.springframework.integration.endpoint;
1818

19+
import java.time.Duration;
1920
import java.util.Collection;
21+
import java.util.Date;
2022
import java.util.List;
2123
import java.util.concurrent.Callable;
2224
import java.util.concurrent.Executor;
2325
import java.util.concurrent.ScheduledFuture;
2426
import java.util.stream.Collectors;
2527

2628
import org.aopalliance.aop.Advice;
29+
import org.reactivestreams.Subscription;
2730

2831
import org.springframework.aop.framework.ProxyFactory;
2932
import org.springframework.beans.factory.BeanClassLoaderAware;
@@ -43,6 +46,7 @@
4346
import org.springframework.messaging.MessagingException;
4447
import org.springframework.scheduling.Trigger;
4548
import org.springframework.scheduling.support.PeriodicTrigger;
49+
import org.springframework.scheduling.support.SimpleTriggerContext;
4650
import org.springframework.transaction.interceptor.TransactionInterceptor;
4751
import org.springframework.transaction.support.TransactionSynchronization;
4852
import org.springframework.transaction.support.TransactionSynchronizationManager;
@@ -51,6 +55,10 @@
5155
import org.springframework.util.CollectionUtils;
5256
import org.springframework.util.ErrorHandler;
5357

58+
import reactor.core.publisher.Flux;
59+
import reactor.core.publisher.Mono;
60+
import reactor.core.scheduler.Schedulers;
61+
5462
/**
5563
* @author Mark Fisher
5664
* @author Oleg Zhurakousky
@@ -60,29 +68,33 @@
6068
*/
6169
public abstract class AbstractPollingEndpoint extends AbstractEndpoint implements BeanClassLoaderAware {
6270

63-
private volatile Executor taskExecutor = new SyncTaskExecutor();
71+
private final Object initializationMonitor = new Object();
6472

65-
private volatile ErrorHandler errorHandler;
73+
private Executor taskExecutor = new SyncTaskExecutor();
6674

67-
private volatile boolean errorHandlerIsDefault;
75+
private ClassLoader beanClassLoader = ClassUtils.getDefaultClassLoader();
6876

69-
private volatile Trigger trigger = new PeriodicTrigger(10);
77+
private Trigger trigger = new PeriodicTrigger(10);
7078

71-
private volatile List<Advice> adviceChain;
79+
private long maxMessagesPerPoll = -1;
7280

73-
private volatile ClassLoader beanClassLoader = ClassUtils.getDefaultClassLoader();
81+
private ErrorHandler errorHandler;
7482

75-
private volatile ScheduledFuture<?> runningTask;
83+
private boolean errorHandlerIsDefault;
7684

77-
private volatile Runnable poller;
85+
private List<Advice> adviceChain;
7886

79-
private volatile boolean initialized;
87+
private TransactionSynchronizationFactory transactionSynchronizationFactory;
8088

81-
private volatile long maxMessagesPerPoll = -1;
89+
private volatile Callable<Message<?>> pollingTask;
8290

83-
private final Object initializationMonitor = new Object();
91+
private volatile Flux<Message<?>> pollingFlux;
92+
93+
private volatile Subscription subscription;
8494

85-
private volatile TransactionSynchronizationFactory transactionSynchronizationFactory;
95+
private volatile ScheduledFuture<?> runningTask;
96+
97+
private volatile boolean initialized;
8698

8799
public AbstractPollingEndpoint() {
88100
this.setPhase(Integer.MAX_VALUE / 2);
@@ -154,6 +166,14 @@ protected boolean isReceiveOnlyAdvice(Advice advice) {
154166
protected void applyReceiveOnlyAdviceChain(Collection<Advice> chain) {
155167
}
156168

169+
protected boolean isReactive() {
170+
return false;
171+
}
172+
173+
protected Flux<Message<?>> getPollingFlux() {
174+
return this.pollingFlux;
175+
}
176+
157177
@Override
158178
protected void onInit() {
159179
synchronized (this.initializationMonitor) {
@@ -187,16 +207,38 @@ protected void onInit() {
187207
}
188208
}
189209

210+
// LifecycleSupport implementation
211+
212+
@Override // guarded by super#lifecycleLock
213+
protected void doStart() {
214+
if (!this.initialized) {
215+
onInit();
216+
}
217+
218+
this.pollingTask = createPollingTask();
219+
220+
if (isReactive()) {
221+
this.pollingFlux = createFluxGenerator();
222+
}
223+
else {
224+
Assert.state(getTaskScheduler() != null, "unable to start polling, no taskScheduler available");
225+
226+
this.runningTask =
227+
getTaskScheduler()
228+
.schedule(createPoller(), this.trigger);
229+
}
230+
}
231+
190232
@SuppressWarnings("unchecked")
191-
private Runnable createPoller() throws Exception {
233+
private Callable<Message<?>> createPollingTask() {
192234
List<Advice> receiveOnlyAdviceChain = null;
193235
if (!CollectionUtils.isEmpty(this.adviceChain)) {
194236
receiveOnlyAdviceChain = this.adviceChain.stream()
195237
.filter(this::isReceiveOnlyAdvice)
196238
.collect(Collectors.toList());
197239
}
198240

199-
Callable<Boolean> pollingTask = this::doPoll;
241+
Callable<Message<?>> pollingTask = this::doPoll;
200242

201243
List<Advice> adviceChain = this.adviceChain;
202244
if (!CollectionUtils.isEmpty(adviceChain)) {
@@ -206,65 +248,122 @@ private Runnable createPoller() throws Exception {
206248
.filter(advice -> !isReceiveOnlyAdvice(advice))
207249
.forEach(proxyFactory::addAdvice);
208250
}
209-
pollingTask = (Callable<Boolean>) proxyFactory.getProxy(this.beanClassLoader);
251+
pollingTask = (Callable<Message<?>>) proxyFactory.getProxy(this.beanClassLoader);
210252
}
211253
if (!CollectionUtils.isEmpty(receiveOnlyAdviceChain)) {
212254
applyReceiveOnlyAdviceChain(receiveOnlyAdviceChain);
213255
}
214-
return new Poller(pollingTask);
256+
257+
return pollingTask;
215258
}
216259

217-
// LifecycleSupport implementation
260+
private Runnable createPoller() {
261+
return () ->
262+
this.taskExecutor.execute(() -> {
263+
int count = 0;
264+
while (this.initialized && (this.maxMessagesPerPoll <= 0 || count < this.maxMessagesPerPoll)) {
265+
if (pollForMessage() == null) {
266+
break;
267+
}
268+
count++;
269+
}
270+
});
271+
}
218272

219-
@Override // guarded by super#lifecycleLock
220-
protected void doStart() {
221-
if (!this.initialized) {
222-
this.onInit();
223-
}
224-
Assert.state(this.getTaskScheduler() != null,
225-
"unable to start polling, no taskScheduler available");
273+
private Flux<Message<?>> createFluxGenerator() {
274+
SimpleTriggerContext triggerContext = new SimpleTriggerContext();
275+
276+
return Flux
277+
.<Duration>generate(sink -> {
278+
Date date = this.trigger.nextExecutionTime(triggerContext);
279+
if (date != null) {
280+
triggerContext.update(date, null, null);
281+
long millis = date.getTime() - System.currentTimeMillis();
282+
sink.next(Duration.ofMillis(millis));
283+
}
284+
else {
285+
sink.complete();
286+
}
287+
})
288+
.concatMap(duration ->
289+
Mono.delay(duration)
290+
.doOnNext(l ->
291+
triggerContext.update(triggerContext.lastScheduledExecutionTime(),
292+
new Date(), null))
293+
.flatMapMany(l ->
294+
Flux
295+
.<Message<?>>generate(fluxSink -> {
296+
Message<?> message = pollForMessage();
297+
if (message != null) {
298+
fluxSink.next(message);
299+
}
300+
else {
301+
fluxSink.complete();
302+
}
303+
})
304+
.take(this.maxMessagesPerPoll)
305+
.subscribeOn(Schedulers.fromExecutor(this.taskExecutor))
306+
.doOnComplete(() ->
307+
triggerContext.update(triggerContext.lastScheduledExecutionTime(),
308+
triggerContext.lastActualExecutionTime(),
309+
new Date())
310+
)), 1)
311+
.repeat(this::isRunning)
312+
.doOnSubscribe(subscription -> this.subscription = subscription);
313+
}
314+
315+
private Message<?> pollForMessage() {
226316
try {
227-
this.poller = createPoller();
317+
return this.pollingTask.call();
228318
}
229319
catch (Exception e) {
230-
this.initialized = false;
231-
throw new MessagingException("Failed to create Poller", e);
320+
if (e instanceof MessagingException) {
321+
throw (MessagingException) e;
322+
}
323+
else {
324+
Message<?> failedMessage = null;
325+
if (this.transactionSynchronizationFactory != null) {
326+
Object resource = TransactionSynchronizationManager.getResource(getResourceToBind());
327+
if (resource instanceof IntegrationResourceHolder) {
328+
failedMessage = ((IntegrationResourceHolder) resource).getMessage();
329+
}
330+
}
331+
throw new MessagingException(failedMessage, e);
332+
}
232333
}
233-
this.runningTask = this.getTaskScheduler().schedule(this.poller, this.trigger);
234-
}
235-
236-
@Override // guarded by super#lifecycleLock
237-
protected void doStop() {
238-
if (this.runningTask != null) {
239-
this.runningTask.cancel(true);
334+
finally {
335+
if (this.transactionSynchronizationFactory != null) {
336+
Object resource = getResourceToBind();
337+
if (TransactionSynchronizationManager.hasResource(resource)) {
338+
TransactionSynchronizationManager.unbindResource(resource);
339+
}
340+
}
240341
}
241-
this.runningTask = null;
242342
}
243343

244-
private boolean doPoll() {
245-
IntegrationResourceHolder holder = this.bindResourceHolderIfNecessary(
246-
this.getResourceKey(), this.getResourceToBind());
247-
Message<?> message = null;
344+
private Message<?> doPoll() {
345+
IntegrationResourceHolder holder = bindResourceHolderIfNecessary(getResourceKey(), getResourceToBind());
346+
Message<?> message;
248347
try {
249-
message = this.receiveMessage();
348+
message = receiveMessage();
250349
}
251350
catch (Exception e) {
252351
if (Thread.interrupted()) {
253352
if (logger.isDebugEnabled()) {
254353
logger.debug("Poll interrupted - during stop()? : " + e.getMessage());
255354
}
256-
return false;
355+
return null;
257356
}
258357
else {
259358
throw (RuntimeException) e;
260359
}
261360
}
262-
boolean result;
361+
263362
if (message == null) {
264363
if (this.logger.isDebugEnabled()) {
265364
this.logger.debug("Received no Message during the poll, returning 'false'");
266365
}
267-
result = false;
366+
return null;
268367
}
269368
else {
270369
if (this.logger.isDebugEnabled()) {
@@ -273,20 +372,35 @@ private boolean doPoll() {
273372
if (holder != null) {
274373
holder.setMessage(message);
275374
}
276-
try {
277-
this.handleMessage(message);
278-
}
279-
catch (Exception e) {
280-
if (e instanceof MessagingException) {
281-
throw new MessagingExceptionWrapper(message, (MessagingException) e);
375+
376+
if (!isReactive()) {
377+
try {
378+
handleMessage(message);
282379
}
283-
else {
284-
throw new MessagingException(message, e);
380+
catch (Exception e) {
381+
if (e instanceof MessagingException) {
382+
throw new MessagingExceptionWrapper(message, (MessagingException) e);
383+
}
384+
else {
385+
throw new MessagingException(message, e);
386+
}
285387
}
286388
}
287-
result = true;
288389
}
289-
return result;
390+
391+
return message;
392+
}
393+
394+
@Override // guarded by super#lifecycleLock
395+
protected void doStop() {
396+
if (this.runningTask != null) {
397+
this.runningTask.cancel(true);
398+
}
399+
this.runningTask = null;
400+
401+
if (this.subscription != null) {
402+
this.subscription.cancel();
403+
}
290404
}
291405

292406
/**
@@ -356,57 +470,4 @@ private IntegrationResourceHolder bindResourceHolderIfNecessary(String key, Obje
356470
return null;
357471
}
358472

359-
/**
360-
* Default Poller implementation
361-
*/
362-
private final class Poller implements Runnable {
363-
364-
private final Callable<Boolean> pollingTask;
365-
366-
Poller(Callable<Boolean> pollingTask) {
367-
this.pollingTask = pollingTask;
368-
}
369-
370-
@Override
371-
public void run() {
372-
AbstractPollingEndpoint.this.taskExecutor.execute(() -> {
373-
int count = 0;
374-
while (AbstractPollingEndpoint.this.initialized
375-
&& (AbstractPollingEndpoint.this.maxMessagesPerPoll <= 0
376-
|| count < AbstractPollingEndpoint.this.maxMessagesPerPoll)) {
377-
try {
378-
if (!Poller.this.pollingTask.call()) {
379-
break;
380-
}
381-
count++;
382-
}
383-
catch (Exception e) {
384-
if (e instanceof MessagingException) {
385-
throw (MessagingException) e;
386-
}
387-
else {
388-
Message<?> failedMessage = null;
389-
if (AbstractPollingEndpoint.this.transactionSynchronizationFactory != null) {
390-
Object resource = TransactionSynchronizationManager.getResource(getResourceToBind());
391-
if (resource instanceof IntegrationResourceHolder) {
392-
failedMessage = ((IntegrationResourceHolder) resource).getMessage();
393-
}
394-
}
395-
throw new MessagingException(failedMessage, e);
396-
}
397-
}
398-
finally {
399-
if (AbstractPollingEndpoint.this.transactionSynchronizationFactory != null) {
400-
Object resource = getResourceToBind();
401-
if (TransactionSynchronizationManager.hasResource(resource)) {
402-
TransactionSynchronizationManager.unbindResource(resource);
403-
}
404-
}
405-
}
406-
}
407-
});
408-
}
409-
410-
}
411-
412473
}

0 commit comments

Comments
 (0)