Skip to content

Commit 8e21f93

Browse files
authored
Merge pull request #104 from weaveworks/distributor-pkg
Move distributor to its own package
2 parents 140c822 + 9d5fc26 commit 8e21f93

File tree

4 files changed

+35
-46
lines changed

4 files changed

+35
-46
lines changed

cmd/cortex/main.go

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020

2121
"github.com/weaveworks/cortex"
2222
"github.com/weaveworks/cortex/chunk"
23+
"github.com/weaveworks/cortex/distributor"
2324
"github.com/weaveworks/cortex/ingester"
2425
"github.com/weaveworks/cortex/querier"
2526
"github.com/weaveworks/cortex/ring"
@@ -70,7 +71,7 @@ type cfg struct {
7071
logSuccess bool
7172

7273
ingesterConfig ingester.Config
73-
distributorConfig cortex.DistributorConfig
74+
distributorConfig distributor.Config
7475
}
7576

7677
func main() {
@@ -113,8 +114,8 @@ func main() {
113114
switch cfg.mode {
114115
case modeDistributor:
115116
cfg.distributorConfig.Ring = r
116-
cfg.distributorConfig.ClientFactory = func(address string) (*cortex.IngesterClient, error) {
117-
return cortex.NewIngesterClient(address, cfg.remoteTimeout)
117+
cfg.distributorConfig.ClientFactory = func(address string) (*distributor.IngesterClient, error) {
118+
return distributor.NewIngesterClient(address, cfg.remoteTimeout)
118119
}
119120
setupDistributor(cfg.distributorConfig, chunkStore, cfg.logSuccess)
120121
case modeIngester:
@@ -178,26 +179,26 @@ func setupChunkStore(cfg cfg) (chunk.Store, error) {
178179
}
179180

180181
func setupDistributor(
181-
cfg cortex.DistributorConfig,
182+
cfg distributor.Config,
182183
chunkStore chunk.Store,
183184
logSuccess bool,
184185
) {
185-
distributor, err := cortex.NewDistributor(cfg)
186+
dist, err := distributor.New(cfg)
186187
if err != nil {
187188
log.Fatal(err)
188189
}
189-
prometheus.MustRegister(distributor)
190+
prometheus.MustRegister(dist)
190191

191192
prefix := "/api/prom"
192-
http.Handle(prefix+"/push", instrument(logSuccess, cortex.AppenderHandler(distributor, handleDistributorError)))
193+
http.Handle(prefix+"/push", instrument(logSuccess, cortex.AppenderHandler(dist, handleDistributorError)))
193194

194195
// TODO: Move querier to separate binary.
195-
setupQuerier(distributor, chunkStore, prefix, logSuccess)
196+
setupQuerier(dist, chunkStore, prefix, logSuccess)
196197
}
197198

198199
func handleDistributorError(w http.ResponseWriter, err error) {
199200
switch e := err.(type) {
200-
case cortex.IngesterError:
201+
case distributor.IngesterError:
201202
switch {
202203
case 400 <= e.StatusCode && e.StatusCode < 500:
203204
log.Warnf("append err: %v", err)
@@ -215,7 +216,7 @@ func handleDistributorError(w http.ResponseWriter, err error) {
215216
// |
216217
// `----------> ChunkQuerier -> DynamoDB/S3
217218
func setupQuerier(
218-
distributor *cortex.Distributor,
219+
distributor *distributor.Distributor,
219220
chunkStore chunk.Store,
220221
prefix string,
221222
logSuccess bool,

distributor.go renamed to distributor/distributor.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package cortex
1+
package distributor
22

33
import (
44
"fmt"
@@ -30,7 +30,7 @@ var (
3030
// Distributor is a storage.SampleAppender and a cortex.Querier which
3131
// forwards appends and queries to individual ingesters.
3232
type Distributor struct {
33-
cfg DistributorConfig
33+
cfg Config
3434
clientsMtx sync.RWMutex
3535
clients map[string]*IngesterClient
3636

@@ -55,9 +55,9 @@ type ReadRing interface {
5555
// IngesterClientFactory creates ingester clients.
5656
type IngesterClientFactory func(string) (*IngesterClient, error)
5757

58-
// DistributorConfig contains the configuration require to
58+
// Config contains the configuration require to
5959
// create a Distributor
60-
type DistributorConfig struct {
60+
type Config struct {
6161
Ring ReadRing
6262
ClientFactory IngesterClientFactory
6363

@@ -66,8 +66,8 @@ type DistributorConfig struct {
6666
HeartbeatTimeout time.Duration
6767
}
6868

69-
// NewDistributor constructs a new Distributor
70-
func NewDistributor(cfg DistributorConfig) (*Distributor, error) {
69+
// New constructs a new Distributor
70+
func New(cfg Config) (*Distributor, error) {
7171
if 0 > cfg.ReplicationFactor {
7272
return nil, fmt.Errorf("ReplicationFactor must be greater than zero: %d", cfg.ReplicationFactor)
7373
}

ingester_client.go renamed to distributor/ingester_client.go

Lines changed: 14 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,4 @@
1-
// Copyright 2016 The Prometheus Authors
2-
// Licensed under the Apache License, Version 2.0 (the "License");
3-
// you may not use this file except in compliance with the License.
4-
// You may obtain a copy of the License at
5-
//
6-
// http://www.apache.org/licenses/LICENSE-2.0
7-
//
8-
// Unless required by applicable law or agreed to in writing, software
9-
// distributed under the License is distributed on an "AS IS" BASIS,
10-
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11-
// See the License for the specific language governing permissions and
12-
// limitations under the License.
13-
14-
package cortex
1+
package distributor
152

163
import (
174
"bytes"
@@ -28,6 +15,7 @@ import (
2815
"github.com/prometheus/prometheus/storage/remote"
2916
"golang.org/x/net/context"
3017

18+
"github.com/weaveworks/cortex"
3119
"github.com/weaveworks/cortex/ingester"
3220
"github.com/weaveworks/cortex/user"
3321
)
@@ -100,32 +88,32 @@ func (c *IngesterClient) Append(ctx context.Context, samples []*model.Sample) er
10088

10189
// Query implements Querier.
10290
func (c *IngesterClient) Query(ctx context.Context, from, to model.Time, matchers ...*metric.LabelMatcher) (model.Matrix, error) {
103-
req := &ReadRequest{
91+
req := &cortex.ReadRequest{
10492
StartTimestampMs: int64(from),
10593
EndTimestampMs: int64(to),
10694
}
10795
for _, matcher := range matchers {
108-
var mType MatchType
96+
var mType cortex.MatchType
10997
switch matcher.Type {
11098
case metric.Equal:
111-
mType = MatchType_EQUAL
99+
mType = cortex.MatchType_EQUAL
112100
case metric.NotEqual:
113-
mType = MatchType_NOT_EQUAL
101+
mType = cortex.MatchType_NOT_EQUAL
114102
case metric.RegexMatch:
115-
mType = MatchType_REGEX_MATCH
103+
mType = cortex.MatchType_REGEX_MATCH
116104
case metric.RegexNoMatch:
117-
mType = MatchType_REGEX_NO_MATCH
105+
mType = cortex.MatchType_REGEX_NO_MATCH
118106
default:
119107
panic("invalid matcher type")
120108
}
121-
req.Matchers = append(req.Matchers, &LabelMatcher{
109+
req.Matchers = append(req.Matchers, &cortex.LabelMatcher{
122110
Type: mType,
123111
Name: string(matcher.Name),
124112
Value: string(matcher.Value),
125113
})
126114
}
127115

128-
resp := &ReadResponse{}
116+
resp := &cortex.ReadResponse{}
129117
err := c.doRequest(ctx, "/query", req, resp, false)
130118
if err != nil {
131119
return nil, err
@@ -154,10 +142,10 @@ func (c *IngesterClient) Query(ctx context.Context, from, to model.Time, matcher
154142

155143
// LabelValuesForLabelName returns all of the label values that are associated with a given label name.
156144
func (c *IngesterClient) LabelValuesForLabelName(ctx context.Context, ln model.LabelName) (model.LabelValues, error) {
157-
req := &LabelValuesRequest{
145+
req := &cortex.LabelValuesRequest{
158146
LabelName: string(ln),
159147
}
160-
resp := &LabelValuesResponse{}
148+
resp := &cortex.LabelValuesResponse{}
161149
err := c.doRequest(ctx, "/label_values", req, resp, false)
162150
if err != nil {
163151
return nil, err
@@ -172,7 +160,7 @@ func (c *IngesterClient) LabelValuesForLabelName(ctx context.Context, ln model.L
172160

173161
// UserStats returns stats for the current user.
174162
func (c *IngesterClient) UserStats(ctx context.Context) (*ingester.UserStats, error) {
175-
resp := &UserStatsResponse{}
163+
resp := &cortex.UserStatsResponse{}
176164
err := c.doRequest(ctx, "/user_stats", nil, resp, false)
177165
if err != nil {
178166
return nil, err
@@ -210,7 +198,7 @@ func (c *IngesterClient) doRequest(ctx context.Context, endpoint string, req pro
210198
if err != nil {
211199
return fmt.Errorf("unable to create request: %v", err)
212200
}
213-
httpReq.Header.Add(userIDHeaderName, userID)
201+
httpReq.Header.Add(cortex.UserIDHeaderName, userID)
214202
// TODO: This isn't actually the correct Content-type.
215203
httpReq.Header.Set("Content-Type", string(expfmt.FmtProtoDelim))
216204
httpResp, err := c.client.Do(httpReq)

server.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ import (
3333
"github.com/weaveworks/cortex/user"
3434
)
3535

36-
// legacy from scope as a service.
37-
const userIDHeaderName = "X-Scope-OrgID"
36+
// UserIDHeaderName is a legacy from scope as a service.
37+
const UserIDHeaderName = "X-Scope-OrgID"
3838

3939
// SampleAppender is the interface to append samples to both, local and remote
4040
// storage. All methods are goroutine-safe.
@@ -43,7 +43,7 @@ type SampleAppender interface {
4343
}
4444

4545
func parseRequest(w http.ResponseWriter, r *http.Request, req proto.Message) (ctx context.Context, abort bool) {
46-
userID := r.Header.Get(userIDHeaderName)
46+
userID := r.Header.Get(UserIDHeaderName)
4747
if userID == "" {
4848
http.Error(w, "", http.StatusUnauthorized)
4949
return nil, true
@@ -120,7 +120,7 @@ func getSamples(req *remote.WriteRequest) []*model.Sample {
120120
// AppenderHandler returns a http.Handler that accepts proto encoded samples.
121121
func AppenderHandler(appender SampleAppender, errorHandler func(http.ResponseWriter, error)) http.Handler {
122122
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
123-
userID := r.Header.Get(userIDHeaderName)
123+
userID := r.Header.Get(UserIDHeaderName)
124124
if userID == "" {
125125
http.Error(w, "", http.StatusUnauthorized)
126126
return

0 commit comments

Comments
 (0)