|
| 1 | +#!/usr/bin/env python3 |
| 2 | + |
| 3 | +import os |
| 4 | +import sys |
| 5 | +import jira.client |
| 6 | +import requests |
| 7 | +import urllib3 |
| 8 | +import json |
| 9 | +import jira |
| 10 | +import yaml |
| 11 | + |
| 12 | +SERVER_URL = 'https://issues.redhat.com/' |
| 13 | +JIRA_API_TOKEN = os.environ.get('JIRA_API_TOKEN') |
| 14 | + |
| 15 | + |
| 16 | +def usage(): |
| 17 | + print("""\ |
| 18 | + usage: advisory_publication_report.py OCP_VERSION |
| 19 | +
|
| 20 | + arguments: |
| 21 | + OCP_VERSION: The OCP versions to analyse if MicroShift version should be published. Format: "4.X.Z"\ |
| 22 | + """) |
| 23 | + |
| 24 | + |
| 25 | +def get_advisories(ocp_version: str) -> dict[str, int]: |
| 26 | + """ |
| 27 | + Get a list of advisory ids for a OCP version from github.com/openshift-eng/ocp-build-data repository |
| 28 | + Parameters: |
| 29 | + ocp_version (str): OCP version with format: "X.Y.Z" |
| 30 | + Returns: |
| 31 | + (dict): advisory dict with type and id |
| 32 | + """ |
| 33 | + urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) |
| 34 | + |
| 35 | + try: |
| 36 | + microshift_xy_version = '.'.join(ocp_version.split('.')[:2]) |
| 37 | + request = requests.get(f'https://raw.githubusercontent.com/openshift-eng/ocp-build-data/refs/heads/openshift-{microshift_xy_version}/releases.yml', verify=False) |
| 38 | + request.raise_for_status() |
| 39 | + except requests.exceptions.HTTPError as err: |
| 40 | + raise SystemExit(err) |
| 41 | + releases_dict = yaml.load(str(request.text), Loader=yaml.SafeLoader) |
| 42 | + |
| 43 | + if ocp_version in releases_dict['releases']: |
| 44 | + return releases_dict['releases'][ocp_version]['assembly']['group']['advisories'] |
| 45 | + else: |
| 46 | + raise KeyError(f"{ocp_version} OCP version does NOT exist") |
| 47 | + |
| 48 | + |
| 49 | +def get_advisory_info(advisory_id: int) -> dict[str, str]: |
| 50 | + """ |
| 51 | + Get a list of strings with the CVEs ids for an advisory |
| 52 | + Parameters: |
| 53 | + advisory_id (int): advisory id |
| 54 | + Returns: |
| 55 | + (list): list of strings with CVE ids |
| 56 | + """ |
| 57 | + urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) |
| 58 | + |
| 59 | + try: |
| 60 | + request = requests.get(f'https://errata.devel.redhat.com/cve/show/{advisory_id}.json', verify=False) |
| 61 | + request.raise_for_status() |
| 62 | + except requests.exceptions.HTTPError as err: |
| 63 | + raise SystemExit(err) |
| 64 | + advisory_info = json.loads(request.text) |
| 65 | + |
| 66 | + if advisory_info is None: |
| 67 | + raise ValueError |
| 68 | + if not isinstance(advisory_info, dict): |
| 69 | + raise TypeError |
| 70 | + return advisory_info |
| 71 | + |
| 72 | + |
| 73 | +def search_microshift_tickets(affects_version: str, cve_id: str) -> jira.client.ResultList: |
| 74 | + """ |
| 75 | + Query Jira for MicroShift ticket with CVE id and MicroShift version |
| 76 | + Parameters: |
| 77 | + affects_version (str): MicroShift affected version with format: "X.Y" |
| 78 | + cve_id (str): the CVE id with format: "CVE-YYYY-NNNNN" |
| 79 | + Returns: |
| 80 | + (jira.client.ResultList): a list with all the Jira tickets matching the query |
| 81 | + """ |
| 82 | + server = jira.JIRA(server=SERVER_URL, token_auth=JIRA_API_TOKEN) |
| 83 | + jira_tickets = server.search_issues(f''' |
| 84 | + summary ~ "{cve_id}" and component = MicroShift and (affectedVersion = {affects_version} or affectedVersion = {affects_version}.z) |
| 85 | + ''') |
| 86 | + |
| 87 | + if not isinstance(jira_tickets, jira.client.ResultList): |
| 88 | + raise TypeError |
| 89 | + return jira_tickets |
| 90 | + |
| 91 | + |
| 92 | +def get_report(ocp_version: str) -> dict[str, dict]: |
| 93 | + """ |
| 94 | + Get a json object with all the advisories, CVEs and jira tickets linked |
| 95 | + Parameters: |
| 96 | + ocp_version (str): OCP version with format: "X.Y.Z" |
| 97 | + Returns: |
| 98 | + (dict): json object with all the advisories, CVEs and jira tickets linked |
| 99 | + """ |
| 100 | + result_json = dict() |
| 101 | + advisories = get_advisories(ocp_version) |
| 102 | + for advisory_type, advisory_id in advisories.items(): |
| 103 | + advisory_info = get_advisory_info(advisory_id) |
| 104 | + cve_list = advisory_info['cve'] |
| 105 | + advisory_dict = dict() |
| 106 | + advisory_dict['type'] = advisory_type |
| 107 | + advisory_dict['url'] = f'https://errata.devel.redhat.com/advisory/{advisory_id}' |
| 108 | + advisory_dict['cves'] = dict() |
| 109 | + for cve in cve_list: |
| 110 | + jira_tickets = search_microshift_tickets(".".join(ocp_version.split(".")[:2]), cve) |
| 111 | + advisory_dict['cves'][cve] = dict() |
| 112 | + for ticket in jira_tickets: |
| 113 | + jira_ticket_dict = dict() |
| 114 | + jira_ticket_dict['id'] = ticket.key |
| 115 | + jira_ticket_dict['summary'] = ticket.fields.summary |
| 116 | + jira_ticket_dict['status'] = ticket.fields.status.name |
| 117 | + jira_ticket_dict['resolution'] = ticket.fields.resolution.name |
| 118 | + advisory_dict['cves'][cve]['jira_ticket'] = jira_ticket_dict |
| 119 | + result_json[advisory_info['advisory']] = advisory_dict |
| 120 | + return result_json |
| 121 | + |
| 122 | + |
| 123 | +def main(): |
| 124 | + if len(sys.argv) != 2: |
| 125 | + usage() |
| 126 | + raise ValueError('Invalid number of arguments') |
| 127 | + |
| 128 | + if JIRA_API_TOKEN is None: |
| 129 | + raise ValueError('JIRA_API_TOKEN var not found in the env') |
| 130 | + |
| 131 | + ocp_version = str(sys.argv[1]) |
| 132 | + result_json = get_report(ocp_version) |
| 133 | + print(f"{json.dumps(result_json, indent=4)}") |
| 134 | + |
| 135 | + |
| 136 | +if __name__ == '__main__': |
| 137 | + main() |
0 commit comments