-
Notifications
You must be signed in to change notification settings - Fork 15
Expand file tree
/
Copy pathglean_ping.py
More file actions
571 lines (482 loc) · 22.8 KB
/
Copy pathglean_ping.py
File metadata and controls
571 lines (482 loc) · 22.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
# -*- coding: utf-8 -*-
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import copy
import logging
from collections import defaultdict
from datetime import datetime
from functools import cache
from pathlib import Path
from typing import Any, Dict, List, Set
import yaml
from requests import HTTPError
from .config import Config
from .generic_ping import GenericPing
from .probes import GleanProbe
from .schema import Schema
# Directory containing this module; bundled configuration files live in ``configs/``.
ROOT_DIR = Path(__file__).parent
# One "<bq_dataset_family>.<bq_table>" identifier per line (blank lines ignored) of
# tables affected by https://bugzilla.mozilla.org/show_bug.cgi?id=1737656.
BUG_1737656_TXT = ROOT_DIR.joinpath("configs", "bug_1737656_affected.txt")
# YAML file read by ``GleanPing.get_metric_blocklist``.
METRIC_BLOCKLIST = ROOT_DIR.joinpath("configs", "metric_blocklist.yaml")

logger = logging.getLogger(__name__)

# Base URL of the Glean schemas in mozilla-pipeline-schemas. The ``{branch}``
# placeholder is intentionally left unformatted here and filled in later.
SCHEMA_URL_TEMPLATE = (
    "https://raw.githubusercontent.com/mozilla-services"
    "/mozilla-pipeline-schemas/{branch}"
    "/schemas/glean/glean/"
)
# File name of a specific schema flavour/version, e.g. "glean.1.schema.json".
SCHEMA_VERSION_TEMPLATE = "{schema_type}.{version}.schema.json"
# Version-1 "glean" schema URL; still contains the unformatted ``{branch}`` placeholder.
DEFAULT_SCHEMA_URL = SCHEMA_URL_TEMPLATE + SCHEMA_VERSION_TEMPLATE.format(
    schema_type="glean",
    version=1,
)
class GleanPing(GenericPing):
    """Schema generator for the pings of a single Glean application.

    Metric, ping and dependency definitions are fetched from the probe-info service
    (``GenericPing.probe_info_base_url``). Metrics and pings defined by the
    application's Glean library dependencies are combined with the application's own.
    """

    probes_url_template = GenericPing.probe_info_base_url + "/glean/{}/metrics"
    ping_url_template = GenericPing.probe_info_base_url + "/glean/{}/pings"
    repos_url = GenericPing.probe_info_base_url + "/glean/repositories"
    dependencies_url_template = (
        GenericPing.probe_info_base_url + "/glean/{}/dependencies"
    )
    app_listings_url = GenericPing.probe_info_base_url + "/v2/glean/app-listings"

    # Used when the dependency list cannot be fetched or maps to no known repository.
    default_dependencies = ["glean-core"]

    # "<bq_dataset_family>.<bq_table>" identifiers (one per non-blank line) of tables
    # that keep the legacy layout described in bug 1737656; read once at import time.
    with open(BUG_1737656_TXT, "r") as f:
        bug_1737656_affected_tables = [
            line.strip() for line in f.readlines() if line.strip()
        ]

    def __init__(
        self, repo, version=1, use_metrics_blocklist=False, **kwargs
    ):  # TODO: Make env-url optional
        """
        :param repo: repository entry; its "name" and "app_id" keys are read
        :param version: Glean schema version used when building schema URLs
        :param use_metrics_blocklist: when true, load the metric blocklist from
            ``METRIC_BLOCKLIST``; otherwise no metrics are blocked
        :param kwargs: forwarded to ``GenericPing.__init__``
        """
        self.repo = repo
        self.repo_name = repo["name"]
        self.app_id = repo["app_id"]
        self.version = version

        if use_metrics_blocklist:
            self.metric_blocklist = self.get_metric_blocklist()
        else:
            self.metric_blocklist = {}

        super().__init__(
            DEFAULT_SCHEMA_URL,
            DEFAULT_SCHEMA_URL,
            self.probes_url_template.format(self.repo_name),
            **kwargs,
        )

    def get_schema(self, generic_schema=False) -> Schema:
        """Fetch the schema via URL.

        Unless *generic_schema* is true, the schema is modified so that metrics can be
        injected properly: placeholders "<type>2" are added for the metric types
        affected by https://bugzilla.mozilla.org/show_bug.cgi?id=1737656.
        """
        schema = super().get_schema()
        if generic_schema:
            return schema

        # Clone the definition of each affected type under its "2" name (url2, text2, ...).
        for metric_name in ["labeled_rate", "jwe", "url", "text"]:
            metric_schema = schema.get(
                ("properties", "metrics", "properties", metric_name)
            ).copy()
            schema.set_schema_elem(
                ("properties", "metrics", "properties", metric_name + "2"),
                metric_schema,
            )

        return schema

    def get_dependencies(self):
        """Return the repository names of this application's Glean library dependencies.

        The dependencies endpoint reports library names; they are mapped back to
        repository names through the repositories listing, and unknown libraries are
        ignored. ``default_dependencies`` is returned when the endpoint answers with an
        HTTP error or when no dependency maps to a known repository.

        The result is memoized on the instance (a per-instance attribute rather than
        ``functools.cache``, which would keep every instance alive forever).
        """
        cached = getattr(self, "_dependencies_cache", None)
        if cached is None:
            cached = self._fetch_dependencies()
            self._dependencies_cache = cached
        return cached

    def _fetch_dependencies(self):
        """Uncached implementation of ``get_dependencies``."""
        try:
            dependencies = self._get_json(
                self.dependencies_url_template.format(self.repo_name)
            )
        except HTTPError:
            logger.info("For %s, using default Glean dependencies", self.repo_name)
            return self.default_dependencies

        repos = GleanPing._get_json(GleanPing.repos_url)
        # library name -> repository name (later repositories win on duplicates)
        repos_by_dependency_name = {
            library_name: repo["name"]
            for repo in repos
            for library_name in repo.get("library_names", [])
        }

        dependency_repos = [
            repos_by_dependency_name[name]
            for name in dependencies
            if name in repos_by_dependency_name
        ]

        if not dependency_repos:
            logger.info("For %s, using default Glean dependencies", self.repo_name)
            return self.default_dependencies

        logger.info(
            "For %s, found Glean dependencies: %s", self.repo_name, dependency_repos
        )
        return dependency_repos

    @staticmethod
    def remove_pings_from_metric(
        metric: Dict[str, Any], blocked_pings: List[str]
    ) -> Dict[str, Any]:
        """Remove *blocked_pings* from every ``send_in_pings`` list in the metric's history.

        Pings are only removed when the metric is no longer in source and its final
        history entry was last seen before 2025-01-01. Metrics still in source, or seen
        on or after that date, are returned untouched; this allows a blocklisted metric
        to be added back to the schema.

        The metric is modified in place and also returned.
        """
        if not blocked_pings or metric.get("in-source", False):
            return metric

        last_seen = datetime.fromisoformat(metric["history"][-1]["dates"]["last"])
        if last_seen >= datetime(year=2025, month=1, day=1):
            return metric

        blocked = set(blocked_pings)
        for history_entry in metric["history"]:
            history_entry["send_in_pings"] = [
                p for p in history_entry["send_in_pings"] if p not in blocked
            ]
        return metric

    def _blocked_pings_by_metric(self, blocklist_key: str) -> Dict[str, List[str]]:
        """Invert the blocklist section *blocklist_key* into a metric name -> ping types map."""
        blocked = defaultdict(list)
        for ping_type, metric_names in self.metric_blocklist.get(
            blocklist_key, {}
        ).items():
            for metric_name in metric_names:
                blocked[metric_name].append(ping_type)
        return blocked

    def get_probes(self) -> List[GleanProbe]:
        """Return GleanProbe objects for the metrics of the app and its dependencies.

        Definitions of the same metric found in several locations are merged, and an
        extra probe is emitted for every historic metric type (bug 1870317).
        """
        data = self._get_json(self.probes_url)

        # blocklist needs to be applied here instead of generate_schema because it needs to be
        # dependency-aware; metrics can move between app and library and still be in the schema
        blocklist = self._blocked_pings_by_metric(self.get_app_name())
        probes = [
            (name, self.remove_pings_from_metric(defn, blocklist.get(name, [])))
            for name, defn in data.items()
        ]

        for dependency in self.get_dependencies():
            dependency_probes = self._get_json(
                self.probes_url_template.format(dependency)
            )
            dependency_blocklist = self._blocked_pings_by_metric(dependency)
            probes += [
                (
                    name,
                    self.remove_pings_from_metric(
                        defn, dependency_blocklist.get(name, [])
                    ),
                )
                for name, defn in dependency_probes.items()
            ]

        probes = self._dedupe_probes(probes)

        pings = self.get_pings()
        processed = []
        for _id, defn in probes:
            processed.append(GleanProbe(_id, defn, pings=pings))

            # Handling probe type changes (Bug 1870317)
            probe_types = {hist["type"] for hist in defn[GleanProbe.history_key]}
            if len(probe_types) <= 1:
                continue
            # The probe type changed at some point in history: create a schema entry
            # for each historic type. No new entry is needed for the current type
            # (discard, not remove: tolerate a current type missing from history).
            probe_types.discard(defn["type"])
            for hist in defn[GleanProbe.history_key]:
                if hist["type"] in probe_types:
                    # A fresh dict per type, so probes never share a mutated definition.
                    hist_defn = {**defn, "type": hist["type"]}
                    processed.append(GleanProbe(_id, hist_defn, pings=pings))
                    # Keep track of the types entries were already created for
                    probe_types.remove(hist["type"])

        return processed

    @staticmethod
    def _dedupe_probes(probes):
        """Merge definitions of the same metric that appear in several locations.

        A metric can be moved between an app and its dependencies or between
        dependencies while probe scraper keeps the history in each location, so both
        definitions are returned. Probes are grouped by their normalized BigQuery column
        name (from jsonschema-transpiler) rather than their raw name, e.g.
        "media.audio.init_failure" and "media.audio_init_failure" both normalize to
        "media_audio_init_failure"; the transpiler picks one of the colliding
        descriptions non-deterministically. Definitions are only merged when their pings
        intersect, since metrics sent in disjoint pings are distinct.

        :param probes: list of ``(name, definition)`` tuples
        :return: list of ``(name, definition)`` tuples, one per merged group; merged
            groups use the latest definition with the combined history sorted by
            first-seen date, so metric type changes can still be detected
        """

        def _normalize_name(name):
            return name.replace(".", "_").replace("-", "_")

        def _pings_in_history(defn):
            return {
                p
                for h in defn[GleanProbe.history_key]
                for p in h.get("send_in_pings", ["metrics"])
            }

        def _latest_history_date(defn):
            return max(
                datetime.fromisoformat(h["dates"]["last"])
                for h in defn[GleanProbe.history_key]
            )

        def _dedupe_sort_key(defn):
            """Prefer the most recent definition, breaking ties by choosing the in-source metric."""
            return (
                _latest_history_date(defn),
                defn.get(GleanProbe.in_source_key, False),
                defn["name"],
            )

        grouped_by_name: Dict[str, List[List[dict]]] = defaultdict(list)
        for name, defn in probes:
            defn_pings = _pings_in_history(defn)
            existing_groups = grouped_by_name[_normalize_name(name)]
            matches = [
                group
                for group in existing_groups
                if any(_pings_in_history(other) & defn_pings for other in group)
            ]
            if not matches:
                existing_groups.append([defn])
                continue

            merged_group = [defn]
            for group in matches:
                merged_group.extend(group)
            # Drop the merged groups by identity (list.remove compares by value).
            existing_groups[:] = [
                group
                for group in existing_groups
                if not any(group is match for match in matches)
            ]
            existing_groups.append(merged_group)

        # Take latest definition per group
        deduped_probes: List[Any] = []
        for groups in grouped_by_name.values():
            for group in groups:
                latest_defn = max(group, key=_dedupe_sort_key)
                if len(group) > 1:
                    latest_defn = latest_defn.copy()
                    latest_defn[GleanProbe.history_key] = sorted(
                        (h for d in group for h in d[GleanProbe.history_key]),
                        key=lambda h: datetime.fromisoformat(h["dates"]["first"]),
                    )
                deduped_probes.append((latest_defn["name"], latest_defn))
        return deduped_probes

    def _get_ping_data(self) -> Dict[str, Dict]:
        """Return ping definitions of the app and its dependencies (later ones win on name clashes)."""
        ping_data = self._get_ping_data_without_dependencies()
        for dependency in self.get_dependencies():
            ping_data.update(self._get_dependency_pings(dependency))
        return ping_data

    def _get_ping_data_without_dependencies(self) -> Dict[str, Dict]:
        """Return only the ping definitions of the application itself."""
        url = self.ping_url_template.format(self.repo_name)
        ping_data = GleanPing._get_json(url)
        return ping_data

    def _get_dependency_pings(self, dependency):
        """Return the ping definitions of the repository named *dependency*."""
        return self._get_json(self.ping_url_template.format(dependency))

    def get_pings(self) -> Set[str]:
        """Return the names of all pings of the app and its dependencies."""
        return self._get_ping_data().keys()

    @staticmethod
    def apply_default_metadata(ping_metadata, default_metadata):
        """Recursively merge ``default_metadata`` into ``ping_metadata`` in place.

        Nested dicts present on both sides are merged key by key, to arbitrary depth;
        any other value in ``default_metadata`` overwrites the one in ``ping_metadata``.

        :param ping_metadata: dict onto which the merge is executed
        :param default_metadata: dict merged into ping_metadata
        :return: None
        """
        for key, default_value in default_metadata.items():
            if (
                key in ping_metadata
                and isinstance(ping_metadata[key], dict)
                and isinstance(default_value, dict)
            ):
                GleanPing.apply_default_metadata(ping_metadata[key], default_value)
            else:
                ping_metadata[key] = default_value

    def _get_ping_data_and_dependencies_with_default_metadata(self) -> Dict[str, Dict]:
        """Return app and dependency pings, with repo-level pipeline metadata applied to the latter."""
        # Get the ping data with the pipeline metadata
        ping_data = self._get_ping_data_without_dependencies()

        # The ping endpoint for the dependency pings does not include any repo defined
        # moz_pipeline_metadata_defaults so they need to be applied here.

        # 1. Get repo and pipeline default metadata.
        repos = self.get_repos()
        current_repo = next((x for x in repos if x.get("app_id") == self.app_id), {})
        default_metadata = current_repo.get("moz_pipeline_metadata_defaults", {})

        # 2. Apply the default metadata to each dependency defined ping.

        # Apply app-level metadata to pings defined in dependencies
        app_metadata = current_repo.get("moz_pipeline_metadata", {})

        for dependency in self.get_dependencies():
            dependency_pings = self._get_dependency_pings(dependency)
            for dependency_ping in dependency_pings.values():
                # Although it is counter intuitive to apply the default metadata on top of the
                # existing dependency ping metadata it does set the repo specific value for
                # bq_dataset_family instead of using the dependency id for the bq_dataset_family
                # value.
                GleanPing.apply_default_metadata(
                    dependency_ping.get("moz_pipeline_metadata"),
                    copy.deepcopy(default_metadata),
                )
                # app-level ping properties take priority over the app defaults
                metadata_override = app_metadata.get(dependency_ping["name"])
                if metadata_override is not None:
                    # Deep-copied like the defaults so nested values are never shared
                    # with the fetched repository data.
                    GleanPing.apply_default_metadata(
                        dependency_ping.get("moz_pipeline_metadata"),
                        copy.deepcopy(metadata_override),
                    )
            ping_data.update(dependency_pings)

        return ping_data

    @staticmethod
    def reorder_metadata(metadata):
        """Return a new dict with *metadata*'s keys in the currently deployed order.

        Known keys come first in a fixed order. Each ``jwe_mappings`` entry is rebuilt
        with only ``source_field_path`` and ``decrypted_field_path`` (other keys in a
        mapping are dropped). Any other top-level keys follow, in their original
        insertion order.
        """
        desired_order_list = [
            "bq_dataset_family",
            "bq_table",
            "bq_metadata_format",
            "include_info_sections",
            "submission_timestamp_granularity",
            "expiration_policy",
            "override_attributes",
            "jwe_mappings",
        ]
        reordered_metadata = {
            k: metadata[k] for k in desired_order_list if k in metadata
        }

        # re-order jwe-mappings
        jwe_desired_order = ["source_field_path", "decrypted_field_path"]
        jwe_mapping_metadata = reordered_metadata.get("jwe_mappings")
        if jwe_mapping_metadata:
            reordered_metadata["jwe_mappings"] = [
                {k: mapping[k] for k in jwe_desired_order if k in mapping}
                for mapping in jwe_mapping_metadata
            ]

        # future proofing, in case there are other fields added at the ping top level
        # add them to the end. Iterate the dict (not a set difference) so the order is
        # deterministic instead of depending on per-process string hash randomization.
        leftovers = {
            k: v for k, v in metadata.items() if k not in reordered_metadata
        }
        return {**reordered_metadata, **leftovers}

    def get_pings_and_pipeline_metadata(self) -> Dict[str, Dict]:
        """Return a mapping of ping name to its (re-ordered) pipeline metadata.

        NOTE(review): pings without ``moz_pipeline_metadata`` keep their raw ping data as
        the value; confirm callers never receive such pings.
        """
        pings = self._get_ping_data_and_dependencies_with_default_metadata()
        for ping_name, ping_data in pings.items():
            metadata = ping_data.get("moz_pipeline_metadata")
            if not metadata:
                continue
            metadata["include_info_sections"] = self._is_field_included(
                ping_data, "include_info_sections", consider_all_history=False
            )
            metadata["include_client_id"] = self._is_field_included(
                ping_data, "include_client_id"
            )

            # While technically unnecessary, the dictionary elements are re-ordered to match the
            # currently deployed order and used to verify no difference in output.
            pings[ping_name] = GleanPing.reorder_metadata(metadata)
        return pings

    def get_ping_descriptions(self) -> Dict[str, str]:
        """Return a mapping of ping name to the description in its latest history entry."""
        return {
            k: v["history"][-1]["description"] for k, v in self._get_ping_data().items()
        }

    @staticmethod
    def _is_field_included(ping_data, field_name, consider_all_history=True) -> bool:
        """Return False only if the field is present and falsy in every considered entry.

        If `consider_all_history` is False, only the latest entry of the ping history is
        checked. Otherwise, True is returned when the field is missing or truthy in one or
        more history entries. A ping without history defaults to True.
        """
        history = ping_data.get("history")
        if not history:
            return True

        # Check if at some point in the past the field has already been deployed,
        # if the caller wants to consider the field's history. Keep them in the schema,
        # even if the field has changed, as removing fields is currently not supported.
        # See https://bugzilla.mozilla.org/show_bug.cgi?id=1898105
        # and https://bugzilla.mozilla.org/show_bug.cgi?id=1898105#c10
        ping_history: list = history if consider_all_history else history[-1:]
        return any(
            field_name not in entry or bool(entry[field_name]) for entry in ping_history
        )

    def set_schema_url(self, metadata):
        """
        Switch between the glean-min and glean schemas if the ping does not require
        info sections as specified in the parsed ping info in probe scraper.
        """
        schema_type = "glean" if metadata["include_info_sections"] else "glean-min"
        self.schema_url = SCHEMA_URL_TEMPLATE.format(
            branch=self.branch_name
        ) + SCHEMA_VERSION_TEMPLATE.format(schema_type=schema_type, version=self.version)

    def generate_schema(
        self,
        config,
        generic_schema=False,
        blocked_distribution_pings=("events", "baseline"),
    ) -> Dict[str, Schema]:
        """Generate one schema per ping of this application.

        :param config: base Config whose matchers are cloned for every ping
        :param generic_schema: when true, use the generic Glean ping schema with only
            the pipeline metadata attached instead of injecting metrics
        :param blocked_distribution_pings: pings into which ``*_distribution`` metrics
            are not injected (DENG-10606); a falsy value disables the block
        :return: mapping of schema name to Schema
        """
        pings = self.get_pings_and_pipeline_metadata()
        schemas = {}

        for ping, pipeline_meta in pings.items():
            matchers = {
                loc: m.clone(new_table_group=ping) for loc, m in config.matchers.items()
            }

            # Four newly introduced metric types were incorrectly deployed
            # as repeated key/value structs in all Glean ping tables existing prior
            # to November 2021. We maintain the incorrect fields for existing tables
            # by disabling the associated matchers.
            # Note that each of these types now has a "2" matcher ("text2", "url2", etc.)
            # defined that will allow metrics of these types to be injected into proper
            # structs. The gcp-ingestion repository includes logic to rewrite these
            # metrics under the "2" names.
            # See https://bugzilla.mozilla.org/show_bug.cgi?id=1737656
            bq_identifier = "{bq_dataset_family}.{bq_table}".format(**pipeline_meta)
            if bq_identifier in self.bug_1737656_affected_tables:
                matchers = {
                    loc: m
                    for loc, m in matchers.items()
                    if not m.matcher.get("bug_1737656_affected")
                }

            for matcher in matchers.values():
                matcher.matcher["send_in_pings"]["contains"] = ping
                # temporarily block distributions from being added to events and baseline pings
                # https://mozilla-hub.atlassian.net/browse/DENG-10606
                if (
                    blocked_distribution_pings
                    and ping in blocked_distribution_pings
                    and matcher.type.endswith("_distribution")
                ):
                    matcher.matcher["send_in_pings"]["not_contains"] = ping

            new_config = Config(ping, matchers=matchers)
            defaults = {"mozPipelineMetadata": pipeline_meta}

            # Adjust the schema path if the ping does not require info sections
            self.set_schema_url(pipeline_meta)
            if generic_schema:  # Use the generic glean ping schema
                schema = self.get_schema(generic_schema=True)
                schema.schema.update(defaults)
                schemas[new_config.name] = schema
            else:
                generated = super().generate_schema(new_config)
                for schema in generated.values():
                    # We want to override each individual key with assembled defaults,
                    # but keep values _inside_ them if they have been set in the schemas.
                    for key, value in defaults.items():
                        schema.schema.setdefault(key, {}).update(value)
                schemas.update(generated)

        return schemas

    @staticmethod
    def get_repos():
        """
        Retrieve metadata for all non-library Glean repositories
        """
        repos = GleanPing._get_json(GleanPing.repos_url)
        return [repo for repo in repos if "library_names" not in repo]

    def get_app_name(self) -> str:
        """Get app name associated with the app id, falling back to the app id itself.

        e.g. org-mozilla-firefox -> fenix
        """
        apps = GleanPing._get_json(GleanPing.app_listings_url)
        # app id in app-listings has "." instead of "-" so using document_namespace
        return next(
            (app["app_name"] for app in apps if app["document_namespace"] == self.app_id),
            self.app_id,
        )

    @staticmethod
    def get_metric_blocklist():
        """Load the metric blocklist YAML; an empty file yields an empty dict."""
        with open(METRIC_BLOCKLIST, "r") as f:
            # safe_load returns None for an empty document, which callers cannot .get() on.
            return yaml.safe_load(f) or {}