40
40
41
41
from . import base
42
42
43
- THUMBNAIL_MAX_ATTEMPTS = 30
44
- THUMBNAIL_RETRY_INTERAL = 10
45
- TRANSIENT_IMAGE_PATH = "images/status/transient"
46
-
47
43
48
44
class TestShotgunApi (base .LiveTestBase ):
49
45
def setUp (self ):
@@ -383,11 +379,10 @@ def test_upload_thumbnail_in_create(self):
383
379
data = {'image' : path , 'code' : 'Test Version' ,
384
380
'project' : self .project }
385
381
new_version = self .sg .create ("Version" , data , return_fields = ['image' ])
386
- new_version = find_one_await_thumbnail (
387
- self .sg ,
382
+ new_version = self .find_one_await_thumbnail (
388
383
"Version" ,
389
384
[["id" , "is" , new_version ["id" ]]],
390
- fields = ["image" , "project" , "type" , "id" ]
385
+ fields = ["image" , "project" , "type" , "id" ],
391
386
)
392
387
393
388
self .assertTrue (new_version is not None )
@@ -434,7 +429,9 @@ def test_upload_thumbnail_for_version(self):
434
429
435
430
# check result on version
436
431
version_with_thumbnail = self .sg .find_one ('Version' , [['id' , 'is' , self .version ['id' ]]])
437
- version_with_thumbnail = find_one_await_thumbnail (self .sg , 'Version' , [['id' , 'is' , self .version ['id' ]]])
432
+ version_with_thumbnail = self .find_one_await_thumbnail (
433
+ "Version" , [["id" , "is" , self .version ["id" ]]]
434
+ )
438
435
439
436
self .assertEqual (version_with_thumbnail .get ('type' ), 'Version' )
440
437
self .assertEqual (version_with_thumbnail .get ('id' ), self .version ['id' ])
@@ -461,7 +458,9 @@ def test_upload_thumbnail_for_task(self):
461
458
462
459
# check result on version
463
460
task_with_thumbnail = self .sg .find_one ('Task' , [['id' , 'is' , self .task ['id' ]]])
464
- task_with_thumbnail = find_one_await_thumbnail (self .sg , 'Task' , [['id' , 'is' , self .task ['id' ]]])
461
+ task_with_thumbnail = self .find_one_await_thumbnail (
462
+ "Task" , [["id" , "is" , self .task ["id" ]]]
463
+ )
465
464
466
465
self .assertEqual (task_with_thumbnail .get ('type' ), 'Task' )
467
466
self .assertEqual (task_with_thumbnail .get ('id' ), self .task ['id' ])
@@ -557,12 +556,11 @@ def test_linked_thumbnail_url(self):
557
556
558
557
thumb_id = self .sg .upload_thumbnail ("Project" , self .version ['project' ]['id' ], path )
559
558
560
- response_version_with_project = find_one_await_thumbnail (
561
- self .sg ,
559
+ response_version_with_project = self .find_one_await_thumbnail (
562
560
"Version" ,
563
561
[["id" , "is" , self .version ["id" ]]],
564
562
fields = ["id" , "code" , "project.Project.image" ],
565
- thumbnail_field_name = "project.Project.image"
563
+ thumbnail_field_name = "project.Project.image" ,
566
564
)
567
565
568
566
if self .sg .server_caps .version and self .sg .server_caps .version >= (3 , 3 , 0 ):
@@ -597,30 +595,28 @@ def share_thumbnail_retry(*args, **kwargs):
597
595
# the thumbnail to finish processing.
598
596
thumbnail_id = None
599
597
attempts = 0
600
- while attempts < THUMBNAIL_MAX_ATTEMPTS and thumbnail_id is None :
598
+ while attempts < base . THUMBNAIL_MAX_ATTEMPTS and thumbnail_id is None :
601
599
try :
602
600
thumbnail_id = self .sg .share_thumbnail (* args , ** kwargs )
603
601
attempts += 1
604
602
except shotgun_api3 .ShotgunError :
605
- time .sleep (THUMBNAIL_RETRY_INTERAL )
603
+ time .sleep (base . THUMBNAIL_RETRY_INTERVAL )
606
604
return thumbnail_id
607
605
608
606
this_dir , _ = os .path .split (__file__ )
609
607
path = os .path .abspath (os .path .expanduser (os .path .join (this_dir , "sg_logo.jpg" )))
610
608
611
609
# upload thumbnail to first entity and share it with the rest
612
610
share_thumbnail_retry ([self .version , self .shot ], thumbnail_path = path )
613
- response_version_thumbnail = find_one_await_thumbnail (
614
- self .sg ,
611
+ response_version_thumbnail = self .find_one_await_thumbnail (
615
612
'Version' ,
616
613
[['id' , 'is' , self .version ['id' ]]],
617
- fields = ['id' , 'code' , 'image' ]
614
+ fields = ['id' , 'code' , 'image' ],
618
615
)
619
- response_shot_thumbnail = find_one_await_thumbnail (
620
- self .sg ,
616
+ response_shot_thumbnail = self .find_one_await_thumbnail (
621
617
'Shot' ,
622
618
[['id' , 'is' , self .shot ['id' ]]],
623
- fields = ['id' , 'code' , 'image' ]
619
+ fields = ['id' , 'code' , 'image' ],
624
620
)
625
621
626
622
shot_url = urllib .parse .urlparse (response_shot_thumbnail .get ('image' ))
@@ -632,23 +628,20 @@ def share_thumbnail_retry(*args, **kwargs):
632
628
# share thumbnail from source entity with entities
633
629
self .sg .upload_thumbnail ("Version" , self .version ['id' ], path )
634
630
share_thumbnail_retry ([self .asset , self .shot ], source_entity = self .version )
635
- response_version_thumbnail = find_one_await_thumbnail (
636
- self .sg ,
631
+ response_version_thumbnail = self .find_one_await_thumbnail (
637
632
'Version' ,
638
633
[['id' , 'is' , self .version ['id' ]]],
639
- fields = ['id' , 'code' , 'image' ]
634
+ fields = ['id' , 'code' , 'image' ],
640
635
)
641
- response_shot_thumbnail = find_one_await_thumbnail (
642
- self .sg ,
636
+ response_shot_thumbnail = self .find_one_await_thumbnail (
643
637
'Shot' ,
644
638
[['id' , 'is' , self .shot ['id' ]]],
645
- fields = ['id' , 'code' , 'image' ]
639
+ fields = ['id' , 'code' , 'image' ],
646
640
)
647
- response_asset_thumbnail = find_one_await_thumbnail (
648
- self .sg ,
641
+ response_asset_thumbnail = self .find_one_await_thumbnail (
649
642
'Asset' ,
650
643
[['id' , 'is' , self .asset ['id' ]]],
651
- fields = ['id' , 'code' , 'image' ]
644
+ fields = ['id' , 'code' , 'image' ],
652
645
)
653
646
654
647
shot_url = urllib .parse .urlparse (response_shot_thumbnail .get ('image' ))
@@ -856,7 +849,6 @@ def test_work_schedule(self):
856
849
self .assertRaises (shotgun_api3 .ShotgunError , self .sg .work_schedule_read ,
857
850
start_date_obj , end_date_obj , project , user )
858
851
859
-
860
852
resp = self .sg .work_schedule_update ('2012-01-02' , False , 'Studio Holiday' )
861
853
expected = {
862
854
'date' : '2012-01-02' ,
@@ -1007,7 +999,7 @@ def test_set_date(self):
1007
999
1008
1000
def test_set_date_time (self ):
1009
1001
if self .config .jenkins :
1010
- self .skipTest ("Skipping test on Jenkins. locked_until not updating." )
1002
+ self .skipTest ("Jenkins. locked_until not updating." )
1011
1003
entity = 'HumanUser'
1012
1004
entity_id = self .human_user ['id' ]
1013
1005
field_name = 'locked_until'
@@ -1202,7 +1194,7 @@ def setUp(self):
1202
1194
1203
1195
def test_convert_to_utc (self ):
1204
1196
if self .config .jenkins :
1205
- self .skipTest ("Skipping test on Jenkins. locked_until not updating." )
1197
+ self .skipTest ("Jenkins. locked_until not updating." )
1206
1198
sg_utc = shotgun_api3 .Shotgun (self .config .server_url ,
1207
1199
http_proxy = self .config .http_proxy ,
1208
1200
convert_datetimes_to_utc = True ,
@@ -1212,7 +1204,7 @@ def test_convert_to_utc(self):
1212
1204
1213
1205
def test_no_convert_to_utc (self ):
1214
1206
if self .config .jenkins :
1215
- self .skipTest ("Skipping test on Jenkins. locked_until not updating." )
1207
+ self .skipTest ("Jenkins. locked_until not updating." )
1216
1208
sg_no_utc = shotgun_api3 .Shotgun (self .config .server_url ,
1217
1209
http_proxy = self .config .http_proxy ,
1218
1210
convert_datetimes_to_utc = False ,
@@ -2164,7 +2156,7 @@ def test_human_user_sudo_auth_fails(self):
2164
2156
Request fails on server because user has no permission to Sudo.
2165
2157
"""
2166
2158
if self .config .jenkins :
2167
- self .skipTest ("Skipping test on Jenkins. locked_until not updating." )
2159
+ self .skipTest ("Jenkins. locked_until not updating." )
2168
2160
2169
2161
if not self .sg .server_caps .version or self .sg .server_caps .version < (5 , 3 , 12 ):
2170
2162
return
@@ -2221,7 +2213,10 @@ def test_humanuser_upload_thumbnail_for_version(self):
2221
2213
self .assertTrue (isinstance (thumb_id , int ))
2222
2214
2223
2215
# check result on version
2224
- version_with_thumbnail = find_one_await_thumbnail (self .sg , 'Version' , [['id' , 'is' , self .version ['id' ]]])
2216
+ version_with_thumbnail = self .find_one_await_thumbnail (
2217
+ "Version" ,
2218
+ [["id" , "is" , self .version ["id" ]]],
2219
+ )
2225
2220
2226
2221
self .assertEqual (version_with_thumbnail .get ('type' ), 'Version' )
2227
2222
self .assertEqual (version_with_thumbnail .get ('id' ), self .version ['id' ])
@@ -2278,7 +2273,10 @@ def test_humanuser_upload_thumbnail_for_version(self):
2278
2273
self .assertTrue (isinstance (thumb_id , int ))
2279
2274
2280
2275
# check result on version
2281
- version_with_thumbnail = find_one_await_thumbnail (self .sg , 'Version' , [['id' , 'is' , self .version ['id' ]]])
2276
+ version_with_thumbnail = self .find_one_await_thumbnail (
2277
+ "Version" ,
2278
+ [["id" , "is" , self .version ["id" ]]],
2279
+ )
2282
2280
2283
2281
self .assertEqual (version_with_thumbnail .get ('type' ), 'Version' )
2284
2282
self .assertEqual (version_with_thumbnail .get ('id' ), self .version ['id' ])
@@ -3040,19 +3038,5 @@ def _get_path(url):
3040
3038
return url .path
3041
3039
3042
3040
3043
- def find_one_await_thumbnail (sg , entity_type , filters , fields = ["image" ], thumbnail_field_name = "image" , ** kwargs ):
3044
- attempts = 0
3045
- result = sg .find_one (entity_type , filters , fields = fields , ** kwargs )
3046
- while (
3047
- attempts < THUMBNAIL_MAX_ATTEMPTS
3048
- and result [thumbnail_field_name ]
3049
- and TRANSIENT_IMAGE_PATH in result [thumbnail_field_name ]
3050
- ):
3051
- time .sleep (THUMBNAIL_RETRY_INTERAL )
3052
- result = sg .find_one (entity_type , filters , fields = fields , ** kwargs )
3053
- attempts += 1
3054
- return result
3055
-
3056
-
3057
3041
if __name__ == '__main__' :
3058
3042
unittest .main ()
0 commit comments