Skip to content

Add support for calculating and returning user stats #75

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Oct 26, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 8 additions & 9 deletions cmd/cortex/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,10 @@ type cfg struct {
memcachedExpiration time.Duration
memcachedService string
remoteTimeout time.Duration
flushPeriod time.Duration
maxChunkAge time.Duration
numTokens int
logSuccess bool

ingesterConfig ingester.Config
distributorConfig cortex.DistributorConfig
}

Expand All @@ -95,8 +94,9 @@ func main() {
flag.DurationVar(&cfg.memcachedExpiration, "memcached.expiration", 0, "How long chunks stay in the memcache.")
flag.StringVar(&cfg.memcachedService, "memcached.service", "memcached", "SRV service used to discover memcache servers.")
flag.DurationVar(&cfg.remoteTimeout, "remote.timeout", 5*time.Second, "Timeout for downstream ingesters.")
flag.DurationVar(&cfg.flushPeriod, "ingester.flush-period", 1*time.Minute, "Period with which to attempt to flush chunks.")
flag.DurationVar(&cfg.maxChunkAge, "ingester.max-chunk-age", 10*time.Minute, "Maximum chunk age before flushing.")
flag.DurationVar(&cfg.ingesterConfig.FlushCheckPeriod, "ingester.flush-period", 1*time.Minute, "Period with which to attempt to flush chunks.")
flag.DurationVar(&cfg.ingesterConfig.RateUpdatePeriod, "ingester.rate-update-period", 15*time.Second, "Period with which to update the per-user ingestion rates.")
flag.DurationVar(&cfg.ingesterConfig.MaxChunkAge, "ingester.max-chunk-age", 10*time.Minute, "Maximum chunk age before flushing.")
flag.IntVar(&cfg.numTokens, "ingester.num-tokens", 128, "Number of tokens for each ingester.")
flag.IntVar(&cfg.distributorConfig.ReplicationFactor, "distributor.replication-factor", 3, "The number of ingesters to write to and read from.")
flag.IntVar(&cfg.distributorConfig.MinReadSuccesses, "distributor.min-read-successes", 2, "The minimum number of ingesters from which a read must succeed.")
Expand Down Expand Up @@ -137,11 +137,7 @@ func main() {
log.Fatalf("Could not register ingester: %v", err)
}
defer registration.Unregister()
ingesterCfg := ingester.Config{
FlushCheckPeriod: cfg.flushPeriod,
MaxChunkAge: cfg.maxChunkAge,
}
ing := setupIngester(chunkStore, ingesterCfg, cfg.logSuccess)
ing := setupIngester(chunkStore, cfg.ingesterConfig, cfg.logSuccess)
defer ing.Stop()
default:
log.Fatalf("Mode %s not supported!", cfg.mode)
Expand Down Expand Up @@ -241,6 +237,8 @@ func setupQuerier(
api.Register(router.WithPrefix(prefix + "/api/v1"))
http.Handle("/", router)

http.Handle(prefix+"/user_stats", instrument(logSuccess, cortex.DistributorUserStatsHandler(distributor.UserStats)))

http.Handle(prefix+"/graph", instrument(logSuccess, ui.GraphHandler()))
http.Handle(prefix+"/static/", instrument(logSuccess, ui.StaticAssetsHandler(prefix+"/static/")))
}
Expand All @@ -259,6 +257,7 @@ func setupIngester(
http.Handle("/push", instrument(logSuccess, cortex.AppenderHandler(ingester)))
http.Handle("/query", instrument(logSuccess, cortex.QueryHandler(ingester)))
http.Handle("/label_values", instrument(logSuccess, cortex.LabelValuesHandler(ingester)))
http.Handle("/user_stats", instrument(logSuccess, cortex.IngesterUserStatsHandler(ingester.UserStats)))
return ingester
}

Expand Down
69 changes: 42 additions & 27 deletions cortex.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions cortex.proto
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,8 @@ message LabelValuesRequest {
message LabelValuesResponse {
repeated string label_values = 1;
}

// UserStatsResponse carries ingestion statistics for a single user.
// NOTE(review): presumably the wire form of ingester.UserStats — confirm
// against the generated code and the user_stats handlers.
message UserStatsResponse {
// Ingestion rate; presumably in samples per second — TODO confirm.
double ingestion_rate = 1;
// Number of series; presumably the count of series held for the user — TODO confirm.
uint64 num_series = 2;
}
23 changes: 23 additions & 0 deletions distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/weaveworks/scope/common/instrument"
"golang.org/x/net/context"

"github.com/weaveworks/cortex/ingester"
"github.com/weaveworks/cortex/ring"
"github.com/weaveworks/cortex/user"
)
Expand Down Expand Up @@ -361,6 +362,28 @@ func (d *Distributor) LabelValuesForLabelName(ctx context.Context, labelName mod
return values, nil
}

// UserStats returns statistics about the current user, aggregated over every
// ingester the ring reports for the configured heartbeat timeout.
//
// The per-ingester values are summed and the sums are then divided by the
// replication factor (the number of ingesters each write goes to), presumably
// to undo the overcounting caused by replication. Any failure to obtain a
// client or to query an ingester aborts the whole call with that error.
//
// NOTE(review): the series count uses integer division, which panics if
// ReplicationFactor is 0 — confirm the configuration is validated elsewhere.
func (d *Distributor) UserStats(ctx context.Context) (*ingester.UserStats, error) {
	var (
		rateSum   float64
		seriesSum uint64
	)

	for _, desc := range d.cfg.Ring.GetAll(d.cfg.HeartbeatTimeout) {
		ingesterClient, err := d.getClientFor(desc.Hostname)
		if err != nil {
			return nil, err
		}

		perIngester, err := ingesterClient.UserStats(ctx)
		if err != nil {
			return nil, err
		}

		rateSum += perIngester.IngestionRate
		seriesSum += perIngester.NumSeries
	}

	replicas := d.cfg.ReplicationFactor
	return &ingester.UserStats{
		IngestionRate: rateSum / float64(replicas),
		NumSeries:     seriesSum / uint64(replicas),
	}, nil
}

// NeedsThrottling implements SampleAppender.
func (*Distributor) NeedsThrottling(_ context.Context) bool {
return false
Expand Down
59 changes: 48 additions & 11 deletions ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,22 @@ type Ingester struct {
type Config struct {
// FlushCheckPeriod is the period with which the ingester attempts to flush
// chunks.
FlushCheckPeriod time.Duration
// MaxChunkAge is the maximum chunk age before flushing. New substitutes
// 10 minutes when it is left at zero.
MaxChunkAge time.Duration
// RateUpdatePeriod is the period with which the per-user ingestion rates
// are updated. New substitutes 15 seconds when it is left at zero.
RateUpdatePeriod time.Duration
}

// UserStats models ingestion statistics for one user.
type UserStats struct {
// IngestionRate is the user's ingestion rate. Ingester.UserStats fills it
// with the per-second moving average of ingested samples.
IngestionRate float64 `json:"ingestionRate"`
// NumSeries is the user's series count. Ingester.UserStats fills it with
// the length of the user's fingerprint-to-series map.
NumSeries uint64 `json:"numSeries"`
}

type userState struct {
userID string
fpLocker *fingerprintLocker
fpToSeries *seriesMap
mapper *fpMapper
index *invertedIndex
userID string
fpLocker *fingerprintLocker
fpToSeries *seriesMap
mapper *fpMapper
index *invertedIndex
ingestedSamples *ewmaRate
}

// New constructs a new Ingester.
Expand All @@ -95,6 +103,9 @@ func New(cfg Config, chunkStore cortex.Store) (*Ingester, error) {
if cfg.MaxChunkAge == 0 {
cfg.MaxChunkAge = 10 * time.Minute
}
if cfg.RateUpdatePeriod == 0 {
cfg.RateUpdatePeriod = 15 * time.Second
}

i := &Ingester{
cfg: cfg,
Expand Down Expand Up @@ -154,10 +165,11 @@ func (i *Ingester) getStateFor(ctx context.Context) (*userState, error) {
state, ok := i.userState[userID]
if !ok {
state = &userState{
userID: userID,
fpToSeries: newSeriesMap(),
fpLocker: newFingerprintLocker(16),
index: newInvertedIndex(),
userID: userID,
fpToSeries: newSeriesMap(),
fpLocker: newFingerprintLocker(16),
index: newInvertedIndex(),
ingestedSamples: newEWMARate(0.2, i.cfg.RateUpdatePeriod),
}
state.mapper = newFPMapper(state.fpToSeries)
i.userState[userID] = state
Expand Down Expand Up @@ -233,6 +245,7 @@ func (i *Ingester) append(ctx context.Context, sample *model.Sample) error {
if err == nil {
// TODO: Track append failures too (unlikely to happen).
i.ingestedSamples.Inc()
state.ingestedSamples.inc()
}
return err
}
Expand Down Expand Up @@ -354,6 +367,18 @@ func (i *Ingester) LabelValuesForLabelName(ctx context.Context, name model.Label
return state.index.lookupLabelValues(name), nil
}

// UserStats returns ingestion statistics for the current user.
//
// The user's state is resolved from ctx via getStateFor; if that fails, its
// error is returned unchanged. Otherwise the result holds the user's current
// moving-average ingestion rate and the number of series in its
// fingerprint-to-series map.
func (i *Ingester) UserStats(ctx context.Context) (*UserStats, error) {
	state, err := i.getStateFor(ctx)
	if err != nil {
		return nil, err
	}

	stats := new(UserStats)
	stats.IngestionRate = state.ingestedSamples.rate()
	stats.NumSeries = uint64(state.fpToSeries.length())
	return stats, nil
}

// Stop stops the Ingester.
func (i *Ingester) Stop() {
i.stopLock.Lock()
Expand All @@ -371,11 +396,14 @@ func (i *Ingester) loop() {
log.Infof("Ingester exited gracefully")
}()

tick := time.Tick(i.cfg.FlushCheckPeriod)
flushTick := time.Tick(i.cfg.FlushCheckPeriod)
rateUpdateTick := time.Tick(i.cfg.RateUpdatePeriod)
for {
select {
case <-tick:
case <-flushTick:
i.flushAllUsers(false)
case <-rateUpdateTick:
i.updateRates()
case <-i.quit:
return
}
Expand Down Expand Up @@ -505,6 +533,15 @@ func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, metric
return i.chunkStore.Put(ctx, wireChunks)
}

// updateRates advances the ingestion-rate moving average of every known user
// by one tick. It holds userStateLock for the whole pass over the user map.
func (i *Ingester) updateRates() {
	i.userStateLock.Lock()
	defer i.userStateLock.Unlock()

	for _, state := range i.userState {
		state.ingestedSamples.tick()
	}
}

// Describe implements prometheus.Collector.
func (i *Ingester) Describe(ch chan<- *prometheus.Desc) {
ch <- memorySeriesDesc
Expand Down
53 changes: 53 additions & 0 deletions ingester/rate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package ingester

import (
"sync"
"sync/atomic"
"time"
)

// ewmaRate tracks an exponentially weighted moving average of a per-second rate.
type ewmaRate struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's "ewma"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh sorry - exponentially weighted moving average (https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added some comments to the type and its methods.

newEvents int64
alpha float64
interval time.Duration
lastRate float64
init bool
mutex sync.Mutex
}

// newEWMARate returns a rate tracker that starts with no recorded events and a
// rate of zero.
//
// alpha is the weight given to the newest per-interval rate when it is blended
// into the running average; interval is the spacing at which tick is expected
// to be called.
func newEWMARate(alpha float64, interval time.Duration) *ewmaRate {
	r := new(ewmaRate)
	r.alpha = alpha
	r.interval = interval
	return r
}

// rate returns the per-second rate as of the most recent tick. It reads the
// stored value under r.mutex; events counted since that tick are not yet
// reflected, and the result is zero until tick has been called once.
func (r *ewmaRate) rate() float64 {
r.mutex.Lock()
defer r.mutex.Unlock()
return r.lastRate
}

// tick folds the events counted since the previous call into the moving
// average. It is expected to be called once every r.interval: the
// instantaneous rate is the event count divided by that interval, so calling
// it at any other cadence skews the result.
func (r *ewmaRate) tick() {
	// Read and reset the counter in a single atomic step. Events recorded by
	// concurrent inc calls after the swap are kept for the next tick, and two
	// overlapping tick calls can never consume the same events twice.
	newEvents := atomic.SwapInt64(&r.newEvents, 0)
	instantRate := float64(newEvents) / r.interval.Seconds()

	r.mutex.Lock()
	defer r.mutex.Unlock()

	if !r.init {
		// The first observation seeds the average directly; blending it with
		// the zero initial value would understate the rate.
		r.init = true
		r.lastRate = instantRate
		return
	}
	r.lastRate += r.alpha * (instantRate - r.lastRate)
}

// inc counts one event. It only bumps the atomic event counter and does not
// take r.mutex; the event is folded into the rate by a later tick.
func (r *ewmaRate) inc() {
atomic.AddInt64(&r.newEvents, 1)
}
36 changes: 36 additions & 0 deletions ingester/rate_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package ingester

import (
"testing"
"time"
)

// TestRate drives an ewmaRate with alpha 0.2 through a fixed sequence of
// one-minute ticks and checks the reported per-second rate after each tick.
//
// Expected values are compared exactly, so they rely on tick evaluating the
// same floating-point expression on every step.
func TestRate(t *testing.T) {
	steps := []struct {
		events int
		want   float64
	}{
		{events: 60, want: 1},
		{events: 30, want: 0.9},
		{events: 0, want: 0.72},
		{events: 60, want: 0.776},
		{events: 0, want: 0.6208},
		{events: 0, want: 0.49664},
		{events: 0, want: 0.397312},
		{events: 0, want: 0.3178496},
		{events: 0, want: 0.25427968},
		{events: 0, want: 0.20342374400000002},
		{events: 0, want: 0.16273899520000001},
	}

	r := newEWMARate(0.2, time.Minute)
	for idx, step := range steps {
		// Record this interval's events, then close the interval.
		for remaining := step.events; remaining > 0; remaining-- {
			r.inc()
		}
		r.tick()

		if got := r.rate(); got != step.want {
			t.Fatalf("%d. unexpected rate: want %v, got %v", idx, step.want, got)
		}
	}
}
Loading