diff --git a/docs/examples/exemplars/.ipynb_checkpoints/statistical_exemplars-checkpoint.ipynb b/docs/examples/exemplars/.ipynb_checkpoints/statistical_exemplars-checkpoint.ipynb new file mode 100644 index 00000000000..deb0f27457f --- /dev/null +++ b/docs/examples/exemplars/.ipynb_checkpoints/statistical_exemplars-checkpoint.ipynb @@ -0,0 +1,359 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "This example will build an exemplar sample set on a \"bytes in\" counter aggregator, which just sums up the number of bytes sent into our \"application\".\n", + "We will use these statistical exemplars to generate insights into the data that was aggregated away.\n", + "\n", + "We'll start by importing everything we will need from opentelemetry to create the metrics:" + ] + }, + { + "cell_type": "code", + "execution_count": 48, + "metadata": {}, + "outputs": [], + "source": [ + "import numpy as np\n", + "import matplotlib.pyplot as plt\n", + "import random\n", + "\n", + "from collections import defaultdict\n", + "\n", + "from opentelemetry import metrics\n", + "from opentelemetry.sdk.metrics import Counter, MeterProvider\n", + "from opentelemetry.sdk.metrics.export.aggregate import SumAggregator\n", + "from opentelemetry.sdk.metrics.export.controller import PushController\n", + "from opentelemetry.sdk.metrics.export.in_memory_metrics_exporter import InMemoryMetricsExporter\n", + "from opentelemetry.sdk.metrics.view import View, ViewConfig" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We can then set up an in-memory metrics exporter so we can analyze the exemplar data in-service:" + ] + }, + { + "cell_type": "code", + "execution_count": 49, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "Overriding current MeterProvider\n" + ] + } + ], + "source": [ + "## set up opentelemetry\n", + "\n", + "# Sets the global MeterProvider instance\n", + "metrics.set_meter_provider(MeterProvider())\n", + "\n", + "meter = metrics.get_meter(__name__)\n", + "\n", + "# Export to a python list so we can do stats with the data\n", + "exporter = InMemoryMetricsExporter()\n", + "\n", + "# instead of waiting for the controller to tick over time, we will just tick it ourselves\n", + "controller = PushController(meter, exporter, 500)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We now need to create the bytes in metric, and assign it a view (this is where we set up exemplars):" + ] + }, + { + "cell_type": "code", + "execution_count": 50, + "metadata": {}, + "outputs": [], + "source": [ + "\n", + "# Create the metric that we will use\n", + "bytes_counter = meter.create_metric(\n", + " name=\"bytes_counter\",\n", + " description=\"Number of bytes received by service\",\n", + " unit=\"By\",\n", + " value_type=int,\n", + " metric_type=Counter,\n", + ")\n", + "\n", + "# Every time interval we will collect 100 exemplars statistically (selected without bias)\n", + "aggregator_config = {\"num_exemplars\": 100, \"statistical_exemplars\": True}\n", + "\n", + "# Assign a Sum aggregator to `bytes_counter` that collects exemplars\n", + "counter_view = View(\n", + " bytes_counter,\n", + " SumAggregator(config=aggregator_config),\n", + " label_keys=[\"environment\"],\n", + " config=ViewConfig.LABEL_KEYS,\n", + ")\n", + "\n", + "meter.register_view(counter_view)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The last thing we need to do before we can start working with exemplars is generating a large set of data for metrics.\n", + "If the dataset is too small, we won't be able to collect a large enough subset of the input to analyze with exemplars.\n", + "\n", + "If this was a real application, the data would be generated through requests to/from the server." + ] + }, + { + "cell_type": "code", + "execution_count": 51, + "metadata": {}, + "outputs": [], + "source": [ + "## generate the random metric data\n", + "\n", + "def unknown_customer_calls():\n", + " \"\"\"Generate customer call data to our application\"\"\"\n", + "\n", + " # set a random seed for consistency of data for example purposes\n", + " np.random.seed(1)\n", + " # Make exemplar selection consistent for example purposes\n", + " random.seed(1)\n", + "\n", + " # customer 123 is a big user, and made 1000 requests in this timeframe\n", + " requests = np.random.normal(1000, 250, 1000) # 1000 requests with average 1000 bytes, covariance 100\n", + "\n", + " for request in requests:\n", + " bytes_counter.add(int(request), {\"environment\": \"production\", \"method\": \"REST\", \"customer_id\": 123})\n", + "\n", + " # customer 247 is another big user, making fewer, but bigger requests\n", + " requests = np.random.normal(5000, 1250, 200) # 200 requests with average size of 5k bytes\n", + "\n", + " for request in requests:\n", + " bytes_counter.add(int(request), {\"environment\": \"production\", \"method\": \"REST\", \"customer_id\": 247})\n", + "\n", + " # There are many other smaller customers\n", + " for customer_id in range(250):\n", + " requests = np.random.normal(1000, 250, np.random.randint(1, 10))\n", + " method = \"REST\" if np.random.randint(2) else \"gRPC\"\n", + " for request in requests:\n", + " bytes_counter.add(int(request), {\"environment\": \"production\", \"method\": method, \"customer_id\": customer_id})\n", + "\n", + "unknown_customer_calls()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Analyzing the Exemplars" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Let's export our metric and collect the exemplars from the outputted aggregation:" + ] + }, + { + "cell_type": "code", + "execution_count": 52, + "metadata": {}, + "outputs": [], + "source": [ + "# Tick the controller so it sends metrics to the exporter\n", + "controller.tick()\n", + "\n", + "# collect metrics from our exporter\n", + "metric_data = exporter.get_exported_metrics()\n", + "\n", + "# get the exemplars from the bytes_in counter aggregator\n", + "aggregator = metric_data[0].aggregator\n", + "exemplars = aggregator.checkpoint_exemplars" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "One of the key values of exemplars is its ability to handle dropped labels (labels that are too high cardinality to create a new metric record for each value). \n", + "In our application, we drop the \"customer_id\" label since there is an unbounded number of possible labels. However, with exemplars, we can still estimate stats related\n", + "to the customer ids, for example the relative size of each customer:" + ] + }, + { + "cell_type": "code", + "execution_count": 53, + "metadata": {}, + "outputs": [ + { + "data": { + "image/png": "\n", + "text/plain": [ + "
" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "# Sum up the total bytes in per customer from all of the exemplars collected\n", + "customer_bytes_map = defaultdict(int)\n", + "for exemplar in exemplars:\n", + " customer_bytes_map[exemplar.dropped_labels] += exemplar.value\n", + "\n", + "\n", + "customer_bytes_list = sorted(list(customer_bytes_map.items()), key=lambda t: t[1], reverse=True)\n", + "\n", + "# Save our top 5 customers and sum all of the rest into \"Others\".\n", + "top_5_customers = [(\"Customer {}\".format(dict(val[0])[\"customer_id\"]), val[1]) for val in customer_bytes_list[:5]] + [(\"Other Customers\", sum([val[1] for val in customer_bytes_list[5:]]))]\n", + "\n", + "# unzip the data into X (sizes of each customer's contribution) and labels\n", + "labels, X = zip(*top_5_customers)\n", + "\n", + "# create the chart with matplotlib and show it\n", + "plt.pie(X, labels=labels)\n", + "plt.show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Running this shows that the usage of our service is relatively closely split between customer 247, customer 123, and everyone else, which lines up closely with the data that was generated.\n", + "The more exemplars we sample, the more accurate this data will be, but also the more costly (in terms of memory usage) the metric would be.\n", + "\n", + "We can use the \"sample_count\" property of exemplars to predict the actual number of bytes customers sent (vs the percentage)\n", + "For example, to predict the number of bytes customer 123 sent:" + ] + }, + { + "cell_type": "code", + "execution_count": 59, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "sample count 25.33 custmer 40474\n", + "Customer 123 sent about 1025206 bytes this interval\n" + ] + } + ], + "source": [ + "# Estimate how many bytes customer 123 sent\n", + "customer_123_bytes = customer_bytes_map[((\"customer_id\", 123), (\"method\", \"REST\"))]\n", + "\n", + "# Since the exemplars were randomly sampled, all sample_counts will be the same\n", + "sample_count = exemplars[0].sample_count\n", + "print(\"sample count\", sample_count, \"custmer\", customer_123_bytes)\n", + "full_customer_123_bytes = sample_count * customer_123_bytes\n", + "\n", + "# With seed == 1 we get 1025206 - quite close to the statistical mean of 1000000! (more exemplars would make this estimation even more accurate)\n", + "print(\"Customer 123 sent about {} bytes this interval\".format(int(full_customer_123_bytes)))\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "We could also estimate the percentage of our top 25 customers that use gRPC (another dropped label):" + ] + }, + { + "cell_type": "code", + "execution_count": 58, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "~44% of the top 25 customers (by bytes in) used gRPC this interval\n" + ] + } + ], + "source": [ + "# Determine the top 25 customers by how many bytes they sent in exemplars\n", + "top_25_customers = customer_bytes_list[:25]\n", + "\n", + "# out of those 25 customers, determine how many used grpc, and come up with a ratio\n", + "percent_grpc = len(list(filter(lambda customer_value: customer_value[0][1][1] == \"gRPC\", top_25_customers))) / len(top_25_customers)\n", + "\n", + "print(\"~{}% of the top 25 customers (by bytes in) used gRPC this interval\".format(int(percent_grpc*100)))\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The value of exemplars goes beyond just handling dropped labels, however. We can also estimate the input distribution to the `bytes_counter` metric, through histograms or quantiles:\n" + ] + }, + { + "cell_type": "code", + "execution_count": 57, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "50th Percentile Bytes In: 1031\n", + "90th Percentile Bytes In: 1624\n", + "99th Percentile Bytes In: 6094\n" + ] + } + ], + "source": [ + "# Determine the 50th, 90th, and 99th percentile of byte size sent in\n", + "quantiles = np.quantile([exemplar.value for exemplar in exemplars], [0.5, 0.9, 0.99])\n", + "print(\"50th Percentile Bytes In:\", int(quantiles[0]))\n", + "print(\"90th Percentile Bytes In:\", int(quantiles[1]))\n", + "print(\"99th Percentile Bytes In:\", int(quantiles[2]))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "This is only a small subset of the things that can be done with exemplars - almost any statistic \n", + "that could be created through an aggregator on the original data can be estimated through exemplars." + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "jupyter3_Python_3", + "language": "python", + "name": "jupyter3_python_3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.3" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/docs/examples/exemplars/README.rst b/docs/examples/exemplars/README.rst new file mode 100644 index 00000000000..89af3407b6f --- /dev/null +++ b/docs/examples/exemplars/README.rst @@ -0,0 +1,49 @@ +OpenTelemetry Exemplars Example +=============================== + +.. _Exemplars: + +Exemplars are example measurements for aggregations. While they are simple conceptually, exemplars can estimate any statistic about the input distribution, can provide links to sample traces for high latency requests, and much more. +For more information about exemplars and how they work in OpenTelemetry, see the `spec `_ + +Examples +-------- + +Installation + +.. code-block:: sh + + pip install opentelemetry-api + pip install opentelemetry-sdk + pip install matplotlib # may have to install Qt as well + pip install numpy + + pip install opentelemetry-exporter-cloud-monitoring # if you want to export exemplars to cloud monitoring + +Statistical exemplars +^^^^^^^^^^^^^^^^^^^^^ + +The opentelemetry SDK provides a way to sample exemplars statistically: + + - Exemplars will be picked to represent the input distribution, without unquantifiable bias + - A "sample_count" attribute will be set on each exemplar to quantify how many measurements each exemplar represents (for randomly sampled exemplars, this value will be N (total measurements) / num_samples. For histogram exemplars, this value will be specific to each bucket). + +.. literalinclude:: statistical_exemplars.py + :language: python + :lines: 1- + +For the output of this example, see the corresponding Jupyter notebook. + +Trace exemplars +^^^^^^^^^^^^^^^^^^ + +Trace exemplars are exemplars that have not been sampled statistically, +but instead aim to provide value as individual exemplars. +They will have a trace id/span id attached for the active trace when the exemplar was recorded, +and they may focus on measurements with abnormally high/low values. + +.. literalinclude:: trace_exemplars.py + :language: python + :lines: 1- + +Currently only the Google Cloud Monitoring exporter supports uploading these exemplars. diff --git a/docs/examples/exemplars/statistical_exemplars.ipynb b/docs/examples/exemplars/statistical_exemplars.ipynb new file mode 100644 index 00000000000..ca7edd1c3db --- /dev/null +++ b/docs/examples/exemplars/statistical_exemplars.ipynb @@ -0,0 +1,340 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "This example will build an exemplar sample set on a \"bytes in\" counter aggregator, which just sums up the number of bytes sent into our \"application\".\n", + "We will use these statistical exemplars to generate insights into the data that was aggregated away.\n", + "\n", + "We'll start by importing everything we will need from opentelemetry to create the metrics:" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "import numpy as np\n", + "import matplotlib.pyplot as plt\n", + "import random\n", + "\n", + "from collections import defaultdict\n", + "\n", + "from opentelemetry import metrics\n", + "from opentelemetry.sdk.metrics import Counter, MeterProvider\n", + "from opentelemetry.sdk.metrics.export.aggregate import SumAggregator\n", + "from opentelemetry.sdk.metrics.export.controller import PushController\n", + "from opentelemetry.sdk.metrics.export.in_memory_metrics_exporter import InMemoryMetricsExporter\n", + "from opentelemetry.sdk.metrics.view import View, ViewConfig" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We can then set up an in-memory metrics exporter so we can analyze the exemplar data in-service:" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [], + "source": [ + "## set up opentelemetry\n", + "\n", + "# Sets the global MeterProvider instance\n", + "metrics.set_meter_provider(MeterProvider())\n", + "\n", + "meter = metrics.get_meter(__name__)\n", + "\n", + "# Export to a python list so we can do stats with the data\n", + "exporter = InMemoryMetricsExporter()\n", + "\n", + "# instead of waiting for the controller to tick over time, we will just tick it ourselves\n", + "controller = PushController(meter, exporter, 500)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We now need to create the bytes in metric, and assign it a view (this is where we set up exemplars):" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [], + "source": [ + "\n", + "# Create the metric that we will use\n", + "bytes_counter = meter.create_metric(\n", + " name=\"bytes_counter\",\n", + " description=\"Number of bytes received by service\",\n", + " unit=\"By\",\n", + " value_type=int,\n", + " metric_type=Counter,\n", + ")\n", + "\n", + "# Every time interval we will collect 100 exemplars statistically (selected without bias)\n", + "aggregator_config = {\"num_exemplars\": 100, \"statistical_exemplars\": True}\n", + "\n", + "# Assign a Sum aggregator to `bytes_counter` that collects exemplars\n", + "counter_view = View(\n", + " bytes_counter,\n", + " SumAggregator,\n", + " aggregator_config=aggregator_config,\n", + " label_keys=[\"environment\"],\n", + " view_config=ViewConfig.LABEL_KEYS,\n", + ")\n", + "\n", + "meter.register_view(counter_view)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The last thing we need to do before we can start working with exemplars is generating a large set of data for metrics.\n", + "If the dataset is too small, we won't be able to collect a large enough subset of the input to analyze with exemplars.\n", + "\n", + "If this was a real application, the data would be generated through requests to/from the server." + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [], + "source": [ + "## generate the random metric data\n", + "\n", + "def unknown_customer_calls():\n", + " \"\"\"Generate customer call data to our application\"\"\"\n", + "\n", + " # set a random seed for consistency of data for example purposes\n", + " np.random.seed(1)\n", + " # Make exemplar selection consistent for example purposes\n", + " random.seed(1)\n", + "\n", + " # customer 123 is a big user, and made 1000 requests in this timeframe\n", + " requests = np.random.normal(1000, 250, 1000) # 1000 requests with average 1000 bytes, standard deviation 250\n", + "\n", + " for request in requests:\n", + " bytes_counter.add(int(request), {\"environment\": \"production\", \"method\": \"REST\", \"customer_id\": 123})\n", + "\n", + " # customer 247 is another big user, making fewer, but bigger requests\n", + " requests = np.random.normal(5000, 1250, 200) # 200 requests with average size of 5k bytes\n", + "\n", + " for request in requests:\n", + " bytes_counter.add(int(request), {\"environment\": \"production\", \"method\": \"REST\", \"customer_id\": 247})\n", + "\n", + " # There are many other smaller customers\n", + " for customer_id in range(250):\n", + " requests = np.random.normal(1000, 250, np.random.randint(1, 10))\n", + " method = \"REST\" if np.random.randint(2) else \"gRPC\"\n", + " for request in requests:\n", + " bytes_counter.add(int(request), {\"environment\": \"production\", \"method\": method, \"customer_id\": customer_id})\n", + "\n", + "unknown_customer_calls()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Analyzing the Exemplars" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Let's export our metric and collect the exemplars from the outputted aggregation:" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [], + "source": [ + "# Tick the controller so it sends metrics to the exporter\n", + "controller.tick()\n", + "\n", + "# collect metrics from our exporter\n", + "metric_data = exporter.get_exported_metrics()\n", + "\n", + "# get the exemplars from the bytes_in counter aggregator\n", + "aggregator = metric_data[0].aggregator\n", + "exemplars = aggregator.checkpoint_exemplars" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "One of the key values of exemplars is its ability to handle dropped labels (labels that are too high cardinality to create a new metric record for each value). \n", + "In our application, we drop the \"customer_id\" label since there is an unbounded number of possible labels. However, with exemplars, we can still estimate stats related\n", + "to the customer ids, for example the sizes of our top customers:" + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "metadata": {}, + "outputs": [ + { + "output_type": "display_data", + "data": { + "text/plain": "
", + "image/svg+xml": "\n\n\n\n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n\n", + "image/png": "\n" + }, + "metadata": {} + } + ], + "source": [ + "# Sum up the total bytes in per customer from all of the exemplars collected\n", + "customer_bytes_map = defaultdict(int)\n", + "for exemplar in exemplars:\n", + " customer_bytes_map[exemplar.dropped_labels] += exemplar.value\n", + "\n", + "\n", + "customer_bytes_list = sorted(customer_bytes_map.items(), key=lambda t: t[1], reverse=True)\n", + "\n", + "# Save our top 5 customers and sum all of the rest into \"Others\".\n", + "top_3_customers = [(\"Customer {}\".format(dict(val[0])[\"customer_id\"]), val[1]) for val in customer_bytes_list[:3]] + [(\"Other Customers\", sum([val[1] for val in customer_bytes_list[3:]]))]\n", + "\n", + "# unzip the data into X (sizes of each customer's contribution) and labels\n", + "labels, X = zip(*top_3_customers)\n", + "\n", + "# create the chart with matplotlib and show it\n", + "plt.pie(X)\n", + "plt.legend(labels, loc = \"upper right\") \n", + "plt.show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Running this shows that the usage of our service is relatively closely split between customer 247, customer 123, and everyone else, which lines up closely with the data that was generated.\n", + "The more exemplars we sample, the more accurate this data will be, but also the more costly (in terms of memory usage) the metric would be.\n", + "\n", + "We can use the \"sample_count\" property of exemplars to predict the actual number of bytes customers sent (vs the percentage)\n", + "For example, to predict the number of bytes customer 123 sent:" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [ + { + "output_type": "stream", + "name": "stdout", + "text": "Customer 123 sent about 1025206 bytes this interval\n" + } + ], + "source": [ + "# Estimate how many bytes customer 123 sent\n", + "customer_123_bytes = customer_bytes_map[((\"customer_id\", 123), (\"method\", \"REST\"))]\n", + "\n", + "# Since the exemplars were randomly sampled, all sample_counts will be the same\n", + "sample_count = exemplars[0].sample_count\n", + "full_customer_123_bytes = sample_count * customer_123_bytes\n", + "\n", + "# With seed == 1 we get 1025206 - quite close to the statistical mean of 1000000! (more exemplars would make this estimation even more accurate)\n", + "print(\"Customer 123 sent about {} bytes this interval\".format(int(full_customer_123_bytes)))\n" + ] + }, + { + "source": [ + "We could also estimate the percentage of our top 25 customers that use gRPC (another dropped label):" + ], + "cell_type": "markdown", + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [ + { + "output_type": "stream", + "name": "stdout", + "text": "~44% of the top 25 customers (by bytes in) used gRPC this interval\n" + } + ], + "source": [ + "# Determine the top 25 customers by how many bytes they sent in exemplars\n", + "top_25_customers = customer_bytes_list[:25]\n", + "\n", + "# out of those 25 customers, determine how many used grpc, and come up with a ratio\n", + "percent_grpc = len(list(filter(lambda customer_value: customer_value[0][1][1] == \"gRPC\", top_25_customers))) / len(top_25_customers)\n", + "\n", + "print(\"~{}% of the top 25 customers (by bytes in) used gRPC this interval\".format(int(percent_grpc*100)))\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The value of exemplars goes beyond just handling dropped labels, however. We can also estimate the input distribution to the `bytes_counter` metric, through histograms or quantiles:\n" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": {}, + "outputs": [ + { + "output_type": "stream", + "name": "stdout", + "text": "50th Percentile Bytes In: 1031\n90th Percentile Bytes In: 1624\n99th Percentile Bytes In: 6094\n" + } + ], + "source": [ + "# Determine the 50th, 90th, and 99th percentile of byte size sent in\n", + "quantiles = np.quantile([exemplar.value for exemplar in exemplars], [0.5, 0.9, 0.99])\n", + "print(\"50th Percentile Bytes In:\", int(quantiles[0]))\n", + "print(\"90th Percentile Bytes In:\", int(quantiles[1]))\n", + "print(\"99th Percentile Bytes In:\", int(quantiles[2]))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "This is only a small subset of the things that can be done with exemplars - almost any statistic \n", + "that could be created through an aggregator on the original data can be estimated through exemplars." + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "jupyter3_Python_3", + "language": "python", + "name": "jupyter3_python_3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.3" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} \ No newline at end of file diff --git a/docs/examples/exemplars/statistical_exemplars.py b/docs/examples/exemplars/statistical_exemplars.py new file mode 100644 index 00000000000..b7a3ffbd5cd --- /dev/null +++ b/docs/examples/exemplars/statistical_exemplars.py @@ -0,0 +1,180 @@ +import random +from collections import defaultdict + +import matplotlib.pyplot as plt +import numpy as np +from opentelemetry import metrics +from opentelemetry.sdk.metrics import Counter, MeterProvider +from opentelemetry.sdk.metrics.export.aggregate import SumAggregator +from opentelemetry.sdk.metrics.export.controller import PushController +from opentelemetry.sdk.metrics.export.in_memory_metrics_exporter import ( + InMemoryMetricsExporter, +) +from opentelemetry.sdk.metrics.view import View, ViewConfig + +# set up opentelemetry + +# Sets the global MeterProvider instance +metrics.set_meter_provider(MeterProvider()) + +meter = metrics.get_meter(__name__) + +# Export to a python list so we can do stats with the data +exporter = InMemoryMetricsExporter() + +# instead of waiting for the controller to tick over time, we will just tick it ourselves +controller = PushController(meter, exporter, 500) + +# Create the metric that we will use +bytes_counter = meter.create_metric( + name="bytes_counter", + description="Number of bytes received by service", + unit="By", + value_type=int, + metric_type=Counter, +) + +# Every time interval we will collect 100 exemplars statistically (selected without bias) +aggregator_config = {"num_exemplars": 100, "statistical_exemplars": True} + +# Assign a Sum aggregator to `bytes_counter` that collects exemplars +counter_view = View( + bytes_counter, + SumAggregator, + aggregator_config=aggregator_config, + label_keys=["environment"], + view_config=ViewConfig.LABEL_KEYS, +) + +meter.register_view(counter_view) + +# generate the random metric data + + +def unknown_customer_calls(): + """Generate customer call data to our application""" + + # set a random seed for consistency of data for example purposes + np.random.seed(1) + # Make exemplar selection consistent for example purposes + random.seed(1) + + # customer 123 is a big user, and made 1000 requests in this timeframe + requests = np.random.normal( + 1000, 100, 1000 + ) # 1000 requests with average 1000 bytes, standard deviation 100 + + for request in requests: + bytes_counter.add( + int(request), + { + "environment": "production", + "method": "REST", + "customer_id": 123, + }, + ) + + # customer 247 is another big user, making fewer, but bigger requests + requests = np.random.normal( + 5000, 1250, 200 + ) # 200 requests with average size of 5k bytes + + for request in requests: + bytes_counter.add( + int(request), + { + "environment": "production", + "method": "REST", + "customer_id": 247, + }, + ) + + # There are many other smaller customers + for customer_id in range(250): + requests = np.random.normal(1000, 250, np.random.randint(1, 10)) + method = "REST" if np.random.randint(2) else "gRPC" + for request in requests: + bytes_counter.add( + int(request), + { + "environment": "production", + "method": method, + "customer_id": customer_id, + }, + ) + + +unknown_customer_calls() + +# Tick the controller so it sends metrics to the exporter +controller.tick() + +# collect metrics from our exporter +metric_data = exporter.get_exported_metrics() + +# get the exemplars from the bytes_in counter aggregator +aggregator = metric_data[0].aggregator +exemplars = aggregator.checkpoint_exemplars + +# Sum up the total bytes in per customer from all of the exemplars collected +customer_bytes_map = defaultdict(int) +for exemplar in exemplars: + customer_bytes_map[exemplar.dropped_labels] += exemplar.value + + +customer_bytes_list = sorted( + customer_bytes_map.items(), key=lambda t: t[1], reverse=True +) + +# Save our top 5 customers and sum all of the rest into "Others". +top_5_customers = [ + ("Customer {}".format(dict(val[0])["customer_id"]), val[1]) + for val in customer_bytes_list[:5] +] + [("Other Customers", sum([val[1] for val in customer_bytes_list[5:]]))] + +# unzip the data into X (sizes of each customer's contribution) and labels +labels, X = zip(*top_5_customers) + +# create the chart with matplotlib and show it +plt.pie(X, labels=labels) +plt.show() + +# Estimate how many bytes customer 123 sent +customer_123_bytes = customer_bytes_map[ + (("customer_id", 123), ("method", "REST")) +] + +# Since the exemplars were randomly sampled, all sample_counts will be the same +sample_count = exemplars[0].sample_count +full_customer_123_bytes = sample_count * customer_123_bytes + +# With seed == 1 we get 1008612 - quite close to the statistical mean of 1000000! (more exemplars would make this estimation even more accurate) +print( + "Customer 123 sent about {} bytes this interval".format( + int(full_customer_123_bytes) + ) +) + +# Determine the top 25 customers by how many bytes they sent in exemplars +top_25_customers = customer_bytes_list[:25] + +# out of those 25 customers, determine how many used grpc, and come up with a ratio +percent_grpc = sum( + 1 + for customer_value in top_25_customers + if customer_value[0][1][1] == "gRPC" +) / len(top_25_customers) + +print( + "~{}% of the top 25 customers (by bytes in) used gRPC this interval".format( + int(percent_grpc * 100) + ) +) + +# Determine the 50th, 90th, and 99th percentile of byte size sent in +quantiles = np.quantile( + [exemplar.value for exemplar in exemplars], [0.5, 0.9, 0.99] +) +print("50th Percentile Bytes In:", int(quantiles[0])) +print("90th Percentile Bytes In:", int(quantiles[1])) +print("99th Percentile Bytes In:", int(quantiles[2])) diff --git a/docs/examples/exemplars/trace_exemplars.py b/docs/examples/exemplars/trace_exemplars.py new file mode 100644 index 00000000000..735e329d9ac --- /dev/null +++ b/docs/examples/exemplars/trace_exemplars.py @@ -0,0 +1,84 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +""" +This example shows how to generate trace exemplars for a histogram, and how to export them to Google Cloud Monitoring. +""" + +import random +import time + +from opentelemetry import metrics +from opentelemetry.sdk.metrics import MeterProvider, ValueRecorder +from opentelemetry.sdk.metrics.export import ConsoleMetricsExporter +from opentelemetry.sdk.metrics.export.aggregate import HistogramAggregator +from opentelemetry.sdk.metrics.view import View, ViewConfig + +# Set up OpenTelemetry metrics +metrics.set_meter_provider(MeterProvider(stateful=False)) +meter = metrics.get_meter(__name__) + +# Use the Google Cloud Monitoring Metrics Exporter since its the only one that currently supports exemplars +metrics.get_meter_provider().start_pipeline( + meter, ConsoleMetricsExporter(), 10 +) + +# Create our duration metric +request_duration = meter.create_metric( + name="request_duration", + description="duration (ms) of incoming requests", + unit="ms", + value_type=int, + metric_type=ValueRecorder, +) + +# Add a Histogram view to our duration metric, and make it generate 1 exemplars per bucket +duration_view = View( + request_duration, + # Latency in buckets: + # [>=0ms, >=25ms, >=50ms, >=75ms, >=100ms, >=200ms, >=400ms, >=600ms, >=800ms, >=1s, >=2s, >=4s, >=6s] + # We want to generate 1 exemplar per bucket, where each exemplar has a linked trace that was recorded. + # So we need to set num_exemplars to 1 and not specify statistical_exemplars (defaults to false) + HistogramAggregator, + aggregator_config={ + "bounds": [ + 0, + 25, + 50, + 75, + 100, + 200, + 400, + 600, + 800, + 1000, + 2000, + 4000, + 6000, + ], + "num_exemplars": 1, + }, + label_keys=["environment"], + view_config=ViewConfig.LABEL_KEYS, +) + +meter.register_view(duration_view) + +for i in range(100): + # Generate some random data for the histogram with a dropped label "customer_id" + request_duration.record( + random.randint(1, 8000), + {"environment": "staging", "customer_id": random.randint(1, 100)}, + ) + time.sleep(1) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py index 2af8a551ee1..9bad705b9c2 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py @@ -75,7 +75,7 @@ def update(self, value: metrics_api.ValueT): with self._view_datas_lock: # record the value for each view_data belonging to this aggregator for view_data in self.view_datas: - view_data.record(value) + view_data.record(value, self._labels) def release(self): self.decrease_ref_count() diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/__init__.py index 16911f94efb..ddd08df13c8 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/__init__.py @@ -77,11 +77,12 @@ def export( ) -> "MetricsExportResult": for record in metric_records: print( - '{}(data="{}", labels="{}", value={})'.format( + '{}(data="{}", labels="{}", value={}, exemplars={})'.format( type(self).__name__, record.instrument, record.labels, record.aggregator.checkpoint, + record.aggregator.checkpoint_exemplars, ) ) return MetricsExportResult.SUCCESS diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py index 121f39a98b6..48908857b3e 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py @@ -17,6 +17,12 @@ import threading from collections import OrderedDict, namedtuple +from opentelemetry.sdk.metrics.export.exemplars import ( + BucketedExemplarSampler, + ExemplarManager, + MinMaxExemplarSampler, + RandomExemplarSampler, +) from opentelemetry.util import time_ns logger = logging.getLogger(__name__) @@ -36,9 +42,10 @@ def __init__(self, config=None): self.config = config else: self.config = {} + self.checkpoint_exemplars = [] @abc.abstractmethod - def update(self, value): + def update(self, value, dropped_labels=None): """Updates the current with the new value.""" @abc.abstractmethod @@ -59,15 +66,21 @@ def __init__(self, config=None): self.checkpoint = 0 self._lock = threading.Lock() self.last_update_timestamp = None + self.exemplar_manager = ExemplarManager( + config, MinMaxExemplarSampler, RandomExemplarSampler + ) - def update(self, value): + def update(self, value, dropped_labels=None): with self._lock: self.current += value self.last_update_timestamp = time_ns() + self.exemplar_manager.sample(value, dropped_labels) + def take_checkpoint(self): with self._lock: self.checkpoint = self.current + self.checkpoint_exemplars = self.exemplar_manager.take_checkpoint() self.current = 0 def merge(self, other): @@ -77,6 +90,9 @@ def merge(self, other): self.last_update_timestamp = get_latest_timestamp( self.last_update_timestamp, other.last_update_timestamp ) + self.checkpoint_exemplars = self.exemplar_manager.merge( + self.checkpoint_exemplars, other.checkpoint_exemplars + ) class MinMaxSumCountAggregator(Aggregator): @@ -105,8 +121,13 @@ def __init__(self, config=None): self._lock = threading.Lock() self.last_update_timestamp = None - def update(self, value): + self.exemplar_manager = ExemplarManager( + config, MinMaxExemplarSampler, RandomExemplarSampler + ) + + def update(self, value, dropped_labels=None): with self._lock: + if self.current is self._EMPTY: self.current = self._TYPE(value, value, value, 1) else: @@ -118,9 +139,12 @@ def update(self, value): ) self.last_update_timestamp = time_ns() + self.exemplar_manager.sample(value, dropped_labels) + def take_checkpoint(self): with self._lock: self.checkpoint = self.current + self.checkpoint_exemplars = self.exemplar_manager.take_checkpoint() self.current = self._EMPTY def merge(self, other): @@ -132,6 +156,9 @@ def merge(self, other): self.last_update_timestamp = get_latest_timestamp( self.last_update_timestamp, other.last_update_timestamp ) + self.checkpoint_exemplars = self.exemplar_manager.merge( + self.checkpoint_exemplars, other.checkpoint_exemplars + ) class HistogramAggregator(Aggregator): @@ -151,6 +178,13 @@ def __init__(self, config=None): self.current = OrderedDict([(bb, 0) for bb in self._boundaries]) self.checkpoint = OrderedDict([(bb, 0) for bb in self._boundaries]) + self.exemplar_manager = ExemplarManager( + config, + BucketedExemplarSampler, + BucketedExemplarSampler, + boundaries=self._boundaries, + ) + self.current[">"] = 0 self.checkpoint[">"] = 0 @@ -178,18 +212,25 @@ def _merge_checkpoint(cls, val1, val2): logger.warning("Cannot merge histograms with different buckets.") return val1 - def update(self, value): + def update(self, value, dropped_labels=None): with self._lock: if self.current is None: self.current = [0 for ii in range(len(self._boundaries) + 1)] # greater than max value if value >= self._boundaries[len(self._boundaries) - 1]: self.current[">"] += 1 + self.exemplar_manager.sample( + value, dropped_labels, bucket_index=len(self._boundaries) + ) else: - for bb in self._boundaries: + for index, bb in enumerate(self._boundaries): # find first bucket that value is less than if value < bb: self.current[bb] += 1 + + self.exemplar_manager.sample( + value, dropped_labels, bucket_index=index + ) break self.last_update_timestamp = time_ns() @@ -197,6 +238,9 @@ def take_checkpoint(self): with self._lock: self.checkpoint = self.current self.current = OrderedDict([(bb, 0) for bb in self._boundaries]) + + self.checkpoint_exemplars = self.exemplar_manager.take_checkpoint() + self.current[">"] = 0 def merge(self, other): @@ -205,6 +249,11 @@ def merge(self, other): self.checkpoint = self._merge_checkpoint( self.checkpoint, other.checkpoint ) + + self.checkpoint_exemplars = self.exemplar_manager.merge( + self.checkpoint_exemplars, other.checkpoint_exemplars + ) + self.last_update_timestamp = get_latest_timestamp( self.last_update_timestamp, other.last_update_timestamp ) @@ -218,7 +267,7 @@ def __init__(self, config=None): self._lock = threading.Lock() self.last_update_timestamp = None - def update(self, value): + def update(self, value, dropped_labels=None): with self._lock: self.current = value self.last_update_timestamp = time_ns() @@ -245,19 +294,20 @@ class ValueObserverAggregator(Aggregator): def __init__(self, config=None): super().__init__(config=config) - self.mmsc = MinMaxSumCountAggregator() + self.mmsc = MinMaxSumCountAggregator(config=config) self.current = None self.checkpoint = self._TYPE(None, None, None, 0, None) self.last_update_timestamp = None - def update(self, value): - self.mmsc.update(value) + def update(self, value, dropped_labels=None): + self.mmsc.update(value, dropped_labels=dropped_labels) self.current = value self.last_update_timestamp = time_ns() def take_checkpoint(self): self.mmsc.take_checkpoint() self.checkpoint = self._TYPE(*(self.mmsc.checkpoint + (self.current,))) + self.checkpoint_exemplars = self.mmsc.checkpoint_exemplars def merge(self, other): if verify_type(self, other): @@ -269,6 +319,7 @@ def merge(self, other): if self.last_update_timestamp == other.last_update_timestamp: last = other.checkpoint.last self.checkpoint = self._TYPE(*(self.mmsc.checkpoint + (last,))) + self.checkpoint_exemplars = self.mmsc.checkpoint_exemplars def get_latest_timestamp(time_stamp, other_timestamp): diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/exemplars.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/exemplars.py new file mode 100644 index 00000000000..98bc6c44660 --- /dev/null +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/exemplars.py @@ -0,0 +1,331 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Exemplars are sample data points for aggregators. For more information, see the `spec `_ + +Every synchronous aggregator is instrumented with two exemplar recorders: + 1. A "trace" exemplar sampler, which only samples exemplars if they have a sampled trace context (and can pick exemplars with other biases, ie min + max). + 2. A "statistical" exemplar sampler, which samples exemplars without bias (ie no preferenced for traced exemplars) + +To use an exemplar recorder, pass in two arguments to the aggregator config in views (see the :ref:`Exemplars` example for an example): + "num_exemplars": The number of exemplars to record (if applicable, in each bucket). Note that in non-statistical mode the recorder may not use "num_exemplars" + "statistical_exemplars": If exemplars should be recorded statistically + +For exemplars to be recorded, `num_exemplars` must be greater than 0. +""" + +import abc +import itertools +import random +from typing import List, Optional, Tuple, Type, Union + +from opentelemetry.context import get_current +from opentelemetry.util import time_ns + + +class Exemplar: + """ + A sample data point for an aggregator. Exemplars represent individual measurements recorded. + """ + + def __init__( + self, + value: Union[int, float], + timestamp: int, + dropped_labels: Optional[Tuple[Tuple[str, str]]] = None, + span_id: Optional[bytes] = None, + trace_id: Optional[bytes] = None, + sample_count: Optional[float] = None, + ): + self._value = value + self._timestamp = timestamp + self._span_id = span_id + self._trace_id = trace_id + self._sample_count = sample_count + self._dropped_labels = dropped_labels + + def __repr__(self): + return "Exemplar(value={}, timestamp={}, labels={}, context={{'span_id':{}, 'trace_id':{}}})".format( + self._value, + self._timestamp, + dict(self._dropped_labels) if self._dropped_labels else None, + self._span_id, + self._trace_id, + ) + + @property + def value(self): + """The current value of the Exemplar point""" + return self._value + + @property + def timestamp(self): + """The time that this Exemplar's value was recorded""" + return self._timestamp + + @property + def span_id(self): + """The span ID of the context when the exemplar was recorded""" + return self._span_id + + @property + def trace_id(self): + """The trace ID of the context when the exemplar was recorded""" + return self._trace_id + + @property + def dropped_labels(self): + """Labels that were dropped by the aggregator but still passed by the user""" + return self._dropped_labels + + @property + def sample_count(self): + """For statistical exemplars, how many measurements a single exemplar represents""" + return self._sample_count + + @sample_count.setter + def sample_count(self, count: float): + self._sample_count = count + + +class ExemplarSampler(abc.ABC): + """ + Abstract class to sample `k` exemplars in some way through a stream of incoming measurements + """ + + def __init__(self, k: int, statistical: bool = False): + self._k = k + self._statistical = statistical + self._sample_set = [] + + @abc.abstractmethod + def sample(self, exemplar: Exemplar, **kwargs): + """ + Given an exemplar, determine if it should be sampled or not + """ + + @property + @abc.abstractmethod + def sample_set(self): + """ + Return the list of exemplars that have been sampled + """ + + def merge(self, set1: List[Exemplar], set2: List[Exemplar]): + """ + Assume that set2 is the latest set of exemplars. + For simplicity, we will just keep set2 and assume set1 has already been exported. + This may need to change with a different SDK implementation. + """ + # pylint: disable=unused-argument,no-self-use + return set2 + + @abc.abstractmethod + def reset(self): + """ + Reset the sampler + """ + + +class RandomExemplarSampler(ExemplarSampler): + """ + Randomly sample a set of k exemplars from a stream. Each measurement in the stream + will have an equal chance of being sampled. + + If `RandomExemplarSampler` is specified to be statistical, it will add a sample_count to every exemplar it records. + This value will be equal to the number of measurements recorded per every exemplar measured - all exemplars will have the same sample_count value. + """ + + def __init__(self, k: int, statistical: bool = False): + super().__init__(k, statistical=statistical) + self.rand_count = 0 + + def sample(self, exemplar: Exemplar, **kwargs): + self.rand_count += 1 + + if len(self._sample_set) < self._k: + self._sample_set.append(exemplar) + return + + # We sample a random subset of a stream using "Algorithm R": + # https://en.wikipedia.org/wiki/Reservoir_sampling#Simple_algorithm + replace_index = random.randint(0, self.rand_count - 1) + + if replace_index < self._k: + self._sample_set[replace_index] = exemplar + + @property + def sample_set(self): + if self._statistical: + for exemplar in self._sample_set: + exemplar.sample_count = self.rand_count / len(self._sample_set) + return self._sample_set + + def reset(self): + self._sample_set = [] + self.rand_count = 0 + + +class MinMaxExemplarSampler(ExemplarSampler): + """ + Sample the minimum and maximum measurements recorded only + """ + + def __init__(self, k: int, statistical: bool = False): + # K will always be 2 (min and max), and selecting min and max can never be statistical + super().__init__(2, statistical=False) + self._sample_set = [] + + def sample(self, exemplar: Exemplar, **kwargs): + self._sample_set = [ + min( + self._sample_set + [exemplar], + key=lambda exemplar: exemplar.value, + ), + max( + self._sample_set + [exemplar], + key=lambda exemplar: exemplar.value, + ), + ] + if self._sample_set[0] == self._sample_set[1]: + self._sample_set = [self._sample_set[0]] + + @property + def sample_set(self): + return self._sample_set + + def reset(self): + self._sample_set = [] + + +class BucketedExemplarSampler(ExemplarSampler): + """ + Randomly sample k exemplars for each bucket in the aggregator. + + If `BucketedExemplarSampler` is specified to be statistical, it will add a sample_count to every exemplar it records. + This value will be equal to `len(bucket.exemplars) / bucket.count`, that is the number of measurements each exemplar represents. + """ + + def __init__( + self, k: int, statistical: bool = False, boundaries: List[float] = None + ): + super().__init__(k) + self._boundaries = boundaries + self._sample_set = [ + RandomExemplarSampler(k, statistical=statistical) + for _ in range(len(self._boundaries) + 1) + ] + + def sample(self, exemplar: Exemplar, **kwargs): + bucket_index = kwargs.get("bucket_index") + if bucket_index is None: + return + + self._sample_set[bucket_index].sample(exemplar) + + @property + def sample_set(self): + return list( + itertools.chain.from_iterable( + sampler.sample_set for sampler in self._sample_set + ) + ) + + def reset(self): + for sampler in self._sample_set: + sampler.reset() + + +class ExemplarManager: + """ + Manages two different exemplar samplers: + 1. A "trace" exemplar sampler, which only samples exemplars if they have a sampled trace context. + 2. A "statistical" exemplar sampler, which samples exemplars without bias (ie no preferenced for traced exemplars) + """ + + def __init__( + self, + config: dict, + default_exemplar_sampler: Type[ExemplarSampler], + statistical_exemplar_sampler: Type[ExemplarSampler], + **kwargs + ): + if config: + self.exemplars_count = config.get("num_exemplars", 0) + self.record_exemplars = self.exemplars_count > 0 + self.statistical_exemplars = config.get( + "statistical_exemplars", False + ) + if self.statistical_exemplars: + self.exemplar_sampler = statistical_exemplar_sampler( + self.exemplars_count, + statistical=self.statistical_exemplars, + **kwargs + ) + else: + self.exemplar_sampler = default_exemplar_sampler( + self.exemplars_count, + statistical=self.statistical_exemplars, + **kwargs + ) + else: + self.record_exemplars = False + + def sample( + self, + value: Union[int, float], + dropped_labels: Tuple[Tuple[str, str]], + **kwargs + ): + context = get_current() + + is_sampled = ( + "current-span" in context + and context["current-span"].get_context().trace_flags.sampled + if context + else False + ) + + # if not statistical, we want to gather traced exemplars only - so otherwise don't sample + if self.record_exemplars and ( + is_sampled or self.statistical_exemplars + ): + span_id = ( + context["current-span"].context.span_id if context else None + ) + trace_id = ( + context["current-span"].context.trace_id if context else None + ) + self.exemplar_sampler.sample( + Exemplar(value, time_ns(), dropped_labels, span_id, trace_id), + **kwargs + ) + + def take_checkpoint(self): + if self.record_exemplars: + ret = self.exemplar_sampler.sample_set + self.exemplar_sampler.reset() + return ret + return [] + + def merge( + self, + checkpoint_exemplars: List[Exemplar], + other_checkpoint_exemplars: List[Exemplar], + ): + if self.record_exemplars: + return self.exemplar_sampler.merge( + checkpoint_exemplars, other_checkpoint_exemplars + ) + return [] diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/view.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/view.py index 0dd75c6887b..ec1e9df6f71 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/view.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/view.py @@ -39,12 +39,20 @@ class ViewData: - def __init__(self, labels: Tuple[Tuple[str, str]], aggregator: Aggregator): + def __init__( + self, labels: Tuple[Tuple[str, str]], aggregator: Aggregator, + ): self.labels = labels self.aggregator = aggregator - def record(self, value: ValueT): - self.aggregator.update(value) + def record(self, value: ValueT, all_labels: Tuple[Tuple[str, str]]): + label_dict = dict(self.labels) + self.aggregator.update( + value, + dropped_labels=tuple( + filter(lambda label: label[0] not in label_dict, all_labels) + ), + ) # Uniqueness is based on labels and aggregator type def __hash__(self): diff --git a/opentelemetry-sdk/tests/metrics/export/test_exemplars.py b/opentelemetry-sdk/tests/metrics/export/test_exemplars.py new file mode 100644 index 00000000000..4e78f4d23e9 --- /dev/null +++ b/opentelemetry-sdk/tests/metrics/export/test_exemplars.py @@ -0,0 +1,516 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +from time import time +from unittest.mock import patch + +from opentelemetry import metrics +from opentelemetry.sdk.metrics import MeterProvider, ValueRecorder +from opentelemetry.sdk.metrics.export.aggregate import ( + HistogramAggregator, + MinMaxExemplarSampler, + MinMaxSumCountAggregator, + SumAggregator, + ValueObserverAggregator, +) +from opentelemetry.sdk.metrics.export.controller import PushController +from opentelemetry.sdk.metrics.export.exemplars import ( + BucketedExemplarSampler, + Exemplar, + ExemplarManager, + RandomExemplarSampler, +) +from opentelemetry.sdk.metrics.export.in_memory_metrics_exporter import ( + InMemoryMetricsExporter, +) +from opentelemetry.sdk.metrics.view import View, ViewConfig +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.trace.sampling import ALWAYS_OFF, ALWAYS_ON + + +class TestRandomExemplarSampler(unittest.TestCase): + def test_sample(self): + sampler = RandomExemplarSampler(2, statistical=True) + exemplar1 = Exemplar(1, time()) + exemplar2 = Exemplar(2, time()) + exemplar3 = Exemplar(3, time()) + + sampler.sample(exemplar1) + self.assertEqual(len(sampler.sample_set), 1) + self.assertEqual(sampler.sample_set[0], exemplar1) + self.assertEqual(exemplar1.sample_count, 1) + + sampler.sample(exemplar2) + self.assertEqual(len(sampler.sample_set), 2) + self.assertEqual(sampler.sample_set[1], exemplar2) + self.assertEqual(exemplar1.sample_count, 1) + self.assertEqual(exemplar2.sample_count, 1) + + def _patched_randint(minimum, maximum): + # pylint: disable=unused-argument + return minimum + + with patch("random.randint", _patched_randint): + sampler.sample(exemplar3) + self.assertEqual(len(sampler.sample_set), 2) + self.assertEqual(sampler.sample_set[0], exemplar3) + self.assertEqual(exemplar3.sample_count, 1.5) + self.assertEqual(exemplar2.sample_count, 1.5) + + def _patched_randint(minimum, maximum): + # pylint: disable=unused-argument + return 1 + + with patch("random.randint", _patched_randint): + sampler.sample(exemplar1) + self.assertEqual(len(sampler.sample_set), 2) + self.assertEqual(sampler.sample_set[1], exemplar1) + self.assertEqual(exemplar1.sample_count, 2) + + def test_reset(self): + sampler = RandomExemplarSampler(2) + exemplar1 = Exemplar(1, time()) + exemplar2 = Exemplar(2, time()) + + sampler.sample(exemplar1) + sampler.sample(exemplar2) + + sampler.reset() + self.assertEqual(len(sampler.sample_set), 0) + + sampler.sample(exemplar1) + self.assertEqual(len(sampler.sample_set), 1) + + def test_merge(self): + set1 = [1, 2, 3] + set2 = [4, 5, 6] + sampler = RandomExemplarSampler(6) + self.assertEqual(set2, sampler.merge(set1, set2)) + sampler = RandomExemplarSampler(8) + self.assertEqual(set2, sampler.merge(set1, set2)) + sampler = RandomExemplarSampler(4) + self.assertEqual(3, len(sampler.merge(set1, set2))) + + +class TestMinMaxExemplarSampler(unittest.TestCase): + def test_sample(self): + sampler = MinMaxExemplarSampler(2) + exemplar1 = Exemplar(1, time()) + exemplar2 = Exemplar(2, time()) + exemplar3 = Exemplar(3, time()) + + sampler.sample(exemplar1) + self.assertEqual(len(sampler.sample_set), 1) + self.assertEqual(sampler.sample_set[0], exemplar1) + + sampler.sample(exemplar2) + self.assertEqual(len(sampler.sample_set), 2) + self.assertEqual(sampler.sample_set[0], exemplar1) + self.assertEqual(sampler.sample_set[1], exemplar2) + + sampler.sample(exemplar3) + self.assertEqual(len(sampler.sample_set), 2) + self.assertEqual(sampler.sample_set[0], exemplar1) + self.assertEqual(sampler.sample_set[1], exemplar3) + + def test_reset(self): + sampler = MinMaxExemplarSampler(2) + exemplar1 = Exemplar(1, time()) + exemplar2 = Exemplar(2, time()) + + sampler.sample(exemplar1) + sampler.sample(exemplar2) + + sampler.reset() + self.assertEqual(len(sampler.sample_set), 0) + + sampler.sample(exemplar1) + self.assertEqual(len(sampler.sample_set), 1) + + def test_merge(self): + set1 = [1, 3] + set2 = [4, 6] + sampler = MinMaxExemplarSampler(2) + self.assertEqual([4, 6], sampler.merge(set1, set2)) + + +class TestBucketedExemplarSampler(unittest.TestCase): + def test_exemplars(self): + sampler = BucketedExemplarSampler( + 1, boundaries=[2, 4, 7], statistical=True + ) + sampler.sample(Exemplar(3, time()), bucket_index=1) + self.assertEqual(len(sampler.sample_set), 1) + self.assertEqual(sampler.sample_set[0].value, 3) + + sampler.sample(Exemplar(5, time()), bucket_index=2) + + self.assertEqual(len(sampler.sample_set), 2) + self.assertEqual(sampler.sample_set[1].value, 5) + self.assertEqual(sampler.sample_set[1].sample_count, 1) + + def _patched_randint(minimum, maximum): + # pylint: disable=unused-argument + return 0 + + with patch("random.randint", _patched_randint): + sampler.sample(Exemplar(6, time()), bucket_index=2) + + self.assertEqual(len(sampler.sample_set), 2) + self.assertEqual(sampler.sample_set[1].value, 6) + self.assertEqual(sampler.sample_set[1].sample_count, 2) + + sampler.sample(Exemplar(1, time()), bucket_index=0) + sampler.sample(Exemplar(9, time()), bucket_index=3) + + self.assertEqual(len(sampler.sample_set), 4) + self.assertEqual(sampler.sample_set[0].sample_count, 1) + self.assertEqual(sampler.sample_set[1].sample_count, 1) + self.assertEqual(sampler.sample_set[2].sample_count, 2) + self.assertEqual(sampler.sample_set[3].sample_count, 1) + + def test_merge(self): + sampler = BucketedExemplarSampler(1, boundaries=[3, 4, 6]) + + self.assertEqual( + len(sampler.merge([Exemplar(1, time())], [Exemplar(2, time())])), 1 + ) + + self.assertEqual( + len( + sampler.merge( + [Exemplar(1, time()), Exemplar(5, time())], + [Exemplar(2, time())], + ) + ), + 1, + ) + + +class TestExemplarManager(unittest.TestCase): + def test_statistical(self): + config = {"statistical_exemplars": True, "num_exemplars": 1} + manager = ExemplarManager( + config, MinMaxExemplarSampler, RandomExemplarSampler + ) + self.assertIsInstance(manager.exemplar_sampler, RandomExemplarSampler) + manager.sample(5, {"dropped_label": "value"}) + self.assertEqual(len(manager.exemplar_sampler.sample_set), 1) + self.assertEqual(manager.exemplar_sampler.sample_set[0].value, 5) + self.assertEqual( + manager.exemplar_sampler.sample_set[0].dropped_labels, + {"dropped_label": "value"}, + ) + + checkpoint = manager.take_checkpoint() + self.assertEqual(len(checkpoint), 1) + self.assertEqual(checkpoint[0].value, 5) + + self.assertEqual(len(manager.exemplar_sampler.sample_set), 0) + + merged = manager.merge([Exemplar(2, time())], [Exemplar(3, time())]) + self.assertEqual(len(merged), 1) + + def test_trace(self): + config = {"statistical_exemplars": False, "num_exemplars": 1} + manager = ExemplarManager( + config, MinMaxExemplarSampler, RandomExemplarSampler + ) + self.assertIsInstance(manager.exemplar_sampler, MinMaxExemplarSampler) + + merged = manager.merge([Exemplar(2, time())], [Exemplar(3, time())]) + self.assertEqual(len(merged), 1) + + +class TestStandardExemplars(unittest.TestCase): + def _no_exemplars_test(self, aggregator): + config = {} + agg = aggregator(config=config) + agg.update(3) + agg.update(5) + agg.take_checkpoint() + self.assertEqual(agg.checkpoint_exemplars, []) + + other_agg = aggregator( + config={"num_exemplars": 2, "statistical_exemplars": True} + ) + other_agg.update(2) + other_agg.update(4) + other_agg.take_checkpoint() + self.assertEqual(len(other_agg.checkpoint_exemplars), 2) + agg.merge(other_agg) + self.assertEqual(agg.checkpoint_exemplars, []) + + def _simple_exemplars_test(self, aggregator): + config = {"num_exemplars": 2, "statistical_exemplars": True} + agg = aggregator(config=config) + agg.update(2, dropped_labels={"dropped_label": "value"}) + agg.take_checkpoint() + self.assertEqual(len(agg.checkpoint_exemplars), 1) + self.assertEqual(agg.checkpoint_exemplars[0].value, 2) + self.assertEqual( + agg.checkpoint_exemplars[0].dropped_labels, + {"dropped_label": "value"}, + ) + + agg.update(2) + agg.update(5) + agg.take_checkpoint() + self.assertEqual(len(agg.checkpoint_exemplars), 2) + self.assertEqual(agg.checkpoint_exemplars[1].value, 5) + + agg.update(2) + agg.update(5) + + def _patched_randint(minimum, maximum): + # pylint: disable=unused-argument + return 1 + + with patch("random.randint", _patched_randint): + agg.update(7) + + agg.take_checkpoint() + self.assertEqual(len(agg.checkpoint_exemplars), 2) + self.assertEqual(agg.checkpoint_exemplars[0].value, 2) + self.assertEqual(agg.checkpoint_exemplars[1].value, 7) + + def _record_traces_only_test(self, aggregator): + config = {"num_exemplars": 2} + agg = aggregator(config=config) + + agg.update(2) + agg.take_checkpoint() + self.assertEqual(len(agg.checkpoint_exemplars), 0) + + # Test with sampler on/off + tp = TracerProvider(sampler=ALWAYS_ON) + tracer = tp.get_tracer(__name__) + + span = tracer.start_span("Test Span ON") + with tracer.use_span(span): + agg.update(5) + agg.update(7) + agg.update(6) + + agg.take_checkpoint() + self.assertEqual(len(agg.checkpoint_exemplars), 2) + + self.assertEqual( + agg.checkpoint_exemplars[0].span_id, span.context.span_id + ) + self.assertEqual(agg.checkpoint_exemplars[0].value, 5) + self.assertEqual(agg.checkpoint_exemplars[1].value, 7) + + tp = TracerProvider(sampler=ALWAYS_OFF) + tracer = tp.get_tracer(__name__) + + with tracer.start_as_current_span("Test Span OFF"): + agg.update(5) + + agg.take_checkpoint() + self.assertEqual(len(agg.checkpoint_exemplars), 0) + + def _merge_aggregators_test(self, aggregator): + config = {"num_exemplars": 2, "statistical_exemplars": True} + + agg1 = aggregator(config=config) + agg2 = aggregator(config=config) + + agg1.update(1) + agg1.take_checkpoint() + + agg2.update(2) + agg2.take_checkpoint() + + self.assertEqual(len(agg1.checkpoint_exemplars), 1) + self.assertEqual(len(agg2.checkpoint_exemplars), 1) + + agg1.merge(agg2) + + self.assertEqual(len(agg1.checkpoint_exemplars), 1) + + def test_sum_aggregator(self): + self._no_exemplars_test(SumAggregator) + self._simple_exemplars_test(SumAggregator) + self._record_traces_only_test(SumAggregator) + self._merge_aggregators_test(SumAggregator) + + def test_mmsc_aggregator(self): + self._no_exemplars_test(MinMaxSumCountAggregator) + self._simple_exemplars_test(MinMaxSumCountAggregator) + self._record_traces_only_test(MinMaxSumCountAggregator) + self._merge_aggregators_test(MinMaxSumCountAggregator) + + def test_observer_aggregator(self): + self._no_exemplars_test(ValueObserverAggregator) + self._simple_exemplars_test(ValueObserverAggregator) + self._record_traces_only_test(ValueObserverAggregator) + self._merge_aggregators_test(ValueObserverAggregator) + + +class TestHistogramExemplars(unittest.TestCase): + def test_no_exemplars(self): + config = {"bounds": [2, 4, 6]} + agg = HistogramAggregator(config=config) + agg.update(3) + agg.update(5) + agg.take_checkpoint() + self.assertEqual(agg.checkpoint_exemplars, []) + + other_agg = HistogramAggregator( + config=dict( + config, **{"num_exemplars": 1, "statistical_exemplars": True} + ) + ) + + other_agg.update(3) + other_agg.update(5) + other_agg.take_checkpoint() + self.assertEqual(len(other_agg.checkpoint_exemplars), 2) + + agg.merge(other_agg) + self.assertEqual(agg.checkpoint_exemplars, []) + + def test_simple_exemplars(self): + config = { + "bounds": [2, 4, 7], + "num_exemplars": 1, + "statistical_exemplars": True, + } + agg = HistogramAggregator(config=config) + agg.update(2, dropped_labels={"dropped_label": "value"}) + agg.take_checkpoint() + self.assertEqual(len(agg.checkpoint_exemplars), 1) + self.assertEqual(agg.checkpoint_exemplars[0].value, 2) + self.assertEqual( + agg.checkpoint_exemplars[0].dropped_labels, + {"dropped_label": "value"}, + ) + + agg.update(2) + agg.update(5) + agg.take_checkpoint() + self.assertEqual(len(agg.checkpoint_exemplars), 2) + self.assertEqual(agg.checkpoint_exemplars[1].value, 5) + + agg.update(5) + + def _patched_randint(minimum, maximum): + # pylint: disable=unused-argument + return 0 + + with patch("random.randint", _patched_randint): + agg.update(6) + + agg.take_checkpoint() + self.assertEqual(len(agg.checkpoint_exemplars), 1) + self.assertEqual(agg.checkpoint_exemplars[0].value, 6) + + agg.update(1) + agg.update(3) + agg.update(6) + agg.update(9) + agg.take_checkpoint() + self.assertEqual(len(agg.checkpoint_exemplars), 4) + + def test_record_traces_only(self): + config = { + "bounds": [2, 4, 6], + "num_exemplars": 2, + "statistical_exemplars": False, + } + agg = HistogramAggregator(config=config) + + agg.update(2) + agg.take_checkpoint() + self.assertEqual(len(agg.checkpoint_exemplars), 0) + + # Test with sampler on/off + tp = TracerProvider(sampler=ALWAYS_ON) + tracer = tp.get_tracer(__name__) + + span = tracer.start_span("Test Span ON") + with tracer.use_span(span): + agg.update(5) + + agg.take_checkpoint() + self.assertEqual(len(agg.checkpoint_exemplars), 1) + + self.assertEqual( + agg.checkpoint_exemplars[0].span_id, span.context.span_id + ) + + tp = TracerProvider(sampler=ALWAYS_OFF) + tracer = tp.get_tracer(__name__) + + with tracer.start_as_current_span("Test Span OFF"): + agg.update(5) + + agg.take_checkpoint() + self.assertEqual(len(agg.checkpoint_exemplars), 0) + + +class TestFullPipelineExemplars(unittest.TestCase): + def test_histogram(self): + # Use the meter type provided by the SDK package + metrics.set_meter_provider(MeterProvider()) + meter = metrics.get_meter(__name__) + exporter = InMemoryMetricsExporter() + controller = PushController(meter, exporter, 5) + + requests_size = meter.create_metric( + name="requests_size", + description="size of requests", + unit="1", + value_type=int, + metric_type=ValueRecorder, + ) + + size_view = View( + requests_size, + HistogramAggregator, + aggregator_config={ + "bounds": (20, 40, 60, 80, 100), + "num_exemplars": 1, + "statistical_exemplars": True, + }, + label_keys=["environment"], + view_config=ViewConfig.LABEL_KEYS, + ) + + meter.register_view(size_view) + + # Since this is using the HistogramAggregator, the bucket counts will be reflected + # with each record + requests_size.record(25, {"environment": "staging", "test": "value"}) + requests_size.record(1, {"environment": "staging", "test": "value"}) + requests_size.record(200, {"environment": "staging", "test": "value"}) + + controller.tick() + metrics_list = exporter.get_exported_metrics() + self.assertEqual(len(metrics_list), 1) + exemplars = metrics_list[0].aggregator.checkpoint_exemplars + self.assertEqual(len(exemplars), 3) + self.assertEqual( + [ + (exemplar.value, exemplar.dropped_labels) + for exemplar in exemplars + ], + [ + (1, (("test", "value"),)), + (25, (("test", "value"),)), + (200, (("test", "value"),)), + ], + ) diff --git a/opentelemetry-sdk/tests/metrics/export/test_export.py b/opentelemetry-sdk/tests/metrics/export/test_export.py index 99aa9c4a629..c262a5dd202 100644 --- a/opentelemetry-sdk/tests/metrics/export/test_export.py +++ b/opentelemetry-sdk/tests/metrics/export/test_export.py @@ -50,7 +50,7 @@ def test_export(self): labels = {"environment": "staging"} aggregator = SumAggregator() record = MetricRecord(metric, labels, aggregator) - result = '{}(data="{}", labels="{}", value={})'.format( + result = '{}(data="{}", labels="{}", value={}, exemplars=[])'.format( ConsoleMetricsExporter.__name__, metric, labels, diff --git a/opentelemetry-sdk/tests/metrics/test_metrics.py b/opentelemetry-sdk/tests/metrics/test_metrics.py index b854f2d5db9..d620d5eb6f5 100644 --- a/opentelemetry-sdk/tests/metrics/test_metrics.py +++ b/opentelemetry-sdk/tests/metrics/test_metrics.py @@ -496,7 +496,7 @@ def test_add(self): view_datas_mock = mock.Mock() bound_metric.view_datas = [view_datas_mock] bound_metric.add(3) - view_datas_mock.record.assert_called_once_with(3) + view_datas_mock.record.assert_called_once_with(3, ()) def test_add_disabled(self): meter_mock = mock.Mock() @@ -538,7 +538,7 @@ def test_record(self): view_datas_mock = mock.Mock() bound_valuerecorder.view_datas = [view_datas_mock] bound_valuerecorder.record(3) - view_datas_mock.record.assert_called_once_with(3) + view_datas_mock.record.assert_called_once_with(3, ()) def test_record_disabled(self): meter_mock = mock.Mock()