Skip to content

Add /ring to ingester & distributor, ability to remove token #162

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Nov 25, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/cortex/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ func main() {
defer r.Stop()

router := mux.NewRouter()
router.Handle("/ring", r)

switch cfg.mode {
case modeDistributor:
cfg.distributorConfig.Ring = r
Expand Down
102 changes: 102 additions & 0 deletions ring/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package ring

import (
"fmt"
"html/template"
"net/http"
"time"
)

const tpl = `
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Cortex Ring Status</title>
</head>
<body>
<h1>Cortex Ring Status</h1>
<p>Current time: {{ .Now }}</p>
<p>{{ .Message }}</p>
<form action="" method="POST">
<table width="100%" border="1">
<thead>
<tr>
<th>Ingester</th>
<th>State</th>
<th>Address</th>
<th>Last Heartbeat</th>
<th>Actions</th>
</tr>
</thead>
<tbody>
{{ range $key, $value := .Ring.Ingesters }}
<tr>
<td>{{ $key }}</td>
<td>{{ $value.State }}</td>
<td>{{ $value.Hostname }}</td>
<td>{{ $value.Timestamp | time }}</td>
<td><button name="forget" value="{{ $key }}" type="submit">Forget</button></td>
</tr>
{{ end }}
</tbody>
</table>
</form>
</body>
</html>`

var tmpl *template.Template

func init() {
var err error
tmpl, err = template.New("webpage").
Funcs(template.FuncMap{
"time": func(in interface{}) string {
return in.(time.Time).String()
},
}).
Parse(tpl)
if err != nil {
panic(err)
}
}

func (r *Ring) forget(id string) error {
unregister := func(in interface{}) (out interface{}, retry bool, err error) {
if in == nil {
return nil, false, fmt.Errorf("found empty ring when trying to unregister")
}

ringDesc := in.(*Desc)
ringDesc.removeIngester(id)
return ringDesc, true, nil
}
return r.consul.CAS(consulKey, descFactory, unregister)
}

func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) {
message := ""
if req.Method == http.MethodPost {
ingesterID := req.FormValue("forget")
if err := r.forget(ingesterID); err != nil {
message = fmt.Sprintf("Error forgetting ingester: %v", err)
} else {
message = fmt.Sprintf("Ingester %s forgotten", ingesterID)
}
}

r.mtx.RLock()
defer r.mtx.RUnlock()
if err := tmpl.Execute(w, struct {
Ring Desc
Message string
Now time.Time
}{
Ring: r.ringDesc,
Message: message,
Now: time.Now(),
}); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
6 changes: 3 additions & 3 deletions ring/ingester_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (r *IngesterRegistration) Unregister() {
func (r *IngesterRegistration) loop() {
defer r.wait.Done()
tokens := r.pickTokens()
defer r.unregister(tokens)
defer r.unregister()
r.heartbeat(tokens)
}

Expand Down Expand Up @@ -176,14 +176,14 @@ func (r *IngesterRegistration) heartbeat(tokens []uint32) {
}
}

func (r *IngesterRegistration) unregister(tokens []uint32) {
func (r *IngesterRegistration) unregister() {
unregister := func(in interface{}) (out interface{}, retry bool, err error) {
if in == nil {
return nil, false, fmt.Errorf("found empty ring when trying to unregister")
}

ringDesc := in.(*Desc)
ringDesc.removeIngester(r.id, tokens)
ringDesc.removeIngester(r.id)
return ringDesc, true, nil
}
if err := r.consul.CAS(consulKey, descFactory, unregister); err != nil {
Expand Down
24 changes: 3 additions & 21 deletions ring/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package ring
import (
"sort"
"time"

"github.com/prometheus/common/log"
)

// IngesterState describes the state of an ingester
Expand Down Expand Up @@ -83,29 +81,13 @@ func (d *Desc) addIngester(id, hostname, grpcHostname string, tokens []uint32, s
sort.Sort(d.Tokens)
}

func (d *Desc) removeIngester(id string, tokens []uint32) {
func (d *Desc) removeIngester(id string) {
delete(d.Ingesters, id)
output := []TokenDesc{}
i, j := 0, 0
for i < len(d.Tokens) && j < len(tokens) {
if d.Tokens[i].Token < tokens[j] {
for i := 0; i < len(d.Tokens); i++ {
if d.Tokens[i].Ingester != id {
output = append(output, d.Tokens[i])
i++
} else if d.Tokens[i].Token > tokens[j] {
log.Infof("Missing token from ring: %d", tokens[j])
j++
} else {
i++
j++
}
}
for i < len(d.Tokens) {
output = append(output, d.Tokens[i])
i++
}
for j < len(tokens) {
log.Infof("Missing token from ring: %d", tokens[j])
j++
}
d.Tokens = output
}
14 changes: 4 additions & 10 deletions ring/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,9 @@ func (x uint32s) Swap(i, j int) { x[i], x[j] = x[j], x[i] }
// ErrEmptyRing is the error returned when trying to get an element when nothing has been added to hash.
var ErrEmptyRing = errors.New("empty circle")

// CoordinationStateClient is an interface to getting changes to the coordination
// state. Should allow us to swap out Consul for something else (mesh?) later.
type CoordinationStateClient interface {
WatchKey(key string, factory InstanceFactory, done <-chan struct{}, f func(interface{}) bool)
}

// Ring holds the information about the members of the consistent hash circle.
type Ring struct {
client CoordinationStateClient
consul ConsulClient
quit, done chan struct{}
heartbeatTimeout time.Duration

Expand All @@ -56,9 +50,9 @@ type Ring struct {
}

// New creates a new Ring
func New(client CoordinationStateClient, heartbeatTimeout time.Duration) *Ring {
func New(consul ConsulClient, heartbeatTimeout time.Duration) *Ring {
r := &Ring{
client: client,
consul: consul,
heartbeatTimeout: heartbeatTimeout,
quit: make(chan struct{}),
done: make(chan struct{}),
Expand Down Expand Up @@ -90,7 +84,7 @@ func (r *Ring) Stop() {

func (r *Ring) loop() {
defer close(r.done)
r.client.WatchKey(consulKey, descFactory, r.quit, func(value interface{}) bool {
r.consul.WatchKey(consulKey, descFactory, r.quit, func(value interface{}) bool {
if value == nil {
log.Infof("Ring doesn't exist in consul yet.")
return true
Expand Down