From 8ca7efd2dab751f74f2d6ff7e6f97137248e7330 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Fri, 25 Nov 2016 11:03:21 +0000 Subject: [PATCH] Get rid of the old ingester http interface --- cmd/cortex/main.go | 6 -- distributor/distributor.go | 30 ++---- distributor/http_ingester_client.go | 149 ---------------------------- distributor/http_server.go | 18 ++++ ingester/http_server.go | 94 ------------------ 5 files changed, 29 insertions(+), 268 deletions(-) delete mode 100644 distributor/http_ingester_client.go delete mode 100644 ingester/http_server.go diff --git a/cmd/cortex/main.go b/cmd/cortex/main.go index 3112cc2c3f..5e5a7124e0 100644 --- a/cmd/cortex/main.go +++ b/cmd/cortex/main.go @@ -310,12 +310,6 @@ func setupIngester( log.Fatal(err) } prometheus.MustRegister(ingester) - - router.Path("/push").Handler(http.HandlerFunc(ingester.PushHandler)) - router.Path("/query").Handler(http.HandlerFunc(ingester.QueryHandler)) - router.Path("/label_values").Handler(http.HandlerFunc(ingester.LabelValuesHandler)) - router.Path("/user_stats").Handler(http.HandlerFunc(ingester.UserStatsHandler)) - router.Path("/ready").Handler(http.HandlerFunc(ingester.ReadinessHandler)) return ingester } diff --git a/distributor/distributor.go b/distributor/distributor.go index 192ad4b1a7..0be51e3e4b 100644 --- a/distributor/distributor.go +++ b/distributor/distributor.go @@ -136,26 +136,18 @@ func (d *Distributor) getClientFor(ingester ring.IngesterDesc) (cortex.IngesterC return client, nil } - if ingester.GRPCHostname != "" { - conn, err := grpc.Dial( - ingester.GRPCHostname, - grpc.WithInsecure(), - grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( - otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()), - middleware.ClientUserHeaderInterceptor, - )), - ) - if err != nil { - return nil, err - } - client = cortex.NewIngesterClient(conn) - } else { - var err error - client, err = NewHTTPIngesterClient(ingester.Hostname, d.cfg.RemoteTimeout) - if err != nil { - return nil, err - } + conn, err := grpc.Dial( + ingester.GRPCHostname, + grpc.WithInsecure(), + grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( + otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()), + middleware.ClientUserHeaderInterceptor, + )), + ) + if err != nil { + return nil, err } + client = cortex.NewIngesterClient(conn) d.clients[ingester.Hostname] = client return client, nil diff --git a/distributor/http_ingester_client.go b/distributor/http_ingester_client.go deleted file mode 100644 index 08b1067c4a..0000000000 --- a/distributor/http_ingester_client.go +++ /dev/null @@ -1,149 +0,0 @@ -package distributor - -import ( - "bytes" - "fmt" - "io" - "net/http" - "time" - - "github.com/golang/protobuf/proto" - "github.com/golang/snappy" - "golang.org/x/net/context" - "google.golang.org/grpc" - - "github.com/prometheus/common/expfmt" - "github.com/prometheus/prometheus/storage/remote" - - "github.com/weaveworks/cortex" - "github.com/weaveworks/cortex/user" -) - -// httpIngesterClient is a client library for the ingester -type httpIngesterClient struct { - address string - client http.Client - timeout time.Duration -} - -// IngesterError is an error we got from an ingester. -type IngesterError struct { - StatusCode int - err error -} - -func errStatusCode(code int, status string) IngesterError { - return IngesterError{ - StatusCode: code, - err: fmt.Errorf("server returned HTTP status %s", status), - } -} - -func (i IngesterError) Error() string { - return i.err.Error() -} - -// NewHTTPIngesterClient makes a new IngesterClient. This client is careful to -// propagate the user ID from Distributor -> Ingester. -func NewHTTPIngesterClient(address string, timeout time.Duration) (cortex.IngesterClient, error) { - return &httpIngesterClient{ - address: address, - client: http.Client{ - Timeout: timeout, - }, - timeout: timeout, - }, nil -} - -// Push adds new samples to the ingester -func (c *httpIngesterClient) Push(ctx context.Context, req *remote.WriteRequest, _ ...grpc.CallOption) (*cortex.WriteResponse, error) { - if err := c.doRequest(ctx, "/push", req, nil, true); err != nil { - return nil, err - } - return &cortex.WriteResponse{}, nil -} - -func (c *httpIngesterClient) Query(ctx context.Context, req *cortex.QueryRequest, _ ...grpc.CallOption) (*cortex.QueryResponse, error) { - resp := &cortex.QueryResponse{} - if err := c.doRequest(ctx, "/query", req, resp, false); err != nil { - return nil, err - } - return resp, nil -} - -// LabelValues returns all of the label values that are associated with a given label name. -func (c *httpIngesterClient) LabelValues(ctx context.Context, req *cortex.LabelValuesRequest, _ ...grpc.CallOption) (*cortex.LabelValuesResponse, error) { - resp := &cortex.LabelValuesResponse{} - err := c.doRequest(ctx, "/label_values", req, resp, false) - if err != nil { - return nil, err - } - return resp, nil -} - -// UserStats returns stats for the current user. -func (c *httpIngesterClient) UserStats(ctx context.Context, in *cortex.UserStatsRequest, _ ...grpc.CallOption) (*cortex.UserStatsResponse, error) { - resp := &cortex.UserStatsResponse{} - err := c.doRequest(ctx, "/user_stats", nil, resp, false) - if err != nil { - return nil, err - } - return resp, nil -} - -func (c *httpIngesterClient) doRequest(ctx context.Context, endpoint string, req proto.Message, resp proto.Message, compressed bool) error { - userID, err := user.GetID(ctx) - if err != nil { - return err - } - - var buf bytes.Buffer - if req != nil { - data, err := proto.Marshal(req) - if err != nil { - return fmt.Errorf("unable to marshal request: %v", err) - } - - var writer io.Writer = &buf - if compressed { - writer = snappy.NewWriter(writer) - } - if _, err := writer.Write(data); err != nil { - return err - } - } - - httpReq, err := http.NewRequest("POST", fmt.Sprintf("http://%s%s", c.address, endpoint), &buf) - if err != nil { - return fmt.Errorf("unable to create request: %v", err) - } - httpReq.Header.Add(user.UserIDHeaderName, userID) - // TODO: This isn't actually the correct Content-type. - httpReq.Header.Set("Content-Type", string(expfmt.FmtProtoDelim)) - httpResp, err := c.client.Do(httpReq) - if err != nil { - return fmt.Errorf("error sending request: %v", err) - } - defer httpResp.Body.Close() - if httpResp.StatusCode/100 != 2 { - return errStatusCode(httpResp.StatusCode, httpResp.Status) - } - if resp == nil { - return nil - } - - buf.Reset() - reader := httpResp.Body.(io.Reader) - if compressed { - reader = snappy.NewReader(reader) - } - if _, err = buf.ReadFrom(reader); err != nil { - return fmt.Errorf("unable to read response body: %v", err) - } - - err = proto.Unmarshal(buf.Bytes(), resp) - if err != nil { - return fmt.Errorf("unable to unmarshal response body: %v", err) - } - return nil -} diff --git a/distributor/http_server.go b/distributor/http_server.go index 7592825e2e..157efaa046 100644 --- a/distributor/http_server.go +++ b/distributor/http_server.go @@ -1,6 +1,7 @@ package distributor import ( + "fmt" "net/http" "github.com/prometheus/common/log" @@ -9,6 +10,23 @@ import ( "github.com/weaveworks/cortex/util" ) +// IngesterError is an error we got from an ingester. +type IngesterError struct { + StatusCode int + err error +} + +func errStatusCode(code int, status string) IngesterError { + return IngesterError{ + StatusCode: code, + err: fmt.Errorf("server returned HTTP status %s", status), + } +} + +func (i IngesterError) Error() string { + return i.err.Error() +} + // PushHandler is a http.Handler which accepts WriteRequests. func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) { var req remote.WriteRequest diff --git a/ingester/http_server.go b/ingester/http_server.go deleted file mode 100644 index 2629da2f5c..0000000000 --- a/ingester/http_server.go +++ /dev/null @@ -1,94 +0,0 @@ -package ingester - -import ( - "net/http" - - "github.com/prometheus/common/log" - "github.com/prometheus/prometheus/storage/remote" - - "github.com/weaveworks/cortex" - "github.com/weaveworks/cortex/util" -) - -// PushHandler is a http.Handler that accepts proto encoded samples. -func (i *Ingester) PushHandler(w http.ResponseWriter, r *http.Request) { - var req remote.WriteRequest - ctx, abort := util.ParseProtoRequest(w, r, &req, true) - if abort { - return - } - - _, err := i.Push(ctx, &req) - if err != nil { - switch err { - case ErrOutOfOrderSample, ErrDuplicateSampleForTimestamp: - log.Warnf("append err: %v", err) - http.Error(w, err.Error(), http.StatusBadRequest) - default: - log.Errorf("append err: %v", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - } - } -} - -// QueryHandler is a http.Handler that accepts protobuf formatted -// query requests and serves them. -func (i *Ingester) QueryHandler(w http.ResponseWriter, r *http.Request) { - var req cortex.QueryRequest - ctx, abort := util.ParseProtoRequest(w, r, &req, false) - if abort { - return - } - - resp, err := i.Query(ctx, &req) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - util.WriteProtoResponse(w, resp) -} - -// LabelValuesHandler handles label values -func (i *Ingester) LabelValuesHandler(w http.ResponseWriter, r *http.Request) { - var req cortex.LabelValuesRequest - ctx, abort := util.ParseProtoRequest(w, r, &req, false) - if abort { - return - } - - resp, err := i.LabelValues(ctx, &req) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - util.WriteProtoResponse(w, resp) -} - -// UserStatsHandler handles user stats requests to the Ingester. -func (i *Ingester) UserStatsHandler(w http.ResponseWriter, r *http.Request) { - ctx, abort := util.ParseProtoRequest(w, r, nil, false) - if abort { - return - } - - resp, err := i.UserStats(ctx, &cortex.UserStatsRequest{}) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - util.WriteProtoResponse(w, resp) -} - -// ReadinessHandler returns 204 when the ingester is ready, -// 500 otherwise. It's used by kubernetes to indicate if the ingester -// pool is ready to have ingesters added / removed. -func (i *Ingester) ReadinessHandler(w http.ResponseWriter, r *http.Request) { - if i.Ready() { - w.WriteHeader(http.StatusNoContent) - } else { - w.WriteHeader(http.StatusInternalServerError) - } -}