
Store chunks in DynamoDB #418


Merged: 16 commits, May 18, 2017
Changes from 7 commits
365 changes: 311 additions & 54 deletions pkg/chunk/aws_storage_client.go

Large diffs are not rendered by default.
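The aws_storage_client.go diff itself is not rendered here, but the new StorageClient interface (see pkg/chunk/storage_client.go below) implies chunk writes now go through DynamoDB batch writes. A minimal sketch of that idea follows, assuming a table keyed by the chunk's external key with the chunk body in a binary attribute; the attribute names, helper name, and the omitted batching/retry handling are assumptions, not the PR's actual code:

```go
package chunk

import (
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/dynamodb"
	"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface"
	"golang.org/x/net/context"
)

// putChunksToDynamoDB is an illustrative sketch only: write chunk payloads to
// a DynamoDB table with BatchWriteItem. Attribute names ("h" for the hash key,
// "c" for the chunk body) are assumptions. A real client must also split the
// requests into batches of at most 25 items and retry UnprocessedItems; both
// are omitted here for brevity.
func putChunksToDynamoDB(ctx context.Context, db dynamodbiface.DynamoDBAPI, table string, keys []string, bufs [][]byte) error {
	var requests []*dynamodb.WriteRequest
	for i := range keys {
		requests = append(requests, &dynamodb.WriteRequest{
			PutRequest: &dynamodb.PutRequest{
				Item: map[string]*dynamodb.AttributeValue{
					"h": {S: aws.String(keys[i])},
					"c": {B: bufs[i]},
				},
			},
		})
	}
	_, err := db.BatchWriteItemWithContext(ctx, &dynamodb.BatchWriteItemInput{
		RequestItems: map[string][]*dynamodb.WriteRequest{table: requests},
	})
	return err
}
```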

70 changes: 2 additions & 68 deletions pkg/chunk/chunk_store.go
@@ -109,46 +109,14 @@ func (c *Store) Put(ctx context.Context, chunks []Chunk) error {
keys = append(keys, chunks[i].externalKey())
}

err = c.putChunks(ctx, keys, bufs)
err = c.storage.PutChunks(ctx, chunks, keys, bufs)
if err != nil {
return err
}

return c.updateIndex(ctx, userID, chunks)
}

// putChunks writes a collection of chunks to S3 in parallel.
func (c *Store) putChunks(ctx context.Context, keys []string, bufs [][]byte) error {
incomingErrors := make(chan error)
for i := range bufs {
go func(i int) {
incomingErrors <- c.putChunk(ctx, keys[i], bufs[i])
}(i)
}

var lastErr error
for range keys {
err := <-incomingErrors
if err != nil {
lastErr = err
}
}
return lastErr
}

// putChunk puts a chunk into S3.
func (c *Store) putChunk(ctx context.Context, key string, buf []byte) error {
err := c.storage.PutChunk(ctx, key, buf)
if err != nil {
return err
}

if err := c.cache.StoreChunk(ctx, key, buf); err != nil {
log.Warnf("Could not store %v in chunk cache: %v", key, err)
}
return nil
}

func (c *Store) updateIndex(ctx context.Context, userID string, chunks []Chunk) error {
writeReqs, err := c.calculateDynamoWrites(userID, chunks)
if err != nil {
@@ -223,7 +191,7 @@ func (c *Store) Get(ctx context.Context, from, through model.Time, allMatchers .
log.Warnf("Error fetching from cache: %v", err)
}

fromS3, err := c.fetchChunkData(ctx, missing)
fromS3, err := c.storage.GetChunks(ctx, missing)
if err != nil {
return nil, promql.ErrStorage(err)
}
@@ -480,40 +448,6 @@ func (c *Store) convertIndexEntriesToChunks(ctx context.Context, entries []Index
return unique(chunkSet), nil
}

func (c *Store) fetchChunkData(ctx context.Context, chunkSet []Chunk) ([]Chunk, error) {
incomingChunks := make(chan Chunk)
incomingErrors := make(chan error)
for _, chunk := range chunkSet {
go func(chunk Chunk) {
buf, err := c.storage.GetChunk(ctx, chunk.externalKey())
if err != nil {
incomingErrors <- err
return
}
if err := chunk.decode(buf); err != nil {
incomingErrors <- err
return
}
incomingChunks <- chunk
}(chunk)
}

chunks := []Chunk{}
errors := []error{}
for i := 0; i < len(chunkSet); i++ {
select {
case chunk := <-incomingChunks:
chunks = append(chunks, chunk)
case err := <-incomingErrors:
errors = append(errors, err)
}
}
if len(errors) > 0 {
return nil, errors[0]
}
return chunks, nil
}

func (c *Store) writeBackCache(_ context.Context, chunks []Chunk) error {
for i := range chunks {
encoded, err := chunks[i].encode()
29 changes: 19 additions & 10 deletions pkg/chunk/inmemory_storage_client.go
@@ -229,26 +229,35 @@ func (m *MockStorage) QueryPages(_ context.Context, query IndexQuery, callback f
return nil
}

// PutChunk implements S3Client.
func (m *MockStorage) PutChunk(_ context.Context, key string, buf []byte) error {
// PutChunks implements StorageClient.
func (m *MockStorage) PutChunks(_ context.Context, _ []Chunk, keys []string, bufs [][]byte) error {
m.mtx.Lock()
defer m.mtx.Unlock()

m.objects[key] = buf
for i := range keys {
m.objects[keys[i]] = bufs[i]
}
return nil
}

// GetChunk implements S3Client.
func (m *MockStorage) GetChunk(_ context.Context, key string) ([]byte, error) {
// GetChunks implements StorageClient.
func (m *MockStorage) GetChunks(ctx context.Context, chunkSet []Chunk) ([]Chunk, error) {
m.mtx.RLock()
defer m.mtx.RUnlock()

buf, ok := m.objects[key]
if !ok {
return nil, fmt.Errorf("%v not found", key)
result := []Chunk{}
for _, chunk := range chunkSet {
key := chunk.externalKey()
buf, ok := m.objects[key]
if !ok {
return nil, fmt.Errorf("%v not found", key)
}
if err := chunk.decode(buf); err != nil {
return nil, err
}
result = append(result, chunk)
}

return buf, nil
return result, nil
}

type mockWriteBatch []struct {
3 changes: 3 additions & 0 deletions pkg/chunk/schema.go
@@ -49,6 +49,9 @@ type IndexQuery struct {
RangeValuePrefix []byte
RangeValueStart []byte

// Used when fetching chunks

Contributor: Better to say why we need it when fetching chunks.

Contributor Author: Old code I should have removed, sorry!

RangeValue []byte

// Filters for querying
ValueEqual []byte
}
4 changes: 2 additions & 2 deletions pkg/chunk/storage_client.go
@@ -19,8 +19,8 @@ type StorageClient interface {
QueryPages(ctx context.Context, query IndexQuery, callback func(result ReadBatch, lastPage bool) (shouldContinue bool)) error

// For storing and retrieving chunks.
PutChunk(ctx context.Context, key string, data []byte) error
GetChunk(ctx context.Context, key string) ([]byte, error)
PutChunks(ctx context.Context, chunks []Chunk, keys []string, data [][]byte) error

Contributor: What are the constraints on how chunks, keys, and data relate to each other? Can we pick better types (e.g. map[string]Chunk) to make this less easy to mess up?

Contributor Author: The lists must be the same length, and the ordering within them must be consistent. You could probably consider this a micro-optimization, as the chunk can generate the key and the buffer. I'll see if I can factor it out.

Contributor Author: Have managed to tidy this up.
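
For reference, the caller in chunk_store.go (diff above) derives both slices from the same chunks slice, so the length and ordering invariants hold by construction. A condensed sketch of that calling pattern, assuming it sits inside package chunk with the usual imports; the encode() usage mirrors writeBackCache above:

```go
// Condensed sketch of the calling pattern in Store.Put (see the chunk_store.go
// diff above); error handling and metrics trimmed. Because keys and bufs are
// built from the same chunks slice, their lengths and ordering stay in sync.
func putViaStorage(ctx context.Context, storage StorageClient, chunks []Chunk) error {
	keys := make([]string, 0, len(chunks))
	bufs := make([][]byte, 0, len(chunks))
	for i := range chunks {
		buf, err := chunks[i].encode() // same encode() used by writeBackCache
		if err != nil {
			return err
		}
		keys = append(keys, chunks[i].externalKey())
		bufs = append(bufs, buf)
	}
	return storage.PutChunks(ctx, chunks, keys, bufs)
}
```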

GetChunks(ctx context.Context, chunks []Chunk) ([]Chunk, error)

Contributor: I don't understand this interface. You already have chunks (chunks []Chunk) and this returns the same thing. Why are the inputs & outputs the same types?

Contributor Author: The chunks you're passing in are "empty", in that they just describe what to fetch. I could separate out the parsed chunk ID (ChunkDescriptor) from the chunk itself, and embed one in another. Then it could take a ChunkDescriptor and return a Chunk. WDYT?

Contributor: SGTM

}

// WriteBatch represents a batch of writes.
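
A rough sketch of what the ChunkDescriptor split suggested in the thread above could look like; the type name comes from the discussion, but the fields and the simplified method shapes are hypothetical and not part of this PR:

```go
// Hypothetical follow-up, not code from this PR: separate the "what to fetch"
// description from the decoded chunk, so GetChunks no longer takes and returns
// the same type.
type ChunkDescriptor struct {
	UserID      string
	Fingerprint model.Fingerprint // from github.com/prometheus/common/model
	From        model.Time
	Through     model.Time
}

type StorageClient interface {
	// ... index read/write methods elided ...

	// Chunks generate their own keys and buffers, per the discussion above.
	PutChunks(ctx context.Context, chunks []Chunk) error
	GetChunks(ctx context.Context, descriptors []ChunkDescriptor) ([]Chunk, error)
}
```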
74 changes: 65 additions & 9 deletions pkg/chunk/table_manager.go
@@ -12,6 +12,7 @@ import (
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"golang.org/x/net/context"

"github.com/weaveworks/common/instrument"
@@ -47,6 +48,7 @@ type TableManagerConfig struct {
DynamoDBPollInterval time.Duration

PeriodicTableConfig
PeriodicChunkTableConfig

// duration a table will be created before it is needed.
CreationGracePeriod time.Duration
@@ -55,6 +57,11 @@
ProvisionedReadThroughput int64
InactiveWriteThroughput int64
InactiveReadThroughput int64

ChunkTableProvisionedWriteThroughput int64
ChunkTableProvisionedReadThroughput int64
ChunkTableInactiveWriteThroughput int64
ChunkTableInactiveReadThroughput int64
}

// RegisterFlags adds the flags required to config this to the given FlagSet
@@ -66,8 +73,13 @@ func (cfg *TableManagerConfig) RegisterFlags(f *flag.FlagSet) {
f.Int64Var(&cfg.ProvisionedReadThroughput, "dynamodb.periodic-table.read-throughput", 300, "DynamoDB periodic tables read throughput")
f.Int64Var(&cfg.InactiveWriteThroughput, "dynamodb.periodic-table.inactive-write-throughput", 1, "DynamoDB periodic tables write throughput for inactive tables.")
f.Int64Var(&cfg.InactiveReadThroughput, "dynamodb.periodic-table.inactive-read-throughput", 300, "DynamoDB periodic tables read throughput for inactive tables")
f.Int64Var(&cfg.ChunkTableProvisionedWriteThroughput, "dynamodb.chunk-table.write-throughput", 3000, "DynamoDB chunk tables write throughput")
f.Int64Var(&cfg.ChunkTableProvisionedReadThroughput, "dynamodb.chunk-table.read-throughput", 300, "DynamoDB chunk tables read throughput")
f.Int64Var(&cfg.ChunkTableInactiveWriteThroughput, "dynamodb.chunk-table.inactive-write-throughput", 1, "DynamoDB chunk tables write throughput for inactive tables.")
f.Int64Var(&cfg.ChunkTableInactiveReadThroughput, "dynamodb.chunk-table.inactive-read-throughput", 300, "DynamoDB chunk tables read throughput for inactive tables")

Contributor: Shouldn't these flags be on PeriodicChunkTableConfig? Better yet, could these be implemented in such a way that we don't have duplication with the periodic table config?

Contributor Author:
"Shouldn't these flags be on PeriodicChunkTableConfig?"
I followed the pattern we used for PeriodicTableConfig, where only the flags that need to be shared are actually put in the shared struct.
"Better yet, could these be implemented in such a way that we don't have duplication with the periodic table config?"
Eventually I want Cortex to self-tune its provisioned throughput, but for now the chunk table will need different levels than other tables.

Contributor: Fair enough.
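
To illustrate how the new chunk-table flags sit alongside the existing periodic-table ones, a small sketch of registering and parsing them; the flag names come from the diff above, but the values and the date format accepted by util.DayValue are assumptions:

```go
// Hypothetical example, assumed to live in package chunk (where
// TableManagerConfig is defined) with the standard "flag" import.
// Values are examples only; the -dynamodb.chunk-table.from date format
// accepted by util.DayValue is an assumption.
func exampleChunkTableFlags() (TableManagerConfig, error) {
	var cfg TableManagerConfig
	fs := flag.NewFlagSet("table-manager", flag.ContinueOnError)
	cfg.RegisterFlags(fs)
	err := fs.Parse([]string{
		"-dynamodb.chunk-table.from=2017-05-18",
		"-dynamodb.chunk-table.prefix=cortex_chunks_",
		"-dynamodb.chunk-table.period=168h", // one week, matching the default
		"-dynamodb.chunk-table.write-throughput=3000",
	})
	return cfg, err
}
```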


cfg.PeriodicTableConfig.RegisterFlags(f)
cfg.PeriodicChunkTableConfig.RegisterFlags(f)
}

// PeriodicTableConfig for the use of periodic tables (ie, weekly tables). Can
@@ -90,6 +102,20 @@ func (cfg *PeriodicTableConfig) RegisterFlags(f *flag.FlagSet) {
f.Var(&cfg.PeriodicTableStartAt, "dynamodb.periodic-table.start", "DynamoDB periodic tables start time.")
}

// PeriodicChunkTableConfig contains the various parameters for the chunk table.
type PeriodicChunkTableConfig struct {
ChunkTableFrom util.DayValue
ChunkTablePrefix string
ChunkTablePeriod time.Duration
}

// RegisterFlags adds the flags required to config this to the given FlagSet
func (cfg *PeriodicChunkTableConfig) RegisterFlags(f *flag.FlagSet) {
f.Var(&cfg.ChunkTableFrom, "dynamodb.chunk-table.from", "Date after which to write chunks to DynamoDB.")
f.StringVar(&cfg.ChunkTablePrefix, "dynamodb.chunk-table.prefix", "cortex_chunks_", "DynamoDB table prefix for period chunk tables.")
f.DurationVar(&cfg.ChunkTablePeriod, "dynamodb.chunk-table.period", 7*24*time.Hour, "DynamoDB chunk tables period.")
}

// TableManager creates and manages the provisioned throughput on DynamoDB tables
type TableManager struct {
client TableClient
@@ -191,7 +217,6 @@ func (m *TableManager) calculateExpectedTables() []tableDescription {
gracePeriodSecs = int64(m.cfg.CreationGracePeriod / time.Second)
maxChunkAgeSecs = int64(m.cfg.MaxChunkAge / time.Second)
firstTable = m.cfg.PeriodicTableStartAt.Unix() / tablePeriodSecs
lastTable = (mtime.Now().Unix() + gracePeriodSecs) / tablePeriodSecs
now = mtime.Now().Unix()
)

@@ -211,23 +236,54 @@
result = append(result, legacyTable)
}

result = append(result, periodicTables(
m.cfg.TablePrefix, m.cfg.PeriodicTableStartAt.Time, m.cfg.TablePeriod,
m.cfg.CreationGracePeriod, m.cfg.MaxChunkAge,
m.cfg.ProvisionedReadThroughput, m.cfg.ProvisionedWriteThroughput,
m.cfg.InactiveReadThroughput, m.cfg.InactiveWriteThroughput,
)...)

if m.cfg.ChunkTableFrom.IsSet() {
result = append(result, periodicTables(
m.cfg.ChunkTablePrefix, m.cfg.ChunkTableFrom.Time, m.cfg.ChunkTablePeriod,
m.cfg.CreationGracePeriod, m.cfg.MaxChunkAge,
m.cfg.ChunkTableProvisionedReadThroughput, m.cfg.ChunkTableProvisionedWriteThroughput,
m.cfg.ChunkTableInactiveReadThroughput, m.cfg.ChunkTableInactiveWriteThroughput,
)...)
}

sort.Sort(byName(result))
return result
}

func periodicTables(
prefix string, start model.Time, period, beginGrace, endGrace time.Duration,
activeRead, activeWrite, inactiveRead, inactiveWrite int64,
) []tableDescription {
var (
periodSecs = int64(period / time.Second)
beginGraceSecs = int64(beginGrace / time.Second)
endGraceSecs = int64(endGrace / time.Second)
firstTable = start.Unix() / periodSecs
lastTable = (mtime.Now().Unix() + beginGraceSecs) / periodSecs
now = mtime.Now().Unix()
result = []tableDescription{}
)
for i := firstTable; i <= lastTable; i++ {
table := tableDescription{
// Name construction needs to be consistent with chunk_store.bigBuckets
name: m.cfg.TablePrefix + strconv.Itoa(int(i)),
provisionedRead: m.cfg.InactiveReadThroughput,
provisionedWrite: m.cfg.InactiveWriteThroughput,
name: prefix + strconv.Itoa(int(i)),
provisionedRead: inactiveRead,
provisionedWrite: inactiveWrite,
}

// if now is within table [start - grace, end + grace), then we need some write throughput
if (i*tablePeriodSecs)-gracePeriodSecs <= now && now < (i*tablePeriodSecs)+tablePeriodSecs+gracePeriodSecs+maxChunkAgeSecs {
table.provisionedRead = m.cfg.ProvisionedReadThroughput
table.provisionedWrite = m.cfg.ProvisionedWriteThroughput
if (i*periodSecs)-beginGraceSecs <= now && now < (i*periodSecs)+periodSecs+endGraceSecs {
table.provisionedRead = activeRead
table.provisionedWrite = activeWrite
}
result = append(result, table)
}

sort.Sort(byName(result))
return result
}
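
As a sanity check of the naming and active-window arithmetic in periodicTables above, a small self-contained example with assumed values (the epoch seconds and grace periods below are illustrative, not configuration from this PR):

```go
package main

import "fmt"

// Worked example of the periodicTables bucketing above, with assumed values:
// a weekly period, a 10-minute creation grace, endGrace (MaxChunkAge) taken
// as zero, and "now" two days after the configured start date.
func main() {
	var (
		periodSecs     = int64(7 * 24 * 3600) // one week
		beginGraceSecs = int64(10 * 60)
		endGraceSecs   = int64(0)
		start          = int64(1495065600) // 2017-05-18 00:00:00 UTC
		now            = int64(1495238400) // 2017-05-20 00:00:00 UTC
	)
	firstTable := start / periodSecs
	lastTable := (now + beginGraceSecs) / periodSecs
	for i := firstTable; i <= lastTable; i++ {
		active := (i*periodSecs)-beginGraceSecs <= now &&
			now < (i*periodSecs)+periodSecs+endGraceSecs
		fmt.Printf("cortex_chunks_%d active=%v\n", i, active)
	}
	// Prints "cortex_chunks_2472 active=true": only the table covering the
	// current week gets the provisioned (active) throughput; older tables
	// fall back to the inactive values.
}
```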

1 change: 1 addition & 0 deletions pkg/ingester/BUILD
@@ -25,6 +25,7 @@ go_library(
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
"//vendor/github.com/prometheus/common/log:go_default_library",
"//vendor/github.com/prometheus/common/model:go_default_library",
"//vendor/github.com/prometheus/prometheus/promql:go_default_library",
"//vendor/github.com/prometheus/prometheus/storage/local/chunk:go_default_library",
"//vendor/github.com/prometheus/prometheus/storage/metric:go_default_library",
"//vendor/github.com/weaveworks/common/user:go_default_library",
1 change: 1 addition & 0 deletions pkg/ingester/client/BUILD
@@ -5,6 +5,7 @@ go_library(
srcs = [
"client.go",
"cortex.pb.go",
"dep.go",
],
visibility = ["//visibility:public"],
deps = [