Skip to content

Switch to snappy/proto for encoding ring in consul #159

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Nov 25, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
cmd/cortex/cortex
.uptodate
.pkg
cortex.pb.go
*.pb.go
ui/bindata.go
18 changes: 11 additions & 7 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,19 @@ IMAGE_NAMES=$(foreach dir,$(DOCKER_IMAGE_DIRS),$(patsubst %,$(IMAGE_PREFIX)/%,$(
images:
$(info $(IMAGE_NAMES))

PROTO_DEFS := $(shell find . -type f -name "*.proto" ! -path "./tools/*" ! -path "./vendor/*")
PROTO_GOS := $(patsubst %.proto,%.pb.go,$(PROTO_DEFS))

# List of exes please
CORTEX_EXE := ./cmd/cortex/cortex
EXES = $(CORTEX_EXE)

all: $(UPTODATE_FILES)
test: $(PROTO_GOS)

# And what goes into each exe
$(CORTEX_EXE): $(shell find . -name '*.go') ui/bindata.go cortex.pb.go
cortex.pb.go: cortex.proto
$(CORTEX_EXE): $(shell find . -name '*.go' ! -path "./tools/*" ! -path "./vendor/*") ui/bindata.go $(PROTO_GOS)
%.pb.go: %.proto
ui/bindata.go: $(shell find ui/static ui/templates)

# And now what goes into each image
Expand All @@ -55,7 +59,7 @@ NETGO_CHECK = @strings $@ | grep cgo_stub\\\.go >/dev/null || { \

ifeq ($(BUILD_IN_CONTAINER),true)

$(EXES) cortex.pb.go ui/bindata.go lint test shell: cortex-build/$(UPTODATE)
$(EXES) $(PROTO_GOS) ui/bindata.go lint test shell: cortex-build/$(UPTODATE)
@mkdir -p $(shell pwd)/.pkg
$(SUDO) docker run $(RM) -ti \
-v $(shell pwd)/.pkg:/go/pkg \
Expand All @@ -68,16 +72,16 @@ $(EXES): cortex-build/$(UPTODATE)
go build $(GO_FLAGS) -o $@ ./$(@D)
$(NETGO_CHECK)

cortex.pb.go: cortex-build/$(UPTODATE)
protoc -I ./vendor:. --go_out=plugins=grpc:. ./cortex.proto
%.pb.go: cortex-build/$(UPTODATE)
protoc -I ./vendor:./$(@D) --go_out=plugins=grpc:./$(@D) ./$(patsubst %.pb.go,%.proto,$@)

ui/bindata.go: cortex-build/$(UPTODATE)
go-bindata -pkg ui -o ui/bindata.go -ignore '(.*\.map|bootstrap\.js|bootstrap-theme\.css|bootstrap\.css)' ui/templates/... ui/static/...

lint: cortex-build/$(UPTODATE)
./tools/lint -notestpackage -ignorespelling queriers -ignorespelling Queriers .

test: cortex-build/$(UPTODATE) cortex.pb.go
test: cortex-build/$(UPTODATE)
./tools/test -no-go-get

shell: cortex-build/$(UPTODATE)
Expand All @@ -87,7 +91,7 @@ endif

clean:
$(SUDO) docker rmi $(IMAGE_NAMES) >/dev/null 2>&1 || true
rm -rf $(UPTODATE_FILES) $(EXES) cortex.pb.go ui/bindata.go
rm -rf $(UPTODATE_FILES) $(EXES) $(PROTO_GOS) ui/bindata.go
go clean ./...


10 changes: 7 additions & 3 deletions cmd/cortex/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,11 @@ func main() {
prometheus.MustRegister(resourceWatcher)
}

consul, err := ring.NewConsulClient(cfg.consulHost)
ringCodec := ring.NewDynamicCodec(
ring.JSONCodec{Factory: ring.DescFactory},
ring.ProtoCodec{Factory: ring.ProtoDescFactory},
)
consul, err := ring.NewConsulClient(cfg.consulHost, ringCodec)
if err != nil {
log.Fatalf("Error initializing Consul client: %v", err)
}
Expand All @@ -156,7 +160,7 @@ func main() {

case modeIngester:
cfg.ingesterConfig.Ring = r
registration, err := ring.RegisterIngester(consul, cfg.listenPort, cfg.ingesterConfig.GRPCListenPort, cfg.numTokens)
registration, err := ring.RegisterIngester(consul, cfg.ingesterConfig.GRPCListenPort, cfg.numTokens, ringCodec)
if err != nil {
// This only happens for errors in configuration & set-up, not for
// network errors.
Expand All @@ -183,7 +187,7 @@ func main() {

// Deferring a func to make ordering obvious
defer func() {
registration.ChangeState(ring.Leaving)
registration.ChangeState(ring.IngesterState_LEAVING)
ing.Stop()
registration.Unregister()
}()
Expand Down
20 changes: 10 additions & 10 deletions distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ type Distributor struct {
type ReadRing interface {
prometheus.Collector

Get(key uint32, n int, op ring.Operation) ([]ring.IngesterDesc, error)
BatchGet(keys []uint32, n int, op ring.Operation) ([][]ring.IngesterDesc, error)
GetAll() []ring.IngesterDesc
Get(key uint32, n int, op ring.Operation) ([]*ring.IngesterDesc, error)
BatchGet(keys []uint32, n int, op ring.Operation) ([][]*ring.IngesterDesc, error)
GetAll() []*ring.IngesterDesc
}

// Config contains the configuration require to
Expand Down Expand Up @@ -121,7 +121,7 @@ func New(cfg Config) (*Distributor, error) {
}, nil
}

func (d *Distributor) getClientFor(ingester ring.IngesterDesc) (cortex.IngesterClient, error) {
func (d *Distributor) getClientFor(ingester *ring.IngesterDesc) (cortex.IngesterClient, error) {
d.clientsMtx.RLock()
client, ok := d.clients[ingester.Hostname]
d.clientsMtx.RUnlock()
Expand All @@ -137,7 +137,7 @@ func (d *Distributor) getClientFor(ingester ring.IngesterDesc) (cortex.IngesterC
}

conn, err := grpc.Dial(
ingester.GRPCHostname,
ingester.Hostname,
grpc.WithInsecure(),
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()),
Expand Down Expand Up @@ -192,7 +192,7 @@ func (d *Distributor) Push(ctx context.Context, req *remote.WriteRequest) (*cort
}

sampleTrackers := make([]sampleTracker, len(samples), len(samples))
samplesByIngester := map[ring.IngesterDesc][]*sampleTracker{}
samplesByIngester := map[*ring.IngesterDesc][]*sampleTracker{}
for i := range samples {
sampleTrackers[i] = sampleTracker{
sample: samples[i],
Expand All @@ -204,9 +204,9 @@ func (d *Distributor) Push(ctx context.Context, req *remote.WriteRequest) (*cort
// Skip those that have not heartbeated in a while. NB these are still
// included in the calculation of minSuccess, so if too many failed ingesters
// will cause the whole write to fail.
liveIngesters := make([]ring.IngesterDesc, 0, len(ingesters[i]))
liveIngesters := make([]*ring.IngesterDesc, 0, len(ingesters[i]))
for _, ingester := range ingesters[i] {
if time.Now().Sub(ingester.Timestamp) <= d.cfg.HeartbeatTimeout {
if time.Now().Sub(time.Unix(ingester.Timestamp, 0)) <= d.cfg.HeartbeatTimeout {
liveIngesters = append(liveIngesters, ingester)
}
}
Expand All @@ -226,7 +226,7 @@ func (d *Distributor) Push(ctx context.Context, req *remote.WriteRequest) (*cort

errs := make(chan error)
for hostname, samples := range samplesByIngester {
go func(ingester ring.IngesterDesc, samples []*sampleTracker) {
go func(ingester *ring.IngesterDesc, samples []*sampleTracker) {
errs <- d.sendSamples(ctx, ingester, samples)
}(hostname, samples)
}
Expand All @@ -246,7 +246,7 @@ func (d *Distributor) Push(ctx context.Context, req *remote.WriteRequest) (*cort
return &cortex.WriteResponse{}, nil
}

func (d *Distributor) sendSamples(ctx context.Context, ingester ring.IngesterDesc, sampleTrackers []*sampleTracker) error {
func (d *Distributor) sendSamples(ctx context.Context, ingester *ring.IngesterDesc, sampleTrackers []*sampleTracker) error {
client, err := d.getClientFor(ingester)
if err != nil {
return err
Expand Down
Loading