
Commit 1aa84b4: linting
Parent: 5f7409e

File tree: 10 files changed, +358 -130 lines

docs/examples/exemplars/semantic_exemplars.py (+27 -11)

@@ -20,22 +20,19 @@
 import time
 
 from opentelemetry import metrics
-from opentelemetry.sdk.metrics import (
-    MeterProvider,
-    ValueRecorder,
-)
+from opentelemetry.sdk.metrics import MeterProvider, ValueRecorder
 from opentelemetry.sdk.metrics.export import ConsoleMetricsExporter
-from opentelemetry.sdk.metrics.export.aggregate import (
-    HistogramAggregator,
-)
+from opentelemetry.sdk.metrics.export.aggregate import HistogramAggregator
 from opentelemetry.sdk.metrics.view import View, ViewConfig
 
 # Set up OpenTelemetry metrics
 metrics.set_meter_provider(MeterProvider(stateful=False))
 meter = metrics.get_meter(__name__)
 
 # Use the Google Cloud Monitoring Metrics Exporter since its the only one that currently supports exemplars
-metrics.get_meter_provider().start_pipeline(meter, ConsoleMetricsExporter(), 10)
+metrics.get_meter_provider().start_pipeline(
+    meter, ConsoleMetricsExporter(), 10
+)
 
 # Create our duration metric
 request_duration = meter.create_metric(
@@ -54,8 +51,24 @@
     # We want to generate 1 exemplar per bucket, where each exemplar has a linked trace that was recorded.
    # So we need to set num_exemplars to 1 and not specify statistical_exemplars (defaults to false)
     HistogramAggregator,
-    aggregator_config={"bounds": [0, 25, 50, 75, 100, 200, 400, 600, 800, 1000, 2000, 4000, 6000],
-                       "num_exemplars": 1},
+    aggregator_config={
+        "bounds": [
+            0,
+            25,
+            50,
+            75,
+            100,
+            200,
+            400,
+            600,
+            800,
+            1000,
+            2000,
+            4000,
+            6000,
+        ],
+        "num_exemplars": 1,
+    },
     label_keys=["environment"],
     view_config=ViewConfig.LABEL_KEYS,
 )
@@ -64,5 +77,8 @@
 
 for i in range(100):
     # Generate some random data for the histogram with a dropped label "customer_id"
-    request_duration.record(random.randint(1, 8000), {"environment": "staging", "customer_id": random.randint(1, 100)})
+    request_duration.record(
+        random.randint(1, 8000),
+        {"environment": "staging", "customer_id": random.randint(1, 100)},
+    )
     time.sleep(1)
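
Reviewer note: the bounds list reflowed above drives HistogramAggregator's bucketing, which counts a value in the first bucket whose boundary exceeds it and keeps a trailing ">" bucket for overflow (see the aggregate.py hunks further down). A minimal, SDK-independent sketch of that rule; find_bucket is a hypothetical helper, not part of opentelemetry-sdk:

    bounds = [0, 25, 50, 75, 100, 200, 400, 600, 800, 1000, 2000, 4000, 6000]

    def find_bucket(value, boundaries):
        # Index of the first boundary the value is less than; values past
        # the last boundary land in the overflow bucket at len(boundaries).
        for index, boundary in enumerate(boundaries):
            if value < boundary:
                return index
        return len(boundaries)

    assert find_bucket(42, bounds) == 2      # counted in the [25, 50) bucket
    assert find_bucket(7500, bounds) == 13   # counted in the ">" bucket

With "num_exemplars": 1, the view retains at most one exemplar per bucket, so a checkpoint here carries at most 14 exemplars.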

docs/examples/exemplars/statistical_exemplars.py (+71 -19)

@@ -1,17 +1,18 @@
-import numpy as np
-import matplotlib.pyplot as plt
 import random
-
 from collections import defaultdict
 
+import matplotlib.pyplot as plt
+import numpy as np
 from opentelemetry import metrics
 from opentelemetry.sdk.metrics import Counter, MeterProvider
 from opentelemetry.sdk.metrics.export.aggregate import SumAggregator
 from opentelemetry.sdk.metrics.export.controller import PushController
-from opentelemetry.sdk.metrics.export.in_memory_metrics_exporter import InMemoryMetricsExporter
+from opentelemetry.sdk.metrics.export.in_memory_metrics_exporter import (
+    InMemoryMetricsExporter,
+)
 from opentelemetry.sdk.metrics.view import View, ViewConfig
 
-## set up opentelemetry
+# set up opentelemetry
 
 # Sets the global MeterProvider instance
 metrics.set_meter_provider(MeterProvider())
@@ -47,7 +48,8 @@
 
 meter.register_view(counter_view)
 
-## generate the random metric data
+# generate the random metric data
+
 
 def unknown_customer_calls():
     """Generate customer call data to our application"""
@@ -58,23 +60,49 @@ def unknown_customer_calls():
     random.seed(1)
 
     # customer 123 is a big user, and made 1000 requests in this timeframe
-    requests = np.random.normal(1000, 250, 1000)  # 1000 requests with average 1000 bytes, covariance 100
+    requests = np.random.normal(
+        1000, 250, 1000
+    )  # 1000 requests with average 1000 bytes, covariance 100
 
     for request in requests:
-        bytes_counter.add(int(request), {"environment": "production", "method": "REST", "customer_id": 123})
+        bytes_counter.add(
+            int(request),
+            {
+                "environment": "production",
+                "method": "REST",
+                "customer_id": 123,
+            },
+        )
 
     # customer 247 is another big user, making fewer, but bigger requests
-    requests = np.random.normal(5000, 1250, 200)  # 200 requests with average size of 5k bytes
+    requests = np.random.normal(
+        5000, 1250, 200
+    )  # 200 requests with average size of 5k bytes
 
     for request in requests:
-        bytes_counter.add(int(request), {"environment": "production", "method": "REST", "customer_id": 247})
+        bytes_counter.add(
+            int(request),
+            {
+                "environment": "production",
+                "method": "REST",
+                "customer_id": 247,
+            },
+        )
 
     # There are many other smaller customers
     for customer_id in range(250):
         requests = np.random.normal(1000, 250, np.random.randint(1, 10))
         method = "REST" if np.random.randint(2) else "gRPC"
         for request in requests:
-            bytes_counter.add(int(request), {"environment": "production", "method": method, "customer_id": customer_id})
+            bytes_counter.add(
+                int(request),
+                {
+                    "environment": "production",
+                    "method": method,
+                    "customer_id": customer_id,
+                },
+            )
+
 
 unknown_customer_calls()
 
@@ -94,10 +122,15 @@ def unknown_customer_calls():
     customer_bytes_map[exemplar.dropped_labels] += exemplar.value
 
 
-customer_bytes_list = sorted(list(customer_bytes_map.items()), key=lambda t: t[1], reverse=True)
+customer_bytes_list = sorted(
+    list(customer_bytes_map.items()), key=lambda t: t[1], reverse=True
+)
 
 # Save our top 5 customers and sum all of the rest into "Others".
-top_5_customers = [("Customer {}".format(dict(val[0])["customer_id"]), val[1]) for val in customer_bytes_list[:5]] + [("Other Customers", sum([val[1] for val in customer_bytes_list[5:]]))]
+top_5_customers = [
+    ("Customer {}".format(dict(val[0])["customer_id"]), val[1])
+    for val in customer_bytes_list[:5]
+] + [("Other Customers", sum([val[1] for val in customer_bytes_list[5:]]))]
 
 # unzip the data into X (sizes of each customer's contribution) and labels
 labels, X = zip(*top_5_customers)
@@ -107,26 +140,45 @@ def unknown_customer_calls():
 plt.show()
 
 # Estimate how many bytes customer 123 sent
-customer_123_bytes = customer_bytes_map[(("customer_id", 123), ("method", "REST"))]
+customer_123_bytes = customer_bytes_map[
+    (("customer_id", 123), ("method", "REST"))
+]
 
 # Since the exemplars were randomly sampled, all sample_counts will be the same
 sample_count = exemplars[0].sample_count
 print("sample count", sample_count, "custmer", customer_123_bytes)
 full_customer_123_bytes = sample_count * customer_123_bytes
 
 # With seed == 1 we get 1008612 - quite close to the statistical mean of 1000000! (more exemplars would make this estimation even more accurate)
-print("Customer 123 sent about {} bytes this interval".format(int(full_customer_123_bytes)))
+print(
+    "Customer 123 sent about {} bytes this interval".format(
+        int(full_customer_123_bytes)
+    )
+)
 
 # Determine the top 25 customers by how many bytes they sent in exemplars
 top_25_customers = customer_bytes_list[:25]
 
 # out of those 25 customers, determine how many used grpc, and come up with a ratio
-percent_grpc = len(list(filter(lambda customer_value: customer_value[0][1][1] == "gRPC", top_25_customers))) / len(top_25_customers)
-
-print("~{}% of the top 25 customers (by bytes in) used gRPC this interval".format(int(percent_grpc*100)))
+percent_grpc = len(
+    list(
+        filter(
+            lambda customer_value: customer_value[0][1][1] == "gRPC",
+            top_25_customers,
+        )
+    )
+) / len(top_25_customers)
+
+print(
+    "~{}% of the top 25 customers (by bytes in) used gRPC this interval".format(
+        int(percent_grpc * 100)
+    )
+)
 
 # Determine the 50th, 90th, and 99th percentile of byte size sent in
-quantiles = np.quantile([exemplar.value for exemplar in exemplars], [0.5, 0.9, 0.99])
+quantiles = np.quantile(
+    [exemplar.value for exemplar in exemplars], [0.5, 0.9, 0.99]
+)
 print("50th Percentile Bytes In:", int(quantiles[0]))
 print("90th Percentile Bytes In:", int(quantiles[1]))
 print("99th Percentile Bytes In:", int(quantiles[2]))

opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py (+1 -1)

@@ -75,7 +75,7 @@ def update(self, value: metrics_api.ValueT):
         with self._view_datas_lock:
             # record the value for each view_data belonging to this aggregator
             for view_data in self.view_datas:
-                view_data.record(value)
+                view_data.record(value, self._labels)
 
     def release(self):
         self.decrease_ref_count()
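
Reviewer note: beyond lint fixes, this hunk forwards the bound instrument's full label set to each view_data. A view configured with ViewConfig.LABEL_KEYS aggregates on a subset of label keys, and the keys it drops are what an exemplar can preserve (the statistical_exemplars.py example reads them back via exemplar.dropped_labels). A hypothetical illustration of that split, not SDK code:

    view_label_keys = {"environment"}
    labels = {"environment": "staging", "customer_id": 42}

    # Labels the view aggregates on versus labels only an exemplar retains.
    aggregated = {k: v for k, v in labels.items() if k in view_label_keys}
    dropped = {k: v for k, v in labels.items() if k not in view_label_keys}

    assert aggregated == {"environment": "staging"}
    assert dropped == {"customer_id": 42}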

opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/__init__.py (+1 -1)

@@ -82,7 +82,7 @@ def export(
                     record.instrument,
                     record.labels,
                     record.aggregator.checkpoint,
-                    record.aggregator.checkpoint_exemplars
+                    record.aggregator.checkpoint_exemplars,
                 )
             )
         return MetricsExportResult.SUCCESS

opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py (+31 -15)

@@ -16,17 +16,14 @@
 import logging
 import threading
 from collections import OrderedDict, namedtuple
-import itertools
 
-from collections import namedtuple, OrderedDict
-from opentelemetry.util import time_ns
 from opentelemetry.sdk.metrics.export.exemplars import (
-    Exemplar,
-    RandomExemplarSampler,
-    MinMaxExemplarSampler,
     BucketedExemplarSampler,
-    ExemplarManager
+    ExemplarManager,
+    MinMaxExemplarSampler,
+    RandomExemplarSampler,
 )
+from opentelemetry.util import time_ns
 
 logger = logging.getLogger(__name__)
 
@@ -69,7 +66,9 @@ def __init__(self, config=None):
         self.checkpoint = 0
         self._lock = threading.Lock()
         self.last_update_timestamp = None
-        self.exemplar_manager = ExemplarManager(config, MinMaxExemplarSampler, RandomExemplarSampler)
+        self.exemplar_manager = ExemplarManager(
+            config, MinMaxExemplarSampler, RandomExemplarSampler
+        )
 
     def update(self, value, dropped_labels=None):
         with self._lock:
@@ -91,7 +90,9 @@ def merge(self, other):
             self.last_update_timestamp = get_latest_timestamp(
                 self.last_update_timestamp, other.last_update_timestamp
             )
-            self.checkpoint_exemplars = self.exemplar_manager.merge(self.checkpoint_exemplars, other.checkpoint_exemplars)
+            self.checkpoint_exemplars = self.exemplar_manager.merge(
+                self.checkpoint_exemplars, other.checkpoint_exemplars
+            )
 
 
 class MinMaxSumCountAggregator(Aggregator):
@@ -120,7 +121,9 @@ def __init__(self, config=None):
         self._lock = threading.Lock()
         self.last_update_timestamp = None
 
-        self.exemplar_manager = ExemplarManager(config, MinMaxExemplarSampler, RandomExemplarSampler)
+        self.exemplar_manager = ExemplarManager(
+            config, MinMaxExemplarSampler, RandomExemplarSampler
+        )
 
     def update(self, value, dropped_labels=None):
         with self._lock:
@@ -153,7 +156,9 @@ def merge(self, other):
             self.last_update_timestamp = get_latest_timestamp(
                 self.last_update_timestamp, other.last_update_timestamp
             )
-            self.checkpoint_exemplars = self.exemplar_manager.merge(self.checkpoint_exemplars, other.checkpoint_exemplars)
+            self.checkpoint_exemplars = self.exemplar_manager.merge(
+                self.checkpoint_exemplars, other.checkpoint_exemplars
+            )
 
 
 class HistogramAggregator(Aggregator):
@@ -173,7 +178,12 @@ def __init__(self, config=None):
         self.current = OrderedDict([(bb, 0) for bb in self._boundaries])
         self.checkpoint = OrderedDict([(bb, 0) for bb in self._boundaries])
 
-        self.exemplar_manager = ExemplarManager(config, BucketedExemplarSampler, BucketedExemplarSampler, boundaries=self._boundaries)
+        self.exemplar_manager = ExemplarManager(
+            config,
+            BucketedExemplarSampler,
+            BucketedExemplarSampler,
+            boundaries=self._boundaries,
+        )
 
         self.current[">"] = 0
         self.checkpoint[">"] = 0
@@ -209,14 +219,18 @@ def update(self, value, dropped_labels=None):
             # greater than max value
             if value >= self._boundaries[len(self._boundaries) - 1]:
                 self.current[">"] += 1
-                self.exemplar_manager.sample(value, dropped_labels, bucket_index=len(self._boundaries))
+                self.exemplar_manager.sample(
+                    value, dropped_labels, bucket_index=len(self._boundaries)
+                )
             else:
                 for index, bb in enumerate(self._boundaries):
                     # find first bucket that value is less than
                     if value < bb:
                         self.current[bb] += 1
 
-                        self.exemplar_manager.sample(value, dropped_labels, bucket_index=index)
+                        self.exemplar_manager.sample(
+                            value, dropped_labels, bucket_index=index
+                        )
                         break
         self.last_update_timestamp = time_ns()
 
@@ -236,7 +250,9 @@ def merge(self, other):
                 self.checkpoint, other.checkpoint
             )
 
-            self.checkpoint_exemplars = self.exemplar_manager.merge(self.checkpoint_exemplars, other.checkpoint_exemplars)
+            self.checkpoint_exemplars = self.exemplar_manager.merge(
+                self.checkpoint_exemplars, other.checkpoint_exemplars
+            )
 
             self.last_update_timestamp = get_latest_timestamp(
                 self.last_update_timestamp, other.last_update_timestamp
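
Reviewer note: each merge() above now routes checkpointed exemplars through ExemplarManager.merge. A hypothetical sketch of the contract such a merge plausibly honors (combine two capped exemplar lists without exceeding the configured limit); the real implementation lives in opentelemetry.sdk.metrics.export.exemplars and may differ:

    import random

    def merge_exemplars(existing, incoming, num_exemplars):
        # Combine two checkpointed exemplar lists, downsampling when the
        # union exceeds the configured cap.
        combined = existing + incoming
        if len(combined) <= num_exemplars:
            return combined
        return random.sample(combined, num_exemplars)

    checkpoint = merge_exemplars([101, 207], [305, 412, 599], num_exemplars=4)
    assert len(checkpoint) <= 4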
