Skip to content

Commit 2c7f796

Browse files
committed
GH-8056: Add LockRequestHandlerAdvice
Fixes: #8056 * Add `LockRequestHandlerAdvice` for exclusive service access against specific `key`.
1 parent c10be96 commit 2c7f796

File tree

5 files changed

+401
-0
lines changed

5 files changed

+401
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,206 @@
1+
/*
2+
* Copyright 2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.handler.advice;
18+
19+
import java.time.Duration;
20+
import java.util.concurrent.TimeUnit;
21+
import java.util.concurrent.TimeoutException;
22+
import java.util.concurrent.locks.Lock;
23+
import java.util.function.Function;
24+
25+
import org.springframework.expression.EvaluationContext;
26+
import org.springframework.expression.Expression;
27+
import org.springframework.integration.expression.ExpressionUtils;
28+
import org.springframework.integration.expression.FunctionExpression;
29+
import org.springframework.integration.expression.ValueExpression;
30+
import org.springframework.integration.support.locks.LockRegistry;
31+
import org.springframework.integration.support.utils.IntegrationUtils;
32+
import org.springframework.lang.Nullable;
33+
import org.springframework.messaging.Message;
34+
import org.springframework.messaging.MessageChannel;
35+
import org.springframework.util.Assert;
36+
37+
/**
38+
* The {@link AbstractRequestHandlerAdvice} to ensure exclusive access to the
39+
* {@code AbstractReplyProducingMessageHandler.RequestHandler#handleRequestMessage(Message)} calls
40+
* based on the {@code lockKey} from message.
41+
* <p>
42+
* If {@code lockKey} for the message is {@code null}, the no locking around the call.
43+
* However, if {@link }
44+
*
45+
* @author Artem Bilan
46+
*
47+
* @since 6.5
48+
*/
49+
public class LockRequestHandlerAdvice extends AbstractRequestHandlerAdvice {
50+
51+
private final LockRegistry lockRegistry;
52+
53+
private final Expression lockKeyExpression;
54+
55+
@Nullable
56+
private MessageChannel discardChannel;
57+
58+
@Nullable
59+
private Expression waitLockDurationExpression;
60+
61+
private EvaluationContext evaluationContext;
62+
63+
/**
64+
* Construct an advice instance based on a {@link LockRegistry} and fixed (shared) lock key.
65+
* @param lockRegistry the {@link LockRegistry} to use.
66+
* @param lockKey the static (shared) lock key for all the calls.
67+
*/
68+
public LockRequestHandlerAdvice(LockRegistry lockRegistry, Object lockKey) {
69+
this(lockRegistry, new ValueExpression<>(lockKey));
70+
}
71+
72+
/**
73+
* Construct an advice instance based on a {@link LockRegistry}
74+
* and SpEL expression for the lock key against request message.
75+
* @param lockRegistry the {@link LockRegistry} to use.
76+
* @param lockKeyExpression the SpEL expression to evaluate a lock key against request message.
77+
*/
78+
public LockRequestHandlerAdvice(LockRegistry lockRegistry, Expression lockKeyExpression) {
79+
Assert.notNull(lockRegistry, "'lockRegistry' must not be null");
80+
Assert.notNull(lockKeyExpression, "'lockKeyExpression' must not be null");
81+
this.lockRegistry = lockRegistry;
82+
this.lockKeyExpression = lockKeyExpression;
83+
}
84+
85+
/**
86+
* Construct an advice instance based on a {@link LockRegistry}
87+
* and function for the lock key against request message.
88+
* @param lockRegistry the {@link LockRegistry} to use.
89+
* @param lockKeyFunction the function to evaluate a lock key against request message.
90+
*/
91+
public LockRequestHandlerAdvice(LockRegistry lockRegistry, Function<Message<?>, Object> lockKeyFunction) {
92+
Assert.notNull(lockRegistry, "'lockRegistry' must not be null");
93+
Assert.notNull(lockKeyFunction, "'lockKeyFunction' must not be null");
94+
this.lockRegistry = lockRegistry;
95+
this.lockKeyExpression = new FunctionExpression<>(lockKeyFunction);
96+
}
97+
98+
/**
99+
* Optional duration for a {@link Lock#tryLock(long, TimeUnit)} API.
100+
* Otherwise, {@link Lock#lockInterruptibly()} is used.
101+
* @param waitLockDuration the duration for {@link Lock#tryLock(long, TimeUnit)}.
102+
*/
103+
public void setWaitLockDuration(Duration waitLockDuration) {
104+
setWaitLockDurationExpression(new ValueExpression<>(waitLockDuration));
105+
}
106+
107+
/**
108+
* The SpEL expression to evaluate a {@link Lock#tryLock(long, TimeUnit)} duration
109+
* against request message.
110+
* Can be evaluated to {@link Duration}, {@code long} (with meaning as milliseconds),
111+
* or to string in the duration ISO-8601 format.
112+
* @param waitLockDurationExpression SpEL expression for duration.
113+
*/
114+
public void setWaitLockDurationExpression(Expression waitLockDurationExpression) {
115+
this.waitLockDurationExpression = waitLockDurationExpression;
116+
}
117+
118+
/**
119+
* The SpEL expression to evaluate a {@link Lock#tryLock(long, TimeUnit)} duration
120+
* against request message.
121+
* Can be evaluated to {@link Duration}, {@code long} (with meaning as milliseconds),
122+
* or to string in the duration ISO-8601 format.
123+
* @param waitLockDurationExpression SpEL expression for duration.
124+
*/
125+
public void setWaitLockDurationExpressionString(String waitLockDurationExpression) {
126+
this.waitLockDurationExpression = EXPRESSION_PARSER.parseExpression(waitLockDurationExpression);
127+
}
128+
129+
/**
130+
* The function to evaluate a {@link Lock#tryLock(long, TimeUnit)} duration
131+
* against request message.
132+
* @param waitLockDurationFunction the function for duration.
133+
*/
134+
public void setWaitLockDurationFunction(Function<Message<?>, Duration> waitLockDurationFunction) {
135+
this.waitLockDurationExpression = new FunctionExpression<>(waitLockDurationFunction);
136+
}
137+
138+
/**
139+
* Set a channel where to send a message for which {@code lockKey} is evaluated to {@code null}.
140+
* If this is not set and {@code lockKey == null}, no locking around the call.
141+
* @param discardChannel the channel to send messages without a key.
142+
*/
143+
public void setDiscardChannel(@Nullable MessageChannel discardChannel) {
144+
this.discardChannel = discardChannel;
145+
}
146+
147+
@Override
148+
protected void onInit() {
149+
super.onInit();
150+
this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory());
151+
}
152+
153+
@Override
154+
protected Object doInvoke(ExecutionCallback callback, Object target, Message<?> message) {
155+
Object lockKey = this.lockKeyExpression.getValue(this.evaluationContext, message);
156+
if (lockKey != null) {
157+
Duration waitLockDuration = getWaitLockDuration(message);
158+
try {
159+
if (waitLockDuration == null) {
160+
return lockRegistry.executeLocked(lockKey, callback::execute);
161+
}
162+
else {
163+
return lockRegistry.executeLocked(lockKey, waitLockDuration, callback::execute);
164+
}
165+
}
166+
catch (InterruptedException ex) {
167+
Thread.currentThread().interrupt();
168+
throw IntegrationUtils.wrapInHandlingExceptionIfNecessary(message,
169+
() -> "The lock for message was interrupted", ex);
170+
}
171+
catch (TimeoutException ex) {
172+
throw IntegrationUtils.wrapInHandlingExceptionIfNecessary(message,
173+
() -> "Could not acquire the lock in time: " + waitLockDuration, ex);
174+
}
175+
}
176+
else {
177+
if (this.discardChannel != null) {
178+
this.discardChannel.send(message);
179+
return null;
180+
}
181+
else {
182+
return callback.execute();
183+
}
184+
}
185+
}
186+
187+
@Nullable
188+
private Duration getWaitLockDuration(Message<?> message) {
189+
if (this.waitLockDurationExpression != null) {
190+
Object value = this.waitLockDurationExpression.getValue(this.evaluationContext, message);
191+
if (value != null) {
192+
if (value instanceof Duration duration) {
193+
return duration;
194+
}
195+
else if (value instanceof Long aLong) {
196+
return Duration.ofMillis(aLong);
197+
}
198+
else {
199+
return Duration.parse(value.toString());
200+
}
201+
}
202+
}
203+
return null;
204+
}
205+
206+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
/*
2+
* Copyright 2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.handler.advice;
18+
19+
import java.util.concurrent.ExecutionException;
20+
import java.util.concurrent.Future;
21+
import java.util.concurrent.TimeUnit;
22+
import java.util.concurrent.TimeoutException;
23+
import java.util.concurrent.atomic.AtomicInteger;
24+
25+
import org.junit.jupiter.api.Test;
26+
27+
import org.springframework.beans.factory.annotation.Autowired;
28+
import org.springframework.context.annotation.Bean;
29+
import org.springframework.context.annotation.Configuration;
30+
import org.springframework.integration.annotation.ServiceActivator;
31+
import org.springframework.integration.channel.QueueChannel;
32+
import org.springframework.integration.config.EnableIntegration;
33+
import org.springframework.integration.core.AsyncMessagingTemplate;
34+
import org.springframework.integration.support.MessageBuilder;
35+
import org.springframework.integration.support.locks.DefaultLockRegistry;
36+
import org.springframework.integration.support.locks.LockRegistry;
37+
import org.springframework.messaging.Message;
38+
import org.springframework.messaging.MessageChannel;
39+
import org.springframework.messaging.core.MessagePostProcessor;
40+
import org.springframework.messaging.support.GenericMessage;
41+
import org.springframework.test.annotation.DirtiesContext;
42+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
43+
44+
import static org.assertj.core.api.Assertions.assertThat;
45+
46+
/**
47+
* @author Artem Bilan
48+
*
49+
* @since 6.5
50+
*/
51+
@SpringJUnitConfig
52+
@DirtiesContext
53+
public class LockRequestHandlerAdviceTests {
54+
55+
private static final String LOCK_KEY_HEADER = "lock-key-header";
56+
57+
@Autowired
58+
MessageChannel inputChannel;
59+
60+
@Autowired
61+
QueueChannel discardChannel;
62+
63+
@Test
64+
void verifyLockAroundHandler() throws ExecutionException, InterruptedException, TimeoutException {
65+
AsyncMessagingTemplate messagingTemplate = new AsyncMessagingTemplate();
66+
MessagePostProcessor messagePostProcessor =
67+
message ->
68+
MessageBuilder.fromMessage(message)
69+
.setHeader(LOCK_KEY_HEADER, "someLock")
70+
.build();
71+
Future<Object> test1 =
72+
messagingTemplate.asyncConvertSendAndReceive(this.inputChannel, "test1", messagePostProcessor);
73+
Future<Object> test2 =
74+
messagingTemplate.asyncConvertSendAndReceive(this.inputChannel, "test2", messagePostProcessor);
75+
76+
assertThat(test1.get(10, TimeUnit.SECONDS)).isEqualTo("test1-1");
77+
assertThat(test2.get(10, TimeUnit.SECONDS)).isEqualTo("test2-1");
78+
79+
messagingTemplate.send(this.inputChannel, new GenericMessage<>("no_lock_key"));
80+
81+
Message<?> receive = this.discardChannel.receive(10_000);
82+
assertThat(receive)
83+
.extracting(Message::getPayload)
84+
.isEqualTo("no_lock_key");
85+
86+
Future<Object> test3 =
87+
messagingTemplate.asyncConvertSendAndReceive(this.inputChannel, "longer_process", messagePostProcessor);
88+
89+
Future<Object> test4 =
90+
messagingTemplate.asyncConvertSendAndReceive(this.inputChannel, "test4", messagePostProcessor);
91+
92+
assertThat(test3.get(10, TimeUnit.SECONDS)).isEqualTo("longer_process-1");
93+
94+
assertThat(test4).failsWithin(10, TimeUnit.SECONDS)
95+
.withThrowableOfType(ExecutionException.class)
96+
.withRootCauseInstanceOf(TimeoutException.class)
97+
.withStackTraceContaining("Could not acquire the lock in time: PT1S");
98+
}
99+
100+
@Configuration
101+
@EnableIntegration
102+
public static class Config {
103+
104+
@Bean
105+
LockRegistry lockRegistry() {
106+
return new DefaultLockRegistry();
107+
}
108+
109+
@Bean
110+
QueueChannel discardChannel() {
111+
return new QueueChannel();
112+
}
113+
114+
@Bean
115+
LockRequestHandlerAdvice lockRequestHandlerAdvice(LockRegistry lockRegistry, QueueChannel discardChannel) {
116+
LockRequestHandlerAdvice lockRequestHandlerAdvice =
117+
new LockRequestHandlerAdvice(lockRegistry, (message) -> message.getHeaders().get(LOCK_KEY_HEADER));
118+
lockRequestHandlerAdvice.setDiscardChannel(discardChannel);
119+
lockRequestHandlerAdvice.setWaitLockDurationExpressionString("'PT1s'");
120+
return lockRequestHandlerAdvice;
121+
}
122+
123+
AtomicInteger counter = new AtomicInteger();
124+
125+
@ServiceActivator(inputChannel = "inputChannel", adviceChain = "lockRequestHandlerAdvice")
126+
String handleWithDelay(String payload) throws InterruptedException {
127+
int currentCount = this.counter.incrementAndGet();
128+
Thread.sleep("longer_process".equals(payload) ? 2000 : 500);
129+
try {
130+
return payload + "-" + currentCount;
131+
}
132+
finally {
133+
this.counter.decrementAndGet();
134+
}
135+
}
136+
137+
}
138+
139+
}

src/reference/antora/modules/ROOT/pages/handler-advice/classes.adoc

+1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ In addition to providing the general mechanism to apply AOP advice classes, Spri
1010
* `CacheRequestHandlerAdvice` (described in xref:handler-advice/classes.adoc#cache-advice[Caching Advice])
1111
* `ReactiveRequestHandlerAdvice` (described in xref:handler-advice/reactive.adoc[Reactive Advice])
1212
* `ContextHolderRequestHandlerAdvice` (described in xref:handler-advice/context-holder.adoc[Context Holder Advice])
13+
* `LockRequestHandlerAdvice` (described in xref:handler-advice/lock.adoc[Lock Advice])
1314

1415

1516
[[expression-advice]]

0 commit comments

Comments
 (0)