Skip to content

Commit 49031b4

Browse files
authored
Merge pull request #1052 from sechkova/issue-1045
Load correctly the delegated Targets objects hierarchy
2 parents 5d16f91 + 6ae3ea6 commit 49031b4

File tree

6 files changed

+140
-75
lines changed

6 files changed

+140
-75
lines changed

tests/test_repository_lib.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ def test_import_ed25519_privatekey_from_file(self):
201201

202202

203203

204-
def test_get_metadata_filenames(self):
204+
def test_get_top_level_metadata_filenames(self):
205205

206206
# Test normal case.
207207
metadata_directory = os.path.join('metadata/')
@@ -210,7 +210,8 @@ def test_get_metadata_filenames(self):
210210
'snapshot.json': metadata_directory + 'snapshot.json',
211211
'timestamp.json': metadata_directory + 'timestamp.json'}
212212

213-
self.assertEqual(filenames, repo_lib.get_metadata_filenames('metadata/'))
213+
self.assertEqual(filenames,
214+
repo_lib.get_top_level_metadata_filenames('metadata/'))
214215

215216
# If a directory argument is not specified, the current working directory
216217
# is used.
@@ -219,11 +220,13 @@ def test_get_metadata_filenames(self):
219220
'targets.json': os.path.join(metadata_directory, 'targets.json'),
220221
'snapshot.json': os.path.join(metadata_directory, 'snapshot.json'),
221222
'timestamp.json': os.path.join(metadata_directory, 'timestamp.json')}
222-
self.assertEqual(filenames, repo_lib.get_metadata_filenames(metadata_directory))
223+
self.assertEqual(filenames,
224+
repo_lib.get_top_level_metadata_filenames(metadata_directory))
223225

224226

225227
# Test improperly formatted argument.
226-
self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.get_metadata_filenames, 3)
228+
self.assertRaises(securesystemslib.exceptions.FormatError,
229+
repo_lib.get_top_level_metadata_filenames, 3)
227230

228231

229232

@@ -797,7 +800,7 @@ def test__load_top_level_metadata(self):
797800
storage_backend = securesystemslib.storage.FilesystemBackend()
798801
repo_lib.write_metadata_file(signable, root_file, 8, False, storage_backend)
799802

800-
filenames = repo_lib.get_metadata_filenames(metadata_directory)
803+
filenames = repo_lib.get_top_level_metadata_filenames(metadata_directory)
801804
repository = repo_tool.create_new_repository(repository_directory, repository_name)
802805
repo_lib._load_top_level_metadata(repository, filenames, repository_name)
803806

tests/test_repository_tool.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2053,6 +2053,10 @@ def test_load_repository(self):
20532053

20542054
repository = repo_tool.load_repository(repository_directory)
20552055
self.assertTrue(isinstance(repository, repo_tool.Repository))
2056+
self.assertTrue(isinstance(repository.targets('role1'),
2057+
repo_tool.Targets))
2058+
self.assertTrue(isinstance(repository.targets('role1')('role2'),
2059+
repo_tool.Targets))
20562060

20572061
# Verify the expected roles have been loaded. See
20582062
# 'tuf/tests/repository_data/repository/'.

tuf/client/updater.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -755,7 +755,7 @@ def __init__(self, repository_name, repository_mirrors):
755755

756756
# Load current and previous metadata.
757757
for metadata_set in ['current', 'previous']:
758-
for metadata_role in ['root', 'targets', 'snapshot', 'timestamp']:
758+
for metadata_role in tuf.roledb.TOP_LEVEL_ROLES:
759759
self._load_metadata_from_file(metadata_set, metadata_role)
760760

761761
# Raise an exception if the repository is missing the required 'root'
@@ -2435,7 +2435,7 @@ def all_targets(self):
24352435
# all roles available on the repository.
24362436
delegated_targets = []
24372437
for role in tuf.roledb.get_rolenames(self.repository_name):
2438-
if role in ['root', 'snapshot', 'targets', 'timestamp']:
2438+
if role in tuf.roledb.TOP_LEVEL_ROLES:
24392439
continue
24402440

24412441
else:

tuf/repository_lib.py

100755100644
Lines changed: 56 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ def _generate_and_write_metadata(rolename, metadata_filename,
175175
else:
176176
logger.debug('Not incrementing ' + repr(rolename) + '\'s version number.')
177177

178-
if rolename in ['root', 'targets', 'snapshot', 'timestamp'] and not allow_partially_signed:
178+
if rolename in tuf.roledb.TOP_LEVEL_ROLES and not allow_partially_signed:
179179
# Verify that the top-level 'rolename' is fully signed. Only a delegated
180180
# role should not be written to disk without full verification of its
181181
# signature(s), since it can only be considered fully signed depending on
@@ -394,18 +394,15 @@ def _delete_obsolete_metadata(metadata_directory, snapshot_metadata,
394394
else:
395395
logger.debug(repr(metadata_role) + ' found in the snapshot role.')
396396

397-
398-
399397
# Strip metadata extension from filename. The role database does not
400398
# include the metadata extension.
401399
if metadata_role.endswith(METADATA_EXTENSION):
402400
metadata_role = metadata_role[:-len(METADATA_EXTENSION)]
403-
404401
else:
405402
logger.debug(repr(metadata_role) + ' does not match'
406403
' supported extension ' + repr(METADATA_EXTENSION))
407404

408-
if metadata_role in ['root', 'targets', 'snapshot', 'timestamp']:
405+
if metadata_role in tuf.roledb.TOP_LEVEL_ROLES:
409406
logger.debug('Not removing top-level metadata ' + repr(metadata_role))
410407
return
411408

@@ -811,7 +808,57 @@ def import_ed25519_privatekey_from_file(filepath, password=None):
811808
return private_key
812809

813810

814-
def get_metadata_filenames(metadata_directory):
811+
812+
def get_delegated_roles_metadata_filenames(metadata_directory,
813+
consistent_snapshot, storage_backend=None):
814+
"""
815+
Return a dictionary containing all filenames in 'metadata_directory'
816+
except the top-level roles.
817+
If multiple versions of a file exist because of a consistent snapshot,
818+
only the file with biggest version prefix is included.
819+
"""
820+
821+
filenames = {}
822+
metadata_files = sorted(storage_backend.list_folder(metadata_directory),
823+
reverse=True)
824+
825+
# Iterate over role metadata files, sorted by their version-number prefix, with
826+
# more recent versions first, and only add the most recent version of any
827+
# (non top-level) metadata to the list of returned filenames. Note that there
828+
# should only be one version of each file, if consistent_snapshot is False.
829+
for metadata_role in metadata_files:
830+
metadata_path = os.path.join(metadata_directory, metadata_role)
831+
832+
# Strip the version number if 'consistent_snapshot' is True,
833+
# or if 'metadata_role' is Root.
834+
# Example: '10.django.json' --> 'django.json'
835+
consistent_snapshot = \
836+
metadata_role.endswith('root.json') or consistent_snapshot == True
837+
metadata_name, junk = _strip_version_number(metadata_role,
838+
consistent_snapshot)
839+
840+
if metadata_name.endswith(METADATA_EXTENSION):
841+
extension_length = len(METADATA_EXTENSION)
842+
metadata_name = metadata_name[:-extension_length]
843+
844+
else:
845+
logger.debug('Skipping file with unsupported metadata'
846+
' extension: ' + repr(metadata_path))
847+
continue
848+
849+
# Skip top-level roles, only interested in delegated roles.
850+
if metadata_name in tuf.roledb.TOP_LEVEL_ROLES:
851+
continue
852+
853+
# Prevent reloading duplicate versions if consistent_snapshot is True
854+
if metadata_name not in filenames:
855+
filenames[metadata_name] = metadata_path
856+
857+
return filenames
858+
859+
860+
861+
def get_top_level_metadata_filenames(metadata_directory):
815862
"""
816863
<Purpose>
817864
Return a dictionary containing the filenames of the top-level roles.
@@ -1081,7 +1128,7 @@ def generate_root_metadata(version, expiration_date, consistent_snapshot,
10811128
# Extract the role, threshold, and keyid information of the top-level roles,
10821129
# which Root stores in its metadata. The necessary role metadata is generated
10831130
# from this information.
1084-
for rolename in ['root', 'targets', 'snapshot', 'timestamp']:
1131+
for rolename in tuf.roledb.TOP_LEVEL_ROLES:
10851132

10861133
# If a top-level role is missing from 'tuf.roledb.py', raise an exception.
10871134
if not tuf.roledb.role_exists(rolename, repository_name):
@@ -1457,7 +1504,7 @@ def generate_snapshot_metadata(metadata_directory, version, expiration_date,
14571504
# snapshot and timestamp roles are not listed in snapshot.json, do not
14581505
# list these roles found in the metadata directory.
14591506
if tuf.roledb.role_exists(rolename, repository_name) and \
1460-
rolename not in ['root', 'snapshot', 'timestamp', 'targets']:
1507+
rolename not in tuf.roledb.TOP_LEVEL_ROLES:
14611508
fileinfodict[metadata_name] = get_metadata_versioninfo(rolename,
14621509
repository_name)
14631510

@@ -1776,7 +1823,7 @@ def _log_status_of_top_level_roles(targets_directory, metadata_directory,
17761823

17771824
# The expected full filenames of the top-level roles needed to write them to
17781825
# disk.
1779-
filenames = get_metadata_filenames(metadata_directory)
1826+
filenames = get_top_level_metadata_filenames(metadata_directory)
17801827
root_filename = filenames[ROOT_FILENAME]
17811828
targets_filename = filenames[TARGETS_FILENAME]
17821829
snapshot_filename = filenames[SNAPSHOT_FILENAME]

tuf/repository_tool.py

Lines changed: 67 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@
3939
import shutil
4040
import json
4141

42+
from collections import deque
43+
4244
import tuf
4345
import tuf.formats
4446
import tuf.roledb
@@ -293,7 +295,7 @@ def writeall(self, consistent_snapshot=False, use_existing_fileinfo=False):
293295
for dirty_rolename in dirty_rolenames:
294296

295297
# Ignore top-level roles, they will be generated later in this method.
296-
if dirty_rolename in ['root', 'targets', 'snapshot', 'timestamp']:
298+
if dirty_rolename in tuf.roledb.TOP_LEVEL_ROLES:
297299
continue
298300

299301
dirty_filename = os.path.join(self._metadata_directory,
@@ -2393,7 +2395,7 @@ def delegate(self, rolename, public_keys, paths, threshold=1,
23932395
self._parent_targets_object.add_delegated_role(rolename,
23942396
new_targets_object)
23952397

2396-
# Add 'new_targets_object' to the 'targets' role object (this object).
2398+
# Add 'new_targets_object' to the delegating role object (this object).
23972399
self.add_delegated_role(rolename, new_targets_object)
23982400

23992401
# Update the 'delegations' field of the current role.
@@ -3088,7 +3090,7 @@ def load_repository(repository_directory, repository_name='default',
30883090
repository = Repository(repository_directory, metadata_directory,
30893091
targets_directory, storage_backend, repository_name)
30903092

3091-
filenames = repo_lib.get_metadata_filenames(metadata_directory)
3093+
filenames = repo_lib.get_top_level_metadata_filenames(metadata_directory)
30923094

30933095
# The Root file is always available without a version number (a consistent
30943096
# snapshot) attached to the filename. Store the 'consistent_snapshot' value
@@ -3100,50 +3102,49 @@ def load_repository(repository_directory, repository_name='default',
31003102
repository, consistent_snapshot = repo_lib._load_top_level_metadata(repository,
31013103
filenames, repository_name)
31023104

3103-
# Load the delegated targets metadata and generate their fileinfo. The
3104-
# extracted fileinfo is stored in the 'meta' field of the snapshot metadata
3105-
# object.
3106-
targets_objects = {}
3107-
loaded_metadata = []
3108-
targets_objects['targets'] = repository.targets
3109-
3110-
metadata_files = sorted(storage_backend.list_folder(metadata_directory),
3111-
reverse=True)
3112-
for metadata_role in metadata_files:
3113-
3114-
metadata_path = os.path.join(metadata_directory, metadata_role)
3115-
metadata_name = \
3116-
metadata_path[len(metadata_directory):].lstrip(os.path.sep)
3117-
3118-
# Strip the version number if 'consistent_snapshot' is True,
3119-
# or if 'metadata_role' is Root.
3120-
# Example: '10.django.json' --> 'django.json'
3121-
consistent_snapshot = \
3122-
metadata_role.endswith('root.json') or consistent_snapshot == True
3123-
metadata_name, junk = repo_lib._strip_version_number(metadata_name,
3124-
consistent_snapshot)
3125-
3126-
if metadata_name.endswith(METADATA_EXTENSION):
3127-
extension_length = len(METADATA_EXTENSION)
3128-
metadata_name = metadata_name[:-extension_length]
3129-
3130-
else:
3131-
logger.debug('Skipping file with unsupported metadata'
3132-
' extension: ' + repr(metadata_path))
3105+
delegated_roles_filenames = repo_lib.get_delegated_roles_metadata_filenames(
3106+
metadata_directory, consistent_snapshot, storage_backend)
3107+
3108+
# Load the delegated targets metadata and their fileinfo.
3109+
# The delegated targets roles form a tree/graph which is traversed in a
3110+
# breadth-first-search manner starting from 'targets' in order to correctly
3111+
# load the delegations hierarchy.
3112+
parent_targets_object = repository.targets
3113+
3114+
# Keep the next delegations to be loaded in a deque structure which
3115+
# has the properties of a list but is designed to have fast appends
3116+
# and pops from both ends
3117+
delegations = deque()
3118+
# A set used to keep the already loaded delegations and avoid an infinite
3119+
# loop in case of cycles in the delegations graph
3120+
loaded_delegations = set()
3121+
3122+
# Top-level roles are already loaded, fetch targets and get its delegations.
3123+
# Store the delegations in the form of delegated-delegating role tuples,
3124+
# starting from the top-level targets:
3125+
# [('role1', 'targets'), ('role2', 'targets'), ... ]
3126+
roleinfo = tuf.roledb.get_roleinfo('targets', repository_name)
3127+
for role in roleinfo['delegations']['roles']:
3128+
delegations.append((role['name'], 'targets'))
3129+
3130+
# Traverse the graph by appending the next delegation to the deque and
3131+
# 'pop'-ing and loading the left-most element.
3132+
while delegations:
3133+
rolename, delegating_role = delegations.popleft()
3134+
if (rolename, delegating_role) in loaded_delegations:
3135+
logger.warning('Detected cycle in the delegation graph: ' +
3136+
repr(delegating_role) + ' -> ' +
3137+
repr(rolename) +
3138+
' is reached more than once.')
31333139
continue
31343140

3135-
# Skip top-level roles, only interested in delegated roles now that the
3136-
# top-level roles have already been loaded.
3137-
if metadata_name in ['root', 'snapshot', 'targets', 'timestamp']:
3138-
continue
3139-
3140-
# Keep a store of metadata previously loaded metadata to prevent re-loading
3141-
# duplicate versions. Duplicate versions may occur with
3142-
# 'consistent_snapshot', where the same metadata may be available in
3143-
# multiples files (the different hash is included in each filename.
3144-
if metadata_name in loaded_metadata:
3145-
continue
3141+
# Instead of adding only rolename to the set, store the already loaded
3142+
# delegated-delegating role tuples. This way a delegated role is added
3143+
# to each of its delegating roles but when the role is reached twice
3144+
# from the same delegating role an infinite loop is avoided.
3145+
loaded_delegations.add((rolename, delegating_role))
31463146

3147+
metadata_path = delegated_roles_filenames[rolename]
31473148
signable = None
31483149

31493150
try:
@@ -3156,9 +3157,9 @@ def load_repository(repository_directory, repository_name='default',
31563157

31573158
metadata_object = signable['signed']
31583159

3159-
# Extract the metadata attributes of 'metadata_name' and update its
3160+
# Extract the metadata attributes of 'metadata_object' and update its
31603161
# corresponding roleinfo.
3161-
roleinfo = {'name': metadata_name,
3162+
roleinfo = {'name': rolename,
31623163
'signing_keyids': [],
31633164
'signatures': [],
31643165
'partial_loaded': False
@@ -3170,18 +3171,23 @@ def load_repository(repository_directory, repository_name='default',
31703171
roleinfo['paths'] = metadata_object['targets']
31713172
roleinfo['delegations'] = metadata_object['delegations']
31723173

3173-
tuf.roledb.add_role(metadata_name, roleinfo, repository_name)
3174-
loaded_metadata.append(metadata_name)
3175-
3176-
# Generate the Targets objects of the delegated roles of 'metadata_name'
3177-
# and add it to the top-level 'targets' object.
3178-
new_targets_object = Targets(targets_directory, metadata_name, roleinfo,
3179-
repository_name=repository_name)
3180-
targets_object = targets_objects['targets']
3181-
targets_objects[metadata_name] = new_targets_object
3174+
# Generate the Targets object of the delegated role,
3175+
# add it to the top-level 'targets' object and to its
3176+
# direct delegating role object.
3177+
new_targets_object = Targets(targets_directory, rolename,
3178+
roleinfo, parent_targets_object=parent_targets_object,
3179+
repository_name=repository_name)
3180+
3181+
parent_targets_object.add_delegated_role(rolename,
3182+
new_targets_object)
3183+
if delegating_role != 'targets':
3184+
parent_targets_object(delegating_role).add_delegated_role(rolename,
3185+
new_targets_object)
31823186

3183-
targets_object._delegated_roles[(os.path.basename(metadata_name))] = \
3184-
new_targets_object
3187+
# Append the next level delegations to the deque:
3188+
# the 'delegated' role becomes the 'delegating'
3189+
for delegation in metadata_object['delegations']['roles']:
3190+
delegations.append((delegation['name'], rolename))
31853191

31863192
# Extract the keys specified in the delegations field of the Targets
31873193
# role. Add 'key_object' to the list of recognized keys. Keys may be
@@ -3196,8 +3202,10 @@ def load_repository(repository_directory, repository_name='default',
31963202
# that doesn't match the client's set of hash algorithms. Make sure
31973203
# to only used the repo's selected hashing algorithms.
31983204
hash_algorithms = securesystemslib.settings.HASH_ALGORITHMS
3199-
securesystemslib.settings.HASH_ALGORITHMS = key_metadata['keyid_hash_algorithms']
3200-
key_object, keyids = securesystemslib.keys.format_metadata_to_key(key_metadata)
3205+
securesystemslib.settings.HASH_ALGORITHMS = \
3206+
key_metadata['keyid_hash_algorithms']
3207+
key_object, keyids = \
3208+
securesystemslib.keys.format_metadata_to_key(key_metadata)
32013209
securesystemslib.settings.HASH_ALGORITHMS = hash_algorithms
32023210
try:
32033211
for keyid in keyids: # pragma: no branch

tuf/roledb.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,9 @@
7373
_dirty_roles['default'] = set()
7474

7575

76+
TOP_LEVEL_ROLES = ['root', 'targets', 'snapshot', 'timestamp']
77+
78+
7679
def create_roledb_from_root_metadata(root_metadata, repository_name='default'):
7780
"""
7881
<Purpose>

0 commit comments

Comments
 (0)