From b46ff9f27299200295fa36d245a9391e3081768c Mon Sep 17 00:00:00 2001 From: Anthony J Mirabella Date: Thu, 30 Mar 2023 15:40:59 -0400 Subject: [PATCH 1/9] Initial OTLP ingest support Signed-off-by: Anthony J Mirabella --- go.mod | 2 +- pkg/api/api.go | 1 + pkg/util/push/otlp.go | 115 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 117 insertions(+), 1 deletion(-) create mode 100644 pkg/util/push/otlp.go diff --git a/go.mod b/go.mod index aeaeb575db..2cfce6348b 100644 --- a/go.mod +++ b/go.mod @@ -81,6 +81,7 @@ require ( github.com/cespare/xxhash/v2 v2.2.0 github.com/google/go-cmp v0.6.0 github.com/sercand/kuberesolver/v4 v4.0.0 + go.opentelemetry.io/collector/pdata v1.3.0 golang.org/x/exp v0.0.0-20240119083558-1b970713d09a google.golang.org/protobuf v1.33.0 ) @@ -208,7 +209,6 @@ require ( go.mongodb.org/mongo-driver v1.14.0 // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/collector/featuregate v1.3.0 // indirect - go.opentelemetry.io/collector/pdata v1.3.0 // indirect go.opentelemetry.io/collector/semconv v0.96.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect diff --git a/pkg/api/api.go b/pkg/api/api.go index 6171150ae8..06891cf065 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -260,6 +260,7 @@ func (a *API) RegisterRuntimeConfig(runtimeConfigHandler http.HandlerFunc) { func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distributor.Config) { distributorpb.RegisterDistributorServer(a.server.GRPC, d) + a.RegisterRoute("/api/v1/push/otlp", push.OTLPHandler(a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST") a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST") a.indexPage.AddLink(SectionAdminEndpoints, "/distributor/ring", "Distributor Ring Status") diff --git a/pkg/util/push/otlp.go b/pkg/util/push/otlp.go new file mode 100644 index 0000000000..6a6655aa2c --- /dev/null +++ b/pkg/util/push/otlp.go @@ -0,0 +1,115 @@ +package push + +import ( + "io" + "net/http" + + "github.com/cortexproject/cortex/pkg/cortexpb" + "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/log" + "github.com/go-kit/log/level" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/prompb" + "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite" + "github.com/weaveworks/common/httpgrpc" + "github.com/weaveworks/common/middleware" + "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" +) + +// OTLPHandler is a http.Handler which accepts OTLP metrics. +func OTLPHandler(sourceIPs *middleware.SourceIPExtractor, push Func) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + logger := log.WithContext(ctx, log.Logger) + if sourceIPs != nil { + source := sourceIPs.Get(r) + if source != "" { + ctx = util.AddSourceIPsToOutgoingContext(ctx, source) + logger = log.WithSourceIPs(source, logger) + } + } + + buf, err := io.ReadAll(r.Body) + if err != nil { + level.Error(logger).Log("err", err.Error()) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + req := pmetricotlp.NewExportRequest() + err = req.UnmarshalProto(buf) + if err != nil { + level.Error(logger).Log("err", err.Error()) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + tsMap, err := prometheusremotewrite.FromMetrics(req.Metrics(), prometheusremotewrite.Settings{DisableTargetInfo: true}) + if err != nil { + level.Error(logger).Log("err", err.Error()) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + prwReq := cortexpb.WriteRequest{ + Source: cortexpb.API, + Metadata: nil, + SkipLabelNameValidation: false, + } + + tsList := []cortexpb.PreallocTimeseries(nil) + for _, v := range tsMap { + tsList = append(tsList, cortexpb.PreallocTimeseries{TimeSeries: &cortexpb.TimeSeries{ + Labels: makeLabels(v.Labels), + Samples: makeSamples(v.Samples), + Exemplars: makeExemplars(v.Exemplars), + }}) + } + prwReq.Timeseries = tsList + + if _, err := push(ctx, &prwReq); err != nil { + resp, ok := httpgrpc.HTTPResponseFromError(err) + if !ok { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + if resp.GetCode()/100 == 5 { + level.Error(logger).Log("msg", "push error", "err", err) + } else if resp.GetCode() != http.StatusAccepted && resp.GetCode() != http.StatusTooManyRequests { + level.Warn(logger).Log("msg", "push refused", "err", err) + } + http.Error(w, string(resp.Body), int(resp.Code)) + } + }) +} + +func makeLabels(in []prompb.Label) []cortexpb.LabelAdapter { + out := make(labels.Labels, 0, len(in)) + for _, l := range in { + out = append(out, labels.Label{Name: l.Name, Value: l.Value}) + } + return cortexpb.FromLabelsToLabelAdapters(out) +} + +func makeSamples(in []prompb.Sample) []cortexpb.Sample { + out := make([]cortexpb.Sample, 0, len(in)) + for _, s := range in { + out = append(out, cortexpb.Sample{ + Value: s.Value, + TimestampMs: s.Timestamp, + }) + } + return out +} + +func makeExemplars(in []prompb.Exemplar) []cortexpb.Exemplar { + out := make([]cortexpb.Exemplar, 0, len(in)) + for _, e := range in { + out = append(out, cortexpb.Exemplar{ + Labels: makeLabels(e.Labels), + Value: e.Value, + TimestampMs: e.Timestamp, + }) + } + return out +} From da38947f8d9ae39e5aa6daebadc7f28c40b35943 Mon Sep 17 00:00:00 2001 From: Anthony J Mirabella Date: Wed, 5 Apr 2023 15:51:57 -0400 Subject: [PATCH 2/9] Add resoure attribute conversion Signed-off-by: Anthony J Mirabella --- pkg/util/push/otlp.go | 70 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 69 insertions(+), 1 deletion(-) diff --git a/pkg/util/push/otlp.go b/pkg/util/push/otlp.go index 6a6655aa2c..8578acf09e 100644 --- a/pkg/util/push/otlp.go +++ b/pkg/util/push/otlp.go @@ -13,6 +13,8 @@ import ( "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/middleware" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" ) @@ -44,7 +46,7 @@ func OTLPHandler(sourceIPs *middleware.SourceIPExtractor, push Func) http.Handle return } - tsMap, err := prometheusremotewrite.FromMetrics(req.Metrics(), prometheusremotewrite.Settings{DisableTargetInfo: true}) + tsMap, err := prometheusremotewrite.FromMetrics(convertToMetricsAttributes(req.Metrics()), prometheusremotewrite.Settings{DisableTargetInfo: true}) if err != nil { level.Error(logger).Log("err", err.Error()) http.Error(w, err.Error(), http.StatusBadRequest) @@ -113,3 +115,69 @@ func makeExemplars(in []prompb.Exemplar) []cortexpb.Exemplar { } return out } + +func convertToMetricsAttributes(md pmetric.Metrics) pmetric.Metrics { + cloneMd := pmetric.NewMetrics() + md.CopyTo(cloneMd) + rms := cloneMd.ResourceMetrics() + for i := 0; i < rms.Len(); i++ { + resource := rms.At(i).Resource() + + ilms := rms.At(i).ScopeMetrics() + for j := 0; j < ilms.Len(); j++ { + ilm := ilms.At(j) + metricSlice := ilm.Metrics() + for k := 0; k < metricSlice.Len(); k++ { + addAttributesToMetric(metricSlice.At(k), resource.Attributes()) + } + } + } + return cloneMd +} + +// addAttributesToMetric adds additional labels to the given metric +func addAttributesToMetric(metric pmetric.Metric, labelMap pcommon.Map) { + switch metric.Type() { + case pmetric.MetricTypeGauge: + addAttributesToNumberDataPoints(metric.Gauge().DataPoints(), labelMap) + case pmetric.MetricTypeSum: + addAttributesToNumberDataPoints(metric.Sum().DataPoints(), labelMap) + case pmetric.MetricTypeHistogram: + addAttributesToHistogramDataPoints(metric.Histogram().DataPoints(), labelMap) + case pmetric.MetricTypeSummary: + addAttributesToSummaryDataPoints(metric.Summary().DataPoints(), labelMap) + case pmetric.MetricTypeExponentialHistogram: + addAttributesToExponentialHistogramDataPoints(metric.ExponentialHistogram().DataPoints(), labelMap) + } +} + +func addAttributesToNumberDataPoints(ps pmetric.NumberDataPointSlice, newAttributeMap pcommon.Map) { + for i := 0; i < ps.Len(); i++ { + joinAttributeMaps(newAttributeMap, ps.At(i).Attributes()) + } +} + +func addAttributesToHistogramDataPoints(ps pmetric.HistogramDataPointSlice, newAttributeMap pcommon.Map) { + for i := 0; i < ps.Len(); i++ { + joinAttributeMaps(newAttributeMap, ps.At(i).Attributes()) + } +} + +func addAttributesToSummaryDataPoints(ps pmetric.SummaryDataPointSlice, newAttributeMap pcommon.Map) { + for i := 0; i < ps.Len(); i++ { + joinAttributeMaps(newAttributeMap, ps.At(i).Attributes()) + } +} + +func addAttributesToExponentialHistogramDataPoints(ps pmetric.ExponentialHistogramDataPointSlice, newAttributeMap pcommon.Map) { + for i := 0; i < ps.Len(); i++ { + joinAttributeMaps(newAttributeMap, ps.At(i).Attributes()) + } +} + +func joinAttributeMaps(from, to pcommon.Map) { + from.Range(func(k string, v pcommon.Value) bool { + v.CopyTo(to.PutEmpty(k)) + return true + }) +} From b4046d509746765764fb581ab87c3655a8227685 Mon Sep 17 00:00:00 2001 From: Friedrich Gonzalez Date: Thu, 14 Mar 2024 21:07:36 +0100 Subject: [PATCH 3/9] Fix lint Signed-off-by: Friedrich Gonzalez --- pkg/util/push/otlp.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pkg/util/push/otlp.go b/pkg/util/push/otlp.go index 8578acf09e..01773518fc 100644 --- a/pkg/util/push/otlp.go +++ b/pkg/util/push/otlp.go @@ -4,9 +4,6 @@ import ( "io" "net/http" - "github.com/cortexproject/cortex/pkg/cortexpb" - "github.com/cortexproject/cortex/pkg/util" - "github.com/cortexproject/cortex/pkg/util/log" "github.com/go-kit/log/level" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/prompb" @@ -16,6 +13,10 @@ import ( "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" + + "github.com/cortexproject/cortex/pkg/cortexpb" + "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/log" ) // OTLPHandler is a http.Handler which accepts OTLP metrics. From 95b754f1002724895afb83fbc842d55a175c6fec Mon Sep 17 00:00:00 2001 From: Friedrich Gonzalez Date: Mon, 18 Mar 2024 10:21:43 +0100 Subject: [PATCH 4/9] Put under /api/v1/otlp Signed-off-by: Friedrich Gonzalez --- pkg/api/api.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/api/api.go b/pkg/api/api.go index 06891cf065..03e6c77801 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -260,8 +260,8 @@ func (a *API) RegisterRuntimeConfig(runtimeConfigHandler http.HandlerFunc) { func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distributor.Config) { distributorpb.RegisterDistributorServer(a.server.GRPC, d) - a.RegisterRoute("/api/v1/push/otlp", push.OTLPHandler(a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST") a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST") + a.RegisterRoute("/api/v1/otlp/v1/metrics", push.OTLPHandler(a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST") a.indexPage.AddLink(SectionAdminEndpoints, "/distributor/ring", "Distributor Ring Status") a.indexPage.AddLink(SectionAdminEndpoints, "/distributor/all_user_stats", "Usage Statistics") From 5107edf4b89007db66701476f909512473f112a8 Mon Sep 17 00:00:00 2001 From: Friedrich Gonzalez Date: Mon, 18 Mar 2024 15:30:55 +0100 Subject: [PATCH 5/9] Re-use DecodeOTLPWriteRequest Signed-off-by: Friedrich Gonzalez --- pkg/util/push/otlp.go | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/pkg/util/push/otlp.go b/pkg/util/push/otlp.go index 01773518fc..f922dd08c7 100644 --- a/pkg/util/push/otlp.go +++ b/pkg/util/push/otlp.go @@ -1,18 +1,17 @@ package push import ( - "io" "net/http" "github.com/go-kit/log/level" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/prompb" + "github.com/prometheus/prometheus/storage/remote" "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/middleware" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" - "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" "github.com/cortexproject/cortex/pkg/cortexpb" "github.com/cortexproject/cortex/pkg/util" @@ -31,16 +30,7 @@ func OTLPHandler(sourceIPs *middleware.SourceIPExtractor, push Func) http.Handle logger = log.WithSourceIPs(source, logger) } } - - buf, err := io.ReadAll(r.Body) - if err != nil { - level.Error(logger).Log("err", err.Error()) - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - req := pmetricotlp.NewExportRequest() - err = req.UnmarshalProto(buf) + req, err := remote.DecodeOTLPWriteRequest(r) if err != nil { level.Error(logger).Log("err", err.Error()) http.Error(w, err.Error(), http.StatusBadRequest) From 11be7a81b25a0372948201cd537f13414695bfff Mon Sep 17 00:00:00 2001 From: Friedrich Gonzalez Date: Wed, 20 Mar 2024 10:42:46 +0100 Subject: [PATCH 6/9] Tests Signed-off-by: Friedrich Gonzalez --- pkg/util/push/otlp_test.go | 129 +++++++++++++++++++++++++++++++++++++ 1 file changed, 129 insertions(+) create mode 100644 pkg/util/push/otlp_test.go diff --git a/pkg/util/push/otlp_test.go b/pkg/util/push/otlp_test.go new file mode 100644 index 0000000000..641fc2c566 --- /dev/null +++ b/pkg/util/push/otlp_test.go @@ -0,0 +1,129 @@ +package push + +import ( + "bytes" + "context" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" + + "github.com/cortexproject/cortex/pkg/cortexpb" +) + +func TestOTLPWriteHandler(t *testing.T) { + exportRequest := generateOTLPWriteRequest(t) + + buf, err := exportRequest.MarshalProto() + require.NoError(t, err) + + req, err := http.NewRequest("", "", bytes.NewReader(buf)) + require.NoError(t, err) + req.Header.Set("Content-Type", "application/x-protobuf") + + push := verifyOTLPWriteRequestHandler(t, cortexpb.API) + handler := OTLPHandler(nil, push) + + recorder := httptest.NewRecorder() + handler.ServeHTTP(recorder, req) + + resp := recorder.Result() + require.Equal(t, http.StatusOK, resp.StatusCode) +} + +func generateOTLPWriteRequest(t *testing.T) pmetricotlp.ExportRequest { + d := pmetric.NewMetrics() + + // Generate One Counter, One Gauge, One Histogram, One Exponential-Histogram + // with resource attributes: service.name="test-service", service.instance.id="test-instance", host.name="test-host" + // with metric attibute: foo.bar="baz" + + timestamp := time.Now() + + resourceMetric := d.ResourceMetrics().AppendEmpty() + resourceMetric.Resource().Attributes().PutStr("service.name", "test-service") + resourceMetric.Resource().Attributes().PutStr("service.instance.id", "test-instance") + resourceMetric.Resource().Attributes().PutStr("host.name", "test-host") + + scopeMetric := resourceMetric.ScopeMetrics().AppendEmpty() + + // Generate One Counter + counterMetric := scopeMetric.Metrics().AppendEmpty() + counterMetric.SetName("test-counter") + counterMetric.SetDescription("test-counter-description") + counterMetric.SetEmptySum() + counterMetric.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + counterMetric.Sum().SetIsMonotonic(true) + + counterDataPoint := counterMetric.Sum().DataPoints().AppendEmpty() + counterDataPoint.SetTimestamp(pcommon.NewTimestampFromTime(timestamp)) + counterDataPoint.SetDoubleValue(10.0) + counterDataPoint.Attributes().PutStr("foo.bar", "baz") + + counterExemplar := counterDataPoint.Exemplars().AppendEmpty() + counterExemplar.SetTimestamp(pcommon.NewTimestampFromTime(timestamp)) + counterExemplar.SetDoubleValue(10.0) + counterExemplar.SetSpanID(pcommon.SpanID{0, 1, 2, 3, 4, 5, 6, 7}) + counterExemplar.SetTraceID(pcommon.TraceID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}) + + // Generate One Gauge + gaugeMetric := scopeMetric.Metrics().AppendEmpty() + gaugeMetric.SetName("test-gauge") + gaugeMetric.SetDescription("test-gauge-description") + gaugeMetric.SetEmptyGauge() + + gaugeDataPoint := gaugeMetric.Gauge().DataPoints().AppendEmpty() + gaugeDataPoint.SetTimestamp(pcommon.NewTimestampFromTime(timestamp)) + gaugeDataPoint.SetDoubleValue(10.0) + gaugeDataPoint.Attributes().PutStr("foo.bar", "baz") + + // Generate One Histogram + histogramMetric := scopeMetric.Metrics().AppendEmpty() + histogramMetric.SetName("test-histogram") + histogramMetric.SetDescription("test-histogram-description") + histogramMetric.SetEmptyHistogram() + histogramMetric.Histogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + + histogramDataPoint := histogramMetric.Histogram().DataPoints().AppendEmpty() + histogramDataPoint.SetTimestamp(pcommon.NewTimestampFromTime(timestamp)) + histogramDataPoint.ExplicitBounds().FromRaw([]float64{0.0, 1.0, 2.0, 3.0, 4.0, 5.0}) + histogramDataPoint.BucketCounts().FromRaw([]uint64{2, 2, 2, 2, 2, 2}) + histogramDataPoint.SetCount(10) + histogramDataPoint.SetSum(30.0) + histogramDataPoint.Attributes().PutStr("foo.bar", "baz") + + // Generate One Exponential-Histogram + exponentialHistogramMetric := scopeMetric.Metrics().AppendEmpty() + exponentialHistogramMetric.SetName("test-exponential-histogram") + exponentialHistogramMetric.SetDescription("test-exponential-histogram-description") + exponentialHistogramMetric.SetEmptyExponentialHistogram() + exponentialHistogramMetric.ExponentialHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + + exponentialHistogramDataPoint := exponentialHistogramMetric.ExponentialHistogram().DataPoints().AppendEmpty() + exponentialHistogramDataPoint.SetTimestamp(pcommon.NewTimestampFromTime(timestamp)) + exponentialHistogramDataPoint.SetScale(2.0) + exponentialHistogramDataPoint.Positive().BucketCounts().FromRaw([]uint64{2, 2, 2, 2, 2}) + exponentialHistogramDataPoint.SetZeroCount(2) + exponentialHistogramDataPoint.SetCount(10) + exponentialHistogramDataPoint.SetSum(30.0) + exponentialHistogramDataPoint.Attributes().PutStr("foo.bar", "baz") + + return pmetricotlp.NewExportRequestFromMetrics(d) +} + +func verifyOTLPWriteRequestHandler(t *testing.T, expectSource cortexpb.WriteRequest_SourceEnum) func(ctx context.Context, request *cortexpb.WriteRequest) (response *cortexpb.WriteResponse, err error) { + t.Helper() + return func(ctx context.Context, request *cortexpb.WriteRequest) (response *cortexpb.WriteResponse, err error) { + assert.Len(t, request.Timeseries, 12) // 1 (counter) + 1 (gauge) + 7 (hist_bucket) + 2 (hist_sum, hist_count) + 1 (exponential histogram) + // TODO: test more things + assert.Equal(t, expectSource, request.Source) + assert.False(t, request.SkipLabelNameValidation) + return &cortexpb.WriteResponse{}, nil + } +} From cc3d30faf2f54aea051078476047528f62fd9730 Mon Sep 17 00:00:00 2001 From: Friedrich Gonzalez Date: Thu, 11 Apr 2024 23:18:30 +0200 Subject: [PATCH 7/9] Integration test Signed-off-by: Friedrich Gonzalez --- integration/e2ecortex/client.go | 71 ++++++++++++++++++++++++++++++ integration/otlp_test.go | 76 +++++++++++++++++++++++++++++++++ 2 files changed, 147 insertions(+) create mode 100644 integration/otlp_test.go diff --git a/integration/e2ecortex/client.go b/integration/e2ecortex/client.go index 692b585d33..fd6bc8927b 100644 --- a/integration/e2ecortex/client.go +++ b/integration/e2ecortex/client.go @@ -29,6 +29,9 @@ import ( "github.com/cortexproject/cortex/pkg/ruler" "github.com/cortexproject/cortex/pkg/util/backoff" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" ) var ErrNotFound = errors.New("not found") @@ -142,6 +145,74 @@ func (c *Client) Push(timeseries []prompb.TimeSeries) (*http.Response, error) { return res, nil } +func getNameAndAttributes(ts prompb.TimeSeries) (string, map[string]any) { + var metricName string + attributes := make(map[string]any) + for _, label := range ts.Labels { + if label.Name == model.MetricNameLabel { + metricName = label.Value + } else { + attributes[label.Name] = label.Value + } + } + return metricName, attributes +} + +func createDatapointsGauge(newMetric pmetric.Metric, attributes map[string]any, samples []prompb.Sample) { + newMetric.SetEmptyGauge() + for _, sample := range samples { + datapoint := newMetric.Gauge().DataPoints().AppendEmpty() + datapoint.SetDoubleValue(sample.Value) + datapoint.SetTimestamp(pcommon.Timestamp(sample.Timestamp * time.Millisecond.Nanoseconds())) + datapoint.Attributes().FromRaw(attributes) + } +} + +// Convert Timeseries to Metrics +func convertTimeseriesToMetrics(timeseries []prompb.TimeSeries) pmetric.Metrics { + metrics := pmetric.NewMetrics() + for _, ts := range timeseries { + metricName, attributes := getNameAndAttributes(ts) + newMetric := metrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty() + newMetric.SetName(metricName) + //TODO Set description for new metric + //TODO Set unit for new metric + createDatapointsGauge(newMetric, attributes, ts.Samples) + //TODO(friedrichg): Add support for histograms + } + return metrics +} + +// Push series to OTLP endpoint +func (c *Client) OTLP(timeseries []prompb.TimeSeries) (*http.Response, error) { + + data, err := pmetricotlp.NewExportRequestFromMetrics(convertTimeseriesToMetrics(timeseries)).MarshalProto() + if err != nil { + return nil, err + } + + // Create HTTP request + req, err := http.NewRequest("POST", fmt.Sprintf("http://%s/api/v1/otlp/v1/metrics", c.distributorAddress), bytes.NewReader(data)) + if err != nil { + return nil, err + } + + req.Header.Set("Content-Type", "application/x-protobuf") + req.Header.Set("X-Scope-OrgID", c.orgID) + + ctx, cancel := context.WithTimeout(context.Background(), c.timeout) + defer cancel() + + // Execute HTTP request + res, err := c.httpClient.Do(req.WithContext(ctx)) + if err != nil { + return nil, err + } + + defer res.Body.Close() + return res, nil +} + // Query runs an instant query. func (c *Client) Query(query string, ts time.Time) (model.Value, error) { value, _, err := c.querierClient.Query(context.Background(), query, ts) diff --git a/integration/otlp_test.go b/integration/otlp_test.go new file mode 100644 index 0000000000..64c33334f7 --- /dev/null +++ b/integration/otlp_test.go @@ -0,0 +1,76 @@ +//go:build requires_docker +// +build requires_docker + +package integration + +import ( + "fmt" + "testing" + "time" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/prompb" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/cortexproject/cortex/integration/e2e" + e2edb "github.com/cortexproject/cortex/integration/e2e/db" + "github.com/cortexproject/cortex/integration/e2ecortex" +) + +func TestOTLP(t *testing.T) { + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + // Start dependencies. + minio := e2edb.NewMinio(9000, bucketName) + require.NoError(t, s.StartAndWaitReady(minio)) + + // Start Cortex components. + require.NoError(t, copyFileToSharedDir(s, "docs/configuration/single-process-config-blocks.yaml", cortexConfigFile)) + + // Start Cortex in single binary mode, reading the config from file and overwriting + // the backend config to make it work with Minio. + flags := map[string]string{ + "-blocks-storage.s3.access-key-id": e2edb.MinioAccessKey, + "-blocks-storage.s3.secret-access-key": e2edb.MinioSecretKey, + "-blocks-storage.s3.bucket-name": bucketName, + "-blocks-storage.s3.endpoint": fmt.Sprintf("%s-minio-9000:9000", networkName), + "-blocks-storage.s3.insecure": "true", + } + + cortex := e2ecortex.NewSingleBinaryWithConfigFile("cortex-1", cortexConfigFile, flags, "", 9009, 9095) + require.NoError(t, s.StartAndWaitReady(cortex)) + + c, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-1") + require.NoError(t, err) + + // Push some series to Cortex. + now := time.Now() + series, expectedVector := generateSeries("series_1", now, prompb.Label{Name: "foo", Value: "bar"}) + + res, err := c.OTLP(series) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + // Query the series. + result, err := c.Query("series_1", now) + require.NoError(t, err) + require.Equal(t, model.ValVector, result.Type()) + assert.Equal(t, expectedVector, result.(model.Vector)) + + labelValues, err := c.LabelValues("foo", time.Time{}, time.Time{}, nil) + require.NoError(t, err) + require.Equal(t, model.LabelValues{"bar"}, labelValues) + + labelNames, err := c.LabelNames(time.Time{}, time.Time{}) + require.NoError(t, err) + require.Equal(t, []string{"__name__", "foo"}, labelNames) + + // Check that a range query does not return an error to sanity check the queryrange tripperware. + _, err = c.QueryRange("series_1", now.Add(-15*time.Minute), now, 15*time.Second) + require.NoError(t, err) + + //TODO(friedrichg): test histograms +} From c7cda19ed86539e41522b2bcffb03c51cf90c2fa Mon Sep 17 00:00:00 2001 From: Friedrich Gonzalez Date: Thu, 11 Apr 2024 23:41:30 +0200 Subject: [PATCH 8/9] Fix lint and minimal docs Signed-off-by: Friedrich Gonzalez --- CHANGELOG.md | 1 + docs/api/_index.md | 1 + docs/configuration/v1-guarantees.md | 1 + integration/e2ecortex/client.go | 5 +++-- 4 files changed, 6 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e4f9b874ad..15bbbd8351 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ * [CHANGE] Querier: Mark `-querier.ingester-streaming` flag as deprecated. Now query ingester streaming is always enabled. #5817 * [CHANGE] Compactor/Bucket Store: Added `-blocks-storage.bucket-store.block-discovery-strategy` to configure different block listing strategy. Reverted the current recursive block listing mechanism and use the strategy `Concurrent` as in 1.15. #5828 * [CHANGE] Compactor: Don't halt compactor when overlapped source blocks detected. #5854 +* [FEATURE] OTLP ingestion experimental. #5813 * [FEATURE] Ingester: Add per-tenant new metric `cortex_ingester_tsdb_data_replay_duration_seconds`. #5477 * [FEATURE] Query Frontend/Scheduler: Add query priority support. #5605 * [FEATURE] Tracing: Add `kuberesolver` to resolve endpoints address with `kubernetes://` prefix as Kubernetes service. #5731 diff --git a/docs/api/_index.md b/docs/api/_index.md index a4572059cf..0c8a787927 100644 --- a/docs/api/_index.md +++ b/docs/api/_index.md @@ -26,6 +26,7 @@ For the sake of clarity, in this document we have grouped API endpoints by servi | [Pprof](#pprof) | _All services_ || `GET /debug/pprof` | | [Fgprof](#fgprof) | _All services_ || `GET /debug/fgprof` | | [Remote write](#remote-write) | Distributor || `POST /api/v1/push` | +| [OTLP receiver](#otlp-receiver) | Distributor || `POST /api/v1/otlp/v1/metrics` | | [Tenants stats](#tenants-stats) | Distributor || `GET /distributor/all_user_stats` | | [HA tracker status](#ha-tracker-status) | Distributor || `GET /distributor/ha_tracker` | | [Flush blocks](#flush-blocks) | Ingester || `GET,POST /ingester/flush` | diff --git a/docs/configuration/v1-guarantees.md b/docs/configuration/v1-guarantees.md index 382cc5c502..41a99169b7 100644 --- a/docs/configuration/v1-guarantees.md +++ b/docs/configuration/v1-guarantees.md @@ -112,3 +112,4 @@ Currently experimental features are: - `-ruler.ring.final-sleep` (duration) CLI flag - `store-gateway.sharding-ring.final-sleep` (duration) CLI flag - `alertmanager-sharding-ring.final-sleep` (duration) CLI flag +- OTLP Receiver diff --git a/integration/e2ecortex/client.go b/integration/e2ecortex/client.go index fd6bc8927b..9fee51d903 100644 --- a/integration/e2ecortex/client.go +++ b/integration/e2ecortex/client.go @@ -27,11 +27,12 @@ import ( "github.com/prometheus/prometheus/storage/remote" yaml "gopkg.in/yaml.v3" - "github.com/cortexproject/cortex/pkg/ruler" - "github.com/cortexproject/cortex/pkg/util/backoff" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" + + "github.com/cortexproject/cortex/pkg/ruler" + "github.com/cortexproject/cortex/pkg/util/backoff" ) var ErrNotFound = errors.New("not found") From 88e23e832e3d6df4ba7e095cb28f6f5abb5bb830 Mon Sep 17 00:00:00 2001 From: Friedrich Gonzalez Date: Thu, 11 Apr 2024 23:47:34 +0200 Subject: [PATCH 9/9] Catch error Signed-off-by: Friedrich Gonzalez --- integration/e2ecortex/client.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/integration/e2ecortex/client.go b/integration/e2ecortex/client.go index 9fee51d903..003cef426d 100644 --- a/integration/e2ecortex/client.go +++ b/integration/e2ecortex/client.go @@ -165,7 +165,10 @@ func createDatapointsGauge(newMetric pmetric.Metric, attributes map[string]any, datapoint := newMetric.Gauge().DataPoints().AppendEmpty() datapoint.SetDoubleValue(sample.Value) datapoint.SetTimestamp(pcommon.Timestamp(sample.Timestamp * time.Millisecond.Nanoseconds())) - datapoint.Attributes().FromRaw(attributes) + err := datapoint.Attributes().FromRaw(attributes) + if err != nil { + panic(err) + } } }