WIP Flush series in priority order; where priority is oldest first. #108

Merged 4 commits on Nov 7, 2016
188 changes: 118 additions & 70 deletions ingester/ingester.go
@@ -54,17 +54,18 @@ var (
// Ingester deals with "in flight" chunks.
// It's like MemorySeriesStorage, but simpler.
type Ingester struct {
cfg Config
chunkStore cortex.Store
stopLock sync.RWMutex
stopped bool
quit chan struct{}
done chan struct{}
flushSeriesLimiter cortex.Semaphore
cfg Config
chunkStore cortex.Store
stopLock sync.RWMutex
stopped bool
quit chan struct{}
done sync.WaitGroup

userStateLock sync.Mutex
userState map[string]*userState

flushQueue *priorityQueue

ingestedSamples prometheus.Counter
discardedSamples *prometheus.CounterVec
chunkUtilization prometheus.Histogram
@@ -78,10 +79,11 @@ type Ingester struct {

// Config configures an Ingester.
type Config struct {
FlushCheckPeriod time.Duration
MaxChunkAge time.Duration
RateUpdatePeriod time.Duration
Ring *ring.Ring
FlushCheckPeriod time.Duration
MaxChunkAge time.Duration
RateUpdatePeriod time.Duration
Ring *ring.Ring
ConcurrentFlushes int
}
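As a point of reference, here is a hypothetical way the new ConcurrentFlushes knob might be wired up. The helper name and the store/ring arguments are illustrative stand-ins, not code from this PR; New() falls back to its own defaults for any zero values.

```go
// Hypothetical wiring sketch; store and r stand in for dependencies the
// caller constructs elsewhere. Not part of this change.
func newIngesterForExample(store cortex.Store, r *ring.Ring) (*Ingester, error) {
	cfg := Config{
		FlushCheckPeriod:  1 * time.Minute,
		MaxChunkAge:       10 * time.Minute,
		RateUpdatePeriod:  15 * time.Second,
		Ring:              r,
		ConcurrentFlushes: 50, // New() defaults this to 25 when it is <= 0
	}
	return New(cfg, store)
}
```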

// UserStats models ingestion statistics for one user.
@@ -99,6 +101,21 @@ type userState struct {
ingestedSamples *ewmaRate
}

type flushOp struct {
from model.Time
userID string
fp model.Fingerprint
immediate bool
}

func (o *flushOp) Key() string {
return fmt.Sprintf("%s-%d-%v", o.userID, o.fp, o.immediate)
}

func (o *flushOp) Priority() int64 {
return int64(o.from)
Reviewer's note: 21b1b4c, pushed straight to master, changes this to -int64(o.from)

}
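A note on the ordering, which is what the reviewer's comment above is about: the priorityQueue added in this PR (see ingester/priority_queue.go below) uses container/heap with Less comparing Priority() via "<", so the op with the smallest Priority() is dequeued first. With Priority() returning int64(o.from), the series with the oldest from pops first; negating the value reverses which end of the time range comes out first. A self-contained sketch of that heap behaviour, with made-up values and independent of this PR's types:

```go
package main

import (
	"container/heap"
	"fmt"
)

// int64Heap mirrors the queue's ordering: Less uses "<", so heap.Pop
// returns the smallest priority first.
type int64Heap []int64

func (h int64Heap) Len() int            { return len(h) }
func (h int64Heap) Less(i, j int) bool  { return h[i] < h[j] }
func (h int64Heap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *int64Heap) Push(x interface{}) { *h = append(*h, x.(int64)) }
func (h *int64Heap) Pop() interface{} {
	old := *h
	x := old[len(old)-1]
	*h = old[:len(old)-1]
	return x
}

func main() {
	// Priorities as int64(from): the smallest (oldest) timestamp pops first.
	oldestFirst := &int64Heap{300, 100, 200}
	heap.Init(oldestFirst)
	fmt.Println(heap.Pop(oldestFirst)) // 100

	// Priorities as -int64(from): -300 pops first, i.e. from=300, the newest.
	negated := &int64Heap{-300, -100, -200}
	heap.Init(negated)
	fmt.Println(heap.Pop(negated)) // -300
}
```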

// New constructs a new Ingester.
func New(cfg Config, chunkStore cortex.Store) (*Ingester, error) {
if cfg.FlushCheckPeriod == 0 {
@@ -110,16 +127,19 @@ func New(cfg Config, chunkStore cortex.Store) (*Ingester, error) {
if cfg.RateUpdatePeriod == 0 {
cfg.RateUpdatePeriod = 15 * time.Second
}
if cfg.ConcurrentFlushes <= 0 {
cfg.ConcurrentFlushes = 25
}

i := &Ingester{
cfg: cfg,
chunkStore: chunkStore,
quit: make(chan struct{}),
done: make(chan struct{}),
flushSeriesLimiter: cortex.NewSemaphore(maxConcurrentFlushSeries),
cfg: cfg,
chunkStore: chunkStore,
quit: make(chan struct{}),

userState: map[string]*userState{},

flushQueue: newPriorityQueue(),

ingestedSamples: prometheus.NewCounter(prometheus.CounterOpts{
Name: "cortex_ingester_ingested_samples_total",
Help: "The total number of samples ingested.",
@@ -164,6 +184,12 @@ func New(cfg Config, chunkStore cortex.Store) (*Ingester, error) {
}),
}

i.done.Add(cfg.ConcurrentFlushes)
for j := 0; j < cfg.ConcurrentFlushes; j++ {
go i.flushLoop()
}

i.done.Add(1)
go i.loop()
return i, nil
}
@@ -400,15 +426,23 @@ func (i *Ingester) Stop() {
i.stopped = true
i.stopLock.Unlock()

// Closing i.quit triggers i.loop() to exit; i.loop() exiting
// will trigger i.flushLoop()s to exit.
close(i.quit)
<-i.done

i.done.Wait()
}
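The shutdown sequencing here mirrors a common Go pattern: signal quit, let the coordinator run its final flush and close the work queue, then wait on a single WaitGroup for the coordinator and all workers. A minimal, self-contained sketch of the same shape, using a plain channel in place of the priority queue and hypothetical names throughout:

```go
package main

import (
	"fmt"
	"sync"
)

// worker drains jobs until the channel is closed, then signals via wg,
// much like a flushLoop() draining the flush queue.
func worker(id int, jobs <-chan string, wg *sync.WaitGroup) {
	defer wg.Done()
	for j := range jobs {
		fmt.Println("worker", id, "flushing", j)
	}
}

func main() {
	quit := make(chan struct{})
	jobs := make(chan string)

	var done sync.WaitGroup

	// Fixed pool of flush workers, like ConcurrentFlushes flushLoop()s.
	const workers = 3
	done.Add(workers)
	for i := 0; i < workers; i++ {
		go worker(i, jobs, &done)
	}

	// The coordinator, like loop(): on quit it enqueues a final batch,
	// then closes the queue so the workers can drain and exit.
	done.Add(1)
	go func() {
		defer done.Done()
		<-quit
		for _, s := range []string{"series-a", "series-b"} {
			jobs <- s
		}
		close(jobs)
	}()

	// Stop(), in miniature: signal, then wait for everything to finish.
	close(quit)
	done.Wait()
}
```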

func (i *Ingester) loop() {
defer func() {
i.flushAllUsers(true)
close(i.done)
log.Infof("Ingester exited gracefully")

// We close the flush queue here to ensure the flushLoops pick
// up all the flushes triggered by the last run.
i.flushQueue.Close()

log.Infof("Ingester.loop() exited gracefully")
i.done.Done()
}()

flushTick := time.Tick(i.cfg.FlushCheckPeriod)
@@ -426,9 +460,6 @@ func (i *Ingester) loop() {
}

func (i *Ingester) flushAllUsers(immediate bool) {
log.Infof("Flushing chunks... (exiting: %v)", immediate)
defer log.Infof("Done flushing chunks.")

if i.chunkStore == nil {
return
}
@@ -440,21 +471,12 @@ func (i *Ingester) flushAllUsers(immediate bool) {
}
i.userStateLock.Unlock()

var wg sync.WaitGroup
for _, userID := range userIDs {
wg.Add(1)
go func(userID string) {
i.flushUser(userID, immediate)
wg.Done()
}(userID)
i.flushUser(userID, immediate)
}
wg.Wait()
}

func (i *Ingester) flushUser(userID string, immediate bool) {
log.Infof("Flushing user %s...", userID)
defer log.Infof("Done flushing user %s.", userID)

i.userStateLock.Lock()
userState, ok := i.userState[userID]
i.userStateLock.Unlock()
@@ -464,8 +486,9 @@ func (i *Ingester) flushUser(userID string, immediate bool) {
return
}

ctx := user.WithID(context.Background(), userID)
i.flushAllSeries(ctx, userState, immediate)
for pair := range userState.fpToSeries.iter() {
i.flushSeries(userState, pair.fp, pair.series, immediate)
}

// TODO: this is probably slow, and could be done in a better way.
i.userStateLock.Lock()
@@ -475,55 +498,80 @@ func (i *Ingester) flushUser(userID string, immediate bool) {
i.userStateLock.Unlock()
}

func (i *Ingester) flushAllSeries(ctx context.Context, state *userState, immediate bool) {
var wg sync.WaitGroup
for pair := range state.fpToSeries.iter() {
wg.Add(1)
i.flushSeriesLimiter.Acquire()
go func(pair fingerprintSeriesPair) {
if err := i.flushSeries(ctx, state, pair.fp, pair.series, immediate); err != nil {
log.Errorf("Failed to flush chunks for series: %v", err)
}
i.flushSeriesLimiter.Release()
wg.Done()
}(pair)
}
wg.Wait()
}

func (i *Ingester) flushSeries(ctx context.Context, u *userState, fp model.Fingerprint, series *memorySeries, immediate bool) error {
func (i *Ingester) flushSeries(u *userState, fp model.Fingerprint, series *memorySeries, immediate bool) {
u.fpLocker.Lock(fp)

// Decide what chunks to flush
if immediate || time.Now().Sub(series.head().FirstTime().Time()) > i.cfg.MaxChunkAge {
firstTime := series.head().FirstTime()
if immediate || model.Now().Sub(series.head().FirstTime()) > i.cfg.MaxChunkAge {
series.headChunkClosed = true
series.head().MaybePopulateLastTime()
}
chunks := series.chunkDescs
chunks := len(series.chunkDescs)
if !series.headChunkClosed {
chunks = chunks[:len(chunks)-1]
chunks--
}
u.fpLocker.Unlock(fp)
if len(chunks) == 0 {
return nil
}

// flush the chunks without locking the series
if err := i.flushChunks(ctx, fp, series.metric, chunks); err != nil {
i.chunkStoreFailures.Add(float64(len(chunks)))
return err
if chunks == 0 {
return
}

// now remove the chunks
u.fpLocker.Lock(fp)
series.chunkDescs = series.chunkDescs[len(chunks):]
i.memoryChunks.Sub(float64(len(chunks)))
if len(series.chunkDescs) == 0 {
u.fpToSeries.del(fp)
u.index.delete(series.metric, fp)
i.flushQueue.Enqueue(&flushOp{firstTime, u.userID, fp, immediate})
}

func (i *Ingester) flushLoop() {
defer func() {
log.Info("Ingester.flushLoop() exited")
i.done.Done()
}()

for {
o := i.flushQueue.Dequeue()
if o == nil {
return
}
op := o.(*flushOp)

// get the user
i.userStateLock.Lock()
userState, ok := i.userState[op.userID]
i.userStateLock.Unlock()
if !ok {
continue
}
ctx := user.WithID(context.Background(), op.userID)

// Decide what chunks to flush
series, ok := userState.fpToSeries.get(op.fp)
if !ok {
continue
}

userState.fpLocker.Lock(op.fp)
chunks := series.chunkDescs
if !series.headChunkClosed {
chunks = chunks[1:]
}
userState.fpLocker.Unlock(op.fp)

// flush the chunks without locking the series
if err := i.flushChunks(ctx, op.fp, series.metric, chunks); err != nil {
log.Errorf("Failed to flush chunks: %v", err)
i.chunkStoreFailures.Add(float64(len(chunks)))
continue
}

// now remove the chunks
userState.fpLocker.Lock(op.fp)
series.chunkDescs = series.chunkDescs[len(chunks):]
i.memoryChunks.Sub(float64(len(chunks)))
if len(series.chunkDescs) == 0 {
userState.fpToSeries.del(op.fp)
userState.index.delete(series.metric, op.fp)
}
userState.fpLocker.Unlock(op.fp)
}
u.fpLocker.Unlock(fp)
return nil
}

func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, metric model.Metric, chunks []*prom_chunk.Desc) error {
89 changes: 89 additions & 0 deletions ingester/priority_queue.go
@@ -0,0 +1,89 @@
package ingester

import (
"container/heap"
"sync"
)

type priorityQueue struct {
lock sync.Mutex
cond *sync.Cond
closed bool
hit map[string]struct{}
queue queue
}

type op interface {
Key() string
Priority() int64
}

type queue []op

func (q queue) Len() int { return len(q) }
func (q queue) Less(i, j int) bool { return q[i].Priority() < q[j].Priority() }
func (q queue) Swap(i, j int) { q[i], q[j] = q[j], q[i] }

// Push and Pop use pointer receivers because they modify the slice's length,
// not just its contents.
func (q *queue) Push(x interface{}) {
*q = append(*q, x.(op))
}

func (q *queue) Pop() interface{} {
old := *q
n := len(old)
x := old[n-1]
*q = old[0 : n-1]
return x
}

func newPriorityQueue() *priorityQueue {
pq := &priorityQueue{
hit: map[string]struct{}{},
}
pq.cond = sync.NewCond(&pq.lock)
heap.Init(&pq.queue)
return pq
}

func (pq *priorityQueue) Close() {
pq.lock.Lock()
defer pq.lock.Unlock()
pq.closed = true
pq.cond.Broadcast()
}

func (pq *priorityQueue) Enqueue(op op) {
pq.lock.Lock()
defer pq.lock.Unlock()

if pq.closed {
panic("enqueue on closed queue")
}

_, enqueued := pq.hit[op.Key()]
if enqueued {
return
}

// Record the key so duplicate enqueues of this op are dropped
// until Dequeue removes it from the hit set.
pq.hit[op.Key()] = struct{}{}
heap.Push(&pq.queue, op)
pq.cond.Broadcast()
}

func (pq *priorityQueue) Dequeue() op {
pq.lock.Lock()
defer pq.lock.Unlock()

for len(pq.queue) == 0 && !pq.closed {
pq.cond.Wait()
}

if len(pq.queue) == 0 && pq.closed {
return nil
}

op := heap.Pop(&pq.queue).(op)
delete(pq.hit, op.Key())
return op
}
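To round this off, a small hypothetical exercise of the queue above; sampleOp and exampleDrain are illustrative names only and are not part of this change. It relies on the behaviour shown in the code: duplicate keys are dropped while still queued, Dequeue returns the lowest Priority() first, and after Close() it drains whatever remains before returning nil.

```go
// Hypothetical illustration of driving the priorityQueue; not part of this PR.
type sampleOp struct {
	key  string
	from int64
}

func (o sampleOp) Key() string     { return o.key }
func (o sampleOp) Priority() int64 { return o.from }

func exampleDrain() []string {
	pq := newPriorityQueue()
	pq.Enqueue(sampleOp{key: "b", from: 200})
	pq.Enqueue(sampleOp{key: "a", from: 100})
	pq.Enqueue(sampleOp{key: "b", from: 200}) // dropped: "b" is still queued

	// After Close(), Dequeue keeps returning queued ops until the queue is
	// empty, then returns nil.
	pq.Close()

	var keys []string
	for {
		next := pq.Dequeue()
		if next == nil {
			return keys // ["a", "b"]: lowest Priority() (oldest from) first
		}
		keys = append(keys, next.(sampleOp).Key())
	}
}
```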