Skip to content

Commit 5318a4e

Browse files
authored
Merge pull request #174 from weaveworks/169-tokens
Only add new tokens to the ring, don't re-add all tokens.
2 parents 6cdd3ee + 404a79b commit 5318a4e

File tree

4 files changed

+284
-28
lines changed

4 files changed

+284
-28
lines changed

cmd/cortex/main.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,11 @@ func main() {
156156

157157
case modeIngester:
158158
cfg.ingesterConfig.Ring = r
159-
registration, err := ring.RegisterIngester(consul, cfg.listenPort, cfg.ingesterConfig.GRPCListenPort, cfg.numTokens)
159+
registration, err := ring.RegisterIngester(consul, ring.IngesterRegistrationConfig{
160+
ListenPort: cfg.listenPort,
161+
GRPCPort: cfg.ingesterConfig.GRPCListenPort,
162+
NumTokens: cfg.numTokens,
163+
})
160164
if err != nil {
161165
// This only happens for errors in configuration & set-up, not for
162166
// network errors.

ring/ingester_lifecycle.go

Lines changed: 61 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,9 @@ const (
2323

2424
// IngesterRegistration manages the connection between the ingester and Consul.
2525
type IngesterRegistration struct {
26-
consul ConsulClient
27-
numTokens int
26+
consul ConsulClient
27+
numTokens int
28+
skipUnregister bool
2829

2930
id string
3031
hostname string
@@ -40,27 +41,48 @@ type IngesterRegistration struct {
4041
consulHeartbeats prometheus.Counter
4142
}
4243

44+
// IngesterRegistrationConfig is the config for an IngesterRegistration.
type IngesterRegistrationConfig struct {
	// ListenPort and GRPCPort are each joined with the ingester's address to
	// form the two endpoints recorded for it; NumTokens is the number of
	// ring tokens the ingester should own.
	ListenPort, GRPCPort, NumTokens int

	// For testing: when Addr or Hostname is empty, RegisterIngester looks
	// the value up itself; skipUnregister stops the ingester from removing
	// itself from the ring when it shuts down.
	Addr, Hostname string
	skipUnregister bool
}
55+
4356
// RegisterIngester registers an ingester with Consul.
44-
func RegisterIngester(consulClient ConsulClient, listenPort, grpcPort, numTokens int) (*IngesterRegistration, error) {
45-
hostname, err := os.Hostname()
46-
if err != nil {
47-
return nil, err
57+
func RegisterIngester(consulClient ConsulClient, cfg IngesterRegistrationConfig) (*IngesterRegistration, error) {
58+
hostname := cfg.Hostname
59+
if hostname == "" {
60+
var err error
61+
hostname, err = os.Hostname()
62+
if err != nil {
63+
return nil, err
64+
}
4865
}
4966

50-
addr, err := getFirstAddressOf(infName)
51-
if err != nil {
52-
return nil, err
67+
addr := cfg.Addr
68+
if addr == "" {
69+
var err error
70+
addr, err = getFirstAddressOf(infName)
71+
if err != nil {
72+
return nil, err
73+
}
5374
}
5475

5576
r := &IngesterRegistration{
56-
consul: consulClient,
57-
numTokens: numTokens,
77+
consul: consulClient,
78+
numTokens: cfg.NumTokens,
79+
skipUnregister: cfg.skipUnregister,
5880

5981
id: hostname,
6082
// hostname is the ip+port of this instance, written to consul so
6183
// the distributors know where to connect.
62-
hostname: fmt.Sprintf("%s:%d", addr, listenPort),
63-
grpcHostname: fmt.Sprintf("%s:%d", addr, grpcPort),
84+
hostname: fmt.Sprintf("%s:%d", addr, cfg.ListenPort),
85+
grpcHostname: fmt.Sprintf("%s:%d", addr, cfg.GRPCPort),
6486
quit: make(chan struct{}),
6587

6688
// Only read/written on actor goroutine.
@@ -87,23 +109,27 @@ func (r *IngesterRegistration) ChangeState(state IngesterState) {
87109
// Unregister removes ingester config from Consul; will block
88110
// until we've successfully unregistered.
89111
func (r *IngesterRegistration) Unregister() {
90-
log.Info("Removing ingester from consul")
91-
92112
// closing r.quit triggers loop() to exit, which in turn will trigger
93113
// the removal of our tokens.
94114
close(r.quit)
95115
r.wait.Wait()
96-
log.Infof("Ingester removed from consul")
97116
}
98117

99118
func (r *IngesterRegistration) loop() {
100119
defer r.wait.Done()
101-
tokens := r.pickTokens()
102-
defer r.unregister()
120+
tokens, err := r.pickTokens()
121+
if err != nil {
122+
log.Fatalf("Failed to pick tokens in consul: %v", err)
123+
}
124+
125+
if !r.skipUnregister {
126+
defer r.unregister()
127+
}
128+
103129
r.heartbeat(tokens)
104130
}
105131

106-
func (r *IngesterRegistration) pickTokens() []uint32 {
132+
func (r *IngesterRegistration) pickTokens() ([]uint32, error) {
107133
var tokens []uint32
108134
pickTokens := func(in interface{}) (out interface{}, retry bool, err error) {
109135
var ringDesc *Desc
@@ -113,25 +139,32 @@ func (r *IngesterRegistration) pickTokens() []uint32 {
113139
ringDesc = in.(*Desc)
114140
}
115141

116-
takenTokens := []uint32{}
142+
var takenTokens, myTokens []uint32
117143
for _, token := range ringDesc.Tokens {
118144
takenTokens = append(takenTokens, token.Token)
145+
119146
if token.Ingester == r.id {
120-
tokens = append(tokens, token.Token)
147+
myTokens = append(myTokens, token.Token)
121148
}
122149
}
123-
if len(tokens) < r.numTokens {
124-
newTokens := generateTokens(r.numTokens-len(tokens), takenTokens)
125-
tokens = append(tokens, newTokens...)
150+
151+
if len(myTokens) > 0 {
152+
log.Infof("%d tokens already exist for this ingester!", len(myTokens))
126153
}
154+
155+
newTokens := generateTokens(r.numTokens-len(myTokens), takenTokens)
156+
ringDesc.addIngester(r.id, r.hostname, r.grpcHostname, newTokens, r.state)
157+
158+
tokens := append(myTokens, newTokens...)
127159
sort.Sort(sortableUint32(tokens))
128-
ringDesc.addIngester(r.id, r.hostname, r.grpcHostname, tokens, r.state)
160+
129161
return ringDesc, true, nil
130162
}
131163
if err := r.consul.CAS(consulKey, descFactory, pickTokens); err != nil {
132-
log.Fatalf("Failed to pick tokens in consul: %v", err)
164+
return nil, err
133165
}
134-
return tokens
166+
log.Infof("Ingester added to consul")
167+
return tokens, nil
135168
}
136169

137170
func (r *IngesterRegistration) heartbeat(tokens []uint32) {
@@ -189,6 +222,7 @@ func (r *IngesterRegistration) unregister() {
189222
if err := r.consul.CAS(consulKey, descFactory, unregister); err != nil {
190223
log.Fatalf("Failed to unregister from consul: %v", err)
191224
}
225+
log.Infof("Ingester removed from consul")
192226
}
193227

194228
type sortableUint32 []uint32

ring/ingester_lifecycle_test.go

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package ring
2+
3+
import (
4+
"reflect"
5+
"runtime"
6+
"testing"
7+
"time"
8+
)
9+
10+
func TestIngesterRestart(t *testing.T) {
11+
consul := newMockConsulClient()
12+
ring := New(consul, time.Second)
13+
14+
{
15+
registra, err := RegisterIngester(consul, IngesterRegistrationConfig{
16+
NumTokens: 1,
17+
Addr: "localhost",
18+
Hostname: "localhost",
19+
skipUnregister: true,
20+
})
21+
if err != nil {
22+
t.Fatal(err)
23+
}
24+
registra.Unregister() // doesn't actually unregister due to skipUnregister: true
25+
}
26+
27+
poll(t, 100*time.Millisecond, 1, func() interface{} {
28+
return ring.numTokens("localhost")
29+
})
30+
31+
{
32+
registra, err := RegisterIngester(consul, IngesterRegistrationConfig{
33+
NumTokens: 1,
34+
Addr: "localhost",
35+
Hostname: "localhost",
36+
skipUnregister: true,
37+
})
38+
if err != nil {
39+
t.Fatal(err)
40+
}
41+
registra.Unregister() // doesn't actually unregister due to skipUnregister: true
42+
}
43+
44+
time.Sleep(200 * time.Millisecond)
45+
46+
poll(t, 100*time.Millisecond, 1, func() interface{} {
47+
return ring.numTokens("localhost")
48+
})
49+
}
50+
51+
func (r *Ring) numTokens(name string) int {
52+
r.mtx.RLock()
53+
defer r.mtx.RUnlock()
54+
count := 0
55+
for _, token := range r.ringDesc.Tokens {
56+
if token.Ingester == name {
57+
count++
58+
}
59+
}
60+
return count
61+
}
62+
63+
// poll calls have until it returns a value deeply equal to want or the
// duration d has elapsed, sleeping d/10 between attempts. Once the deadline
// has passed it makes one last attempt and, if the values still differ,
// fails the test with the file and line of poll's caller.
func poll(t *testing.T, d time.Duration, want interface{}, have func() interface{}) {
	for end := time.Now().Add(d); !time.Now().After(end); time.Sleep(d / 10) {
		if reflect.DeepEqual(want, have()) {
			return
		}
	}

	got := have()
	if reflect.DeepEqual(want, got) {
		return
	}
	_, file, line, _ := runtime.Caller(1)
	t.Fatalf("%s:%d: %v != %v", file, line, want, got)
}

ring/mock_consul_client_test.go

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
package ring
2+
3+
import (
	"strings"
	"sync"
	"time"

	consul "github.com/hashicorp/consul/api"
	"github.com/prometheus/common/log"
)
10+
11+
type mockKV struct {
12+
mtx sync.Mutex
13+
cond *sync.Cond
14+
kvps map[string]*consul.KVPair
15+
current uint64 // the current 'index in the log'
16+
}
17+
18+
func newMockConsulClient() ConsulClient {
19+
m := mockKV{
20+
kvps: map[string]*consul.KVPair{},
21+
}
22+
m.cond = sync.NewCond(&m.mtx)
23+
go m.loop()
24+
return &consulClient{&m}
25+
}
26+
27+
func copyKVPair(in *consul.KVPair) *consul.KVPair {
28+
out := *in
29+
out.Value = make([]byte, len(in.Value))
30+
copy(out.Value, in.Value)
31+
return &out
32+
}
33+
34+
// periodic loop to wake people up, so they can honour timeouts
35+
func (m *mockKV) loop() {
36+
for range time.Tick(1 * time.Second) {
37+
m.mtx.Lock()
38+
m.cond.Broadcast()
39+
m.mtx.Unlock()
40+
}
41+
}
42+
43+
func (m *mockKV) Put(p *consul.KVPair, q *consul.WriteOptions) (*consul.WriteMeta, error) {
44+
m.mtx.Lock()
45+
defer m.mtx.Unlock()
46+
47+
m.current++
48+
existing, ok := m.kvps[p.Key]
49+
if ok {
50+
existing.Value = p.Value
51+
existing.ModifyIndex = m.current
52+
} else {
53+
m.kvps[p.Key] = &consul.KVPair{
54+
Key: p.Key,
55+
Value: p.Value,
56+
CreateIndex: m.current,
57+
ModifyIndex: m.current,
58+
}
59+
}
60+
61+
m.cond.Broadcast()
62+
return nil, nil
63+
}
64+
65+
func (m *mockKV) CAS(p *consul.KVPair, q *consul.WriteOptions) (bool, *consul.WriteMeta, error) {
66+
log.Debugf("CAS %s (%d) <- %s", p.Key, p.ModifyIndex, p.Value)
67+
68+
m.mtx.Lock()
69+
defer m.mtx.Unlock()
70+
existing, ok := m.kvps[p.Key]
71+
if ok && existing.ModifyIndex != p.ModifyIndex {
72+
return false, nil, nil
73+
}
74+
75+
m.current++
76+
if ok {
77+
existing.Value = p.Value
78+
existing.ModifyIndex = m.current
79+
} else {
80+
m.kvps[p.Key] = &consul.KVPair{
81+
Key: p.Key,
82+
Value: p.Value,
83+
CreateIndex: m.current,
84+
ModifyIndex: m.current,
85+
}
86+
}
87+
88+
m.cond.Broadcast()
89+
return true, nil, nil
90+
}
91+
92+
func (m *mockKV) Get(key string, q *consul.QueryOptions) (*consul.KVPair, *consul.QueryMeta, error) {
93+
log.Debugf("Get %s (%d)", key, q.WaitIndex)
94+
95+
m.mtx.Lock()
96+
defer m.mtx.Unlock()
97+
98+
value, ok := m.kvps[key]
99+
if !ok {
100+
log.Debugf("Get %s - not found", key)
101+
return nil, &consul.QueryMeta{LastIndex: m.current}, nil
102+
}
103+
104+
if q.WaitTime > 0 {
105+
deadline := time.Now().Add(q.WaitTime)
106+
for q.WaitIndex >= value.ModifyIndex && time.Now().Before(deadline) {
107+
m.cond.Wait()
108+
}
109+
if time.Now().After(deadline) {
110+
log.Debugf("Get %s - deadline exceeded", key)
111+
return nil, &consul.QueryMeta{LastIndex: q.WaitIndex}, nil
112+
}
113+
}
114+
115+
log.Debugf("Get %s (%d) = %s", key, value.ModifyIndex, value.Value)
116+
return copyKVPair(value), &consul.QueryMeta{LastIndex: value.ModifyIndex}, nil
117+
}
118+
119+
func (m *mockKV) List(prefix string, q *consul.QueryOptions) (consul.KVPairs, *consul.QueryMeta, error) {
120+
m.mtx.Lock()
121+
defer m.mtx.Unlock()
122+
123+
deadline := time.Now().Add(q.WaitTime)
124+
for q.WaitIndex >= m.current && time.Now().Before(deadline) {
125+
m.cond.Wait()
126+
}
127+
if time.Now().After(deadline) {
128+
return nil, &consul.QueryMeta{LastIndex: q.WaitIndex}, nil
129+
}
130+
131+
result := consul.KVPairs{}
132+
for _, kvp := range m.kvps {
133+
if kvp.ModifyIndex >= q.WaitIndex {
134+
result = append(result, copyKVPair(kvp))
135+
}
136+
}
137+
return result, &consul.QueryMeta{LastIndex: m.current}, nil
138+
}

0 commit comments

Comments
 (0)