From de3169c47eed9f669a10e38475c139f28ff29423 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Fri, 21 Jun 2019 15:23:40 -0400 Subject: [PATCH 1/2] Fix `SmartLifecycle.stop(Runnable)` usage We always have to call `callback` in the `SmartLifecycle.stop(Runnable)` implementation independently of component state * Fix `StandardIntegrationFlow.stop(Runnable)` for a logic when we don't have any `this.lifecycles` * Remove those `stop(Runnable)` which are fully equivalent of the `default` on in the `SmartLifecycle` * Some other simple polishing for the affected classes, e.g. `isSingleton()` is `default` with `true` in the `InitializingBean` **Cherry-pick to 5.1.x** --- .../config/ConsumerEndpointFactoryBean.java | 15 ++-- ...ourcePollingChannelAdapterFactoryBean.java | 5 -- .../dsl/StandardIntegrationFlow.java | 23 +++-- .../history/MessageHistoryConfigurer.java | 8 +- .../store/MessageGroupStoreReaper.java | 25 ++---- .../leader/LockRegistryLeaderInitiator.java | 6 -- .../jms/config/JmsChannelFactoryBean.java | 83 +++++++++---------- .../stomp/AbstractStompSessionManager.java | 10 --- .../config/XmppConnectionFactoryBean.java | 6 -- .../config/CuratorFrameworkFactoryBean.java | 11 --- .../config/LeaderInitiatorFactoryBean.java | 5 -- .../zookeeper/leader/LeaderInitiator.java | 23 ++--- .../metadata/ZookeeperMetadataStore.java | 9 +- 13 files changed, 78 insertions(+), 151 deletions(-) diff --git a/spring-integration-core/src/main/java/org/springframework/integration/config/ConsumerEndpointFactoryBean.java b/spring-integration-core/src/main/java/org/springframework/integration/config/ConsumerEndpointFactoryBean.java index 74ee3053bb6..5546fad9fe5 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/config/ConsumerEndpointFactoryBean.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/config/ConsumerEndpointFactoryBean.java @@ -246,11 +246,6 @@ private void adviceChain() { } } - @Override - public boolean isSingleton() { - return true; - } - @Override public AbstractEndpoint getObject() { if (!this.initialized) { @@ -302,8 +297,9 @@ else if (channel instanceof PollableChannel) { } private void eventDrivenConsumer(MessageChannel channel) { - Assert.isNull(this.pollerMetadata, "A poller should not be specified for endpoint '" + this.beanName - + "', since '" + channel + "' is a SubscribableChannel (not pollable)."); + Assert.isNull(this.pollerMetadata, + () -> "A poller should not be specified for endpoint '" + this.beanName + + "', since '" + channel + "' is a SubscribableChannel (not pollable)."); this.endpoint = new EventDrivenConsumer((SubscribableChannel) channel, this.handler); if (logger.isWarnEnabled() && Boolean.FALSE.equals(this.autoStartup) @@ -316,8 +312,9 @@ private void pollingConsumer(MessageChannel channel) { PollingConsumer pollingConsumer = new PollingConsumer((PollableChannel) channel, this.handler); if (this.pollerMetadata == null) { this.pollerMetadata = PollerMetadata.getDefaultPollerMetadata(this.beanFactory); - Assert.notNull(this.pollerMetadata, "No poller has been defined for endpoint '" + this.beanName - + "', and no default poller is available within the context."); + Assert.notNull(this.pollerMetadata, + () -> "No poller has been defined for endpoint '" + this.beanName + + "', and no default poller is available within the context."); } pollingConsumer.setTaskExecutor(this.pollerMetadata.getTaskExecutor()); pollingConsumer.setTrigger(this.pollerMetadata.getTrigger()); diff --git a/spring-integration-core/src/main/java/org/springframework/integration/config/SourcePollingChannelAdapterFactoryBean.java b/spring-integration-core/src/main/java/org/springframework/integration/config/SourcePollingChannelAdapterFactoryBean.java index 3a52eaa3596..d8c831d0231 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/config/SourcePollingChannelAdapterFactoryBean.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/config/SourcePollingChannelAdapterFactoryBean.java @@ -157,11 +157,6 @@ public Class getObjectType() { return SourcePollingChannelAdapter.class; } - @Override - public boolean isSingleton() { - return true; - } - private void initializeAdapter() { synchronized (this.initializationMonitor) { if (this.initialized) { diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dsl/StandardIntegrationFlow.java b/spring-integration-core/src/main/java/org/springframework/integration/dsl/StandardIntegrationFlow.java index 84748ad0221..fc1839d34d6 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dsl/StandardIntegrationFlow.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dsl/StandardIntegrationFlow.java @@ -127,17 +127,22 @@ public void start() { @Override public void stop(Runnable callback) { - AggregatingCallback aggregatingCallback = new AggregatingCallback(this.lifecycles.size(), callback); - ListIterator iterator = this.lifecycles.listIterator(this.lifecycles.size()); - while (iterator.hasPrevious()) { - SmartLifecycle lifecycle = iterator.previous(); - if (lifecycle.isRunning()) { - lifecycle.stop(aggregatingCallback); - } - else { - aggregatingCallback.run(); + if (this.lifecycles.size() > 0) { + AggregatingCallback aggregatingCallback = new AggregatingCallback(this.lifecycles.size(), callback); + ListIterator iterator = this.lifecycles.listIterator(this.lifecycles.size()); + while (iterator.hasPrevious()) { + SmartLifecycle lifecycle = iterator.previous(); + if (lifecycle.isRunning()) { + lifecycle.stop(aggregatingCallback); + } + else { + aggregatingCallback.run(); + } } } + else { + callback.run(); + } this.running = false; } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/history/MessageHistoryConfigurer.java b/spring-integration-core/src/main/java/org/springframework/integration/history/MessageHistoryConfigurer.java index 72a8c512273..42f88ed1ad6 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/history/MessageHistoryConfigurer.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/history/MessageHistoryConfigurer.java @@ -52,7 +52,7 @@ @IntegrationManagedResource public class MessageHistoryConfigurer implements SmartLifecycle, BeanFactoryAware, DestructionAwareBeanPostProcessor { - private final Log logger = LogFactory.getLog(this.getClass()); + private final static Log logger = LogFactory.getLog(MessageHistoryConfigurer.class); private final Set currentlyTrackedComponents = ConcurrentHashMap.newKeySet(); @@ -230,12 +230,6 @@ public void stop() { } } - @Override - public void stop(Runnable callback) { - this.stop(); - callback.run(); - } - private static Collection getTrackableComponents(ListableBeanFactory beanFactory) { return BeanFactoryUtils.beansOfTypeIncludingAncestors(beanFactory, TrackableComponent.class).values(); } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/store/MessageGroupStoreReaper.java b/spring-integration-core/src/main/java/org/springframework/integration/store/MessageGroupStoreReaper.java index 7fa51c960d4..3160c48d999 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/store/MessageGroupStoreReaper.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/store/MessageGroupStoreReaper.java @@ -39,19 +39,19 @@ public class MessageGroupStoreReaper implements Runnable, DisposableBean, Initia private static Log logger = LogFactory.getLog(MessageGroupStoreReaper.class); + private final ReentrantLock lifecycleLock = new ReentrantLock(); + private MessageGroupStore messageGroupStore; private boolean expireOnDestroy = false; private long timeout = -1; - private volatile boolean running; - - private final ReentrantLock lifecycleLock = new ReentrantLock(); + private int phase = 0; - private volatile int phase = 0; + private boolean autoStartup = true; - private volatile boolean autoStartup = true; + private volatile boolean running; public MessageGroupStoreReaper(MessageGroupStore messageGroupStore) { this.messageGroupStore = messageGroupStore; @@ -63,7 +63,6 @@ public MessageGroupStoreReaper() { /** * Flag to indicate that the stores should be expired when this component is destroyed (i.e. usually when its * enclosing {@link org.springframework.context.ApplicationContext} is closed). - * * @param expireOnDestroy the flag value to set */ public void setExpireOnDestroy(boolean expireOnDestroy) { @@ -73,7 +72,6 @@ public void setExpireOnDestroy(boolean expireOnDestroy) { /** * Timeout in milliseconds (default -1). If negative then no groups ever time out. If greater than zero then all * groups older than that value are expired when this component is {@link #run()}. - * * @param timeout the timeout to set */ public void setTimeout(long timeout) { @@ -82,7 +80,6 @@ public void setTimeout(long timeout) { /** * A message group store to expire according the other configurations. - * * @param messageGroupStore the {@link MessageGroupStore} to set */ public void setMessageGroupStore(MessageGroupStore messageGroupStore) { @@ -187,16 +184,4 @@ public void setAutoStartup(boolean autoStartup) { this.autoStartup = autoStartup; } - @Override - public void stop(Runnable callback) { - this.lifecycleLock.lock(); - try { - this.stop(); - callback.run(); - } - finally { - this.lifecycleLock.unlock(); - } - } - } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/support/leader/LockRegistryLeaderInitiator.java b/spring-integration-core/src/main/java/org/springframework/integration/support/leader/LockRegistryLeaderInitiator.java index 4a5596f7ecd..f41323b211a 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/support/leader/LockRegistryLeaderInitiator.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/support/leader/LockRegistryLeaderInitiator.java @@ -308,12 +308,6 @@ public void destroy() { } } - @Override - public void stop(Runnable runnable) { - stop(); - runnable.run(); - } - /** * Stop the registration of the {@link #candidate} for leader election. If the * candidate is currently leader, its leadership will be revoked. diff --git a/spring-integration-jms/src/main/java/org/springframework/integration/jms/config/JmsChannelFactoryBean.java b/spring-integration-jms/src/main/java/org/springframework/integration/jms/config/JmsChannelFactoryBean.java index f8433149fb4..7573d27c302 100644 --- a/spring-integration-jms/src/main/java/org/springframework/integration/jms/config/JmsChannelFactoryBean.java +++ b/spring-integration-jms/src/main/java/org/springframework/integration/jms/config/JmsChannelFactoryBean.java @@ -51,6 +51,7 @@ * @author Oleg Zhurakousky * @author Gary Russell * @author Artem Bilan + * * @since 2.0 */ public class JmsChannelFactoryBean extends AbstractFactoryBean @@ -64,83 +65,83 @@ public class JmsChannelFactoryBean extends AbstractFactoryBean containerType; + private Class containerType; - private volatile boolean acceptMessagesWhileStopping; + private boolean acceptMessagesWhileStopping; - private volatile boolean autoStartup = true; + private boolean autoStartup = true; - private volatile String cacheLevelName; + private String cacheLevelName; - private volatile Integer cacheLevel; + private Integer cacheLevel; - private volatile String clientId; + private String clientId; - private volatile String concurrency; + private String concurrency; - private volatile Integer concurrentConsumers; + private Integer concurrentConsumers; - private volatile ConnectionFactory connectionFactory; + private ConnectionFactory connectionFactory; - private volatile Destination destination; + private Destination destination; - private volatile String destinationName; + private String destinationName; - private volatile DestinationResolver destinationResolver; + private DestinationResolver destinationResolver; - private volatile String durableSubscriptionName; + private String durableSubscriptionName; - private volatile ErrorHandler errorHandler; + private ErrorHandler errorHandler; - private volatile ExceptionListener exceptionListener; + private ExceptionListener exceptionListener; - private volatile Boolean exposeListenerSession; + private Boolean exposeListenerSession; - private volatile Integer idleTaskExecutionLimit; + private Integer idleTaskExecutionLimit; - private volatile Integer maxConcurrentConsumers; + private Integer maxConcurrentConsumers; - private volatile Integer maxMessagesPerTask; + private Integer maxMessagesPerTask; - private volatile String messageSelector; + private String messageSelector; - private volatile Integer phase; + private Integer phase; - private volatile Boolean pubSubDomain; + private Boolean pubSubDomain; - private volatile boolean pubSubNoLocal; + private boolean pubSubNoLocal; - private volatile Long receiveTimeout; + private Long receiveTimeout; - private volatile Long recoveryInterval; + private Long recoveryInterval; - private volatile String beanName; + private String beanName; - private volatile boolean subscriptionShared; + private boolean subscriptionShared; /** * This value differs from the container implementations' default (which is AUTO_ACKNOWLEDGE) */ - private volatile int sessionAcknowledgeMode = Session.SESSION_TRANSACTED; + private int sessionAcknowledgeMode = Session.SESSION_TRANSACTED; /** * This value differs from the container implementations' default (which is false). */ - private volatile boolean sessionTransacted = true; + private boolean sessionTransacted = true; - private volatile boolean subscriptionDurable; + private boolean subscriptionDurable; - private volatile Executor taskExecutor; + private Executor taskExecutor; - private volatile PlatformTransactionManager transactionManager; + private PlatformTransactionManager transactionManager; - private volatile String transactionName; + private String transactionName; - private volatile Integer transactionTimeout; + private Integer transactionTimeout; - private volatile int maxSubscribers = Integer.MAX_VALUE; + private int maxSubscribers = Integer.MAX_VALUE; public JmsChannelFactoryBean() { @@ -204,8 +205,7 @@ public void setAutoStartup(boolean autoStartup) { public void setCacheLevelName(String cacheLevelName) { Assert.isTrue(this.messageDriven, "'cacheLevelName' is allowed only in case of 'messageDriven = true'"); - Assert.state(this.cacheLevel == null, - "'cacheLevelName' and 'cacheLevel' are mutually exclusive"); + Assert.state(this.cacheLevel == null, "'cacheLevelName' and 'cacheLevel' are mutually exclusive"); this.cacheLevelName = cacheLevelName; } @@ -376,7 +376,8 @@ protected AbstractJmsChannel createInstance() { this.initializeJmsTemplate(); if (this.messageDriven) { this.listenerContainer = createContainer(); - SubscribableJmsChannel subscribableJmsChannel = new SubscribableJmsChannel(this.listenerContainer, this.jmsTemplate); + SubscribableJmsChannel subscribableJmsChannel = + new SubscribableJmsChannel(this.listenerContainer, this.jmsTemplate); subscribableJmsChannel.setMaxSubscribers(this.maxSubscribers); this.channel = subscribableJmsChannel; } @@ -519,9 +520,7 @@ public void stop(Runnable callback) { @Override protected void destroyInstance(AbstractJmsChannel instance) { - if (instance instanceof SubscribableJmsChannel) { - ((SubscribableJmsChannel) this.channel).destroy(); - } + instance.destroy(); } } diff --git a/spring-integration-stomp/src/main/java/org/springframework/integration/stomp/AbstractStompSessionManager.java b/spring-integration-stomp/src/main/java/org/springframework/integration/stomp/AbstractStompSessionManager.java index 887d467aa80..521827f08b0 100644 --- a/spring-integration-stomp/src/main/java/org/springframework/integration/stomp/AbstractStompSessionManager.java +++ b/spring-integration-stomp/src/main/java/org/springframework/integration/stomp/AbstractStompSessionManager.java @@ -306,16 +306,6 @@ public void start() { } } - @Override - public void stop(Runnable callback) { - synchronized (this.lifecycleMonitor) { - stop(); - if (callback != null) { - callback.run(); - } - } - } - @Override public void stop() { synchronized (this.lifecycleMonitor) { diff --git a/spring-integration-xmpp/src/main/java/org/springframework/integration/xmpp/config/XmppConnectionFactoryBean.java b/spring-integration-xmpp/src/main/java/org/springframework/integration/xmpp/config/XmppConnectionFactoryBean.java index 28db04e4dbc..aeed7f5b3ea 100644 --- a/spring-integration-xmpp/src/main/java/org/springframework/integration/xmpp/config/XmppConnectionFactoryBean.java +++ b/spring-integration-xmpp/src/main/java/org/springframework/integration/xmpp/config/XmppConnectionFactoryBean.java @@ -207,12 +207,6 @@ public void stop() { } } - @Override - public void stop(Runnable callback) { - stop(); - callback.run(); - } - @Override public boolean isRunning() { return this.running; diff --git a/spring-integration-zookeeper/src/main/java/org/springframework/integration/zookeeper/config/CuratorFrameworkFactoryBean.java b/spring-integration-zookeeper/src/main/java/org/springframework/integration/zookeeper/config/CuratorFrameworkFactoryBean.java index 2042a90e4fb..5feab91b244 100644 --- a/spring-integration-zookeeper/src/main/java/org/springframework/integration/zookeeper/config/CuratorFrameworkFactoryBean.java +++ b/spring-integration-zookeeper/src/main/java/org/springframework/integration/zookeeper/config/CuratorFrameworkFactoryBean.java @@ -129,12 +129,6 @@ public void stop() { } } - @Override - public void stop(Runnable runnable) { - stop(); - runnable.run(); - } - @Override public CuratorFramework getObject() { return this.client; @@ -145,9 +139,4 @@ public Class getObjectType() { return CuratorFramework.class; } - @Override - public boolean isSingleton() { - return true; - } - } diff --git a/spring-integration-zookeeper/src/main/java/org/springframework/integration/zookeeper/config/LeaderInitiatorFactoryBean.java b/spring-integration-zookeeper/src/main/java/org/springframework/integration/zookeeper/config/LeaderInitiatorFactoryBean.java index 956eafc6dab..7b2c1dbb153 100644 --- a/spring-integration-zookeeper/src/main/java/org/springframework/integration/zookeeper/config/LeaderInitiatorFactoryBean.java +++ b/spring-integration-zookeeper/src/main/java/org/springframework/integration/zookeeper/config/LeaderInitiatorFactoryBean.java @@ -166,9 +166,4 @@ public Class getObjectType() { return LeaderInitiator.class; } - @Override - public boolean isSingleton() { - return true; - } - } diff --git a/spring-integration-zookeeper/src/main/java/org/springframework/integration/zookeeper/leader/LeaderInitiator.java b/spring-integration-zookeeper/src/main/java/org/springframework/integration/zookeeper/leader/LeaderInitiator.java index 8e2e5fe01bc..342fafd24e7 100644 --- a/spring-integration-zookeeper/src/main/java/org/springframework/integration/zookeeper/leader/LeaderInitiator.java +++ b/spring-integration-zookeeper/src/main/java/org/springframework/integration/zookeeper/leader/LeaderInitiator.java @@ -96,15 +96,18 @@ public String getRole() { */ private volatile boolean running; - /** Base path in a zookeeper */ + /** + * Base path in a zookeeper + */ private final String namespace; - /** Leader event publisher if set */ + /** + * Leader event publisher if set + */ private volatile LeaderEventPublisher leaderEventPublisher; /** * Construct a {@link LeaderInitiator}. - * * @param client Curator client * @param candidate leadership election candidate */ @@ -114,7 +117,6 @@ public LeaderInitiator(CuratorFramework client, Candidate candidate) { /** * Construct a {@link LeaderInitiator}. - * * @param client Curator client * @param candidate leadership election candidate * @param namespace namespace base path in zookeeper @@ -199,15 +201,8 @@ public void stop() { } } - @Override - public void stop(Runnable runnable) { - stop(); - runnable.run(); - } - /** * Sets the {@link LeaderEventPublisher}. - * * @param leaderEventPublisher the event publisher */ public void setLeaderEventPublisher(LeaderEventPublisher leaderEventPublisher) { @@ -230,13 +225,12 @@ public Context getContext() { * @return the ZooKeeper path used for leadership election by Curator */ private String buildLeaderPath() { - String ns = StringUtils.hasText(this.namespace) ? this.namespace : DEFAULT_NAMESPACE; if (!ns.startsWith("/")) { - ns = "/" + ns; + ns = '/' + ns; } if (!ns.endsWith("/")) { - ns = ns + "/"; + ns = ns + '/'; } return ns + this.candidate.getRole(); } @@ -283,6 +277,7 @@ public void takeLeadership(CuratorFramework framework) { } } } + } /** diff --git a/spring-integration-zookeeper/src/main/java/org/springframework/integration/zookeeper/metadata/ZookeeperMetadataStore.java b/spring-integration-zookeeper/src/main/java/org/springframework/integration/zookeeper/metadata/ZookeeperMetadataStore.java index 381c6e30da3..0d04b9714ba 100644 --- a/spring-integration-zookeeper/src/main/java/org/springframework/integration/zookeeper/metadata/ZookeeperMetadataStore.java +++ b/spring-integration-zookeeper/src/main/java/org/springframework/integration/zookeeper/metadata/ZookeeperMetadataStore.java @@ -16,6 +16,7 @@ package org.springframework.integration.zookeeper.metadata; +import java.nio.charset.StandardCharsets; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -66,7 +67,7 @@ public class ZookeeperMetadataStore implements ListenableMetadataStore, SmartLif private String root = "/SpringIntegration-MetadataStore"; - private String encoding = "UTF-8"; + private String encoding = StandardCharsets.UTF_8.name(); private PathChildrenCache cache; @@ -306,12 +307,6 @@ public void stop() { } } - @Override - public void stop(Runnable callback) { - stop(); - callback.run(); - } - @Override public boolean isRunning() { return this.running; From 6b009bdff062fb85ec0773001a62d348f05f10d9 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Fri, 21 Jun 2019 16:22:50 -0400 Subject: [PATCH 2/2] * Fix checkstyle violation --- .../integration/history/MessageHistoryConfigurer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spring-integration-core/src/main/java/org/springframework/integration/history/MessageHistoryConfigurer.java b/spring-integration-core/src/main/java/org/springframework/integration/history/MessageHistoryConfigurer.java index 42f88ed1ad6..575478a2870 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/history/MessageHistoryConfigurer.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/history/MessageHistoryConfigurer.java @@ -52,7 +52,7 @@ @IntegrationManagedResource public class MessageHistoryConfigurer implements SmartLifecycle, BeanFactoryAware, DestructionAwareBeanPostProcessor { - private final static Log logger = LogFactory.getLog(MessageHistoryConfigurer.class); + private static final Log logger = LogFactory.getLog(MessageHistoryConfigurer.class); private final Set currentlyTrackedComponents = ConcurrentHashMap.newKeySet();