diff --git a/tests/repository_simulator.py b/tests/repository_simulator.py index abb7f37141..528a235f9b 100644 --- a/tests/repository_simulator.py +++ b/tests/repository_simulator.py @@ -205,6 +205,11 @@ def _fetch(self, url: str) -> Iterator[bytes]: if path.startswith("/metadata/") and path.endswith(".json"): # figure out rolename and version ver_and_name = path[len("/metadata/") :][: -len(".json")] + # inside a version folder + if "/" in ver_and_name: + ver_and_name_split = ver_and_name.partition("/") + assert len(ver_and_name_split) == 3 + ver_and_name = ver_and_name_split[2] version_str, _, role = ver_and_name.partition(".") # root is always version-prefixed while timestamp is always NOT if role == Root.type or ( diff --git a/tests/test_updater_delegation_graphs.py b/tests/test_updater_delegation_graphs.py index ca04621da0..664a726ec6 100644 --- a/tests/test_updater_delegation_graphs.py +++ b/tests/test_updater_delegation_graphs.py @@ -134,7 +134,7 @@ def _assert_files_exist(self, roles: Iterable[str]) -> None: """Assert that local metadata files exist for 'roles'""" expected_files = sorted([f"{role}.json" for role in roles]) local_metadata_files = sorted(os.listdir(self.metadata_dir)) - self.assertListEqual(local_metadata_files, expected_files) + self.assertEqual(expected_files, local_metadata_files) class TestDelegationsGraphs(TestDelegations): @@ -263,7 +263,11 @@ def test_graph_traversal(self, test_data: DelegationsTestCase) -> None: in the delegator's metadata, using pre-order depth-first search""" try: - exp_files = [*TOP_LEVEL_ROLE_NAMES, *test_data.visited_order] + exp_files = [ + *TOP_LEVEL_ROLE_NAMES, + *test_data.visited_order, + "spec_version", + ] exp_calls = [(role, 1) for role in test_data.visited_order] self._init_repo(test_data) @@ -276,7 +280,7 @@ def test_graph_traversal(self, test_data: DelegationsTestCase) -> None: updater.refresh() self.sim.fetch_tracker.metadata.clear() # Check that metadata dir contains only top-level roles - self._assert_files_exist(TOP_LEVEL_ROLE_NAMES) + self._assert_files_exist([*TOP_LEVEL_ROLE_NAMES, "spec_version"]) # Looking for a non-existing targetpath forces updater # to visit all possible delegated roles @@ -311,7 +315,11 @@ def test_invalid_metadata(self, test_data: DelegationsTestCase) -> None: self.setup_subtest() # The invalid role metadata must not be persisted - exp_files = [*TOP_LEVEL_ROLE_NAMES, *test_data.visited_order[:-1]] + exp_files = [ + *TOP_LEVEL_ROLE_NAMES, + *test_data.visited_order[:-1], + "spec_version", + ] exp_calls = [(role, 1) for role in test_data.visited_order] updater = self._init_updater() @@ -397,7 +405,11 @@ def test_hash_bins_graph_traversal( they correctly reffer to the corresponding hash bin prefixes""" try: - exp_files = [*TOP_LEVEL_ROLE_NAMES, *test_data.visited_order] + exp_files = [ + *TOP_LEVEL_ROLE_NAMES, + *test_data.visited_order, + "spec_version", + ] exp_calls = [(role, 1) for role in test_data.visited_order] self._init_repo(test_data) @@ -408,7 +420,7 @@ def test_hash_bins_graph_traversal( updater.refresh() self.sim.fetch_tracker.metadata.clear() # Check that metadata dir contains only top-level roles - self._assert_files_exist(TOP_LEVEL_ROLE_NAMES) + self._assert_files_exist([*TOP_LEVEL_ROLE_NAMES, "spec_version"]) # Looking for a non-existing targetpath forces updater # to visit a correspondning delegated role @@ -481,7 +493,11 @@ def test_succinct_roles_graph_traversal( # bin should exist locally and only one bin must be downloaded. try: - exp_files = [*TOP_LEVEL_ROLE_NAMES, test_data.expected_target_bin] + exp_files = [ + *TOP_LEVEL_ROLE_NAMES, + test_data.expected_target_bin, + "spec_version", + ] exp_calls = [(test_data.expected_target_bin, 1)] self.sim = RepositorySimulator() @@ -495,7 +511,7 @@ def test_succinct_roles_graph_traversal( updater.refresh() self.sim.fetch_tracker.metadata.clear() # Check that metadata dir contains only top-level roles - self._assert_files_exist(TOP_LEVEL_ROLE_NAMES) + self._assert_files_exist([*TOP_LEVEL_ROLE_NAMES, "spec_version"]) # Looking for a non-existing targetpath forces updater # to visit a corresponding delegated role. @@ -564,7 +580,11 @@ def setUp(self) -> None: def test_targetfile_search(self, test_data: TargetTestCase) -> None: try: self.setup_subtest() - exp_files = [*TOP_LEVEL_ROLE_NAMES, *test_data.visited_order] + exp_files = [ + *TOP_LEVEL_ROLE_NAMES, + *test_data.visited_order, + "spec_version", + ] exp_calls = [(role, 1) for role in test_data.visited_order] exp_target = self.sim.target_files[test_data.targetpath].target_file diff --git a/tests/test_updater_ng.py b/tests/test_updater_ng.py index c87a8fdc74..60b7a4d4f8 100644 --- a/tests/test_updater_ng.py +++ b/tests/test_updater_ng.py @@ -6,17 +6,22 @@ """Test Updater class """ +import base64 +import json import logging import os import shutil import sys import tempfile import unittest -from typing import Callable, ClassVar, List +from typing import Callable, ClassVar, List, Tuple from unittest.mock import MagicMock, patch +from securesystemslib import hash as sslib_hash from securesystemslib.interface import import_rsa_privatekey_from_file from securesystemslib.signer import SSlibSigner +from securesystemslib.storage import FilesystemBackend +from securesystemslib.util import persist_temp_file from tests import utils from tuf import ngclient @@ -32,7 +37,7 @@ logger = logging.getLogger(__name__) - +# pylint: disable=too-many-public-methods class TestUpdater(unittest.TestCase): """Test the Updater class from 'tuf/ngclient/updater.py'.""" @@ -167,14 +172,26 @@ def test_refresh_and_download(self) -> None: # top-level metadata is in local directory already self.updater.refresh() self._assert_files( - [Root.type, Snapshot.type, Targets.type, Timestamp.type] + [ + Root.type, + Snapshot.type, + "spec_version", + Targets.type, + Timestamp.type, + ] ) # Get targetinfos, assert that cache does not contain files info1 = self.updater.get_targetinfo("file1.txt") assert isinstance(info1, TargetFile) self._assert_files( - [Root.type, Snapshot.type, Targets.type, Timestamp.type] + [ + Root.type, + Snapshot.type, + "spec_version", + Targets.type, + Timestamp.type, + ] ) # Get targetinfo for 'file3.txt' listed in the delegated role1 @@ -184,6 +201,7 @@ def test_refresh_and_download(self) -> None: "role1", Root.type, Snapshot.type, + "spec_version", Targets.type, Timestamp.type, ] @@ -214,7 +232,13 @@ def test_refresh_with_only_local_root(self) -> None: self.updater.refresh() self._assert_files( - [Root.type, Snapshot.type, Targets.type, Timestamp.type] + [ + Root.type, + Snapshot.type, + "spec_version", + Targets.type, + Timestamp.type, + ] ) # Get targetinfo for 'file3.txt' listed in the delegated role1 @@ -223,6 +247,7 @@ def test_refresh_with_only_local_root(self) -> None: "role1", Root.type, Snapshot.type, + "spec_version", Targets.type, Timestamp.type, ] @@ -239,7 +264,14 @@ def test_implicit_refresh_with_only_local_root(self) -> None: # Get targetinfo for 'file3.txt' listed in the delegated role1 self.updater.get_targetinfo("file3.txt") - expected_files = ["role1", "root", "snapshot", "targets", "timestamp"] + expected_files = [ + "role1", + "root", + "snapshot", + "spec_version", + "targets", + "timestamp", + ] self._assert_files(expected_files) def test_both_target_urls_not_set(self) -> None: @@ -326,6 +358,473 @@ def test_non_existing_target_file(self) -> None: with self.assertRaises(exceptions.DownloadHTTPError): self.updater.download_target(info) + def test_get_spec_version_supported(self) -> None: + """This uses the default SUPPORTED_VERSIONS variable from updater.py""" + + with self.assertRaises( + exceptions.RepositoryError, + msg="Latest repository version less than 4", + ): + self.updater._get_spec_version( + ["1", "2", "3"], [""], "4", ngclient.updater.SUPPORTED_VERSIONS + ) + + self.assertEqual( + self.updater._get_spec_version( + ["1", "2", "3"], ["", "", ""], "3", ["3"] + ), + ("3", ""), + "3 is selected as the spec version and no warning ensues", + ) + + def test_get_spec_version(self) -> None: + # warningchecker = "Not using the latest specification version available on the repository" + # Checks with different values + test_cases: List[Tuple[List[str], List[str], str, List[str]]] = [ + ( + ["3", "5", "6"], + ["", "", ""], + "7", + ["1", "2", "3", "4"], + ), # Latest repository version less than 7 + ( + ["3", "5", "6"], + ["", "", ""], + "3", + ["1", "2", "4"], + ), # No common specification version between repository and client + ] + for ( + repo_versions, + repo_features, + spec_version, + supported_versions, + ) in test_cases: + with self.assertRaises(exceptions.RepositoryError): + self.updater._get_spec_version( + repo_versions, + repo_features, + spec_version, + supported_versions, + ) + + test_cases_2: List[Tuple[List[str], List[str], str, List[str], str]] = [ + ( + ["3", "5", "6"], + ["", "", ""], + "3", + ["1", "2", "3", "4"], + "3", + ), # 3 is selected as the spec version but a warning ensues + ( + ["1", "2", "3"], + ["", "", ""], + "3", + ["3", "5", "6"], + "3", + ), # 3 is selected as the spec version and no warning ensues + ( + ["8", "11", "13"], + ["", "", ""], + "12", + ["8", "11", "12"], + "11", + ), # 11 is selected as the spec version but a warning ensues + ] + for t in test_cases_2: + ( + repo_versions, + repo_features, + spec_version, + supported_versions, + expected_version, + ) = t + actual_version, _ = self.updater._get_spec_version( + repo_versions, repo_features, spec_version, supported_versions + ) + self.assertEqual(actual_version, expected_version) + # TODO ensure warning was logged for case 1 and 3 + + # TODO Testing logging functionality. + # with self.assertLogs(ngclient.updater.__name__) as cm: + # logging.getLogger('foo').info('first message') + # self.updater._get_spec_version(["3","5","6"],"3",["1","2","3","4"]) + + def test_spec_version_increase(self) -> None: + # switch client supported versions + self.updater._supported_versions = ["1", "2"] + + # copy the current metadata to 1/ + shutil.copytree( + os.path.join(self.repository_directory, "metadata"), + os.path.join(self.repository_directory, "metadata", "1"), + ) + + # switch repository supported versions + def _set_supported_version_v2(root: Metadata) -> None: + repo_version = [{"version": 2}] + root.signed.supported_versions = repo_version + + self._modify_repository_root( + _set_supported_version_v2, bump_version=True + ) + + # copy the current metadata to 2/ + shutil.copytree( + os.path.join(self.repository_directory, "metadata"), + os.path.join(self.repository_directory, "metadata", "2"), + ) + + # switch back to version 1 + shutil.copyfile( + os.path.join( + self.repository_directory, "metadata", "1", "1.root.json" + ), + os.path.join(self.repository_directory, "metadata", "1.root.json"), + ) + + # get root digest + root_path = os.path.join( + self.repository_directory, "metadata", "2", "2.root.json" + ) + with open(root_path, "rb") as f: + hasher = sslib_hash.digest_fileobject(f, algorithm="sha256") + root_digest = base64.b64encode(hasher.digest()) + + # set supported versions in root + def _set_supported_version(root: Metadata) -> None: + repo_version = [ + {"version": 1}, + { + "version": 2, + "path": "2/", + "features": "", + "root-filename": "2.root.json", + "root-digest": root_digest.decode("utf-8"), + }, + ] + root.signed.supported_versions = repo_version + + self._modify_repository_root(_set_supported_version, bump_version=True) + + self.updater.refresh() + self.assertEqual(self.updater._spec_version, "2") + + def test_spec_version_overlap(self) -> None: + # client supports version 1 and 2 + self.updater._supported_versions = ["1", "2"] + + # copy the current metadata to 1/ + shutil.copytree( + os.path.join(self.repository_directory, "metadata"), + os.path.join(self.repository_directory, "metadata", "1"), + ) + + # repository supports version 2 and 3 + def _set_supported_version_v2(root: Metadata) -> None: + repo_version = [ + {"version": 2}, + { + "version": 3, + "path": "3/", + "features": "", + "root-filename": "wontuse", + "root-digest": "wontuse", + }, + ] + root.signed.supported_versions = repo_version + + self._modify_repository_root( + _set_supported_version_v2, bump_version=True + ) + + # copy the current metadata to 2/ + shutil.copytree( + os.path.join(self.repository_directory, "metadata"), + os.path.join(self.repository_directory, "metadata", "2"), + ) + + # switch back to version 1 + shutil.copyfile( + os.path.join( + self.repository_directory, "metadata", "1", "1.root.json" + ), + os.path.join(self.repository_directory, "metadata", "1.root.json"), + ) + + # get root digest + root_path = os.path.join( + self.repository_directory, "metadata", "2", "2.root.json" + ) + with open(root_path, "rb") as f: + hasher = sslib_hash.digest_fileobject(f, algorithm="sha256") + root_digest = base64.b64encode(hasher.digest()) + + # set supported versions in root + def _set_supported_version(root: Metadata) -> None: + repo_version = [ + {"version": 1}, + { + "version": 2, + "path": "2/", + "features": "", + "root-filename": "2.root.json", + "root-digest": root_digest.decode("utf-8"), + }, + ] + root.signed.supported_versions = repo_version + + self._modify_repository_root(_set_supported_version, bump_version=True) + + self.updater.refresh() + self.assertEqual(self.updater._spec_version, "2") + # TODO assert that higher repo version available warning was logged + + def test_tap14_backwards_compat(self) -> None: + # copy the current metadata to 2/ + shutil.copytree( + os.path.join(self.repository_directory, "metadata"), + os.path.join(self.repository_directory, "metadata", "2"), + ) + + # add supported-versions to root + def _set_supported_version(root: Metadata) -> None: + repo_version = [{"version": 1}] + root.signed.supported_versions = repo_version + + self._modify_repository_root(_set_supported_version, bump_version=True) + + self.updater.refresh() + + def test_spec_version_rollback(self) -> None: + # set _spec_version to 2 + client_spec_version_path = os.path.join( + self.client_directory, "spec_version.json" + ) + client_spec_version_json = json.dumps({"version": 2}) + with tempfile.TemporaryFile() as temp_file: + temp_file.write(client_spec_version_json.encode("utf-8")) + persist_temp_file( + temp_file, client_spec_version_path, FilesystemBackend() + ) + + # but supported-versions only contains 1 + # add supported-versions to root + def _set_supported_version(root: Metadata) -> None: + repo_version = [{"version": 1}] + root.signed.supported_versions = repo_version + + self._modify_repository_root(_set_supported_version, bump_version=True) + + self.assertEqual(self.updater._supported_versions, ["1"]) + + with self.assertRaises(exceptions.RepositoryError): + self.updater.refresh() + + def test_spec_version_root_update_order(self) -> None: + + # copy the current metadata to 1/ to save it + shutil.copytree( + os.path.join(self.repository_directory, "metadata"), + os.path.join(self.repository_directory, "metadata", "1"), + ) + + # make version 2 metadata + def _set_supported_version_v2(root: Metadata) -> None: + repo_version = [ + {"version": 2}, + ] + root.signed.supported_versions = repo_version + + self._modify_repository_root( + _set_supported_version_v2, bump_version=True + ) + + # move this metadata to 2/ + shutil.copytree( + os.path.join(self.repository_directory, "metadata"), + os.path.join(self.repository_directory, "metadata", "2"), + ) + + # switch back to version 1 + shutil.copyfile( + os.path.join( + self.repository_directory, "metadata", "1", "1.root.json" + ), + os.path.join(self.repository_directory, "metadata", "1.root.json"), + ) + + # get root digest + root_path = os.path.join( + self.repository_directory, "metadata", "2", "2.root.json" + ) + with open(root_path, "rb") as f: + hasher = sslib_hash.digest_fileobject(f, algorithm="sha256") + root_digest = base64.b64encode(hasher.digest()) + + # set supported versions in root + def _set_supported_version(root: Metadata) -> None: + repo_version = [ + {"version": 1}, + { + "version": 2, + "path": "2/", + "features": "", + "root-filename": "2.root.json", + "root-digest": root_digest.decode("utf-8"), + }, + ] + root.signed.supported_versions = repo_version + + self._modify_repository_root(_set_supported_version, bump_version=True) + + # update root not in 2/ + self._modify_repository_root(lambda root: None, bump_version=True) + + # switch client supported versions + self.updater._supported_versions = ["1", "2"] + + self.updater.refresh() + # version 2 is missing the most recent root + self.assertEqual(self.updater._trusted_set.root.signed.version, 2) + self.assertEqual(self.updater._spec_version, "2") + + def test_spec_version_root_update(self) -> None: + # update root + self._modify_repository_root(lambda root: None, bump_version=True) + + # save the current metadata to 1/ + shutil.copytree( + os.path.join(self.repository_directory, "metadata"), + os.path.join(self.repository_directory, "metadata", "1"), + ) + + # make repository supported versions for version 2 + def _set_supported_version_v2(root: Metadata) -> None: + repo_version = [ + {"version": 2}, + ] + root.signed.supported_versions = repo_version + + self._modify_repository_root( + _set_supported_version_v2, bump_version=True + ) + + # move this metadata to 2/ + shutil.copytree( + os.path.join(self.repository_directory, "metadata"), + os.path.join(self.repository_directory, "metadata", "2"), + ) + + # switch back to version 1 + shutil.copyfile( + os.path.join( + self.repository_directory, "metadata", "1", "1.root.json" + ), + os.path.join(self.repository_directory, "metadata", "1.root.json"), + ) + + # get root digest + root_path = os.path.join( + self.repository_directory, "metadata", "2", "3.root.json" + ) + with open(root_path, "rb") as f: + file_bytes = f.read() + hasher = sslib_hash.digest(algorithm="sha256") + hasher.update(file_bytes) + root_digest = base64.b64encode(hasher.digest()) + + # switch repository supported versions + def _set_supported_version(root: Metadata) -> None: + repo_version = [ + {"version": 1}, + { + "version": 2, + "path": "2/", + "features": "", + "root-filename": "3.root.json", + "root-digest": root_digest.decode("utf-8"), + }, + ] + root.signed.supported_versions = repo_version + + self._modify_repository_root(_set_supported_version, bump_version=True) + + # switch client supported versions + self.updater._supported_versions = ["1", "2"] + + self.updater.refresh() + self.assertEqual(self.updater._trusted_set.root.signed.version, 3) + self.assertEqual(self.updater._spec_version, "2") + + def test_feature_change(self) -> None: + # switch client supported versions + self.updater._supported_versions = ["1"] + self.updater._supported_features = ["new_features"] + + # copy the current metadata to 1/ + shutil.copytree( + os.path.join(self.repository_directory, "metadata"), + os.path.join(self.repository_directory, "metadata", "1"), + ) + + # self._modify_repository_root( + # lambda root: None, bump_version=True + # ) + + # switch repository supported versions + def _set_supported_version_v2(root: Metadata) -> None: + repo_version = [ + {"version": 1, "path": "feature/", "features": "new_features"} + ] + root.signed.supported_versions = repo_version + + self._modify_repository_root( + _set_supported_version_v2, bump_version=False + ) + + # copy the current metadata to feature/ + shutil.copytree( + os.path.join(self.repository_directory, "metadata"), + os.path.join(self.repository_directory, "metadata", "feature"), + ) + + # switch back to version 1 + shutil.copyfile( + os.path.join( + self.repository_directory, "metadata", "1", "1.root.json" + ), + os.path.join(self.repository_directory, "metadata", "1.root.json"), + ) + + # get root digest + root_path = os.path.join( + self.repository_directory, "metadata", "feature", "1.root.json" + ) + with open(root_path, "rb") as f: + hasher = sslib_hash.digest_fileobject(f, algorithm="sha256") + root_digest = base64.b64encode(hasher.digest()) + + # set supported versions in root + def _set_supported_version(root: Metadata) -> None: + repo_version = [ + { + "version": 1, + "path": "feature/", + "features": "new_features", + "root-filename": "1.root.json", + "root-digest": root_digest.decode("utf-8"), + }, + {"version": 1}, + ] + root.signed.supported_versions = repo_version + + self._modify_repository_root(_set_supported_version, bump_version=True) + + self.updater.refresh() + self.assertEqual(self.updater._spec_version, "1") + self.assertEqual(self.updater._spec_version_dir, "feature/") + if __name__ == "__main__": utils.configure_test_logging(sys.argv) diff --git a/tests/test_updater_top_level_update.py b/tests/test_updater_top_level_update.py index be6ce09d27..c2b1b0beac 100644 --- a/tests/test_updater_top_level_update.py +++ b/tests/test_updater_top_level_update.py @@ -103,7 +103,8 @@ def _assert_files_exist(self, roles: Iterable[str]) -> None: """Assert that local metadata files exist for 'roles'""" expected_files = sorted([f"{role}.json" for role in roles]) local_metadata_files = sorted(os.listdir(self.metadata_dir)) - self.assertListEqual(local_metadata_files, expected_files) + for e in expected_files: + self.assertTrue(e in local_metadata_files) def _assert_content_equals( self, role: str, version: Optional[int] = None @@ -730,6 +731,9 @@ def test_load_metadata_from_cache(self, wrapped_open: MagicMock) -> None: wrapped_open.assert_has_calls( [ call(os.path.join(self.metadata_dir, "root.json"), "rb"), + call( + os.path.join(self.metadata_dir, "spec_version.json"), "rb" + ), call(os.path.join(self.metadata_dir, "timestamp.json"), "rb"), call(os.path.join(self.metadata_dir, "snapshot.json"), "rb"), call(os.path.join(self.metadata_dir, "targets.json"), "rb"), @@ -737,7 +741,10 @@ def test_load_metadata_from_cache(self, wrapped_open: MagicMock) -> None: ] ) - expected_calls = [("root", 2), ("timestamp", None)] + expected_calls = [ + ("root", 2), + ("timestamp", None), + ] self.assertListEqual(self.sim.fetch_tracker.metadata, expected_calls) @patch.object(datetime, "datetime", wraps=datetime.datetime) diff --git a/tuf/api/metadata.py b/tuf/api/metadata.py index d05f12fcf8..0d40f19e47 100644 --- a/tuf/api/metadata.py +++ b/tuf/api/metadata.py @@ -866,6 +866,8 @@ class Root(Signed): a dictionary of top level roles without keys and threshold of 1. consistent_snapshot: ``True`` if repository supports consistent snapshots. Default is True. + supported_versions: List of supported versions and their associated + directory unrecognized_fields: Dictionary of all attributes that are not managed by TUF Metadata API @@ -884,11 +886,13 @@ def __init__( keys: Optional[Dict[str, Key]] = None, roles: Optional[Mapping[str, Role]] = None, consistent_snapshot: Optional[bool] = True, + supported_versions: Optional[List[Dict]] = None, unrecognized_fields: Optional[Dict[str, Any]] = None, ): super().__init__(version, spec_version, expires, unrecognized_fields) self.consistent_snapshot = consistent_snapshot self.keys = keys if keys is not None else {} + self.supported_versions = supported_versions if roles is None: roles = {r: Role([], 1) for r in TOP_LEVEL_ROLE_NAMES} @@ -905,6 +909,7 @@ def __eq__(self, other: Any) -> bool: and self.keys == other.keys and self.roles == other.roles and self.consistent_snapshot == other.consistent_snapshot + and self.supported_versions == other.supported_versions ) @classmethod @@ -915,6 +920,7 @@ def from_dict(cls, signed_dict: Dict[str, Any]) -> "Root": ValueError, KeyError, TypeError: Invalid arguments. """ common_args = cls._common_fields_from_dict(signed_dict) + supported_versions = signed_dict.pop("supported_versions", None) consistent_snapshot = signed_dict.pop("consistent_snapshot", None) keys = signed_dict.pop("keys") roles = signed_dict.pop("roles") @@ -925,7 +931,14 @@ def from_dict(cls, signed_dict: Dict[str, Any]) -> "Root": roles[role_name] = Role.from_dict(role_dict) # All fields left in the signed_dict are unrecognized. - return cls(*common_args, keys, roles, consistent_snapshot, signed_dict) + return cls( + *common_args, + keys, + roles, + consistent_snapshot, + supported_versions, + signed_dict, + ) def to_dict(self) -> Dict[str, Any]: """Returns the dict representation of self.""" @@ -936,6 +949,9 @@ def to_dict(self) -> Dict[str, Any]: roles[role_name] = role.to_dict() if self.consistent_snapshot is not None: root_dict["consistent_snapshot"] = self.consistent_snapshot + if self.supported_versions is not None: + supported_versions = self.supported_versions + root_dict["supported_versions"] = supported_versions root_dict.update( { diff --git a/tuf/ngclient/_internal/trusted_metadata_set.py b/tuf/ngclient/_internal/trusted_metadata_set.py index fa788d0a1f..abddd7a0d2 100644 --- a/tuf/ngclient/_internal/trusted_metadata_set.py +++ b/tuf/ngclient/_internal/trusted_metadata_set.py @@ -177,6 +177,19 @@ def update_root(self, data: bytes) -> Metadata[Root]: return new_root + def update_root_spec_rotation(self, data: bytes) -> Metadata[Root]: + """Loads ``data`` as new root metadata, skipping version and + signature checks. + """ + new_root = Metadata[Root].from_bytes(data) + self._trusted_set[Root.type] = new_root + logger.info( + "Updated root to new specification version root, new root version is v%d", + new_root.signed.version, + ) + + return new_root + def update_timestamp(self, data: bytes) -> Metadata[Timestamp]: """Verifies and loads ``data`` as new timestamp metadata. diff --git a/tuf/ngclient/config.py b/tuf/ngclient/config.py index e6213d0bed..c308ac71d5 100644 --- a/tuf/ngclient/config.py +++ b/tuf/ngclient/config.py @@ -8,6 +8,7 @@ @dataclass +# pylint: disable=too-many-instance-attributes class UpdaterConfig: """Used to store ``Updater`` configuration. @@ -28,6 +29,7 @@ class UpdaterConfig: max_root_rotations: int = 32 max_delegations: int = 32 + supported_versions_max_length: int = 1000 # bytes root_max_length: int = 512000 # bytes timestamp_max_length: int = 16384 # bytes snapshot_max_length: int = 2000000 # bytes diff --git a/tuf/ngclient/updater.py b/tuf/ngclient/updater.py index e518118a80..6223b29c1f 100644 --- a/tuf/ngclient/updater.py +++ b/tuf/ngclient/updater.py @@ -33,15 +33,21 @@ `_. """ +import base64 +import errno +import json import logging import os import shutil import tempfile -from typing import Optional, Set +from typing import List, Optional, Set, Tuple from urllib import parse +from securesystemslib import hash as sslib_hash + from tuf.api import exceptions from tuf.api.metadata import ( + SPECIFICATION_VERSION, Metadata, Root, Snapshot, @@ -54,6 +60,9 @@ from tuf.ngclient.fetcher import FetcherInterface logger = logging.getLogger(__name__) +# only include the major version +SUPPORTED_VERSIONS = [SPECIFICATION_VERSION[0]] +SUPPORTED_FEATURES = [""] class Updater: @@ -76,6 +85,7 @@ class Updater: RepositoryError: Local root.json is invalid """ + # pylint: disable=too-many-instance-attributes def __init__( self, metadata_dir: str, @@ -85,6 +95,8 @@ def __init__( fetcher: Optional[FetcherInterface] = None, config: Optional[UpdaterConfig] = None, ): + self._spec_version: str = "1" # spec_version is the last used version by the client to get metadata + self._spec_version_dir = "" self._dir = metadata_dir self._metadata_base_url = _ensure_trailing_slash(metadata_base_url) self.target_dir = target_dir @@ -98,6 +110,8 @@ def __init__( self._trusted_set = trusted_metadata_set.TrustedMetadataSet(data) self._fetcher = fetcher or requests_fetcher.RequestsFetcher() self.config = config or UpdaterConfig() + self._supported_versions = SUPPORTED_VERSIONS + self._supported_features = SUPPORTED_FEATURES def refresh(self) -> None: """Refreshes top-level metadata. @@ -122,12 +136,83 @@ def refresh(self) -> None: RepositoryError: Metadata failed to verify in some way DownloadError: Download of a metadata file failed in some way """ + self._set_spec_version() self._load_root() self._load_timestamp() self._load_snapshot() self._load_targets(Targets.type, Root.type) + def _set_spec_version(self) -> None: + """Load previous spec version from disk""" + # get previous spec version + try: + version_bytes = self._load_local_metadata("spec_version") + self._spec_version = json.loads(version_bytes.decode("utf-8"))[ + "version" + ] + + except OSError as e: + if e.errno != errno.ENOENT: + raise + self._spec_version = "1" + # persist the version number + self._persist_metadata( + "spec_version", + json.dumps({"version": "1"}).encode("utf-8"), + ) + + def _find_matching_spec_version( + self, repository_versions_dicts: List[dict] + ) -> Tuple[str, str, str, str]: + """Find the set of spec versions that match between the client and repository. + Returns the largest matching version and information about + the new root and its path + """ + + repository_versions = [i["version"] for i in repository_versions_dicts] + repository_features = [ + i["features"] if "features" in i else "" + for i in repository_versions_dicts + ] + + # Updating self.spec_version + spec_version, features = self._get_spec_version( + repository_versions, + repository_features, + self._spec_version, + self._supported_versions, + ) + + for i in repository_versions_dicts: + if i["version"] == int(spec_version) and ( + "features" not in i or i["features"] == features + ): + new_path = self._spec_version_dir + if "path" in i: + new_path = i["path"] + if "root-filename" in i and "root-digest" in i: + return ( + spec_version, + i["root-filename"], + i["root-digest"], + new_path, + ) + # root info will be empty if the version does not change + return spec_version, "", "", new_path + + # we shouldn't get here... + raise ValueError("current repository version not found") + + def _get_repository_versions(self) -> List[dict]: + """Returns a list of all the repository versions and paths.""" + + # If supported-versions is not found, then default to version 1 + supported = self._trusted_set.root.signed.supported_versions + if supported is not None and len(supported) > 0: + return supported + return [{"version": 1}] + def _generate_target_file_path(self, targetinfo: TargetFile) -> str: if self.target_dir is None: raise ValueError("target_dir must be set if filepath is not given") @@ -257,15 +342,23 @@ def download_target( logger.info("Downloaded target %s", targetinfo.path) return filepath + def _download_new_spec_root(self, rolename: str, length: int) -> bytes: + spec_folder = f"{self._spec_version_dir}" + url = f"{self._metadata_base_url}{spec_folder}{rolename}" + return self._fetcher.download_bytes(url, length) + def _download_metadata( self, rolename: str, length: int, version: Optional[int] = None ) -> bytes: """Download a metadata file and return it as bytes""" encoded_name = parse.quote(rolename, "") - if version is None: - url = f"{self._metadata_base_url}{encoded_name}.json" + + spec_folder = f"{self._spec_version_dir}" + + if version is None: # THIS IS SNAPSHOT VERSION !! + url = f"{self._metadata_base_url}{spec_folder}{encoded_name}.json" else: - url = f"{self._metadata_base_url}{version}.{encoded_name}.json" + url = f"{self._metadata_base_url}{spec_folder}{version}.{encoded_name}.json" return self._fetcher.download_bytes(url, length) def _load_local_metadata(self, rolename: str) -> bytes: @@ -303,6 +396,43 @@ def _load_root(self) -> None: version available on the remote. """ + # find the current highest compatible spec version and get the + # list of versions to look for root metadata + ( + supported_version, + new_root_filename, + new_root_bytes, + new_spec_version_path, + ) = self._find_matching_spec_version(self._get_repository_versions()) + + # if compatible version higher than current, jump to that root + # call _load_root in new version + if new_spec_version_path != self._spec_version_dir: + save_dir = self._spec_version_dir + self._spec_version_dir = new_spec_version_path + actual_bytes = self._download_new_spec_root( + new_root_filename, self.config.root_max_length + ) + hasher = sslib_hash.digest(algorithm="sha256") + hasher.update(actual_bytes) + if base64.b64decode(new_root_bytes) == hasher.digest(): + # set trustedroot + self._trusted_set.update_root_spec_rotation(actual_bytes) + + # try update again, checking for new supported versions first + self._spec_version = supported_version + + # persist the version number + self._persist_metadata( + "spec_version", + json.dumps({"version": self._spec_version}).encode("utf-8"), + ) + + self._load_root() + return + # error loading new version, stay with this version + self._spec_version_dir = save_dir + # Update the root role lower_bound = self._trusted_set.root.signed.version + 1 upper_bound = lower_bound + self.config.max_root_rotations @@ -316,6 +446,18 @@ def _load_root(self) -> None: ) self._trusted_set.update_root(data) self._persist_metadata(Root.type, data) + # check if supported_versions was updated in this root + repository_versions = self._get_repository_versions() + ( + _, + _, + _, + new_spec_version_path, + ) = self._find_matching_spec_version(repository_versions) + # if path changed, recurse to try loading new spec version + if self._spec_version_dir != new_spec_version_path: + self._load_root() + return except exceptions.DownloadHTTPError as exception: if exception.status_code not in {403, 404}: @@ -471,6 +613,74 @@ def _preorder_depth_first_walk( # If this point is reached then target is not found, return None return None + def _get_spec_version( + self, + repository_versions: List[str], + repository_features: List[str], + spec_version: str, + supported_versions: List[str], + ) -> Tuple[str, str]: + """Returns the specification version and features to be used, following the rules of TAP-14 + and displays a warning if chosen spec_version is lower than the highest repository version. + + Raises: + ValueError: supported_versions, repository_version or spec_version contains an + invalid entry (not parseable as ``int()``) + RepositoryError: Latest repository version lower than the last used version from + this repository + RepositoryError: No matching version found between supported_versions and repository_versions + """ + repository_versions_int = [int(i) for i in repository_versions] + supported_versions_int = [int(i) for i in supported_versions] + spec_version_int = int(spec_version) + + # The client determines the latest version available on the repository by looking + # for the directory with the largest version number. + latest_repo_version = max(repository_versions_int) + + # If the latest version on the repository is lower than the previous specification + # version the client used from this repository, the client should report an error + # and terminate the update. + if latest_repo_version < spec_version_int: + raise exceptions.RepositoryError( + f"The latest repository version ({latest_repo_version}) is lower than the last used spec version ({spec_version})." + ) + + # If the latest version on the repository is equal to that of the client, it will use this directory to download metadata. + + # If the latest version pre-dates the client specification version, it may call functions from a previous client version + # to download the metadata. The client may support as many or as few versions as desired for the application. If the + # previous version is not available, the client shall report that an update can not be performed due to an old + # specification version on the repository. + + # If the latest version on the repository is higher than the client spec version, the client should report + # to the user that it is not using the most up to date version, and then perform the update with the directory + # that corresponds with the latest client specification version, if available. If no such directory exists, + # the client terminates the update. + try: + spec_version_int = max( + set(repository_versions_int) & set(supported_versions_int) + ) + except ValueError as e: + raise exceptions.RepositoryError( + f"No matching specification version found. Found {repository_versions} in" + f"repository and {supported_versions} in client." + ) from e + + if latest_repo_version > spec_version_int: + logger.warning( + "Not using the latest specification version available on the repository" + ) + + features = "" + for i, v in enumerate(repository_versions_int): + if v == spec_version_int: + # current version, check for valid features + if repository_features[i] in self._supported_features: + features = repository_features[i] + + return str(spec_version_int), features + def _ensure_trailing_slash(url: str) -> str: """Return url guaranteed to end in a slash"""