Skip to content

Commit 04a8c99

Browse files
authored
Optimise memberlist kv store access by storing data unencoded. (#4345)
* Optimise memberlist kv store access by storing data unencoded. The following profile data was taken from running 50 idle ingesters with memberlist, with almost everything at default values (5s heartbeats): ``` 52.16% mergeBytesValueForKey +- 52.16% mergeValueForKey +- 47.84% computeNewValue +- 27.24% codec Proto Decode +- 26.25% mergeWithTime ``` It is apparent from this that a lot of time is spent on the memberlist receive path, as might be expected; specifically, in the merging of the update into the current state. The cost, however, is not in decoding the incoming states (which occurs in `mergeBytesValueForKey` before `mergeValueForKey`), but in decoding the _current state_ of the value in the store (as it is stored encoded). The ring state was measured at 123K (50 ingesters), so it makes sense that decoding could be costly. This can be avoided by storing the value in its decoded `Mergeable` form. When doing this, care has to be taken to deep copy the value when accessed, as it is modified in place before being updated in the store, and accessed outside the store mutex. Note that a side effect of this change is that it is no longer straightforward to expose the `memberlist_kv_store_value_bytes` metric, as this reported the size of the encoded data; therefore it has been removed. Signed-off-by: Steve Simpson <[email protected]> * Typo. Signed-off-by: Steve Simpson <[email protected]> * Review comments. Signed-off-by: Steve Simpson <[email protected]>
1 parent f336481 commit 04a8c99

File tree

7 files changed

+104
-80
lines changed

7 files changed

+104
-80
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
* [FEATURE] Ruler: Add new `-ruler.query-stats-enabled` which when enabled will report the `cortex_ruler_query_seconds_total` as a per-user metric that tracks the sum of the wall time of executing queries in the ruler in seconds. #4317
55

66
* [CHANGE] Querier / ruler: Change `-querier.max-fetched-chunks-per-query` configuration to limit the maximum number of chunks that can be fetched in a single query. The number of chunks fetched by ingesters AND long-term storage combined should not exceed the value configured on `-querier.max-fetched-chunks-per-query`. #4260
7+
* [CHANGE] Memberlist: the `memberlist_kv_store_value_bytes` metric has been removed due to values no longer being stored in-memory as encoded bytes. #4345
78
* [ENHANCEMENT] Add timeout for waiting on compactor to become ACTIVE in the ring. #4262
89
* [ENHANCEMENT] Reduce memory used by streaming queries, particularly in ruler. #4341
910
* [ENHANCEMENT] Ring: allow experimental configuration of disabling of heartbeat timeouts by setting the relevant configuration value to zero. Applies to the following: #4342
@@ -13,6 +14,7 @@
1314
* `-alertmanager.sharding-ring.heartbeat-timeout`
1415
* `-compactor.ring.heartbeat-timeout`
1516
* `-store-gateway.sharding-ring.heartbeat-timeout`
17+
* [ENHANCEMENT] Memberlist: optimized receive path for processing ring state updates, to help reduce CPU utilization in large clusters. #4345
1618
* [BUGFIX] HA Tracker: when cleaning up obsolete elected replicas from KV store, tracker didn't update the number of clusters per user correctly. #4336
1719

1820
## 1.10.0-rc.0 / 2021-06-28

pkg/ring/kv/memberlist/kv_init_service.go

+28-22
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import (
1616
"github.com/hashicorp/memberlist"
1717
"go.uber.org/atomic"
1818

19-
"github.com/cortexproject/cortex/pkg/ring/kv/codec"
2019
"github.com/cortexproject/cortex/pkg/util"
2120
"github.com/cortexproject/cortex/pkg/util/services"
2221
)
@@ -106,12 +105,12 @@ func (kvs *KVInitService) ServeHTTP(w http.ResponseWriter, req *http.Request) {
106105

107106
if err := req.ParseForm(); err == nil {
108107
if req.Form[downloadKeyParam] != nil {
109-
downloadKey(w, kv.storeCopy(), req.Form[downloadKeyParam][0]) // Use first value, ignore the rest.
108+
downloadKey(w, kv, kv.storeCopy(), req.Form[downloadKeyParam][0]) // Use first value, ignore the rest.
110109
return
111110
}
112111

113112
if req.Form[viewKeyParam] != nil {
114-
viewKey(w, kv, kv.storeCopy(), req.Form[viewKeyParam][0], getFormat(req))
113+
viewKey(w, kv.storeCopy(), req.Form[viewKeyParam][0], getFormat(req))
115114
return
116115
}
117116

@@ -179,30 +178,25 @@ func viewMessage(w http.ResponseWriter, kv *KV, msg message, format string) {
179178
return
180179
}
181180

182-
formatValue(w, c, msg.Pair.Value, format)
181+
val, err := c.Decode(msg.Pair.Value)
182+
if err != nil {
183+
http.Error(w, fmt.Sprintf("failed to decode: %v", err), http.StatusInternalServerError)
184+
return
185+
}
186+
187+
formatValue(w, val, format)
183188
}
184189

185-
func viewKey(w http.ResponseWriter, kv *KV, store map[string]valueDesc, key string, format string) {
190+
func viewKey(w http.ResponseWriter, store map[string]valueDesc, key string, format string) {
186191
if store[key].value == nil {
187192
http.Error(w, "value not found", http.StatusNotFound)
188193
return
189194
}
190195

191-
c := kv.GetCodec(store[key].codecID)
192-
if c == nil {
193-
http.Error(w, "codec not found", http.StatusNotFound)
194-
return
195-
}
196-
197-
formatValue(w, c, store[key].value, format)
196+
formatValue(w, store[key].value, format)
198197
}
199198

200-
func formatValue(w http.ResponseWriter, codec codec.Codec, value []byte, format string) {
201-
val, err := codec.Decode(value)
202-
if err != nil {
203-
http.Error(w, fmt.Sprintf("failed to decode: %v", err), http.StatusInternalServerError)
204-
return
205-
}
199+
func formatValue(w http.ResponseWriter, val interface{}, format string) {
206200

207201
w.WriteHeader(200)
208202
w.Header().Add("content-type", "text/plain")
@@ -214,7 +208,7 @@ func formatValue(w http.ResponseWriter, codec codec.Codec, value []byte, format
214208
enc.SetIndent("", " ")
215209
}
216210

217-
err = enc.Encode(val)
211+
err := enc.Encode(val)
218212
if err != nil {
219213
http.Error(w, err.Error(), http.StatusInternalServerError)
220214
}
@@ -224,22 +218,34 @@ func formatValue(w http.ResponseWriter, codec codec.Codec, value []byte, format
224218
}
225219
}
226220

227-
func downloadKey(w http.ResponseWriter, store map[string]valueDesc, key string) {
221+
func downloadKey(w http.ResponseWriter, kv *KV, store map[string]valueDesc, key string) {
228222
if store[key].value == nil {
229223
http.Error(w, "value not found", http.StatusNotFound)
230224
return
231225
}
232226

233227
val := store[key]
234228

229+
c := kv.GetCodec(store[key].codecID)
230+
if c == nil {
231+
http.Error(w, "codec not found", http.StatusNotFound)
232+
return
233+
}
234+
235+
encoded, err := c.Encode(val.value)
236+
if err != nil {
237+
http.Error(w, fmt.Sprintf("failed to encode: %v", err), http.StatusInternalServerError)
238+
return
239+
}
240+
235241
w.Header().Add("content-type", "application/octet-stream")
236242
// Set content-length so that client knows whether it has received full response or not.
237-
w.Header().Add("content-length", strconv.Itoa(len(val.value)))
243+
w.Header().Add("content-length", strconv.Itoa(len(encoded)))
238244
w.Header().Add("content-disposition", fmt.Sprintf("attachment; filename=%d-%s", val.version, key))
239245
w.WriteHeader(200)
240246

241247
// Ignore errors, we cannot do anything about them.
242-
_, _ = w.Write(val.value)
248+
_, _ = w.Write(encoded)
243249
}
244250

245251
type pageData struct {

pkg/ring/kv/memberlist/memberlist_client.go

+38-48
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,6 @@ type KV struct {
265265
watchPrefixDroppedNotifications *prometheus.CounterVec
266266

267267
storeValuesDesc *prometheus.Desc
268-
storeSizesDesc *prometheus.Desc
269268
storeTombstones *prometheus.GaugeVec
270269
storeRemovedTombstones *prometheus.CounterVec
271270

@@ -292,9 +291,11 @@ type message struct {
292291
}
293292

294293
type valueDesc struct {
295-
// We store bytes here. Reason is that clients calling CAS function will modify the object in place,
296-
// but unless CAS succeeds, we don't want those modifications to be visible.
297-
value []byte
294+
// We store the decoded value here to prevent decoding the entire state for every
295+
// update we receive. Whilst the updates are small and fast to decode,
296+
// the total state can be quite large.
297+
// The CAS function is passed a deep copy because it modifies in-place.
298+
value Mergeable
298299

299300
// version (local only) is used to keep track of what we're gossiping about, and invalidate old messages
300301
version uint
@@ -303,8 +304,16 @@ type valueDesc struct {
303304
codecID string
304305
}
305306

307+
func (v valueDesc) Clone() (result valueDesc) {
308+
result = v
309+
if v.value != nil {
310+
result.value = v.value.Clone()
311+
}
312+
return
313+
}
314+
306315
func (v valueDesc) String() string {
307-
return fmt.Sprintf("size: %d, version: %d, codec: %s", len(v.value), v.version, v.codecID)
316+
return fmt.Sprintf("version: %d, codec: %s", v.version, v.codecID)
308317
}
309318

310319
var (
@@ -612,24 +621,16 @@ func (m *KV) Get(key string, codec codec.Codec) (interface{}, error) {
612621
// Returns current value with removed tombstones.
613622
func (m *KV) get(key string, codec codec.Codec) (out interface{}, version uint, err error) {
614623
m.storeMu.Lock()
615-
v := m.store[key]
624+
v := m.store[key].Clone()
616625
m.storeMu.Unlock()
617626

618-
out = nil
619627
if v.value != nil {
620-
out, err = codec.Decode(v.value)
621-
if err != nil {
622-
return nil, 0, err
623-
}
624-
625-
if mr, ok := out.(Mergeable); ok {
626-
// remove ALL tombstones before returning to client.
627-
// No need for clients to see them.
628-
_, _ = mr.RemoveTombstones(time.Time{})
629-
}
628+
// remove ALL tombstones before returning to client.
629+
// No need for clients to see them.
630+
_, _ = v.value.RemoveTombstones(time.Time{})
630631
}
631632

632-
return out, v.version, nil
633+
return v.value, v.version, nil
633634
}
634635

635636
// WatchKey watches for value changes for given key. When value changes, 'f' function is called with the
@@ -1043,9 +1044,21 @@ func (m *KV) LocalState(join bool) []byte {
10431044
continue
10441045
}
10451046

1047+
codec := m.GetCodec(val.codecID)
1048+
if codec == nil {
1049+
level.Error(m.logger).Log("msg", "failed to encode remote state: unknown codec for key", "codec", val.codecID, "key", key)
1050+
continue
1051+
}
1052+
1053+
encoded, err := codec.Encode(val.value)
1054+
if err != nil {
1055+
level.Error(m.logger).Log("msg", "failed to encode remote state", "err", err)
1056+
continue
1057+
}
1058+
10461059
kvPair.Reset()
10471060
kvPair.Key = key
1048-
kvPair.Value = val.value
1061+
kvPair.Value = encoded
10491062
kvPair.Codec = val.codecID
10501063

10511064
ser, err := kvPair.Marshal()
@@ -1055,7 +1068,7 @@ func (m *KV) LocalState(join bool) []byte {
10551068
}
10561069

10571070
if uint(len(ser)) > math.MaxUint32 {
1058-
level.Error(m.logger).Log("msg", "value too long", "key", key, "value_length", len(val.value))
1071+
level.Error(m.logger).Log("msg", "value too long", "key", key, "value_length", len(encoded))
10591072
continue
10601073
}
10611074

@@ -1177,12 +1190,12 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, casVersion ui
11771190
m.storeMu.Lock()
11781191
defer m.storeMu.Unlock()
11791192

1180-
curr := m.store[key]
1193+
curr := m.store[key].Clone()
11811194
// if casVersion is 0, then there was no previous value, so we will just do normal merge, without localCAS flag set.
11821195
if casVersion > 0 && curr.version != casVersion {
11831196
return nil, 0, errVersionMismatch
11841197
}
1185-
result, change, err := computeNewValue(incomingValue, curr.value, codec, casVersion > 0)
1198+
result, change, err := computeNewValue(incomingValue, curr.value, casVersion > 0)
11861199
if err != nil {
11871200
return nil, 0, err
11881201
}
@@ -1199,14 +1212,9 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, casVersion ui
11991212
m.storeRemovedTombstones.WithLabelValues(key).Add(float64(removed))
12001213
}
12011214

1202-
encoded, err := codec.Encode(result)
1203-
if err != nil {
1204-
return nil, 0, fmt.Errorf("failed to encode merged result: %v", err)
1205-
}
1206-
12071215
newVersion := curr.version + 1
12081216
m.store[key] = valueDesc{
1209-
value: encoded,
1217+
value: result,
12101218
version: newVersion,
12111219
codecID: codec.CodecID(),
12121220
}
@@ -1215,25 +1223,7 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, casVersion ui
12151223
}
12161224

12171225
// returns [result, change, error]
1218-
func computeNewValue(incoming Mergeable, stored []byte, c codec.Codec, cas bool) (Mergeable, Mergeable, error) {
1219-
if len(stored) == 0 {
1220-
return incoming, incoming, nil
1221-
}
1222-
1223-
old, err := c.Decode(stored)
1224-
if err != nil {
1225-
return incoming, incoming, fmt.Errorf("failed to decode stored value: %v", err)
1226-
}
1227-
1228-
if old == nil {
1229-
return incoming, incoming, nil
1230-
}
1231-
1232-
oldVal, ok := old.(Mergeable)
1233-
if !ok {
1234-
return incoming, incoming, fmt.Errorf("stored value is not Mergeable, got %T", old)
1235-
}
1236-
1226+
func computeNewValue(incoming Mergeable, oldVal Mergeable, cas bool) (Mergeable, Mergeable, error) {
12371227
if oldVal == nil {
12381228
return incoming, incoming, nil
12391229
}
@@ -1249,7 +1239,7 @@ func (m *KV) storeCopy() map[string]valueDesc {
12491239

12501240
result := make(map[string]valueDesc, len(m.store))
12511241
for k, v := range m.store {
1252-
result[k] = v
1242+
result[k] = v.Clone()
12531243
}
12541244
return result
12551245
}

pkg/ring/kv/memberlist/memberlist_client_test.go

+28
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,26 @@ func (d *data) RemoveTombstones(limit time.Time) (_, _ int) {
8989
return
9090
}
9191

92+
func (m member) clone() member {
93+
out := member{
94+
Timestamp: m.Timestamp,
95+
Tokens: make([]uint32, len(m.Tokens)),
96+
State: m.State,
97+
}
98+
copy(out.Tokens, m.Tokens)
99+
return out
100+
}
101+
102+
func (d *data) Clone() Mergeable {
103+
out := &data{
104+
Members: make(map[string]member, len(d.Members)),
105+
}
106+
for k, v := range d.Members {
107+
out.Members[k] = v.clone()
108+
}
109+
return out
110+
}
111+
92112
func (d *data) getAllTokens() []uint32 {
93113
out := []uint32(nil)
94114
for _, m := range d.Members {
@@ -872,6 +892,14 @@ func (dc distributedCounter) RemoveTombstones(limit time.Time) (_, _ int) {
872892
return
873893
}
874894

895+
func (dc distributedCounter) Clone() Mergeable {
896+
out := make(distributedCounter, len(dc))
897+
for k, v := range dc {
898+
out[k] = v
899+
}
900+
return out
901+
}
902+
875903
type distributedCounterCodec struct{}
876904

877905
func (d distributedCounterCodec) CodecID() string {

pkg/ring/kv/memberlist/mergeable.go

+3
Original file line numberDiff line numberDiff line change
@@ -40,4 +40,7 @@ type Mergeable interface {
4040
// time when client is accessing value from the store. It can be used to hide tombstones from the clients.
4141
// Returns the total number of tombstones present and the number of removed tombstones by this invocation.
4242
RemoveTombstones(limit time.Time) (total, removed int)
43+
44+
// Clone should return a deep copy of the state.
45+
Clone() Mergeable
4346
}

pkg/ring/kv/memberlist/metrics.go

-10
Original file line numberDiff line numberDiff line change
@@ -117,11 +117,6 @@ func (m *KV) createAndRegisterMetrics() {
117117
"Number of values in KV Store",
118118
nil, nil)
119119

120-
m.storeSizesDesc = prometheus.NewDesc(
121-
prometheus.BuildFQName(m.cfg.MetricsNamespace, subsystem, "kv_store_value_bytes"), // gauge
122-
"Sizes of values in KV Store in bytes",
123-
[]string{"key"}, nil)
124-
125120
m.storeTombstones = prometheus.NewGaugeVec(prometheus.GaugeOpts{
126121
Namespace: m.cfg.MetricsNamespace,
127122
Subsystem: subsystem,
@@ -222,7 +217,6 @@ func (m *KV) createAndRegisterMetrics() {
222217
// Describe returns prometheus descriptions via supplied channel
223218
func (m *KV) Describe(ch chan<- *prometheus.Desc) {
224219
ch <- m.storeValuesDesc
225-
ch <- m.storeSizesDesc
226220
}
227221

228222
// Collect returns extra metrics via supplied channel
@@ -231,8 +225,4 @@ func (m *KV) Collect(ch chan<- prometheus.Metric) {
231225
defer m.storeMu.Unlock()
232226

233227
ch <- prometheus.MustNewConstMetric(m.storeValuesDesc, prometheus.GaugeValue, float64(len(m.store)))
234-
235-
for k, v := range m.store {
236-
ch <- prometheus.MustNewConstMetric(m.storeSizesDesc, prometheus.GaugeValue, float64(len(v.value)), k)
237-
}
238228
}

pkg/ring/model.go

+5
Original file line numberDiff line numberDiff line change
@@ -398,6 +398,11 @@ func (d *Desc) RemoveTombstones(limit time.Time) (total, removed int) {
398398
return
399399
}
400400

401+
// Clone returns a deep copy of the ring state.
402+
func (d *Desc) Clone() memberlist.Mergeable {
403+
return proto.Clone(d).(*Desc)
404+
}
405+
401406
func (d *Desc) getTokensInfo() map[uint32]instanceInfo {
402407
out := map[uint32]instanceInfo{}
403408

0 commit comments

Comments
 (0)