Commit bb28044

Add get_stale and get_stale_while_update for memory-cache
These functions allow stale values to be used while updates are performed in the background, minimizing lookup latency.
1 parent 7e3eaf0 commit bb28044
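For illustration, here is a rough sketch (not part of this commit's diff) of how the new MemoryCache::get_stale fits the stale-while-revalidate pattern. The key, value, and TTL are made up, and it assumes the crate root exports MemoryCache and CacheStatus:

use std::thread::sleep;
use std::time::Duration;

use pingora_memory_cache::{CacheStatus, MemoryCache};

fn main() {
    // a cache holding up to 10 entries
    let cache: MemoryCache<i32, i32> = MemoryCache::new(10);
    cache.put(&1, 42, Some(Duration::from_secs(1)));

    // some time later, after the 1s TTL has passed
    sleep(Duration::from_millis(1100));

    // get() would report Expired and return no value; get_stale() still returns it
    let (value, status) = cache.get_stale(&1);
    match status {
        // serve the stale value right away and refresh it out of band
        CacheStatus::Stale(age) => println!("serving {:?}, stale for {:?}", value, age),
        _ => println!("fresh or missing: {:?}", value),
    }
}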

File tree

    .bleep
    pingora-memory-cache/Cargo.toml
    pingora-memory-cache/src/lib.rs
    pingora-memory-cache/src/read_through.rs

4 files changed: +175 -7 lines changed

.bleep

Lines changed: 1 addition & 1 deletion
@@ -1 +1 @@
-028a39daa932d3079b73073c39b35404ae06f78d
+bf7bddac9b1ce4e162d4658673a02b70e86610f5

pingora-memory-cache/Cargo.toml

Lines changed: 3 additions & 0 deletions
@@ -25,3 +25,6 @@ pingora-error = { version = "0.4.0", path = "../pingora-error" }
 log = { workspace = true }
 parking_lot = "0"
 pingora-timeout = { version = "0.4.0", path = "../pingora-timeout" }
+
+[dev-dependencies]
+once_cell = { workspace = true }

pingora-memory-cache/src/lib.rs

Lines changed: 56 additions & 6 deletions
@@ -33,6 +33,9 @@ pub enum CacheStatus {
     Expired,
     /// The key was not initially found but was found after awaiting a lock.
     LockHit,
+    /// The returned value was expired but still returned. The [Duration] is
+    /// how long it has been since its expiration time.
+    Stale(Duration),
 }
 
 impl CacheStatus {
@@ -43,16 +46,25 @@ impl CacheStatus {
             Self::Miss => "miss",
             Self::Expired => "expired",
             Self::LockHit => "lock_hit",
+            Self::Stale(_) => "stale",
         }
     }
 
     /// Returns whether this status represents a cache hit.
     pub fn is_hit(&self) -> bool {
         match self {
-            CacheStatus::Hit | CacheStatus::LockHit => true,
+            CacheStatus::Hit | CacheStatus::LockHit | CacheStatus::Stale(_) => true,
             CacheStatus::Miss | CacheStatus::Expired => false,
         }
     }
+
+    /// Returns the stale duration if any
+    pub fn stale(&self) -> Option<Duration> {
+        match self {
+            CacheStatus::Stale(time) => Some(*time),
+            _ => None,
+        }
+    }
 }
 
 #[derive(Debug, Clone)]
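As a small illustration of the two accessors added in the hunk above (a sketch, assuming CacheStatus is reachable from the crate root), a caller might summarize the outcome for logging or metrics:

use pingora_memory_cache::CacheStatus;

// Stale counts as a hit via is_hit(), while stale() exposes how far past expiry the value was.
fn describe(status: &CacheStatus) -> String {
    match status.stale() {
        Some(age) => format!("stale hit, {}ms past expiry", age.as_millis()),
        None if status.is_hit() => "fresh hit".to_string(),
        None => "miss or expired".to_string(),
    }
}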
@@ -71,15 +83,21 @@ impl<T: Clone> Node<T> {
     }
 
     fn will_expire_at(&self, time: &Instant) -> bool {
-        match self.expire_on.as_ref() {
-            Some(t) => t <= time,
-            None => false,
-        }
+        self.stale_duration(time).is_some()
     }
 
     fn is_expired(&self) -> bool {
         self.will_expire_at(&Instant::now())
     }
+
+    fn stale_duration(&self, time: &Instant) -> Option<Duration> {
+        let expire_time = self.expire_on?;
+        if &expire_time <= time {
+            Some(time.duration_since(expire_time))
+        } else {
+            None
+        }
+    }
 }
 
 /// A high performant in-memory cache with S3-FIFO + TinyLFU
107125
if !n.is_expired() {
108126
(Some(n.value), CacheStatus::Hit)
109127
} else {
110-
// TODO: consider returning the staled value
111128
(None, CacheStatus::Expired)
112129
}
113130
} else {
114131
(None, CacheStatus::Miss)
115132
}
116133
}
117134

135+
/// Similar to [get], fetch the key and return its value in addition to a
136+
/// [CacheStatus] but also return the value even if it is expired. When the
137+
/// value is expired, the [Duration] of how long it has been stale will
138+
/// also be returned.
139+
pub fn get_stale(&self, key: &K) -> (Option<T>, CacheStatus) {
140+
let hashed_key = self.hasher.hash_one(key);
141+
142+
if let Some(n) = self.store.get(&hashed_key) {
143+
let stale_duration = n.stale_duration(&Instant::now());
144+
if let Some(stale_duration) = stale_duration {
145+
(Some(n.value), CacheStatus::Stale(stale_duration))
146+
} else {
147+
(Some(n.value), CacheStatus::Hit)
148+
}
149+
} else {
150+
(None, CacheStatus::Miss)
151+
}
152+
}
153+
118154
/// Insert a key and value pair with an optional TTL into the cache.
119155
///
120156
/// An item with zero TTL of zero will not be inserted.
@@ -245,6 +281,20 @@ mod tests {
         assert_eq!(hit, CacheStatus::Expired);
     }
 
+    #[test]
+    fn test_get_stale() {
+        let cache: MemoryCache<i32, i32> = MemoryCache::new(10);
+        let (res, hit) = cache.get(&1);
+        assert_eq!(res, None);
+        assert_eq!(hit, CacheStatus::Miss);
+        cache.put(&1, 2, Some(Duration::from_secs(1)));
+        sleep(Duration::from_millis(1100));
+        let (res, hit) = cache.get_stale(&1);
+        assert_eq!(res.unwrap(), 2);
+        // we slept 1100ms and the ttl is 1000ms
+        assert!(hit.stale().unwrap() >= Duration::from_millis(100));
+    }
+
     #[test]
     fn test_eviction() {
         let cache: MemoryCache<i32, i32> = MemoryCache::new(2);
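Taken together, get_stale and CacheStatus::stale let a caller bound how stale a served value may be. A sketch of that policy against the plain MemoryCache (the helper name and the stale_ttl threshold are illustrative; RTCache::get_stale in the next file applies the same check before falling back to a lookup):

use std::time::Duration;
use pingora_memory_cache::MemoryCache;

// Return the cached value if it is fresh or stale by at most `stale_ttl`;
// otherwise report a miss so the caller can re-populate the entry.
fn get_if_fresh_enough(cache: &MemoryCache<i32, i32>, key: &i32, stale_ttl: Duration) -> Option<i32> {
    let (value, status) = cache.get_stale(key);
    match status.stale() {
        Some(age) if age <= stale_ttl => value, // tolerably stale
        Some(_) => None,                        // too stale: treat as a miss
        None => value,                          // fresh hit, or a genuine miss (value is None)
    }
}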

pingora-memory-cache/src/read_through.rs

Lines changed: 115 additions & 0 deletions
@@ -283,6 +283,58 @@ where
             ret
         }
     }
+
+    /// Similar to [get], query the cache for a given value, but also returns the value even if the
+    /// value is expired up to `stale_ttl`. If it is a cache miss or the value is stale more than
+    /// the `stale_ttl`, a lookup will be performed to populate the cache.
+    pub async fn get_stale(
+        &self,
+        key: &K,
+        ttl: Option<Duration>,
+        extra: Option<&S>,
+        stale_ttl: Duration,
+    ) -> (Result<T, Box<Error>>, CacheStatus) {
+        let (result, cache_status) = self.inner.get_stale(key);
+        if let Some(result) = result {
+            let stale_duration = cache_status.stale();
+            if stale_duration.unwrap_or(Duration::ZERO) <= stale_ttl {
+                return (Ok(result), cache_status);
+            }
+        }
+        let (res, status) = self.get(key, ttl, extra).await;
+        (res, status)
+    }
+}
+
+impl<K, T, CB, S> RTCache<K, T, CB, S>
+where
+    K: Hash + Clone + Send + Sync,
+    T: Clone + Send + Sync + 'static,
+    S: Clone + Send + Sync,
+    CB: Lookup<K, T, S> + Sync + Send,
+{
+    /// Similar to [get_stale], but when it returns the stale value, it also initiates a lookup
+    /// in the background in order to refresh the value.
+    ///
+    /// Note that this function requires the [RTCache] to be static, which can be done by wrapping
+    /// it with something like [once_cell::sync::Lazy].
+    pub async fn get_stale_while_update(
+        &'static self,
+        key: &K,
+        ttl: Option<Duration>,
+        extra: Option<&S>,
+        stale_ttl: Duration,
+    ) -> (Result<T, Box<Error>>, CacheStatus) {
+        let (result, cache_status) = self.get_stale(key, ttl, extra, stale_ttl).await;
+        let key = key.clone();
+        let extra = extra.cloned();
+        if cache_status.stale().is_some() {
+            tokio::spawn(async move {
+                let _ = self.get(&key, ttl, extra.as_ref()).await;
+            });
+        }
+        (result, cache_status)
+    }
 }
 
 impl<K, T, CB, S> RTCache<K, T, CB, S>
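The background-refresh shape of get_stale_while_update can be sketched against the plain MemoryCache, without the Lookup callback machinery. This is a hedged, self-contained stand-in, not the crate's implementation: `refresh`, the key and value types, and the 1-second TTL are made up, and a tokio runtime plus a `'static` cache reference are assumed:

use std::time::Duration;
use pingora_memory_cache::MemoryCache;

// Serve whatever is cached, stale or not, and if it was stale kick off a background
// refresh so that a later caller sees a fresh value.
async fn get_stale_while_update_sketch(
    cache: &'static MemoryCache<u64, String>,
    key: u64,
    refresh: fn(u64) -> String,
) -> Option<String> {
    let (value, status) = cache.get_stale(&key);
    if status.stale().is_some() {
        // the &'static reference is what lets the spawned task outlive this call,
        // mirroring the `&'static self` receiver on get_stale_while_update
        tokio::spawn(async move {
            let fresh = refresh(key);
            cache.put(&key, fresh, Some(Duration::from_secs(1)));
        });
    }
    value
}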
@@ -686,4 +738,67 @@ mod tests {
             .await
             .unwrap();
     }
+
+    #[tokio::test]
+    async fn test_get_stale() {
+        let ttl = Some(Duration::from_millis(100));
+        let cache: RTCache<i32, i32, TestCB, ExtraOpt> = RTCache::new(10, None, None);
+        let opt = Some(ExtraOpt {
+            error: false,
+            empty: false,
+            delay_for: None,
+            used: Arc::new(AtomicI32::new(0)),
+        });
+        let (res, hit) = cache.get(&1, ttl, opt.as_ref()).await;
+        assert_eq!(res.unwrap(), 1);
+        assert_eq!(hit, CacheStatus::Miss);
+        let (res, hit) = cache.get(&1, ttl, opt.as_ref()).await;
+        assert_eq!(res.unwrap(), 1);
+        assert_eq!(hit, CacheStatus::Hit);
+        tokio::time::sleep(Duration::from_millis(150)).await;
+        let (res, hit) = cache
+            .get_stale(&1, ttl, opt.as_ref(), Duration::from_millis(1000))
+            .await;
+        assert_eq!(res.unwrap(), 1);
+        assert!(hit.stale().is_some());
+
+        let (res, hit) = cache
+            .get_stale(&1, ttl, opt.as_ref(), Duration::from_millis(30))
+            .await;
+        assert_eq!(res.unwrap(), 2);
+        assert_eq!(hit, CacheStatus::Expired);
+    }
+
+    #[tokio::test]
+    async fn test_get_stale_while_update() {
+        use once_cell::sync::Lazy;
+        let ttl = Some(Duration::from_millis(100));
+        static CACHE: Lazy<RTCache<i32, i32, TestCB, ExtraOpt>> =
+            Lazy::new(|| RTCache::new(10, None, None));
+        let opt = Some(ExtraOpt {
+            error: false,
+            empty: false,
+            delay_for: None,
+            used: Arc::new(AtomicI32::new(0)),
+        });
+        let (res, hit) = CACHE.get(&1, ttl, opt.as_ref()).await;
+        assert_eq!(res.unwrap(), 1);
+        assert_eq!(hit, CacheStatus::Miss);
+        let (res, hit) = CACHE.get(&1, ttl, opt.as_ref()).await;
+        assert_eq!(res.unwrap(), 1);
+        assert_eq!(hit, CacheStatus::Hit);
+        tokio::time::sleep(Duration::from_millis(150)).await;
+        let (res, hit) = CACHE
+            .get_stale_while_update(&1, ttl, opt.as_ref(), Duration::from_millis(1000))
+            .await;
+        assert_eq!(res.unwrap(), 1);
+        assert!(hit.stale().is_some());
+
+        // allow the background lookup to finish
+        tokio::time::sleep(Duration::from_millis(10)).await;
+
+        let (res, hit) = CACHE.get(&1, ttl, opt.as_ref()).await;
+        assert_eq!(res.unwrap(), 2);
+        assert_eq!(hit, CacheStatus::Hit);
+    }
 }
