Skip to content

Commit 2e21f82

Browse files
pstibrany authored and gouthamve committed
Implement kv.Client based on memberlist library. (#1727)
Signed-off-by: Peter Štibraný <[email protected]>
1 parent f2d1405 commit 2e21f82

26 files changed

+3549
-59
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
* `-ingester.max-global-series-per-user`
1010
* `-ingester.max-global-series-per-metric`
1111
* [FEATURE] Flush chunks with stale markers early with `ingester.max-stale-chunk-idle`. #1759
12+
* [FEATURE] EXPERIMENTAL: Added new KV Store backend based on memberlist library. Components can gossip about tokens and ingester states, instead of using Consul or Etcd. #1721
1213
* [ENHANCEMENT] Allocation improvements in adding samples to Chunk. #1706
1314
* [ENHANCEMENT] Consul client now follows recommended practices for blocking queries wrt returned Index value. #1708
1415
* [ENHANCEMENT] Consul client can optionally rate-limit itself during Watch (used e.g. by ring watchers) and WatchPrefix (used by HA feature) operations. Rate limiting is disabled by default. New flags added: `--consul.watch-rate-limit`, and `--consul.watch-burst-size`. #1708

docs/arguments.md

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,39 @@ prefix these flags with `distributor.ha-tracker.`
142142
- `etcd.max-retries`
143143
The maximum number of retries to do for failed ops.
144144

145+
#### memberlist (EXPERIMENTAL)
146+
147+
Flags for configuring the KV store based on the memberlist library. This feature is experimental; please don't use it yet.
148+
149+
- `memberlist.nodename`
150+
Name of the node in memberlist cluster. Defaults to hostname.
151+
- `memberlist.retransmit-factor`
152+
Multiplication factor used when sending out messages (factor * log(N+1)). If not set, default value is used.
153+
- `memberlist.join`
154+
Other cluster members to join. Can be specified multiple times.
155+
- `memberlist.abort-if-join-fails`
156+
If this node fails to join memberlist cluster, abort.
157+
- `memberlist.left-ingesters-timeout`
158+
How long to keep LEFT ingesters in the ring. Note: this is only used for gossiping, LEFT ingesters are otherwise invisible.
159+
- `memberlist.leave-timeout`
160+
Timeout for leaving memberlist cluster.
161+
- `memberlist.gossip-interval`
162+
How often to gossip with other cluster members. Uses memberlist LAN defaults if 0.
163+
- `memberlist.gossip-nodes`
164+
How many nodes to gossip with in each gossip interval. Uses memberlist LAN defaults if 0.
165+
- `memberlist.pullpush-interval`
166+
How often to use pull/push sync. Uses memberlist LAN defaults if 0.
167+
- `memberlist.bind-addr`
168+
IP address to listen on for gossip messages. Multiple addresses may be specified. Defaults to 0.0.0.0.
169+
- `memberlist.bind-port`
170+
Port to listen on for gossip messages. Defaults to 7946.
171+
- `memberlist.packet-dial-timeout`
172+
Timeout used when connecting to other nodes to send packet.
173+
- `memberlist.packet-write-timeout`
174+
Timeout for writing 'packet' data.
175+
- `memberlist.transport-debug`
176+
Log debug transport messages. Note: global log.level must be at debug level as well.
177+
145178
### HA Tracker
146179

147180
HA tracking has two of its own flags:

go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ require (
3838
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect
3939
github.com/hashicorp/consul/api v1.1.0
4040
github.com/hashicorp/go-cleanhttp v0.5.1
41+
github.com/hashicorp/go-sockaddr v1.0.2
42+
github.com/hashicorp/memberlist v0.1.4
4143
github.com/jonboulle/clockwork v0.1.0
4244
github.com/json-iterator/go v1.1.7
4345
github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect

pkg/ingester/transfer.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,12 @@ func (i *Ingester) TransferChunks(stream client.Ingester_TransferChunksServer) e
7676
if fromIngesterID == "" {
7777
fromIngesterID = wireSeries.FromIngesterId
7878
level.Info(util.Logger).Log("msg", "processing TransferChunks request", "from_ingester", fromIngesterID)
79+
80+
// Before transfer, make sure 'from' ingester is in correct state to call ClaimTokensFor later
81+
err := i.checkFromIngesterIsInLeavingState(stream.Context(), fromIngesterID)
82+
if err != nil {
83+
return err
84+
}
7985
}
8086
descs, err := fromWireChunks(wireSeries.Chunks)
8187
if err != nil {
@@ -136,6 +142,37 @@ func (i *Ingester) TransferChunks(stream client.Ingester_TransferChunksServer) e
136142
return nil
137143
}
138144

145+
// Ring gossiping: check if "from" ingester is in LEAVING state. It should be, but we may not see that yet
146+
// when using gossip ring. If we cannot see ingester is the LEAVING state yet, we don't accept this
147+
// transfer, as claiming tokens would possibly end up with this ingester owning no tokens, due to conflict
148+
// resolution in ring merge function. Hopefully the leaving ingester will retry transfer again.
149+
func (i *Ingester) checkFromIngesterIsInLeavingState(ctx context.Context, fromIngesterID string) error {
150+
v, err := i.lifecycler.KVStore.Get(ctx, ring.ConsulKey)
151+
if err != nil {
152+
return errors.Wrap(err, "TransferChunks: get ring")
153+
}
154+
if v == nil {
155+
return fmt.Errorf("TransferChunks: ring not found when checking state of source ingester")
156+
}
157+
r, ok := v.(*ring.Desc)
158+
if !ok || r == nil {
159+
return fmt.Errorf("TransferChunks: ring not found, got %T", v)
160+
}
161+
162+
if r.Ingesters == nil || r.Ingesters[fromIngesterID].State != ring.LEAVING {
163+
return fmt.Errorf("TransferChunks: source ingester is not in a LEAVING state, found state=%v", r.Ingesters[fromIngesterID].State)
164+
}
165+
166+
if r.Ingesters == nil || r.Ingesters[fromIngesterID].State != ring.LEAVING {
167+
err = fmt.Errorf("source ingester is not in a LEAVING state, found state=%v", r.Ingesters[fromIngesterID].State)
168+
util.Logger.Log("msg", "TransferChunks error", "err", err)
169+
return err
170+
}
171+
172+
// all fine
173+
return nil
174+
}
175+
139176
func (i *Ingester) transfer(ctx context.Context, xfer func() error) error {
140177
// Enter JOINING state (only valid from PENDING)
141178
if err := i.lifecycler.ChangeState(ctx, ring.JOINING); err != nil {
@@ -198,6 +235,12 @@ func (i *Ingester) TransferTSDB(stream client.Ingester_TransferTSDBServer) error
198235
if fromIngesterID == "" {
199236
fromIngesterID = f.FromIngesterId
200237
level.Info(util.Logger).Log("msg", "processing TransferTSDB request", "from_ingester", fromIngesterID)
238+
239+
// Before transfer, make sure 'from' ingester is in correct state to call ClaimTokensFor later
240+
err := i.checkFromIngesterIsInLeavingState(stream.Context(), fromIngesterID)
241+
if err != nil {
242+
return err
243+
}
201244
}
202245
filesXfer++
203246

pkg/ring/kv/client.go

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,12 @@ import (
66
"fmt"
77
"sync"
88

9+
"github.com/prometheus/client_golang/prometheus"
10+
911
"github.com/cortexproject/cortex/pkg/ring/kv/codec"
1012
"github.com/cortexproject/cortex/pkg/ring/kv/consul"
1113
"github.com/cortexproject/cortex/pkg/ring/kv/etcd"
14+
"github.com/cortexproject/cortex/pkg/ring/kv/memberlist"
1215
)
1316

1417
// The NewInMemoryKVClient returned by NewClient() is a singleton, so
@@ -20,10 +23,11 @@ var inmemoryStore Client
2023
// Config is config for a KVStore currently used by ring and HA tracker,
2124
// where store can be consul or inmemory.
2225
type Config struct {
23-
Store string `yaml:"store,omitempty"`
24-
Consul consul.Config `yaml:"consul,omitempty"`
25-
Etcd etcd.Config `yaml:"etcd,omitempty"`
26-
Prefix string `yaml:"prefix,omitempty"`
26+
Store string `yaml:"store,omitempty"`
27+
Consul consul.Config `yaml:"consul,omitempty"`
28+
Etcd etcd.Config `yaml:"etcd,omitempty"`
29+
Memberlist memberlist.Config `yaml:"memberlist,omitempty"`
30+
Prefix string `yaml:"prefix,omitempty"`
2731

2832
Mock Client
2933
}
@@ -40,11 +44,12 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
4044
// be easier to have everything under ring, so ring.consul.<flag-name>
4145
cfg.Consul.RegisterFlags(f, prefix)
4246
cfg.Etcd.RegisterFlagsWithPrefix(f, prefix)
47+
cfg.Memberlist.RegisterFlags(f, prefix)
4348
if prefix == "" {
4449
prefix = "ring."
4550
}
4651
f.StringVar(&cfg.Prefix, prefix+"prefix", "collectors/", "The prefix for the keys in the store. Should end with a /.")
47-
f.StringVar(&cfg.Store, prefix+"store", "consul", "Backend storage to use for the ring (consul, etcd, inmemory).")
52+
f.StringVar(&cfg.Store, prefix+"store", "consul", "Backend storage to use for the ring (consul, etcd, inmemory, memberlist [experimental]).")
4853
}
4954

5055
// Client is a high-level client for key-value stores (such as Etcd and
@@ -69,6 +74,9 @@ type Client interface {
6974

7075
// WatchPrefix calls f whenever any value stored under prefix changes.
7176
WatchPrefix(ctx context.Context, prefix string, f func(string, interface{}) bool)
77+
78+
// If client needs to do some cleanup, it can do it here.
79+
Stop()
7280
}
7381

7482
// NewClient creates a new Client (consul, etcd or inmemory) based on the config,
@@ -96,6 +104,10 @@ func NewClient(cfg Config, codec codec.Codec) (Client, error) {
96104
})
97105
client = inmemoryStore
98106

107+
case "memberlist":
108+
cfg.Memberlist.MetricsRegisterer = prometheus.DefaultRegisterer
109+
client, err = memberlist.NewMemberlistClient(cfg.Memberlist, codec)
110+
99111
default:
100112
return nil, fmt.Errorf("invalid KV store type: %s", cfg.Store)
101113
}

pkg/ring/kv/consul/client.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,3 +328,8 @@ func (c *Client) createRateLimiter() *rate.Limiter {
328328
}
329329
return rate.NewLimiter(rate.Limit(c.cfg.WatchKeyRateLimit), burst)
330330
}
331+
332+
// Stop does nothing in Consul client.
333+
func (c *Client) Stop() {
334+
// nothing to do
335+
}

pkg/ring/kv/etcd/etcd.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,3 +194,8 @@ func (c *Client) Get(ctx context.Context, key string) (interface{}, error) {
194194
}
195195
return c.codec.Decode(resp.Kvs[0].Value)
196196
}
197+
198+
// Stop does nothing in etcd client.
199+
func (c *Client) Stop() {
200+
// nothing to do
201+
}

pkg/ring/kv/memberlist/broadcast.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package memberlist
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/go-kit/kit/log/level"
7+
"github.com/hashicorp/memberlist"
8+
9+
"github.com/cortexproject/cortex/pkg/util"
10+
)
11+
12+
// ringBroadcast is the memberlist.Broadcast implementation that is queued in
// a memberlist.TransmitLimitedQueue.
type ringBroadcast struct {
	// key of the value this broadcast carries.
	key string
	// content describes what is stored in this value. Used for invalidation.
	content []string
	// version is the local version of the value, generated by merging this change.
	version uint
	// msg holds the encoded key and value.
	msg []byte
	// finished, when non-nil, is called from Finished with this broadcast.
	finished func(b ringBroadcast)
}
20+
21+
func (r ringBroadcast) Invalidates(old memberlist.Broadcast) bool {
22+
if oldb, ok := old.(ringBroadcast); ok {
23+
if r.key != oldb.key {
24+
return false
25+
}
26+
27+
// if 'content' (result of Mergeable.MergeContent) of this broadcast is a superset of content of old value,
28+
// and this broadcast has resulted in a newer ring update, we can invalidate the old value
29+
30+
for _, oldName := range oldb.content {
31+
found := false
32+
for _, newName := range r.content {
33+
if oldName == newName {
34+
found = true
35+
break
36+
}
37+
}
38+
39+
if !found {
40+
return false
41+
}
42+
}
43+
44+
// only do this check if this ringBroadcast covers same ingesters as 'b'
45+
// otherwise, we may be invalidating some older messages, which however covered different
46+
// ingesters
47+
if r.version >= oldb.version {
48+
level.Debug(util.Logger).Log("msg", "Invalidating forwarded broadcast", "key", r.key, "version", r.version, "oldVersion", oldb.version, "content", fmt.Sprintf("%v", r.content), "oldContent", fmt.Sprintf("%v", oldb.content))
49+
return true
50+
}
51+
}
52+
return false
53+
}
54+
55+
func (r ringBroadcast) Message() []byte {
56+
return r.msg
57+
}
58+
59+
func (r ringBroadcast) Finished() {
60+
if r.finished != nil {
61+
r.finished(r)
62+
}
63+
r.msg = nil
64+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package memberlist
2+
3+
import "testing"
4+
5+
func TestInvalidates(t *testing.T) {
6+
const key = "ring"
7+
8+
messages := map[string]ringBroadcast{
9+
"b1": {key: key, content: []string{"A", "B", "C"}, version: 1},
10+
"b2": {key: key, content: []string{"A", "B", "C"}, version: 2},
11+
"b3": {key: key, content: []string{"A"}, version: 3},
12+
"b4": {key: key, content: []string{"A", "B"}, version: 4},
13+
"b5": {key: key, content: []string{"A", "B", "D"}, version: 5},
14+
"b6": {key: key, content: []string{"A", "B", "C", "D"}, version: 6},
15+
}
16+
17+
checkInvalidate(t, messages, "b2", "b1", true, false)
18+
checkInvalidate(t, messages, "b3", "b1", false, false)
19+
checkInvalidate(t, messages, "b3", "b2", false, false)
20+
checkInvalidate(t, messages, "b4", "b1", false, false)
21+
checkInvalidate(t, messages, "b4", "b2", false, false)
22+
checkInvalidate(t, messages, "b4", "b3", true, false)
23+
checkInvalidate(t, messages, "b5", "b1", false, false)
24+
checkInvalidate(t, messages, "b5", "b2", false, false)
25+
checkInvalidate(t, messages, "b5", "b3", true, false)
26+
checkInvalidate(t, messages, "b5", "b4", true, false)
27+
checkInvalidate(t, messages, "b6", "b1", true, false)
28+
checkInvalidate(t, messages, "b6", "b2", true, false)
29+
checkInvalidate(t, messages, "b6", "b3", true, false)
30+
checkInvalidate(t, messages, "b6", "b4", true, false)
31+
checkInvalidate(t, messages, "b6", "b5", true, false)
32+
}
33+
34+
func checkInvalidate(t *testing.T, messages map[string]ringBroadcast, key1, key2 string, firstInvalidatesSecond, secondInvalidatesFirst bool) {
35+
b1, ok := messages[key1]
36+
if !ok {
37+
t.Fatal("cannot find", key1)
38+
}
39+
40+
b2, ok := messages[key2]
41+
if !ok {
42+
t.Fatal("cannot find", key2)
43+
}
44+
45+
if b1.Invalidates(b2) != firstInvalidatesSecond {
46+
t.Errorf("%s.Invalidates(%s) returned %t. %s={%v, %d}, %s={%v, %d}", key1, key2, !firstInvalidatesSecond, key1, b1.content, b1.version, key2, b2.content, b2.version)
47+
}
48+
49+
if b2.Invalidates(b1) != secondInvalidatesFirst {
50+
t.Errorf("%s.Invalidates(%s) returned %t. %s={%v, %d}, %s={%v, %d}", key2, key1, !secondInvalidatesFirst, key2, b2.content, b2.version, key1, b1.content, b1.version)
51+
}
52+
}

0 commit comments

Comments
 (0)