Skip to content

Cache index writes (and change some flag names) #1024

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Oct 17, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions pkg/chunk/cache/background.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ type BackgroundConfig struct {
WriteBackBuffer int
}

// RegisterFlags adds the flags required to config this to the given FlagSet.
func (cfg *BackgroundConfig) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.WriteBackGoroutines, "memcache.write-back-goroutines", 10, "How many goroutines to use to write back to memcache.")
f.IntVar(&cfg.WriteBackBuffer, "memcache.write-back-buffer", 10000, "How many chunks to buffer for background write back.")
// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet
func (cfg *BackgroundConfig) RegisterFlagsWithPrefix(prefix string, description string, f *flag.FlagSet) {
f.IntVar(&cfg.WriteBackGoroutines, prefix+"memcache.write-back-goroutines", 10, description+"How many goroutines to use to write back to memcache.")
f.IntVar(&cfg.WriteBackBuffer, prefix+"memcache.write-back-buffer", 10000, description+"How many chunks to buffer for background write back.")
}

type backgroundCache struct {
Expand Down
50 changes: 40 additions & 10 deletions pkg/chunk/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cache
import (
"context"
"flag"
"time"
)

// Cache byte arrays by key.
Expand All @@ -15,24 +16,36 @@ type Cache interface {
// Config for building Caches.
type Config struct {
EnableDiskcache bool
EnableFifoCache bool

DefaultValidity time.Duration

background BackgroundConfig
memcache MemcachedConfig
memcacheClient MemcachedClientConfig
diskcache DiskcacheConfig
fifocache FifoCacheConfig

// This is to name the cache metrics properly.
prefix string

// For tests to inject specific implementations.
Cache Cache
}

// RegisterFlags adds the flags required to config this to the given FlagSet.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.EnableDiskcache, "cache.enable-diskcache", false, "Enable on-disk cache")
// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, description string, f *flag.FlagSet) {
cfg.background.RegisterFlagsWithPrefix(prefix, description, f)
cfg.memcache.RegisterFlagsWithPrefix(prefix, description, f)
cfg.memcacheClient.RegisterFlagsWithPrefix(prefix, description, f)
cfg.diskcache.RegisterFlagsWithPrefix(prefix, description, f)
cfg.fifocache.RegisterFlagsWithPrefix(prefix, description, f)

f.BoolVar(&cfg.EnableDiskcache, prefix+"cache.enable-diskcache", false, description+"Enable on-disk cache.")
f.BoolVar(&cfg.EnableFifoCache, prefix+"cache.enable-fifocache", false, description+"Enable in-memory cache.")
f.DurationVar(&cfg.DefaultValidity, prefix+"default-validity", 0, description+"The default validity of entries for caches unless overridden.")

cfg.background.RegisterFlags(f)
cfg.memcache.RegisterFlags(f)
cfg.memcacheClient.RegisterFlags(f)
cfg.diskcache.RegisterFlags(f)
cfg.prefix = prefix
}

// New creates a new Cache using Config.
Expand All @@ -43,23 +56,40 @@ func New(cfg Config) (Cache, error) {

caches := []Cache{}

if cfg.EnableFifoCache {
if cfg.fifocache.Validity == 0 && cfg.DefaultValidity != 0 {
cfg.fifocache.Validity = cfg.DefaultValidity
}

cache := NewFifoCache(cfg.prefix+"fifocache", cfg.fifocache)
caches = append(caches, Instrument(cfg.prefix+"fifocache", cache))
}

if cfg.EnableDiskcache {
cache, err := NewDiskcache(cfg.diskcache)
if err != nil {
return nil, err
}
caches = append(caches, NewBackground("diskcache", cfg.background, Instrument("diskcache", cache)))

cacheName := cfg.prefix + "diskcache"
caches = append(caches, NewBackground(cacheName, cfg.background, Instrument(cacheName, cache)))
}

if cfg.memcacheClient.Host != "" {
if cfg.memcache.Expiration == 0 && cfg.DefaultValidity != 0 {
cfg.memcache.Expiration = cfg.DefaultValidity
}

client := NewMemcachedClient(cfg.memcacheClient)
cache := NewMemcached(cfg.memcache, client)
caches = append(caches, NewBackground("memcache", cfg.background, Instrument("memcache", cache)))

cacheName := cfg.prefix + "memcache"
caches = append(caches, NewBackground(cacheName, cfg.background, Instrument(cacheName, cache)))
}

cache := NewTiered(caches)
if len(caches) > 1 {
cache = Instrument("tiered", cache)
cache = Instrument(cfg.prefix+"tiered", cache)
}
return cache, nil
}
2 changes: 1 addition & 1 deletion pkg/chunk/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func TestDiskcache(t *testing.T) {
}

func TestFifoCache(t *testing.T) {
cache := cache.NewFifoCache("test", 1e3, 1*time.Hour)
cache := cache.NewFifoCache("test", cache.FifoCacheConfig{Size: 1e3, Validity: 1 * time.Hour})
testCache(t, cache)
}

Expand Down
9 changes: 7 additions & 2 deletions pkg/chunk/cache/diskcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,13 @@ type DiskcacheConfig struct {

// RegisterFlags adds the flags required to config this to the given FlagSet
func (cfg *DiskcacheConfig) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.Path, "diskcache.path", "/var/run/chunks", "Path to file used to cache chunks.")
f.IntVar(&cfg.Size, "diskcache.size", 1024*1024*1024, "Size of file (bytes)")
cfg.RegisterFlagsWithPrefix("", "", f)
}

// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet
func (cfg *DiskcacheConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet) {
f.StringVar(&cfg.Path, prefix+"diskcache.path", "/var/run/chunks", description+"Path to file used to cache chunks.")
f.IntVar(&cfg.Size, prefix+"diskcache.size", 1024*1024*1024, description+"Size of file (bytes)")
}

// Diskcache is an on-disk chunk cache.
Expand Down
26 changes: 19 additions & 7 deletions pkg/chunk/cache/fifo_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cache

import (
"context"
"flag"
"sync"
"time"

Expand Down Expand Up @@ -53,6 +54,18 @@ var (
}, []string{"cache"})
)

// FifoCacheConfig holds config for the FifoCache.
type FifoCacheConfig struct {
Size int
Validity time.Duration
}

// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet
func (cfg *FifoCacheConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet) {
f.IntVar(&cfg.Size, prefix+"fifocache.size", 0, description+"The number of entries to cache.")
f.DurationVar(&cfg.Validity, prefix+"fifocache.duration", 0, description+"The expiry duration for the cache.")
}

// FifoCache is a simple string -> interface{} cache which uses a fifo slide to
// manage evictions. O(1) inserts and updates, O(1) gets.
type FifoCache struct {
Expand Down Expand Up @@ -82,12 +95,12 @@ type cacheEntry struct {
}

// NewFifoCache returns a new initialised FifoCache of size.
func NewFifoCache(name string, size int, validity time.Duration) *FifoCache {
func NewFifoCache(name string, cfg FifoCacheConfig) *FifoCache {
return &FifoCache{
size: size,
validity: validity,
entries: make([]cacheEntry, 0, size),
index: make(map[string]int, size),
size: cfg.Size,
validity: cfg.Validity,
entries: make([]cacheEntry, 0, cfg.Size),
index: make(map[string]int, cfg.Size),

name: name,
entriesAdded: cacheEntriesAdded.WithLabelValues(name),
Expand Down Expand Up @@ -216,8 +229,7 @@ func (c *FifoCache) Get(ctx context.Context, key string) (interface{}, bool) {
index, ok := c.index[key]
if ok {
updated := c.entries[index].updated
if time.Now().Sub(updated) < c.validity {

if c.validity == 0 || time.Now().Sub(updated) < c.validity {
return c.entries[index].value, true
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/chunk/cache/fifo_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const size = 10
const overwrite = 5

func TestFifoCache(t *testing.T) {
c := NewFifoCache("test", size, 1*time.Minute)
c := NewFifoCache("test", FifoCacheConfig{Size: size, Validity: 1 * time.Minute})
ctx := context.Background()

// Check put / get works
Expand Down Expand Up @@ -74,7 +74,7 @@ func TestFifoCache(t *testing.T) {
}

func TestFifoCacheExpiry(t *testing.T) {
c := NewFifoCache("test", size, 5*time.Millisecond)
c := NewFifoCache("test", FifoCacheConfig{Size: size, Validity: 5 * time.Millisecond})
ctx := context.Background()

c.Put(ctx, []string{"0"}, []interface{}{0})
Expand Down
10 changes: 5 additions & 5 deletions pkg/chunk/cache/memcached.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ type MemcachedConfig struct {
Parallelism int
}

// RegisterFlags adds the flags required to config this to the given FlagSet
func (cfg *MemcachedConfig) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.Expiration, "memcached.expiration", 0, "How long keys stay in the memcache.")
f.IntVar(&cfg.BatchSize, "memcached.batchsize", 0, "How many keys to fetch in each batch.")
f.IntVar(&cfg.Parallelism, "memcached.parallelism", 100, "Maximum active requests to memcache.")
// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet
func (cfg *MemcachedConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet) {
f.DurationVar(&cfg.Expiration, prefix+"memcached.expiration", 0, description+"How long keys stay in the memcache.")
f.IntVar(&cfg.BatchSize, prefix+"memcached.batchsize", 0, description+"How many keys to fetch in each batch.")
f.IntVar(&cfg.Parallelism, prefix+"memcached.parallelism", 100, description+"Maximum active requests to memcache.")
}

// Memcached type caches chunks in memcached
Expand Down
23 changes: 5 additions & 18 deletions pkg/chunk/cache/memcached_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,25 +39,12 @@ type MemcachedClientConfig struct {
UpdateInterval time.Duration
}

// RegisterFlags adds the flags required to config this to the given FlagSet
func (cfg *MemcachedClientConfig) RegisterFlags(f *flag.FlagSet) {
cfg.registerFlagsWithPrefix("", f)
}

// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet
func (cfg *MemcachedClientConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
cfg.registerFlagsWithPrefix(prefix, f)
}

func (cfg *MemcachedClientConfig) registerFlagsWithPrefix(prefix string, f *flag.FlagSet) {
if prefix != "" {
prefix = prefix + "."
}

f.StringVar(&cfg.Host, prefix+"memcached.hostname", "", "Hostname for memcached service to use when caching chunks. If empty, no memcached will be used.")
f.StringVar(&cfg.Service, prefix+"memcached.service", "memcached", "SRV service used to discover memcache servers.")
f.DurationVar(&cfg.Timeout, prefix+"memcached.timeout", 100*time.Millisecond, "Maximum time to wait before giving up on memcached requests.")
f.DurationVar(&cfg.UpdateInterval, prefix+"memcached.update-interval", 1*time.Minute, "Period with which to poll DNS for memcache servers.")
func (cfg *MemcachedClientConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet) {
f.StringVar(&cfg.Host, prefix+"memcached.hostname", "", description+"Hostname for memcached service to use when caching chunks. If empty, no memcached will be used.")
f.StringVar(&cfg.Service, prefix+"memcached.service", "memcached", description+"SRV service used to discover memcache servers.")
f.DurationVar(&cfg.Timeout, prefix+"memcached.timeout", 100*time.Millisecond, description+"Maximum time to wait before giving up on memcached requests.")
f.DurationVar(&cfg.UpdateInterval, prefix+"memcached.update-interval", 1*time.Minute, description+"Period with which to poll DNS for memcache servers.")
}

// NewMemcachedClient creates a new MemcacheClient that gets its server list
Expand Down
18 changes: 11 additions & 7 deletions pkg/chunk/chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"
Expand All @@ -22,7 +23,7 @@ import (
)

var (
indexEntriesPerChunk = prometheus.NewHistogram(prometheus.HistogramOpts{
indexEntriesPerChunk = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: "cortex",
Name: "chunk_store_index_entries_per_chunk",
Help: "Number of entries written to storage per chunk.",
Expand All @@ -37,33 +38,36 @@ var (
},
HashBuckets: 1024,
})
cacheCorrupt = prometheus.NewCounter(prometheus.CounterOpts{
cacheCorrupt = promauto.NewCounter(prometheus.CounterOpts{
Namespace: "cortex",
Name: "cache_corrupt_chunks_total",
Help: "Total count of corrupt chunks found in cache.",
})
)

func init() {
prometheus.MustRegister(indexEntriesPerChunk)
prometheus.MustRegister(rowWrites)
prometheus.MustRegister(cacheCorrupt)
}

// StoreConfig specifies config for a ChunkStore
type StoreConfig struct {
CacheConfig cache.Config
ChunkCacheConfig cache.Config

MinChunkAge time.Duration
QueryChunkLimit int
CardinalityCacheSize int
CardinalityCacheValidity time.Duration
CardinalityLimit int

WriteDedupeCacheConfig cache.Config
}

// RegisterFlags adds the flags required to config this to the given FlagSet
func (cfg *StoreConfig) RegisterFlags(f *flag.FlagSet) {
cfg.CacheConfig.RegisterFlags(f)
cfg.ChunkCacheConfig.RegisterFlagsWithPrefix("", "Cache config for chunks. ", f)

cfg.WriteDedupeCacheConfig.RegisterFlagsWithPrefix("store.index-cache-write.", "Cache config for index entry writing. ", f)

f.DurationVar(&cfg.MinChunkAge, "store.min-chunk-age", 0, "Minimum time between chunk update and being saved to the store.")
f.IntVar(&cfg.QueryChunkLimit, "store.query-chunk-limit", 2e6, "Maximum number of chunks that can be fetched in a single query.")
f.IntVar(&cfg.CardinalityCacheSize, "store.cardinality-cache-size", 0, "Size of in-memory cardinality cache, 0 to disable.")
Expand All @@ -81,7 +85,7 @@ type store struct {
}

func newStore(cfg StoreConfig, schema Schema, storage StorageClient) (Store, error) {
fetcher, err := NewChunkFetcher(cfg.CacheConfig, storage)
fetcher, err := NewChunkFetcher(cfg.ChunkCacheConfig, storage)
if err != nil {
return nil, err
}
Expand Down
Loading