Skip to content

Commit 27353ed

Browse files
artembilangaryrussell
authored andcommitted
Add Reactive mode for AbstractPollingEndpoint (#2429)
* Add Reactive mode for AbstractPollingEndpoint * When `SourcePollingChannelAdapter.outputChannel` is a `ReactiveStreamsSubscribableChannel`, use `Flux.generate()` for polling * Refactor `AbstractPollingEndpoint` to remove redundant `Poller` class in favor of lambda * Extract `pollForMessage()` method to handle TX states instead of `Poller` class previously * * Rebase and fix conflicts * Polishing for GatewayProxyFactoryBeanTests
1 parent 76b1d1d commit 27353ed

File tree

9 files changed

+346
-157
lines changed

9 files changed

+346
-157
lines changed

spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractPollingEndpoint.java

+162-101
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,17 @@
1616

1717
package org.springframework.integration.endpoint;
1818

19+
import java.time.Duration;
1920
import java.util.Collection;
21+
import java.util.Date;
2022
import java.util.List;
2123
import java.util.concurrent.Callable;
2224
import java.util.concurrent.Executor;
2325
import java.util.concurrent.ScheduledFuture;
2426
import java.util.stream.Collectors;
2527

2628
import org.aopalliance.aop.Advice;
29+
import org.reactivestreams.Subscription;
2730

2831
import org.springframework.aop.framework.ProxyFactory;
2932
import org.springframework.beans.factory.BeanClassLoaderAware;
@@ -43,6 +46,7 @@
4346
import org.springframework.messaging.MessagingException;
4447
import org.springframework.scheduling.Trigger;
4548
import org.springframework.scheduling.support.PeriodicTrigger;
49+
import org.springframework.scheduling.support.SimpleTriggerContext;
4650
import org.springframework.transaction.interceptor.TransactionInterceptor;
4751
import org.springframework.transaction.support.TransactionSynchronization;
4852
import org.springframework.transaction.support.TransactionSynchronizationManager;
@@ -51,6 +55,10 @@
5155
import org.springframework.util.CollectionUtils;
5256
import org.springframework.util.ErrorHandler;
5357

58+
import reactor.core.publisher.Flux;
59+
import reactor.core.publisher.Mono;
60+
import reactor.core.scheduler.Schedulers;
61+
5462
/**
5563
* @author Mark Fisher
5664
* @author Oleg Zhurakousky
@@ -66,23 +74,27 @@ public abstract class AbstractPollingEndpoint extends AbstractEndpoint implement
6674

6775
private boolean syncExecutor = true;
6876

77+
private ClassLoader beanClassLoader = ClassUtils.getDefaultClassLoader();
78+
79+
private Trigger trigger = new PeriodicTrigger(10);
80+
81+
private long maxMessagesPerPoll = -1;
82+
6983
private ErrorHandler errorHandler;
7084

7185
private boolean errorHandlerIsDefault;
7286

73-
private Trigger trigger = new PeriodicTrigger(10);
74-
7587
private List<Advice> adviceChain;
7688

77-
private ClassLoader beanClassLoader = ClassUtils.getDefaultClassLoader();
89+
private TransactionSynchronizationFactory transactionSynchronizationFactory;
7890

79-
private long maxMessagesPerPoll = -1;
91+
private volatile Callable<Message<?>> pollingTask;
8092

81-
private TransactionSynchronizationFactory transactionSynchronizationFactory;
93+
private volatile Flux<Message<?>> pollingFlux;
8294

83-
private volatile ScheduledFuture<?> runningTask;
95+
private volatile Subscription subscription;
8496

85-
private volatile Runnable poller;
97+
private volatile ScheduledFuture<?> runningTask;
8698

8799
private volatile boolean initialized;
88100

@@ -167,6 +179,14 @@ protected boolean isReceiveOnlyAdvice(Advice advice) {
167179
protected void applyReceiveOnlyAdviceChain(Collection<Advice> chain) {
168180
}
169181

182+
protected boolean isReactive() {
183+
return false;
184+
}
185+
186+
protected Flux<Message<?>> getPollingFlux() {
187+
return this.pollingFlux;
188+
}
189+
170190
@Override
171191
protected void onInit() {
172192
synchronized (this.initializationMonitor) {
@@ -200,16 +220,38 @@ protected void onInit() {
200220
}
201221
}
202222

223+
// LifecycleSupport implementation
224+
225+
@Override // guarded by super#lifecycleLock
226+
protected void doStart() {
227+
if (!this.initialized) {
228+
onInit();
229+
}
230+
231+
this.pollingTask = createPollingTask();
232+
233+
if (isReactive()) {
234+
this.pollingFlux = createFluxGenerator();
235+
}
236+
else {
237+
Assert.state(getTaskScheduler() != null, "unable to start polling, no taskScheduler available");
238+
239+
this.runningTask =
240+
getTaskScheduler()
241+
.schedule(createPoller(), this.trigger);
242+
}
243+
}
244+
203245
@SuppressWarnings("unchecked")
204-
private Runnable createPoller() throws Exception {
246+
private Callable<Message<?>> createPollingTask() {
205247
List<Advice> receiveOnlyAdviceChain = null;
206248
if (!CollectionUtils.isEmpty(this.adviceChain)) {
207249
receiveOnlyAdviceChain = this.adviceChain.stream()
208250
.filter(this::isReceiveOnlyAdvice)
209251
.collect(Collectors.toList());
210252
}
211253

212-
Callable<Boolean> pollingTask = this::doPoll;
254+
Callable<Message<?>> pollingTask = this::doPoll;
213255

214256
List<Advice> adviceChain = this.adviceChain;
215257
if (!CollectionUtils.isEmpty(adviceChain)) {
@@ -219,65 +261,122 @@ private Runnable createPoller() throws Exception {
219261
.filter(advice -> !isReceiveOnlyAdvice(advice))
220262
.forEach(proxyFactory::addAdvice);
221263
}
222-
pollingTask = (Callable<Boolean>) proxyFactory.getProxy(this.beanClassLoader);
264+
pollingTask = (Callable<Message<?>>) proxyFactory.getProxy(this.beanClassLoader);
223265
}
224266
if (!CollectionUtils.isEmpty(receiveOnlyAdviceChain)) {
225267
applyReceiveOnlyAdviceChain(receiveOnlyAdviceChain);
226268
}
227-
return new Poller(pollingTask);
269+
270+
return pollingTask;
228271
}
229272

230-
// LifecycleSupport implementation
273+
private Runnable createPoller() {
274+
return () ->
275+
this.taskExecutor.execute(() -> {
276+
int count = 0;
277+
while (this.initialized && (this.maxMessagesPerPoll <= 0 || count < this.maxMessagesPerPoll)) {
278+
if (pollForMessage() == null) {
279+
break;
280+
}
281+
count++;
282+
}
283+
});
284+
}
231285

232-
@Override // guarded by super#lifecycleLock
233-
protected void doStart() {
234-
if (!this.initialized) {
235-
this.onInit();
236-
}
237-
Assert.state(this.getTaskScheduler() != null,
238-
"unable to start polling, no taskScheduler available");
286+
private Flux<Message<?>> createFluxGenerator() {
287+
SimpleTriggerContext triggerContext = new SimpleTriggerContext();
288+
289+
return Flux
290+
.<Duration>generate(sink -> {
291+
Date date = this.trigger.nextExecutionTime(triggerContext);
292+
if (date != null) {
293+
triggerContext.update(date, null, null);
294+
long millis = date.getTime() - System.currentTimeMillis();
295+
sink.next(Duration.ofMillis(millis));
296+
}
297+
else {
298+
sink.complete();
299+
}
300+
})
301+
.concatMap(duration ->
302+
Mono.delay(duration)
303+
.doOnNext(l ->
304+
triggerContext.update(triggerContext.lastScheduledExecutionTime(),
305+
new Date(), null))
306+
.flatMapMany(l ->
307+
Flux
308+
.<Message<?>>generate(fluxSink -> {
309+
Message<?> message = pollForMessage();
310+
if (message != null) {
311+
fluxSink.next(message);
312+
}
313+
else {
314+
fluxSink.complete();
315+
}
316+
})
317+
.take(this.maxMessagesPerPoll)
318+
.subscribeOn(Schedulers.fromExecutor(this.taskExecutor))
319+
.doOnComplete(() ->
320+
triggerContext.update(triggerContext.lastScheduledExecutionTime(),
321+
triggerContext.lastActualExecutionTime(),
322+
new Date())
323+
)), 1)
324+
.repeat(this::isRunning)
325+
.doOnSubscribe(subscription -> this.subscription = subscription);
326+
}
327+
328+
private Message<?> pollForMessage() {
239329
try {
240-
this.poller = createPoller();
330+
return this.pollingTask.call();
241331
}
242332
catch (Exception e) {
243-
this.initialized = false;
244-
throw new MessagingException("Failed to create Poller", e);
333+
if (e instanceof MessagingException) {
334+
throw (MessagingException) e;
335+
}
336+
else {
337+
Message<?> failedMessage = null;
338+
if (this.transactionSynchronizationFactory != null) {
339+
Object resource = TransactionSynchronizationManager.getResource(getResourceToBind());
340+
if (resource instanceof IntegrationResourceHolder) {
341+
failedMessage = ((IntegrationResourceHolder) resource).getMessage();
342+
}
343+
}
344+
throw new MessagingException(failedMessage, e);
345+
}
245346
}
246-
this.runningTask = this.getTaskScheduler().schedule(this.poller, this.trigger);
247-
}
248-
249-
@Override // guarded by super#lifecycleLock
250-
protected void doStop() {
251-
if (this.runningTask != null) {
252-
this.runningTask.cancel(true);
347+
finally {
348+
if (this.transactionSynchronizationFactory != null) {
349+
Object resource = getResourceToBind();
350+
if (TransactionSynchronizationManager.hasResource(resource)) {
351+
TransactionSynchronizationManager.unbindResource(resource);
352+
}
353+
}
253354
}
254-
this.runningTask = null;
255355
}
256356

257-
private boolean doPoll() {
258-
IntegrationResourceHolder holder = this.bindResourceHolderIfNecessary(
259-
this.getResourceKey(), this.getResourceToBind());
260-
Message<?> message = null;
357+
private Message<?> doPoll() {
358+
IntegrationResourceHolder holder = bindResourceHolderIfNecessary(getResourceKey(), getResourceToBind());
359+
Message<?> message;
261360
try {
262-
message = this.receiveMessage();
361+
message = receiveMessage();
263362
}
264363
catch (Exception e) {
265364
if (Thread.interrupted()) {
266365
if (logger.isDebugEnabled()) {
267366
logger.debug("Poll interrupted - during stop()? : " + e.getMessage());
268367
}
269-
return false;
368+
return null;
270369
}
271370
else {
272371
throw (RuntimeException) e;
273372
}
274373
}
275-
boolean result;
374+
276375
if (message == null) {
277376
if (this.logger.isDebugEnabled()) {
278377
this.logger.debug("Received no Message during the poll, returning 'false'");
279378
}
280-
result = false;
379+
return null;
281380
}
282381
else {
283382
if (this.logger.isDebugEnabled()) {
@@ -286,20 +385,35 @@ private boolean doPoll() {
286385
if (holder != null) {
287386
holder.setMessage(message);
288387
}
289-
try {
290-
this.handleMessage(message);
291-
}
292-
catch (Exception e) {
293-
if (e instanceof MessagingException) {
294-
throw new MessagingExceptionWrapper(message, (MessagingException) e);
388+
389+
if (!isReactive()) {
390+
try {
391+
handleMessage(message);
295392
}
296-
else {
297-
throw new MessagingException(message, e);
393+
catch (Exception e) {
394+
if (e instanceof MessagingException) {
395+
throw new MessagingExceptionWrapper(message, (MessagingException) e);
396+
}
397+
else {
398+
throw new MessagingException(message, e);
399+
}
298400
}
299401
}
300-
result = true;
301402
}
302-
return result;
403+
404+
return message;
405+
}
406+
407+
@Override // guarded by super#lifecycleLock
408+
protected void doStop() {
409+
if (this.runningTask != null) {
410+
this.runningTask.cancel(true);
411+
}
412+
this.runningTask = null;
413+
414+
if (this.subscription != null) {
415+
this.subscription.cancel();
416+
}
303417
}
304418

305419
/**
@@ -369,57 +483,4 @@ private IntegrationResourceHolder bindResourceHolderIfNecessary(String key, Obje
369483
return null;
370484
}
371485

372-
/**
373-
* Default Poller implementation
374-
*/
375-
private final class Poller implements Runnable {
376-
377-
private final Callable<Boolean> pollingTask;
378-
379-
Poller(Callable<Boolean> pollingTask) {
380-
this.pollingTask = pollingTask;
381-
}
382-
383-
@Override
384-
public void run() {
385-
AbstractPollingEndpoint.this.taskExecutor.execute(() -> {
386-
int count = 0;
387-
while (AbstractPollingEndpoint.this.initialized
388-
&& (AbstractPollingEndpoint.this.maxMessagesPerPoll <= 0
389-
|| count < AbstractPollingEndpoint.this.maxMessagesPerPoll)) {
390-
try {
391-
if (!Poller.this.pollingTask.call()) {
392-
break;
393-
}
394-
count++;
395-
}
396-
catch (Exception e) {
397-
if (e instanceof MessagingException) {
398-
throw (MessagingException) e;
399-
}
400-
else {
401-
Message<?> failedMessage = null;
402-
if (AbstractPollingEndpoint.this.transactionSynchronizationFactory != null) {
403-
Object resource = TransactionSynchronizationManager.getResource(getResourceToBind());
404-
if (resource instanceof IntegrationResourceHolder) {
405-
failedMessage = ((IntegrationResourceHolder) resource).getMessage();
406-
}
407-
}
408-
throw new MessagingException(failedMessage, e);
409-
}
410-
}
411-
finally {
412-
if (AbstractPollingEndpoint.this.transactionSynchronizationFactory != null) {
413-
Object resource = getResourceToBind();
414-
if (TransactionSynchronizationManager.hasResource(resource)) {
415-
TransactionSynchronizationManager.unbindResource(resource);
416-
}
417-
}
418-
}
419-
}
420-
});
421-
}
422-
423-
}
424-
425486
}

0 commit comments

Comments
 (0)