diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/src/main/java/com/azure/monitor/opentelemetry/exporter/implementation/MetricDataMapper.java b/sdk/monitor/azure-monitor-opentelemetry-exporter/src/main/java/com/azure/monitor/opentelemetry/exporter/implementation/MetricDataMapper.java index 6c415242d525..5f5705337eb3 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/src/main/java/com/azure/monitor/opentelemetry/exporter/implementation/MetricDataMapper.java +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/src/main/java/com/azure/monitor/opentelemetry/exporter/implementation/MetricDataMapper.java @@ -72,6 +72,23 @@ public MetricDataMapper(BiConsumer telemetry this.captureHttpServer4xxAsError = captureHttpServer4xxAsError; } + public void mapMetrics(MetricData metricData, Consumer consumer) { + MetricDataType type = metricData.getType(); + if (type == DOUBLE_SUM || type == DOUBLE_GAUGE || type == LONG_SUM || type == LONG_GAUGE || type == HISTOGRAM) { + + // DO NOT emit unstable metrics from the OpenTelemetry auto instrumentation libraries + // custom metrics are always emitted + if (OTEL_UNSTABLE_METRICS_TO_EXCLUDE.contains(metricData.getName()) + && metricData.getInstrumentationScopeInfo().getName().startsWith(OTEL_INSTRUMENTATION_NAME_PREFIX)) { + return; + } + List stableOtelMetrics = convertOtelMetricToAzureMonitorMetric(metricData, false); + stableOtelMetrics.forEach(consumer::accept); + } else { + logger.warning("metric data type {} is not supported yet.", metricData.getType()); + } + } + public void map(MetricData metricData, Consumer consumer) { MetricDataType type = metricData.getType(); if (type == DOUBLE_SUM || type == DOUBLE_GAUGE || type == LONG_SUM || type == LONG_GAUGE || type == HISTOGRAM) { diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/src/main/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/QuickPulse.java b/sdk/monitor/azure-monitor-opentelemetry-exporter/src/main/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/QuickPulse.java index 15411d3a33b2..89eaa93fa518 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/src/main/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/QuickPulse.java +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/src/main/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/QuickPulse.java @@ -5,6 +5,7 @@ import com.azure.core.http.HttpPipeline; import com.azure.core.http.HttpRequest; +import com.azure.monitor.opentelemetry.exporter.implementation.MetricDataMapper; import com.azure.monitor.opentelemetry.exporter.implementation.models.TelemetryItem; import com.azure.monitor.opentelemetry.exporter.implementation.utils.HostName; import com.azure.monitor.opentelemetry.exporter.implementation.utils.Strings; @@ -20,13 +21,15 @@ public class QuickPulse { - static final int QP_INVARIANT_VERSION = 1; + // 6 represents filtering support for Otel metrics only is enabled + static final int QP_INVARIANT_VERSION = 6; private volatile QuickPulseDataCollector collector; public static QuickPulse create(HttpPipeline httpPipeline, Supplier endpointUrl, Supplier instrumentationKey, @Nullable String roleName, @Nullable String roleInstance, - boolean useNormalizedValueForNonNormalizedCpuPercentage, String sdkVersion) { + boolean useNormalizedValueForNonNormalizedCpuPercentage, QuickPulseMetricReader quickPulseMetricReader, + MetricDataMapper metricDataMapper, String sdkVersion) { QuickPulse quickPulse = new QuickPulse(); @@ -41,7 +44,7 @@ public static QuickPulse create(HttpPipeline httpPipeline, Supplier endpoin Thread.currentThread().interrupt(); } quickPulse.initialize(httpPipeline, endpointUrl, instrumentationKey, roleName, roleInstance, - useNormalizedValueForNonNormalizedCpuPercentage, sdkVersion); + useNormalizedValueForNonNormalizedCpuPercentage, quickPulseMetricReader, metricDataMapper, sdkVersion); }); // the condition below will always be false, but by referencing the executor it ensures the // executor can't become unreachable in the middle of the execute() method execution above @@ -66,12 +69,15 @@ public void add(TelemetryItem telemetryItem) { private void initialize(HttpPipeline httpPipeline, Supplier endpointUrl, Supplier instrumentationKey, @Nullable String roleName, @Nullable String roleInstance, - boolean useNormalizedValueForNonNormalizedCpuPercentage, String sdkVersion) { + boolean useNormalizedValueForNonNormalizedCpuPercentage, QuickPulseMetricReader quickPulseMetricReader, + MetricDataMapper metricDataMapper, String sdkVersion) { String quickPulseId = UUID.randomUUID().toString().replace("-", ""); ArrayBlockingQueue sendQueue = new ArrayBlockingQueue<>(256, true); + QuickPulseConfiguration quickPulseConfiguration = new QuickPulseConfiguration(); - QuickPulseDataSender quickPulseDataSender = new QuickPulseDataSender(httpPipeline, sendQueue); + QuickPulseDataSender quickPulseDataSender + = new QuickPulseDataSender(httpPipeline, sendQueue, quickPulseConfiguration); String instanceName = roleInstance; String machineName = HostName.get(); @@ -84,12 +90,12 @@ private void initialize(HttpPipeline httpPipeline, Supplier endpointUrl, Su } QuickPulseDataCollector collector - = new QuickPulseDataCollector(useNormalizedValueForNonNormalizedCpuPercentage); + = new QuickPulseDataCollector(useNormalizedValueForNonNormalizedCpuPercentage, quickPulseConfiguration); QuickPulsePingSender quickPulsePingSender = new QuickPulsePingSender(httpPipeline, endpointUrl, - instrumentationKey, roleName, instanceName, machineName, quickPulseId, sdkVersion); + instrumentationKey, roleName, instanceName, machineName, quickPulseId, sdkVersion, quickPulseConfiguration); QuickPulseDataFetcher quickPulseDataFetcher = new QuickPulseDataFetcher(collector, sendQueue, endpointUrl, - instrumentationKey, roleName, instanceName, machineName, quickPulseId); + instrumentationKey, roleName, instanceName, machineName, quickPulseId, quickPulseConfiguration); QuickPulseCoordinatorInitData coordinatorInitData = new QuickPulseCoordinatorInitDataBuilder().withPingSender(quickPulsePingSender) @@ -100,6 +106,14 @@ private void initialize(HttpPipeline httpPipeline, Supplier endpointUrl, Su QuickPulseCoordinator coordinator = new QuickPulseCoordinator(coordinatorInitData); + QuickPulseMetricReceiver quickPulseMetricReceiver + = new QuickPulseMetricReceiver(quickPulseMetricReader, metricDataMapper, collector); + + Thread metricReceiverThread + = new Thread(quickPulseMetricReceiver, QuickPulseMetricReceiver.class.getSimpleName()); + metricReceiverThread.setDaemon(true); + metricReceiverThread.start(); + Thread senderThread = new Thread(quickPulseDataSender, QuickPulseDataSender.class.getSimpleName()); senderThread.setDaemon(true); senderThread.start(); diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/src/main/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/QuickPulseConfiguration.java b/sdk/monitor/azure-monitor-opentelemetry-exporter/src/main/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/QuickPulseConfiguration.java new file mode 100644 index 000000000000..7c5ab8700637 --- /dev/null +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/src/main/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/QuickPulseConfiguration.java @@ -0,0 +1,250 @@ +package com.azure.monitor.opentelemetry.exporter.implementation.quickpulse; + +import com.azure.core.http.HttpResponse; +import com.azure.core.util.logging.ClientLogger; +import com.azure.json.*; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; + +public class QuickPulseConfiguration { + private static final ClientLogger logger = new ClientLogger(QuickPulseDataFetcher.class); + private AtomicReference etag = new AtomicReference<>(); + private ConcurrentHashMap> derivedMetrics = new ConcurrentHashMap<>(); + + public synchronized String getEtag() { + return this.etag.get(); + } + + public synchronized void setEtag(String etag) { + this.etag.set(etag); + } + + public synchronized ConcurrentHashMap> getDerivedMetrics() { + return this.derivedMetrics; + } + + public synchronized void setDerivedMetrics(ConcurrentHashMap> metrics) { + this.derivedMetrics = metrics; + } + + public synchronized void updateConfig(String etagValue, + ConcurrentHashMap> otelMetrics) { + if (!Objects.equals(this.getEtag(), etagValue)) { + this.setEtag(etagValue); + this.setDerivedMetrics(otelMetrics); + } + + } + + public ConcurrentHashMap> parseDerivedMetrics(HttpResponse response) + throws IOException { + + ConcurrentHashMap> requestedMetrics = new ConcurrentHashMap<>(); + try { + + String responseBody = response.getBodyAsString().block(); + if (responseBody == null || responseBody.isEmpty()) { + return new ConcurrentHashMap>(); + } + + try (JsonReader jsonReader = JsonProviders.createReader(responseBody)) { + jsonReader.nextToken(); + while (jsonReader.nextToken() != JsonToken.END_OBJECT) { + if ("Metrics".equals(jsonReader.getFieldName())) { + jsonReader.nextToken(); + + while (jsonReader.nextToken() != JsonToken.END_ARRAY) { + DerivedMetricInfo metric = new DerivedMetricInfo(); + + while (jsonReader.nextToken() != JsonToken.END_OBJECT) { + + String fieldName = jsonReader.getFieldName(); + jsonReader.nextToken(); + + switch (fieldName) { + case "Id": + metric.setId(jsonReader.getString()); + break; + + case "Aggregation": + metric.setAggregation(jsonReader.getString()); + break; + + case "TelemetryType": + metric.setTelemetryType(jsonReader.getString()); + break; + + case "Projection": + metric.setProjection(jsonReader.getString()); + break; + + case "FilterGroups": + // Handle "FilterGroups" field + if (jsonReader.currentToken() == JsonToken.START_ARRAY) { + while (jsonReader.nextToken() != JsonToken.END_ARRAY) { + if (jsonReader.currentToken() == JsonToken.START_OBJECT) { + while (jsonReader.nextToken() != JsonToken.END_OBJECT) { + if (jsonReader.currentToken() == JsonToken.FIELD_NAME + && jsonReader.getFieldName().equals("Filters")) { + jsonReader.nextToken(); + if (jsonReader.currentToken() == JsonToken.START_ARRAY) { + while (jsonReader.nextToken() != JsonToken.END_ARRAY) { + if (jsonReader.currentToken() + == JsonToken.START_OBJECT) { + String innerFieldName = ""; + String predicate = ""; + String comparand = ""; + + while (jsonReader.nextToken() + != JsonToken.END_OBJECT) { + String filterFieldName + = jsonReader.getFieldName(); + jsonReader.nextToken(); + + switch (filterFieldName) { + case "FieldName": + innerFieldName + = jsonReader.getString(); + if (innerFieldName.contains(".")) { + innerFieldName = innerFieldName + .split("\\.")[1]; + } + break; + + case "Predicate": + predicate = jsonReader.getString(); + break; + + case "Comparand": + comparand = jsonReader.getString(); + break; + } + } + + if (!innerFieldName.isEmpty() + && !innerFieldName.equals("undefined") + && !predicate.isEmpty() + && !comparand.isEmpty()) { + metric.addFilterGroup(innerFieldName, + predicate, comparand); + } + } + } + } + } + } + } + } + } + break; + + default: + jsonReader.skipChildren(); + break; + } + } + requestedMetrics.computeIfAbsent(metric.getTelemetryType(), k -> new ArrayList<>()) + .add(metric); + } + } else { + jsonReader.skipChildren(); + + } + } + } + return requestedMetrics; + } catch (Exception e) { + logger.verbose("Failed to parse metrics from response: %s", e.getMessage()); + } + return new ConcurrentHashMap>(); + } + + public class DerivedMetricInfo { + private String id; + private String projection; + private String telemetryType; + private String aggregation; + private ArrayList filterGroups = new ArrayList(); + + public String getId() { + return this.id; + } + + public void setId(String id) { + this.id = id; + } + + public String getProjection() { + return projection; + } + + public void setTelemetryType(String telemetryType) { + this.telemetryType = telemetryType; + } + + public String getTelemetryType() { + return this.telemetryType; + } + + public void setProjection(String projection) { + this.projection = projection; + } + + public String getAggregation() { + return this.aggregation; + } + + public void setAggregation(String aggregation) { + this.aggregation = aggregation; + } + + public ArrayList getFilterGroups() { + return this.filterGroups; + } + + public void addFilterGroup(String fieldName, String predicate, String comparand) { + this.filterGroups.add(new FilterGroup(fieldName, predicate, comparand)); + } + } + + class FilterGroup { + private String fieldName; + private String operator; + private String comparand; + + public FilterGroup(String fieldName, String predicate, String comparand) { + this.setFieldName(fieldName); + this.setOperator(predicate); + this.setComparand(comparand); + } + + public String getFieldName() { + return this.fieldName; + } + + private void setFieldName(String fieldName) { + this.fieldName = fieldName; + } + + public String getOperator() { + return this.operator; + } + + private void setOperator(String operator) { + this.operator = operator; + } + + public String getComparand() { + return this.comparand; + } + + public void setComparand(String comparand) { + this.comparand = comparand; + } + } + +} diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/src/main/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/QuickPulseCoordinator.java b/sdk/monitor/azure-monitor-opentelemetry-exporter/src/main/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/QuickPulseCoordinator.java index 57e3138827fc..96e302275881 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/src/main/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/QuickPulseCoordinator.java +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/src/main/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/QuickPulseCoordinator.java @@ -79,11 +79,14 @@ private long sendData() { case QP_IS_OFF: pingMode = true; + collector.flushOtelMetrics(); + QuickPulseMetricReceiver.setQuickPulseHeaderInfo(currentQuickPulseHeaderInfo); return qpsServicePollingIntervalHintMillis > 0 ? qpsServicePollingIntervalHintMillis : waitBetweenPingsInMillis; case QP_IS_ON: + QuickPulseMetricReceiver.setQuickPulseHeaderInfo(currentQuickPulseHeaderInfo); return waitBetweenPostsInMillis; } diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/src/main/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/QuickPulseDataCollector.java b/sdk/monitor/azure-monitor-opentelemetry-exporter/src/main/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/QuickPulseDataCollector.java index f0fab42b0d4c..0c96c7fde141 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/src/main/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/QuickPulseDataCollector.java +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/src/main/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/QuickPulseDataCollector.java @@ -3,19 +3,10 @@ package com.azure.monitor.opentelemetry.exporter.implementation.quickpulse; -import com.azure.monitor.opentelemetry.exporter.implementation.models.ContextTagKeys; -import com.azure.monitor.opentelemetry.exporter.implementation.models.MonitorDomain; -import com.azure.monitor.opentelemetry.exporter.implementation.models.RemoteDependencyData; -import com.azure.monitor.opentelemetry.exporter.implementation.models.RequestData; -import com.azure.monitor.opentelemetry.exporter.implementation.models.StackFrame; -import com.azure.monitor.opentelemetry.exporter.implementation.models.TelemetryExceptionData; -import com.azure.monitor.opentelemetry.exporter.implementation.models.TelemetryExceptionDetails; -import com.azure.monitor.opentelemetry.exporter.implementation.models.TelemetryItem; -import com.azure.monitor.opentelemetry.exporter.implementation.quickpulse.model.QuickPulseDependencyDocument; -import com.azure.monitor.opentelemetry.exporter.implementation.quickpulse.model.QuickPulseDocument; -import com.azure.monitor.opentelemetry.exporter.implementation.quickpulse.model.QuickPulseExceptionDocument; -import com.azure.monitor.opentelemetry.exporter.implementation.quickpulse.model.QuickPulseRequestDocument; +import com.azure.monitor.opentelemetry.exporter.implementation.models.*; +import com.azure.monitor.opentelemetry.exporter.implementation.quickpulse.model.*; import com.azure.monitor.opentelemetry.exporter.implementation.utils.CpuPerformanceCounterCalculator; +import io.opentelemetry.api.common.AttributeKey; import reactor.util.annotation.Nullable; import java.lang.management.ManagementFactory; @@ -23,10 +14,8 @@ import java.lang.management.MemoryUsage; import java.lang.management.OperatingSystemMXBean; import java.time.Duration; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -39,16 +28,24 @@ final class QuickPulseDataCollector { private static final OperatingSystemMXBean operatingSystemMxBean = ManagementFactory.getOperatingSystemMXBean(); private final AtomicReference counters = new AtomicReference<>(null); + private final CpuPerformanceCounterCalculator cpuPerformanceCounterCalculator = getCpuPerformanceCounterCalculator(); + + private QuickPulseConfiguration quickPulseConfiguration; + + private OtelMetricsStorage metricsStorage = new OtelMetricsStorage(); + private final boolean useNormalizedValueForNonNormalizedCpuPercentage; private volatile QuickPulseStatus quickPulseStatus = QuickPulseStatus.QP_IS_OFF; private volatile Supplier instrumentationKeySupplier; - QuickPulseDataCollector(boolean useNormalizedValueForNonNormalizedCpuPercentage) { + QuickPulseDataCollector(boolean useNormalizedValueForNonNormalizedCpuPercentage, + QuickPulseConfiguration quickPulseConfiguration) { this.useNormalizedValueForNonNormalizedCpuPercentage = useNormalizedValueForNonNormalizedCpuPercentage; + this.quickPulseConfiguration = quickPulseConfiguration; } private static CpuPerformanceCounterCalculator getCpuPerformanceCounterCalculator() { @@ -57,6 +54,7 @@ private static CpuPerformanceCounterCalculator getCpuPerformanceCounterCalculato synchronized void disable() { counters.set(null); + metricsStorage.clearMetrics(); quickPulseStatus = QuickPulseStatus.QP_IS_OFF; } @@ -94,6 +92,32 @@ synchronized FinalCounters peek() { return null; } + void addOtelMetric(TelemetryItem telemetryItem) { + if (!isEnabled()) { + // quick pulse is not enabled or quick pulse data sender is not enabled + return; + } + + if (!telemetryItem.getInstrumentationKey().equals(instrumentationKeySupplier.get())) { + return; + } + + Float sampleRate = telemetryItem.getSampleRate(); + if (sampleRate != null && sampleRate == 0) { + // sampleRate should never be zero (how could it be captured if sampling set to zero percent?) + return; + } + + if (Objects.equals(telemetryItem.getResource().getAttribute(AttributeKey.stringKey("telemetry.sdk.name")), + "opentelemetry")) { + MonitorDomain data = telemetryItem.getData().getBaseData(); + MetricsData metricsData = (MetricsData) data; + MetricDataPoint point = metricsData.getMetrics().get(0); + this.metricsStorage.addMetric(point.getName(), point.getValue(), metricsData.getProperties()); + } + + } + void add(TelemetryItem telemetryItem) { if (!isEnabled()) { // quick pulse is not enabled or quick pulse data sender is not enabled @@ -310,6 +334,18 @@ private static int charToInt(char c) { return x; } + public ArrayList retrieveOtelMetrics() { + return metricsStorage.processMetrics(); + } + + public ConcurrentHashMap getOtelMetrics() { + return metricsStorage.getMetrics(); + } + + public void flushOtelMetrics() { + metricsStorage.clearMetrics(); + } + class FinalCounters { final int exceptions; @@ -410,4 +446,104 @@ static CountAndDuration decodeCountAndDuration(long countAndDuration) { return new CountAndDuration(countAndDuration >> 44, countAndDuration & MAX_DURATION); } } + + class OtelMetricsStorage { + private volatile ConcurrentHashMap metrics = new ConcurrentHashMap<>(); + // setting most amount of OTel metrics a user can stream to this. will review if users surpass threshold. + private final int maxMetricsLimit = 50; + + public void addMetric(String metricName, double value, Map dimensions) { + OTelMetric metric = metrics.get(metricName); + if (metric == null) { + // create new metric and add to metrics map + // (if we have reached the limit, we will block the metric from being created + if (metrics.size() <= maxMetricsLimit) { + metric = new OTelMetric(metricName); + metric.addDataPoint(value, dimensions == null ? new HashMap<>() : new HashMap<>(dimensions)); + metrics.putIfAbsent(metricName, metric); + } + } else { + metric.addDataPoint(value, dimensions == null ? new HashMap<>() : new HashMap<>(dimensions)); + } + } + + public ArrayList processMetrics() { + ConcurrentHashMap> requestedMetrics + = quickPulseConfiguration.getDerivedMetrics(); + ArrayList processedMetrics = new ArrayList<>(); + if (requestedMetrics.get("Metric") != null) { + for (QuickPulseConfiguration.DerivedMetricInfo metricInfo : requestedMetrics.get("Metric")) { + if (this.getMetrics().get(metricInfo.getProjection()) != null) { + QuickPulseMetrics processedMetric + = processMetric(this.getMetrics().get(metricInfo.getProjection()), metricInfo); + processedMetrics.add(processedMetric); + } + } + } + + this.clearMetrics(); + return processedMetrics; + } + + public QuickPulseMetrics processMetric(OTelMetric metric, + QuickPulseConfiguration.DerivedMetricInfo metricInfo) { + + if (metric.getDataPoints().isEmpty()) { + return new QuickPulseMetrics(metricInfo.getId(), 0, 1); + } + + String aggregation = metricInfo.getAggregation(); + ArrayList filteredValues = new ArrayList<>(); + for (OTelDataPoint dataPoint : metric.getDataPoints()) { + boolean passedFilter = true; + for (QuickPulseConfiguration.FilterGroup filterGroup : metricInfo.getFilterGroups()) { + String fieldName = filterGroup.getFieldName(); + String operator = filterGroup.getOperator(); + String comparand = filterGroup.getComparand(); + if (Objects.equals(operator, "Equal")) { + String fieldValue = dataPoint.getDimensions().get(fieldName); + if (fieldValue == null || !fieldValue.equals(comparand)) { + passedFilter = false; + break; + // Handle the case where the value is null or does not equal the operator + } + } + } + if (passedFilter) { + filteredValues.add(dataPoint.getValue()); + } + } + + switch (aggregation) { + case "Sum": + double sum = filteredValues.stream().mapToDouble(Double::doubleValue).sum(); + return new QuickPulseMetrics(metricInfo.getId(), sum, 1); + + case "Avg": + double avg = filteredValues.stream().mapToDouble(Double::doubleValue).average().orElse(0); + return new QuickPulseMetrics(metricInfo.getId(), avg, filteredValues.size()); + + case "Min": + double min = filteredValues.stream().mapToDouble(Double::doubleValue).min().orElse(0); + return new QuickPulseMetrics(metricInfo.getId(), min, 1); + + case "Max": + double max = filteredValues.stream().mapToDouble(Double::doubleValue).max().orElse(0); + return new QuickPulseMetrics(metricInfo.getId(), max, 1); + + default: + throw new IllegalArgumentException("Aggregation type not supported: " + aggregation); + } + } + + public void clearMetrics() { + this.metrics.clear(); + } + + //for testing + public ConcurrentHashMap getMetrics() { + return metrics; + } + + } } diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/src/main/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/QuickPulseDataFetcher.java b/sdk/monitor/azure-monitor-opentelemetry-exporter/src/main/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/QuickPulseDataFetcher.java index bbd50a601fc8..a2d7ff4e5f67 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/src/main/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/QuickPulseDataFetcher.java +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/src/main/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/QuickPulseDataFetcher.java @@ -28,6 +28,7 @@ class QuickPulseDataFetcher { private final ArrayBlockingQueue sendQueue; private final QuickPulseNetworkHelper networkHelper = new QuickPulseNetworkHelper(); + private QuickPulseConfiguration quickPulseConfiguration; private final Supplier endpointUrl; private final Supplier instrumentationKey; @@ -40,7 +41,7 @@ class QuickPulseDataFetcher { public QuickPulseDataFetcher(QuickPulseDataCollector collector, ArrayBlockingQueue sendQueue, Supplier endpointUrl, Supplier instrumentationKey, String roleName, String instanceName, - String machineName, String quickPulseId) { + String machineName, String quickPulseId, QuickPulseConfiguration quickPulseConfiguration) { this.collector = collector; this.sendQueue = sendQueue; this.endpointUrl = endpointUrl; @@ -49,6 +50,7 @@ public QuickPulseDataFetcher(QuickPulseDataCollector collector, ArrayBlockingQue this.instanceName = instanceName; this.machineName = machineName; this.quickPulseId = quickPulseId; + this.quickPulseConfiguration = quickPulseConfiguration; sdkVersion = getCurrentSdkVersion(); } @@ -74,7 +76,8 @@ public void prepareQuickPulseDataForSend(String redirectedEndpoint) { Date currentDate = new Date(); String endpointPrefix = Strings.isNullOrEmpty(redirectedEndpoint) ? getQuickPulseEndpoint() : redirectedEndpoint; - HttpRequest request = networkHelper.buildRequest(currentDate, this.getEndpointUrl(endpointPrefix)); + HttpRequest request = networkHelper.buildRequest(currentDate, this.getEndpointUrl(endpointPrefix), + quickPulseConfiguration.getEtag()); request.setBody(buildPostEntity(counters)); if (!sendQueue.offer(request)) { @@ -121,15 +124,15 @@ private String buildPostEntity(QuickPulseDataCollector.FinalCounters counters) t postEnvelope.setStreamId(quickPulseId); postEnvelope.setVersion(sdkVersion); postEnvelope.setTimeStamp("/Date(" + System.currentTimeMillis() + ")/"); - postEnvelope.setMetrics(addMetricsToQuickPulseEnvelope(counters)); + postEnvelope.setMetrics(addMetricsToQuickPulseEnvelope(counters, collector.retrieveOtelMetrics())); envelopes.add(postEnvelope); - // By default '/' is not escaped in JSON, so we need to escape it manually as the backend requires it. - return postEnvelope.toJsonString().replace("/", "\\/"); + //By default, '/' is not escaped in JSON, so we need to escape it manually as the backend requires it. + return "[" + postEnvelope.toJsonString().replace("/", "\\/") + "]"; } - private static List - addMetricsToQuickPulseEnvelope(QuickPulseDataCollector.FinalCounters counters) { + private static List addMetricsToQuickPulseEnvelope( + QuickPulseDataCollector.FinalCounters counters, ArrayList otelMetrics) { List metricsList = new ArrayList<>(); metricsList.add(new QuickPulseMetrics("\\ApplicationInsights\\Requests/Sec", counters.requests, 1)); if (counters.requests != 0) { @@ -153,6 +156,7 @@ private String buildPostEntity(QuickPulseDataCollector.FinalCounters counters) t metricsList.add(new QuickPulseMetrics("\\Memory\\Committed Bytes", counters.memoryCommitted, 1)); metricsList.add(new QuickPulseMetrics("\\Processor(_Total)\\% Processor Time", counters.cpuUsage, 1)); + metricsList.addAll(otelMetrics); return metricsList; } } diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/src/main/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/QuickPulseDataSender.java b/sdk/monitor/azure-monitor-opentelemetry-exporter/src/main/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/QuickPulseDataSender.java index 1183d0175c4c..02c661bc5d5b 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/src/main/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/QuickPulseDataSender.java +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/src/main/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/QuickPulseDataSender.java @@ -9,22 +9,28 @@ import com.azure.core.util.Context; import com.azure.core.util.logging.ClientLogger; +import java.util.ArrayList; +import java.util.Objects; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentHashMap; class QuickPulseDataSender implements Runnable { private static final ClientLogger logger = new ClientLogger(QuickPulseCoordinator.class); private final QuickPulseNetworkHelper networkHelper = new QuickPulseNetworkHelper(); + private QuickPulseConfiguration quickPulseConfiguration; private final HttpPipeline httpPipeline; private volatile QuickPulseHeaderInfo quickPulseHeaderInfo; private long lastValidTransmission = 0; private final ArrayBlockingQueue sendQueue; - QuickPulseDataSender(HttpPipeline httpPipeline, ArrayBlockingQueue sendQueue) { + QuickPulseDataSender(HttpPipeline httpPipeline, ArrayBlockingQueue sendQueue, + QuickPulseConfiguration quickPulseConfiguration) { this.httpPipeline = httpPipeline; this.sendQueue = sendQueue; + this.quickPulseConfiguration = quickPulseConfiguration; } @Override @@ -58,6 +64,12 @@ public void run() { case QP_IS_ON: lastValidTransmission = sendTime; this.quickPulseHeaderInfo = quickPulseHeaderInfo; + String etagValue = networkHelper.getEtagHeaderValue(response); + if (!Objects.equals(etagValue, quickPulseConfiguration.getEtag())) { + ConcurrentHashMap> otelMetrics + = quickPulseConfiguration.parseDerivedMetrics(response); + quickPulseConfiguration.updateConfig(etagValue, otelMetrics); + } break; case ERROR: @@ -65,6 +77,7 @@ public void run() { break; } } + } catch (Throwable t) { logger.error("QuickPulseDataSender failed to send a request", t); } diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/src/main/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/QuickPulseMetricReader.java b/sdk/monitor/azure-monitor-opentelemetry-exporter/src/main/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/QuickPulseMetricReader.java new file mode 100644 index 000000000000..b8131ef17964 --- /dev/null +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/src/main/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/QuickPulseMetricReader.java @@ -0,0 +1,41 @@ +package com.azure.monitor.opentelemetry.exporter.implementation.quickpulse; + +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.metrics.InstrumentType; +import io.opentelemetry.sdk.metrics.data.AggregationTemporality; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.export.CollectionRegistration; +import io.opentelemetry.sdk.metrics.export.MetricReader; + +import java.util.Collection; + +public class QuickPulseMetricReader implements MetricReader { + + private volatile CollectionRegistration collectionRegistration = CollectionRegistration.noop(); + + @Override + public void register(CollectionRegistration registration) { + // this should get (once) called when the OpenTelemetry SDK is created + collectionRegistration = registration; + } + + @Override + public CompletableResultCode forceFlush() { + return CompletableResultCode.ofSuccess(); + } + + @Override + public CompletableResultCode shutdown() { + return CompletableResultCode.ofSuccess(); + } + + // this will be called on-demand from Quick Pulse code + Collection collectAllMetrics() { + return collectionRegistration.collectAllMetrics(); + } + + @Override + public AggregationTemporality getAggregationTemporality(InstrumentType instrumentType) { + return AggregationTemporality.DELTA; + } +} diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/src/main/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/QuickPulseMetricReceiver.java b/sdk/monitor/azure-monitor-opentelemetry-exporter/src/main/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/QuickPulseMetricReceiver.java new file mode 100644 index 000000000000..51f31bd8d64f --- /dev/null +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/src/main/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/QuickPulseMetricReceiver.java @@ -0,0 +1,69 @@ +package com.azure.monitor.opentelemetry.exporter.implementation.quickpulse; + +import com.azure.monitor.opentelemetry.exporter.implementation.MetricDataMapper; +import com.azure.monitor.opentelemetry.exporter.implementation.logging.OperationLogger; +import com.azure.monitor.opentelemetry.exporter.implementation.models.TelemetryItem; +import io.opentelemetry.sdk.metrics.data.MetricData; + +import java.util.Collection; +import java.util.function.Consumer; + +import static com.azure.monitor.opentelemetry.exporter.implementation.utils.AzureMonitorMsgId.EXPORTER_MAPPING_ERROR; + +public class QuickPulseMetricReceiver implements Runnable { + + private static QuickPulseHeaderInfo quickPulseHeaderInfo; + private QuickPulseMetricReader quickPulseMetricReader; + private QuickPulseDataCollector collector; + private static final OperationLogger metricReceiverLogger + = new OperationLogger(QuickPulseMetricReceiver.class, "Exporting metric"); + private final MetricDataMapper mapper; + private final Consumer quickPulseConsumer; + + public QuickPulseMetricReceiver(QuickPulseMetricReader quickPulseMetricReader, MetricDataMapper metricDataMapper, + QuickPulseDataCollector collector) { + this.quickPulseMetricReader = quickPulseMetricReader; + this.mapper = metricDataMapper; + this.collector = collector; + this.quickPulseConsumer = telemetryItem -> { + if (this.collector.isEnabled()) { + this.collector.addOtelMetric(telemetryItem); + } + }; + } + + public static synchronized QuickPulseHeaderInfo getQuickPulseHeaderInfo() { + return quickPulseHeaderInfo; + } + + public static synchronized void setQuickPulseHeaderInfo(QuickPulseHeaderInfo info) { + quickPulseHeaderInfo = info; + } + + @Override + public void run() { + while (true) { + Collection metrics = quickPulseMetricReader.collectAllMetrics(); + QuickPulseHeaderInfo headerInfo = getQuickPulseHeaderInfo(); + + if (headerInfo == null || headerInfo.getQuickPulseStatus() != QuickPulseStatus.QP_IS_ON) { + continue; + } + + for (MetricData metricData : metrics) { + try { + mapper.mapMetrics(metricData, quickPulseConsumer); + metricReceiverLogger.recordSuccess(); + } catch (Throwable t) { + metricReceiverLogger.recordFailure(t.getMessage(), t, EXPORTER_MAPPING_ERROR); + } + } + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + } + } +} diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/src/main/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/QuickPulseNetworkHelper.java b/sdk/monitor/azure-monitor-opentelemetry-exporter/src/main/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/QuickPulseNetworkHelper.java index 3f4f5dabd545..f41b7006541a 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/src/main/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/QuickPulseNetworkHelper.java +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/src/main/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/QuickPulseNetworkHelper.java @@ -5,11 +5,11 @@ import com.azure.core.http.HttpHeaderName; import com.azure.core.http.HttpHeaders; +import com.azure.core.http.HttpHeader; import com.azure.core.http.HttpMethod; import com.azure.core.http.HttpRequest; import com.azure.core.http.HttpResponse; import com.azure.monitor.opentelemetry.exporter.implementation.utils.Strings; - import java.util.Date; final class QuickPulseNetworkHelper { @@ -26,11 +26,13 @@ final class QuickPulseNetworkHelper { private static final HttpHeaderName QPS_STREAM_ID = HttpHeaderName.fromString("x-ms-qps-stream-id"); private static final HttpHeaderName QPS_INSTANCE_NAME = HttpHeaderName.fromString("x-ms-qps-instance-name"); private static final HttpHeaderName QPS_INVARIANT_VERSION = HttpHeaderName.fromString("x-ms-qps-invariant-version"); + private static final HttpHeaderName QPS_CONFIGURATION_ETAG_HEADER + = HttpHeaderName.fromString("x-ms-qps-configuration-etag"); HttpRequest buildPingRequest(Date currentDate, String address, String quickPulseId, String machineName, String roleName, String instanceName) { - HttpRequest request = buildRequest(currentDate, address); + HttpRequest request = buildRequest(currentDate, address, ""); request.setHeader(QPS_ROLE_NAME, roleName); request.setHeader(QPS_MACHINE_NAME, machineName); request.setHeader(QPS_STREAM_ID, quickPulseId); @@ -39,11 +41,12 @@ HttpRequest buildPingRequest(Date currentDate, String address, String quickPulse return request; } - HttpRequest buildRequest(Date currentDate, String address) { + HttpRequest buildRequest(Date currentDate, String address, String etag) { long ticks = currentDate.getTime() * 10000 + TICKS_AT_EPOCH; HttpRequest request = new HttpRequest(HttpMethod.POST, address); request.setHeader(HEADER_TRANSMISSION_TIME, String.valueOf(ticks)); + request.setHeader(QPS_CONFIGURATION_ETAG_HEADER, etag); return request; } @@ -73,4 +76,11 @@ QuickPulseHeaderInfo getQuickPulseHeaderInfo(HttpResponse response) { return new QuickPulseHeaderInfo(status, headers.getValue(QPS_SERVICE_ENDPOINT_REDIRECT), servicePollingIntervalHint); } + + String getEtagHeaderValue(HttpResponse response) { + HttpHeaders headers = response.getHeaders(); + HttpHeader etagHeader = headers.get(QPS_CONFIGURATION_ETAG_HEADER); + return etagHeader != null ? etagHeader.getValue() : null; + } + } diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/src/main/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/QuickPulsePingSender.java b/sdk/monitor/azure-monitor-opentelemetry-exporter/src/main/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/QuickPulsePingSender.java index 3fc31321f3be..d6696e3a3434 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/src/main/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/QuickPulsePingSender.java +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/src/main/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/QuickPulsePingSender.java @@ -16,7 +16,9 @@ import java.io.IOException; import java.net.URL; +import java.util.ArrayList; import java.util.Date; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; @@ -35,6 +37,7 @@ class QuickPulsePingSender { private final HttpPipeline httpPipeline; private final QuickPulseNetworkHelper networkHelper = new QuickPulseNetworkHelper(); + private QuickPulseConfiguration quickPulseConfiguration; private volatile QuickPulseEnvelope pingEnvelope; // cached for performance private final Supplier endpointUrl; @@ -47,7 +50,8 @@ class QuickPulsePingSender { private final String sdkVersion; QuickPulsePingSender(HttpPipeline httpPipeline, Supplier endpointUrl, Supplier instrumentationKey, - String roleName, String instanceName, String machineName, String quickPulseId, String sdkVersion) { + String roleName, String instanceName, String machineName, String quickPulseId, String sdkVersion, + QuickPulseConfiguration quickPulseConfiguration) { this.httpPipeline = httpPipeline; this.endpointUrl = endpointUrl; this.instrumentationKey = instrumentationKey; @@ -56,6 +60,7 @@ class QuickPulsePingSender { this.machineName = machineName; this.quickPulseId = quickPulseId; this.sdkVersion = sdkVersion; + this.quickPulseConfiguration = quickPulseConfiguration; } QuickPulseHeaderInfo ping(String redirectedEndpoint) { @@ -88,6 +93,12 @@ QuickPulseHeaderInfo ping(String redirectedEndpoint) { case QP_IS_OFF: case QP_IS_ON: lastValidTransmission = sendTime; + String etagValue = networkHelper.getEtagHeaderValue(response); + if (etagValue != null) { + ConcurrentHashMap> otelMetrics + = quickPulseConfiguration.parseDerivedMetrics(response); + quickPulseConfiguration.updateConfig(etagValue, otelMetrics); + } operationLogger.recordSuccess(); return quickPulseHeaderInfo; @@ -102,6 +113,7 @@ QuickPulseHeaderInfo ping(String redirectedEndpoint) { } } finally { if (response != null) { + // need to consume the body or close the response, otherwise get netty ByteBuf leak // warnings: // io.netty.util.ResourceLeakDetector - LEAK: ByteBuf.release() was not called before diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/src/main/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/model/OTelDataPoint.java b/sdk/monitor/azure-monitor-opentelemetry-exporter/src/main/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/model/OTelDataPoint.java new file mode 100644 index 000000000000..24fbecc708bb --- /dev/null +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/src/main/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/model/OTelDataPoint.java @@ -0,0 +1,30 @@ +package com.azure.monitor.opentelemetry.exporter.implementation.quickpulse.model; + +import java.util.HashMap; + +public class OTelDataPoint { + private double value; + private HashMap dimensions; + + public OTelDataPoint(double value, HashMap dimensions) { + this.value = value; + this.dimensions = dimensions; + } + + public double getValue() { + return value; + } + + public void setValue(double value) { + this.value = value; + } + + public HashMap getDimensions() { + return dimensions; + } + + public void setDimensions(HashMap dimensions) { + this.dimensions = dimensions; + } + +} diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/src/main/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/model/OTelMetric.java b/sdk/monitor/azure-monitor-opentelemetry-exporter/src/main/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/model/OTelMetric.java new file mode 100644 index 000000000000..d51a1b3db9da --- /dev/null +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/src/main/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/model/OTelMetric.java @@ -0,0 +1,37 @@ +package com.azure.monitor.opentelemetry.exporter.implementation.quickpulse.model; + +import java.util.ArrayList; +import java.util.HashMap; + +public class OTelMetric { + + private String name; + private ArrayList dataPoints; + + public OTelMetric(String name) { + this.name = name; + this.dataPoints = new ArrayList<>(); + } + + public void addDataPoint(double value, HashMap dimensions) { + OTelDataPoint dataPoint = new OTelDataPoint(value, dimensions); + this.dataPoints.add(dataPoint); + } + + public String getName() { + return name; + } + + public ArrayList getDataPoints() { + return dataPoints; + } + + public ArrayList getDataValues() { + ArrayList values = new ArrayList<>(); + for (OTelDataPoint dataPoint : dataPoints) { + values.add(dataPoint.getValue()); + } + return values; + } + +} diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/src/test/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/QuickPulseCoordinatorTest.java b/sdk/monitor/azure-monitor-opentelemetry-exporter/src/test/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/QuickPulseCoordinatorTest.java index 101bde0c6f98..d3d997b40dd6 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/src/test/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/QuickPulseCoordinatorTest.java +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/src/test/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/QuickPulseCoordinatorTest.java @@ -19,7 +19,8 @@ void testOnlyPings() throws InterruptedException { QuickPulseDataFetcher mockFetcher = mock(QuickPulseDataFetcher.class); QuickPulseDataSender mockSender = mock(QuickPulseDataSender.class); QuickPulsePingSender mockPingSender = mock(QuickPulsePingSender.class); - QuickPulseDataCollector collector = new QuickPulseDataCollector(true); + QuickPulseConfiguration quickPulseConfiguration = new QuickPulseConfiguration(); + QuickPulseDataCollector collector = new QuickPulseDataCollector(true, quickPulseConfiguration); Mockito.doReturn(new QuickPulseHeaderInfo(QuickPulseStatus.QP_IS_OFF)).when(mockPingSender).ping(null); QuickPulseCoordinatorInitData initData = new QuickPulseCoordinatorInitDataBuilder().withDataFetcher(mockFetcher) @@ -49,6 +50,7 @@ void testOnlyPings() throws InterruptedException { Mockito.verify(mockPingSender, Mockito.atLeast(1)).ping(null); // make sure QP_IS_OFF after ping assertThat(collector.getQuickPulseStatus()).isEqualTo(QuickPulseStatus.QP_IS_OFF); + assertThat(quickPulseConfiguration.getEtag()).isNull(); } @Test @@ -63,8 +65,8 @@ void testOnePingAndThenOnePost() throws InterruptedException { Mockito.when(mockPingSender.ping(null)) .thenReturn(new QuickPulseHeaderInfo(QuickPulseStatus.QP_IS_ON), new QuickPulseHeaderInfo(QuickPulseStatus.QP_IS_OFF)); - - QuickPulseDataCollector collector = new QuickPulseDataCollector(true); + QuickPulseConfiguration quickPulseConfiguration = new QuickPulseConfiguration(); + QuickPulseDataCollector collector = new QuickPulseDataCollector(true, quickPulseConfiguration); QuickPulseCoordinatorInitData initData = new QuickPulseCoordinatorInitDataBuilder().withDataFetcher(mockFetcher) .withDataSender(mockSender) .withPingSender(mockPingSender) @@ -92,6 +94,7 @@ void testOnePingAndThenOnePost() throws InterruptedException { Mockito.verify(mockPingSender, Mockito.atLeast(1)).ping(null); // Make sure QP_IS_OFF after one post and ping assertThat(collector.getQuickPulseStatus()).isEqualTo(QuickPulseStatus.QP_IS_OFF); + assertThat(quickPulseConfiguration.getEtag()).isNull(); } @Disabled("sporadically failing on CI") @@ -100,6 +103,7 @@ void testOnePingAndThenOnePostWithRedirectedLink() throws InterruptedException { QuickPulseDataFetcher mockFetcher = Mockito.mock(QuickPulseDataFetcher.class); QuickPulseDataSender mockSender = Mockito.mock(QuickPulseDataSender.class); QuickPulsePingSender mockPingSender = Mockito.mock(QuickPulsePingSender.class); + QuickPulseConfiguration quickPulseConfiguration = new QuickPulseConfiguration(); Mockito.doNothing().when(mockFetcher).prepareQuickPulseDataForSend(notNull()); Mockito.doReturn(new QuickPulseHeaderInfo(QuickPulseStatus.QP_IS_ON, "https://new.endpoint.com", 100)) @@ -112,7 +116,7 @@ void testOnePingAndThenOnePostWithRedirectedLink() throws InterruptedException { QuickPulseCoordinatorInitData initData = new QuickPulseCoordinatorInitDataBuilder().withDataFetcher(mockFetcher) .withDataSender(mockSender) .withPingSender(mockPingSender) - .withCollector(new QuickPulseDataCollector(true)) + .withCollector(new QuickPulseDataCollector(true, quickPulseConfiguration)) .withWaitBetweenPingsInMillis(10L) .withWaitBetweenPostsInMillis(10L) .withWaitOnErrorInMillis(10L) @@ -131,5 +135,6 @@ void testOnePingAndThenOnePostWithRedirectedLink() throws InterruptedException { Mockito.verify(mockFetcher, Mockito.atLeast(1)).prepareQuickPulseDataForSend("https://new.endpoint.com"); Mockito.verify(mockPingSender, Mockito.atLeast(1)).ping(null); Mockito.verify(mockPingSender, Mockito.times(2)).ping("https://new.endpoint.com"); + assertThat(quickPulseConfiguration.getEtag()).isNull(); } } diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/src/test/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/QuickPulseDataCollectorTests.java b/sdk/monitor/azure-monitor-opentelemetry-exporter/src/test/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/QuickPulseDataCollectorTests.java index c9d5bdf7905c..86325d57e8eb 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/src/test/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/QuickPulseDataCollectorTests.java +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/src/test/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/QuickPulseDataCollectorTests.java @@ -5,11 +5,20 @@ import com.azure.monitor.opentelemetry.exporter.implementation.builders.ExceptionTelemetryBuilder; import com.azure.monitor.opentelemetry.exporter.implementation.configuration.ConnectionString; +import com.azure.monitor.opentelemetry.exporter.implementation.models.MetricDataPoint; +import com.azure.monitor.opentelemetry.exporter.implementation.models.MetricsData; +import com.azure.monitor.opentelemetry.exporter.implementation.models.MonitorBase; import com.azure.monitor.opentelemetry.exporter.implementation.models.TelemetryItem; +import com.azure.monitor.opentelemetry.exporter.implementation.quickpulse.model.OTelMetric; +import com.azure.monitor.opentelemetry.exporter.implementation.quickpulse.model.QuickPulseMetrics; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.sdk.resources.Resource; import org.junit.jupiter.api.Test; import java.time.Duration; +import java.util.ArrayList; import java.util.Date; +import java.util.concurrent.ConcurrentHashMap; import static com.azure.monitor.opentelemetry.exporter.implementation.quickpulse.QuickPulseTestBase.createRemoteDependencyTelemetry; import static com.azure.monitor.opentelemetry.exporter.implementation.quickpulse.QuickPulseTestBase.createRequestTelemetry; @@ -23,30 +32,33 @@ class QuickPulseDataCollectorTests { @Test void initialStateIsDisabled() { - assertThat(new QuickPulseDataCollector(true).peek()).isNull(); + assertThat(new QuickPulseDataCollector(true, new QuickPulseConfiguration()).peek()).isNull(); } @Test void emptyCountsAndDurationsAfterEnable() { - QuickPulseDataCollector collector = new QuickPulseDataCollector(true); + QuickPulseDataCollector collector = new QuickPulseDataCollector(true, new QuickPulseConfiguration()); collector.enable(FAKE_CONNECTION_STRING::getInstrumentationKey); QuickPulseDataCollector.FinalCounters counters = collector.peek(); assertCountersReset(counters); + ArrayList storedMetrics = collector.retrieveOtelMetrics(); + assertThat(storedMetrics).isEmpty(); } @Test void nullCountersAfterDisable() { - QuickPulseDataCollector collector = new QuickPulseDataCollector(true); + QuickPulseDataCollector collector = new QuickPulseDataCollector(true, new QuickPulseConfiguration()); collector.enable(FAKE_CONNECTION_STRING::getInstrumentationKey); collector.disable(); assertThat(collector.peek()).isNull(); + assertThat(collector.retrieveOtelMetrics()).isEmpty(); } @Test void requestTelemetryIsCounted_DurationIsSum() { - QuickPulseDataCollector collector = new QuickPulseDataCollector(true); + QuickPulseDataCollector collector = new QuickPulseDataCollector(true, new QuickPulseConfiguration()); collector.setQuickPulseStatus(QuickPulseStatus.QP_IS_ON); collector.enable(FAKE_CONNECTION_STRING::getInstrumentationKey); @@ -88,7 +100,7 @@ void requestTelemetryIsCounted_DurationIsSum() { @Test void dependencyTelemetryIsCounted_DurationIsSum() { - QuickPulseDataCollector collector = new QuickPulseDataCollector(true); + QuickPulseDataCollector collector = new QuickPulseDataCollector(true, new QuickPulseConfiguration()); collector.setQuickPulseStatus(QuickPulseStatus.QP_IS_ON); collector.enable(FAKE_CONNECTION_STRING::getInstrumentationKey); @@ -130,7 +142,7 @@ void dependencyTelemetryIsCounted_DurationIsSum() { @Test void exceptionTelemetryIsCounted() { - QuickPulseDataCollector collector = new QuickPulseDataCollector(true); + QuickPulseDataCollector collector = new QuickPulseDataCollector(true, new QuickPulseConfiguration()); collector.setQuickPulseStatus(QuickPulseStatus.QP_IS_ON); collector.enable(FAKE_CONNECTION_STRING::getInstrumentationKey); @@ -150,6 +162,44 @@ void exceptionTelemetryIsCounted() { assertCountersReset(collector.peek()); } + @Test + void openTelemetryMetricsAreCounted() { + QuickPulseDataCollector collector = new QuickPulseDataCollector(true, new QuickPulseConfiguration()); + + collector.setQuickPulseStatus(QuickPulseStatus.QP_IS_ON); + collector.enable(FAKE_CONNECTION_STRING::getInstrumentationKey); + + TelemetryItem telemetry = new TelemetryItem(); + telemetry.setConnectionString(FAKE_CONNECTION_STRING); + MonitorBase data = new MonitorBase(); + MetricDataPoint point = new MetricDataPoint(); + point.setName("TestMetric"); + point.setValue(123.456); + ArrayList metricsList = new ArrayList<>(); + metricsList.add(point); + data.setBaseData(new MetricsData().setMetrics(metricsList)); + telemetry.setData(data); + Attributes attributes = Attributes.builder().put("telemetry.sdk.name", "opentelemetry").build(); + Resource resource = Resource.create(attributes); + telemetry.setResource(resource); + collector.addOtelMetric(telemetry); + ConcurrentHashMap storedMetrics = collector.getOtelMetrics(); + assertThat(storedMetrics.size()).isEqualTo(1); + assertThat(storedMetrics.containsKey("TestMetric")).isTrue(); + assertThat(storedMetrics.get("TestMetric").getDataValues().get(0)).isEqualTo(123.456); + + point.setName("TestMetric2"); + point.setValue(789.012); + collector.addOtelMetric(telemetry); + storedMetrics = collector.getOtelMetrics(); + assertThat(storedMetrics.size()).isEqualTo(2); + assertThat(storedMetrics.containsKey("TestMetric2")).isTrue(); + assertThat(storedMetrics.get("TestMetric2").getDataValues().get(0)).isEqualTo(789.012); + + collector.flushOtelMetrics(); + assertThat(collector.getOtelMetrics().size()).isEqualTo(0); + } + @Test void encodeDecodeIsIdentity() { long count = 456L; @@ -204,7 +254,7 @@ private static void assertCountersReset(QuickPulseDataCollector.FinalCounters co @Test void checkDocumentsListSize() { - QuickPulseDataCollector collector = new QuickPulseDataCollector(true); + QuickPulseDataCollector collector = new QuickPulseDataCollector(true, new QuickPulseConfiguration()); collector.setQuickPulseStatus(QuickPulseStatus.QP_IS_ON); collector.enable(FAKE_CONNECTION_STRING::getInstrumentationKey); diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/src/test/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/QuickPulseDataFetcherTests.java b/sdk/monitor/azure-monitor-opentelemetry-exporter/src/test/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/QuickPulseDataFetcherTests.java index 7ea1bad69583..6c45d54044c1 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/src/test/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/QuickPulseDataFetcherTests.java +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/src/test/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/QuickPulseDataFetcherTests.java @@ -24,8 +24,10 @@ class QuickPulseDataFetcherTests { @Test void testGetCurrentSdkVersion() { ConnectionString connectionString = ConnectionString.parse("InstrumentationKey=testing-123"); - QuickPulseDataFetcher dataFetcher = new QuickPulseDataFetcher(new QuickPulseDataCollector(true), null, - connectionString::getLiveEndpoint, connectionString::getInstrumentationKey, null, null, null, null); + QuickPulseConfiguration quickPulseConfiguration = new QuickPulseConfiguration(); + QuickPulseDataFetcher dataFetcher = new QuickPulseDataFetcher( + new QuickPulseDataCollector(true, quickPulseConfiguration), null, connectionString::getLiveEndpoint, + connectionString::getInstrumentationKey, null, null, null, null, quickPulseConfiguration); String sdkVersion = dataFetcher.getCurrentSdkVersion(); assertThat(sdkVersion).isNotNull(); assertThat(sdkVersion).isNotEqualTo("java:unknown"); @@ -34,8 +36,10 @@ void testGetCurrentSdkVersion() { @Test void endpointIsFormattedCorrectlyWhenUsingConfig() throws URISyntaxException { ConnectionString connectionString = ConnectionString.parse("InstrumentationKey=testing-123"); - QuickPulseDataFetcher quickPulseDataFetcher = new QuickPulseDataFetcher(new QuickPulseDataCollector(true), null, - connectionString::getLiveEndpoint, connectionString::getInstrumentationKey, null, null, null, null); + QuickPulseConfiguration quickPulseConfiguration = new QuickPulseConfiguration(); + QuickPulseDataFetcher quickPulseDataFetcher = new QuickPulseDataFetcher( + new QuickPulseDataCollector(true, quickPulseConfiguration), null, connectionString::getLiveEndpoint, + connectionString::getInstrumentationKey, null, null, null, null, quickPulseConfiguration); String quickPulseEndpoint = quickPulseDataFetcher.getQuickPulseEndpoint(); String endpointUrl = quickPulseDataFetcher.getEndpointUrl(quickPulseEndpoint); URI uri = new URI(endpointUrl); @@ -47,8 +51,10 @@ void endpointIsFormattedCorrectlyWhenUsingConfig() throws URISyntaxException { @Test void endpointIsFormattedCorrectlyWhenConfigIsNull() throws URISyntaxException { ConnectionString connectionString = ConnectionString.parse("InstrumentationKey=testing-123"); - QuickPulseDataFetcher quickPulseDataFetcher = new QuickPulseDataFetcher(new QuickPulseDataCollector(true), null, - connectionString::getLiveEndpoint, connectionString::getInstrumentationKey, null, null, null, null); + QuickPulseConfiguration quickPulseConfiguration = new QuickPulseConfiguration(); + QuickPulseDataFetcher quickPulseDataFetcher = new QuickPulseDataFetcher( + new QuickPulseDataCollector(true, quickPulseConfiguration), null, connectionString::getLiveEndpoint, + connectionString::getInstrumentationKey, null, null, null, null, quickPulseConfiguration); String quickPulseEndpoint = quickPulseDataFetcher.getQuickPulseEndpoint(); String endpointUrl = quickPulseDataFetcher.getEndpointUrl(quickPulseEndpoint); URI uri = new URI(endpointUrl); @@ -63,18 +69,21 @@ void endpointChangesWithRedirectHeaderAndGetNewPingInterval() { headers.put("x-ms-qps-service-polling-interval-hint", "1000"); headers.put("x-ms-qps-service-endpoint-redirect-v2", "https://new.endpoint.com"); headers.put("x-ms-qps-subscribed", "true"); + headers.put("x-ms-qps-configuration-etag", ""); HttpHeaders httpHeaders = new HttpHeaders(headers); ConnectionString connectionString = ConnectionString.parse("InstrumentationKey=testing-123"); + QuickPulseConfiguration quickPulseConfiguration = new QuickPulseConfiguration(); HttpPipeline httpPipeline = new HttpPipelineBuilder() .httpClient(request -> Mono.just(new MockHttpResponse(request, 200, httpHeaders))) .tracer(new NoopTracer()) .build(); - QuickPulsePingSender quickPulsePingSender - = new QuickPulsePingSender(httpPipeline, connectionString::getLiveEndpoint, - connectionString::getInstrumentationKey, null, "instance1", "machine1", "qpid123", "testSdkVersion"); + QuickPulsePingSender quickPulsePingSender = new QuickPulsePingSender(httpPipeline, + connectionString::getLiveEndpoint, connectionString::getInstrumentationKey, null, "instance1", "machine1", + "qpid123", "testSdkVersion", quickPulseConfiguration); QuickPulseHeaderInfo quickPulseHeaderInfo = quickPulsePingSender.ping(null); assertThat(QuickPulseStatus.QP_IS_ON).isEqualTo(quickPulseHeaderInfo.getQuickPulseStatus()); assertThat(1000).isEqualTo(quickPulseHeaderInfo.getQpsServicePollingInterval()); assertThat("https://new.endpoint.com").isEqualTo(quickPulseHeaderInfo.getQpsServiceEndpointRedirect()); + assertThat(quickPulseConfiguration.getEtag()).isEqualTo(""); } } diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/src/test/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/QuickPulseIntegrationTests.java b/sdk/monitor/azure-monitor-opentelemetry-exporter/src/test/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/QuickPulseIntegrationTests.java index 073fd544dd2d..eb4daa1e7079 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/src/test/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/QuickPulseIntegrationTests.java +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/src/test/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/QuickPulseIntegrationTests.java @@ -25,25 +25,30 @@ public class QuickPulseIntegrationTests extends QuickPulseTestBase { private static final ConnectionString connectionString = ConnectionString.parse("InstrumentationKey=ikey123"); private static final String instrumentationKey = "ikey123"; - private QuickPulsePingSender getQuickPulsePingSender() { + private QuickPulsePingSender getQuickPulsePingSender(QuickPulseConfiguration quickPulseConfiguration) { return new QuickPulsePingSender(getHttpPipeline(), connectionString::getLiveEndpoint, - connectionString::getInstrumentationKey, null, "instance1", "machine1", "qpid123", "testSdkVersion"); + connectionString::getInstrumentationKey, null, "instance1", "machine1", "qpid123", "testSdkVersion", + quickPulseConfiguration); } - private QuickPulsePingSender getQuickPulsePingSenderWithAuthentication() { + private QuickPulsePingSender + getQuickPulsePingSenderWithAuthentication(QuickPulseConfiguration quickPulseConfiguration) { return new QuickPulsePingSender(getHttpPipelineWithAuthentication(), connectionString::getLiveEndpoint, - connectionString::getInstrumentationKey, null, "instance1", "machine1", "qpid123", "testSdkVersion"); + connectionString::getInstrumentationKey, null, "instance1", "machine1", "qpid123", "testSdkVersion", + quickPulseConfiguration); } - private QuickPulsePingSender getQuickPulsePingSenderWithValidator(HttpPipelinePolicy validator) { + private QuickPulsePingSender getQuickPulsePingSenderWithValidator(HttpPipelinePolicy validator, + QuickPulseConfiguration quickPulseConfiguration) { return new QuickPulsePingSender(getHttpPipeline(validator), connectionString::getLiveEndpoint, - connectionString::getInstrumentationKey, null, "instance1", "machine1", "qpid123", "testSdkVersion"); + connectionString::getInstrumentationKey, null, "instance1", "machine1", "qpid123", "testSdkVersion", + quickPulseConfiguration); } @Disabled @Test public void testPing() { - QuickPulsePingSender quickPulsePingSender = getQuickPulsePingSender(); + QuickPulsePingSender quickPulsePingSender = getQuickPulsePingSender(new QuickPulseConfiguration()); QuickPulseHeaderInfo quickPulseHeaderInfo = quickPulsePingSender.ping(null); assertThat(quickPulseHeaderInfo.getQuickPulseStatus()).isEqualTo(QuickPulseStatus.QP_IS_ON); } @@ -51,7 +56,8 @@ public void testPing() { @Disabled @Test public void testPingWithAuthentication() { - QuickPulsePingSender quickPulsePingSender = getQuickPulsePingSenderWithAuthentication(); + QuickPulsePingSender quickPulsePingSender + = getQuickPulsePingSenderWithAuthentication(new QuickPulseConfiguration()); QuickPulseHeaderInfo quickPulseHeaderInfo = quickPulsePingSender.ping(null); assertThat(quickPulseHeaderInfo.getQuickPulseStatus()).isEqualTo(QuickPulseStatus.QP_IS_ON); } @@ -62,8 +68,8 @@ public void testPingRequestBody() throws InterruptedException { CountDownLatch pingCountDown = new CountDownLatch(1); String expectedRequestBody = "\\{\"Documents\":null,\"InstrumentationKey\":null,\"Metrics\":null,\"InvariantVersion\":1,\"Timestamp\":\"\\\\/Date\\(\\d+\\)\\\\/\",\"Version\":\"testSdkVersion\",\"StreamId\":\"qpid123\",\"MachineName\":\"machine1\",\"Instance\":\"instance1\",\"RoleName\":null\\}"; - QuickPulsePingSender quickPulsePingSender - = getQuickPulsePingSenderWithValidator(new ValidationPolicy(pingCountDown, expectedRequestBody)); + QuickPulsePingSender quickPulsePingSender = getQuickPulsePingSenderWithValidator( + new ValidationPolicy(pingCountDown, expectedRequestBody), new QuickPulseConfiguration()); QuickPulseHeaderInfo quickPulseHeaderInfo = quickPulsePingSender.ping(null); assertThat(quickPulseHeaderInfo.getQuickPulseStatus()).isEqualTo(QuickPulseStatus.QP_IS_ON); assertTrue(pingCountDown.await(60, TimeUnit.SECONDS)); @@ -76,21 +82,23 @@ public void testPostRequest() throws InterruptedException { CountDownLatch pingCountDown = new CountDownLatch(1); CountDownLatch postCountDown = new CountDownLatch(1); Date currDate = new Date(); + QuickPulseConfiguration quickPulseConfiguration = new QuickPulseConfiguration(); String expectedPingRequestBody = "\\{\"Documents\":null,\"InstrumentationKey\":null,\"Metrics\":null,\"InvariantVersion\":1,\"Timestamp\":\"\\\\/Date\\(\\d+\\)\\\\/\",\"Version\":\"testSdkVersion\",\"StreamId\":\"qpid123\",\"MachineName\":\"machine1\",\"Instance\":\"instance1\",\"RoleName\":null\\}"; String expectedPostRequestBody = "\\[\\{\"Documents\":\\[\\{\"__type\":\"RequestTelemetryDocument\",\"DocumentType\":\"Request\",\"Version\":\"1.0\",\"OperationId\":null,\"Properties\":\\{\"customProperty\":\"customValue\"\\},\"Name\":\"request-test\",\"Success\":true,\"Duration\":\"PT.*S\",\"ResponseCode\":\"200\",\"OperationName\":null,\"Url\":\"foo\"\\},\\{\"__type\":\"DependencyTelemetryDocument\",\"DocumentType\":\"RemoteDependency\",\"Version\":\"1.0\",\"OperationId\":null,\"Properties\":\\{\"customProperty\":\"customValue\"\\},\"Name\":\"dep-test\",\"Target\":null,\"Success\":true,\"Duration\":\"PT.*S\",\"ResultCode\":null,\"CommandName\":\"dep-test-cmd\",\"DependencyTypeName\":null,\"OperationName\":null\\},\\{\"__type\":\"ExceptionTelemetryDocument\",\"DocumentType\":\"Exception\",\"Version\":\"1.0\",\"OperationId\":null,\"Properties\":null,\"Exception\":\"\",\"ExceptionMessage\":\"test\",\"ExceptionType\":\"java.lang.Exception\"\\}\\],\"InstrumentationKey\":\"" + instrumentationKey + "\",\"Metrics\":\\[\\{\"Name\":\"\\\\\\\\ApplicationInsights\\\\\\\\Requests\\\\\\/Sec\",\"Value\":[0-9.]+,\"Weight\":\\d+\\},\\{\"Name\":\"\\\\\\\\ApplicationInsights\\\\\\\\Request Duration\",\"Value\":[0-9.]+,\"Weight\":\\d+\\},\\{\"Name\":\"\\\\\\\\ApplicationInsights\\\\\\\\Requests Failed\\\\\\/Sec\",\"Value\":[0-9.]+,\"Weight\":\\d+\\},\\{\"Name\":\"\\\\\\\\ApplicationInsights\\\\\\\\Requests Succeeded\\\\\\/Sec\",\"Value\":[0-9.]+,\"Weight\":\\d+\\},\\{\"Name\":\"\\\\\\\\ApplicationInsights\\\\\\\\Dependency Calls\\\\\\/Sec\",\"Value\":[0-9.]+,\"Weight\":\\d+\\},\\{\"Name\":\"\\\\\\\\ApplicationInsights\\\\\\\\Dependency Call Duration\",\"Value\":[0-9.]+,\"Weight\":\\d+\\},\\{\"Name\":\"\\\\\\\\ApplicationInsights\\\\\\\\Dependency Calls Failed\\\\\\/Sec\",\"Value\":[0-9.]+,\"Weight\":\\d+\\},\\{\"Name\":\"\\\\\\\\ApplicationInsights\\\\\\\\Dependency Calls Succeeded\\\\\\/Sec\",\"Value\":[0-9.]+,\"Weight\":\\d+\\},\\{\"Name\":\"\\\\\\\\ApplicationInsights\\\\\\\\Exceptions\\\\\\/Sec\",\"Value\":[0-9.]+,\"Weight\":\\d+\\},\\{\"Name\":\"\\\\\\\\Memory\\\\\\\\Committed Bytes\",\"Value\":[0-9.E]+,\"Weight\":\\d+\\},\\{\"Name\":\"\\\\\\\\Processor\\(_Total\\)\\\\\\\\% Processor Time\",\"Value\":-?[0-9.]+,\"Weight\":\\d+\\}\\],\"InvariantVersion\":1,\"Timestamp\":\"\\\\\\/Date\\(\\d+\\)\\\\\\/\",\"Version\":\"[^\"]*\",\"StreamId\":null,\"MachineName\":\"machine1\",\"Instance\":\"instance1\",\"RoleName\":null\\}\\]"; - QuickPulsePingSender pingSender - = getQuickPulsePingSenderWithValidator(new ValidationPolicy(pingCountDown, expectedPingRequestBody)); + QuickPulsePingSender pingSender = getQuickPulsePingSenderWithValidator( + new ValidationPolicy(pingCountDown, expectedPingRequestBody), quickPulseConfiguration); QuickPulseHeaderInfo quickPulseHeaderInfo = pingSender.ping(null); - QuickPulseDataSender dataSender = new QuickPulseDataSender( - getHttpPipeline(new ValidationPolicy(postCountDown, expectedPostRequestBody)), sendQueue); - QuickPulseDataCollector collector = new QuickPulseDataCollector(true); + QuickPulseDataSender dataSender + = new QuickPulseDataSender(getHttpPipeline(new ValidationPolicy(postCountDown, expectedPostRequestBody)), + sendQueue, quickPulseConfiguration); + QuickPulseDataCollector collector = new QuickPulseDataCollector(true, quickPulseConfiguration); QuickPulseDataFetcher dataFetcher = new QuickPulseDataFetcher(collector, sendQueue, connectionString::getLiveEndpoint, - connectionString::getInstrumentationKey, null, "instance1", "machine1", null); + connectionString::getInstrumentationKey, null, "instance1", "machine1", null, quickPulseConfiguration); collector.setQuickPulseStatus(QuickPulseStatus.QP_IS_ON); collector.enable(connectionString::getInstrumentationKey); diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/src/test/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/QuickPulseNetworkHelperTest.java b/sdk/monitor/azure-monitor-opentelemetry-exporter/src/test/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/QuickPulseNetworkHelperTest.java index 526deeb3518a..ae5d6e293d2f 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/src/test/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/QuickPulseNetworkHelperTest.java +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/src/test/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/QuickPulseNetworkHelperTest.java @@ -3,6 +3,8 @@ package com.azure.monitor.opentelemetry.exporter.implementation.quickpulse; +import com.azure.core.http.HttpHeaderName; +import com.azure.core.http.HttpHeaders; import com.azure.core.http.HttpResponse; import org.junit.jupiter.api.Test; import org.mockito.Mockito; @@ -28,4 +30,15 @@ void testIsSuccessWith500() { boolean result = new QuickPulseNetworkHelper().isSuccess(response); assertThat(result).isFalse(); } + + @Test + void testGetEtagHeader() { + HttpResponse response = mock(HttpResponse.class); + HttpHeaders headers = new HttpHeaders(); + HttpHeaderName QPS_CONFIGURATION_ETAG_HEADER_NAME = HttpHeaderName.fromString("x-ms-qps-configuration-etag"); + headers.add(QPS_CONFIGURATION_ETAG_HEADER_NAME, "0::randometag::1::"); + Mockito.doReturn(headers).when(response).getHeaders(); + String result = new QuickPulseNetworkHelper().getEtagHeaderValue(response); + assertThat(result).isEqualTo("0::randometag::1::"); + } } diff --git a/sdk/monitor/azure-monitor-opentelemetry-exporter/src/test/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/QuickPulsePingSenderTests.java b/sdk/monitor/azure-monitor-opentelemetry-exporter/src/test/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/QuickPulsePingSenderTests.java index f67255af0590..908543542124 100644 --- a/sdk/monitor/azure-monitor-opentelemetry-exporter/src/test/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/QuickPulsePingSenderTests.java +++ b/sdk/monitor/azure-monitor-opentelemetry-exporter/src/test/java/com/azure/monitor/opentelemetry/exporter/implementation/quickpulse/QuickPulsePingSenderTests.java @@ -6,15 +6,26 @@ import com.azure.core.http.HttpHeaders; import com.azure.core.http.HttpPipeline; import com.azure.core.http.HttpPipelineBuilder; +import com.azure.json.JsonProviders; +import com.azure.json.JsonWriter; import com.azure.monitor.opentelemetry.exporter.implementation.MockHttpResponse; import com.azure.monitor.opentelemetry.exporter.implementation.NoopTracer; import com.azure.monitor.opentelemetry.exporter.implementation.configuration.ConnectionString; +import org.apache.commons.io.output.ByteArrayOutputStream; import org.junit.jupiter.api.Test; import reactor.core.publisher.Mono; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import java.io.IOException; +import java.io.OutputStream; import java.net.URI; import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import static org.assertj.core.api.Assertions.assertThat; @@ -24,8 +35,9 @@ class QuickPulsePingSenderTests { @Test void endpointIsFormattedCorrectlyWhenUsingConnectionString() throws URISyntaxException { ConnectionString connectionString = ConnectionString.parse("InstrumentationKey=testing-123"); + QuickPulseConfiguration quickPulseConfiguration = new QuickPulseConfiguration(); QuickPulsePingSender quickPulsePingSender = new QuickPulsePingSender(null, connectionString::getLiveEndpoint, - connectionString::getInstrumentationKey, null, null, null, null, null); + connectionString::getInstrumentationKey, null, null, null, null, null, quickPulseConfiguration); String quickPulseEndpoint = quickPulsePingSender.getQuickPulseEndpoint(); String endpointUrl = quickPulsePingSender.getQuickPulsePingUri(quickPulseEndpoint); URI uri = new URI(endpointUrl); @@ -38,8 +50,9 @@ void endpointIsFormattedCorrectlyWhenUsingConnectionString() throws URISyntaxExc @Test void endpointIsFormattedCorrectlyWhenUsingInstrumentationKey() throws URISyntaxException { ConnectionString connectionString = ConnectionString.parse("InstrumentationKey=A-test-instrumentation-key"); + QuickPulseConfiguration quickPulseConfiguration = new QuickPulseConfiguration(); QuickPulsePingSender quickPulsePingSender = new QuickPulsePingSender(null, connectionString::getLiveEndpoint, - connectionString::getInstrumentationKey, null, null, null, null, null); + connectionString::getInstrumentationKey, null, null, null, null, null, quickPulseConfiguration); String quickPulseEndpoint = quickPulsePingSender.getQuickPulseEndpoint(); String endpointUrl = quickPulsePingSender.getQuickPulsePingUri(quickPulseEndpoint); URI uri = new URI(endpointUrl); @@ -55,18 +68,108 @@ void endpointChangesWithRedirectHeaderAndGetNewPingInterval() { headers.put("x-ms-qps-service-polling-interval-hint", "1000"); headers.put("x-ms-qps-service-endpoint-redirect-v2", "https://new.endpoint.com"); headers.put("x-ms-qps-subscribed", "true"); + headers.put("x-ms-qps-configuration-etag", "0::randometag::1::"); HttpHeaders httpHeaders = new HttpHeaders(headers); ConnectionString connectionString = ConnectionString.parse("InstrumentationKey=fake-ikey"); + QuickPulseConfiguration quickPulseConfiguration = new QuickPulseConfiguration(); HttpPipeline httpPipeline = new HttpPipelineBuilder() .httpClient(request -> Mono.just(new MockHttpResponse(request, 200, httpHeaders))) .tracer(new NoopTracer()) .build(); - QuickPulsePingSender quickPulsePingSender - = new QuickPulsePingSender(httpPipeline, connectionString::getLiveEndpoint, - connectionString::getInstrumentationKey, null, "instance1", "machine1", "qpid123", "testSdkVersion"); + QuickPulsePingSender quickPulsePingSender = new QuickPulsePingSender(httpPipeline, + connectionString::getLiveEndpoint, connectionString::getInstrumentationKey, null, "instance1", "machine1", + "qpid123", "testSdkVersion", quickPulseConfiguration); QuickPulseHeaderInfo quickPulseHeaderInfo = quickPulsePingSender.ping(null); assertThat(QuickPulseStatus.QP_IS_ON).isEqualTo(quickPulseHeaderInfo.getQuickPulseStatus()); assertThat(1000).isEqualTo(quickPulseHeaderInfo.getQpsServicePollingInterval()); assertThat("https://new.endpoint.com").isEqualTo(quickPulseHeaderInfo.getQpsServiceEndpointRedirect()); + assertThat(quickPulseConfiguration.getEtag()).isEqualTo("0::randometag::1::"); } + + @Test + void successfulPingReturnsWithEtagHeaderAndRequestedMetrics() throws IOException { + Map headers = new HashMap<>(); + headers.put("x-ms-qps-service-polling-interval-hint", "1000"); + headers.put("x-ms-qps-service-endpoint-redirect-v2", "https://new.endpoint.com"); + headers.put("x-ms-qps-subscribed", "true"); + headers.put("x-ms-qps-configuration-etag", "2::randometag::3::"); + HttpHeaders httpHeaders = new HttpHeaders(headers); + ConnectionString connectionString = ConnectionString.parse("InstrumentationKey=fake-ikey"); + QuickPulseConfiguration quickPulseConfiguration = new QuickPulseConfiguration(); + + List> metrics = new ArrayList<>(); + Map metric1 = new HashMap<>(); + metric1.put("Id", "my_gauge"); + metric1.put("Aggregation", "Avg"); + metric1.put("TelemetryType", "Metric"); + metric1.put("Projection", "my_gauge"); + metric1.put("BackendAggregation", "Min"); + + ArrayList>>> filterGroups = new ArrayList<>(); + HashMap>> filterGroup = new HashMap<>(); + ArrayList> filters = new ArrayList<>(); + HashMap filterOne = new HashMap<>(); + filterOne.put("FieldName", "Test"); + filterOne.put("Predicate", "Equals"); + filterOne.put("Comparand", "Value"); + filters.add(filterOne); + filterGroup.put("Filters", filters); + filterGroups.add(filterGroup); + metric1.put("FilterGroups", filterGroups); + + Map metric2 = new HashMap<>(); + metric2.put("Id", "MyFruitCounter"); + metric2.put("Aggregation", "Sum"); + metric2.put("TelemetryType", "Metric"); + metric2.put("Projection", "MyFruitCounter"); + metric2.put("BackendAggregation", "Max"); + metric2.put("FilterGroups", new ArrayList<>()); + metrics.add(metric1); + metrics.add(metric2); + + Map metricsMap = new HashMap<>(); + metricsMap.put("DocumentStreams", null); + metricsMap.put("ETag", "2::randometag::3::"); + metricsMap.put("Metrics", metrics); + metricsMap.put("QuotaInfo", null); + ObjectMapper objectMapper = new ObjectMapper(); + String jsonBody; + + try { + jsonBody = objectMapper.writeValueAsString(metricsMap); + } catch (com.fasterxml.jackson.core.JsonProcessingException e) { + jsonBody = "{}"; + } + byte[] bodyBytes = jsonBody.getBytes(StandardCharsets.UTF_8); + + HttpPipeline httpPipeline = new HttpPipelineBuilder() + .httpClient(request -> Mono.just(new MockHttpResponse(request, 200, httpHeaders, bodyBytes))) + .tracer(new NoopTracer()) + .build(); + QuickPulsePingSender quickPulsePingSender = new QuickPulsePingSender(httpPipeline, + connectionString::getLiveEndpoint, connectionString::getInstrumentationKey, null, "instance1", "machine1", + "qpid123", "testSdkVersion", quickPulseConfiguration); + + QuickPulseHeaderInfo quickPulseHeaderInfo = quickPulsePingSender.ping(null); + assertThat(QuickPulseStatus.QP_IS_ON).isEqualTo(quickPulseHeaderInfo.getQuickPulseStatus()); + assertThat("https://new.endpoint.com").isEqualTo(quickPulseHeaderInfo.getQpsServiceEndpointRedirect()); + assertThat(quickPulseConfiguration.getEtag()).isEqualTo("2::randometag::3::"); + assertThat(quickPulseConfiguration.getDerivedMetrics().size()).isEqualTo(1); + assertThat(quickPulseConfiguration.getDerivedMetrics().get("Metric").size()).isEqualTo(2); + ArrayList metricCategory + = quickPulseConfiguration.getDerivedMetrics().get("Metric"); + assertThat(metricCategory.get(0).getAggregation()).isEqualTo("Avg"); + assertThat(metricCategory.get(0).getTelemetryType()).isEqualTo("Metric"); + assertThat(metricCategory.get(0).getFilterGroups().size() == 1); + assertThat(metricCategory.get(0).getFilterGroups().get(0).getFieldName()).isEqualTo("Test"); + assertThat(metricCategory.get(0).getFilterGroups().get(0).getOperator()).isEqualTo("Equals"); + assertThat(metricCategory.get(0).getFilterGroups().get(0).getComparand()).isEqualTo("Value"); + assertThat(metricCategory.get(0).getProjection()).isEqualTo("my_gauge"); + assertThat(metricCategory.get(0).getId()).isEqualTo("my_gauge"); + assertThat(metricCategory.get(1).getAggregation()).isEqualTo("Sum"); + assertThat(metricCategory.get(1).getTelemetryType()).isEqualTo("Metric"); + assertThat(metricCategory.get(1).getProjection()).isEqualTo("MyFruitCounter"); + assertThat(metricCategory.get(1).getId()).isEqualTo("MyFruitCounter"); + } + }