Skip to content

Fix SmartLifecycle.stop(Runnable) usage #2973

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -246,11 +246,6 @@ private void adviceChain() {
}
}

@Override
public boolean isSingleton() {
return true;
}

@Override
public AbstractEndpoint getObject() {
if (!this.initialized) {
Expand Down Expand Up @@ -302,8 +297,9 @@ else if (channel instanceof PollableChannel) {
}

private void eventDrivenConsumer(MessageChannel channel) {
Assert.isNull(this.pollerMetadata, "A poller should not be specified for endpoint '" + this.beanName
+ "', since '" + channel + "' is a SubscribableChannel (not pollable).");
Assert.isNull(this.pollerMetadata,
() -> "A poller should not be specified for endpoint '" + this.beanName
+ "', since '" + channel + "' is a SubscribableChannel (not pollable).");
this.endpoint = new EventDrivenConsumer((SubscribableChannel) channel, this.handler);
if (logger.isWarnEnabled()
&& Boolean.FALSE.equals(this.autoStartup)
Expand All @@ -316,8 +312,9 @@ private void pollingConsumer(MessageChannel channel) {
PollingConsumer pollingConsumer = new PollingConsumer((PollableChannel) channel, this.handler);
if (this.pollerMetadata == null) {
this.pollerMetadata = PollerMetadata.getDefaultPollerMetadata(this.beanFactory);
Assert.notNull(this.pollerMetadata, "No poller has been defined for endpoint '" + this.beanName
+ "', and no default poller is available within the context.");
Assert.notNull(this.pollerMetadata,
() -> "No poller has been defined for endpoint '" + this.beanName
+ "', and no default poller is available within the context.");
}
pollingConsumer.setTaskExecutor(this.pollerMetadata.getTaskExecutor());
pollingConsumer.setTrigger(this.pollerMetadata.getTrigger());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,6 @@ public Class<?> getObjectType() {
return SourcePollingChannelAdapter.class;
}

@Override
public boolean isSingleton() {
return true;
}

private void initializeAdapter() {
synchronized (this.initializationMonitor) {
if (this.initialized) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,17 +127,22 @@ public void start() {

@Override
public void stop(Runnable callback) {
AggregatingCallback aggregatingCallback = new AggregatingCallback(this.lifecycles.size(), callback);
ListIterator<SmartLifecycle> iterator = this.lifecycles.listIterator(this.lifecycles.size());
while (iterator.hasPrevious()) {
SmartLifecycle lifecycle = iterator.previous();
if (lifecycle.isRunning()) {
lifecycle.stop(aggregatingCallback);
}
else {
aggregatingCallback.run();
if (this.lifecycles.size() > 0) {
AggregatingCallback aggregatingCallback = new AggregatingCallback(this.lifecycles.size(), callback);
ListIterator<SmartLifecycle> iterator = this.lifecycles.listIterator(this.lifecycles.size());
while (iterator.hasPrevious()) {
SmartLifecycle lifecycle = iterator.previous();
if (lifecycle.isRunning()) {
lifecycle.stop(aggregatingCallback);
}
else {
aggregatingCallback.run();
}
}
}
else {
callback.run();
}
this.running = false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
@IntegrationManagedResource
public class MessageHistoryConfigurer implements SmartLifecycle, BeanFactoryAware, DestructionAwareBeanPostProcessor {

private final Log logger = LogFactory.getLog(this.getClass());
private static final Log logger = LogFactory.getLog(MessageHistoryConfigurer.class);

private final Set<TrackableComponent> currentlyTrackedComponents = ConcurrentHashMap.newKeySet();

Expand Down Expand Up @@ -230,12 +230,6 @@ public void stop() {
}
}

@Override
public void stop(Runnable callback) {
this.stop();
callback.run();
}

private static Collection<TrackableComponent> getTrackableComponents(ListableBeanFactory beanFactory) {
return BeanFactoryUtils.beansOfTypeIncludingAncestors(beanFactory, TrackableComponent.class).values();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,19 @@ public class MessageGroupStoreReaper implements Runnable, DisposableBean, Initia

private static Log logger = LogFactory.getLog(MessageGroupStoreReaper.class);

private final ReentrantLock lifecycleLock = new ReentrantLock();

private MessageGroupStore messageGroupStore;

private boolean expireOnDestroy = false;

private long timeout = -1;

private volatile boolean running;

private final ReentrantLock lifecycleLock = new ReentrantLock();
private int phase = 0;

private volatile int phase = 0;
private boolean autoStartup = true;

private volatile boolean autoStartup = true;
private volatile boolean running;

public MessageGroupStoreReaper(MessageGroupStore messageGroupStore) {
this.messageGroupStore = messageGroupStore;
Expand All @@ -63,7 +63,6 @@ public MessageGroupStoreReaper() {
/**
* Flag to indicate that the stores should be expired when this component is destroyed (i.e. usually when its
* enclosing {@link org.springframework.context.ApplicationContext} is closed).
*
* @param expireOnDestroy the flag value to set
*/
public void setExpireOnDestroy(boolean expireOnDestroy) {
Expand All @@ -73,7 +72,6 @@ public void setExpireOnDestroy(boolean expireOnDestroy) {
/**
* Timeout in milliseconds (default -1). If negative then no groups ever time out. If greater than zero then all
* groups older than that value are expired when this component is {@link #run()}.
*
* @param timeout the timeout to set
*/
public void setTimeout(long timeout) {
Expand All @@ -82,7 +80,6 @@ public void setTimeout(long timeout) {

/**
* A message group store to expire according the other configurations.
*
* @param messageGroupStore the {@link MessageGroupStore} to set
*/
public void setMessageGroupStore(MessageGroupStore messageGroupStore) {
Expand Down Expand Up @@ -187,16 +184,4 @@ public void setAutoStartup(boolean autoStartup) {
this.autoStartup = autoStartup;
}

@Override
public void stop(Runnable callback) {
this.lifecycleLock.lock();
try {
this.stop();
callback.run();
}
finally {
this.lifecycleLock.unlock();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -308,12 +308,6 @@ public void destroy() {
}
}

@Override
public void stop(Runnable runnable) {
stop();
runnable.run();
}

/**
* Stop the registration of the {@link #candidate} for leader election. If the
* candidate is currently leader, its leadership will be revoked.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
* @author Oleg Zhurakousky
* @author Gary Russell
* @author Artem Bilan
*
* @since 2.0
*/
public class JmsChannelFactoryBean extends AbstractFactoryBean<AbstractJmsChannel>
Expand All @@ -64,83 +65,83 @@ public class JmsChannelFactoryBean extends AbstractFactoryBean<AbstractJmsChanne

private final JmsTemplate jmsTemplate = new DynamicJmsTemplate();

private volatile AbstractMessageListenerContainer listenerContainer;
private AbstractMessageListenerContainer listenerContainer;

private volatile Class<? extends AbstractMessageListenerContainer> containerType;
private Class<? extends AbstractMessageListenerContainer> containerType;

private volatile boolean acceptMessagesWhileStopping;
private boolean acceptMessagesWhileStopping;

private volatile boolean autoStartup = true;
private boolean autoStartup = true;

private volatile String cacheLevelName;
private String cacheLevelName;

private volatile Integer cacheLevel;
private Integer cacheLevel;

private volatile String clientId;
private String clientId;

private volatile String concurrency;
private String concurrency;

private volatile Integer concurrentConsumers;
private Integer concurrentConsumers;

private volatile ConnectionFactory connectionFactory;
private ConnectionFactory connectionFactory;

private volatile Destination destination;
private Destination destination;

private volatile String destinationName;
private String destinationName;

private volatile DestinationResolver destinationResolver;
private DestinationResolver destinationResolver;

private volatile String durableSubscriptionName;
private String durableSubscriptionName;

private volatile ErrorHandler errorHandler;
private ErrorHandler errorHandler;

private volatile ExceptionListener exceptionListener;
private ExceptionListener exceptionListener;

private volatile Boolean exposeListenerSession;
private Boolean exposeListenerSession;

private volatile Integer idleTaskExecutionLimit;
private Integer idleTaskExecutionLimit;

private volatile Integer maxConcurrentConsumers;
private Integer maxConcurrentConsumers;

private volatile Integer maxMessagesPerTask;
private Integer maxMessagesPerTask;

private volatile String messageSelector;
private String messageSelector;

private volatile Integer phase;
private Integer phase;

private volatile Boolean pubSubDomain;
private Boolean pubSubDomain;

private volatile boolean pubSubNoLocal;
private boolean pubSubNoLocal;

private volatile Long receiveTimeout;
private Long receiveTimeout;

private volatile Long recoveryInterval;
private Long recoveryInterval;

private volatile String beanName;
private String beanName;

private volatile boolean subscriptionShared;
private boolean subscriptionShared;

/**
* This value differs from the container implementations' default (which is AUTO_ACKNOWLEDGE)
*/
private volatile int sessionAcknowledgeMode = Session.SESSION_TRANSACTED;
private int sessionAcknowledgeMode = Session.SESSION_TRANSACTED;

/**
* This value differs from the container implementations' default (which is false).
*/
private volatile boolean sessionTransacted = true;
private boolean sessionTransacted = true;

private volatile boolean subscriptionDurable;
private boolean subscriptionDurable;

private volatile Executor taskExecutor;
private Executor taskExecutor;

private volatile PlatformTransactionManager transactionManager;
private PlatformTransactionManager transactionManager;

private volatile String transactionName;
private String transactionName;

private volatile Integer transactionTimeout;
private Integer transactionTimeout;

private volatile int maxSubscribers = Integer.MAX_VALUE;
private int maxSubscribers = Integer.MAX_VALUE;


public JmsChannelFactoryBean() {
Expand Down Expand Up @@ -204,8 +205,7 @@ public void setAutoStartup(boolean autoStartup) {

public void setCacheLevelName(String cacheLevelName) {
Assert.isTrue(this.messageDriven, "'cacheLevelName' is allowed only in case of 'messageDriven = true'");
Assert.state(this.cacheLevel == null,
"'cacheLevelName' and 'cacheLevel' are mutually exclusive");
Assert.state(this.cacheLevel == null, "'cacheLevelName' and 'cacheLevel' are mutually exclusive");
this.cacheLevelName = cacheLevelName;
}

Expand Down Expand Up @@ -376,7 +376,8 @@ protected AbstractJmsChannel createInstance() {
this.initializeJmsTemplate();
if (this.messageDriven) {
this.listenerContainer = createContainer();
SubscribableJmsChannel subscribableJmsChannel = new SubscribableJmsChannel(this.listenerContainer, this.jmsTemplate);
SubscribableJmsChannel subscribableJmsChannel =
new SubscribableJmsChannel(this.listenerContainer, this.jmsTemplate);
subscribableJmsChannel.setMaxSubscribers(this.maxSubscribers);
this.channel = subscribableJmsChannel;
}
Expand Down Expand Up @@ -519,9 +520,7 @@ public void stop(Runnable callback) {

@Override
protected void destroyInstance(AbstractJmsChannel instance) {
if (instance instanceof SubscribableJmsChannel) {
((SubscribableJmsChannel) this.channel).destroy();
}
instance.destroy();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -306,16 +306,6 @@ public void start() {
}
}

@Override
public void stop(Runnable callback) {
synchronized (this.lifecycleMonitor) {
stop();
if (callback != null) {
callback.run();
}
}
}

@Override
public void stop() {
synchronized (this.lifecycleMonitor) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,12 +207,6 @@ public void stop() {
}
}

@Override
public void stop(Runnable callback) {
stop();
callback.run();
}

@Override
public boolean isRunning() {
return this.running;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,6 @@ public void stop() {
}
}

@Override
public void stop(Runnable runnable) {
stop();
runnable.run();
}

@Override
public CuratorFramework getObject() {
return this.client;
Expand All @@ -145,9 +139,4 @@ public Class<?> getObjectType() {
return CuratorFramework.class;
}

@Override
public boolean isSingleton() {
return true;
}

}
Loading