
Add per-token state, and use it to not write to Leaving ingesters #90

Merged · 5 commits · Nov 4, 2016

13 changes: 9 additions & 4 deletions cmd/cortex/main.go
@@ -93,7 +93,6 @@ func main() {
flag.IntVar(&cfg.numTokens, "ingester.num-tokens", 128, "Number of tokens for each ingester.")
flag.IntVar(&cfg.distributorConfig.ReplicationFactor, "distributor.replication-factor", 3, "The number of ingesters to write to and read from.")
flag.IntVar(&cfg.distributorConfig.MinReadSuccesses, "distributor.min-read-successes", 2, "The minimum number of ingesters from which a read must succeed.")
flag.IntVar(&cfg.distributorConfig.MinWriteSuccesses, "distributor.min-write-successes", 2, "The minimum number of ingesters to which a write must succeed.")
flag.DurationVar(&cfg.distributorConfig.HeartbeatTimeout, "distributor.heartbeat-timeout", time.Minute, "The heartbeat timeout after which ingesters are skipped for reads/writes.")
flag.BoolVar(&cfg.logSuccess, "log.success", false, "Log successful requests")
flag.Parse()
@@ -125,10 +124,16 @@ func main() {
// network errors.
log.Fatalf("Could not register ingester: %v", err)
}
defer registration.Unregister()
prometheus.MustRegister(registration)
ing := setupIngester(chunkStore, cfg.ingesterConfig, cfg.logSuccess)
defer ing.Stop()

// Deferring a func to make ordering obvious
defer func() {
registration.ChangeState(ring.Leaving)
ing.Stop()
registration.Unregister()
}()
Contributor

Can you move this defer to before the prometheus metric registration? I know it's extremely unlikely that MustRegister will fail, but if it does, we will register the ingester without ever unregistering it.

Contributor Author

Sure!
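A minimal, self-contained sketch of the point in this thread (illustrative names only, not the PR's code): a deferred cleanup registered before a call that may panic still runs while the panic unwinds, which is why moving the defer above prometheus.MustRegister keeps the ingester from staying registered in the ring.

```go
package main

import "fmt"

func register()            { fmt.Println("registered in ring") }
func unregister()          { fmt.Println("unregistered from ring") }
func mustRegisterMetrics() { panic("duplicate metrics collector") }

func main() {
	register()
	// Deferred before the call that may panic, so it still runs during unwinding.
	defer unregister()
	mustRegisterMetrics()
}
```

Note that defers do not run on log.Fatalf (which exits the process), so the panic from MustRegister is the case this ordering protects against.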


prometheus.MustRegister(registration)
default:
log.Fatalf("Mode %s not supported!", cfg.mode)
}
2 changes: 1 addition & 1 deletion cortex-build/Dockerfile
@@ -1,4 +1,4 @@
FROM golang:1.6.3
FROM golang:1.7.3
RUN apt-get update && apt-get install -y python-requests python-yaml file jq && \
rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
RUN go clean -i net && \
85 changes: 63 additions & 22 deletions distributor.go
@@ -4,6 +4,7 @@ import (
"fmt"
"hash/fnv"
"sync"
"sync/atomic"
"time"

"github.com/prometheus/client_golang/prometheus"
@@ -46,7 +47,8 @@ type Distributor struct {
type ReadRing interface {
prometheus.Collector

Get(key uint32, n int) ([]ring.IngesterDesc, error)
Get(key uint32, n int, op ring.Operation) ([]ring.IngesterDesc, error)
BatchGet(keys []uint32, n int, op ring.Operation) ([][]ring.IngesterDesc, error)
Contributor

I'm confused. If this is a ReadRing, why do we have to specify operation? Won't it always be read? If not, what does it mean to write to a ReadRing?

Contributor Author

Yes we're always reading from the ring, but the operation says what that info is for - so we read from the ring when writing samples and when processing queries. The way we handle leaving nodes depends on why we're asking...

GetAll() []ring.IngesterDesc
}
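A hedged sketch of the distinction discussed in the thread above (not the ring package's actual Get implementation; the types and filtering logic here are illustrative): the ring itself is always read, but the Operation tells it whether Leaving tokens should be handed out, so writes skip draining ingesters while reads still include them.

```go
package main

import "fmt"

type TokenState int

const (
	Active TokenState = iota
	Leaving
)

type Operation int

const (
	Read Operation = iota
	Write
)

type TokenDesc struct {
	Token    uint32
	Ingester string
	State    TokenState
}

// eligible says whether a token may serve the given operation.
func eligible(t TokenDesc, op Operation) bool {
	if op == Write && t.State == Leaving {
		return false // don't send new samples to an ingester that is draining
	}
	return true
}

func main() {
	tokens := []TokenDesc{
		{1, "ingester-1", Active},
		{2, "ingester-2", Leaving},
	}
	for _, t := range tokens {
		fmt.Printf("%s: write=%v read=%v\n", t.Ingester, eligible(t, Write), eligible(t, Read))
	}
}
```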

@@ -61,7 +63,6 @@ type DistributorConfig struct {

ReplicationFactor int
MinReadSuccesses int
MinWriteSuccesses int
HeartbeatTimeout time.Duration
}

@@ -70,9 +71,6 @@ func NewDistributor(cfg DistributorConfig) (*Distributor, error) {
if 0 > cfg.ReplicationFactor {
return nil, fmt.Errorf("ReplicationFactor must be greater than zero: %d", cfg.ReplicationFactor)
}
if cfg.MinWriteSuccesses > cfg.ReplicationFactor {
return nil, fmt.Errorf("MinWriteSuccesses > ReplicationFactor: %d > %d", cfg.MinWriteSuccesses, cfg.ReplicationFactor)
}
if cfg.MinReadSuccesses > cfg.ReplicationFactor {
return nil, fmt.Errorf("MinReadSuccesses > ReplicationFactor: %d > %d", cfg.MinReadSuccesses, cfg.ReplicationFactor)
}
@@ -154,6 +152,12 @@ func tokenFor(userID string, name model.LabelValue) uint32 {
return h.Sum32()
}

type sampleTracker struct {
sample *model.Sample
minSuccess int
succeeded int32
}

// Append implements SampleAppender.
func (d *Distributor) Append(ctx context.Context, samples []*model.Sample) error {
userID, err := user.GetID(ctx)
@@ -163,53 +167,90 @@ func (d *Distributor) Append(ctx context.Context, samples []*model.Sample) error

d.receivedSamples.Add(float64(len(samples)))

samplesByIngester := map[string][]*model.Sample{}
for _, sample := range samples {
key := tokenForMetric(userID, sample.Metric)
ingesters, err := d.cfg.Ring.Get(key, d.cfg.ReplicationFactor)
if err != nil {
return err
keys := make([]uint32, len(samples), len(samples))
for i, sample := range samples {
keys[i] = tokenForMetric(userID, sample.Metric)
}

ingesters, err := d.cfg.Ring.BatchGet(keys, d.cfg.ReplicationFactor, ring.Write)
if err != nil {
return err
}

sampleTrackers := make([]sampleTracker, len(samples), len(samples))
samplesByIngester := map[string][]*sampleTracker{}
for i := range samples {
sampleTrackers[i] = sampleTracker{
sample: samples[i],
// We need a response from a quorum of ingesters, which is n/2 + 1.
minSuccess: (len(ingesters[i]) / 2) + 1,
succeeded: 0,
}
for _, ingester := range ingesters {
otherSamples := samplesByIngester[ingester.Hostname]
samplesByIngester[ingester.Hostname] = append(otherSamples, sample)

// Skip those that have not heartbeated in a while. NB these are still
// included in the calculation of minSuccess, so too many failed ingesters
// will cause the whole write to fail.
liveIngesters := make([]string, 0, len(ingesters[i]))
for _, ingester := range ingesters[i] {
if time.Now().Sub(ingester.Timestamp) <= d.cfg.HeartbeatTimeout {
liveIngesters = append(liveIngesters, ingester.Hostname)
}
}

// This is just a shortcut - if there are fewer than minSuccess available ingesters
// after filtering out dead ones, don't even bother trying.
if len(liveIngesters) < sampleTrackers[i].minSuccess {
return fmt.Errorf("wanted at least %d live ingesters to process write, had %d",
sampleTrackers[i].minSuccess, len(liveIngesters))
}

for _, liveIngester := range liveIngesters {
sampleForIngester := samplesByIngester[liveIngester]
samplesByIngester[liveIngester] = append(sampleForIngester, &sampleTrackers[i])
}
Contributor

I have a feeling this for loop could be exploded into a couple of different things that, collectively, would be easier to understand. No good ideas right now though.

}

errs := make(chan error)
for hostname, samples := range samplesByIngester {
go func(hostname string, samples []*model.Sample) {
go func(hostname string, samples []*sampleTracker) {
errs <- d.sendSamples(ctx, hostname, samples)
}(hostname, samples)
}
var lastErr error
successes := 0
for i := 0; i < len(samplesByIngester); i++ {
if err := <-errs; err != nil {
lastErr = err
continue
}
successes++
}

if successes < d.cfg.MinWriteSuccesses {
return fmt.Errorf("too few successful writes, last error was: %v", lastErr)
for i := range sampleTrackers {
if sampleTrackers[i].succeeded < int32(sampleTrackers[i].minSuccess) {
return fmt.Errorf("need %d successful writes, only got %d, last error was: %v",
sampleTrackers[i].minSuccess, sampleTrackers[i].succeeded, lastErr)
}
}
return nil
}
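As a follow-up to the review comment about exploding the loop: a hedged sketch of one possible decomposition (illustrative types and helpers, not the merged code). It keeps the same arithmetic: the quorum for n replicas is n/2 + 1, and stale ingesters are filtered from the send list but still count toward n.

```go
package main

import (
	"fmt"
	"time"
)

type IngesterDesc struct {
	Hostname  string
	Timestamp time.Time
}

// quorum returns the number of successful writes required for n replicas.
func quorum(n int) int { return n/2 + 1 }

// liveHostnames filters out ingesters whose heartbeat is older than timeout.
func liveHostnames(replicas []IngesterDesc, timeout time.Duration, now time.Time) []string {
	live := make([]string, 0, len(replicas))
	for _, r := range replicas {
		if now.Sub(r.Timestamp) <= timeout {
			live = append(live, r.Hostname)
		}
	}
	return live
}

func main() {
	now := time.Now()
	replicas := []IngesterDesc{
		{"ingester-1", now},
		{"ingester-2", now.Add(-2 * time.Minute)}, // stale heartbeat
		{"ingester-3", now},
	}
	live := liveHostnames(replicas, time.Minute, now)
	fmt.Printf("need %d of %d, have %d live\n", quorum(len(replicas)), len(replicas), len(live))
}
```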

func (d *Distributor) sendSamples(ctx context.Context, hostname string, samples []*model.Sample) error {
func (d *Distributor) sendSamples(ctx context.Context, hostname string, sampleTrackers []*sampleTracker) error {
client, err := d.getClientFor(hostname)
if err != nil {
return err
}
samples := make([]*model.Sample, len(sampleTrackers), len(sampleTrackers))
for i := range sampleTrackers {
samples[i] = sampleTrackers[i].sample
}
err = instrument.TimeRequestHistogram("send", d.sendDuration, func() error {
return client.Append(ctx, samples)
})
if err != nil {
d.ingesterAppendFailures.WithLabelValues(hostname).Inc()
}
d.ingesterAppends.WithLabelValues(hostname).Inc()
for i := range sampleTrackers {
atomic.AddInt32(&sampleTrackers[i].succeeded, 1)
}
return err
}

@@ -241,7 +282,7 @@ func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers .
return err
}

ingesters, err := d.cfg.Ring.Get(tokenFor(userID, metricName), d.cfg.ReplicationFactor)
ingesters, err := d.cfg.Ring.Get(tokenFor(userID, metricName), d.cfg.ReplicationFactor, ring.Read)
if err != nil {
return err
}
32 changes: 28 additions & 4 deletions ring/ingester_lifecycle.go
@@ -31,6 +31,11 @@ type IngesterRegistration struct {
quit chan struct{}
wait sync.WaitGroup

// We need to remember the token state just in case consul goes away and comes
// back empty. Channel is used to tell the actor to update consul on state changes.
state TokenState
stateChange chan TokenState

consulHeartbeats prometheus.Counter
}

@@ -56,6 +61,10 @@ func RegisterIngester(consulClient ConsulClient, listenPort, numTokens int) (*In
hostname: fmt.Sprintf("%s:%d", addr, listenPort),
quit: make(chan struct{}),

// Only read/written on actor goroutine.
state: Active,
stateChange: make(chan TokenState),

consulHeartbeats: prometheus.NewCounter(prometheus.CounterOpts{
Name: "cortex_ingester_consul_heartbeats_total",
Help: "The total number of heartbeats sent to consul.",
Expand All @@ -67,6 +76,13 @@ func RegisterIngester(consulClient ConsulClient, listenPort, numTokens int) (*In
return r, nil
}

// ChangeState changes the state of all tokens owned by this
// ingester in the ring.
func (r *IngesterRegistration) ChangeState(state TokenState) {
log.Infof("Changing token state to: %v", state)
r.stateChange <- state
}

// Unregister removes the ingester config from Consul; it will block
// until we have successfully unregistered.
func (r *IngesterRegistration) Unregister() {
@@ -108,7 +124,7 @@ func (r *IngesterRegistration) pickTokens() []uint32 {
tokens = append(tokens, newTokens...)
}

ringDesc.addIngester(r.id, r.hostname, tokens)
ringDesc.addIngester(r.id, r.hostname, tokens, r.state)
return ringDesc, true, nil
}
if err := r.consul.CAS(consulKey, descFactory, pickTokens); err != nil {
@@ -118,7 +134,7 @@ func (r *IngesterRegistration) pickTokens() []uint32 {
}

func (r *IngesterRegistration) heartbeat(tokens []uint32) {
heartbeat := func(in interface{}) (out interface{}, retry bool, err error) {
updateConsul := func(in interface{}) (out interface{}, retry bool, err error) {
var ringDesc *Desc
if in == nil {
ringDesc = newDesc()
@@ -130,21 +146,29 @@ func (r *IngesterRegistration) heartbeat(tokens []uint32) {
if !ok {
// consul must have restarted
log.Infof("Found empty ring, inserting tokens!")
ringDesc.addIngester(r.id, r.hostname, tokens)
ringDesc.addIngester(r.id, r.hostname, tokens, r.state)
} else {
ingesterDesc.Timestamp = time.Now()
ringDesc.Ingesters[r.id] = ingesterDesc
for i := range ringDesc.Tokens {
ringDesc.Tokens[i].State = r.state
}
}

return ringDesc, true, nil
}

ticker := time.NewTicker(heartbeatInterval)
defer ticker.Stop()
for {
select {
case r.state = <-r.stateChange:
if err := r.consul.CAS(consulKey, descFactory, updateConsul); err != nil {
log.Errorf("Failed to write to consul, sleeping: %v", err)
}
case <-ticker.C:
r.consulHeartbeats.Inc()
if err := r.consul.CAS(consulKey, descFactory, heartbeat); err != nil {
if err := r.consul.CAS(consulKey, descFactory, updateConsul); err != nil {
log.Errorf("Failed to write to consul, sleeping: %v", err)
}
case <-r.quit:
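A self-contained sketch of the actor pattern used by the heartbeat loop above (names, intervals, and the printed "consul" writes are illustrative): one goroutine owns the token state, and ChangeState-style callers hand new values over a channel, so Consul updates never race with heartbeats.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	stateChange := make(chan string)
	quit := make(chan struct{})
	done := make(chan struct{})

	// The "actor": the only goroutine that reads or writes state.
	go func() {
		defer close(done)
		state := "ACTIVE"
		ticker := time.NewTicker(50 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case state = <-stateChange:
				fmt.Println("writing state change to consul:", state)
			case <-ticker.C:
				fmt.Println("heartbeat with state:", state)
			case <-quit:
				return
			}
		}
	}()

	time.Sleep(120 * time.Millisecond)
	stateChange <- "LEAVING" // analogous to ChangeState(ring.Leaving) in this PR
	time.Sleep(60 * time.Millisecond)
	close(quit)
	<-done
}
```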
17 changes: 14 additions & 3 deletions ring/model.go
@@ -7,6 +7,15 @@ import (
"github.com/prometheus/common/log"
)

// TokenState describes the state of a token
type TokenState int

// Values for TokenState
const (
Active TokenState = iota
Leaving
)

// Desc is the serialised state in Consul representing
// all ingesters (ie, the ring).
type Desc struct {
@@ -29,8 +38,9 @@ func (ts TokenDescs) Less(i, j int) bool { return ts[i].Token < ts[j].Token }

// TokenDesc describes an individual token in the ring.
type TokenDesc struct {
Token uint32 `json:"tokens"`
Ingester string `json:"ingester"`
Token uint32 `json:"tokens"`
Ingester string `json:"ingester"`
State TokenState `json:"state"`
}
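For illustration, a hedged sketch of how a token entry with the new state field serialises (field names follow the struct tags above; the enclosing Desc layout is omitted). Because Active is the zero value, older entries in Consul that lack a state field unmarshal as Active, so existing rings stay valid.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type TokenState int

const (
	Active TokenState = iota
	Leaving
)

type TokenDesc struct {
	Token    uint32     `json:"tokens"`
	Ingester string     `json:"ingester"`
	State    TokenState `json:"state"`
}

func main() {
	b, _ := json.Marshal(TokenDesc{Token: 1234, Ingester: "ingester-1", State: Leaving})
	fmt.Println(string(b)) // {"tokens":1234,"ingester":"ingester-1","state":1}
}
```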

func descFactory() interface{} {
@@ -43,7 +53,7 @@ func newDesc() *Desc {
}
}

func (d *Desc) addIngester(id, hostname string, tokens []uint32) {
func (d *Desc) addIngester(id, hostname string, tokens []uint32, state TokenState) {
d.Ingesters[id] = IngesterDesc{
Hostname: hostname,
Timestamp: time.Now(),
@@ -53,6 +63,7 @@ func (d *Desc) addIngester(id, hostname string, tokens []uint32) {
d.Tokens = append(d.Tokens, TokenDesc{
Token: token,
Ingester: id,
State: state,
})
}
