Skip to content

Commit 00ef739

Browse files
authored
Merge pull request #162 from weaveworks/slash-ring
Add /ring to ingester & distributor, ability to remove token
2 parents 4816413 + edfe50c commit 00ef739

File tree

5 files changed

+114
-34
lines changed

5 files changed

+114
-34
lines changed

cmd/cortex/main.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,8 @@ func main() {
147147
defer r.Stop()
148148

149149
router := mux.NewRouter()
150+
router.Handle("/ring", r)
151+
150152
switch cfg.mode {
151153
case modeDistributor:
152154
cfg.distributorConfig.Ring = r

ring/http.go

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
package ring
2+
3+
import (
4+
"fmt"
5+
"html/template"
6+
"net/http"
7+
"time"
8+
)
9+
10+
const tpl = `
11+
<!DOCTYPE html>
12+
<html>
13+
<head>
14+
<meta charset="UTF-8">
15+
<title>Cortex Ring Status</title>
16+
</head>
17+
<body>
18+
<h1>Cortex Ring Status</h1>
19+
<p>Current time: {{ .Now }}</p>
20+
<p>{{ .Message }}</p>
21+
<form action="" method="POST">
22+
<table width="100%" border="1">
23+
<thead>
24+
<tr>
25+
<th>Ingester</th>
26+
<th>State</th>
27+
<th>Address</th>
28+
<th>Last Heartbeat</th>
29+
<th>Actions</th>
30+
</tr>
31+
</thead>
32+
<tbody>
33+
{{ range $key, $value := .Ring.Ingesters }}
34+
<tr>
35+
<td>{{ $key }}</td>
36+
<td>{{ $value.State }}</td>
37+
<td>{{ $value.Hostname }}</td>
38+
<td>{{ $value.Timestamp | time }}</td>
39+
<td><button name="forget" value="{{ $key }}" type="submit">Forget</button></td>
40+
</tr>
41+
{{ end }}
42+
</tbody>
43+
</table>
44+
</form>
45+
</body>
46+
</html>`
47+
48+
var tmpl *template.Template
49+
50+
func init() {
51+
var err error
52+
tmpl, err = template.New("webpage").
53+
Funcs(template.FuncMap{
54+
"time": func(in interface{}) string {
55+
return in.(time.Time).String()
56+
},
57+
}).
58+
Parse(tpl)
59+
if err != nil {
60+
panic(err)
61+
}
62+
}
63+
64+
func (r *Ring) forget(id string) error {
65+
unregister := func(in interface{}) (out interface{}, retry bool, err error) {
66+
if in == nil {
67+
return nil, false, fmt.Errorf("found empty ring when trying to unregister")
68+
}
69+
70+
ringDesc := in.(*Desc)
71+
ringDesc.removeIngester(id)
72+
return ringDesc, true, nil
73+
}
74+
return r.consul.CAS(consulKey, descFactory, unregister)
75+
}
76+
77+
func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) {
78+
message := ""
79+
if req.Method == http.MethodPost {
80+
ingesterID := req.FormValue("forget")
81+
if err := r.forget(ingesterID); err != nil {
82+
message = fmt.Sprintf("Error forgetting ingester: %v", err)
83+
} else {
84+
message = fmt.Sprintf("Ingester %s forgotten", ingesterID)
85+
}
86+
}
87+
88+
r.mtx.RLock()
89+
defer r.mtx.RUnlock()
90+
if err := tmpl.Execute(w, struct {
91+
Ring Desc
92+
Message string
93+
Now time.Time
94+
}{
95+
Ring: r.ringDesc,
96+
Message: message,
97+
Now: time.Now(),
98+
}); err != nil {
99+
http.Error(w, err.Error(), http.StatusInternalServerError)
100+
return
101+
}
102+
}

ring/ingester_lifecycle.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ func (r *IngesterRegistration) Unregister() {
9999
func (r *IngesterRegistration) loop() {
100100
defer r.wait.Done()
101101
tokens := r.pickTokens()
102-
defer r.unregister(tokens)
102+
defer r.unregister()
103103
r.heartbeat(tokens)
104104
}
105105

@@ -176,14 +176,14 @@ func (r *IngesterRegistration) heartbeat(tokens []uint32) {
176176
}
177177
}
178178

179-
func (r *IngesterRegistration) unregister(tokens []uint32) {
179+
func (r *IngesterRegistration) unregister() {
180180
unregister := func(in interface{}) (out interface{}, retry bool, err error) {
181181
if in == nil {
182182
return nil, false, fmt.Errorf("found empty ring when trying to unregister")
183183
}
184184

185185
ringDesc := in.(*Desc)
186-
ringDesc.removeIngester(r.id, tokens)
186+
ringDesc.removeIngester(r.id)
187187
return ringDesc, true, nil
188188
}
189189
if err := r.consul.CAS(consulKey, descFactory, unregister); err != nil {

ring/model.go

Lines changed: 3 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@ package ring
33
import (
44
"sort"
55
"time"
6-
7-
"github.com/prometheus/common/log"
86
)
97

108
// IngesterState describes the state of an ingester
@@ -83,29 +81,13 @@ func (d *Desc) addIngester(id, hostname, grpcHostname string, tokens []uint32, s
8381
sort.Sort(d.Tokens)
8482
}
8583

86-
func (d *Desc) removeIngester(id string, tokens []uint32) {
84+
func (d *Desc) removeIngester(id string) {
8785
delete(d.Ingesters, id)
8886
output := []TokenDesc{}
89-
i, j := 0, 0
90-
for i < len(d.Tokens) && j < len(tokens) {
91-
if d.Tokens[i].Token < tokens[j] {
87+
for i := 0; i < len(d.Tokens); i++ {
88+
if d.Tokens[i].Ingester != id {
9289
output = append(output, d.Tokens[i])
93-
i++
94-
} else if d.Tokens[i].Token > tokens[j] {
95-
log.Infof("Missing token from ring: %d", tokens[j])
96-
j++
97-
} else {
98-
i++
99-
j++
10090
}
10191
}
102-
for i < len(d.Tokens) {
103-
output = append(output, d.Tokens[i])
104-
i++
105-
}
106-
for j < len(tokens) {
107-
log.Infof("Missing token from ring: %d", tokens[j])
108-
j++
109-
}
11092
d.Tokens = output
11193
}

ring/ring.go

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -35,15 +35,9 @@ func (x uint32s) Swap(i, j int) { x[i], x[j] = x[j], x[i] }
3535
// ErrEmptyRing is the error returned when trying to get an element when nothing has been added to hash.
3636
var ErrEmptyRing = errors.New("empty circle")
3737

38-
// CoordinationStateClient is an interface to getting changes to the coordination
39-
// state. Should allow us to swap out Consul for something else (mesh?) later.
40-
type CoordinationStateClient interface {
41-
WatchKey(key string, factory InstanceFactory, done <-chan struct{}, f func(interface{}) bool)
42-
}
43-
4438
// Ring holds the information about the members of the consistent hash circle.
4539
type Ring struct {
46-
client CoordinationStateClient
40+
consul ConsulClient
4741
quit, done chan struct{}
4842
heartbeatTimeout time.Duration
4943

@@ -56,9 +50,9 @@ type Ring struct {
5650
}
5751

5852
// New creates a new Ring
59-
func New(client CoordinationStateClient, heartbeatTimeout time.Duration) *Ring {
53+
func New(consul ConsulClient, heartbeatTimeout time.Duration) *Ring {
6054
r := &Ring{
61-
client: client,
55+
consul: consul,
6256
heartbeatTimeout: heartbeatTimeout,
6357
quit: make(chan struct{}),
6458
done: make(chan struct{}),
@@ -90,7 +84,7 @@ func (r *Ring) Stop() {
9084

9185
func (r *Ring) loop() {
9286
defer close(r.done)
93-
r.client.WatchKey(consulKey, descFactory, r.quit, func(value interface{}) bool {
87+
r.consul.WatchKey(consulKey, descFactory, r.quit, func(value interface{}) bool {
9488
if value == nil {
9589
log.Infof("Ring doesn't exist in consul yet.")
9690
return true

0 commit comments

Comments
 (0)