Skip to content

Commit 8916f52

Browse files
committed
Add /ring to ingester & distributor, ability to remove token
1 parent 4816413 commit 8916f52

File tree

5 files changed

+111
-34
lines changed

5 files changed

+111
-34
lines changed

cmd/cortex/main.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,8 @@ func main() {
147147
defer r.Stop()
148148

149149
router := mux.NewRouter()
150+
router.Handle("/ring", r)
151+
150152
switch cfg.mode {
151153
case modeDistributor:
152154
cfg.distributorConfig.Ring = r

ring/http.go

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
package ring
2+
3+
import (
4+
"fmt"
5+
"html/template"
6+
"net/http"
7+
"time"
8+
)
9+
10+
const tpl = `
11+
<!DOCTYPE html>
12+
<html>
13+
<head>
14+
<meta charset="UTF-8">
15+
<title>Cortex Ring Status</title>
16+
</head>
17+
<body>
18+
<h1>Cortex Ring Status</h1>
19+
<p>{{ .Message }}</p>
20+
<form action="" method="POST">
21+
<table width="100%" border="1">
22+
<thead>
23+
<tr>
24+
<th>Ingester</th>
25+
<th>State</th>
26+
<th>Address</th>
27+
<th>Last Heartbeat</th>
28+
<th>Actions</th>
29+
</tr>
30+
</thead>
31+
<tbody>
32+
{{ range $key, $value := .Ring.Ingesters }}
33+
<tr>
34+
<td>{{ $key }}</td>
35+
<td>{{ $value.State }}</td>
36+
<td>{{ $value.Hostname }}</td>
37+
<td>{{ $value.Timestamp | time }}</td>
38+
<td><button name="forget" value="{{ $key }}" type="submit">Forget</button></td>
39+
</tr>
40+
{{ end }}
41+
</tbody>
42+
</table>
43+
</form>
44+
</body>
45+
</html>`
46+
47+
var tmpl *template.Template
48+
49+
func init() {
50+
var err error
51+
tmpl, err = template.New("webpage").
52+
Funcs(template.FuncMap{
53+
"time": func(in interface{}) string {
54+
return in.(time.Time).String()
55+
},
56+
}).
57+
Parse(tpl)
58+
if err != nil {
59+
panic(err)
60+
}
61+
}
62+
63+
func (r *Ring) forget(id string) error {
64+
unregister := func(in interface{}) (out interface{}, retry bool, err error) {
65+
if in == nil {
66+
return nil, false, fmt.Errorf("found empty ring when trying to unregister")
67+
}
68+
69+
ringDesc := in.(*Desc)
70+
ringDesc.removeIngester(id)
71+
return ringDesc, true, nil
72+
}
73+
return r.consul.CAS(consulKey, descFactory, unregister)
74+
}
75+
76+
func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) {
77+
message := ""
78+
if req.Method == http.MethodPost {
79+
ingesterID := req.FormValue("forget")
80+
if err := r.forget(ingesterID); err != nil {
81+
message = fmt.Sprintf("Error forgetting ingester: %v", err)
82+
} else {
83+
message = fmt.Sprintf("Ingester %s forgotten", ingesterID)
84+
}
85+
}
86+
87+
r.mtx.RLock()
88+
defer r.mtx.RUnlock()
89+
if err := tmpl.Execute(w, struct {
90+
Ring Desc
91+
Message string
92+
}{
93+
Ring: r.ringDesc,
94+
Message: message,
95+
}); err != nil {
96+
http.Error(w, err.Error(), http.StatusInternalServerError)
97+
return
98+
}
99+
}

ring/ingester_lifecycle.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ func (r *IngesterRegistration) Unregister() {
9999
func (r *IngesterRegistration) loop() {
100100
defer r.wait.Done()
101101
tokens := r.pickTokens()
102-
defer r.unregister(tokens)
102+
defer r.unregister()
103103
r.heartbeat(tokens)
104104
}
105105

@@ -176,14 +176,14 @@ func (r *IngesterRegistration) heartbeat(tokens []uint32) {
176176
}
177177
}
178178

179-
func (r *IngesterRegistration) unregister(tokens []uint32) {
179+
func (r *IngesterRegistration) unregister() {
180180
unregister := func(in interface{}) (out interface{}, retry bool, err error) {
181181
if in == nil {
182182
return nil, false, fmt.Errorf("found empty ring when trying to unregister")
183183
}
184184

185185
ringDesc := in.(*Desc)
186-
ringDesc.removeIngester(r.id, tokens)
186+
ringDesc.removeIngester(r.id)
187187
return ringDesc, true, nil
188188
}
189189
if err := r.consul.CAS(consulKey, descFactory, unregister); err != nil {

ring/model.go

Lines changed: 3 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@ package ring
33
import (
44
"sort"
55
"time"
6-
7-
"github.com/prometheus/common/log"
86
)
97

108
// IngesterState describes the state of an ingester
@@ -83,29 +81,13 @@ func (d *Desc) addIngester(id, hostname, grpcHostname string, tokens []uint32, s
8381
sort.Sort(d.Tokens)
8482
}
8583

86-
func (d *Desc) removeIngester(id string, tokens []uint32) {
84+
func (d *Desc) removeIngester(id string) {
8785
delete(d.Ingesters, id)
8886
output := []TokenDesc{}
89-
i, j := 0, 0
90-
for i < len(d.Tokens) && j < len(tokens) {
91-
if d.Tokens[i].Token < tokens[j] {
87+
for i := 0; i < len(d.Tokens); i++ {
88+
if d.Tokens[i].Ingester != id {
9289
output = append(output, d.Tokens[i])
93-
i++
94-
} else if d.Tokens[i].Token > tokens[j] {
95-
log.Infof("Missing token from ring: %d", tokens[j])
96-
j++
97-
} else {
98-
i++
99-
j++
10090
}
10191
}
102-
for i < len(d.Tokens) {
103-
output = append(output, d.Tokens[i])
104-
i++
105-
}
106-
for j < len(tokens) {
107-
log.Infof("Missing token from ring: %d", tokens[j])
108-
j++
109-
}
11092
d.Tokens = output
11193
}

ring/ring.go

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -35,15 +35,9 @@ func (x uint32s) Swap(i, j int) { x[i], x[j] = x[j], x[i] }
3535
// ErrEmptyRing is the error returned when trying to get an element when nothing has been added to hash.
3636
var ErrEmptyRing = errors.New("empty circle")
3737

38-
// CoordinationStateClient is an interface to getting changes to the coordination
39-
// state. Should allow us to swap out Consul for something else (mesh?) later.
40-
type CoordinationStateClient interface {
41-
WatchKey(key string, factory InstanceFactory, done <-chan struct{}, f func(interface{}) bool)
42-
}
43-
4438
// Ring holds the information about the members of the consistent hash circle.
4539
type Ring struct {
46-
client CoordinationStateClient
40+
consul ConsulClient
4741
quit, done chan struct{}
4842
heartbeatTimeout time.Duration
4943

@@ -56,9 +50,9 @@ type Ring struct {
5650
}
5751

5852
// New creates a new Ring
59-
func New(client CoordinationStateClient, heartbeatTimeout time.Duration) *Ring {
53+
func New(consul ConsulClient, heartbeatTimeout time.Duration) *Ring {
6054
r := &Ring{
61-
client: client,
55+
consul: consul,
6256
heartbeatTimeout: heartbeatTimeout,
6357
quit: make(chan struct{}),
6458
done: make(chan struct{}),
@@ -90,7 +84,7 @@ func (r *Ring) Stop() {
9084

9185
func (r *Ring) loop() {
9286
defer close(r.done)
93-
r.client.WatchKey(consulKey, descFactory, r.quit, func(value interface{}) bool {
87+
r.consul.WatchKey(consulKey, descFactory, r.quit, func(value interface{}) bool {
9488
if value == nil {
9589
log.Infof("Ring doesn't exist in consul yet.")
9690
return true

0 commit comments

Comments
 (0)