52
52
import tuf .repository_tool as repo_tool
53
53
54
54
import securesystemslib
55
+ import securesystemslib .exceptions
55
56
import securesystemslib .rsa_keys
56
57
import securesystemslib .interface
58
+ import securesystemslib .storage
57
59
import six
58
60
59
61
logger = logging .getLogger (__name__ )
@@ -126,8 +128,9 @@ def test_import_rsa_privatekey_from_file(self):
126
128
# Non-existent key file.
127
129
nonexistent_keypath = os .path .join (temporary_directory ,
128
130
'nonexistent_keypath' )
129
- self .assertRaises (IOError , repo_lib .import_rsa_privatekey_from_file ,
130
- nonexistent_keypath , 'pw' )
131
+ self .assertRaises (securesystemslib .exceptions .StorageError ,
132
+ repo_lib .import_rsa_privatekey_from_file ,
133
+ nonexistent_keypath , 'pw' )
131
134
132
135
# Invalid key file argument.
133
136
invalid_keyfile = os .path .join (temporary_directory , 'invalid_keyfile' )
@@ -160,7 +163,8 @@ def test_import_ed25519_privatekey_from_file(self):
160
163
# Non-existent key file.
161
164
nonexistent_keypath = os .path .join (temporary_directory ,
162
165
'nonexistent_keypath' )
163
- self .assertRaises (IOError , repo_lib .import_ed25519_privatekey_from_file ,
166
+ self .assertRaises (securesystemslib .exceptions .StorageError ,
167
+ repo_lib .import_ed25519_privatekey_from_file ,
164
168
nonexistent_keypath , 'pw' )
165
169
166
170
# Invalid key file argument.
@@ -215,7 +219,7 @@ def test_get_metadata_filenames(self):
215
219
'targets.json' : os .path .join (metadata_directory , 'targets.json' ),
216
220
'snapshot.json' : os .path .join (metadata_directory , 'snapshot.json' ),
217
221
'timestamp.json' : os .path .join (metadata_directory , 'timestamp.json' )}
218
- self .assertEqual (filenames , repo_lib .get_metadata_filenames ())
222
+ self .assertEqual (filenames , repo_lib .get_metadata_filenames (metadata_directory ))
219
223
220
224
221
225
# Test improperly formatted argument.
@@ -241,17 +245,23 @@ def test_get_metadata_fileinfo(self):
241
245
fileinfo = {'length' : file_length , 'hashes' : file_hashes }
242
246
self .assertTrue (tuf .formats .FILEINFO_SCHEMA .matches (fileinfo ))
243
247
244
- self .assertEqual (fileinfo , repo_lib .get_metadata_fileinfo (test_filepath ))
248
+ storage_backend = securesystemslib .storage .FilesystemBackend ()
249
+
250
+ self .assertEqual (fileinfo , repo_lib .get_metadata_fileinfo (test_filepath ,
251
+ storage_backend ))
245
252
246
253
247
254
# Test improperly formatted argument.
248
- self .assertRaises (securesystemslib .exceptions .FormatError , repo_lib .get_metadata_fileinfo , 3 )
255
+ self .assertRaises (securesystemslib .exceptions .FormatError ,
256
+ repo_lib .get_metadata_fileinfo , 3 ,
257
+ storage_backend )
249
258
250
259
251
260
# Test non-existent file.
252
261
nonexistent_filepath = os .path .join (temporary_directory , 'oops.txt' )
253
- self .assertRaises (securesystemslib .exceptions .Error , repo_lib .get_metadata_fileinfo ,
254
- nonexistent_filepath )
262
+ self .assertRaises (securesystemslib .exceptions .Error ,
263
+ repo_lib .get_metadata_fileinfo ,
264
+ nonexistent_filepath , storage_backend )
255
265
256
266
257
267
@@ -440,8 +450,9 @@ def test_generate_snapshot_metadata(self):
440
450
441
451
# Load a valid repository so that top-level roles exist in roledb and
442
452
# generate_snapshot_metadata() has roles to specify in snapshot metadata.
453
+ storage_backend = securesystemslib .storage .FilesystemBackend ()
443
454
repository = repo_tool .Repository (repository_directory , metadata_directory ,
444
- targets_directory )
455
+ targets_directory , storage_backend )
445
456
446
457
repository_junk = repo_tool .load_repository (repository_directory )
447
458
@@ -458,26 +469,27 @@ def test_generate_snapshot_metadata(self):
458
469
repo_lib .generate_snapshot_metadata (metadata_directory , version ,
459
470
expiration_date ,
460
471
targets_filename ,
472
+ storage_backend ,
461
473
consistent_snapshot = False )
462
474
self .assertTrue (tuf .formats .SNAPSHOT_SCHEMA .matches (snapshot_metadata ))
463
475
464
476
465
477
# Test improperly formatted arguments.
466
478
self .assertRaises (securesystemslib .exceptions .FormatError , repo_lib .generate_snapshot_metadata ,
467
479
3 , version , expiration_date ,
468
- targets_filename , consistent_snapshot = False )
480
+ targets_filename , consistent_snapshot = False , storage_backend = storage_backend )
469
481
self .assertRaises (securesystemslib .exceptions .FormatError , repo_lib .generate_snapshot_metadata ,
470
482
metadata_directory , '3' , expiration_date ,
471
- targets_filename , consistent_snapshot = False )
483
+ targets_filename , storage_backend , consistent_snapshot = False )
472
484
self .assertRaises (securesystemslib .exceptions .FormatError , repo_lib .generate_snapshot_metadata ,
473
485
metadata_directory , version , '3' ,
474
- targets_filename , consistent_snapshot = False )
486
+ targets_filename , storage_backend , consistent_snapshot = False )
475
487
self .assertRaises (securesystemslib .exceptions .FormatError , repo_lib .generate_snapshot_metadata ,
476
488
metadata_directory , version , expiration_date ,
477
- 3 , consistent_snapshot = False )
489
+ 3 , storage_backend , consistent_snapshot = False )
478
490
self .assertRaises (securesystemslib .exceptions .FormatError , repo_lib .generate_snapshot_metadata ,
479
491
metadata_directory , version , expiration_date ,
480
- targets_filename , 3 )
492
+ targets_filename , 3 , storage_backend )
481
493
482
494
483
495
@@ -599,85 +611,25 @@ def test_write_metadata_file(self):
599
611
version_number = root_signable ['signed' ]['version' ] + 1
600
612
601
613
self .assertFalse (os .path .exists (output_filename ))
614
+ storage_backend = securesystemslib .storage .FilesystemBackend ()
602
615
repo_lib .write_metadata_file (root_signable , output_filename , version_number ,
603
- consistent_snapshot = False )
616
+ consistent_snapshot = False , storage_backend = storage_backend )
604
617
self .assertTrue (os .path .exists (output_filename ))
605
618
606
619
# Attempt to over-write the previously written metadata file. An exception
607
620
# is not raised in this case, only a debug message is logged.
608
621
repo_lib .write_metadata_file (root_signable , output_filename , version_number ,
609
- consistent_snapshot = False )
610
-
611
- # Try to write a consistent metadate file. An exception is not raised in
612
- # this case. For testing purposes, root.json should be a hard link to the
613
- # consistent metadata file. We should verify that root.json points to
614
- # the latest consistent files.
615
- tuf .settings .CONSISTENT_METHOD = 'hard_link'
616
- repo_lib .write_metadata_file (root_signable , output_filename , version_number ,
617
- consistent_snapshot = True )
618
-
619
- # Test if the consistent files are properly named
620
- # Filename format of a consistent file: <version number>.rolename.json
621
- version_and_filename = str (version_number ) + '.' + 'root.json'
622
- first_version_output_file = os .path .join (temporary_directory , version_and_filename )
623
- self .assertTrue (os .path .exists (first_version_output_file ))
624
-
625
- # Verify that the consistent file content is equal to 'output_filename'.
626
- self .assertEqual (
627
- securesystemslib .util .get_file_details (output_filename ),
628
- securesystemslib .util .get_file_details (first_version_output_file ))
629
-
630
- # Try to add more consistent metadata files.
631
- version_number += 1
632
- root_signable ['signed' ]['version' ] = version_number
633
- repo_lib .write_metadata_file (root_signable , output_filename ,
634
- version_number , consistent_snapshot = True )
635
-
636
- # Test if the latest root.json points to the expected consistent file
637
- # and consistent metadata do not all point to the same root.json
638
- version_and_filename = str (version_number ) + '.' + 'root.json'
639
- second_version_output_file = os .path .join (temporary_directory , version_and_filename )
640
- self .assertTrue (os .path .exists (second_version_output_file ))
641
-
642
- # Verify that the second version is equal to the second output file, and
643
- # that the second output filename differs from the first.
644
- self .assertEqual (securesystemslib .util .get_file_details (output_filename ),
645
- securesystemslib .util .get_file_details (second_version_output_file ))
646
- self .assertNotEqual (securesystemslib .util .get_file_details (output_filename ),
647
- securesystemslib .util .get_file_details (first_version_output_file ))
648
-
649
- # Test for an improper settings.CONSISTENT_METHOD string value.
650
- tuf .settings .CONSISTENT_METHOD = 'somebadidea'
651
-
652
- # Test for invalid consistent methods on systems other than Windows,
653
- # which always uses the copy method.
654
- if platform .system () == 'Windows' :
655
- pass
656
-
657
- else :
658
- self .assertRaises (securesystemslib .exceptions .InvalidConfigurationError ,
659
- repo_lib .write_metadata_file , root_signable , output_filename ,
660
- version_number , consistent_snapshot = True )
661
-
662
- # Try to create a link to root.json when root.json doesn't exist locally.
663
- # repository_lib should log a message if this is the case.
664
- tuf .settings .CONSISTENT_METHOD = 'hard_link'
665
- os .remove (output_filename )
666
- repo_lib .write_metadata_file (root_signable , output_filename , version_number ,
667
- consistent_snapshot = True )
668
-
669
- # Reset CONSISTENT_METHOD so that subsequent tests work as expected.
670
- tuf .settings .CONSISTENT_METHOD = 'copy'
622
+ consistent_snapshot = False , storage_backend = storage_backend )
671
623
672
624
# Test improperly formatted arguments.
673
625
self .assertRaises (securesystemslib .exceptions .FormatError , repo_lib .write_metadata_file ,
674
- 3 , output_filename , version_number , False )
626
+ 3 , output_filename , version_number , False , storage_backend )
675
627
self .assertRaises (securesystemslib .exceptions .FormatError , repo_lib .write_metadata_file ,
676
- root_signable , 3 , version_number , False )
628
+ root_signable , 3 , version_number , False , storage_backend )
677
629
self .assertRaises (securesystemslib .exceptions .FormatError , repo_lib .write_metadata_file ,
678
- root_signable , output_filename , '3' , False )
630
+ root_signable , output_filename , '3' , False , storage_backend )
679
631
self .assertRaises (securesystemslib .exceptions .FormatError , repo_lib .write_metadata_file ,
680
- root_signable , output_filename , version_number , 3 )
632
+ root_signable , output_filename , storage_backend , version_number , 3 )
681
633
682
634
683
635
@@ -731,13 +683,6 @@ def test_create_tuf_client_directory(self):
731
683
732
684
733
685
734
- def test__check_directory (self ):
735
- # Test for non-existent directory.
736
- self .assertRaises (securesystemslib .exceptions .Error ,
737
- repo_lib ._check_directory , 'non-existent' )
738
-
739
-
740
-
741
686
def test__generate_and_write_metadata (self ):
742
687
# Test for invalid, or unsupported, rolename.
743
688
# Load the root metadata provided in 'tuf/tests/repository_data/'.
@@ -774,9 +719,11 @@ def test__generate_and_write_metadata(self):
774
719
tuf .roledb .add_role ('obsolete_role' , targets_roleinfo ,
775
720
repository_name = repository_name )
776
721
722
+ storage_backend = securesystemslib .storage .FilesystemBackend ()
777
723
repo_lib ._generate_and_write_metadata ('obsolete_role' , obsolete_metadata ,
778
- targets_directory , metadata_directory , consistent_snapshot = False ,
779
- filenames = None , repository_name = repository_name )
724
+ targets_directory , metadata_directory , storage_backend ,
725
+ consistent_snapshot = False , filenames = None ,
726
+ repository_name = repository_name )
780
727
781
728
snapshot_filepath = os .path .join ('repository_data' , 'repository' ,
782
729
'metadata' , 'snapshot.json' )
@@ -785,7 +732,8 @@ def test__generate_and_write_metadata(self):
785
732
self .assertTrue (os .path .exists (os .path .join (metadata_directory ,
786
733
'obsolete_role.json' )))
787
734
tuf .repository_lib ._delete_obsolete_metadata (metadata_directory ,
788
- snapshot_signable ['signed' ], False , repository_name )
735
+ snapshot_signable ['signed' ], False , repository_name ,
736
+ storage_backend )
789
737
self .assertFalse (os .path .exists (metadata_directory + 'obsolete_role.json' ))
790
738
shutil .copyfile (targets_metadata , obsolete_metadata )
791
739
@@ -801,27 +749,29 @@ def test__delete_obsolete_metadata(self):
801
749
snapshot_filepath = os .path .join ('repository_data' , 'repository' ,
802
750
'metadata' , 'snapshot.json' )
803
751
snapshot_signable = securesystemslib .util .load_json_file (snapshot_filepath )
752
+ storage_backend = securesystemslib .storage .FilesystemBackend ()
804
753
805
754
# Create role metadata that should not exist in snapshot.json.
806
755
role1_filepath = os .path .join ('repository_data' , 'repository' , 'metadata' ,
807
756
'role1.json' )
808
757
shutil .copyfile (role1_filepath , os .path .join (metadata_directory , 'role2.json' ))
809
758
810
759
repo_lib ._delete_obsolete_metadata (metadata_directory ,
811
- snapshot_signable ['signed' ], True , repository_name )
760
+ snapshot_signable ['signed' ], True , repository_name , storage_backend )
812
761
813
762
# _delete_obsolete_metadata should never delete root.json.
814
763
root_filepath = os .path .join ('repository_data' , 'repository' , 'metadata' ,
815
764
'root.json' )
816
765
shutil .copyfile (root_filepath , os .path .join (metadata_directory , 'root.json' ))
817
766
repo_lib ._delete_obsolete_metadata (metadata_directory ,
818
- snapshot_signable ['signed' ], True , repository_name )
767
+ snapshot_signable ['signed' ], True , repository_name , storage_backend )
819
768
self .assertTrue (os .path .exists (os .path .join (metadata_directory , 'root.json' )))
820
769
821
770
# Verify what happens for a non-existent metadata directory (a debug
822
771
# message is logged).
823
- repo_lib ._delete_obsolete_metadata ('non-existent' ,
824
- snapshot_signable ['signed' ], True , repository_name )
772
+ self .assertRaises (securesystemslib .exceptions .StorageError ,
773
+ repo_lib ._delete_obsolete_metadata , 'non-existent' ,
774
+ snapshot_signable ['signed' ], True , repository_name , storage_backend )
825
775
826
776
827
777
def test__load_top_level_metadata (self ):
@@ -843,12 +793,8 @@ def test__load_top_level_metadata(self):
843
793
signable = securesystemslib .util .load_json_file (os .path .join (metadata_directory , 'root.json' ))
844
794
signable ['signatures' ].append (signable ['signatures' ][0 ])
845
795
846
- repo_lib .write_metadata_file (signable , root_file , 8 , False )
847
-
848
- # Attempt to load a repository that contains a compressed Root file.
849
- repository = repo_tool .create_new_repository (repository_directory , repository_name )
850
- filenames = repo_lib .get_metadata_filenames (metadata_directory )
851
- repo_lib ._load_top_level_metadata (repository , filenames , repository_name )
796
+ storage_backend = securesystemslib .storage .FilesystemBackend ()
797
+ repo_lib .write_metadata_file (signable , root_file , 8 , False , storage_backend )
852
798
853
799
filenames = repo_lib .get_metadata_filenames (metadata_directory )
854
800
repository = repo_tool .create_new_repository (repository_directory , repository_name )
@@ -872,7 +818,9 @@ def test__load_top_level_metadata(self):
872
818
if role_file .endswith ('.json' ) and not role_file .startswith ('root' ):
873
819
role_filename = os .path .join (metadata_directory , role_file )
874
820
os .remove (role_filename )
875
- repo_lib ._load_top_level_metadata (repository , filenames , repository_name )
821
+ self .assertRaises (tuf .exceptions .RepositoryError ,
822
+ repo_lib ._load_top_level_metadata , repository , filenames ,
823
+ repository_name )
876
824
877
825
# Remove the required Root file and verify that an exception is raised.
878
826
os .remove (os .path .join (metadata_directory , 'root.json' ))
0 commit comments