Skip to content

Commit edcc768

Browse files
authored
Merge pull request #144 from weaveworks/grpc
Use gRPC for distributor <-> ingester rpcs.
2 parents 7c6b401 + ab8a67f commit edcc768

File tree

70 files changed

+4882
-1013
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

70 files changed

+4882
-1013
lines changed

cmd/cortex/main.go

Lines changed: 49 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package main
33
import (
44
"flag"
55
"fmt"
6+
"net"
67
"net/http"
78
_ "net/http/pprof"
89
"os"
@@ -11,13 +12,16 @@ import (
1112
"time"
1213

1314
"github.com/gorilla/mux"
15+
"github.com/mwitkow/go-grpc-middleware"
16+
"github.com/weaveworks/scope/common/middleware"
17+
"golang.org/x/net/context"
18+
"google.golang.org/grpc"
19+
1420
"github.com/prometheus/client_golang/prometheus"
1521
"github.com/prometheus/common/log"
1622
"github.com/prometheus/common/route"
1723
"github.com/prometheus/prometheus/promql"
1824
"github.com/prometheus/prometheus/web/api/v1"
19-
"github.com/weaveworks/scope/common/middleware"
20-
"golang.org/x/net/context"
2125

2226
"github.com/weaveworks/cortex"
2327
"github.com/weaveworks/cortex/chunk"
@@ -28,15 +32,15 @@ import (
2832
"github.com/weaveworks/cortex/ruler"
2933
"github.com/weaveworks/cortex/ui"
3034
"github.com/weaveworks/cortex/user"
35+
cortex_grpc_middleware "github.com/weaveworks/cortex/util/middleware"
3136
)
3237

3338
const (
3439
modeDistributor = "distributor"
3540
modeIngester = "ingester"
3641
modeRuler = "ruler"
3742

38-
infName = "eth0"
39-
userIDHeaderName = "X-Scope-OrgID"
43+
infName = "eth0"
4044
)
4145

4246
var (
@@ -84,30 +88,38 @@ func main() {
8488
var cfg cfg
8589
flag.StringVar(&cfg.mode, "mode", modeDistributor, "Mode (distributor, ingester, ruler).")
8690
flag.IntVar(&cfg.listenPort, "web.listen-port", 9094, "HTTP server listen port.")
91+
flag.BoolVar(&cfg.logSuccess, "log.success", false, "Log successful requests")
92+
8793
flag.StringVar(&cfg.consulHost, "consul.hostname", "localhost:8500", "Hostname and port of Consul.")
8894
flag.StringVar(&cfg.consulPrefix, "consul.prefix", "collectors/", "Prefix for keys in Consul.")
95+
8996
flag.StringVar(&cfg.s3URL, "s3.url", "localhost:4569", "S3 endpoint URL.")
9097
flag.StringVar(&cfg.dynamodbURL, "dynamodb.url", "localhost:8000", "DynamoDB endpoint URL.")
91-
flag.BoolVar(&cfg.dynamodbCreateTables, "dynamodb.create-tables", false, "Create required DynamoDB tables on startup.")
9298
flag.DurationVar(&cfg.dynamodbPollInterval, "dynamodb.poll-interval", 2*time.Minute, "How frequently to poll DynamoDB to learn our capacity.")
99+
flag.BoolVar(&cfg.dynamodbCreateTables, "dynamodb.create-tables", false, "Create required DynamoDB tables on startup.")
100+
flag.BoolVar(&cfg.watchDynamo, "watch-dynamo", false, "Periodically collect DynamoDB provisioned throughput.")
101+
93102
flag.StringVar(&cfg.memcachedHostname, "memcached.hostname", "", "Hostname for memcached service to use when caching chunks. If empty, no memcached will be used.")
103+
flag.StringVar(&cfg.memcachedService, "memcached.service", "memcached", "SRV service used to discover memcache servers.")
94104
flag.DurationVar(&cfg.memcachedTimeout, "memcached.timeout", 100*time.Millisecond, "Maximum time to wait before giving up on memcached requests.")
95105
flag.DurationVar(&cfg.memcachedExpiration, "memcached.expiration", 0, "How long chunks stay in the memcache.")
96-
flag.StringVar(&cfg.memcachedService, "memcached.service", "memcached", "SRV service used to discover memcache servers.")
97-
flag.DurationVar(&cfg.remoteTimeout, "remote.timeout", 5*time.Second, "Timeout for downstream ingesters.")
106+
98107
flag.DurationVar(&cfg.ingesterConfig.FlushCheckPeriod, "ingester.flush-period", 1*time.Minute, "Period with which to attempt to flush chunks.")
99108
flag.DurationVar(&cfg.ingesterConfig.RateUpdatePeriod, "ingester.rate-update-period", 15*time.Second, "Period with which to update the per-user ingestion rates.")
100109
flag.DurationVar(&cfg.ingesterConfig.MaxChunkIdle, "ingester.max-chunk-idle", 1*time.Hour, "Maximum chunk idle time before flushing.")
101110
flag.IntVar(&cfg.ingesterConfig.ConcurrentFlushes, "ingester.concurrent-flushes", 25, "Number of concurrent goroutines flushing to dynamodb.")
102111
flag.IntVar(&cfg.numTokens, "ingester.num-tokens", 128, "Number of tokens for each ingester.")
112+
flag.IntVar(&cfg.ingesterConfig.GRPCListenPort, "ingester.grpc.listen-port", 9095, "gRPC server listen port.")
113+
103114
flag.IntVar(&cfg.distributorConfig.ReplicationFactor, "distributor.replication-factor", 3, "The number of ingesters to write to and read from.")
104115
flag.IntVar(&cfg.distributorConfig.MinReadSuccesses, "distributor.min-read-successes", 2, "The minimum number of ingesters from which a read must succeed.")
105116
flag.DurationVar(&cfg.distributorConfig.HeartbeatTimeout, "distributor.heartbeat-timeout", time.Minute, "The heartbeat timeout after which ingesters are skipped for reads/writes.")
117+
flag.DurationVar(&cfg.distributorConfig.RemoteTimeout, "distributor.remote-timeout", 5*time.Second, "Timeout for downstream ingesters.")
118+
106119
flag.StringVar(&cfg.rulerConfig.ConfigsAPIURL, "ruler.configs.url", "", "URL of configs API server.")
107120
flag.StringVar(&cfg.rulerConfig.UserID, "ruler.userID", "", "Weave Cloud org to run rules for")
108121
flag.DurationVar(&cfg.rulerConfig.EvaluationInterval, "ruler.evaluation-interval", 15*time.Second, "How frequently to evaluate rules")
109-
flag.BoolVar(&cfg.logSuccess, "log.success", false, "Log successful requests")
110-
flag.BoolVar(&cfg.watchDynamo, "watch-dynamo", false, "Periodically collect DynamoDB provisioned throughput.")
122+
111123
flag.Parse()
112124

113125
chunkStore, err := setupChunkStore(cfg)
@@ -139,21 +151,34 @@ func main() {
139151
switch cfg.mode {
140152
case modeDistributor:
141153
cfg.distributorConfig.Ring = r
142-
cfg.distributorConfig.ClientFactory = func(address string) (*distributor.IngesterClient, error) {
143-
return distributor.NewIngesterClient(address, cfg.remoteTimeout)
144-
}
145154
setupDistributor(cfg.distributorConfig, chunkStore, router.PathPrefix("/api/prom").Subrouter())
146155

147156
case modeIngester:
148157
cfg.ingesterConfig.Ring = r
149-
registration, err := ring.RegisterIngester(consul, cfg.listenPort, cfg.numTokens)
158+
registration, err := ring.RegisterIngester(consul, cfg.listenPort, cfg.ingesterConfig.GRPCListenPort, cfg.numTokens)
150159
if err != nil {
151160
// This only happens for errors in configuration & set-up, not for
152161
// network errors.
153162
log.Fatalf("Could not register ingester: %v", err)
154163
}
155164
ing := setupIngester(chunkStore, cfg.ingesterConfig, router)
156165

166+
// Setup gRPC server
167+
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", cfg.ingesterConfig.GRPCListenPort))
168+
if err != nil {
169+
log.Fatalf("failed to listen: %v", err)
170+
}
171+
grpcServer := grpc.NewServer(
172+
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
173+
cortex_grpc_middleware.ServerInstrumentInterceptor(requestDuration),
174+
cortex_grpc_middleware.ServerLoggingInterceptor(cfg.logSuccess),
175+
cortex_grpc_middleware.ServerUserHeaderInterceptor,
176+
)),
177+
)
178+
cortex.RegisterIngesterServer(grpcServer, ing)
179+
go grpcServer.Serve(lis)
180+
defer grpcServer.Stop()
181+
157182
// Deferring a func to make ordering obvious
158183
defer func() {
159184
registration.ChangeState(ring.Leaving)
@@ -166,9 +191,6 @@ func main() {
166191
case modeRuler:
167192
// XXX: Too much duplication w/ distributor set up.
168193
cfg.distributorConfig.Ring = r
169-
cfg.distributorConfig.ClientFactory = func(address string) (*distributor.IngesterClient, error) {
170-
return distributor.NewIngesterClient(address, cfg.remoteTimeout)
171-
}
172194
cfg.rulerConfig.DistributorConfig = cfg.distributorConfig
173195
ruler, err := setupRuler(chunkStore, cfg.rulerConfig)
174196
if err != nil {
@@ -242,26 +264,12 @@ func setupDistributor(
242264
}
243265
prometheus.MustRegister(dist)
244266

245-
router.Path("/push").Handler(cortex.AppenderHandler(dist, handleDistributorError))
267+
router.Path("/push").Handler(http.HandlerFunc(dist.PushHandler))
246268

247269
// TODO: Move querier to separate binary.
248270
setupQuerier(dist, chunkStore, router)
249271
}
250272

251-
func handleDistributorError(w http.ResponseWriter, err error) {
252-
switch e := err.(type) {
253-
case distributor.IngesterError:
254-
switch {
255-
case 400 <= e.StatusCode && e.StatusCode < 500:
256-
log.Warnf("append err: %v", err)
257-
http.Error(w, err.Error(), http.StatusBadRequest)
258-
return
259-
}
260-
}
261-
log.Errorf("append err: %v", err)
262-
http.Error(w, err.Error(), http.StatusInternalServerError)
263-
}
264-
265273
// setupQuerier sets up a complete querying pipeline:
266274
//
267275
// PromQL -> MergeQuerier -> Distributor -> IngesterQuerier -> Ingester
@@ -276,12 +284,15 @@ func setupQuerier(
276284
engine := promql.NewEngine(queryable, nil)
277285
api := v1.NewAPI(engine, querier.DummyStorage{Queryable: queryable})
278286
promRouter := route.New(func(r *http.Request) (context.Context, error) {
279-
userID := r.Header.Get(userIDHeaderName)
287+
userID := r.Header.Get(user.UserIDHeaderName)
288+
if userID == "" {
289+
return nil, fmt.Errorf("no %s header", user.UserIDHeaderName)
290+
}
280291
return user.WithID(context.Background(), userID), nil
281292
}).WithPrefix("/api/prom/api/v1")
282293
api.Register(promRouter)
283294
router.PathPrefix("/api/v1").Handler(promRouter)
284-
router.Path("/user_stats").Handler(cortex.DistributorUserStatsHandler(distributor.UserStats))
295+
router.Path("/user_stats").Handler(http.HandlerFunc(distributor.UserStatsHandler))
285296
router.Path("/graph").Handler(ui.GraphHandler())
286297
router.PathPrefix("/static/").Handler(ui.StaticAssetsHandler("/api/prom/static/"))
287298
}
@@ -297,25 +308,14 @@ func setupIngester(
297308
}
298309
prometheus.MustRegister(ingester)
299310

300-
router.Path("/push").Handler(cortex.AppenderHandler(ingester, handleIngesterError))
301-
router.Path("/query").Handler(cortex.QueryHandler(ingester))
302-
router.Path("/label_values").Handler(cortex.LabelValuesHandler(ingester))
303-
router.Path("/user_stats").Handler(cortex.IngesterUserStatsHandler(ingester.UserStats))
304-
router.Path("/ready").Handler(cortex.IngesterReadinessHandler(ingester))
311+
router.Path("/push").Handler(http.HandlerFunc(ingester.PushHandler))
312+
router.Path("/query").Handler(http.HandlerFunc(ingester.QueryHandler))
313+
router.Path("/label_values").Handler(http.HandlerFunc(ingester.LabelValuesHandler))
314+
router.Path("/user_stats").Handler(http.HandlerFunc(ingester.UserStatsHandler))
315+
router.Path("/ready").Handler(http.HandlerFunc(ingester.ReadinessHandler))
305316
return ingester
306317
}
307318

308-
func handleIngesterError(w http.ResponseWriter, err error) {
309-
switch err {
310-
case ingester.ErrOutOfOrderSample, ingester.ErrDuplicateSampleForTimestamp:
311-
log.Warnf("append err: %v", err)
312-
http.Error(w, err.Error(), http.StatusBadRequest)
313-
default:
314-
log.Errorf("append err: %v", err)
315-
http.Error(w, err.Error(), http.StatusInternalServerError)
316-
}
317-
}
318-
319319
// setupRuler sets up a ruler.
320320
func setupRuler(chunkStore chunk.Store, cfg ruler.Config) (*ruler.Ruler, error) {
321321
return ruler.New(chunkStore, cfg)

cortex.proto

Lines changed: 27 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,44 +1,27 @@
11
syntax = "proto3";
22

3-
package cortex;
4-
5-
message Sample {
6-
double value = 1;
7-
int64 timestamp_ms = 2;
8-
}
9-
10-
message LabelPair {
11-
string name = 1;
12-
string value = 2;
13-
}
3+
import "github.com/prometheus/prometheus/storage/remote/remote.proto";
144

15-
message TimeSeries {
16-
repeated LabelPair labels = 1;
17-
// Sorted by time, oldest sample first.
18-
repeated Sample samples = 2;
19-
}
5+
package cortex;
206

21-
enum MatchType {
22-
EQUAL = 0;
23-
NOT_EQUAL = 1;
24-
REGEX_MATCH = 2;
25-
REGEX_NO_MATCH = 3;
7+
service Ingester {
8+
rpc Push(remote.WriteRequest) returns (WriteResponse) {};
9+
rpc Query(QueryRequest) returns (QueryResponse) {};
10+
rpc LabelValues(LabelValuesRequest) returns (LabelValuesResponse) {};
11+
rpc UserStats(UserStatsRequest) returns (UserStatsResponse) {};
2612
}
2713

28-
message LabelMatcher {
29-
MatchType type = 1;
30-
string name = 2;
31-
string value = 3;
14+
message WriteResponse {
3215
}
3316

34-
message ReadRequest {
17+
message QueryRequest {
3518
int64 start_timestamp_ms = 1;
3619
int64 end_timestamp_ms = 2;
3720
repeated LabelMatcher matchers = 3;
3821
}
3922

40-
message ReadResponse {
41-
repeated TimeSeries timeseries = 1;
23+
message QueryResponse {
24+
repeated remote.TimeSeries timeseries = 1;
4225
}
4326

4427
message LabelValuesRequest {
@@ -49,7 +32,23 @@ message LabelValuesResponse {
4932
repeated string label_values = 1;
5033
}
5134

35+
message UserStatsRequest {
36+
}
37+
5238
message UserStatsResponse {
5339
double ingestion_rate = 1;
5440
uint64 num_series = 2;
5541
}
42+
43+
enum MatchType {
44+
EQUAL = 0;
45+
NOT_EQUAL = 1;
46+
REGEX_MATCH = 2;
47+
REGEX_NO_MATCH = 3;
48+
}
49+
50+
message LabelMatcher {
51+
MatchType type = 1;
52+
string name = 2;
53+
string value = 3;
54+
}

0 commit comments

Comments
 (0)