diff --git a/CHANGELOG.md b/CHANGELOG.md index 0b823a0197..23a91fa683 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ * [CHANGE] Server: Instrument `cortex_request_duration_seconds` metric with native histogram. If `native-histograms` feature is enabled in monitoring Prometheus then the metric name needs to be updated in your dashboards. #6056 * [FEATURE] Ingester: Experimental: Enable native histogram ingestion via `-blocks-storage.tsdb.enable-native-histograms` flag. #5986 * [FEATURE] Query Frontend: Added a query rejection mechanism to block resource-intensive queries. #6005 +* [FEATURE] OTLP: Support ingesting OTLP exponential metrics as native histograms. #6071 * [ENHANCEMENT] rulers: Add support to persist tokens in rulers. #5987 * [ENHANCEMENT] Query Frontend/Querier: Added store gateway postings touched count and touched size in Querier stats and log in Query Frontend. #5892 * [ENHANCEMENT] Query Frontend/Querier: Returns `warnings` on prometheus query responses. #5916 diff --git a/integration/e2e/util.go b/integration/e2e/util.go index c9ded9b5a3..68a25a3af3 100644 --- a/integration/e2e/util.go +++ b/integration/e2e/util.go @@ -150,9 +150,8 @@ func GenerateSeries(name string, ts time.Time, additionalLabels ...prompb.Label) return } -func GenerateHistogramSeries(name string, ts time.Time, floatHistogram bool, additionalLabels ...prompb.Label) (series []prompb.TimeSeries) { +func GenerateHistogramSeries(name string, ts time.Time, i uint32, floatHistogram bool, additionalLabels ...prompb.Label) (series []prompb.TimeSeries) { tsMillis := TimeToMilliseconds(ts) - i := rand.Uint32() lbls := append( []prompb.Label{ diff --git a/integration/e2ecortex/client.go b/integration/e2ecortex/client.go index a993816f0a..dbc626d6a5 100644 --- a/integration/e2ecortex/client.go +++ b/integration/e2ecortex/client.go @@ -159,7 +159,7 @@ func getNameAndAttributes(ts prompb.TimeSeries) (string, map[string]any) { return metricName, attributes } -func createDatapointsGauge(newMetric pmetric.Metric, attributes map[string]any, samples []prompb.Sample) { +func createDataPointsGauge(newMetric pmetric.Metric, attributes map[string]any, samples []prompb.Sample) { newMetric.SetEmptyGauge() for _, sample := range samples { datapoint := newMetric.Gauge().DataPoints().AppendEmpty() @@ -172,6 +172,47 @@ func createDatapointsGauge(newMetric pmetric.Metric, attributes map[string]any, } } +func createDataPointsExponentialHistogram(newMetric pmetric.Metric, attributes map[string]any, histograms []prompb.Histogram) { + newMetric.SetEmptyExponentialHistogram() + for _, h := range histograms { + datapoint := newMetric.ExponentialHistogram().DataPoints().AppendEmpty() + datapoint.SetTimestamp(pcommon.Timestamp(h.Timestamp * time.Millisecond.Nanoseconds())) + datapoint.SetCount(h.GetCountInt()) + datapoint.SetSum(h.GetSum()) + datapoint.SetScale(h.GetSchema()) + datapoint.SetZeroCount(h.GetZeroCountInt()) + datapoint.SetZeroThreshold(h.GetZeroThreshold()) + convertBucketLayout(datapoint.Positive(), h.PositiveSpans, h.PositiveDeltas) + convertBucketLayout(datapoint.Negative(), h.NegativeSpans, h.NegativeDeltas) + err := datapoint.Attributes().FromRaw(attributes) + if err != nil { + panic(err) + } + } +} + +// convertBucketLayout converts Prometheus remote write bucket layout to Exponential Histogram bucket layout. +func convertBucketLayout(bucket pmetric.ExponentialHistogramDataPointBuckets, spans []prompb.BucketSpan, deltas []int64) { + vals := make([]uint64, 0) + iDelta := 0 + count := int64(0) + for i, span := range spans { + if i == 0 { + bucket.SetOffset(span.GetOffset() - 1) + } else { + for j := 0; j < int(span.GetOffset()); j++ { + vals = append(vals, 0) + } + } + for j := 0; j < int(span.Length); j++ { + count += deltas[iDelta] + vals = append(vals, uint64(count)) + iDelta++ + } + } + bucket.BucketCounts().FromRaw(vals) +} + // Convert Timeseries to Metrics func convertTimeseriesToMetrics(timeseries []prompb.TimeSeries) pmetric.Metrics { metrics := pmetric.NewMetrics() @@ -181,8 +222,11 @@ func convertTimeseriesToMetrics(timeseries []prompb.TimeSeries) pmetric.Metrics newMetric.SetName(metricName) //TODO Set description for new metric //TODO Set unit for new metric - createDatapointsGauge(newMetric, attributes, ts.Samples) - //TODO(friedrichg): Add support for histograms + if len(ts.Samples) > 0 { + createDataPointsGauge(newMetric, attributes, ts.Samples) + } else if len(ts.Histograms) > 0 { + createDataPointsExponentialHistogram(newMetric, attributes, ts.Histograms) + } } return metrics } diff --git a/integration/native_histogram_test.go b/integration/native_histogram_test.go index 1e27576c2e..d3f58a20b7 100644 --- a/integration/native_histogram_test.go +++ b/integration/native_histogram_test.go @@ -4,11 +4,13 @@ package integration import ( + "math/rand" "testing" "time" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/prompb" + "github.com/prometheus/prometheus/tsdb/tsdbutil" "github.com/stretchr/testify/require" "github.com/cortexproject/cortex/integration/e2e" @@ -51,14 +53,16 @@ func TestNativeHistogramIngestionAndQuery(t *testing.T) { seriesTimestamp := time.Now() series2Timestamp := seriesTimestamp.Add(blockRangePeriod * 2) - series1 := e2e.GenerateHistogramSeries("series_1", seriesTimestamp, false, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "false"}) - series1Float := e2e.GenerateHistogramSeries("series_1", seriesTimestamp, true, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "true"}) + histogramIdx1 := rand.Uint32() + series1 := e2e.GenerateHistogramSeries("series_1", seriesTimestamp, histogramIdx1, false, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "false"}) + series1Float := e2e.GenerateHistogramSeries("series_1", seriesTimestamp, histogramIdx1, true, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "true"}) res, err := c.Push(append(series1, series1Float...)) require.NoError(t, err) require.Equal(t, 200, res.StatusCode) - series2 := e2e.GenerateHistogramSeries("series_2", series2Timestamp, false, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "false"}) - series2Float := e2e.GenerateHistogramSeries("series_2", series2Timestamp, true, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "true"}) + histogramIdx2 := rand.Uint32() + series2 := e2e.GenerateHistogramSeries("series_2", series2Timestamp, histogramIdx2, false, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "false"}) + series2Float := e2e.GenerateHistogramSeries("series_2", series2Timestamp, histogramIdx2, true, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "true"}) res, err = c.Push(append(series2, series2Float...)) require.NoError(t, err) require.Equal(t, 200, res.StatusCode) @@ -96,6 +100,8 @@ func TestNativeHistogramIngestionAndQuery(t *testing.T) { c, err = e2ecortex.NewClient("", queryFrontend.HTTPEndpoint(), "", "", "user-1") require.NoError(t, err) + expectedHistogram1 := tsdbutil.GenerateTestHistogram(int(histogramIdx1)) + expectedHistogram2 := tsdbutil.GenerateTestHistogram(int(histogramIdx2)) result, err := c.QueryRange(`series_1`, series2Timestamp.Add(-time.Minute*10), series2Timestamp, time.Second) require.NoError(t, err) require.Equal(t, model.ValMatrix, result.Type()) @@ -106,6 +112,8 @@ func TestNativeHistogramIngestionAndQuery(t *testing.T) { require.NotEmpty(t, ss.Histograms) for _, h := range ss.Histograms { require.NotEmpty(t, h) + require.Equal(t, float64(expectedHistogram1.Count), float64(h.Histogram.Count)) + require.Equal(t, float64(expectedHistogram1.Sum), float64(h.Histogram.Sum)) } } @@ -119,6 +127,8 @@ func TestNativeHistogramIngestionAndQuery(t *testing.T) { require.NotEmpty(t, ss.Histograms) for _, h := range ss.Histograms { require.NotEmpty(t, h) + require.Equal(t, float64(expectedHistogram2.Count), float64(h.Histogram.Count)) + require.Equal(t, float64(expectedHistogram2.Sum), float64(h.Histogram.Sum)) } } @@ -129,6 +139,8 @@ func TestNativeHistogramIngestionAndQuery(t *testing.T) { require.Equal(t, 2, v.Len()) for _, s := range v { require.NotNil(t, s.Histogram) + require.Equal(t, float64(expectedHistogram1.Count), float64(s.Histogram.Count)) + require.Equal(t, float64(expectedHistogram1.Sum), float64(s.Histogram.Sum)) } result, err = c.Query(`series_2`, series2Timestamp) @@ -138,5 +150,7 @@ func TestNativeHistogramIngestionAndQuery(t *testing.T) { require.Equal(t, 2, v.Len()) for _, s := range v { require.NotNil(t, s.Histogram) + require.Equal(t, float64(expectedHistogram2.Count), float64(s.Histogram.Count)) + require.Equal(t, float64(expectedHistogram2.Sum), float64(s.Histogram.Sum)) } } diff --git a/integration/otlp_test.go b/integration/otlp_test.go index 64c33334f7..0d56c3751b 100644 --- a/integration/otlp_test.go +++ b/integration/otlp_test.go @@ -5,11 +5,13 @@ package integration import ( "fmt" + "math/rand" "testing" "time" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/prompb" + "github.com/prometheus/prometheus/tsdb/tsdbutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -33,11 +35,12 @@ func TestOTLP(t *testing.T) { // Start Cortex in single binary mode, reading the config from file and overwriting // the backend config to make it work with Minio. flags := map[string]string{ - "-blocks-storage.s3.access-key-id": e2edb.MinioAccessKey, - "-blocks-storage.s3.secret-access-key": e2edb.MinioSecretKey, - "-blocks-storage.s3.bucket-name": bucketName, - "-blocks-storage.s3.endpoint": fmt.Sprintf("%s-minio-9000:9000", networkName), - "-blocks-storage.s3.insecure": "true", + "-blocks-storage.s3.access-key-id": e2edb.MinioAccessKey, + "-blocks-storage.s3.secret-access-key": e2edb.MinioSecretKey, + "-blocks-storage.s3.bucket-name": bucketName, + "-blocks-storage.s3.endpoint": fmt.Sprintf("%s-minio-9000:9000", networkName), + "-blocks-storage.s3.insecure": "true", + "-blocks-storage.tsdb.enable-native-histograms": "true", } cortex := e2ecortex.NewSingleBinaryWithConfigFile("cortex-1", cortexConfigFile, flags, "", 9009, 9095) @@ -72,5 +75,19 @@ func TestOTLP(t *testing.T) { _, err = c.QueryRange("series_1", now.Add(-15*time.Minute), now, 15*time.Second) require.NoError(t, err) - //TODO(friedrichg): test histograms + i := rand.Uint32() + histogramSeries := e2e.GenerateHistogramSeries("histogram_series", now, i, false, prompb.Label{Name: "job", Value: "test"}) + res, err = c.Push(histogramSeries) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + result, err = c.Query(`histogram_series`, now) + require.NoError(t, err) + require.Equal(t, model.ValVector, result.Type()) + v := result.(model.Vector) + require.Equal(t, 1, v.Len()) + expectedHistogram := tsdbutil.GenerateTestHistogram(int(i)) + require.NotNil(t, v[0].Histogram) + require.Equal(t, float64(expectedHistogram.Count), float64(v[0].Histogram.Count)) + require.Equal(t, expectedHistogram.Sum, float64(v[0].Histogram.Sum)) }