Skip to content

Commit 1f458e5

Browse files
committed
Add diskcache, for caching chunks on local SSD in queriers.
- mmap a large file, treat it as a series of 2KB buckets. - Use FNV hash to place key and chunk in buckets. - Use existing memcached tests
1 parent ad65b8b commit 1f458e5

File tree

1 file changed

+158
-0
lines changed

1 file changed

+158
-0
lines changed

pkg/chunk/cache/diskcache.go

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
package cache
2+
3+
import (
4+
"context"
5+
"encoding/binary"
6+
"flag"
7+
"fmt"
8+
"hash/fnv"
9+
"os"
10+
"sync"
11+
12+
"github.com/pkg/errors"
13+
"github.com/prometheus/tsdb/fileutil"
14+
"golang.org/x/sys/unix"
15+
)
16+
17+
// TODO: in the future we could cuckoo hash or linear probe.
18+
19+
// bucketSize is the fixed size, in bytes, of every bucket in the cache file.
// Each bucket holds a single entry: a key, a chunk and their metadata. The
// sizing estimate is roughly 50 bytes of key, 1024 bytes of chunk and 100
// bytes of metadata.
const bucketSize = 2 * 1024
21+
22+
// DiskcacheConfig holds the settings for the on-disk chunk cache.
type DiskcacheConfig struct {
	// Path is the file used to cache chunks.
	Path string

	// Size is the size of that file, in bytes.
	Size int
}
27+
28+
// RegisterFlags adds the flags required to config this to the given FlagSet
29+
func (cfg *DiskcacheConfig) RegisterFlags(f *flag.FlagSet) {
30+
f.StringVar(&cfg.Path, "diskcache.path", "/var/run/chunks", "Path to file used to cache chunks.")
31+
f.IntVar(&cfg.Size, "diskcache.size", 1024*1024*1024, "Size of file (bytes)")
32+
}
33+
34+
// Diskcache is an on-disk chunk cache. The backing file is memory-mapped and
// addressed as a sequence of fixed-size buckets.
type Diskcache struct {
	// mtx is held (shared) while reading a bucket and (exclusively) while
	// writing one.
	mtx sync.RWMutex

	// f is the open backing file.
	f *os.File

	// buckets is the number of whole buckets available in buf.
	buckets uint32

	// buf is the memory-mapped contents of f.
	buf []byte
}
41+
42+
// NewDiskcache creates a new on-disk cache.
43+
func NewDiskcache(cfg DiskcacheConfig) (*Diskcache, error) {
44+
f, err := os.OpenFile(cfg.Path, os.O_RDWR|os.O_CREATE, 0644)
45+
if err != nil {
46+
return nil, errors.Wrap(err, "open")
47+
}
48+
49+
if err := fileutil.Preallocate(f, int64(cfg.Size), true); err != nil {
50+
return nil, errors.Wrap(err, "preallocate")
51+
}
52+
53+
info, err := f.Stat()
54+
if err != nil {
55+
return nil, errors.Wrap(err, "stat")
56+
}
57+
58+
buf, err := unix.Mmap(int(f.Fd()), 0, int(info.Size()), unix.PROT_READ|unix.PROT_WRITE, unix.MAP_SHARED)
59+
if err != nil {
60+
f.Close()
61+
return nil, err
62+
}
63+
64+
buckets := len(buf) / bucketSize
65+
66+
return &Diskcache{
67+
f: f,
68+
buf: buf,
69+
buckets: uint32(buckets),
70+
}, nil
71+
}
72+
73+
// Stop closes the file.
74+
func (d *Diskcache) Stop() error {
75+
if err := unix.Munmap(d.buf); err != nil {
76+
return err
77+
}
78+
return d.f.Close()
79+
}
80+
81+
// FetchChunkData get chunks from the cache.
82+
func (d *Diskcache) FetchChunkData(ctx context.Context, keys []string) (found []string, bufs [][]byte, err error) {
83+
for _, key := range keys {
84+
buf, ok := d.fetch(key)
85+
if ok {
86+
found = append(found, key)
87+
bufs = append(bufs, buf)
88+
}
89+
}
90+
return
91+
}
92+
93+
func (d *Diskcache) fetch(key string) ([]byte, bool) {
94+
d.mtx.RLock()
95+
defer d.mtx.RUnlock()
96+
97+
bucket := hash(key) % d.buckets
98+
buf := d.buf[bucket*bucketSize : (bucket+1)*bucketSize]
99+
100+
existingKey, n, ok := get(buf, 0)
101+
if !ok || string(existingKey) != key {
102+
return nil, false
103+
}
104+
105+
existingValue, _, ok := get(buf, n)
106+
if !ok {
107+
return nil, false
108+
}
109+
110+
result := make([]byte, len(existingValue), len(existingValue))
111+
copy(result, existingValue)
112+
return result, true
113+
}
114+
115+
// StoreChunk puts a chunk into the cache.
116+
func (d *Diskcache) StoreChunk(ctx context.Context, key string, value []byte) error {
117+
d.mtx.Lock()
118+
defer d.mtx.Unlock()
119+
120+
bucket := hash(key) % d.buckets
121+
buf := d.buf[bucket*bucketSize : (bucket+1)*bucketSize]
122+
123+
n, err := put([]byte(key), buf, 0)
124+
if err != nil {
125+
return err
126+
}
127+
128+
_, err = put(value, buf, n)
129+
if err != nil {
130+
return err
131+
}
132+
133+
return nil
134+
}
135+
136+
func put(value []byte, buf []byte, n int) (int, error) {
137+
if len(value)+n+4 > len(buf) {
138+
return 0, errors.Wrap(fmt.Errorf("value too big: %d > %d", len(value), len(buf)), "put")
139+
}
140+
m := binary.PutUvarint(buf[n:], uint64(len(value)))
141+
copy(buf[n+m:], value)
142+
return len(value) + n + m, nil
143+
}
144+
145+
// get decodes one length-prefixed field from buf starting at offset n.
//
// It returns the field's bytes (aliasing buf, not copied), the offset just
// past the field, and true on success. It returns false, without panicking,
// when n is out of range, when the uvarint length prefix is missing or
// malformed, or when the encoded length runs past the end of buf. Bucket
// contents come from a file on disk, so they must be treated as untrusted.
func get(buf []byte, n int) ([]byte, int, bool) {
	if n < 0 || n > len(buf) {
		return nil, 0, false
	}

	// Uvarint reports m == 0 when buf is too short and m < 0 on overflow.
	size, m := binary.Uvarint(buf[n:])
	if m <= 0 {
		return nil, 0, false
	}

	// Compare in uint64 so a huge encoded length cannot turn negative when
	// converted to int.
	start := n + m
	if size > uint64(len(buf)-start) {
		return nil, 0, false
	}

	end := start + int(size)
	return buf[start:end], end, true
}
153+
154+
// hash returns the 32-bit FNV-1 hash of key. Callers reduce it modulo the
// bucket count to pick the bucket a key lives in.
func hash(key string) uint32 {
	hasher := fnv.New32()
	// hash.Hash documents that Write never returns an error.
	_, _ = hasher.Write([]byte(key))
	return hasher.Sum32()
}

0 commit comments

Comments
 (0)