
Commit 0be0964: "linting"

1 parent a281776

File tree

8 files changed: +358 -125 lines changed

docs/examples/exemplars/semantic_exemplars.py

Lines changed: 29 additions & 11 deletions

@@ -20,22 +20,19 @@
 import time
 
 from opentelemetry import metrics
-from opentelemetry.sdk.metrics import (
-    MeterProvider,
-    ValueRecorder,
-)
+from opentelemetry.sdk.metrics import MeterProvider, ValueRecorder
 from opentelemetry.sdk.metrics.export import ConsoleMetricsExporter
-from opentelemetry.sdk.metrics.export.aggregate import (
-    HistogramAggregator,
-)
+from opentelemetry.sdk.metrics.export.aggregate import HistogramAggregator
 from opentelemetry.sdk.metrics.view import View, ViewConfig
 
 # Set up OpenTelemetry metrics
 metrics.set_meter_provider(MeterProvider(stateful=False))
 meter = metrics.get_meter(__name__)
 
 # Use the Google Cloud Monitoring Metrics Exporter, since it's the only one that currently supports exemplars
-metrics.get_meter_provider().start_pipeline(meter, ConsoleMetricsExporter(), 10)
+metrics.get_meter_provider().start_pipeline(
+    meter, ConsoleMetricsExporter(), 10
+)
 
 # Create our duration metric
 request_duration = meter.create_metric(

@@ -53,8 +50,26 @@
     # [>=0ms, >=25ms, >=50ms, >=75ms, >=100ms, >=200ms, >=400ms, >=600ms, >=800ms, >=1s, >=2s, >=4s, >=6s]
     # We want to generate 1 exemplar per bucket, where each exemplar has a linked trace that was recorded.
     # So we need to set num_exemplars to 1 and not specify statistical_exemplars (defaults to false)
-    HistogramAggregator(config={"bounds": [0, 25, 50, 75, 100, 200, 400, 600, 800, 1000, 2000, 4000, 6000],
-        "num_exemplars": 1}),
+    HistogramAggregator(
+        config={
+            "bounds": [
+                0,
+                25,
+                50,
+                75,
+                100,
+                200,
+                400,
+                600,
+                800,
+                1000,
+                2000,
+                4000,
+                6000,
+            ],
+            "num_exemplars": 1,
+        }
+    ),
     label_keys=["environment"],
     config=ViewConfig.LABEL_KEYS,
 )

@@ -63,5 +78,8 @@
 
 for i in range(100):
     # Generate some random data for the histogram with a dropped label "customer_id"
-    request_duration.record(random.randint(1, 8000), {"environment": "staging", "customer_id": random.randint(1, 100)})
+    request_duration.record(
+        random.randint(1, 8000),
+        {"environment": "staging", "customer_id": random.randint(1, 100)},
+    )
     time.sleep(1)
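
To make the bucket arithmetic above concrete, here is a minimal standalone sketch (illustrative only, not the SDK's code) of how a recorded value maps to a bucket index: the value falls into the first bucket whose boundary it is below, and anything at or above the last boundary lands in an overflow bucket. The same selection logic appears in HistogramAggregator.update() in aggregate.py further down.

    # Toy sketch: map a value to the histogram bucket index its exemplar would use.
    def bucket_index(value, boundaries):
        for index, bound in enumerate(boundaries):
            if value < bound:
                return index
        return len(boundaries)  # overflow bucket (tracked as ">" in the SDK)

    bounds = [0, 25, 50, 75, 100, 200, 400, 600, 800, 1000, 2000, 4000, 6000]
    assert bucket_index(30, bounds) == 2  # falls in the [25, 50) bucket
    assert bucket_index(9000, bounds) == len(bounds)  # beyond the last boundary

With "num_exemplars": 1, the aggregator keeps at most one exemplar per such bucket each collection interval.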

docs/examples/exemplars/statistical_exemplars.py

Lines changed: 71 additions & 19 deletions

@@ -1,17 +1,18 @@
-import numpy as np
-import matplotlib.pyplot as plt
 import random
-
 from collections import defaultdict
 
+import matplotlib.pyplot as plt
+import numpy as np
 from opentelemetry import metrics
 from opentelemetry.sdk.metrics import Counter, MeterProvider
 from opentelemetry.sdk.metrics.export.aggregate import SumAggregator
 from opentelemetry.sdk.metrics.export.controller import PushController
-from opentelemetry.sdk.metrics.export.in_memory_metrics_exporter import InMemoryMetricsExporter
+from opentelemetry.sdk.metrics.export.in_memory_metrics_exporter import (
+    InMemoryMetricsExporter,
+)
 from opentelemetry.sdk.metrics.view import View, ViewConfig
 
-## set up opentelemetry
+# set up opentelemetry
 
 # Sets the global MeterProvider instance
 metrics.set_meter_provider(MeterProvider())

@@ -46,7 +47,8 @@
 
 meter.register_view(counter_view)
 
-## generate the random metric data
+# generate the random metric data
+
 
 def unknown_customer_calls():
     """Generate customer call data to our application"""

@@ -57,23 +59,49 @@ def unknown_customer_calls():
     random.seed(1)
 
     # customer 123 is a big user, and made 1000 requests in this timeframe
-    requests = np.random.normal(1000, 250, 1000)  # 1000 requests with average 1000 bytes, covariance 100
+    requests = np.random.normal(
+        1000, 250, 1000
+    )  # 1000 requests with average 1000 bytes, covariance 100
 
     for request in requests:
-        bytes_counter.add(int(request), {"environment": "production", "method": "REST", "customer_id": 123})
+        bytes_counter.add(
+            int(request),
+            {
+                "environment": "production",
+                "method": "REST",
+                "customer_id": 123,
+            },
+        )
 
     # customer 247 is another big user, making fewer, but bigger requests
-    requests = np.random.normal(5000, 1250, 200)  # 200 requests with average size of 5k bytes
+    requests = np.random.normal(
+        5000, 1250, 200
+    )  # 200 requests with average size of 5k bytes
 
     for request in requests:
-        bytes_counter.add(int(request), {"environment": "production", "method": "REST", "customer_id": 247})
+        bytes_counter.add(
+            int(request),
+            {
+                "environment": "production",
+                "method": "REST",
+                "customer_id": 247,
+            },
+        )
 
     # There are many other smaller customers
     for customer_id in range(250):
        requests = np.random.normal(1000, 250, np.random.randint(1, 10))
        method = "REST" if np.random.randint(2) else "gRPC"
        for request in requests:
-            bytes_counter.add(int(request), {"environment": "production", "method": method, "customer_id": customer_id})
+            bytes_counter.add(
+                int(request),
+                {
+                    "environment": "production",
+                    "method": method,
+                    "customer_id": customer_id,
+                },
+            )
+
 
 unknown_customer_calls()

@@ -93,10 +121,15 @@ def unknown_customer_calls():
     customer_bytes_map[exemplar.dropped_labels] += exemplar.value
 
 
-customer_bytes_list = sorted(list(customer_bytes_map.items()), key=lambda t: t[1], reverse=True)
+customer_bytes_list = sorted(
+    list(customer_bytes_map.items()), key=lambda t: t[1], reverse=True
+)
 
 # Save our top 5 customers and sum all of the rest into "Others".
-top_5_customers = [("Customer {}".format(dict(val[0])["customer_id"]), val[1]) for val in customer_bytes_list[:5]] + [("Other Customers", sum([val[1] for val in customer_bytes_list[5:]]))]
+top_5_customers = [
+    ("Customer {}".format(dict(val[0])["customer_id"]), val[1])
+    for val in customer_bytes_list[:5]
+] + [("Other Customers", sum([val[1] for val in customer_bytes_list[5:]]))]
 
 # unzip the data into X (sizes of each customer's contribution) and labels
 labels, X = zip(*top_5_customers)

@@ -106,26 +139,45 @@ def unknown_customer_calls():
 plt.show()
 
 # Estimate how many bytes customer 123 sent
-customer_123_bytes = customer_bytes_map[(("customer_id", 123), ("method", "REST"))]
+customer_123_bytes = customer_bytes_map[
+    (("customer_id", 123), ("method", "REST"))
+]
 
 # Since the exemplars were randomly sampled, all sample_counts will be the same
 sample_count = exemplars[0].sample_count
 print("sample count", sample_count, "customer", customer_123_bytes)
 full_customer_123_bytes = sample_count * customer_123_bytes
 
 # With seed == 1 we get 1008612 - quite close to the statistical mean of 1000000! (more exemplars would make this estimation even more accurate)
-print("Customer 123 sent about {} bytes this interval".format(int(full_customer_123_bytes)))
+print(
+    "Customer 123 sent about {} bytes this interval".format(
+        int(full_customer_123_bytes)
+    )
+)
 
 # Determine the top 25 customers by how many bytes they sent in exemplars
 top_25_customers = customer_bytes_list[:25]
 
 # out of those 25 customers, determine how many used gRPC, and come up with a ratio
-percent_grpc = len(list(filter(lambda customer_value: customer_value[0][1][1] == "gRPC", top_25_customers))) / len(top_25_customers)
-
-print("~{}% of the top 25 customers (by bytes in) used gRPC this interval".format(int(percent_grpc*100)))
+percent_grpc = len(
+    list(
+        filter(
+            lambda customer_value: customer_value[0][1][1] == "gRPC",
+            top_25_customers,
+        )
+    )
+) / len(top_25_customers)
+
+print(
+    "~{}% of the top 25 customers (by bytes in) used gRPC this interval".format(
+        int(percent_grpc * 100)
+    )
+)
 
 # Determine the 50th, 90th, and 99th percentile of byte size sent in
-quantiles = np.quantile([exemplar.value for exemplar in exemplars], [0.5, 0.9, 0.99])
+quantiles = np.quantile(
+    [exemplar.value for exemplar in exemplars], [0.5, 0.9, 0.99]
+)
 print("50th Percentile Bytes In:", int(quantiles[0]))
 print("90th Percentile Bytes In:", int(quantiles[1]))
 print("99th Percentile Bytes In:", int(quantiles[2]))
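
The scale-up estimate above (full total is roughly sample_count times the sum of the sampled exemplar values) can be checked with a toy example that is independent of the SDK; the numbers below are made up for illustration:

    import random

    random.seed(0)
    true_values = list(range(1, 1001))  # 1000 measurements, true total 500500
    sampled = random.sample(true_values, 10)  # keep 10 uniformly random "exemplars"
    sample_count = len(true_values) / len(sampled)  # each exemplar represents 100 values
    estimate = sample_count * sum(sampled)
    print(int(estimate))  # a rough estimate of the true 500500; more samples tighten it

The same reasoning is why full_customer_123_bytes lands near the statistical mean of 1000000 above: each randomly sampled exemplar stands in for sample_count raw measurements.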

opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/__init__.py

Lines changed: 1 addition & 1 deletion

@@ -82,7 +82,7 @@ def export(
                         record.instrument,
                         record.labels,
                         record.aggregator.checkpoint,
-                        record.aggregator.checkpoint_exemplars
+                        record.aggregator.checkpoint_exemplars,
                     )
                 )
         return MetricsExportResult.SUCCESS

opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py

Lines changed: 32 additions & 16 deletions

@@ -16,17 +16,14 @@
 import logging
 import threading
 from collections import OrderedDict, namedtuple
-import itertools
 
-from collections import namedtuple, OrderedDict
-from opentelemetry.util import time_ns
 from opentelemetry.sdk.metrics.export.exemplars import (
-    Exemplar,
-    RandomExemplarSampler,
-    MinMaxExemplarSampler,
     BucketedExemplarSampler,
-    ExemplarManager
+    ExemplarManager,
+    MinMaxExemplarSampler,
+    RandomExemplarSampler,
 )
+from opentelemetry.util import time_ns
 
 logger = logging.getLogger(__name__)
 

@@ -67,7 +64,9 @@ def __init__(self, config=None):
         self.checkpoint = 0
         self._lock = threading.Lock()
         self.last_update_timestamp = None
-        self.exemplar_manager = ExemplarManager(config, MinMaxExemplarSampler, RandomExemplarSampler)
+        self.exemplar_manager = ExemplarManager(
+            config, MinMaxExemplarSampler, RandomExemplarSampler
+        )
 
     def update(self, value, dropped_labels=None):
         with self._lock:

@@ -89,7 +88,9 @@ def merge(self, other):
         self.last_update_timestamp = get_latest_timestamp(
             self.last_update_timestamp, other.last_update_timestamp
         )
-        self.checkpoint_exemplars = self.exemplar_manager.merge(self.checkpoint_exemplars, other.checkpoint_exemplars)
+        self.checkpoint_exemplars = self.exemplar_manager.merge(
+            self.checkpoint_exemplars, other.checkpoint_exemplars
+        )
 
 
 class MinMaxSumCountAggregator(Aggregator):

@@ -118,7 +119,9 @@ def __init__(self, config=None):
         self._lock = threading.Lock()
         self.last_update_timestamp = None
 
-        self.exemplar_manager = ExemplarManager(config, MinMaxExemplarSampler, RandomExemplarSampler)
+        self.exemplar_manager = ExemplarManager(
+            config, MinMaxExemplarSampler, RandomExemplarSampler
+        )
 
     def update(self, value, dropped_labels=None):
         with self._lock:

@@ -151,7 +154,9 @@ def merge(self, other):
         self.last_update_timestamp = get_latest_timestamp(
             self.last_update_timestamp, other.last_update_timestamp
         )
-        self.checkpoint_exemplars = self.exemplar_manager.merge(self.checkpoint_exemplars, other.checkpoint_exemplars)
+        self.checkpoint_exemplars = self.exemplar_manager.merge(
+            self.checkpoint_exemplars, other.checkpoint_exemplars
+        )
 
 
 class HistogramAggregator(Aggregator):

@@ -161,15 +166,20 @@ def __init__(self, config=None):
         super().__init__(config=config)
         self._lock = threading.Lock()
         self.last_update_timestamp = None
-        boundaries = self.config.get('bounds', None)
+        boundaries = self.config.get("bounds", None)
         if boundaries and self._validate_boundaries(boundaries):
             self._boundaries = boundaries
         else:
             self._boundaries = (10, 20, 30, 40, 50, 60, 70, 80, 90, 100)
         self.current = OrderedDict([(bb, 0) for bb in self._boundaries])
         self.checkpoint = OrderedDict([(bb, 0) for bb in self._boundaries])
 
-        self.exemplar_manager = ExemplarManager(config, BucketedExemplarSampler, BucketedExemplarSampler, boundaries=self._boundaries)
+        self.exemplar_manager = ExemplarManager(
+            config,
+            BucketedExemplarSampler,
+            BucketedExemplarSampler,
+            boundaries=self._boundaries,
+        )
 
         self.current[">"] = 0
         self.checkpoint[">"] = 0

@@ -205,14 +215,18 @@ def update(self, value, dropped_labels=None):
             # greater than max value
             if value >= self._boundaries[len(self._boundaries) - 1]:
                 self.current[">"] += 1
-                self.exemplar_manager.sample(value, dropped_labels, bucket_index=len(self._boundaries))
+                self.exemplar_manager.sample(
+                    value, dropped_labels, bucket_index=len(self._boundaries)
+                )
             else:
                 for index, bb in enumerate(self._boundaries):
                     # find first bucket that value is less than
                     if value < bb:
                         self.current[bb] += 1
 
-                        self.exemplar_manager.sample(value, dropped_labels, bucket_index=index)
+                        self.exemplar_manager.sample(
+                            value, dropped_labels, bucket_index=index
+                        )
                         break
             self.last_update_timestamp = time_ns()

@@ -232,7 +246,9 @@ def merge(self, other):
             self.checkpoint, other.checkpoint
         )
 
-        self.checkpoint_exemplars = self.exemplar_manager.merge(self.checkpoint_exemplars, other.checkpoint_exemplars)
+        self.checkpoint_exemplars = self.exemplar_manager.merge(
+            self.checkpoint_exemplars, other.checkpoint_exemplars
+        )
 
         self.last_update_timestamp = get_latest_timestamp(
             self.last_update_timestamp, other.last_update_timestamp