From aa3369dbd1f5e4d61342630321f120151ec26e80 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Sat, 5 Nov 2016 15:43:28 -0700 Subject: [PATCH] Move state to ingester, so we don't mark all tokens as Leaving --- ring/ingester_lifecycle.go | 16 +++++++--------- ring/model.go | 22 +++++++++++----------- ring/ring.go | 12 ++++-------- 3 files changed, 22 insertions(+), 28 deletions(-) diff --git a/ring/ingester_lifecycle.go b/ring/ingester_lifecycle.go index c55aad04ea..f8e3dbfc07 100644 --- a/ring/ingester_lifecycle.go +++ b/ring/ingester_lifecycle.go @@ -33,8 +33,8 @@ type IngesterRegistration struct { // We need to remember the token state just in case consul goes away and comes // back empty. Channel is used to tell the actor to update consul on state changes. - state TokenState - stateChange chan TokenState + state IngesterState + stateChange chan IngesterState consulHeartbeats prometheus.Counter } @@ -63,7 +63,7 @@ func RegisterIngester(consulClient ConsulClient, listenPort, numTokens int) (*In // Only read/written on actor goroutine. state: Active, - stateChange: make(chan TokenState), + stateChange: make(chan IngesterState), consulHeartbeats: prometheus.NewCounter(prometheus.CounterOpts{ Name: "cortex_ingester_consul_heartbeats_total", @@ -78,8 +78,8 @@ func RegisterIngester(consulClient ConsulClient, listenPort, numTokens int) (*In // ChangeState changes the state of all tokens owned by this // ingester in the ring. -func (r *IngesterRegistration) ChangeState(state TokenState) { - log.Info("Changing token state to: %v", state) +func (r *IngesterRegistration) ChangeState(state IngesterState) { + log.Info("Changing ingester state to: %v", state) r.stateChange <- state } @@ -123,6 +123,7 @@ func (r *IngesterRegistration) pickTokens() []uint32 { newTokens := generateTokens(r.numTokens-len(tokens), takenTokens) tokens = append(tokens, newTokens...) } + sort.Sort(sortableUint32(tokens)) ringDesc.addIngester(r.id, r.hostname, tokens, r.state) return ringDesc, true, nil @@ -149,10 +150,8 @@ func (r *IngesterRegistration) heartbeat(tokens []uint32) { ringDesc.addIngester(r.id, r.hostname, tokens, r.state) } else { ingesterDesc.Timestamp = time.Now() + ingesterDesc.State = r.state ringDesc.Ingesters[r.id] = ingesterDesc - for i := range ringDesc.Tokens { - ringDesc.Tokens[i].State = r.state - } } return ringDesc, true, nil @@ -214,7 +213,6 @@ func generateTokens(numTokens int, takenTokens []uint32) []uint32 { tokens = append(tokens, candidate) i++ } - sort.Sort(tokens) return tokens } diff --git a/ring/model.go b/ring/model.go index df1d13ab77..ec28c222a7 100644 --- a/ring/model.go +++ b/ring/model.go @@ -7,12 +7,12 @@ import ( "github.com/prometheus/common/log" ) -// TokenState describes the state of a token -type TokenState int +// IngesterState describes the state of an ingester +type IngesterState int -// Values for TokenState +// Values for IngesterState const ( - Active TokenState = iota + Active IngesterState = iota Leaving ) @@ -25,8 +25,9 @@ type Desc struct { // IngesterDesc describes a single ingester. type IngesterDesc struct { - Hostname string `json:"hostname"` - Timestamp time.Time `json:"timestamp"` + Hostname string `json:"hostname"` + Timestamp time.Time `json:"timestamp"` + State IngesterState `json:"state"` } // TokenDescs is a sortable list of TokenDescs @@ -38,9 +39,8 @@ func (ts TokenDescs) Less(i, j int) bool { return ts[i].Token < ts[j].Token } // TokenDesc describes an individual token in the ring. type TokenDesc struct { - Token uint32 `json:"tokens"` - Ingester string `json:"ingester"` - State TokenState `json:"state"` + Token uint32 `json:"tokens"` + Ingester string `json:"ingester"` } func descFactory() interface{} { @@ -53,17 +53,17 @@ func newDesc() *Desc { } } -func (d *Desc) addIngester(id, hostname string, tokens []uint32, state TokenState) { +func (d *Desc) addIngester(id, hostname string, tokens []uint32, state IngesterState) { d.Ingesters[id] = IngesterDesc{ Hostname: hostname, Timestamp: time.Now(), + State: state, } for _, token := range tokens { d.Tokens = append(d.Tokens, TokenDesc{ Token: token, Ingester: id, - State: state, }) } diff --git a/ring/ring.go b/ring/ring.go index 66f9b97260..9791cbf4f3 100644 --- a/ring/ring.go +++ b/ring/ring.go @@ -149,6 +149,7 @@ func (r *Ring) getInternal(key uint32, n int, op Operation) ([]IngesterDesc, err continue } distinctHosts[token.Ingester] = struct{}{} + ingester := r.ringDesc.Ingesters[token.Ingester] // Ingesters that are Leaving do not count to the replication limit. We do // not want to Write to them because they are about to go away, but we do @@ -156,15 +157,14 @@ func (r *Ring) getInternal(key uint32, n int, op Operation) ([]IngesterDesc, err // set of replicas for the key. This means we have to also increase the // size of the replica set for read, but we can read from Leaving ingesters, // so don't skip it in this case. - if token.State == Leaving { + if ingester.State == Leaving { n++ if op == Write { continue } } - ing := r.ringDesc.Ingesters[token.Ingester] - ingesters = append(ingesters, ing) + ingesters = append(ingesters, ingester) } return ingesters, nil } @@ -192,11 +192,7 @@ func (r *Ring) Ready() bool { for _, ingester := range r.ringDesc.Ingesters { if time.Now().Sub(ingester.Timestamp) > r.heartbeatTimeout { return false - } - } - - for _, token := range r.ringDesc.Tokens { - if token.State != Active { + } else if ingester.State != Active { return false } }