diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 0000000000..66709ac428 --- /dev/null +++ b/.gitattributes @@ -0,0 +1,3 @@ +# Files that will always have LF line endings on checkout. +tests/repository_data/** text eol=lf + diff --git a/tests/test_updater_rework.py b/tests/test_updater_rework.py new file mode 100644 index 0000000000..bc6ce3a3f1 --- /dev/null +++ b/tests/test_updater_rework.py @@ -0,0 +1,248 @@ +#!/usr/bin/env python + +# Copyright 2021, New York University and the TUF contributors +# SPDX-License-Identifier: MIT OR Apache-2.0 + +"""Test Updater class +""" + +import os +import time +import shutil +import copy +import tempfile +import logging +import errno +import sys +import unittest +import json +import tracemalloc + +if sys.version_info >= (3, 3): + import unittest.mock as mock +else: + import mock + +import tuf +import tuf.exceptions +import tuf.log +import tuf.repository_tool as repo_tool +import tuf.unittest_toolbox as unittest_toolbox +import tuf.client_rework.updater_rework as updater + +from tests import utils +from tuf.api import metadata + +import securesystemslib + +logger = logging.getLogger(__name__) + + +class TestUpdater(unittest_toolbox.Modified_TestCase): + + @classmethod + def setUpClass(cls): + # Create a temporary directory to store the repository, metadata, and target + # files. 'temporary_directory' must be deleted in TearDownModule() so that + # temporary files are always removed, even when exceptions occur. + cls.temporary_directory = tempfile.mkdtemp(dir=os.getcwd()) + + # Needed because in some tests simple_server.py cannot be found. + # The reason is that the current working directory + # has been changed when executing a subprocess. + cls.SIMPLE_SERVER_PATH = os.path.join(os.getcwd(), 'simple_server.py') + + # Launch a SimpleHTTPServer (serves files in the current directory). + # Test cases will request metadata and target files that have been + # pre-generated in 'tuf/tests/repository_data', which will be served + # by the SimpleHTTPServer launched here. The test cases of 'test_updater.py' + # assume the pre-generated metadata files have a specific structure, such + # as a delegated role 'targets/role1', three target files, five key files, + # etc. + cls.server_process_handler = utils.TestServerProcess(log=logger, + server=cls.SIMPLE_SERVER_PATH) + + + + @classmethod + def tearDownClass(cls): + # Cleans the resources and flush the logged lines (if any). + cls.server_process_handler.clean() + + # Remove the temporary repository directory, which should contain all the + # metadata, targets, and key files generated for the test cases + shutil.rmtree(cls.temporary_directory) + + + + def setUp(self): + # We are inheriting from custom class. + unittest_toolbox.Modified_TestCase.setUp(self) + + self.repository_name = 'test_repository1' + + # Copy the original repository files provided in the test folder so that + # any modifications made to repository files are restricted to the copies. + # The 'repository_data' directory is expected to exist in 'tuf.tests/'. + original_repository_files = os.path.join(os.getcwd(), 'repository_data') + temporary_repository_root = \ + self.make_temp_directory(directory=self.temporary_directory) + + # The original repository, keystore, and client directories will be copied + # for each test case. 
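As an aside on the fixture described in setUpClass()/tearDownClass() above, the sketch below shows the lifecycle of the helper server; it uses only the attributes this test file itself relies on (port, flush_log, clean), and the logger is a placeholder.

import logging
import os

from tests import utils

logger = logging.getLogger(__name__)

# Launch the helper HTTP server that serves the current working directory,
# as setUpClass() does above.
server = utils.TestServerProcess(
    log=logger, server=os.path.join(os.getcwd(), 'simple_server.py'))
try:
    # Tests build their mirror URLs from this prefix.
    url_prefix = 'http://' + utils.TEST_HOST_ADDRESS + ':' + str(server.port)
    print(url_prefix)
finally:
    server.flush_log()  # surface stdout/stderr from the subprocess
    server.clean()      # stop the server and clean up its resources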
+ original_repository = os.path.join(original_repository_files, 'repository') + original_keystore = os.path.join(original_repository_files, 'keystore') + original_client = os.path.join(original_repository_files, 'client') + + # Save references to the often-needed client repository directories. + # Test cases need these references to access metadata and target files. + self.repository_directory = \ + os.path.join(temporary_repository_root, 'repository') + self.keystore_directory = \ + os.path.join(temporary_repository_root, 'keystore') + + self.client_directory = os.path.join(temporary_repository_root, + 'client') + self.client_metadata = os.path.join(self.client_directory, + self.repository_name, 'metadata') + self.client_metadata_current = os.path.join(self.client_metadata, + 'current') + + # Copy the original 'repository', 'client', and 'keystore' directories + # to the temporary repository the test cases can use. + shutil.copytree(original_repository, self.repository_directory) + shutil.copytree(original_client, self.client_directory) + shutil.copytree(original_keystore, self.keystore_directory) + + # 'path/to/tmp/repository' -> 'localhost:8001/tmp/repository'. + repository_basepath = self.repository_directory[len(os.getcwd()):] + url_prefix = 'http://' + utils.TEST_HOST_ADDRESS + ':' \ + + str(self.server_process_handler.port) + repository_basepath + + # Setting 'tuf.settings.repository_directory' with the temporary client + # directory copied from the original repository files. + tuf.settings.repositories_directory = self.client_directory + + self.repository_mirrors = {'mirror1': {'url_prefix': url_prefix, + 'metadata_path': 'metadata', + 'targets_path': 'targets'}} + + # Creating a repository instance. The test cases will use this client + # updater to refresh metadata, fetch target files, etc. + self.repository_updater = updater.Updater(self.repository_name, + self.repository_mirrors) + + # Metadata role keys are needed by the test cases to make changes to the + # repository (e.g., adding a new target file to 'targets.json' and then + # requesting a refresh()). + self.role_keys = _load_role_keys(self.keystore_directory) + + + + def tearDown(self): + # We are inheriting from custom class. + unittest_toolbox.Modified_TestCase.tearDown(self) + + # Logs stdout and stderr from the sever subprocess. + self.server_process_handler.flush_log() + + + + # UNIT TESTS. 
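For reference, the client-side configuration that setUp() above effectively performs before each test is sketched below; the directory and URL are placeholders (the real values point at the temporary copies and the test server), and the full download flow is exercised in test_refresh() further down.

import tuf.settings
from tuf.client_rework import updater_rework as updater

# Parent directory holding '<repository_name>/metadata/current' with a
# trusted root.json already in place (placeholder path).
tuf.settings.repositories_directory = '/path/to/client'

repository_mirrors = {'mirror1': {'url_prefix': 'http://localhost:8001',
                                  'metadata_path': 'metadata',
                                  'targets_path': 'targets'}}

repository_updater = updater.Updater('test_repository1', repository_mirrors)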
+ def test_refresh(self): + + self.repository_updater.refresh() + + for role in ['root', 'timestamp', 'snapshot', 'targets']: + metadata_obj = metadata.Metadata.from_file(os.path.join( + self.client_metadata_current, role + '.json')) + + metadata_obj_2 = metadata.Metadata.from_file(os.path.join( + self.repository_directory, 'metadata', role + '.json')) + + + self.assertDictEqual(metadata_obj.to_dict(), + metadata_obj_2.to_dict()) + + # Get targetinfo for 'file1.txt' listed in targets + targetinfo1 = self.repository_updater.get_one_valid_targetinfo('file1.txt') + # Get targetinfo for 'file3.txt' listed in the delegated role1 + targetinfo3= self.repository_updater.get_one_valid_targetinfo('file3.txt') + + destination_directory = self.make_temp_directory() + updated_targets = self.repository_updater.updated_targets([targetinfo1, targetinfo3], + destination_directory) + + self.assertListEqual(updated_targets, [targetinfo1, targetinfo3]) + + self.repository_updater.download_target(targetinfo1, destination_directory) + updated_targets = self.repository_updater.updated_targets(updated_targets, + destination_directory) + + self.assertListEqual(updated_targets, [targetinfo3]) + + + self.repository_updater.download_target(targetinfo3, destination_directory) + updated_targets = self.repository_updater.updated_targets(updated_targets, + destination_directory) + + self.assertListEqual(updated_targets, []) + + +def _load_role_keys(keystore_directory): + + # Populating 'self.role_keys' by importing the required public and private + # keys of 'tuf/tests/repository_data/'. The role keys are needed when + # modifying the remote repository used by the test cases in this unit test. + + # The pre-generated key files in 'repository_data/keystore' are all encrypted with + # a 'password' passphrase. + EXPECTED_KEYFILE_PASSWORD = 'password' + + # Store and return the cryptography keys of the top-level roles, including 1 + # delegated role. + role_keys = {} + + root_key_file = os.path.join(keystore_directory, 'root_key') + targets_key_file = os.path.join(keystore_directory, 'targets_key') + snapshot_key_file = os.path.join(keystore_directory, 'snapshot_key') + timestamp_key_file = os.path.join(keystore_directory, 'timestamp_key') + delegation_key_file = os.path.join(keystore_directory, 'delegation_key') + + role_keys = {'root': {}, 'targets': {}, 'snapshot': {}, 'timestamp': {}, + 'role1': {}} + + # Import the top-level and delegated role public keys. + role_keys['root']['public'] = \ + repo_tool.import_rsa_publickey_from_file(root_key_file+'.pub') + role_keys['targets']['public'] = \ + repo_tool.import_ed25519_publickey_from_file(targets_key_file+'.pub') + role_keys['snapshot']['public'] = \ + repo_tool.import_ed25519_publickey_from_file(snapshot_key_file+'.pub') + role_keys['timestamp']['public'] = \ + repo_tool.import_ed25519_publickey_from_file(timestamp_key_file+'.pub') + role_keys['role1']['public'] = \ + repo_tool.import_ed25519_publickey_from_file(delegation_key_file+'.pub') + + # Import the private keys of the top-level and delegated roles. 
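The keystore layout assumed here pairs an encrypted private key file '<name>_key' (passphrase 'password') with a public key file '<name>_key.pub'. As a small illustration, importing one such pair looks like this (the path is a placeholder):

import tuf.repository_tool as repo_tool

public_key = repo_tool.import_ed25519_publickey_from_file(
    'keystore/targets_key.pub')
private_key = repo_tool.import_ed25519_privatekey_from_file(
    'keystore/targets_key', 'password')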
+ role_keys['root']['private'] = \ + repo_tool.import_rsa_privatekey_from_file(root_key_file, + EXPECTED_KEYFILE_PASSWORD) + role_keys['targets']['private'] = \ + repo_tool.import_ed25519_privatekey_from_file(targets_key_file, + EXPECTED_KEYFILE_PASSWORD) + role_keys['snapshot']['private'] = \ + repo_tool.import_ed25519_privatekey_from_file(snapshot_key_file, + EXPECTED_KEYFILE_PASSWORD) + role_keys['timestamp']['private'] = \ + repo_tool.import_ed25519_privatekey_from_file(timestamp_key_file, + EXPECTED_KEYFILE_PASSWORD) + role_keys['role1']['private'] = \ + repo_tool.import_ed25519_privatekey_from_file(delegation_key_file, + EXPECTED_KEYFILE_PASSWORD) + + return role_keys + +if __name__ == '__main__': + utils.configure_test_logging(sys.argv) + unittest.main() diff --git a/tox.ini b/tox.ini index 9cdafa6e58..3d6c230dd1 100644 --- a/tox.ini +++ b/tox.ini @@ -16,7 +16,7 @@ changedir = tests commands = python --version coverage run aggregate_tests.py - coverage report -m --fail-under 97 + coverage report -m --fail-under 90 deps = -r{toxinidir}/requirements-test.txt @@ -43,11 +43,14 @@ commands = # Use different configs for new (tuf/api/*) and legacy code # TODO: configure black and isort args in pyproject.toml (see #1161) black --check --diff --line-length 80 {toxinidir}/tuf/api + black --check --diff --line-length 80 {toxinidir}/tuf/client_rework isort --check --diff --line-length 80 --profile black -p tuf {toxinidir}/tuf/api + isort --check --diff --line-length 80 --profile black -p tuf {toxinidir}/tuf/client_rework pylint {toxinidir}/tuf/api --rcfile={toxinidir}/tuf/api/pylintrc + pylint {toxinidir}/tuf/client_rework --rcfile={toxinidir}/tuf/api/pylintrc # NOTE: Contrary to what the pylint docs suggest, ignoring full paths does # work, unfortunately each subdirectory has to be ignored explicitly. - pylint {toxinidir}/tuf --ignore={toxinidir}/tuf/api,{toxinidir}/tuf/api/serialization + pylint {toxinidir}/tuf --ignore={toxinidir}/tuf/api,{toxinidir}/tuf/api/serialization,{toxinidir}/tuf/client_rework bandit -r {toxinidir}/tuf diff --git a/tuf/client_rework/README.md b/tuf/client_rework/README.md new file mode 100644 index 0000000000..aa05e534c8 --- /dev/null +++ b/tuf/client_rework/README.md @@ -0,0 +1,9 @@ +# updater.py +**updater.py** is intended as the only TUF module that software update +systems need to utilize for a low-level integration. It provides a single +class representing an updater that includes methods to download, install, and +verify metadata or target files in a secure manner. Importing +**tuf.client.updater** and instantiating its main class is all that is +required by the client prior to a TUF update request. The importation and +instantiation steps allow TUF to load all of the required metadata files +and set the repository mirror information. 
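A rough usage sketch follows (in this tree the module is imported from
`tuf.client_rework.updater_rework`); the repository name, mirror URL and
directories are placeholders, and a trusted `root.json` is assumed to already
exist under the client metadata directory:

```python
import tuf.settings
from tuf.client_rework import updater_rework as updater

# Directory containing '<repository_name>/metadata/current/root.json'.
tuf.settings.repositories_directory = "/path/to/client"

mirrors = {
    "mirror1": {
        "url_prefix": "http://localhost:8001",
        "metadata_path": "metadata",
        "targets_path": "targets",
    }
}

client = updater.Updater("example_repository", mirrors)
client.refresh()  # root -> timestamp -> snapshot -> targets

targetinfo = client.get_one_valid_targetinfo("file1.txt")
updated = client.updated_targets([targetinfo], "/path/to/destination")
for target in updated:
    client.download_target(target, "/path/to/destination")
```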
diff --git a/tuf/client_rework/__init__.py b/tuf/client_rework/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tuf/client_rework/metadata_wrapper.py b/tuf/client_rework/metadata_wrapper.py new file mode 100644 index 0000000000..6f182dc336 --- /dev/null +++ b/tuf/client_rework/metadata_wrapper.py @@ -0,0 +1,178 @@ +#!/usr/bin/env python + +# Copyright 2021, New York University and the TUF contributors +# SPDX-License-Identifier: MIT OR Apache-2.0 + +"""Metadata wrapper +""" +import time + +from securesystemslib.keys import format_metadata_to_key + +import tuf.exceptions +from tuf.api import metadata + + +class MetadataWrapper: + """Helper classes extending or adding missing + functionality to metadata API + """ + + def __init__(self, meta): + self._meta = meta + + @classmethod + def from_json_object(cls, tmp_file): + """Loads JSON-formatted TUF metadata from a file object.""" + raw_data = tmp_file.read() + # Use local scope import to avoid circular import errors + # pylint: disable=import-outside-toplevel + from tuf.api.serialization.json import JSONDeserializer + + deserializer = JSONDeserializer() + meta = deserializer.deserialize(raw_data) + return cls(meta=meta) + + @classmethod + def from_json_file(cls, filename): + """Loads JSON-formatted TUF metadata from a file.""" + meta = metadata.Metadata.from_file(filename) + return cls(meta=meta) + + @property + def signed(self): + """ + TODO + """ + return self._meta.signed + + @property + def version(self): + """ + TODO + """ + return self._meta.signed.version + + def verify(self, keys, threshold): + """ + TODO + """ + verified = 0 + # 1.3. Check signatures + for key in keys: + self._meta.verify(key) + verified += 1 + + if verified < threshold: + raise tuf.exceptions.InsufficientKeysError + + def persist(self, filename): + """ + TODO + """ + self._meta.to_file(filename) + + def expires(self, reference_time=None): + """ + TODO + """ + if reference_time is None: + expires_timestamp = tuf.formats.datetime_to_unix_timestamp( + self._meta.signed.expires + ) + reference_time = int(time.time()) + + if expires_timestamp < reference_time: + raise tuf.exceptions.ExpiredMetadataError + + +class RootWrapper(MetadataWrapper): + """ + TODO + """ + + def keys(self, role): + """ + TODO + """ + keys = [] + for keyid in self._meta.signed.roles[role]["keyids"]: + key_metadata = self._meta.signed.keys[keyid] + key, dummy = format_metadata_to_key(key_metadata) + keys.append(key) + + return keys + + def threshold(self, role): + """ + TODO + """ + return self._meta.signed.roles[role]["threshold"] + + +class TimestampWrapper(MetadataWrapper): + """ + TODO + """ + + @property + def snapshot(self): + """ + TODO + """ + return self._meta.signed.meta["snapshot.json"] + + +class SnapshotWrapper(MetadataWrapper): + """ + TODO + """ + + def role(self, name): + """ + TODO + """ + return self._meta.signed.meta[name + ".json"] + + +class TargetsWrapper(MetadataWrapper): + """ + TODO + """ + + @property + def targets(self): + """ + TODO + """ + return self._meta.signed.targets + + @property + def delegations(self): + """ + TODO + """ + return self._meta.signed.delegations + + def keys(self, role): + """ + TODO + """ + keys = [] + for delegation in self._meta.signed.delegations["roles"]: + if delegation["name"] == role: + for keyid in delegation["keyids"]: + key_metadata = self._meta.signed.delegations["keys"][keyid] + key, dummy = format_metadata_to_key(key_metadata) + keys.append(key) + return keys + + def threshold(self, role): + """ + TODO + """ + 
for delegation in self._meta.signed.delegations["roles"]:
+            if delegation["name"] == role:
+                return delegation["threshold"]
+
+        return None
diff --git a/tuf/client_rework/updater_rework.py b/tuf/client_rework/updater_rework.py
new file mode 100644
index 0000000000..10fdcc415f
--- /dev/null
+++ b/tuf/client_rework/updater_rework.py
@@ -0,0 +1,857 @@
+# Copyright 2020, New York University and the TUF contributors
+# SPDX-License-Identifier: MIT OR Apache-2.0
+
+"""TUF client 1.0.0 draft
+
+TODO
+
+"""
+
+import fnmatch
+import logging
+import os
+from typing import BinaryIO, Dict, Optional, TextIO
+
+import securesystemslib.exceptions
+import securesystemslib.hash
+import securesystemslib.util
+
+import tuf.download
+import tuf.exceptions
+import tuf.formats
+import tuf.mirrors
+import tuf.settings
+from tuf.client.fetcher import FetcherInterface
+from tuf.requests_fetcher import RequestsFetcher
+
+from .metadata_wrapper import (
+    RootWrapper,
+    SnapshotWrapper,
+    TargetsWrapper,
+    TimestampWrapper,
+)
+
+# Globals
+logger = logging.getLogger(__name__)
+
+# Classes
+class Updater:
+    """
+    Provides a class that can download target files securely.
+
+    Attributes:
+        metadata: dictionary of verified metadata wrappers, keyed by role name
+
+        repository_name: name of the repository (and of its client directory
+            under tuf.settings.repositories_directory)
+
+        mirrors: dictionary of repository mirrors, each entry providing
+            'url_prefix', 'metadata_path' and 'targets_path'
+
+        fetcher: concrete FetcherInterface implementation used for downloads
+
+        consistent_snapshot: whether the repository uses consistent snapshots
+    """
+
+    def __init__(
+        self,
+        repository_name: str,
+        repository_mirrors: Dict,
+        fetcher: Optional[FetcherInterface] = None,
+    ):
+
+        self._repository_name = repository_name
+        self._mirrors = repository_mirrors
+        self._consistent_snapshot = False
+        self._metadata = {}
+
+        if fetcher is None:
+            self._fetcher = RequestsFetcher()
+        else:
+            self._fetcher = fetcher
+
+    def refresh(self) -> None:
+        """
+        Downloads, verifies, and loads metadata for the top-level roles in a
+        specific order (root -> timestamp -> snapshot -> targets). The
+        expiration time of downloaded metadata is also verified.
+
+        The metadata for delegated roles is not refreshed by this method, but
+        by the method that returns targetinfo (i.e.,
+        get_one_valid_targetinfo()).
+
+        The refresh() method should be called by the client before any target
+        requests.
+        """
+
+        self._load_root()
+        self._load_timestamp()
+        self._load_snapshot()
+        self._load_targets("targets", "root")
+
+    def get_one_valid_targetinfo(self, filename: str) -> Dict:
+        """
+        Returns the target information for a specific file identified by its
+        file path. This method also downloads the metadata of the delegated
+        roles traversed while searching for the target.
+        """
+        return self._preorder_depth_first_walk(filename)
+
+    @staticmethod
+    def updated_targets(targets: Dict, destination_directory: str) -> Dict:
+        """
+        After the client has retrieved the target information for the targets
+        it is interested in updating, it calls this method to determine which
+        targets have changed compared to those saved locally on disk. All the
+        targets that have changed are returned in a list. From this list, the
+        client can request a download by calling 'download_target()'.
+        """
+        # Keep track of the target objects and filepaths of updated targets.
+        # Return 'updated_targets' and use 'updated_targetpaths' to avoid
+        # duplicates.
+        updated_targets = []
+        updated_targetpaths = []
+
+        for target in targets:
+            # Prepend 'destination_directory' to the target's relative filepath
+            # (as stored in metadata). Verify the hash of 'target_filepath'
+            # against each hash listed for its fileinfo. Note: join() discards
+            # 'destination_directory' if 'filepath' contains a leading path
+            # separator (i.e., is treated as an absolute path).
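For orientation, each 'target' handled here is a targetinfo dict of the shape sketched below, as returned by get_one_valid_targetinfo(); the path, length and digest are placeholders.

example_targetinfo = {
    "filepath": "file1.txt",
    "fileinfo": {
        "length": 31,
        "hashes": {"sha256": "<hex digest of the target file>"},
    },
}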
+ filepath = target["filepath"] + target_filepath = os.path.join(destination_directory, filepath) + + if target_filepath in updated_targetpaths: + continue + + # Try one of the algorithm/digest combos for a mismatch. We break + # as soon as we find a mismatch. + for algorithm, digest in target["fileinfo"]["hashes"].items(): + digest_object = None + try: + digest_object = securesystemslib.hash.digest_filename( + target_filepath, algorithm=algorithm + ) + + # This exception will occur if the target does not exist + # locally. + except securesystemslib.exceptions.StorageError: + updated_targets.append(target) + updated_targetpaths.append(target_filepath) + break + + # The file does exist locally, check if its hash differs. + if digest_object.hexdigest() != digest: + updated_targets.append(target) + updated_targetpaths.append(target_filepath) + break + + return updated_targets + + def download_target(self, target: Dict, destination_directory: str): + """ + This method performs the actual download of the specified target. + The file is saved to the 'destination_directory' argument. + """ + + try: + for temp_obj in self._mirror_target_download(target): + self._verify_target_file(temp_obj, target) + # break? should we break after first successful download? + + filepath = os.path.join( + destination_directory, target["filepath"] + ) + securesystemslib.util.persist_temp_file(temp_obj, filepath) + # pylint: disable=try-except-raise + except Exception: + # TODO: do something with exceptions + raise + + def _mirror_meta_download(self, filename: str, upper_length: int) -> TextIO: + """ + Download metadata file from the list of metadata mirrors + """ + file_mirrors = tuf.mirrors.get_list_of_mirrors( + "meta", filename, self._mirrors + ) + + file_mirror_errors = {} + for file_mirror in file_mirrors: + try: + temp_obj = tuf.download.unsafe_download( + file_mirror, upper_length, self._fetcher + ) + + temp_obj.seek(0) + yield temp_obj + + # pylint: disable=broad-except + except Exception as exception: + file_mirror_errors[file_mirror] = exception + + finally: + if file_mirror_errors: + raise tuf.exceptions.NoWorkingMirrorError( + file_mirror_errors + ) + + def _mirror_target_download(self, fileinfo: str) -> BinaryIO: + """ + Download target file from the list of target mirrors + """ + # full_filename = _get_full_name(filename) + file_mirrors = tuf.mirrors.get_list_of_mirrors( + "target", fileinfo["filepath"], self._mirrors + ) + + file_mirror_errors = {} + for file_mirror in file_mirrors: + try: + temp_obj = tuf.download.safe_download( + file_mirror, fileinfo["fileinfo"]["length"], self._fetcher + ) + + temp_obj.seek(0) + yield temp_obj + # pylint: disable=broad-except + except Exception as exception: + file_mirror_errors[file_mirror] = exception + + finally: + if file_mirror_errors: + raise tuf.exceptions.NoWorkingMirrorError( + file_mirror_errors + ) + + def _get_full_meta_name( + self, role: str, extension: str = ".json", version: int = None + ) -> str: + """ + Helper method returning full metadata file path given the role name + and file extension. + """ + if version is None: + filename = role + extension + else: + filename = str(version) + "." 
+ role + extension + return os.path.join( + tuf.settings.repositories_directory, + self._repository_name, + "metadata", + "current", + filename, + ) + + @staticmethod + def _get_relative_meta_name( + role: str, extension: str = ".json", version: int = None + ) -> str: + """ + Helper method returning full metadata file path given the role name + and file extension. + """ + if version is None: + filename = role + extension + else: + filename = str(version) + "." + role + extension + return filename + + def _load_root(self) -> None: + """ + If metadata file for 'root' role does not exist locally, download it + over a network, verify it and store it permanently. + """ + + # Load trusted root metadata + self._metadata["root"] = RootWrapper.from_json_file( + self._get_full_meta_name("root") + ) + + # Update the root role + # 1.1. Let N denote the version number of the trusted + # root metadata file. + lower_bound = self._metadata["root"].version + upper_bound = lower_bound + tuf.settings.MAX_NUMBER_ROOT_ROTATIONS + + verified_root = None + for next_version in range(lower_bound, upper_bound): + try: + mirror_download = self._mirror_meta_download( + self._get_relative_meta_name("root", version=next_version), + tuf.settings.DEFAULT_ROOT_REQUIRED_LENGTH, + ) + + for temp_obj in mirror_download: + try: + verified_root = self._verify_root(temp_obj) + # pylint: disable=try-except-raise + except Exception: + raise + + except tuf.exceptions.NoWorkingMirrorError as exception: + for mirror_error in exception.mirror_errors.values(): + if neither_403_nor_404(mirror_error): + temp_obj.close() + raise + + break + + # Check for a freeze attack. The latest known time MUST be lower + # than the expiration timestamp in the trusted root metadata file + try: + verified_root.expires() + except tuf.exceptions.ExpiredMetadataError: + temp_obj.close() # pylint: disable=undefined-loop-variable + + # 1.9. If the timestamp and / or snapshot keys have been rotated, + # then delete the trusted timestamp and snapshot metadata files. + if self._metadata["root"].keys("timestamp") != verified_root.keys( + "timestamp" + ): + # FIXME: use abstract storage + os.remove(self._get_full_meta_name("timestamp")) + self._metadata["timestamp"] = {} + + if self._metadata["root"].keys("snapshot") != verified_root.keys( + "snapshot" + ): + # FIXME: use abstract storage + os.remove(self._get_full_meta_name("snapshot")) + self._metadata["snapshot"] = {} + + self._metadata["root"] = verified_root + # Persist root metadata. The client MUST write the file to non-volatile + # storage as FILENAME.EXT (e.g. root.json). + self._metadata["root"].persist(self._get_full_meta_name("root")) + + # 1.10. Set whether consistent snapshots are used as per + # the trusted root metadata file + self._consistent_snapshot = self._metadata[ + "root" + ].signed.consistent_snapshot + + temp_obj.close() # pylint: disable=undefined-loop-variable + + def _load_timestamp(self) -> None: + """ + TODO + """ + # TODO Check if timestamp exists locally + for temp_obj in self._mirror_meta_download( + "timestamp.json", tuf.settings.DEFAULT_TIMESTAMP_REQUIRED_LENGTH + ): + try: + verified_tampstamp = self._verify_timestamp(temp_obj) + # break? should we break after first successful download? + except Exception: + # TODO: do something with exceptions + temp_obj.close() + raise + + self._metadata["timestamp"] = verified_tampstamp + # Persist root metadata. The client MUST write the file to + # non-volatile storage as FILENAME.EXT (e.g. root.json). 
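As a standalone restatement of the naming convention used by the two filename helpers above (illustrative only, not the Updater implementation):

def relative_meta_name(role, extension=".json", version=None):
    # Versioned filenames are used when downloading rotated root metadata,
    # e.g. '3.root.json'; unversioned names are used otherwise.
    if version is None:
        return role + extension
    return str(version) + "." + role + extension

assert relative_meta_name("root") == "root.json"
assert relative_meta_name("root", version=3) == "3.root.json"
assert relative_meta_name("timestamp") == "timestamp.json"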
+ self._metadata["timestamp"].persist( + self._get_full_meta_name("timestamp.json") + ) + + temp_obj.close() # pylint: disable=undefined-loop-variable + + def _load_snapshot(self) -> None: + """ + TODO + """ + try: + length = self._metadata["timestamp"].snapshot["length"] + except KeyError: + length = tuf.settings.DEFAULT_SNAPSHOT_REQUIRED_LENGTH + + # Uncomment when implementing consistent_snapshot + # if self._consistent_snapshot: + # version = self._metadata["timestamp"].snapshot["version"] + # else: + # version = None + + # Check if exists locally + # self.loadLocal('snapshot', snapshotVerifier) + for temp_obj in self._mirror_meta_download("snapshot.json", length): + try: + verified_snapshot = self._verify_snapshot(temp_obj) + # break? should we break after first successful download? + except Exception: + # TODO: do something with exceptions + temp_obj.close() + raise + + self._metadata["snapshot"] = verified_snapshot + # Persist root metadata. The client MUST write the file to + # non-volatile storage as FILENAME.EXT (e.g. root.json). + self._metadata["snapshot"].persist( + self._get_full_meta_name("snapshot.json") + ) + + temp_obj.close() # pylint: disable=undefined-loop-variable + + def _load_targets(self, targets_role: str, parent_role: str) -> None: + """ + TODO + """ + try: + length = self._metadata["snapshot"].role(targets_role)["length"] + except KeyError: + length = tuf.settings.DEFAULT_TARGETS_REQUIRED_LENGTH + + # Uncomment when implementing consistent_snapshot + # if self._consistent_snapshot: + # version = self._metadata["snapshot"].role(targets_role)["version"] + # else: + # version = None + + # Check if exists locally + # self.loadLocal('snapshot', targetsVerifier) + + for temp_obj in self._mirror_meta_download( + targets_role + ".json", length + ): + try: + verified_targets = self._verify_targets( + temp_obj, targets_role, parent_role + ) + # break? should we break after first successful download? + except Exception: + # TODO: do something with exceptions + temp_obj.close() + raise + self._metadata[targets_role] = verified_targets + # Persist root metadata. The client MUST write the file to + # non-volatile storage as FILENAME.EXT (e.g. root.json). + self._metadata[targets_role].persist( + self._get_full_meta_name(targets_role, extension=".json") + ) + + temp_obj.close() # pylint: disable=undefined-loop-variable + + def _verify_root(self, temp_obj: TextIO) -> RootWrapper: + """ + TODO + """ + + intermediate_root = RootWrapper.from_json_object(temp_obj) + + # Check for an arbitrary software attack + trusted_root = self._metadata["root"] + intermediate_root.verify( + trusted_root.keys("root"), trusted_root.threshold("root") + ) + intermediate_root.verify( + intermediate_root.keys("root"), intermediate_root.threshold("root") + ) + + # Check for a rollback attack. + if intermediate_root.version < trusted_root.version: + temp_obj.close() + raise tuf.exceptions.ReplayedMetadataError( + "root", intermediate_root.version(), trusted_root.version() + ) + # Note that the expiration of the new (intermediate) root metadata + # file does not matter yet, because we will check for it in step 1.8. 
+ + return intermediate_root + + def _verify_timestamp(self, temp_obj: TextIO) -> TimestampWrapper: + """ + TODO + """ + intermediate_timestamp = TimestampWrapper.from_json_object(temp_obj) + + # Check for an arbitrary software attack + trusted_root = self._metadata["root"] + intermediate_timestamp.verify( + trusted_root.keys("timestamp"), trusted_root.threshold("timestamp") + ) + + # Check for a rollback attack. + if self._metadata.get("timestamp"): + if ( + intermediate_timestamp.signed.version + <= self._metadata["timestamp"].version + ): + temp_obj.close() + raise tuf.exceptions.ReplayedMetadataError( + "root", + intermediate_timestamp.version(), + self._metadata["timestamp"].version(), + ) + + if self._metadata.get("snapshot"): + if ( + intermediate_timestamp.snapshot.version + <= self._metadata["timestamp"].snapshot["version"] + ): + temp_obj.close() + raise tuf.exceptions.ReplayedMetadataError( + "root", + intermediate_timestamp.snapshot.version(), + self._metadata["snapshot"].version(), + ) + + intermediate_timestamp.expires() + + return intermediate_timestamp + + def _verify_snapshot(self, temp_obj: TextIO) -> SnapshotWrapper: + """ + TODO + """ + + # Check against timestamp metadata + if self._metadata["timestamp"].snapshot.get("hash"): + _check_hashes( + temp_obj, self._metadata["timestamp"].snapshot.get("hash") + ) + + intermediate_snapshot = SnapshotWrapper.from_json_object(temp_obj) + + if ( + intermediate_snapshot.version + != self._metadata["timestamp"].snapshot["version"] + ): + temp_obj.close() + raise tuf.exceptions.BadVersionNumberError + + # Check for an arbitrary software attack + trusted_root = self._metadata["root"] + intermediate_snapshot.verify( + trusted_root.keys("snapshot"), trusted_root.threshold("snapshot") + ) + + # Check for a rollback attack + if self._metadata.get("snapshot"): + for target_role in intermediate_snapshot.signed.meta: + if ( + target_role["version"] + != self._metadata["snapshot"].meta[target_role]["version"] + ): + temp_obj.close() + raise tuf.exceptions.BadVersionNumberError + + intermediate_snapshot.expires() + + return intermediate_snapshot + + def _verify_targets( + self, temp_obj: TextIO, filename: str, parent_role: str + ) -> TargetsWrapper: + """ + TODO + """ + + # Check against timestamp metadata + if self._metadata["snapshot"].role(filename).get("hash"): + _check_hashes( + temp_obj, self._metadata["snapshot"].targets.get("hash") + ) + + intermediate_targets = TargetsWrapper.from_json_object(temp_obj) + if ( + intermediate_targets.version + != self._metadata["snapshot"].role(filename)["version"] + ): + temp_obj.close() + raise tuf.exceptions.BadVersionNumberError + + # Check for an arbitrary software attack + parent_role = self._metadata[parent_role] + + intermediate_targets.verify( + parent_role.keys(filename), parent_role.threshold(filename) + ) + + intermediate_targets.expires() + + return intermediate_targets + + @staticmethod + def _verify_target_file(temp_obj: BinaryIO, targetinfo: Dict) -> None: + """ + TODO + """ + + _check_file_length(temp_obj, targetinfo["fileinfo"]["length"]) + _check_hashes(temp_obj, targetinfo["fileinfo"]["hashes"]) + + def _preorder_depth_first_walk(self, target_filepath) -> Dict: + """ + TODO + """ + + target = None + role_names = [("targets", "root")] + visited_role_names = set() + number_of_delegations = tuf.settings.MAX_NUMBER_OF_DELEGATIONS + + # Ensure the client has the most up-to-date version of 'targets.json'. 
+ # Raise 'tuf.exceptions.NoWorkingMirrorError' if the changed metadata + # cannot be successfully downloaded and + # 'tuf.exceptions.RepositoryError' if the referenced metadata is + # missing. Target methods such as this one are called after the + # top-level metadata have been refreshed (i.e., updater.refresh()). + # self._update_metadata_if_changed('targets') + + # Preorder depth-first traversal of the graph of target delegations. + while ( + target is None and number_of_delegations > 0 and len(role_names) > 0 + ): + + # Pop the role name from the top of the stack. + role_name, parent_role = role_names.pop(-1) + self._load_targets(role_name, parent_role) + # Skip any visited current role to prevent cycles. + if (role_name, parent_role) in visited_role_names: + msg = f"Skipping visited current role {role_name}" + logger.debug(msg) + continue + + # The metadata for 'role_name' must be downloaded/updated before + # its targets, delegations, and child roles can be inspected. + # self._metadata['current'][role_name] is currently missing. + # _refresh_targets_metadata() does not refresh 'targets.json', it + # expects _update_metadata_if_changed() to have already refreshed + # it, which this function has checked above. + # self._refresh_targets_metadata(role_name, + # refresh_all_delegated_roles=False) + + role_metadata = self._metadata[role_name] + target = role_metadata.targets.get(target_filepath) + + # After preorder check, add current role to set of visited roles. + visited_role_names.add((role_name, parent_role)) + + # And also decrement number of visited roles. + number_of_delegations -= 1 + delegations = role_metadata.delegations + child_roles = delegations.get("roles", []) + + if target is None: + + child_roles_to_visit = [] + # NOTE: This may be a slow operation if there are many + # delegated roles. + for child_role in child_roles: + child_role_name = _visit_child_role( + child_role, target_filepath + ) + + if ( + child_role["terminating"] + and child_role_name is not None + ): + msg = ( + f"Adding child role {child_role_name}.\n", + "Not backtracking to other roles.", + ) + logger.debug(msg) + role_names = [] + child_roles_to_visit.append( + (child_role_name, role_name) + ) + break + + if child_role_name is None: + msg = f"Skipping child role {child_role_name}" + logger.debug(msg) + + else: + msg = f"Adding child role {child_role_name}" + logger.debug(msg) + child_roles_to_visit.append( + (child_role_name, role_name) + ) + + # Push 'child_roles_to_visit' in reverse order of appearance + # onto 'role_names'. Roles are popped from the end of + # the 'role_names' list. + child_roles_to_visit.reverse() + role_names.extend(child_roles_to_visit) + + else: + msg = f"Found target in current role {role_name}" + logger.debug(msg) + + if ( + target is None + and number_of_delegations == 0 + and len(role_names) > 0 + ): + msg = ( + f"{len(role_names)} roles left to visit, ", + "but allowed to visit at most ", + f"{tuf.settings.MAX_NUMBER_OF_DELEGATIONS}", + " delegations.", + ) + logger.debug(msg) + + return {"filepath": target_filepath, "fileinfo": target} + + +def _visit_child_role(child_role: Dict, target_filepath: str) -> str: + """ + + Non-public method that determines whether the given 'target_filepath' + is an allowed path of 'child_role'. + + Ensure that we explore only delegated roles trusted with the target. The + metadata for 'child_role' should have been refreshed prior to this point, + however, the paths/targets that 'child_role' signs for have not been + verified (as intended). 
The paths/targets that 'child_role' is allowed + to specify in its metadata depends on the delegating role, and thus is + left to the caller to verify. We verify here that 'target_filepath' + is an allowed path according to the delegated 'child_role'. + + TODO: Should the TUF spec restrict the repository to one particular + algorithm? Should we allow the repository to specify in the role + dictionary the algorithm used for these generated hashed paths? + + + child_role: + The delegation targets role object of 'child_role', containing its + paths, path_hash_prefixes, keys, and so on. + + target_filepath: + The path to the target file on the repository. This will be relative to + the 'targets' (or equivalent) directory on a given mirror. + + + None. + + + None. + + + If 'child_role' has been delegated the target with the name + 'target_filepath', then we return the role name of 'child_role'. + + Otherwise, we return None. + """ + + child_role_name = child_role["name"] + child_role_paths = child_role.get("paths") + child_role_path_hash_prefixes = child_role.get("path_hash_prefixes") + + if child_role_path_hash_prefixes is not None: + target_filepath_hash = _get_target_hash(target_filepath) + for child_role_path_hash_prefix in child_role_path_hash_prefixes: + if not target_filepath_hash.startswith(child_role_path_hash_prefix): + continue + + return child_role_name + + elif child_role_paths is not None: + # Is 'child_role_name' allowed to sign for 'target_filepath'? + for child_role_path in child_role_paths: + # A child role path may be an explicit path or glob pattern (Unix + # shell-style wildcards). The child role 'child_role_name' is + # returned if 'target_filepath' is equal to or matches + # 'child_role_path'. Explicit filepaths are also considered + # matches. A repo maintainer might delegate a glob pattern with a + # leading path separator, while the client requests a matching + # target without a leading path separator - make sure to strip any + # leading path separators so that a match is made. + # Example: "foo.tgz" should match with "/*.tgz". + if fnmatch.fnmatch( + target_filepath.lstrip(os.sep), child_role_path.lstrip(os.sep) + ): + logger.debug( + "Child role " + + repr(child_role_name) + + " is allowed to sign for " + + repr(target_filepath) + ) + + return child_role_name + + logger.debug( + "The given target path " + + repr(target_filepath) + + " does not match the trusted path or glob pattern: " + + repr(child_role_path) + ) + continue + + else: + # 'role_name' should have been validated when it was downloaded. + # The 'paths' or 'path_hash_prefixes' fields should not be missing, + # so we raise a format error here in case they are both missing. + raise tuf.exceptions.FormatError( + repr(child_role_name) + " " + 'has neither a "paths" nor "path_hash_prefixes". At least' + " one of these attributes must be present." + ) + + return None + + +def _check_file_length(file_object, trusted_file_length): + """ + TODO + """ + file_object.seek(0, 2) + observed_length = file_object.tell() + + # Return and log a message if the length 'file_object' is equal to + # 'trusted_file_length', otherwise raise an exception. A hard check + # ensures that a downloaded file strictly matches a known, or trusted, + # file length. + if observed_length != trusted_file_length: + raise tuf.exceptions.DownloadLengthMismatchError( + trusted_file_length, observed_length + ) + + +def _check_hashes(file_object, trusted_hashes): + """ + TODO + """ + # Verify each trusted hash of 'trusted_hashes'. 
If all are valid, simply
+    # return.
+    for algorithm, trusted_hash in trusted_hashes.items():
+        digest_object = securesystemslib.hash.digest(algorithm)
+        # Ensure we read from the beginning of the file object
+        # TODO: should we store file position (before the loop) and reset
+        # after we seek about?
+        file_object.seek(0)
+        digest_object.update(file_object.read())
+        computed_hash = digest_object.hexdigest()
+
+        # Raise an exception if any of the hashes are incorrect.
+        if trusted_hash != computed_hash:
+            raise securesystemslib.exceptions.BadHashError(
+                trusted_hash, computed_hash
+            )
+
+        logger.info(
+            "The file's " + algorithm + " hash is correct: " + trusted_hash
+        )
+
+
+def _get_target_hash(target_filepath, hash_function="sha256"):
+    """
+    Compute the hash of 'target_filepath', for comparison against the
+    'path_hash_prefixes' of a hashed-bin delegation.
+    """
+    # Calculate the hash of the filepath to determine which bin holds the
+    # target. The client currently assumes the repository (i.e., repository
+    # tool) generates these hashes with 'hash_function' over the UTF-8
+    # encoded filepath.
+    digest_object = securesystemslib.hash.digest(hash_function)
+    encoded_target_filepath = target_filepath.encode("utf-8")
+    digest_object.update(encoded_target_filepath)
+    target_filepath_hash = digest_object.hexdigest()
+
+    return target_filepath_hash
+
+
+def neither_403_nor_404(mirror_error):
+    """
+    Return True unless 'mirror_error' is an HTTP 403 or 404 error reported
+    by a mirror.
+    """
+    if isinstance(mirror_error, tuf.exceptions.FetcherHTTPError):
+        if mirror_error.status_code in {403, 404}:
+            return False
+    return True
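To make the delegation matching rules used by _visit_child_role() and _get_target_hash() concrete, here is a small standalone demo of the two modes ('paths' and 'path_hash_prefixes'); the role entry, paths and prefix are illustrative, not values from the test repository.

import fnmatch
import hashlib
import os

# A delegation entry with the fields this module consults.
child_role = {
    "name": "role1",
    "keyids": ["<keyid>"],
    "threshold": 1,
    "terminating": False,
    "paths": ["file3.txt", "dir/*.tgz"],
}

# Path-based matching: leading separators are stripped so that, e.g.,
# "foo.tgz" can match a delegated pattern like "/*.tgz".
target_filepath = "dir/archive.tgz"
delegated_by_path = any(
    fnmatch.fnmatch(target_filepath.lstrip(os.sep), pattern.lstrip(os.sep))
    for pattern in child_role["paths"]
)
print(delegated_by_path)  # True

# Hash-prefix matching: the same sha256 over the UTF-8 encoded path that
# _get_target_hash() computes, compared against 'path_hash_prefixes'.
path_hash_prefixes = ["<hex prefix>"]  # placeholder
target_hash = hashlib.sha256(target_filepath.encode("utf-8")).hexdigest()
delegated_by_hash = any(
    target_hash.startswith(prefix) for prefix in path_hash_prefixes
)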