From 137eb86eb7a59dd04a2e04e7a5c3be4c45781e5c Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Fri, 25 Nov 2016 11:03:21 +0000 Subject: [PATCH 1/4] Switch to snappy compressed protos for encoding the ring in consul --- .gitignore | 2 +- Makefile | 18 ++-- cmd/cortex/main.go | 10 +- distributor/distributor.go | 20 ++-- ring/consul_client.go | 181 ++++++++++++++++++++++++++++--------- ring/http.go | 6 +- ring/ingester_lifecycle.go | 51 +++++++---- ring/model.go | 112 ++++++++++++----------- ring/ring.go | 36 ++++---- ring/ring.proto | 25 +++++ 10 files changed, 305 insertions(+), 156 deletions(-) create mode 100644 ring/ring.proto diff --git a/.gitignore b/.gitignore index c5352150da..144f8ff7bc 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,5 @@ cmd/cortex/cortex .uptodate .pkg -cortex.pb.go +*.pb.go ui/bindata.go diff --git a/Makefile b/Makefile index 619967db2b..7bf7095cb9 100644 --- a/Makefile +++ b/Makefile @@ -24,15 +24,19 @@ IMAGE_NAMES=$(foreach dir,$(DOCKER_IMAGE_DIRS),$(patsubst %,$(IMAGE_PREFIX)/%,$( images: $(info $(IMAGE_NAMES)) +PROTO_DEFS := $(shell find . -type f -name "*.proto" ! -path "./tools/*" ! -path "./vendor/*") +PROTO_GOS := $(patsubst %.proto,%.pb.go,$(PROTO_DEFS)) + # List of exes please CORTEX_EXE := ./cmd/cortex/cortex EXES = $(CORTEX_EXE) all: $(UPTODATE_FILES) +test: $(PROTO_GOS) # And what goes into each exe -$(CORTEX_EXE): $(shell find . -name '*.go') ui/bindata.go cortex.pb.go -cortex.pb.go: cortex.proto +$(CORTEX_EXE): $(shell find . -name '*.go' ! -path "./tools/*" ! -path "./vendor/*") ui/bindata.go $(PROTO_GOS) +%.pb.go: %.proto ui/bindata.go: $(shell find ui/static ui/templates) # And now what goes into each image @@ -55,7 +59,7 @@ NETGO_CHECK = @strings $@ | grep cgo_stub\\\.go >/dev/null || { \ ifeq ($(BUILD_IN_CONTAINER),true) -$(EXES) cortex.pb.go ui/bindata.go lint test shell: cortex-build/$(UPTODATE) +$(EXES) $(PROTO_GOS) ui/bindata.go lint test shell: cortex-build/$(UPTODATE) @mkdir -p $(shell pwd)/.pkg $(SUDO) docker run $(RM) -ti \ -v $(shell pwd)/.pkg:/go/pkg \ @@ -68,8 +72,8 @@ $(EXES): cortex-build/$(UPTODATE) go build $(GO_FLAGS) -o $@ ./$(@D) $(NETGO_CHECK) -cortex.pb.go: cortex-build/$(UPTODATE) - protoc -I ./vendor:. --go_out=plugins=grpc:. ./cortex.proto +%.pb.go: cortex-build/$(UPTODATE) + protoc -I ./vendor:./$(@D) --go_out=plugins=grpc:./$(@D) ./$(patsubst %.pb.go,%.proto,$@) ui/bindata.go: cortex-build/$(UPTODATE) go-bindata -pkg ui -o ui/bindata.go -ignore '(.*\.map|bootstrap\.js|bootstrap-theme\.css|bootstrap\.css)' ui/templates/... ui/static/... @@ -77,7 +81,7 @@ ui/bindata.go: cortex-build/$(UPTODATE) lint: cortex-build/$(UPTODATE) ./tools/lint -notestpackage -ignorespelling queriers -ignorespelling Queriers . -test: cortex-build/$(UPTODATE) cortex.pb.go +test: cortex-build/$(UPTODATE) ./tools/test -no-go-get shell: cortex-build/$(UPTODATE) @@ -87,7 +91,7 @@ endif clean: $(SUDO) docker rmi $(IMAGE_NAMES) >/dev/null 2>&1 || true - rm -rf $(UPTODATE_FILES) $(EXES) cortex.pb.go ui/bindata.go + rm -rf $(UPTODATE_FILES) $(EXES) $(PROTO_GOS) ui/bindata.go go clean ./... diff --git a/cmd/cortex/main.go b/cmd/cortex/main.go index 32c817e026..695d5b0e78 100644 --- a/cmd/cortex/main.go +++ b/cmd/cortex/main.go @@ -138,7 +138,11 @@ func main() { prometheus.MustRegister(resourceWatcher) } - consul, err := ring.NewConsulClient(cfg.consulHost) + ringSerDes := ring.NewDynamicSerDes( + ring.JSONSerDes{Factory: ring.DescFactory}, + ring.ProtoSerDes{Factory: ring.ProtoDescFactory}, + ) + consul, err := ring.NewConsulClient(cfg.consulHost, ringSerDes) if err != nil { log.Fatalf("Error initializing Consul client: %v", err) } @@ -156,7 +160,7 @@ func main() { case modeIngester: cfg.ingesterConfig.Ring = r - registration, err := ring.RegisterIngester(consul, cfg.listenPort, cfg.ingesterConfig.GRPCListenPort, cfg.numTokens) + registration, err := ring.RegisterIngester(consul, cfg.ingesterConfig.GRPCListenPort, cfg.numTokens, ringSerDes) if err != nil { // This only happens for errors in configuration & set-up, not for // network errors. @@ -183,7 +187,7 @@ func main() { // Deferring a func to make ordering obvious defer func() { - registration.ChangeState(ring.Leaving) + registration.ChangeState(ring.IngesterState_LEAVING) ing.Stop() registration.Unregister() }() diff --git a/distributor/distributor.go b/distributor/distributor.go index 0be51e3e4b..ee0c7b08f8 100644 --- a/distributor/distributor.go +++ b/distributor/distributor.go @@ -54,9 +54,9 @@ type Distributor struct { type ReadRing interface { prometheus.Collector - Get(key uint32, n int, op ring.Operation) ([]ring.IngesterDesc, error) - BatchGet(keys []uint32, n int, op ring.Operation) ([][]ring.IngesterDesc, error) - GetAll() []ring.IngesterDesc + Get(key uint32, n int, op ring.Operation) ([]*ring.IngesterDesc, error) + BatchGet(keys []uint32, n int, op ring.Operation) ([][]*ring.IngesterDesc, error) + GetAll() []*ring.IngesterDesc } // Config contains the configuration require to @@ -121,7 +121,7 @@ func New(cfg Config) (*Distributor, error) { }, nil } -func (d *Distributor) getClientFor(ingester ring.IngesterDesc) (cortex.IngesterClient, error) { +func (d *Distributor) getClientFor(ingester *ring.IngesterDesc) (cortex.IngesterClient, error) { d.clientsMtx.RLock() client, ok := d.clients[ingester.Hostname] d.clientsMtx.RUnlock() @@ -137,7 +137,7 @@ func (d *Distributor) getClientFor(ingester ring.IngesterDesc) (cortex.IngesterC } conn, err := grpc.Dial( - ingester.GRPCHostname, + ingester.Hostname, grpc.WithInsecure(), grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()), @@ -192,7 +192,7 @@ func (d *Distributor) Push(ctx context.Context, req *remote.WriteRequest) (*cort } sampleTrackers := make([]sampleTracker, len(samples), len(samples)) - samplesByIngester := map[ring.IngesterDesc][]*sampleTracker{} + samplesByIngester := map[*ring.IngesterDesc][]*sampleTracker{} for i := range samples { sampleTrackers[i] = sampleTracker{ sample: samples[i], @@ -204,9 +204,9 @@ func (d *Distributor) Push(ctx context.Context, req *remote.WriteRequest) (*cort // Skip those that have not heartbeated in a while. NB these are still // included in the calculation of minSuccess, so if too many failed ingesters // will cause the whole write to fail. - liveIngesters := make([]ring.IngesterDesc, 0, len(ingesters[i])) + liveIngesters := make([]*ring.IngesterDesc, 0, len(ingesters[i])) for _, ingester := range ingesters[i] { - if time.Now().Sub(ingester.Timestamp) <= d.cfg.HeartbeatTimeout { + if time.Now().Sub(time.Unix(ingester.Timestamp, 0)) <= d.cfg.HeartbeatTimeout { liveIngesters = append(liveIngesters, ingester) } } @@ -226,7 +226,7 @@ func (d *Distributor) Push(ctx context.Context, req *remote.WriteRequest) (*cort errs := make(chan error) for hostname, samples := range samplesByIngester { - go func(ingester ring.IngesterDesc, samples []*sampleTracker) { + go func(ingester *ring.IngesterDesc, samples []*sampleTracker) { errs <- d.sendSamples(ctx, ingester, samples) }(hostname, samples) } @@ -246,7 +246,7 @@ func (d *Distributor) Push(ctx context.Context, req *remote.WriteRequest) (*cort return &cortex.WriteResponse{}, nil } -func (d *Distributor) sendSamples(ctx context.Context, ingester ring.IngesterDesc, sampleTrackers []*sampleTracker) error { +func (d *Distributor) sendSamples(ctx context.Context, ingester *ring.IngesterDesc, sampleTrackers []*sampleTracker) error { client, err := d.getClientFor(ingester) if err != nil { return err diff --git a/ring/consul_client.go b/ring/consul_client.go index 62d63a75a4..22566e2977 100644 --- a/ring/consul_client.go +++ b/ring/consul_client.go @@ -1,11 +1,13 @@ package ring import ( - "bytes" "encoding/json" "fmt" + "sync" "time" + "github.com/golang/protobuf/proto" + "github.com/golang/snappy" consul "github.com/hashicorp/consul/api" "github.com/prometheus/common/log" ) @@ -18,18 +20,20 @@ const ( // such as CAS and Watch which take callbacks. It also deals with serialisation // by having an instance factory passed in to methods and deserialising into that. type ConsulClient interface { - Get(key string, factory InstanceFactory) error - CAS(key string, factory InstanceFactory, f CASCallback) error - WatchPrefix(path string, factory InstanceFactory, done <-chan struct{}, f func(string, interface{}) bool) - WatchKey(key string, factory InstanceFactory, done <-chan struct{}, f func(interface{}) bool) + CAS(key string, f CASCallback) error + WatchPrefix(path string, done <-chan struct{}, f func(string, interface{}) bool) + WatchKey(key string, done <-chan struct{}, f func(interface{}) bool) PutBytes(key string, buf []byte) error } // CASCallback is the type of the callback to CAS. If err is nil, out must be non-nil. type CASCallback func(in interface{}) (out interface{}, retry bool, err error) -// InstanceFactory type creates empty instances for use when deserialising -type InstanceFactory func() interface{} +// SerDes allows the consult client to serialise and deserialise values. +type SerDes interface { + Decode([]byte) (interface{}, error) + Encode(interface{}) ([]byte, error) +} type kv interface { CAS(p *consul.KVPair, q *consul.WriteOptions) (bool, *consul.WriteMeta, error) @@ -40,10 +44,11 @@ type kv interface { type consulClient struct { kv + serdes SerDes } // NewConsulClient returns a new ConsulClient. -func NewConsulClient(addr string) (ConsulClient, error) { +func NewConsulClient(addr string, serdes SerDes) (ConsulClient, error) { client, err := consul.NewClient(&consul.Config{ Address: addr, Scheme: "http", @@ -51,7 +56,10 @@ func NewConsulClient(addr string) (ConsulClient, error) { if err != nil { return nil, err } - return &consulClient{client.KV()}, nil + return &consulClient{ + kv: client.KV(), + serdes: serdes, + }, nil } var ( @@ -64,25 +72,112 @@ var ( ErrNotFound = fmt.Errorf("Not found") ) -// Get and deserialise a JSON value from Consul. -func (c *consulClient) Get(key string, factory InstanceFactory) error { - kvp, _, err := c.kv.Get(key, queryOptions) +// ProtoSerDes is a SerDes for proto/snappy +type ProtoSerDes struct { + Factory func() proto.Message +} + +// Decode implements SerDes +func (p ProtoSerDes) Decode(bytes []byte) (interface{}, error) { + out := p.Factory() + bytes, err := snappy.Decode(nil, bytes) + if err != nil { + return nil, err + } + if err := proto.Unmarshal(bytes, out); err != nil { + return nil, err + } + return out, nil +} + +// Encode implements SerDes +func (p ProtoSerDes) Encode(msg interface{}) ([]byte, error) { + bytes, err := proto.Marshal(msg.(proto.Message)) if err != nil { - return err + return nil, err + } + return snappy.Encode(nil, bytes), nil +} + +// JSONSerDes is a SerDes for JSON +type JSONSerDes struct { + Factory func() interface{} +} + +// Decode implements SerDes +func (j JSONSerDes) Decode(bytes []byte) (interface{}, error) { + out := j.Factory() + if err := json.Unmarshal(bytes, out); err != nil { + return nil, err + } + return out, nil +} + +// Encode implemenrs SerDes +func (j JSONSerDes) Encode(msg interface{}) ([]byte, error) { + return json.Marshal(msg) +} + +// DynamicSerDes is a SerDes that can read json and proto, and +// that can serialise to either (selectively). +// Once it fails to decode JSON, it will start decoding (and +// writing) protos. +type DynamicSerDes struct { + mtx sync.Mutex + useProto bool + json SerDes + proto SerDes +} + +// NewDynamicSerDes makes a new DynamicSerDes +func NewDynamicSerDes(json, proto SerDes) *DynamicSerDes { + return &DynamicSerDes{ + useProto: false, + json: json, + proto: proto, } - if kvp == nil { - return ErrNotFound +} + +// UseProto allow you to change the SerDes at runtime. +func (d *DynamicSerDes) UseProto(useProto bool) { + log.Infof("Switching to proto serialization: %v", useProto) + + d.mtx.Lock() + defer d.mtx.Unlock() + d.useProto = useProto +} + +// Decode implements SerDes +func (d *DynamicSerDes) Decode(bytes []byte) (interface{}, error) { + d.mtx.Lock() + defer d.mtx.Unlock() + + out, err := d.json.Decode(bytes) + if err == nil { + return out, nil } - out := factory() - if err := json.NewDecoder(bytes.NewReader(kvp.Value)).Decode(out); err != nil { - return err + + if !d.useProto { + log.Infof("Error decoding json, switching to writing proto: %v", err) + d.useProto = true } - return nil + + return d.proto.Decode(bytes) +} + +// Encode implemenrs SerDes +func (d *DynamicSerDes) Encode(msg interface{}) ([]byte, error) { + d.mtx.Lock() + defer d.mtx.Unlock() + if d.useProto { + return d.proto.Encode(msg) + } + return d.json.Encode(msg) } // CAS atomically modifies a value in a callback. // If value doesn't exist you'll get nil as an argument to your callback. -func (c *consulClient) CAS(key string, factory InstanceFactory, f CASCallback) error { +func (c *consulClient) CAS(key string, f CASCallback) error { var ( index = uint64(0) retries = 10 @@ -96,9 +191,9 @@ func (c *consulClient) CAS(key string, factory InstanceFactory, f CASCallback) e } var intermediate interface{} if kvp != nil { - out := factory() - if err := json.NewDecoder(bytes.NewReader(kvp.Value)).Decode(out); err != nil { - log.Errorf("Error deserialising %s: %v", key, err) + out, err := c.serdes.Decode(kvp.Value) + if err != nil { + log.Errorf("Error decoding %s: %v", key, err) continue } // If key doesn't exist, index will be 0. @@ -119,14 +214,14 @@ func (c *consulClient) CAS(key string, factory InstanceFactory, f CASCallback) e panic("Callback must instantiate value!") } - value := bytes.Buffer{} - if err := json.NewEncoder(&value).Encode(intermediate); err != nil { + bytes, err := c.serdes.Encode(intermediate) + if err != nil { log.Errorf("Error serialising value for %s: %v", key, err) continue } ok, _, err := c.kv.CAS(&consul.KVPair{ Key: key, - Value: value.Bytes(), + Value: bytes, ModifyIndex: index, }, writeOptions) if err != nil { @@ -189,7 +284,7 @@ func isClosed(done <-chan struct{}) bool { // supplied which generates an empty struct for WatchPrefix to deserialise // into. Values in Consul are assumed to be JSON. This function blocks until // the done channel is closed. -func (c *consulClient) WatchPrefix(prefix string, factory InstanceFactory, done <-chan struct{}, f func(string, interface{}) bool) { +func (c *consulClient) WatchPrefix(prefix string, done <-chan struct{}, f func(string, interface{}) bool) { var ( backoff = newBackoff(done) index = uint64(0) @@ -218,9 +313,9 @@ func (c *consulClient) WatchPrefix(prefix string, factory InstanceFactory, done index = meta.LastIndex for _, kvp := range kvps { - out := factory() - if err := json.NewDecoder(bytes.NewReader(kvp.Value)).Decode(out); err != nil { - log.Errorf("Error deserialising %s: %v", kvp.Key, err) + out, err := c.serdes.Decode(kvp.Value) + if err != nil { + log.Errorf("Error decoding %s: %v", kvp.Key, err) continue } if !f(kvp.Key, out) { @@ -236,7 +331,7 @@ func (c *consulClient) WatchPrefix(prefix string, factory InstanceFactory, done // supplied which generates an empty struct for WatchKey to deserialise // into. Values in Consul are assumed to be JSON. This function blocks until // the done channel is closed. -func (c *consulClient) WatchKey(key string, factory InstanceFactory, done <-chan struct{}, f func(interface{}) bool) { +func (c *consulClient) WatchKey(key string, done <-chan struct{}, f func(interface{}) bool) { var ( backoff = newBackoff(done) index = uint64(0) @@ -266,9 +361,10 @@ func (c *consulClient) WatchKey(key string, factory InstanceFactory, done <-chan var out interface{} if kvp != nil { - out = factory() - if err := json.NewDecoder(bytes.NewReader(kvp.Value)).Decode(out); err != nil { - log.Errorf("Error deserialising %s: %v", kvp.Key, err) + var err error + out, err = c.serdes.Decode(kvp.Value) + if err != nil { + log.Errorf("Error decoding %s: %v", key, err) continue } } @@ -296,25 +392,20 @@ func PrefixClient(client ConsulClient, prefix string) ConsulClient { return &prefixedConsulClient{prefix, client} } -// Get and deserialise a JSON value from Consul. -func (c *prefixedConsulClient) Get(key string, factory InstanceFactory) error { - return c.consul.Get(c.prefix+key, factory) -} - // CAS atomically modifies a value in a callback. If the value doesn't exist, // you'll get 'nil' as an argument to your callback. -func (c *prefixedConsulClient) CAS(key string, factory InstanceFactory, f CASCallback) error { - return c.consul.CAS(c.prefix+key, factory, f) +func (c *prefixedConsulClient) CAS(key string, f CASCallback) error { + return c.consul.CAS(c.prefix+key, f) } // WatchPrefix watches a prefix. This is in addition to the prefix we already have. -func (c *prefixedConsulClient) WatchPrefix(path string, factory InstanceFactory, done <-chan struct{}, f func(string, interface{}) bool) { - c.consul.WatchPrefix(c.prefix+path, factory, done, f) +func (c *prefixedConsulClient) WatchPrefix(path string, done <-chan struct{}, f func(string, interface{}) bool) { + c.consul.WatchPrefix(c.prefix+path, done, f) } // WatchKey watches a key. -func (c *prefixedConsulClient) WatchKey(key string, factory InstanceFactory, done <-chan struct{}, f func(interface{}) bool) { - c.consul.WatchKey(c.prefix+key, factory, done, f) +func (c *prefixedConsulClient) WatchKey(key string, done <-chan struct{}, f func(interface{}) bool) { + c.consul.WatchKey(c.prefix+key, done, f) } // PutBytes writes bytes to Consul. diff --git a/ring/http.go b/ring/http.go index 3a5895c5db..85fc618d63 100644 --- a/ring/http.go +++ b/ring/http.go @@ -52,7 +52,7 @@ func init() { tmpl, err = template.New("webpage"). Funcs(template.FuncMap{ "time": func(in interface{}) string { - return in.(time.Time).String() + return time.Unix(in.(int64), 0).String() }, }). Parse(tpl) @@ -71,7 +71,7 @@ func (r *Ring) forget(id string) error { ringDesc.removeIngester(id) return ringDesc, true, nil } - return r.consul.CAS(consulKey, descFactory, unregister) + return r.consul.CAS(consulKey, unregister) } func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) { @@ -88,7 +88,7 @@ func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) { r.mtx.RLock() defer r.mtx.RUnlock() if err := tmpl.Execute(w, struct { - Ring Desc + Ring *Desc Message string Now time.Time }{ diff --git a/ring/ingester_lifecycle.go b/ring/ingester_lifecycle.go index cf2b5bfcba..96fb4044fd 100644 --- a/ring/ingester_lifecycle.go +++ b/ring/ingester_lifecycle.go @@ -25,12 +25,12 @@ const ( type IngesterRegistration struct { consul ConsulClient numTokens int + serdes *DynamicSerDes - id string - hostname string - grpcHostname string - quit chan struct{} - wait sync.WaitGroup + id string + hostname string + quit chan struct{} + wait sync.WaitGroup // We need to remember the ingester state just in case consul goes away and comes // back empty. Channel is used to tell the actor to update consul on state changes. @@ -41,7 +41,7 @@ type IngesterRegistration struct { } // RegisterIngester registers an ingester with Consul. -func RegisterIngester(consulClient ConsulClient, listenPort, grpcPort, numTokens int) (*IngesterRegistration, error) { +func RegisterIngester(consulClient ConsulClient, grpcPort, numTokens int, serdes *DynamicSerDes) (*IngesterRegistration, error) { hostname, err := os.Hostname() if err != nil { return nil, err @@ -55,16 +55,16 @@ func RegisterIngester(consulClient ConsulClient, listenPort, grpcPort, numTokens r := &IngesterRegistration{ consul: consulClient, numTokens: numTokens, + serdes: serdes, id: hostname, // hostname is the ip+port of this instance, written to consul so // the distributors know where to connect. - hostname: fmt.Sprintf("%s:%d", addr, listenPort), - grpcHostname: fmt.Sprintf("%s:%d", addr, grpcPort), - quit: make(chan struct{}), + hostname: fmt.Sprintf("%s:%d", addr, grpcPort), + quit: make(chan struct{}), // Only read/written on actor goroutine. - state: Active, + state: IngesterState_ACTIVE, stateChange: make(chan IngesterState), consulHeartbeats: prometheus.NewCounter(prometheus.CounterOpts{ @@ -125,16 +125,18 @@ func (r *IngesterRegistration) pickTokens() []uint32 { tokens = append(tokens, newTokens...) } sort.Sort(sortableUint32(tokens)) - ringDesc.addIngester(r.id, r.hostname, r.grpcHostname, tokens, r.state) + ringDesc.addIngester(r.id, r.hostname, tokens, r.state) return ringDesc, true, nil } - if err := r.consul.CAS(consulKey, descFactory, pickTokens); err != nil { + if err := r.consul.CAS(consulKey, pickTokens); err != nil { log.Fatalf("Failed to pick tokens in consul: %v", err) } return tokens } func (r *IngesterRegistration) heartbeat(tokens []uint32) { + allIngestersCanReadProtos := false + updateConsul := func(in interface{}) (out interface{}, retry bool, err error) { var ringDesc *Desc if in == nil { @@ -143,14 +145,27 @@ func (r *IngesterRegistration) heartbeat(tokens []uint32) { ringDesc = in.(*Desc) } + // See if all ingesters can read protos; if so start writing them + protoRing := true + for _, ing := range ringDesc.Ingesters { + if !ing.ProtoRing { + allIngestersCanReadProtos = false + break + } + } + allIngestersCanReadProtos = protoRing + ingesterDesc, ok := ringDesc.Ingesters[r.id] if !ok { // consul must have restarted log.Infof("Found empty ring, inserting tokens!") - ringDesc.addIngester(r.id, r.hostname, r.grpcHostname, tokens, r.state) + ringDesc.addIngester(r.id, r.hostname, tokens, r.state) } else { - ingesterDesc.Timestamp = time.Now() + ingesterDesc.Timestamp = time.Now().Unix() ingesterDesc.State = r.state + + // Set ProtoRing back to true for the case where an existing ingester that didn't understand this field removed it whilst updating the ring. + ingesterDesc.ProtoRing = true ringDesc.Ingesters[r.id] = ingesterDesc } @@ -162,13 +177,15 @@ func (r *IngesterRegistration) heartbeat(tokens []uint32) { for { select { case r.state = <-r.stateChange: - if err := r.consul.CAS(consulKey, descFactory, updateConsul); err != nil { + if err := r.consul.CAS(consulKey, updateConsul); err != nil { log.Errorf("Failed to write to consul, sleeping: %v", err) } case <-ticker.C: r.consulHeartbeats.Inc() - if err := r.consul.CAS(consulKey, descFactory, updateConsul); err != nil { + if err := r.consul.CAS(consulKey, updateConsul); err != nil { log.Errorf("Failed to write to consul, sleeping: %v", err) + } else if allIngestersCanReadProtos { + r.serdes.UseProto(true) } case <-r.quit: return @@ -186,7 +203,7 @@ func (r *IngesterRegistration) unregister() { ringDesc.removeIngester(r.id) return ringDesc, true, nil } - if err := r.consul.CAS(consulKey, descFactory, unregister); err != nil { + if err := r.consul.CAS(consulKey, unregister); err != nil { log.Fatalf("Failed to unregister from consul: %v", err) } } diff --git a/ring/model.go b/ring/model.go index 966dcfb1e3..c2b00944bb 100644 --- a/ring/model.go +++ b/ring/model.go @@ -1,89 +1,97 @@ package ring import ( + "encoding/json" "sort" "time" + + "github.com/golang/protobuf/proto" ) -// IngesterState describes the state of an ingester -type IngesterState int +// ByToken is a sortable list of TokenDescs +type ByToken []*TokenDesc -// Values for IngesterState -const ( - Active IngesterState = iota - Leaving -) +func (ts ByToken) Len() int { return len(ts) } +func (ts ByToken) Swap(i, j int) { ts[i], ts[j] = ts[j], ts[i] } +func (ts ByToken) Less(i, j int) bool { return ts[i].Token < ts[j].Token } -func (s IngesterState) String() string { - switch s { - case Active: - return "Active" - case Leaving: - return "Leaving" - } - return "" +// ProtoDescFactory makes new Descs +func ProtoDescFactory() proto.Message { + return newDesc() } -// Desc is the serialised state in Consul representing -// all ingesters (ie, the ring). -type Desc struct { - Ingesters map[string]IngesterDesc `json:"ingesters"` - Tokens TokenDescs `json:"tokens"` +// DescFactory makes new Descs +func DescFactory() interface{} { + return newDesc() } -// IngesterDesc describes a single ingester. -type IngesterDesc struct { - Hostname string `json:"hostname"` - Timestamp time.Time `json:"timestamp"` - State IngesterState `json:"state"` - - GRPCHostname string `json:"grpc_hostname"` +func newDesc() *Desc { + return &Desc{ + Ingesters: map[string]*IngesterDesc{}, + } } -// TokenDescs is a sortable list of TokenDescs -type TokenDescs []TokenDesc +type oldIngesterDesc struct { + Hostname string `json:"hostname"` + Timestamp time.Time `json:"timestamp"` + State IngesterState `json:"state"` + GRPCHostname string `json:"grpc_hostname"` + ProtoRing bool `json:"proto_ring"` +} -func (ts TokenDescs) Len() int { return len(ts) } -func (ts TokenDescs) Swap(i, j int) { ts[i], ts[j] = ts[j], ts[i] } -func (ts TokenDescs) Less(i, j int) bool { return ts[i].Token < ts[j].Token } +// UnmarshalJSON allows the new proto IngesterDescs to read the old JSON format. +// +// NB grpc_hostname in the old format is just hostname in the new. +func (d *IngesterDesc) UnmarshalJSON(in []byte) error { + var tmp oldIngesterDesc + if err := json.Unmarshal(in, &tmp); err != nil { + return err + } -// TokenDesc describes an individual token in the ring. -type TokenDesc struct { - Token uint32 `json:"tokens"` - Ingester string `json:"ingester"` + d.Hostname = tmp.GRPCHostname + d.Timestamp = tmp.Timestamp.Unix() + d.State = tmp.State + d.ProtoRing = tmp.ProtoRing + return nil } -func descFactory() interface{} { - return newDesc() +// MarshalJSON allows the new proto IngesterDescs to write the old JSON format. +// +// NB grpc_hostname in the old format is just hostname in the new. +func (d *IngesterDesc) MarshalJSON() ([]byte, error) { + return json.Marshal(oldIngesterDesc{ + Hostname: "", + Timestamp: time.Unix(d.Timestamp, 0), + State: d.State, + GRPCHostname: d.Hostname, + ProtoRing: d.ProtoRing, + }) } -func newDesc() *Desc { - return &Desc{ - Ingesters: map[string]IngesterDesc{}, +func (d *Desc) addIngester(id, hostname string, tokens []uint32, state IngesterState) { + if d.Ingesters == nil { + d.Ingesters = map[string]*IngesterDesc{} } -} - -func (d *Desc) addIngester(id, hostname, grpcHostname string, tokens []uint32, state IngesterState) { - d.Ingesters[id] = IngesterDesc{ - Hostname: hostname, - GRPCHostname: grpcHostname, - Timestamp: time.Now(), - State: state, + d.Ingesters[id] = &IngesterDesc{ + Hostname: hostname, + Timestamp: time.Now().Unix(), + State: state, + ProtoRing: true, } for _, token := range tokens { - d.Tokens = append(d.Tokens, TokenDesc{ + d.Tokens = append(d.Tokens, &TokenDesc{ Token: token, Ingester: id, }) } - sort.Sort(d.Tokens) + sort.Sort(ByToken(d.Tokens)) } func (d *Desc) removeIngester(id string) { delete(d.Ingesters, id) - output := []TokenDesc{} + output := []*TokenDesc{} for i := 0; i < len(d.Tokens); i++ { if d.Tokens[i].Ingester != id { output = append(output, d.Tokens[i]) diff --git a/ring/ring.go b/ring/ring.go index a6ac8585c7..273aad475c 100644 --- a/ring/ring.go +++ b/ring/ring.go @@ -42,7 +42,7 @@ type Ring struct { heartbeatTimeout time.Duration mtx sync.RWMutex - ringDesc Desc + ringDesc *Desc ingesterOwnershipDesc *prometheus.Desc numIngestersDesc *prometheus.Desc @@ -84,7 +84,7 @@ func (r *Ring) Stop() { func (r *Ring) loop() { defer close(r.done) - r.consul.WatchKey(consulKey, descFactory, r.quit, func(value interface{}) bool { + r.consul.WatchKey(consulKey, r.quit, func(value interface{}) bool { if value == nil { log.Infof("Ring doesn't exist in consul yet.") return true @@ -93,13 +93,13 @@ func (r *Ring) loop() { ringDesc := value.(*Desc) r.mtx.Lock() defer r.mtx.Unlock() - r.ringDesc = *ringDesc + r.ringDesc = ringDesc return true }) } // Get returns n (or more) ingesters which form the replicas for the given key. -func (r *Ring) Get(key uint32, n int, op Operation) ([]IngesterDesc, error) { +func (r *Ring) Get(key uint32, n int, op Operation) ([]*IngesterDesc, error) { r.mtx.RLock() defer r.mtx.RUnlock() return r.getInternal(key, n, op) @@ -107,11 +107,11 @@ func (r *Ring) Get(key uint32, n int, op Operation) ([]IngesterDesc, error) { // BatchGet returns n (or more) ingesters which form the replicas for the given key. // The order of the result matches the order of the input. -func (r *Ring) BatchGet(keys []uint32, n int, op Operation) ([][]IngesterDesc, error) { +func (r *Ring) BatchGet(keys []uint32, n int, op Operation) ([][]*IngesterDesc, error) { r.mtx.RLock() defer r.mtx.RUnlock() - result := make([][]IngesterDesc, len(keys), len(keys)) + result := make([][]*IngesterDesc, len(keys), len(keys)) for i, key := range keys { ingesters, err := r.getInternal(key, n, op) if err != nil { @@ -122,12 +122,12 @@ func (r *Ring) BatchGet(keys []uint32, n int, op Operation) ([][]IngesterDesc, e return result, nil } -func (r *Ring) getInternal(key uint32, n int, op Operation) ([]IngesterDesc, error) { +func (r *Ring) getInternal(key uint32, n int, op Operation) ([]*IngesterDesc, error) { if len(r.ringDesc.Tokens) == 0 { return nil, ErrEmptyRing } - ingesters := make([]IngesterDesc, 0, n) + ingesters := make([]*IngesterDesc, 0, n) distinctHosts := map[string]struct{}{} start := r.search(key) iterations := 0 @@ -150,7 +150,7 @@ func (r *Ring) getInternal(key uint32, n int, op Operation) ([]IngesterDesc, err // set of replicas for the key. This means we have to also increase the // size of the replica set for read, but we can read from Leaving ingesters, // so don't skip it in this case. - if ingester.State == Leaving { + if ingester.State == IngesterState_LEAVING { n++ if op == Write { continue @@ -163,13 +163,13 @@ func (r *Ring) getInternal(key uint32, n int, op Operation) ([]IngesterDesc, err } // GetAll returns all available ingesters in the circle. -func (r *Ring) GetAll() []IngesterDesc { +func (r *Ring) GetAll() []*IngesterDesc { r.mtx.RLock() defer r.mtx.RUnlock() - ingesters := make([]IngesterDesc, 0, len(r.ringDesc.Ingesters)) + ingesters := make([]*IngesterDesc, 0, len(r.ringDesc.Ingesters)) for _, ingester := range r.ringDesc.Ingesters { - if time.Now().Sub(ingester.Timestamp) > r.heartbeatTimeout { + if time.Now().Sub(time.Unix(ingester.Timestamp, 0)) > r.heartbeatTimeout { continue } ingesters = append(ingesters, ingester) @@ -183,9 +183,9 @@ func (r *Ring) Ready() bool { defer r.mtx.RUnlock() for _, ingester := range r.ringDesc.Ingesters { - if time.Now().Sub(ingester.Timestamp) > r.heartbeatTimeout { + if time.Now().Sub(time.Unix(ingester.Timestamp, 0)) > r.heartbeatTimeout { return false - } else if ingester.State != Active { + } else if ingester.State != IngesterState_ACTIVE { return false } } @@ -237,12 +237,12 @@ func (r *Ring) Collect(ch chan<- prometheus.Metric) { // Initialised to zero so we emit zero-metrics (instead of not emitting anything) byState := map[string]int{ - unhealthy: 0, - Active.String(): 0, - Leaving.String(): 0, + unhealthy: 0, + IngesterState_ACTIVE.String(): 0, + IngesterState_LEAVING.String(): 0, } for _, ingester := range r.ringDesc.Ingesters { - if time.Now().Sub(ingester.Timestamp) > r.heartbeatTimeout { + if time.Now().Sub(time.Unix(ingester.Timestamp, 0)) > r.heartbeatTimeout { byState[unhealthy]++ } else { byState[ingester.State.String()]++ diff --git a/ring/ring.proto b/ring/ring.proto new file mode 100644 index 0000000000..dcc23d66be --- /dev/null +++ b/ring/ring.proto @@ -0,0 +1,25 @@ +syntax = "proto3"; + +package ring; + +message Desc { + map ingesters = 1; + repeated TokenDesc tokens = 2; +} + +message IngesterDesc { + string hostname = 1; + int64 timestamp = 2; + IngesterState state = 3; + bool protoRing = 5; +} + +message TokenDesc { + uint32 token = 1; + string ingester = 2; +} + +enum IngesterState { + ACTIVE = 0; + LEAVING = 1; +} From ce3fde4aef3371b9e9e8c73c3ce17616d501cca3 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Fri, 25 Nov 2016 16:40:00 +0000 Subject: [PATCH 2/4] Don't spam logs when switching to protos; fix typo which meant we switched to early. --- ring/consul_client.go | 7 ++++--- ring/ingester_lifecycle.go | 9 ++++----- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/ring/consul_client.go b/ring/consul_client.go index 22566e2977..25ca5d19db 100644 --- a/ring/consul_client.go +++ b/ring/consul_client.go @@ -140,11 +140,12 @@ func NewDynamicSerDes(json, proto SerDes) *DynamicSerDes { // UseProto allow you to change the SerDes at runtime. func (d *DynamicSerDes) UseProto(useProto bool) { - log.Infof("Switching to proto serialization: %v", useProto) - d.mtx.Lock() defer d.mtx.Unlock() - d.useProto = useProto + if d.useProto != useProto { + log.Infof("Switching to proto serialization: %v", useProto) + d.useProto = useProto + } } // Decode implements SerDes diff --git a/ring/ingester_lifecycle.go b/ring/ingester_lifecycle.go index 96fb4044fd..7695cf3c08 100644 --- a/ring/ingester_lifecycle.go +++ b/ring/ingester_lifecycle.go @@ -135,7 +135,6 @@ func (r *IngesterRegistration) pickTokens() []uint32 { } func (r *IngesterRegistration) heartbeat(tokens []uint32) { - allIngestersCanReadProtos := false updateConsul := func(in interface{}) (out interface{}, retry bool, err error) { var ringDesc *Desc @@ -146,14 +145,16 @@ func (r *IngesterRegistration) heartbeat(tokens []uint32) { } // See if all ingesters can read protos; if so start writing them - protoRing := true + allIngestersCanReadProtos := true for _, ing := range ringDesc.Ingesters { if !ing.ProtoRing { allIngestersCanReadProtos = false break } } - allIngestersCanReadProtos = protoRing + if allIngestersCanReadProtos { + r.serdes.UseProto(true) + } ingesterDesc, ok := ringDesc.Ingesters[r.id] if !ok { @@ -184,8 +185,6 @@ func (r *IngesterRegistration) heartbeat(tokens []uint32) { r.consulHeartbeats.Inc() if err := r.consul.CAS(consulKey, updateConsul); err != nil { log.Errorf("Failed to write to consul, sleeping: %v", err) - } else if allIngestersCanReadProtos { - r.serdes.UseProto(true) } case <-r.quit: return From 82991da8bd30244125da717e451f12265d13e22e Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Fri, 25 Nov 2016 17:04:17 +0000 Subject: [PATCH 3/4] Review feedback --- cmd/cortex/main.go | 10 ++--- ring/consul_client.go | 77 +++++++++++++++++++------------------- ring/ingester_lifecycle.go | 10 ++--- 3 files changed, 49 insertions(+), 48 deletions(-) diff --git a/cmd/cortex/main.go b/cmd/cortex/main.go index 695d5b0e78..cf8c075c57 100644 --- a/cmd/cortex/main.go +++ b/cmd/cortex/main.go @@ -138,11 +138,11 @@ func main() { prometheus.MustRegister(resourceWatcher) } - ringSerDes := ring.NewDynamicSerDes( - ring.JSONSerDes{Factory: ring.DescFactory}, - ring.ProtoSerDes{Factory: ring.ProtoDescFactory}, + ringCodec := ring.NewDynamicCodec( + ring.JSONCodec{Factory: ring.DescFactory}, + ring.ProtoCodec{Factory: ring.ProtoDescFactory}, ) - consul, err := ring.NewConsulClient(cfg.consulHost, ringSerDes) + consul, err := ring.NewConsulClient(cfg.consulHost, ringCodec) if err != nil { log.Fatalf("Error initializing Consul client: %v", err) } @@ -160,7 +160,7 @@ func main() { case modeIngester: cfg.ingesterConfig.Ring = r - registration, err := ring.RegisterIngester(consul, cfg.ingesterConfig.GRPCListenPort, cfg.numTokens, ringSerDes) + registration, err := ring.RegisterIngester(consul, cfg.ingesterConfig.GRPCListenPort, cfg.numTokens, ringCodec) if err != nil { // This only happens for errors in configuration & set-up, not for // network errors. diff --git a/ring/consul_client.go b/ring/consul_client.go index 25ca5d19db..a4989438d2 100644 --- a/ring/consul_client.go +++ b/ring/consul_client.go @@ -29,8 +29,8 @@ type ConsulClient interface { // CASCallback is the type of the callback to CAS. If err is nil, out must be non-nil. type CASCallback func(in interface{}) (out interface{}, retry bool, err error) -// SerDes allows the consult client to serialise and deserialise values. -type SerDes interface { +// Codec allows the consult client to serialise and deserialise values. +type Codec interface { Decode([]byte) (interface{}, error) Encode(interface{}) ([]byte, error) } @@ -44,11 +44,11 @@ type kv interface { type consulClient struct { kv - serdes SerDes + codec Codec } // NewConsulClient returns a new ConsulClient. -func NewConsulClient(addr string, serdes SerDes) (ConsulClient, error) { +func NewConsulClient(addr string, codec Codec) (ConsulClient, error) { client, err := consul.NewClient(&consul.Config{ Address: addr, Scheme: "http", @@ -57,8 +57,8 @@ func NewConsulClient(addr string, serdes SerDes) (ConsulClient, error) { return nil, err } return &consulClient{ - kv: client.KV(), - serdes: serdes, + kv: client.KV(), + codec: codec, }, nil } @@ -72,13 +72,13 @@ var ( ErrNotFound = fmt.Errorf("Not found") ) -// ProtoSerDes is a SerDes for proto/snappy -type ProtoSerDes struct { +// ProtoCodec is a Codec for proto/snappy +type ProtoCodec struct { Factory func() proto.Message } -// Decode implements SerDes -func (p ProtoSerDes) Decode(bytes []byte) (interface{}, error) { +// Decode implements Codec +func (p ProtoCodec) Decode(bytes []byte) (interface{}, error) { out := p.Factory() bytes, err := snappy.Decode(nil, bytes) if err != nil { @@ -90,8 +90,8 @@ func (p ProtoSerDes) Decode(bytes []byte) (interface{}, error) { return out, nil } -// Encode implements SerDes -func (p ProtoSerDes) Encode(msg interface{}) ([]byte, error) { +// Encode implements Codec +func (p ProtoCodec) Encode(msg interface{}) ([]byte, error) { bytes, err := proto.Marshal(msg.(proto.Message)) if err != nil { return nil, err @@ -99,13 +99,13 @@ func (p ProtoSerDes) Encode(msg interface{}) ([]byte, error) { return snappy.Encode(nil, bytes), nil } -// JSONSerDes is a SerDes for JSON -type JSONSerDes struct { +// JSONCodec is a Codec for JSON +type JSONCodec struct { Factory func() interface{} } -// Decode implements SerDes -func (j JSONSerDes) Decode(bytes []byte) (interface{}, error) { +// Decode implements Codec +func (j JSONCodec) Decode(bytes []byte) (interface{}, error) { out := j.Factory() if err := json.Unmarshal(bytes, out); err != nil { return nil, err @@ -113,43 +113,43 @@ func (j JSONSerDes) Decode(bytes []byte) (interface{}, error) { return out, nil } -// Encode implemenrs SerDes -func (j JSONSerDes) Encode(msg interface{}) ([]byte, error) { +// Encode implemenrs Codec +func (j JSONCodec) Encode(msg interface{}) ([]byte, error) { return json.Marshal(msg) } -// DynamicSerDes is a SerDes that can read json and proto, and +// DynamicCodec is a Codec that can read json and proto, and // that can serialise to either (selectively). // Once it fails to decode JSON, it will start decoding (and // writing) protos. -type DynamicSerDes struct { +type DynamicCodec struct { mtx sync.Mutex useProto bool - json SerDes - proto SerDes + json Codec + proto Codec } -// NewDynamicSerDes makes a new DynamicSerDes -func NewDynamicSerDes(json, proto SerDes) *DynamicSerDes { - return &DynamicSerDes{ +// NewDynamicCodec makes a new DynamicCodec +func NewDynamicCodec(json, proto Codec) *DynamicCodec { + return &DynamicCodec{ useProto: false, json: json, proto: proto, } } -// UseProto allow you to change the SerDes at runtime. -func (d *DynamicSerDes) UseProto(useProto bool) { +// UseProto allow you to change the Codec at runtime. +func (d *DynamicCodec) UseProto(useProto bool) { d.mtx.Lock() defer d.mtx.Unlock() if d.useProto != useProto { - log.Infof("Switching to proto serialization: %v", useProto) + log.Infof("Using to proto serialization: %v", useProto) d.useProto = useProto } } -// Decode implements SerDes -func (d *DynamicSerDes) Decode(bytes []byte) (interface{}, error) { +// Decode implements Codec +func (d *DynamicCodec) Decode(bytes []byte) (interface{}, error) { d.mtx.Lock() defer d.mtx.Unlock() @@ -158,16 +158,17 @@ func (d *DynamicSerDes) Decode(bytes []byte) (interface{}, error) { return out, nil } - if !d.useProto { + out, err = d.proto.Decode(bytes) + if err == nil && !d.useProto { log.Infof("Error decoding json, switching to writing proto: %v", err) d.useProto = true } - return d.proto.Decode(bytes) + return out, err } -// Encode implemenrs SerDes -func (d *DynamicSerDes) Encode(msg interface{}) ([]byte, error) { +// Encode implemenrs Codec +func (d *DynamicCodec) Encode(msg interface{}) ([]byte, error) { d.mtx.Lock() defer d.mtx.Unlock() if d.useProto { @@ -192,7 +193,7 @@ func (c *consulClient) CAS(key string, f CASCallback) error { } var intermediate interface{} if kvp != nil { - out, err := c.serdes.Decode(kvp.Value) + out, err := c.codec.Decode(kvp.Value) if err != nil { log.Errorf("Error decoding %s: %v", key, err) continue @@ -215,7 +216,7 @@ func (c *consulClient) CAS(key string, f CASCallback) error { panic("Callback must instantiate value!") } - bytes, err := c.serdes.Encode(intermediate) + bytes, err := c.codec.Encode(intermediate) if err != nil { log.Errorf("Error serialising value for %s: %v", key, err) continue @@ -314,7 +315,7 @@ func (c *consulClient) WatchPrefix(prefix string, done <-chan struct{}, f func(s index = meta.LastIndex for _, kvp := range kvps { - out, err := c.serdes.Decode(kvp.Value) + out, err := c.codec.Decode(kvp.Value) if err != nil { log.Errorf("Error decoding %s: %v", kvp.Key, err) continue @@ -363,7 +364,7 @@ func (c *consulClient) WatchKey(key string, done <-chan struct{}, f func(interfa var out interface{} if kvp != nil { var err error - out, err = c.serdes.Decode(kvp.Value) + out, err = c.codec.Decode(kvp.Value) if err != nil { log.Errorf("Error decoding %s: %v", key, err) continue diff --git a/ring/ingester_lifecycle.go b/ring/ingester_lifecycle.go index 7695cf3c08..c72d8076ad 100644 --- a/ring/ingester_lifecycle.go +++ b/ring/ingester_lifecycle.go @@ -25,7 +25,7 @@ const ( type IngesterRegistration struct { consul ConsulClient numTokens int - serdes *DynamicSerDes + codec *DynamicCodec id string hostname string @@ -41,7 +41,7 @@ type IngesterRegistration struct { } // RegisterIngester registers an ingester with Consul. -func RegisterIngester(consulClient ConsulClient, grpcPort, numTokens int, serdes *DynamicSerDes) (*IngesterRegistration, error) { +func RegisterIngester(consulClient ConsulClient, grpcPort, numTokens int, codec *DynamicCodec) (*IngesterRegistration, error) { hostname, err := os.Hostname() if err != nil { return nil, err @@ -55,7 +55,7 @@ func RegisterIngester(consulClient ConsulClient, grpcPort, numTokens int, serdes r := &IngesterRegistration{ consul: consulClient, numTokens: numTokens, - serdes: serdes, + codec: codec, id: hostname, // hostname is the ip+port of this instance, written to consul so @@ -153,7 +153,7 @@ func (r *IngesterRegistration) heartbeat(tokens []uint32) { } } if allIngestersCanReadProtos { - r.serdes.UseProto(true) + r.codec.UseProto(true) } ingesterDesc, ok := ringDesc.Ingesters[r.id] @@ -165,7 +165,7 @@ func (r *IngesterRegistration) heartbeat(tokens []uint32) { ingesterDesc.Timestamp = time.Now().Unix() ingesterDesc.State = r.state - // Set ProtoRing back to true for the case where an existing ingester that didn't understand this field removed it whilst updating the ring. + // Set ProtoRing back to true for the case where an existing ingestser that didn't understand this field removed it whilst updating the ring. ingesterDesc.ProtoRing = true ringDesc.Ingesters[r.id] = ingesterDesc } From fd9dee753bdbbba5bc6b8197bb2370fbefc35998 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Fri, 25 Nov 2016 17:28:11 +0000 Subject: [PATCH 4/4] :trollface: --- ring/ingester_lifecycle.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ring/ingester_lifecycle.go b/ring/ingester_lifecycle.go index c72d8076ad..0838eeda8f 100644 --- a/ring/ingester_lifecycle.go +++ b/ring/ingester_lifecycle.go @@ -165,7 +165,7 @@ func (r *IngesterRegistration) heartbeat(tokens []uint32) { ingesterDesc.Timestamp = time.Now().Unix() ingesterDesc.State = r.state - // Set ProtoRing back to true for the case where an existing ingestser that didn't understand this field removed it whilst updating the ring. + // Set ProtoRing back to true for the case where an existing ingester that didn't understand this field removed it whilst updating the ring. ingesterDesc.ProtoRing = true ringDesc.Ingesters[r.id] = ingesterDesc }