Skip to content

Move distributor to its own package #104

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Nov 6, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions cmd/cortex/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/weaveworks/cortex"
"github.com/weaveworks/cortex/chunk"
"github.com/weaveworks/cortex/distributor"
"github.com/weaveworks/cortex/ingester"
"github.com/weaveworks/cortex/querier"
"github.com/weaveworks/cortex/ring"
Expand Down Expand Up @@ -70,7 +71,7 @@ type cfg struct {
logSuccess bool

ingesterConfig ingester.Config
distributorConfig cortex.DistributorConfig
distributorConfig distributor.Config
}

func main() {
Expand Down Expand Up @@ -113,8 +114,8 @@ func main() {
switch cfg.mode {
case modeDistributor:
cfg.distributorConfig.Ring = r
cfg.distributorConfig.ClientFactory = func(address string) (*cortex.IngesterClient, error) {
return cortex.NewIngesterClient(address, cfg.remoteTimeout)
cfg.distributorConfig.ClientFactory = func(address string) (*distributor.IngesterClient, error) {
return distributor.NewIngesterClient(address, cfg.remoteTimeout)
}
setupDistributor(cfg.distributorConfig, chunkStore, cfg.logSuccess)
case modeIngester:
Expand Down Expand Up @@ -178,26 +179,26 @@ func setupChunkStore(cfg cfg) (chunk.Store, error) {
}

func setupDistributor(
cfg cortex.DistributorConfig,
cfg distributor.Config,
chunkStore chunk.Store,
logSuccess bool,
) {
distributor, err := cortex.NewDistributor(cfg)
dist, err := distributor.New(cfg)
if err != nil {
log.Fatal(err)
}
prometheus.MustRegister(distributor)
prometheus.MustRegister(dist)

prefix := "/api/prom"
http.Handle(prefix+"/push", instrument(logSuccess, cortex.AppenderHandler(distributor, handleDistributorError)))
http.Handle(prefix+"/push", instrument(logSuccess, cortex.AppenderHandler(dist, handleDistributorError)))

// TODO: Move querier to separate binary.
setupQuerier(distributor, chunkStore, prefix, logSuccess)
setupQuerier(dist, chunkStore, prefix, logSuccess)
}

func handleDistributorError(w http.ResponseWriter, err error) {
switch e := err.(type) {
case cortex.IngesterError:
case distributor.IngesterError:
switch {
case 400 <= e.StatusCode && e.StatusCode < 500:
log.Warnf("append err: %v", err)
Expand All @@ -215,7 +216,7 @@ func handleDistributorError(w http.ResponseWriter, err error) {
// |
// `----------> ChunkQuerier -> DynamoDB/S3
func setupQuerier(
distributor *cortex.Distributor,
distributor *distributor.Distributor,
chunkStore chunk.Store,
prefix string,
logSuccess bool,
Expand Down
12 changes: 6 additions & 6 deletions distributor.go → distributor/distributor.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package cortex
package distributor

import (
"fmt"
Expand Down Expand Up @@ -30,7 +30,7 @@ var (
// Distributor is a storage.SampleAppender and a cortex.Querier which
// forwards appends and queries to individual ingesters.
type Distributor struct {
cfg DistributorConfig
cfg Config
clientsMtx sync.RWMutex
clients map[string]*IngesterClient

Expand All @@ -55,9 +55,9 @@ type ReadRing interface {
// IngesterClientFactory creates ingester clients.
type IngesterClientFactory func(string) (*IngesterClient, error)

// DistributorConfig contains the configuration require to
// Config contains the configuration require to
// create a Distributor
type DistributorConfig struct {
type Config struct {
Ring ReadRing
ClientFactory IngesterClientFactory

Expand All @@ -66,8 +66,8 @@ type DistributorConfig struct {
HeartbeatTimeout time.Duration
}

// NewDistributor constructs a new Distributor
func NewDistributor(cfg DistributorConfig) (*Distributor, error) {
// New constructs a new Distributor
func New(cfg Config) (*Distributor, error) {
if 0 > cfg.ReplicationFactor {
return nil, fmt.Errorf("ReplicationFactor must be greater than zero: %d", cfg.ReplicationFactor)
}
Expand Down
40 changes: 14 additions & 26 deletions ingester_client.go → distributor/ingester_client.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,4 @@
// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cortex
package distributor

import (
"bytes"
Expand All @@ -28,6 +15,7 @@ import (
"github.com/prometheus/prometheus/storage/remote"
"golang.org/x/net/context"

"github.com/weaveworks/cortex"
"github.com/weaveworks/cortex/ingester"
"github.com/weaveworks/cortex/user"
)
Expand Down Expand Up @@ -100,32 +88,32 @@ func (c *IngesterClient) Append(ctx context.Context, samples []*model.Sample) er

// Query implements Querier.
func (c *IngesterClient) Query(ctx context.Context, from, to model.Time, matchers ...*metric.LabelMatcher) (model.Matrix, error) {
req := &ReadRequest{
req := &cortex.ReadRequest{
StartTimestampMs: int64(from),
EndTimestampMs: int64(to),
}
for _, matcher := range matchers {
var mType MatchType
var mType cortex.MatchType
switch matcher.Type {
case metric.Equal:
mType = MatchType_EQUAL
mType = cortex.MatchType_EQUAL
case metric.NotEqual:
mType = MatchType_NOT_EQUAL
mType = cortex.MatchType_NOT_EQUAL
case metric.RegexMatch:
mType = MatchType_REGEX_MATCH
mType = cortex.MatchType_REGEX_MATCH
case metric.RegexNoMatch:
mType = MatchType_REGEX_NO_MATCH
mType = cortex.MatchType_REGEX_NO_MATCH
default:
panic("invalid matcher type")
}
req.Matchers = append(req.Matchers, &LabelMatcher{
req.Matchers = append(req.Matchers, &cortex.LabelMatcher{
Type: mType,
Name: string(matcher.Name),
Value: string(matcher.Value),
})
}

resp := &ReadResponse{}
resp := &cortex.ReadResponse{}
err := c.doRequest(ctx, "/query", req, resp, false)
if err != nil {
return nil, err
Expand Down Expand Up @@ -154,10 +142,10 @@ func (c *IngesterClient) Query(ctx context.Context, from, to model.Time, matcher

// LabelValuesForLabelName returns all of the label values that are associated with a given label name.
func (c *IngesterClient) LabelValuesForLabelName(ctx context.Context, ln model.LabelName) (model.LabelValues, error) {
req := &LabelValuesRequest{
req := &cortex.LabelValuesRequest{
LabelName: string(ln),
}
resp := &LabelValuesResponse{}
resp := &cortex.LabelValuesResponse{}
err := c.doRequest(ctx, "/label_values", req, resp, false)
if err != nil {
return nil, err
Expand All @@ -172,7 +160,7 @@ func (c *IngesterClient) LabelValuesForLabelName(ctx context.Context, ln model.L

// UserStats returns stats for the current user.
func (c *IngesterClient) UserStats(ctx context.Context) (*ingester.UserStats, error) {
resp := &UserStatsResponse{}
resp := &cortex.UserStatsResponse{}
err := c.doRequest(ctx, "/user_stats", nil, resp, false)
if err != nil {
return nil, err
Expand Down Expand Up @@ -210,7 +198,7 @@ func (c *IngesterClient) doRequest(ctx context.Context, endpoint string, req pro
if err != nil {
return fmt.Errorf("unable to create request: %v", err)
}
httpReq.Header.Add(userIDHeaderName, userID)
httpReq.Header.Add(cortex.UserIDHeaderName, userID)
// TODO: This isn't actually the correct Content-type.
httpReq.Header.Set("Content-Type", string(expfmt.FmtProtoDelim))
httpResp, err := c.client.Do(httpReq)
Expand Down
8 changes: 4 additions & 4 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ import (
"github.com/weaveworks/cortex/user"
)

// legacy from scope as a service.
const userIDHeaderName = "X-Scope-OrgID"
// UserIDHeaderName is a legacy from scope as a service.
const UserIDHeaderName = "X-Scope-OrgID"

// SampleAppender is the interface to append samples to both, local and remote
// storage. All methods are goroutine-safe.
Expand All @@ -43,7 +43,7 @@ type SampleAppender interface {
}

func parseRequest(w http.ResponseWriter, r *http.Request, req proto.Message) (ctx context.Context, abort bool) {
userID := r.Header.Get(userIDHeaderName)
userID := r.Header.Get(UserIDHeaderName)
if userID == "" {
http.Error(w, "", http.StatusUnauthorized)
return nil, true
Expand Down Expand Up @@ -120,7 +120,7 @@ func getSamples(req *remote.WriteRequest) []*model.Sample {
// AppenderHandler returns a http.Handler that accepts proto encoded samples.
func AppenderHandler(appender SampleAppender, errorHandler func(http.ResponseWriter, error)) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
userID := r.Header.Get(userIDHeaderName)
userID := r.Header.Get(UserIDHeaderName)
if userID == "" {
http.Error(w, "", http.StatusUnauthorized)
return
Expand Down