Skip to content

Cache index queries for up to 15mins #947

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Aug 28, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
204 changes: 204 additions & 0 deletions pkg/chunk/cache/fifo_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
package cache

import (
"context"
"sync"
"time"

ot "github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

var (
cacheEntriesAdded = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "querier",
Subsystem: "cache",
Name: "added_total",
Help: "The total number of Put calls on the cache",
}, []string{"cache"})

cacheEntriesAddedNew = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "querier",
Subsystem: "cache",
Name: "added_new_total",
Help: "The total number of new entries added to the cache",
}, []string{"cache"})

cacheEntriesEvicted = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "querier",
Subsystem: "cache",
Name: "evicted_total",
Help: "The total number of evicted entries",
}, []string{"cache"})

cacheTotalGets = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "querier",
Subsystem: "cache",
Name: "gets_total",
Help: "The total number of Get calls",
}, []string{"cache"})

cacheTotalMisses = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "querier",
Subsystem: "cache",
Name: "misses_total",
Help: "The total number of Get calls that had no valid entry",
}, []string{"cache"})

cacheStaleGets = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "querier",
Subsystem: "cache",
Name: "stale_gets_total",
Help: "The total number of Get calls that had an entry which expired",
}, []string{"cache"})
)

// FifoCache is a simple string -> interface{} cache which uses a fifo slide to
// manage evictions. O(1) inserts and updates, O(1) gets.
type FifoCache struct {
lock sync.RWMutex
size int
validity time.Duration
entries []cacheEntry
index map[string]int

// indexes into entries to identify the most recent and least recent entry.
first, last int

name string
entriesAdded prometheus.Counter
entriesAddedNew prometheus.Counter
entriesEvicted prometheus.Counter
totalGets prometheus.Counter
totalMisses prometheus.Counter
staleGets prometheus.Counter
}

type cacheEntry struct {
updated time.Time
key string
value interface{}
prev, next int
}

// NewFifoCache returns a new initialised FifoCache of size.
func NewFifoCache(name string, size int, validity time.Duration) *FifoCache {
return &FifoCache{
size: size,
validity: validity,
entries: make([]cacheEntry, 0, size),
index: make(map[string]int, size),

name: name,
entriesAdded: cacheEntriesAdded.WithLabelValues(name),
entriesAddedNew: cacheEntriesAddedNew.WithLabelValues(name),
entriesEvicted: cacheEntriesEvicted.WithLabelValues(name),
totalGets: cacheTotalGets.WithLabelValues(name),
totalMisses: cacheTotalMisses.WithLabelValues(name),
staleGets: cacheStaleGets.WithLabelValues(name),
}
}

// Put stores the value against the key.
func (c *FifoCache) Put(ctx context.Context, key string, value interface{}) {
span, ctx := ot.StartSpanFromContext(ctx, c.name+"-cache-put")
defer span.Finish()

c.entriesAdded.Inc()
if c.size == 0 {
return
}

c.lock.Lock()
defer c.lock.Unlock()

// See if we already have the entry
index, ok := c.index[key]
if ok {
entry := c.entries[index]

entry.updated = time.Now()
entry.value = value

// Remove this entry from the FIFO linked-list.
c.entries[entry.prev].next = entry.next
c.entries[entry.next].prev = entry.prev

// Insert it at the beginning
entry.next = c.first
entry.prev = c.last
c.entries[entry.next].prev = index
c.entries[entry.prev].next = index
c.first = index

c.entries[index] = entry
return
}
c.entriesAddedNew.Inc()

// Otherwise, see if we need to evict an entry.
if len(c.entries) >= c.size {
c.entriesEvicted.Inc()
index = c.last
entry := c.entries[index]

c.last = entry.prev
c.first = index
delete(c.index, entry.key)
c.index[key] = index

entry.updated = time.Now()
entry.value = value
entry.key = key
c.entries[index] = entry
return
}

// Finally, no hit and we have space.
index = len(c.entries)
c.entries = append(c.entries, cacheEntry{
updated: time.Now(),
key: key,
value: value,
prev: c.last,
next: c.first,
})
c.entries[c.first].prev = index
c.entries[c.last].next = index
c.first = index
c.index[key] = index
}

// Get returns the stored value against the key and when the key was last updated.
func (c *FifoCache) Get(ctx context.Context, key string) (interface{}, bool) {
span, ctx := ot.StartSpanFromContext(ctx, c.name+"-cache-get")
defer span.Finish()

c.totalGets.Inc()
if c.size == 0 {
return nil, false
}

c.lock.RLock()
defer c.lock.RUnlock()

index, ok := c.index[key]
if ok {
updated := c.entries[index].updated
if time.Now().Sub(updated) < c.validity {
span.LogFields(otlog.Bool("hit", true))
return c.entries[index].value, true
}

c.totalMisses.Inc()
c.staleGets.Inc()
span.LogFields(otlog.Bool("hit", false), otlog.Bool("stale", true))
return nil, false
}

span.LogFields(otlog.Bool("hit", false), otlog.Bool("stale", false))
c.totalMisses.Inc()
return nil, false
}
39 changes: 29 additions & 10 deletions pkg/chunk/fifo_cache_test.go → pkg/chunk/cache/fifo_cache_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package chunk
package cache

import (
"context"
"fmt"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/require"
)
Expand All @@ -12,56 +14,73 @@ const size = 10
const overwrite = 5

func TestFifoCache(t *testing.T) {
c := newFifoCache(size)
c := NewFifoCache("test", size, 1*time.Minute)
ctx := context.Background()

// Check put / get works
for i := 0; i < size; i++ {
c.put(strconv.Itoa(i), i)
c.Put(ctx, strconv.Itoa(i), i)
//c.print()
}
require.Len(t, c.index, size)
require.Len(t, c.entries, size)

for i := 0; i < size; i++ {
value, _, ok := c.get(strconv.Itoa(i))
value, ok := c.Get(ctx, strconv.Itoa(i))
require.True(t, ok)
require.Equal(t, i, value.(int))
}

// Check evictions
for i := size; i < size+overwrite; i++ {
c.put(strconv.Itoa(i), i)
c.Put(ctx, strconv.Itoa(i), i)
//c.print()
}
require.Len(t, c.index, size)
require.Len(t, c.entries, size)

for i := 0; i < size-overwrite; i++ {
_, _, ok := c.get(strconv.Itoa(i))
_, ok := c.Get(ctx, strconv.Itoa(i))
require.False(t, ok)
}
for i := size; i < size+overwrite; i++ {
value, _, ok := c.get(strconv.Itoa(i))
value, ok := c.Get(ctx, strconv.Itoa(i))
require.True(t, ok)
require.Equal(t, i, value.(int))
}

// Check updates work
for i := size; i < size+overwrite; i++ {
c.put(strconv.Itoa(i), i*2)
c.Put(ctx, strconv.Itoa(i), i*2)
//c.print()
}
require.Len(t, c.index, size)
require.Len(t, c.entries, size)

for i := size; i < size+overwrite; i++ {
value, _, ok := c.get(strconv.Itoa(i))
value, ok := c.Get(ctx, strconv.Itoa(i))
require.True(t, ok)
require.Equal(t, i*2, value.(int))
}
}

func (c *fifoCache) print() {
func TestFifoCacheExpiry(t *testing.T) {
c := NewFifoCache("test", size, 5*time.Millisecond)
ctx := context.Background()

c.Put(ctx, "0", 0)

value, ok := c.Get(ctx, "0")
require.True(t, ok)
require.Equal(t, 0, value.(int))

// Expire the entry.
time.Sleep(5 * time.Millisecond)
_, ok = c.Get(ctx, strconv.Itoa(0))
require.False(t, ok)
}

func (c *FifoCache) print() {
fmt.Println("first", c.first, "last", c.last)
for i, entry := range c.entries {
fmt.Printf(" %d -> key: %s, value: %v, next: %d, prev: %d\n", i, entry.key, entry.value, entry.next, entry.prev)
Expand Down
Loading