Skip to content

GH-3444: Add Custom TTL support for RedisLock, and JdbcLock #9053

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public abstract class AbstractCorrelatingMessageHandler extends AbstractMessageP

private boolean sequenceAware;

private LockRegistry lockRegistry = new DefaultLockRegistry();
private LockRegistry<?> lockRegistry = new DefaultLockRegistry();

private boolean lockRegistrySet = false;

Expand Down Expand Up @@ -198,7 +198,7 @@ public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor) {
this(processor, new SimpleMessageStore(0), null, null);
}

public void setLockRegistry(LockRegistry lockRegistry) {
public void setLockRegistry(LockRegistry<?> lockRegistry) {
Assert.isTrue(!this.lockRegistrySet, "'this.lockRegistry' can not be reset once its been set");
Assert.notNull(lockRegistry, "'lockRegistry' must not be null");
this.lockRegistry = lockRegistry;
Expand Down Expand Up @@ -516,7 +516,7 @@ protected boolean isSequenceAware() {
return this.sequenceAware;
}

protected LockRegistry getLockRegistry() {
protected LockRegistry<?> getLockRegistry() {
return this.lockRegistry;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public class AggregatorFactoryBean extends AbstractSimpleMessageHandlerFactoryBe

private String outputChannelName;

private LockRegistry lockRegistry;
private LockRegistry<?> lockRegistry;

private MessageGroupStore messageStore;

Expand Down Expand Up @@ -125,7 +125,7 @@ public void setOutputChannelName(String outputChannelName) {
this.outputChannelName = outputChannelName;
}

public void setLockRegistry(LockRegistry lockRegistry) {
public void setLockRegistry(LockRegistry<?> lockRegistry) {
this.lockRegistry = lockRegistry;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ public S forceReleaseAdvice(Advice... advice) {
* @param lockRegistry the {@link LockRegistry} to use.
* @return the endpoint spec.
*/
public S lockRegistry(LockRegistry lockRegistry) {
public S lockRegistry(LockRegistry<?> lockRegistry) {
Assert.notNull(lockRegistry, "'lockRegistry' must not be null.");
this.handler.setLockRegistry(lockRegistry);
return _this();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
*/
public class LockRequestHandlerAdvice extends AbstractRequestHandlerAdvice {

private final LockRegistry lockRegistry;
private final LockRegistry<?> lockRegistry;

private final Expression lockKeyExpression;

Expand All @@ -65,7 +65,7 @@ public class LockRequestHandlerAdvice extends AbstractRequestHandlerAdvice {
* @param lockRegistry the {@link LockRegistry} to use.
* @param lockKey the static (shared) lock key for all the calls.
*/
public LockRequestHandlerAdvice(LockRegistry lockRegistry, Object lockKey) {
public LockRequestHandlerAdvice(LockRegistry<?> lockRegistry, Object lockKey) {
this(lockRegistry, new ValueExpression<>(lockKey));
}

Expand All @@ -75,7 +75,7 @@ public LockRequestHandlerAdvice(LockRegistry lockRegistry, Object lockKey) {
* @param lockRegistry the {@link LockRegistry} to use.
* @param lockKeyExpression the SpEL expression to evaluate a lock key against request message.
*/
public LockRequestHandlerAdvice(LockRegistry lockRegistry, Expression lockKeyExpression) {
public LockRequestHandlerAdvice(LockRegistry<?> lockRegistry, Expression lockKeyExpression) {
Assert.notNull(lockRegistry, "'lockRegistry' must not be null");
Assert.notNull(lockKeyExpression, "'lockKeyExpression' must not be null");
this.lockRegistry = lockRegistry;
Expand All @@ -88,7 +88,7 @@ public LockRequestHandlerAdvice(LockRegistry lockRegistry, Expression lockKeyExp
* @param lockRegistry the {@link LockRegistry} to use.
* @param lockKeyFunction the function to evaluate a lock key against request message.
*/
public LockRequestHandlerAdvice(LockRegistry lockRegistry, Function<Message<?>, Object> lockKeyFunction) {
public LockRequestHandlerAdvice(LockRegistry<?> lockRegistry, Function<Message<?>, Object> lockKeyFunction) {
Assert.notNull(lockRegistry, "'lockRegistry' must not be null");
Assert.notNull(lockKeyFunction, "'lockKeyFunction' must not be null");
this.lockRegistry = lockRegistry;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -65,7 +65,7 @@ public class PropertiesPersistingMetadataStore implements ConcurrentMetadataStor

private final DefaultPropertiesPersister persister = new DefaultPropertiesPersister();

private final LockRegistry lockRegistry = new DefaultLockRegistry();
private final LockRegistry<Lock> lockRegistry = new DefaultLockRegistry();

private String baseDirectory = System.getProperty("java.io.tmpdir") + "/spring-integration/";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public abstract class AbstractMessageGroupStore extends AbstractBatchingMessageG

private boolean timeoutOnIdle;

private LockRegistry lockRegistry = new DefaultLockRegistry();
private LockRegistry<?> lockRegistry = new DefaultLockRegistry();

protected AbstractMessageGroupStore() {
}
Expand Down Expand Up @@ -127,12 +127,12 @@ public void setLazyLoadMessageGroups(boolean lazyLoadMessageGroups) {
* @param lockRegistry lockRegistryType
* @since 6.5
*/
public final void setLockRegistry(LockRegistry lockRegistry) {
public final void setLockRegistry(LockRegistry<?> lockRegistry) {
Assert.notNull(lockRegistry, "The LockRegistry cannot be null");
this.lockRegistry = lockRegistry;
}

protected LockRegistry getLockRegistry() {
protected LockRegistry<?> getLockRegistry() {
return this.lockRegistry;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public SimpleMessageStore(int individualCapacity, int groupCapacity, long upperB
* @param lockRegistry The lock registry.
* @see #SimpleMessageStore(int, int, long, LockRegistry)
*/
public SimpleMessageStore(int individualCapacity, int groupCapacity, LockRegistry lockRegistry) {
public SimpleMessageStore(int individualCapacity, int groupCapacity, LockRegistry<?> lockRegistry) {
this(individualCapacity, groupCapacity, 0, lockRegistry);
}

Expand All @@ -122,7 +122,7 @@ public SimpleMessageStore(int individualCapacity, int groupCapacity, LockRegistr
*/
@SuppressWarnings("this-escape")
public SimpleMessageStore(int individualCapacity, int groupCapacity, long upperBoundTimeout,
LockRegistry lockRegistry) {
LockRegistry<?> lockRegistry) {

super(false);
Assert.notNull(lockRegistry, "The LockRegistry cannot be null");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2024 the original author or authors.
* Copyright 2016-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -74,7 +74,7 @@ public class LockRegistryLeaderInitiator implements SmartLifecycle, DisposableBe
* A lock registry. The locks it manages should be global (whatever that means for the
* system) and expiring, in case the holder dies without notifying anyone.
*/
private final LockRegistry locks;
private final LockRegistry<?> locks;

/**
* Candidate for leader election. User injects this to receive callbacks on leadership
Expand Down Expand Up @@ -162,7 +162,7 @@ public String getRole() {
* candidate (which just logs the leadership events).
* @param locks lock registry
*/
public LockRegistryLeaderInitiator(LockRegistry locks) {
public LockRegistryLeaderInitiator(LockRegistry<?> locks) {
this(locks, new DefaultCandidate());
}

Expand All @@ -172,7 +172,7 @@ public LockRegistryLeaderInitiator(LockRegistry locks) {
* @param locks lock registry
* @param candidate leadership election candidate
*/
public LockRegistryLeaderInitiator(LockRegistry locks, Candidate candidate) {
public LockRegistryLeaderInitiator(LockRegistry<?> locks, Candidate candidate) {
Assert.notNull(locks, "'locks' must not be null");
Assert.notNull(candidate, "'candidate' must not be null");
this.locks = locks;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2024 the original author or authors.
* Copyright 2002-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -34,7 +34,7 @@
* @since 2.1.1
*
*/
public final class DefaultLockRegistry implements LockRegistry {
public final class DefaultLockRegistry implements LockRegistry<Lock> {

private final Lock[] lockTable;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.support.locks;

import java.time.Duration;
import java.util.concurrent.locks.Lock;

/**
* A distributed {@link Lock} extension.
*
* @author Eddie Cho
*
* @since 7.0
*/
public interface DistributedLock extends Lock {

/**
* Attempt to acquire a lock with a specific time-to-live
* @param ttl the specific time-to-live for the lock status data
*/
void lock(Duration ttl);

/**
* Attempt to acquire a lock with a specific time-to-live
* @param waitTime the maximum time to wait for the lock
* @param ttl the specific time-to-live for the lock status data
* @return {@code true} if the lock was acquired and {@code false}
* if the waiting time elapsed before the lock was acquired
* @throws InterruptedException if the current thread is interrupted
* while acquiring the lock (and interruption of lock
* acquisition is supported)
*/
boolean tryLock(Duration waitTime, Duration ttl) throws InterruptedException;
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2019 the original author or authors.
* Copyright 2015-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,15 +16,18 @@

package org.springframework.integration.support.locks;

import java.util.concurrent.locks.Lock;

/**
* A {@link LockRegistry} implementing this interface supports the removal of aged locks
* that are not currently locked.
* @param <L> The expected class of the lock implementation
*
* @author Gary Russell
* @since 4.2
*
*/
public interface ExpirableLockRegistry extends LockRegistry {
public interface ExpirableLockRegistry<L extends Lock> extends LockRegistry<L> {

/**
* Remove locks last acquired more than 'age' ago that are not currently locked.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,6 +26,7 @@

/**
* Strategy for maintaining a registry of shared locks.
* @param <L> The expected class of the lock implementation
*
* @author Oleg Zhurakousky
* @author Gary Russell
Expand All @@ -34,14 +35,14 @@
* @since 2.1.1
*/
@FunctionalInterface
public interface LockRegistry {
public interface LockRegistry<L extends Lock> {

/**
* Obtain the lock associated with the parameter object.
* @param lockKey The object with which the lock is associated.
* @return The associated lock.
*/
Lock obtain(Object lockKey);
L obtain(Object lockKey);

/**
* Perform the provided task when the lock for the key is locked.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -33,7 +33,7 @@
* @since 2.2
*
*/
public final class PassThruLockRegistry implements LockRegistry {
public final class PassThruLockRegistry implements LockRegistry<Lock> {

@Override
public Lock obtain(Object lockKey) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2024 the original author or authors.
* Copyright 2020-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,19 +16,24 @@

package org.springframework.integration.support.locks;

import java.time.Duration;
import java.util.concurrent.locks.Lock;

import org.springframework.scheduling.TaskScheduler;

/**
* A {@link LockRegistry} implementing this interface supports the renewal
* of the time to live of a lock.
* @param <L> The expected class of the lock implementation
*
* @author Alexandre Strubel
* @author Artem Bilan
* @author Youbin Wu
* @author Eddie Cho
*
* @since 5.4
*/
public interface RenewableLockRegistry extends LockRegistry {
public interface RenewableLockRegistry<L extends Lock> extends LockRegistry<L> {

/**
* Renew the time to live of the lock is associated with the parameter object.
Expand All @@ -37,6 +42,15 @@ public interface RenewableLockRegistry extends LockRegistry {
*/
void renewLock(Object lockKey);

/**
* Renew the time to live of the lock is associated with the parameter object with a specific value.
* The lock must be held by the current thread
* @param lockKey The object with which the lock is associated.
* @param ttl the specific time-to-live for the lock status data
* @since 7.0
*/
void renewLock(Object lockKey, Duration ttl);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since you have added a new API to the class, please, add your name to the @author list.


/**
* Set the {@link TaskScheduler} to use for the renewal task.
* When renewalTaskScheduler is set, it will be used to periodically renew the lock to ensure that
Expand Down
Loading