Skip to content

Commit ceccafb

Browse files
DATAREDIS-443 - Add Support for Spring 4.3 synchronized mode to RedisCache.
As of Spring Framework 4.3.RC1, the `Cache` interface has a new `<T> T get(Object key, Callable<T> valueLoader);` method (see SPR-9254). If no entry for the given key is found, the `Callable` is invoked to compute/load the value that is then put into redis and returned. Additionally concurrent calls get synchronized so that the `Callable` is only called once. Using Spring Framework 4.3 failures result in `o.s.c.Cache$ValueRetrievalException` prior versions use `RedisSystemException`.
1 parent 3409acd commit ceccafb

File tree

6 files changed

+273
-9
lines changed

6 files changed

+273
-9
lines changed

build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,10 +100,10 @@ dependencies {
100100
testCompile "junit:junit:$junitVersion"
101101
testCompile "org.springframework:spring-test:$springVersion"
102102
testCompile "org.springframework:spring-jdbc:$springVersion"
103-
testCompile 'org.testinfected.hamcrest-matchers:core-matchers:1.8'
103+
testCompile 'org.testinfected.hamcrest-matchers:hamcrest-matchers:1.8'
104104
testCompile "org.mockito:mockito-core:$mockitoVersion"
105105
testCompile("javax.annotation:jsr250-api:1.0", optional)
106-
testCompile("com.thoughtworks.xstream:xstream:1.4.4", optional)
106+
testCompile("com.thoughtworks.xstream:xstream:1.4.8", optional)
107107
testCompile("javax.transaction:jta:1.1")
108108

109109
sharedResources "org.springframework.data.build:spring-data-build-resources:$springDataBuildVersion@zip"

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ slf4jVersion=1.7.12
22
junitVersion=4.12
33
jredisVersion=06052013
44
jedisVersion=2.7.3
5-
springVersion=4.1.7.RELEASE
5+
springVersion=4.1.9.RELEASE
66
springDataBuildVersion=1.8.0.BUILD-SNAPSHOT
77
log4jVersion=1.2.17
88
version=1.7.0.DATAREDIS-443-SNAPSHOT

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
<springdata.commons>1.12.0.BUILD-SNAPSHOT</springdata.commons>
2323
<jta>1.1</jta>
2424
<beanutils>1.9.2</beanutils>
25-
<xstream>1.4.4</xstream>
25+
<xstream>1.4.8</xstream>
2626
<pool>2.2</pool>
2727
<lettuce>3.3.Final</lettuce>
2828
<jedis>2.7.3</jedis>

src/main/java/org/springframework/data/redis/cache/RedisCache.java

Lines changed: 136 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2011-2015 the original author or authors.
2+
* Copyright 2011-2016 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,18 +19,22 @@
1919
import static org.springframework.util.Assert.*;
2020
import static org.springframework.util.ObjectUtils.*;
2121

22+
import java.lang.reflect.Constructor;
2223
import java.util.Arrays;
2324
import java.util.Set;
25+
import java.util.concurrent.Callable;
2426

2527
import org.springframework.cache.Cache;
2628
import org.springframework.cache.support.SimpleValueWrapper;
2729
import org.springframework.dao.DataAccessException;
30+
import org.springframework.data.redis.RedisSystemException;
2831
import org.springframework.data.redis.connection.RedisConnection;
2932
import org.springframework.data.redis.connection.ReturnType;
3033
import org.springframework.data.redis.core.RedisCallback;
3134
import org.springframework.data.redis.core.RedisOperations;
3235
import org.springframework.data.redis.serializer.RedisSerializer;
3336
import org.springframework.data.redis.serializer.StringRedisSerializer;
37+
import org.springframework.util.ClassUtils;
3438

3539
/**
3640
* Cache implementation on top of Redis.
@@ -91,6 +95,32 @@ public ValueWrapper get(Object key) {
9195
redisOperations.getKeySerializer()));
9296
}
9397

98+
/*
99+
* @see org.springframework.cache.Cache#get(java.lang.Object, java.util.concurrent.Callable)
100+
* introduced in springframework 4.3.0.RC1
101+
*/
102+
public <T> T get(final Object key, final Callable<T> valueLoader) {
103+
104+
BinaryRedisCacheElement rce = new BinaryRedisCacheElement(new RedisCacheElement(new RedisCacheKey(key).usePrefix(
105+
cacheMetadata.getKeyPrefix()).withKeySerializer(redisOperations.getKeySerializer()), valueLoader),
106+
cacheValueAccessor);
107+
108+
ValueWrapper val = get(key);
109+
if (val != null) {
110+
return (T) val.get();
111+
}
112+
113+
RedisWriteThroughCallback callback = new RedisWriteThroughCallback(rce, cacheMetadata);
114+
115+
try {
116+
byte[] result = (byte[]) redisOperations.execute(callback);
117+
return (T) (result == null ? null : cacheValueAccessor.deserializeIfNecessary(result));
118+
} catch (RuntimeException e) {
119+
throw CacheValueRetrievalExceptionFactory.INSTANCE.create(key, valueLoader, e);
120+
}
121+
122+
}
123+
94124
/**
95125
* Return the value to which this cache maps the specified key.
96126
*
@@ -361,13 +391,18 @@ static class BinaryRedisCacheElement extends RedisCacheElement {
361391
private byte[] keyBytes;
362392
private byte[] valueBytes;
363393
private RedisCacheElement element;
394+
private boolean lazyLoad;
395+
private CacheValueAccessor accessor;
364396

365397
public BinaryRedisCacheElement(RedisCacheElement element, CacheValueAccessor accessor) {
366398

367399
super(element.getKey(), element.get());
368400
this.element = element;
369401
this.keyBytes = element.getKeyBytes();
370-
this.valueBytes = accessor.convertToBytesIfNecessary(element.get());
402+
this.accessor = accessor;
403+
404+
lazyLoad = element.get() instanceof Callable;
405+
this.valueBytes = lazyLoad ? null : accessor.convertToBytesIfNecessary(element.get());
371406
}
372407

373408
@Override
@@ -393,9 +428,16 @@ public RedisCacheElement expireAfter(long seconds) {
393428

394429
@Override
395430
public byte[] get() {
431+
432+
if (lazyLoad && valueBytes == null) {
433+
try {
434+
valueBytes = accessor.convertToBytesIfNecessary(((Callable<?>) element.get()).call());
435+
} catch (Exception e) {
436+
throw e instanceof RuntimeException ? (RuntimeException) e : new RuntimeException(e.getMessage(), e);
437+
}
438+
}
396439
return valueBytes;
397440
}
398-
399441
}
400442

401443
/**
@@ -470,6 +512,15 @@ protected boolean waitForLock(RedisConnection connection) {
470512

471513
return foundLock;
472514
}
515+
516+
protected void lock(RedisConnection connection) {
517+
waitForLock(connection);
518+
connection.set(cacheMetadata.getCacheLockKey(), "locked".getBytes());
519+
}
520+
521+
protected void unlock(RedisConnection connection) {
522+
connection.del(cacheMetadata.getCacheLockKey());
523+
}
473524
}
474525

475526
/**
@@ -666,4 +717,86 @@ private byte[] put(BinaryRedisCacheElement element, RedisConnection connection)
666717
}
667718
}
668719

720+
/**
721+
* @author Christoph Strobl
722+
* @since 1.7
723+
*/
724+
static class RedisWriteThroughCallback extends AbstractRedisCacheCallback<byte[]> {
725+
726+
public RedisWriteThroughCallback(BinaryRedisCacheElement element, RedisCacheMetadata metadata) {
727+
super(element, metadata);
728+
}
729+
730+
@Override
731+
public byte[] doInRedis(BinaryRedisCacheElement element, RedisConnection connection) throws DataAccessException {
732+
733+
try {
734+
735+
lock(connection);
736+
737+
try {
738+
739+
byte[] value = connection.get(element.getKeyBytes());
740+
741+
if (value != null) {
742+
return value;
743+
}
744+
745+
connection.watch(element.getKeyBytes());
746+
connection.multi();
747+
748+
value = element.get();
749+
connection.set(element.getKeyBytes(), value);
750+
751+
processKeyExpiration(element, connection);
752+
maintainKnownKeys(element, connection);
753+
754+
connection.exec();
755+
756+
return value;
757+
} catch (RuntimeException e) {
758+
759+
connection.discard();
760+
throw e;
761+
}
762+
} finally {
763+
unlock(connection);
764+
}
765+
}
766+
};
767+
768+
/**
769+
* @author Christoph Strobl
770+
* @since 1.7 (TODO: remove when upgrading to spring 4.3)
771+
*/
772+
private static enum CacheValueRetrievalExceptionFactory {
773+
774+
INSTANCE;
775+
776+
private static boolean isSpring43;
777+
778+
static {
779+
isSpring43 = ClassUtils.isPresent("org.springframework.cache.Cache$ValueRetrievalException",
780+
ClassUtils.getDefaultClassLoader());
781+
}
782+
783+
public RuntimeException create(Object key, Callable<?> valueLoader, Throwable cause) {
784+
785+
if (isSpring43) {
786+
try {
787+
Class<?> execption = ClassUtils.forName("org.springframework.cache.Cache$ValueRetrievalException", this
788+
.getClass().getClassLoader());
789+
Constructor<?> c = ClassUtils.getConstructorIfAvailable(execption, Object.class, Callable.class,
790+
Throwable.class);
791+
return (RuntimeException) c.newInstance(key, valueLoader, cause);
792+
} catch (Exception ex) {
793+
// ignore
794+
}
795+
}
796+
797+
return new RedisSystemException(String.format("Value for key '%s' could not be loaded using '%s'.", key,
798+
valueLoader), cause);
799+
}
800+
}
801+
669802
}

src/test/java/org/springframework/data/redis/cache/RedisCacheTest.java

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2011-2014 the original author or authors.
2+
* Copyright 2011-2016 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -26,10 +26,15 @@
2626
import static org.springframework.data.redis.matcher.RedisTestMatchers.*;
2727

2828
import java.util.Collection;
29+
import java.util.List;
30+
import java.util.concurrent.Callable;
31+
import java.util.concurrent.CopyOnWriteArrayList;
2932
import java.util.concurrent.CountDownLatch;
3033
import java.util.concurrent.TimeUnit;
3134
import java.util.concurrent.atomic.AtomicBoolean;
35+
import java.util.concurrent.atomic.AtomicLong;
3236

37+
import org.hamcrest.core.IsEqual;
3338
import org.junit.AfterClass;
3439
import org.junit.Before;
3540
import org.junit.Test;
@@ -39,6 +44,7 @@
3944
import org.springframework.cache.Cache;
4045
import org.springframework.cache.Cache.ValueWrapper;
4146
import org.springframework.data.redis.ConnectionFactoryTracker;
47+
import org.springframework.data.redis.LongObjectFactory;
4248
import org.springframework.data.redis.ObjectFactory;
4349
import org.springframework.data.redis.core.AbstractOperationsTestParams;
4450
import org.springframework.data.redis.core.RedisTemplate;
@@ -274,4 +280,53 @@ public void putIfAbsentShouldSetValueOnlyIfNotPresent() {
274280

275281
assertThat(wrapper.get(), equalTo(value));
276282
}
283+
284+
/**
285+
* @see DATAREDIS-443
286+
*/
287+
@Test
288+
public void testCacheGetSynchronized() throws InterruptedException {
289+
290+
assumeThat(cache, instanceOf(RedisCache.class));
291+
assumeThat(valueFactory, instanceOf(LongObjectFactory.class));
292+
293+
int threadCount = 10;
294+
final AtomicLong counter = new AtomicLong();
295+
final List<Object> results = new CopyOnWriteArrayList<Object>();
296+
final CountDownLatch latch = new CountDownLatch(threadCount);
297+
298+
final RedisCache redisCache = (RedisCache) cache;
299+
300+
final Object key = getKey();
301+
302+
Runnable run = new Runnable() {
303+
@Override
304+
public void run() {
305+
try {
306+
Long value = redisCache.get(key, new Callable<Long>() {
307+
@Override
308+
public Long call() throws Exception {
309+
310+
Thread.sleep(333); // make sure the thread will overlap
311+
return counter.incrementAndGet();
312+
}
313+
});
314+
results.add(value);
315+
} finally {
316+
latch.countDown();
317+
}
318+
}
319+
};
320+
321+
for (int i = 0; i < threadCount; i++) {
322+
new Thread(run).start();
323+
Thread.sleep(100);
324+
}
325+
latch.await();
326+
327+
assertThat(results.size(), IsEqual.equalTo(threadCount));
328+
for (Object result : results) {
329+
assertThat((Long) result, equalTo(1L));
330+
}
331+
}
277332
}

0 commit comments

Comments
 (0)