Skip to content

Commit d032d7e

Browse files
committed
Use Locks instead of synchronized blocks that run blocking operations.
To avoid thread pinning on virtual thread arrangements we now use ReentrantLock instead of a synchronized block. Closes #2690
1 parent 6a13328 commit d032d7e

File tree

6 files changed

+102
-48
lines changed

6 files changed

+102
-48
lines changed

src/main/java/org/springframework/data/redis/cache/RedisCache.java

+41-33
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import java.util.Map.Entry;
2525
import java.util.StringJoiner;
2626
import java.util.concurrent.Callable;
27+
import java.util.concurrent.locks.Lock;
28+
import java.util.concurrent.locks.ReentrantLock;
2729

2830
import org.springframework.cache.Cache;
2931
import org.springframework.cache.support.AbstractValueAdaptingCache;
@@ -58,6 +60,8 @@ public class RedisCache extends AbstractValueAdaptingCache {
5860

5961
private static final byte[] BINARY_NULL_VALUE = RedisSerializer.java().serialize(NullValue.INSTANCE);
6062

63+
private final Lock lock = new ReentrantLock();
64+
6165
private final RedisCacheConfiguration cacheConfiguration;
6266

6367
private final RedisCacheWriter cacheWriter;
@@ -68,12 +72,12 @@ public class RedisCache extends AbstractValueAdaptingCache {
6872
* Create a new {@link RedisCache}.
6973
*
7074
* @param name {@link String name} for this {@link Cache}; must not be {@literal null}.
71-
* @param cacheWriter {@link RedisCacheWriter} used to perform {@link RedisCache} operations by executing
72-
* the necessary Redis commands; must not be {@literal null}.
73-
* @param cacheConfiguration {@link RedisCacheConfiguration} applied to this {@link RedisCache} on creation;
74-
* must not be {@literal null}.
75+
* @param cacheWriter {@link RedisCacheWriter} used to perform {@link RedisCache} operations by executing the
76+
* necessary Redis commands; must not be {@literal null}.
77+
* @param cacheConfiguration {@link RedisCacheConfiguration} applied to this {@link RedisCache} on creation; must not
78+
* be {@literal null}.
7579
* @throws IllegalArgumentException if either the given {@link RedisCacheWriter} or {@link RedisCacheConfiguration}
76-
* are {@literal null} or the given {@link String} name for this {@link RedisCache} is {@literal null}.
80+
* are {@literal null} or the given {@link String} name for this {@link RedisCache} is {@literal null}.
7781
*/
7882
protected RedisCache(String name, RedisCacheWriter cacheWriter, RedisCacheConfiguration cacheConfiguration) {
7983

@@ -92,7 +96,7 @@ protected RedisCache(String name, RedisCacheWriter cacheWriter, RedisCacheConfig
9296
* Get the {@link RedisCacheConfiguration} used to configure this {@link RedisCache} on initialization.
9397
*
9498
* @return an immutable {@link RedisCacheConfiguration} used to configure this {@link RedisCache} on initialization;
95-
* never {@literal null}.
99+
* never {@literal null}.
96100
*/
97101
public RedisCacheConfiguration getCacheConfiguration() {
98102
return this.cacheConfiguration;
@@ -108,11 +112,11 @@ protected RedisCacheWriter getCacheWriter() {
108112
}
109113

110114
/**
111-
* Gets the configured {@link ConversionService} used to convert {@link Object cache keys} to a {@link String}
112-
* when accessing entries in the cache.
115+
* Gets the configured {@link ConversionService} used to convert {@link Object cache keys} to a {@link String} when
116+
* accessing entries in the cache.
113117
*
114-
* @return the configured {@link ConversionService} used to convert {@link Object cache keys} to a {@link String}
115-
* when accessing entries in the cache.
118+
* @return the configured {@link ConversionService} used to convert {@link Object cache keys} to a {@link String} when
119+
* accessing entries in the cache.
116120
* @see RedisCacheConfiguration#getConversionService()
117121
* @see #getCacheConfiguration()
118122
*/
@@ -153,21 +157,27 @@ public <T> T get(Object key, Callable<T> valueLoader) {
153157

154158
@Nullable
155159
@SuppressWarnings("unchecked")
156-
private synchronized <T> T getSynchronized(Object key, Callable<T> valueLoader) {
160+
private <T> T getSynchronized(Object key, Callable<T> valueLoader) {
157161

158-
ValueWrapper result = get(key);
162+
lock.lock();
163+
try {
164+
ValueWrapper result = get(key);
165+
166+
return result != null ? (T) result.get() : loadCacheValue(key, valueLoader);
167+
} finally {
168+
lock.unlock();
169+
170+
}
159171

160-
return result != null ? (T) result.get() : loadCacheValue(key, valueLoader);
161172
}
162173

163174
/**
164-
* Loads the {@link Object} using the given {@link Callable valueLoader} and {@link #put(Object, Object) puts}
165-
* the {@link Object loaded value} in the cache.
175+
* Loads the {@link Object} using the given {@link Callable valueLoader} and {@link #put(Object, Object) puts} the
176+
* {@link Object loaded value} in the cache.
166177
*
167178
* @param <T> {@link Class type} of the loaded {@link Object cache value}.
168179
* @param key {@link Object key} mapped to the loaded {@link Object cache value}.
169-
* @param valueLoader {@link Callable} object used to load the {@link Object value}
170-
* for the given {@link Object key}.
180+
* @param valueLoader {@link Callable} object used to load the {@link Object value} for the given {@link Object key}.
171181
* @return the loaded {@link Object value}.
172182
*/
173183
protected <T> T loadCacheValue(Object key, Callable<T> valueLoader) {
@@ -176,8 +186,7 @@ protected <T> T loadCacheValue(Object key, Callable<T> valueLoader) {
176186

177187
try {
178188
value = valueLoader.call();
179-
}
180-
catch (Exception cause) {
189+
} catch (Exception cause) {
181190
throw new ValueRetrievalException(key, valueLoader, cause);
182191
}
183192

@@ -190,8 +199,8 @@ protected <T> T loadCacheValue(Object key, Callable<T> valueLoader) {
190199
protected Object lookup(Object key) {
191200

192201
byte[] value = getCacheConfiguration().isTimeToIdleEnabled()
193-
? getCacheWriter().get(getName(), createAndConvertCacheKey(key), getTimeToLive(key))
194-
: getCacheWriter().get(getName(), createAndConvertCacheKey(key));
202+
? getCacheWriter().get(getName(), createAndConvertCacheKey(key), getTimeToLive(key))
203+
: getCacheWriter().get(getName(), createAndConvertCacheKey(key));
195204

196205
return value != null ? deserializeCacheValue(value) : null;
197206
}
@@ -212,14 +221,14 @@ public void put(Object key, @Nullable Object value) {
212221
if (nullCacheValueIsNotAllowed(cacheValue)) {
213222

214223
String message = String.format("Cache '%s' does not allow 'null' values; Avoid storing null"
215-
+ " via '@Cacheable(unless=\"#result == null\")' or configure RedisCache to allow 'null'"
216-
+ " via RedisCacheConfiguration", getName());
224+
+ " via '@Cacheable(unless=\"#result == null\")' or configure RedisCache to allow 'null'"
225+
+ " via RedisCacheConfiguration", getName());
217226

218227
throw new IllegalArgumentException(message);
219228
}
220229

221230
getCacheWriter().put(getName(), createAndConvertCacheKey(key), serializeCacheValue(cacheValue),
222-
getTimeToLive(key, value));
231+
getTimeToLive(key, value));
223232
}
224233

225234
@Override
@@ -232,7 +241,7 @@ public ValueWrapper putIfAbsent(Object key, @Nullable Object value) {
232241
}
233242

234243
byte[] result = getCacheWriter().putIfAbsent(getName(), createAndConvertCacheKey(key),
235-
serializeCacheValue(cacheValue), getTimeToLive(key, value));
244+
serializeCacheValue(cacheValue), getTimeToLive(key, value));
236245

237246
return result != null ? new SimpleValueWrapper(fromStoreValue(deserializeCacheValue(result))) : null;
238247
}
@@ -313,7 +322,7 @@ protected byte[] serializeCacheValue(Object value) {
313322
*
314323
* @param value array of bytes to deserialize; must not be {@literal null}.
315324
* @return an {@link Object} deserialized from the array of bytes using the configured value
316-
* {@link RedisSerializationContext.SerializationPair}; can be {@literal null}.
325+
* {@link RedisSerializationContext.SerializationPair}; can be {@literal null}.
317326
* @see RedisCacheConfiguration#getValueSerializationPair()
318327
*/
319328
@Nullable
@@ -359,8 +368,7 @@ protected String convertKey(Object key) {
359368
if (conversionService.canConvert(source, TypeDescriptor.valueOf(String.class))) {
360369
try {
361370
return conversionService.convert(key, String.class);
362-
}
363-
catch (ConversionFailedException cause) {
371+
} catch (ConversionFailedException cause) {
364372

365373
// May fail if the given key is a collection
366374
if (isCollectionLikeOrMap(source)) {
@@ -375,8 +383,9 @@ protected String convertKey(Object key) {
375383
return key.toString();
376384
}
377385

378-
String message = String.format("Cannot convert cache key %s to String; Please register a suitable Converter"
379-
+ " via 'RedisCacheConfiguration.configureKeyConverters(...)' or override '%s.toString()'",
386+
String message = String.format(
387+
"Cannot convert cache key %s to String; Please register a suitable Converter"
388+
+ " via 'RedisCacheConfiguration.configureKeyConverters(...)' or override '%s.toString()'",
380389
source, key.getClass().getName());
381390

382391
throw new IllegalStateException(message);
@@ -413,13 +422,12 @@ private String convertCollectionLikeOrMapKey(Object key, TypeDescriptor source)
413422
target.append("}");
414423

415424
return target.toString();
416-
}
417-
else if (source.isCollection() || source.isArray()) {
425+
} else if (source.isCollection() || source.isArray()) {
418426

419427
StringJoiner stringJoiner = new StringJoiner(",");
420428

421429
Collection<?> collection = source.isCollection() ? (Collection<?>) key
422-
: Arrays.asList(ObjectUtils.toObjectArray(key));
430+
: Arrays.asList(ObjectUtils.toObjectArray(key));
423431

424432
for (Object collectedKey : collection) {
425433
stringJoiner.add(convertKey(collectedKey));

src/main/java/org/springframework/data/redis/connection/lettuce/ClusterConnectionProvider.java

+8-3
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525

2626
import java.util.Optional;
2727
import java.util.concurrent.CompletableFuture;
28+
import java.util.concurrent.locks.Lock;
29+
import java.util.concurrent.locks.ReentrantLock;
2830

2931
import org.springframework.dao.InvalidDataAccessApiUsageException;
3032
import org.springframework.lang.Nullable;
@@ -44,7 +46,7 @@ class ClusterConnectionProvider implements LettuceConnectionProvider, RedisClien
4446
private final RedisCodec<?, ?> codec;
4547
private final Optional<ReadFrom> readFrom;
4648

47-
private final Object monitor = new Object();
49+
private final Lock lock = new ReentrantLock();
4850

4951
private volatile boolean initialized;
5052

@@ -84,16 +86,19 @@ class ClusterConnectionProvider implements LettuceConnectionProvider, RedisClien
8486
// partitions have to be initialized before asynchronous usage.
8587
// Needs to happen only once. Initialize eagerly if
8688
// blocking is not an options.
87-
synchronized (monitor) {
89+
lock.lock();
90+
try {
8891
if (!initialized) {
8992
client.getPartitions();
9093
initialized = true;
9194
}
95+
} finally {
96+
lock.unlock();
9297
}
9398
}
9499

95100
if (connectionType.equals(StatefulRedisPubSubConnection.class)
96-
|| connectionType.equals(StatefulRedisClusterPubSubConnection.class)) {
101+
|| connectionType.equals(StatefulRedisClusterPubSubConnection.class)) {
97102

98103
return client.connectPubSubAsync(codec) //
99104
.thenApply(connectionType::cast);

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnection.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import java.util.List;
3030
import java.util.Map;
3131
import java.util.Set;
32+
import java.util.concurrent.locks.Lock;
33+
import java.util.concurrent.locks.ReentrantLock;
3234

3335
import org.apache.commons.logging.Log;
3436
import org.apache.commons.logging.LogFactory;
@@ -538,6 +540,7 @@ protected interface LettuceMultiKeyClusterCommandCallback<T>
538540
*/
539541
static class LettuceClusterNodeResourceProvider implements ClusterNodeResourceProvider, DisposableBean {
540542

543+
private final Lock lock = new ReentrantLock();
541544
private final LettuceConnectionProvider connectionProvider;
542545
private volatile @Nullable StatefulRedisClusterConnection<byte[], byte[]> connection;
543546

@@ -552,10 +555,13 @@ public RedisClusterCommands<byte[], byte[]> getResourceForSpecificNode(RedisClus
552555
Assert.notNull(node, "Node must not be null");
553556

554557
if (connection == null) {
555-
synchronized (this) {
558+
lock.lock();
559+
try {
556560
if (connection == null) {
557561
this.connection = connectionProvider.getConnection(StatefulRedisClusterConnection.class);
558562
}
563+
} finally {
564+
lock.unlock();
559565
}
560566
}
561567

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactory.java

+16-3
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
import java.util.concurrent.CompletionStage;
4242
import java.util.concurrent.TimeUnit;
4343
import java.util.concurrent.atomic.AtomicReference;
44+
import java.util.concurrent.locks.Lock;
45+
import java.util.concurrent.locks.ReentrantLock;
4446
import java.util.function.Consumer;
4547
import java.util.stream.Collectors;
4648

@@ -139,6 +141,8 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv
139141
/** Synchronization monitor for the shared Connection */
140142
private final Object connectionMonitor = new Object();
141143

144+
private final Lock lock = new ReentrantLock();
145+
142146
private PipeliningFlushPolicy pipeliningFlushPolicy = PipeliningFlushPolicy.flushEachCommand();
143147

144148
private @Nullable RedisConfiguration configuration;
@@ -1480,7 +1484,8 @@ class SharedConnection<E> {
14801484
@Nullable
14811485
StatefulConnection<E, E> getConnection() {
14821486

1483-
synchronized (this.connectionMonitor) {
1487+
lock.lock();
1488+
try {
14841489

14851490
if (this.connection == null) {
14861491
this.connection = getNativeConnection();
@@ -1491,6 +1496,8 @@ StatefulConnection<E, E> getConnection() {
14911496
}
14921497

14931498
return this.connection;
1499+
} finally {
1500+
lock.unlock();
14941501
}
14951502
}
14961503

@@ -1508,7 +1515,8 @@ private StatefulConnection<E, E> getNativeConnection() {
15081515
*/
15091516
void validateConnection() {
15101517

1511-
synchronized (this.connectionMonitor) {
1518+
lock.lock();
1519+
try {
15121520

15131521
boolean valid = false;
15141522

@@ -1535,6 +1543,8 @@ void validateConnection() {
15351543
resetConnection();
15361544
this.connection = getNativeConnection();
15371545
}
1546+
} finally {
1547+
lock.unlock();
15381548
}
15391549
}
15401550

@@ -1543,13 +1553,16 @@ void validateConnection() {
15431553
*/
15441554
void resetConnection() {
15451555

1546-
synchronized (this.connectionMonitor) {
1556+
lock.lock();
1557+
try {
15471558

15481559
if (this.connection != null) {
15491560
this.connectionProvider.release(this.connection);
15501561
}
15511562

15521563
this.connection = null;
1564+
} finally {
1565+
lock.unlock();
15531566
}
15541567
}
15551568
}

src/main/java/org/springframework/data/redis/core/script/DefaultRedisScript.java

+7-2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
package org.springframework.data.redis.core.script;
1717

1818
import java.io.IOException;
19+
import java.util.concurrent.locks.Lock;
20+
import java.util.concurrent.locks.ReentrantLock;
1921

2022
import org.springframework.beans.factory.InitializingBean;
2123
import org.springframework.core.io.Resource;
@@ -37,7 +39,7 @@
3739
*/
3840
public class DefaultRedisScript<T> implements RedisScript<T>, InitializingBean {
3941

40-
private final Object shaModifiedMonitor = new Object();
42+
private final Lock lock = new ReentrantLock();
4143

4244
private @Nullable ScriptSource scriptSource;
4345
private @Nullable String sha1;
@@ -76,11 +78,14 @@ public void afterPropertiesSet() {
7678

7779
public String getSha1() {
7880

79-
synchronized (shaModifiedMonitor) {
81+
lock.lock();
82+
try {
8083
if (sha1 == null || scriptSource.isModified()) {
8184
this.sha1 = DigestUtils.sha1DigestAsHex(getScriptAsString());
8285
}
8386
return sha1;
87+
} finally {
88+
lock.unlock();
8489
}
8590
}
8691

0 commit comments

Comments
 (0)