From b44108b8819d88c11c34e81b409e037111bc06d9 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Fri, 11 Nov 2016 11:08:42 -0800 Subject: [PATCH 1/2] Instrument the base router so we don't miss endpoints. --- cmd/cortex/main.go | 78 ++++++++++++++++++---------------------------- 1 file changed, 31 insertions(+), 47 deletions(-) diff --git a/cmd/cortex/main.go b/cmd/cortex/main.go index 518cbd31e0..28a5114753 100644 --- a/cmd/cortex/main.go +++ b/cmd/cortex/main.go @@ -10,6 +10,7 @@ import ( "syscall" "time" + "github.com/gorilla/mux" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/log" "github.com/prometheus/common/route" @@ -128,13 +129,15 @@ func main() { r := ring.New(consul, cfg.distributorConfig.HeartbeatTimeout) defer r.Stop() + router := mux.NewRouter() switch cfg.mode { case modeDistributor: cfg.distributorConfig.Ring = r cfg.distributorConfig.ClientFactory = func(address string) (*distributor.IngesterClient, error) { return distributor.NewIngesterClient(address, cfg.remoteTimeout) } - setupDistributor(cfg.distributorConfig, chunkStore, cfg.logSuccess) + setupDistributor(cfg.distributorConfig, chunkStore, router.Path("/api/prom")) + case modeIngester: cfg.ingesterConfig.Ring = r registration, err := ring.RegisterIngester(consul, cfg.listenPort, cfg.numTokens) @@ -143,7 +146,7 @@ func main() { // network errors. log.Fatalf("Could not register ingester: %v", err) } - ing := setupIngester(chunkStore, cfg.ingesterConfig, cfg.logSuccess) + ing := setupIngester(chunkStore, cfg.ingesterConfig, router.NewRoute()) // Deferring a func to make ordering obvious defer func() { @@ -157,8 +160,16 @@ func main() { log.Fatalf("Mode %s not supported!", cfg.mode) } - http.Handle("/metrics", prometheus.Handler()) - go http.ListenAndServe(fmt.Sprintf(":%d", cfg.listenPort), nil) + router.Handle("/metrics", prometheus.Handler()) + instrumented := middleware.Merge( + middleware.Log{ + LogSuccess: cfg.logSuccess, + }, + middleware.Instrument{ + Duration: requestDuration, + }, + ).Wrap(router) + go http.ListenAndServe(fmt.Sprintf(":%d", cfg.listenPort), instrumented) term := make(chan os.Signal) signal.Notify(term, os.Interrupt, syscall.SIGTERM) @@ -198,7 +209,7 @@ func setupChunkStore(cfg cfg) (chunk.Store, error) { func setupDistributor( cfg distributor.Config, chunkStore chunk.Store, - logSuccess bool, + router *mux.Route, ) { dist, err := distributor.New(cfg) if err != nil { @@ -206,11 +217,10 @@ func setupDistributor( } prometheus.MustRegister(dist) - prefix := "/api/prom" - http.Handle(prefix+"/push", instrument(logSuccess, cortex.AppenderHandler(dist, handleDistributorError))) + router.Path("/push").Handler(cortex.AppenderHandler(dist, handleDistributorError)) // TODO: Move querier to separate binary. - setupQuerier(dist, chunkStore, prefix, logSuccess) + setupQuerier(dist, chunkStore, router) } func handleDistributorError(w http.ResponseWriter, err error) { @@ -235,8 +245,7 @@ func handleDistributorError(w http.ResponseWriter, err error) { func setupQuerier( distributor *distributor.Distributor, chunkStore chunk.Store, - prefix string, - logSuccess bool, + router *mux.Route, ) { queryable := querier.Queryable{ Q: querier.MergeQuerier{ @@ -248,36 +257,23 @@ func setupQuerier( }, }, } - engine := promql.NewEngine(queryable, nil) - api := v1.NewAPI(engine, querier.DummyStorage{Queryable: queryable}) - router := route.New(func(r *http.Request) (context.Context, error) { + promRouter := route.New(func(r *http.Request) (context.Context, error) { userID := r.Header.Get(userIDHeaderName) - if r.Method != "OPTIONS" && userID == "" { - // For now, getting the user ID from basic auth allows for easy testing - // with Grafana. - // TODO: Remove basic auth support. - userID, _, _ = r.BasicAuth() - if userID == "" { - return nil, fmt.Errorf("missing user ID") - } - } return user.WithID(context.Background(), userID), nil }) - api.Register(router.WithPrefix(prefix + "/api/v1")) - http.Handle("/", router) - - http.Handle(prefix+"/user_stats", instrument(logSuccess, cortex.DistributorUserStatsHandler(distributor.UserStats))) - - http.Handle(prefix+"/graph", instrument(logSuccess, ui.GraphHandler())) - http.Handle(prefix+"/static/", instrument(logSuccess, ui.StaticAssetsHandler(prefix+"/static/"))) + api.Register(promRouter) + router.Path("/api/v1").Handler(promRouter) + router.Path("/user_stats").Handler(cortex.DistributorUserStatsHandler(distributor.UserStats)) + router.Path("/graph").Handler(ui.GraphHandler()) + router.Path("/static/").Handler(ui.StaticAssetsHandler("/api/prom/static/")) } func setupIngester( chunkStore chunk.Store, cfg ingester.Config, - logSuccess bool, + router *mux.Route, ) *ingester.Ingester { ingester, err := ingester.New(cfg, chunkStore) if err != nil { @@ -285,11 +281,11 @@ func setupIngester( } prometheus.MustRegister(ingester) - http.Handle("/push", instrument(logSuccess, cortex.AppenderHandler(ingester, handleIngesterError))) - http.Handle("/query", instrument(logSuccess, cortex.QueryHandler(ingester))) - http.Handle("/label_values", instrument(logSuccess, cortex.LabelValuesHandler(ingester))) - http.Handle("/user_stats", instrument(logSuccess, cortex.IngesterUserStatsHandler(ingester.UserStats))) - http.Handle("/ready", instrument(logSuccess, cortex.IngesterReadinessHandler(ingester))) + router.Path("/push").Handler(cortex.AppenderHandler(ingester, handleIngesterError)) + router.Path("/query").Handler(cortex.QueryHandler(ingester)) + router.Path("/label_values").Handler(cortex.LabelValuesHandler(ingester)) + router.Path("/user_stats").Handler(cortex.IngesterUserStatsHandler(ingester.UserStats)) + router.Path("/ready").Handler(cortex.IngesterReadinessHandler(ingester)) return ingester } @@ -303,15 +299,3 @@ func handleIngesterError(w http.ResponseWriter, err error) { http.Error(w, err.Error(), http.StatusInternalServerError) } } - -// instrument instruments a handler. -func instrument(logSuccess bool, handler http.Handler) http.Handler { - return middleware.Merge( - middleware.Log{ - LogSuccess: logSuccess, - }, - middleware.Instrument{ - Duration: requestDuration, - }, - ).Wrap(handler) -} From b1a38835482418b9b0cfdf3da0fc8e422ed73f5d Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Fri, 11 Nov 2016 14:26:22 -0800 Subject: [PATCH 2/2] Fix up gorilla mux handling and test locally. --- cmd/cortex/main.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/cmd/cortex/main.go b/cmd/cortex/main.go index 28a5114753..2f40cfc147 100644 --- a/cmd/cortex/main.go +++ b/cmd/cortex/main.go @@ -136,7 +136,7 @@ func main() { cfg.distributorConfig.ClientFactory = func(address string) (*distributor.IngesterClient, error) { return distributor.NewIngesterClient(address, cfg.remoteTimeout) } - setupDistributor(cfg.distributorConfig, chunkStore, router.Path("/api/prom")) + setupDistributor(cfg.distributorConfig, chunkStore, router.PathPrefix("/api/prom").Subrouter()) case modeIngester: cfg.ingesterConfig.Ring = r @@ -146,7 +146,7 @@ func main() { // network errors. log.Fatalf("Could not register ingester: %v", err) } - ing := setupIngester(chunkStore, cfg.ingesterConfig, router.NewRoute()) + ing := setupIngester(chunkStore, cfg.ingesterConfig, router) // Deferring a func to make ordering obvious defer func() { @@ -209,7 +209,7 @@ func setupChunkStore(cfg cfg) (chunk.Store, error) { func setupDistributor( cfg distributor.Config, chunkStore chunk.Store, - router *mux.Route, + router *mux.Router, ) { dist, err := distributor.New(cfg) if err != nil { @@ -245,7 +245,7 @@ func handleDistributorError(w http.ResponseWriter, err error) { func setupQuerier( distributor *distributor.Distributor, chunkStore chunk.Store, - router *mux.Route, + router *mux.Router, ) { queryable := querier.Queryable{ Q: querier.MergeQuerier{ @@ -262,18 +262,18 @@ func setupQuerier( promRouter := route.New(func(r *http.Request) (context.Context, error) { userID := r.Header.Get(userIDHeaderName) return user.WithID(context.Background(), userID), nil - }) + }).WithPrefix("/api/prom/api/v1") api.Register(promRouter) - router.Path("/api/v1").Handler(promRouter) + router.PathPrefix("/api/v1").Handler(promRouter) router.Path("/user_stats").Handler(cortex.DistributorUserStatsHandler(distributor.UserStats)) router.Path("/graph").Handler(ui.GraphHandler()) - router.Path("/static/").Handler(ui.StaticAssetsHandler("/api/prom/static/")) + router.PathPrefix("/static/").Handler(ui.StaticAssetsHandler("/api/prom/static/")) } func setupIngester( chunkStore chunk.Store, cfg ingester.Config, - router *mux.Route, + router *mux.Router, ) *ingester.Ingester { ingester, err := ingester.New(cfg, chunkStore) if err != nil {