From 8916f524ab61295fe53d146e8b9e6c285c1d4eae Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Fri, 25 Nov 2016 11:48:31 +0000 Subject: [PATCH 1/2] Add /ring to ingester & distributor, ability to remove token --- cmd/cortex/main.go | 2 + ring/http.go | 99 ++++++++++++++++++++++++++++++++++++++ ring/ingester_lifecycle.go | 6 +-- ring/model.go | 24 ++------- ring/ring.go | 14 ++---- 5 files changed, 111 insertions(+), 34 deletions(-) create mode 100644 ring/http.go diff --git a/cmd/cortex/main.go b/cmd/cortex/main.go index 3112cc2c3f..0b2d64209a 100644 --- a/cmd/cortex/main.go +++ b/cmd/cortex/main.go @@ -147,6 +147,8 @@ func main() { defer r.Stop() router := mux.NewRouter() + router.Handle("/ring", r) + switch cfg.mode { case modeDistributor: cfg.distributorConfig.Ring = r diff --git a/ring/http.go b/ring/http.go new file mode 100644 index 0000000000..e25e067fc6 --- /dev/null +++ b/ring/http.go @@ -0,0 +1,99 @@ +package ring + +import ( + "fmt" + "html/template" + "net/http" + "time" +) + +const tpl = ` + + + + + Cortex Ring Status + + +

Cortex Ring Status

+

{{ .Message }}

+
+ + + + + + + + + + + + {{ range $key, $value := .Ring.Ingesters }} + + + + + + + + {{ end }} + +
IngesterStateAddressLast HeartbeatActions
{{ $key }}{{ $value.State }}{{ $value.Hostname }}{{ $value.Timestamp | time }}
+
+ +` + +var tmpl *template.Template + +func init() { + var err error + tmpl, err = template.New("webpage"). + Funcs(template.FuncMap{ + "time": func(in interface{}) string { + return in.(time.Time).String() + }, + }). + Parse(tpl) + if err != nil { + panic(err) + } +} + +func (r *Ring) forget(id string) error { + unregister := func(in interface{}) (out interface{}, retry bool, err error) { + if in == nil { + return nil, false, fmt.Errorf("found empty ring when trying to unregister") + } + + ringDesc := in.(*Desc) + ringDesc.removeIngester(id) + return ringDesc, true, nil + } + return r.consul.CAS(consulKey, descFactory, unregister) +} + +func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) { + message := "" + if req.Method == http.MethodPost { + ingesterID := req.FormValue("forget") + if err := r.forget(ingesterID); err != nil { + message = fmt.Sprintf("Error forgetting ingester: %v", err) + } else { + message = fmt.Sprintf("Ingester %s forgotten", ingesterID) + } + } + + r.mtx.RLock() + defer r.mtx.RUnlock() + if err := tmpl.Execute(w, struct { + Ring Desc + Message string + }{ + Ring: r.ringDesc, + Message: message, + }); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } +} diff --git a/ring/ingester_lifecycle.go b/ring/ingester_lifecycle.go index 91a8b1de9a..cf2b5bfcba 100644 --- a/ring/ingester_lifecycle.go +++ b/ring/ingester_lifecycle.go @@ -99,7 +99,7 @@ func (r *IngesterRegistration) Unregister() { func (r *IngesterRegistration) loop() { defer r.wait.Done() tokens := r.pickTokens() - defer r.unregister(tokens) + defer r.unregister() r.heartbeat(tokens) } @@ -176,14 +176,14 @@ func (r *IngesterRegistration) heartbeat(tokens []uint32) { } } -func (r *IngesterRegistration) unregister(tokens []uint32) { +func (r *IngesterRegistration) unregister() { unregister := func(in interface{}) (out interface{}, retry bool, err error) { if in == nil { return nil, false, fmt.Errorf("found empty ring when trying to unregister") } ringDesc := in.(*Desc) - ringDesc.removeIngester(r.id, tokens) + ringDesc.removeIngester(r.id) return ringDesc, true, nil } if err := r.consul.CAS(consulKey, descFactory, unregister); err != nil { diff --git a/ring/model.go b/ring/model.go index d6b866a5ad..966dcfb1e3 100644 --- a/ring/model.go +++ b/ring/model.go @@ -3,8 +3,6 @@ package ring import ( "sort" "time" - - "github.com/prometheus/common/log" ) // IngesterState describes the state of an ingester @@ -83,29 +81,13 @@ func (d *Desc) addIngester(id, hostname, grpcHostname string, tokens []uint32, s sort.Sort(d.Tokens) } -func (d *Desc) removeIngester(id string, tokens []uint32) { +func (d *Desc) removeIngester(id string) { delete(d.Ingesters, id) output := []TokenDesc{} - i, j := 0, 0 - for i < len(d.Tokens) && j < len(tokens) { - if d.Tokens[i].Token < tokens[j] { + for i := 0; i < len(d.Tokens); i++ { + if d.Tokens[i].Ingester != id { output = append(output, d.Tokens[i]) - i++ - } else if d.Tokens[i].Token > tokens[j] { - log.Infof("Missing token from ring: %d", tokens[j]) - j++ - } else { - i++ - j++ } } - for i < len(d.Tokens) { - output = append(output, d.Tokens[i]) - i++ - } - for j < len(tokens) { - log.Infof("Missing token from ring: %d", tokens[j]) - j++ - } d.Tokens = output } diff --git a/ring/ring.go b/ring/ring.go index 33fcef638f..a6ac8585c7 100644 --- a/ring/ring.go +++ b/ring/ring.go @@ -35,15 +35,9 @@ func (x uint32s) Swap(i, j int) { x[i], x[j] = x[j], x[i] } // ErrEmptyRing is the error returned when trying to get an element when nothing has been added to hash. var ErrEmptyRing = errors.New("empty circle") -// CoordinationStateClient is an interface to getting changes to the coordination -// state. Should allow us to swap out Consul for something else (mesh?) later. -type CoordinationStateClient interface { - WatchKey(key string, factory InstanceFactory, done <-chan struct{}, f func(interface{}) bool) -} - // Ring holds the information about the members of the consistent hash circle. type Ring struct { - client CoordinationStateClient + consul ConsulClient quit, done chan struct{} heartbeatTimeout time.Duration @@ -56,9 +50,9 @@ type Ring struct { } // New creates a new Ring -func New(client CoordinationStateClient, heartbeatTimeout time.Duration) *Ring { +func New(consul ConsulClient, heartbeatTimeout time.Duration) *Ring { r := &Ring{ - client: client, + consul: consul, heartbeatTimeout: heartbeatTimeout, quit: make(chan struct{}), done: make(chan struct{}), @@ -90,7 +84,7 @@ func (r *Ring) Stop() { func (r *Ring) loop() { defer close(r.done) - r.client.WatchKey(consulKey, descFactory, r.quit, func(value interface{}) bool { + r.consul.WatchKey(consulKey, descFactory, r.quit, func(value interface{}) bool { if value == nil { log.Infof("Ring doesn't exist in consul yet.") return true From edfe50c3c6a34d6497868322ecf6ca431e40b217 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Fri, 25 Nov 2016 13:11:14 +0000 Subject: [PATCH 2/2] Review feedback --- ring/http.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/ring/http.go b/ring/http.go index e25e067fc6..3a5895c5db 100644 --- a/ring/http.go +++ b/ring/http.go @@ -16,6 +16,7 @@ const tpl = `

Cortex Ring Status

+

Current time: {{ .Now }}

{{ .Message }}

@@ -89,9 +90,11 @@ func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) { if err := tmpl.Execute(w, struct { Ring Desc Message string + Now time.Time }{ Ring: r.ringDesc, Message: message, + Now: time.Now(), }); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return