Skip to content

Move state to ingester, so we don't mark all tokens as Leaving #96

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 5, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 7 additions & 9 deletions ring/ingester_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ type IngesterRegistration struct {

// We need to remember the token state just in case consul goes away and comes
// back empty. Channel is used to tell the actor to update consul on state changes.
state TokenState
stateChange chan TokenState
state IngesterState
stateChange chan IngesterState

consulHeartbeats prometheus.Counter
}
Expand Down Expand Up @@ -63,7 +63,7 @@ func RegisterIngester(consulClient ConsulClient, listenPort, numTokens int) (*In

// Only read/written on actor goroutine.
state: Active,
stateChange: make(chan TokenState),
stateChange: make(chan IngesterState),

consulHeartbeats: prometheus.NewCounter(prometheus.CounterOpts{
Name: "cortex_ingester_consul_heartbeats_total",
Expand All @@ -78,8 +78,8 @@ func RegisterIngester(consulClient ConsulClient, listenPort, numTokens int) (*In

// ChangeState changes the state of all tokens owned by this
// ingester in the ring.
func (r *IngesterRegistration) ChangeState(state TokenState) {
log.Info("Changing token state to: %v", state)
func (r *IngesterRegistration) ChangeState(state IngesterState) {
log.Info("Changing ingester state to: %v", state)
r.stateChange <- state
}

Expand Down Expand Up @@ -123,6 +123,7 @@ func (r *IngesterRegistration) pickTokens() []uint32 {
newTokens := generateTokens(r.numTokens-len(tokens), takenTokens)
tokens = append(tokens, newTokens...)
}
sort.Sort(sortableUint32(tokens))

ringDesc.addIngester(r.id, r.hostname, tokens, r.state)
return ringDesc, true, nil
Expand All @@ -149,10 +150,8 @@ func (r *IngesterRegistration) heartbeat(tokens []uint32) {
ringDesc.addIngester(r.id, r.hostname, tokens, r.state)
} else {
ingesterDesc.Timestamp = time.Now()
ingesterDesc.State = r.state
ringDesc.Ingesters[r.id] = ingesterDesc
for i := range ringDesc.Tokens {
ringDesc.Tokens[i].State = r.state
}
}

return ringDesc, true, nil
Expand Down Expand Up @@ -214,7 +213,6 @@ func generateTokens(numTokens int, takenTokens []uint32) []uint32 {
tokens = append(tokens, candidate)
i++
}
sort.Sort(tokens)
return tokens
}

Expand Down
22 changes: 11 additions & 11 deletions ring/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ import (
"github.com/prometheus/common/log"
)

// TokenState describes the state of a token
type TokenState int
// IngesterState describes the state of an ingester
type IngesterState int

// Values for TokenState
// Values for IngesterState
const (
Active TokenState = iota
Active IngesterState = iota
Leaving
)

Expand All @@ -25,8 +25,9 @@ type Desc struct {

// IngesterDesc describes a single ingester.
type IngesterDesc struct {
Hostname string `json:"hostname"`
Timestamp time.Time `json:"timestamp"`
Hostname string `json:"hostname"`
Timestamp time.Time `json:"timestamp"`
State IngesterState `json:"state"`
}

// TokenDescs is a sortable list of TokenDescs
Expand All @@ -38,9 +39,8 @@ func (ts TokenDescs) Less(i, j int) bool { return ts[i].Token < ts[j].Token }

// TokenDesc describes an individual token in the ring.
type TokenDesc struct {
Token uint32 `json:"tokens"`
Ingester string `json:"ingester"`
State TokenState `json:"state"`
Token uint32 `json:"tokens"`
Ingester string `json:"ingester"`
}

func descFactory() interface{} {
Expand All @@ -53,17 +53,17 @@ func newDesc() *Desc {
}
}

func (d *Desc) addIngester(id, hostname string, tokens []uint32, state TokenState) {
func (d *Desc) addIngester(id, hostname string, tokens []uint32, state IngesterState) {
d.Ingesters[id] = IngesterDesc{
Hostname: hostname,
Timestamp: time.Now(),
State: state,
}

for _, token := range tokens {
d.Tokens = append(d.Tokens, TokenDesc{
Token: token,
Ingester: id,
State: state,
})
}

Expand Down
12 changes: 4 additions & 8 deletions ring/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,22 +149,22 @@ func (r *Ring) getInternal(key uint32, n int, op Operation) ([]IngesterDesc, err
continue
}
distinctHosts[token.Ingester] = struct{}{}
ingester := r.ringDesc.Ingesters[token.Ingester]

// Ingesters that are Leaving do not count to the replication limit. We do
// not want to Write to them because they are about to go away, but we do
// want to write the extra replica somewhere. So we increase the size of the
// set of replicas for the key. This means we have to also increase the
// size of the replica set for read, but we can read from Leaving ingesters,
// so don't skip it in this case.
if token.State == Leaving {
if ingester.State == Leaving {
n++
if op == Write {
continue
}
}

ing := r.ringDesc.Ingesters[token.Ingester]
ingesters = append(ingesters, ing)
ingesters = append(ingesters, ingester)
}
return ingesters, nil
}
Expand Down Expand Up @@ -192,11 +192,7 @@ func (r *Ring) Ready() bool {
for _, ingester := range r.ringDesc.Ingesters {
if time.Now().Sub(ingester.Timestamp) > r.heartbeatTimeout {
return false
}
}

for _, token := range r.ringDesc.Tokens {
if token.State != Active {
} else if ingester.State != Active {
return false
}
}
Expand Down