
Commit e74c504

tomwilkie authored and bboreham committed
Diskcache: querier-local SSD backed chunk cache (#685)
* Refactor memcache(d) into the chunk package.
  - Caches don't know about chunks anymore; it's all just []byte.
  - Deal with an empty memcached host by returning a noopCache, rather than special-casing a nil client.
  - Make background writes a cache 'middleware'.
  - s/memcache/memcached/.
  - Refactor tests.
* Add diskcache, for caching chunks on local SSD in queriers (a rough sketch of the bucket placement follows the file stats below).
  - mmap a large file and treat it as a series of 2KB buckets.
  - Use an FNV hash to place the key and chunk in a bucket.
  - Reuse the existing memcached tests.
* Add instrumentation middleware for caches.
* Add tests for the background and tiered caches.
* Ensure the background goroutines have flushed before checking the cache for entries.
* Write chunks found in lower-tier caches back to the upper tier.
* Query the diskcache first.
1 parent 646ec88 commit e74c504

15 files changed: +938 -363 lines
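The diskcache file itself (pkg/chunk/cache/diskcache.go) is not among the hunks shown below, so here is a rough, hypothetical sketch of the bucket placement described in the commit message: FNV-hash the key to pick one of the fixed 2KB buckets in an mmapped file, then copy the key and chunk into that bucket. The package name, the bucketSize constant, the key/chunk encoding, and the function names are assumptions for illustration only, not the actual implementation.

package diskcachesketch

import "hash/fnv"

// 2KB buckets, as described in the commit message.
const bucketSize = 2048

// bucketOffset FNV-hashes the key and maps it onto one of the fixed-size
// buckets that fit in the mmapped file.
func bucketOffset(key string, mappedLen int) int {
	buckets := mappedLen / bucketSize
	if buckets == 0 {
		return 0
	}
	h := fnv.New32()
	h.Write([]byte(key))
	return (int(h.Sum32()) % buckets) * bucketSize
}

// put copies the key and chunk into the chosen bucket, overwriting whatever
// previously hashed there: a collision simply evicts the older entry.
func put(mapped []byte, key string, chunk []byte) bool {
	if len(mapped) < bucketSize || len(key)+1+len(chunk) > bucketSize {
		return false // file too small, or entry does not fit in one bucket
	}
	off := bucketOffset(key, len(mapped))
	bucket := mapped[off : off+bucketSize]
	n := copy(bucket, key)
	bucket[n] = 0 // NUL separator between key and chunk (illustrative encoding)
	copy(bucket[n+1:], chunk)
	return true
}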

pkg/chunk/cache/background.go

Lines changed: 113 additions & 0 deletions
@@ -0,0 +1,113 @@
package cache

import (
	"context"
	"flag"
	"sync"

	"github.com/go-kit/kit/log/level"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/weaveworks/cortex/pkg/util"
)

var (
	droppedWriteBack = prometheus.NewCounter(prometheus.CounterOpts{
		Namespace: "cortex",
		Name:      "cache_dropped_background_writes_total",
		Help:      "Total count of dropped write backs to cache.",
	})
	queueLength = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "cortex",
		Name:      "cache_background_queue_length",
		Help:      "Length of the cache background write queue.",
	})
)

func init() {
	prometheus.MustRegister(droppedWriteBack)
	prometheus.MustRegister(queueLength)
}

// BackgroundConfig is config for a Background Cache.
type BackgroundConfig struct {
	WriteBackGoroutines int
	WriteBackBuffer     int
}

// RegisterFlags adds the flags required to config this to the given FlagSet.
func (cfg *BackgroundConfig) RegisterFlags(f *flag.FlagSet) {
	f.IntVar(&cfg.WriteBackGoroutines, "memcache.write-back-goroutines", 10, "How many goroutines to use to write back to memcache.")
	f.IntVar(&cfg.WriteBackBuffer, "memcache.write-back-buffer", 10000, "How many chunks to buffer for background write back.")
}

type backgroundCache struct {
	Cache

	wg       sync.WaitGroup
	quit     chan struct{}
	bgWrites chan backgroundWrite
}

type backgroundWrite struct {
	key string
	buf []byte
}

// NewBackground returns a new Cache that does stores on background goroutines.
func NewBackground(cfg BackgroundConfig, cache Cache) Cache {
	c := &backgroundCache{
		Cache:    cache,
		quit:     make(chan struct{}),
		bgWrites: make(chan backgroundWrite, cfg.WriteBackBuffer),
	}

	c.wg.Add(cfg.WriteBackGoroutines)
	for i := 0; i < cfg.WriteBackGoroutines; i++ {
		go c.writeBackLoop()
	}

	return c
}

// Stop the background flushing goroutines.
func (c *backgroundCache) Stop() error {
	close(c.quit)
	c.wg.Wait()

	return c.Cache.Stop()
}

// StoreChunk writes chunks for the cache in the background.
func (c *backgroundCache) StoreChunk(ctx context.Context, key string, buf []byte) error {
	bgWrite := backgroundWrite{
		key: key,
		buf: buf,
	}
	select {
	case c.bgWrites <- bgWrite:
		queueLength.Inc()
	default:
		droppedWriteBack.Inc()
	}
	return nil
}

func (c *backgroundCache) writeBackLoop() {
	defer c.wg.Done()

	for {
		select {
		case bgWrite, ok := <-c.bgWrites:
			if !ok {
				return
			}
			queueLength.Dec()
			err := c.Cache.StoreChunk(context.Background(), bgWrite.key, bgWrite.buf)
			if err != nil {
				level.Error(util.Logger).Log("msg", "error writing to memcache", "err", err)
			}
		case <-c.quit:
			return
		}
	}
}
Lines changed: 7 additions & 0 deletions
@@ -0,0 +1,7 @@
package cache

func Flush(c Cache) {
	b := c.(*backgroundCache)
	close(b.bgWrites)
	b.wg.Wait()
}

pkg/chunk/cache/background_test.go

Lines changed: 60 additions & 0 deletions
@@ -0,0 +1,60 @@
package cache_test

import (
	"context"
	"sync"
	"testing"

	"github.com/weaveworks/cortex/pkg/chunk/cache"
)

type mockCache struct {
	sync.Mutex
	cache map[string][]byte
}

func (m *mockCache) StoreChunk(_ context.Context, key string, buf []byte) error {
	m.Lock()
	defer m.Unlock()
	m.cache[key] = buf
	return nil
}

func (m *mockCache) FetchChunkData(ctx context.Context, keys []string) (found []string, bufs [][]byte, missing []string, err error) {
	m.Lock()
	defer m.Unlock()
	for _, key := range keys {
		buf, ok := m.cache[key]
		if ok {
			found = append(found, key)
			bufs = append(bufs, buf)
		} else {
			missing = append(missing, key)
		}
	}
	return
}

func (m *mockCache) Stop() error {
	return nil
}

func newMockCache() cache.Cache {
	return &mockCache{
		cache: map[string][]byte{},
	}
}

func TestBackground(t *testing.T) {
	c := cache.NewBackground(cache.BackgroundConfig{
		WriteBackGoroutines: 1,
		WriteBackBuffer:     100,
	}, newMockCache())

	keys, chunks := fillCache(t, c)
	cache.Flush(c)

	testCacheSingle(t, c, keys, chunks)
	testCacheMultiple(t, c, keys, chunks)
	testCacheMiss(t, c)
}

pkg/chunk/cache/cache.go

Lines changed: 59 additions & 0 deletions
@@ -0,0 +1,59 @@
package cache

import (
	"context"
	"flag"
)

// Cache byte arrays by key.
type Cache interface {
	StoreChunk(ctx context.Context, key string, buf []byte) error
	FetchChunkData(ctx context.Context, keys []string) (found []string, bufs [][]byte, missing []string, err error)
	Stop() error
}

// Config for building Caches.
type Config struct {
	EnableDiskcache bool

	background     BackgroundConfig
	memcache       MemcachedConfig
	memcacheClient MemcachedClientConfig
	diskcache      DiskcacheConfig
}

// RegisterFlags adds the flags required to config this to the given FlagSet.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
	f.BoolVar(&cfg.EnableDiskcache, "cache.enable-diskcache", false, "Enable on-disk cache")

	cfg.memcache.RegisterFlags(f)
	cfg.memcacheClient.RegisterFlags(f)
	cfg.diskcache.RegisterFlags(f)
}

// New creates a new Cache using Config.
func New(cfg Config) (Cache, error) {
	caches := []Cache{}

	if cfg.EnableDiskcache {
		cache, err := NewDiskcache(cfg.diskcache)
		if err != nil {
			return nil, err
		}
		caches = append(caches, instrument("diskcache", cache))
	}

	if cfg.memcacheClient.Host != "" {
		client := newMemcachedClient(cfg.memcacheClient)
		cache := NewMemcached(cfg.memcache, client)
		caches = append(caches, instrument("memcache", cache))
	}

	var cache Cache = tiered(caches)
	if len(caches) > 1 {
		cache = instrument("tiered", cache)
	}

	cache = NewBackground(cfg.background, cache)
	return cache, nil
}
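New above composes the configured caches with tiered(...), but the tiered and instrument helpers live in files not included in this view. Per the commit message, the diskcache tier is queried first and chunks found in a lower tier are written back to the tiers above it. Below is a rough, assumed sketch of that fetch-with-write-back path; the tieredSketch name, its ordering assumptions, and the silent error handling on write-back are illustrative, not the actual tiered implementation.

package cache

import "context"

// tieredSketch is an illustrative stand-in for the real tiered type: caches
// are ordered fastest-first (e.g. diskcache before memcached). Only the
// fetch path is sketched here.
type tieredSketch []Cache

func (t tieredSketch) FetchChunkData(ctx context.Context, keys []string) ([]string, [][]byte, []string, error) {
	found := []string{}
	bufs := [][]byte{}
	missing := keys

	for i, c := range t {
		if len(missing) == 0 {
			break
		}
		f, bs, m, err := c.FetchChunkData(ctx, missing)
		if err != nil {
			return nil, nil, nil, err
		}
		// Write hits from this (lower) tier back into the tiers above it,
		// so the faster caches warm up for the next query.
		for j, key := range f {
			for _, upper := range t[:i] {
				_ = upper.StoreChunk(ctx, key, bs[j])
			}
		}
		found = append(found, f...)
		bufs = append(bufs, bs...)
		missing = m
	}
	return found, bufs, missing, nil
}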

pkg/chunk/cache/cache_test.go

Lines changed: 125 additions & 0 deletions
@@ -0,0 +1,125 @@
package cache_test

import (
	"context"
	"math/rand"
	"os"
	"path"
	"strconv"
	"testing"

	"github.com/prometheus/common/model"
	"github.com/stretchr/testify/require"
	"github.com/weaveworks/cortex/pkg/chunk"
	"github.com/weaveworks/cortex/pkg/chunk/cache"
	prom_chunk "github.com/weaveworks/cortex/pkg/prom1/storage/local/chunk"
)

func fillCache(t *testing.T, cache cache.Cache) ([]string, []chunk.Chunk) {
	const (
		userID   = "1"
		chunkLen = 13 * 3600 // in seconds
	)

	// put 100 chunks from 0 to 99
	keys := []string{}
	chunks := []chunk.Chunk{}
	for i := 0; i < 100; i++ {
		ts := model.TimeFromUnix(int64(i * chunkLen))
		promChunk, _ := prom_chunk.New().Add(model.SamplePair{
			Timestamp: ts,
			Value:     model.SampleValue(i),
		})
		c := chunk.NewChunk(
			userID,
			model.Fingerprint(1),
			model.Metric{
				model.MetricNameLabel: "foo",
				"bar":                 "baz",
			},
			promChunk[0],
			ts,
			ts.Add(chunkLen),
		)

		buf, err := c.Encode()
		require.NoError(t, err)

		key := c.ExternalKey()
		err = cache.StoreChunk(context.Background(), key, buf)
		require.NoError(t, err)

		keys = append(keys, key)
		chunks = append(chunks, c)
	}

	return keys, chunks
}

func testCacheSingle(t *testing.T, cache cache.Cache, keys []string, chunks []chunk.Chunk) {
	for i := 0; i < 100; i++ {
		index := rand.Intn(len(keys))
		key := keys[index]

		found, bufs, missingKeys, err := cache.FetchChunkData(context.Background(), []string{key})
		require.NoError(t, err)
		require.Len(t, found, 1)
		require.Len(t, bufs, 1)
		require.Len(t, missingKeys, 0)

		foundChunks, missing, err := chunk.ProcessCacheResponse([]chunk.Chunk{chunks[index]}, found, bufs)
		require.NoError(t, err)
		require.Empty(t, missing)
		require.Equal(t, chunks[index], foundChunks[0])
	}
}

func testCacheMultiple(t *testing.T, cache cache.Cache, keys []string, chunks []chunk.Chunk) {
	// test getting them all
	found, bufs, missingKeys, err := cache.FetchChunkData(context.Background(), keys)
	require.NoError(t, err)
	require.Len(t, found, len(keys))
	require.Len(t, bufs, len(keys))
	require.Len(t, missingKeys, 0)

	foundChunks, missing, err := chunk.ProcessCacheResponse(chunks, found, bufs)
	require.NoError(t, err)
	require.Empty(t, missing)
	require.Equal(t, chunks, foundChunks)
}

func testCacheMiss(t *testing.T, cache cache.Cache) {
	for i := 0; i < 100; i++ {
		key := strconv.Itoa(rand.Int())
		found, bufs, missing, err := cache.FetchChunkData(context.Background(), []string{key})
		require.NoError(t, err)
		require.Empty(t, found)
		require.Empty(t, bufs)
		require.Len(t, missing, 1)
	}
}

func testCache(t *testing.T, cache cache.Cache) {
	keys, chunks := fillCache(t, cache)
	testCacheSingle(t, cache, keys, chunks)
	testCacheMultiple(t, cache, keys, chunks)
	testCacheMiss(t, cache)
}

func TestMemcache(t *testing.T) {
	cache := cache.NewMemcached(cache.MemcachedConfig{}, newMockMemcache())
	testCache(t, cache)
}

func TestDiskcache(t *testing.T) {
	dirname := os.TempDir()
	filename := path.Join(dirname, "diskcache")
	defer os.RemoveAll(filename)

	cache, err := cache.NewDiskcache(cache.DiskcacheConfig{
		Path: filename,
		Size: 100 * 1024 * 1024,
	})
	require.NoError(t, err)
	testCache(t, cache)
}
