From f9619d27aa08194c9cd968b259408c58b2af956e Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Sun, 6 Nov 2016 10:50:47 -0800 Subject: [PATCH 1/2] Move distributor to its own package --- cmd/cortex/main.go | 21 +++++----- distributor.go => distributor/distributor.go | 10 ++--- .../ingester_client.go | 40 +++++++------------ server.go | 6 +-- 4 files changed, 33 insertions(+), 44 deletions(-) rename distributor.go => distributor/distributor.go (98%) rename ingester_client.go => distributor/ingester_client.go (84%) diff --git a/cmd/cortex/main.go b/cmd/cortex/main.go index baee46def9..990500c24f 100644 --- a/cmd/cortex/main.go +++ b/cmd/cortex/main.go @@ -20,6 +20,7 @@ import ( "github.com/weaveworks/cortex" "github.com/weaveworks/cortex/chunk" + "github.com/weaveworks/cortex/distributor" "github.com/weaveworks/cortex/ingester" "github.com/weaveworks/cortex/querier" "github.com/weaveworks/cortex/ring" @@ -70,7 +71,7 @@ type cfg struct { logSuccess bool ingesterConfig ingester.Config - distributorConfig cortex.DistributorConfig + distributorConfig distributor.Config } func main() { @@ -113,8 +114,8 @@ func main() { switch cfg.mode { case modeDistributor: cfg.distributorConfig.Ring = r - cfg.distributorConfig.ClientFactory = func(address string) (*cortex.IngesterClient, error) { - return cortex.NewIngesterClient(address, cfg.remoteTimeout) + cfg.distributorConfig.ClientFactory = func(address string) (*distributor.IngesterClient, error) { + return distributor.NewIngesterClient(address, cfg.remoteTimeout) } setupDistributor(cfg.distributorConfig, chunkStore, cfg.logSuccess) case modeIngester: @@ -178,26 +179,26 @@ func setupChunkStore(cfg cfg) (chunk.Store, error) { } func setupDistributor( - cfg cortex.DistributorConfig, + cfg distributor.Config, chunkStore chunk.Store, logSuccess bool, ) { - distributor, err := cortex.NewDistributor(cfg) + dist, err := distributor.New(cfg) if err != nil { log.Fatal(err) } - prometheus.MustRegister(distributor) + prometheus.MustRegister(dist) prefix := "/api/prom" - http.Handle(prefix+"/push", instrument(logSuccess, cortex.AppenderHandler(distributor, handleDistributorError))) + http.Handle(prefix+"/push", instrument(logSuccess, cortex.AppenderHandler(dist, handleDistributorError))) // TODO: Move querier to separate binary. - setupQuerier(distributor, chunkStore, prefix, logSuccess) + setupQuerier(dist, chunkStore, prefix, logSuccess) } func handleDistributorError(w http.ResponseWriter, err error) { switch e := err.(type) { - case cortex.IngesterError: + case distributor.IngesterError: switch { case 400 <= e.StatusCode && e.StatusCode < 500: log.Warnf("append err: %v", err) @@ -215,7 +216,7 @@ func handleDistributorError(w http.ResponseWriter, err error) { // | // `----------> ChunkQuerier -> DynamoDB/S3 func setupQuerier( - distributor *cortex.Distributor, + distributor *distributor.Distributor, chunkStore chunk.Store, prefix string, logSuccess bool, diff --git a/distributor.go b/distributor/distributor.go similarity index 98% rename from distributor.go rename to distributor/distributor.go index c68bbad69d..1430356ea0 100644 --- a/distributor.go +++ b/distributor/distributor.go @@ -1,4 +1,4 @@ -package cortex +package distributor import ( "fmt" @@ -30,7 +30,7 @@ var ( // Distributor is a storage.SampleAppender and a cortex.Querier which // forwards appends and queries to individual ingesters. type Distributor struct { - cfg DistributorConfig + cfg Config clientsMtx sync.RWMutex clients map[string]*IngesterClient @@ -57,7 +57,7 @@ type IngesterClientFactory func(string) (*IngesterClient, error) // DistributorConfig contains the configuration require to // create a Distributor -type DistributorConfig struct { +type Config struct { Ring ReadRing ClientFactory IngesterClientFactory @@ -66,8 +66,8 @@ type DistributorConfig struct { HeartbeatTimeout time.Duration } -// NewDistributor constructs a new Distributor -func NewDistributor(cfg DistributorConfig) (*Distributor, error) { +// New constructs a new Distributor +func New(cfg Config) (*Distributor, error) { if 0 > cfg.ReplicationFactor { return nil, fmt.Errorf("ReplicationFactor must be greater than zero: %d", cfg.ReplicationFactor) } diff --git a/ingester_client.go b/distributor/ingester_client.go similarity index 84% rename from ingester_client.go rename to distributor/ingester_client.go index ac0f3bc56e..1be10b544b 100644 --- a/ingester_client.go +++ b/distributor/ingester_client.go @@ -1,17 +1,4 @@ -// Copyright 2016 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package cortex +package distributor import ( "bytes" @@ -28,6 +15,7 @@ import ( "github.com/prometheus/prometheus/storage/remote" "golang.org/x/net/context" + "github.com/weaveworks/cortex" "github.com/weaveworks/cortex/ingester" "github.com/weaveworks/cortex/user" ) @@ -100,32 +88,32 @@ func (c *IngesterClient) Append(ctx context.Context, samples []*model.Sample) er // Query implements Querier. func (c *IngesterClient) Query(ctx context.Context, from, to model.Time, matchers ...*metric.LabelMatcher) (model.Matrix, error) { - req := &ReadRequest{ + req := &cortex.ReadRequest{ StartTimestampMs: int64(from), EndTimestampMs: int64(to), } for _, matcher := range matchers { - var mType MatchType + var mType cortex.MatchType switch matcher.Type { case metric.Equal: - mType = MatchType_EQUAL + mType = cortex.MatchType_EQUAL case metric.NotEqual: - mType = MatchType_NOT_EQUAL + mType = cortex.MatchType_NOT_EQUAL case metric.RegexMatch: - mType = MatchType_REGEX_MATCH + mType = cortex.MatchType_REGEX_MATCH case metric.RegexNoMatch: - mType = MatchType_REGEX_NO_MATCH + mType = cortex.MatchType_REGEX_NO_MATCH default: panic("invalid matcher type") } - req.Matchers = append(req.Matchers, &LabelMatcher{ + req.Matchers = append(req.Matchers, &cortex.LabelMatcher{ Type: mType, Name: string(matcher.Name), Value: string(matcher.Value), }) } - resp := &ReadResponse{} + resp := &cortex.ReadResponse{} err := c.doRequest(ctx, "/query", req, resp, false) if err != nil { return nil, err @@ -154,10 +142,10 @@ func (c *IngesterClient) Query(ctx context.Context, from, to model.Time, matcher // LabelValuesForLabelName returns all of the label values that are associated with a given label name. func (c *IngesterClient) LabelValuesForLabelName(ctx context.Context, ln model.LabelName) (model.LabelValues, error) { - req := &LabelValuesRequest{ + req := &cortex.LabelValuesRequest{ LabelName: string(ln), } - resp := &LabelValuesResponse{} + resp := &cortex.LabelValuesResponse{} err := c.doRequest(ctx, "/label_values", req, resp, false) if err != nil { return nil, err @@ -172,7 +160,7 @@ func (c *IngesterClient) LabelValuesForLabelName(ctx context.Context, ln model.L // UserStats returns stats for the current user. func (c *IngesterClient) UserStats(ctx context.Context) (*ingester.UserStats, error) { - resp := &UserStatsResponse{} + resp := &cortex.UserStatsResponse{} err := c.doRequest(ctx, "/user_stats", nil, resp, false) if err != nil { return nil, err @@ -210,7 +198,7 @@ func (c *IngesterClient) doRequest(ctx context.Context, endpoint string, req pro if err != nil { return fmt.Errorf("unable to create request: %v", err) } - httpReq.Header.Add(userIDHeaderName, userID) + httpReq.Header.Add(cortex.UserIDHeaderName, userID) // TODO: This isn't actually the correct Content-type. httpReq.Header.Set("Content-Type", string(expfmt.FmtProtoDelim)) httpResp, err := c.client.Do(httpReq) diff --git a/server.go b/server.go index d807d78d5b..b72adc85eb 100644 --- a/server.go +++ b/server.go @@ -34,7 +34,7 @@ import ( ) // legacy from scope as a service. -const userIDHeaderName = "X-Scope-OrgID" +const UserIDHeaderName = "X-Scope-OrgID" // SampleAppender is the interface to append samples to both, local and remote // storage. All methods are goroutine-safe. @@ -43,7 +43,7 @@ type SampleAppender interface { } func parseRequest(w http.ResponseWriter, r *http.Request, req proto.Message) (ctx context.Context, abort bool) { - userID := r.Header.Get(userIDHeaderName) + userID := r.Header.Get(UserIDHeaderName) if userID == "" { http.Error(w, "", http.StatusUnauthorized) return nil, true @@ -120,7 +120,7 @@ func getSamples(req *remote.WriteRequest) []*model.Sample { // AppenderHandler returns a http.Handler that accepts proto encoded samples. func AppenderHandler(appender SampleAppender, errorHandler func(http.ResponseWriter, error)) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - userID := r.Header.Get(userIDHeaderName) + userID := r.Header.Get(UserIDHeaderName) if userID == "" { http.Error(w, "", http.StatusUnauthorized) return From 9d5fc261b4a9cfa3651843a7efe8384ad1a21da4 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Sun, 6 Nov 2016 10:54:07 -0800 Subject: [PATCH 2/2] Fix lint --- distributor/distributor.go | 2 +- server.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/distributor/distributor.go b/distributor/distributor.go index 1430356ea0..7a02d3d8c4 100644 --- a/distributor/distributor.go +++ b/distributor/distributor.go @@ -55,7 +55,7 @@ type ReadRing interface { // IngesterClientFactory creates ingester clients. type IngesterClientFactory func(string) (*IngesterClient, error) -// DistributorConfig contains the configuration require to +// Config contains the configuration require to // create a Distributor type Config struct { Ring ReadRing diff --git a/server.go b/server.go index b72adc85eb..df0a325a2b 100644 --- a/server.go +++ b/server.go @@ -33,7 +33,7 @@ import ( "github.com/weaveworks/cortex/user" ) -// legacy from scope as a service. +// UserIDHeaderName is a legacy from scope as a service. const UserIDHeaderName = "X-Scope-OrgID" // SampleAppender is the interface to append samples to both, local and remote