Skip to content

Commit d363d56

Browse files
authored
Post Process Batch Job (#312)
* initial post process batch job * add collection support and linting * linting * addressing comments * fix for inactive jobs
1 parent 1d947f7 commit d363d56

11 files changed

Lines changed: 400 additions & 40 deletions

File tree

client/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ The client is broken into 4 main folders which separate different parts of the s
102102

103103
## Unified API Specification
104104

105-
To resuse as much code as possible between the Desktop and Web versions there is a unified API which provides the capability to export/import/save/delete annotations and metadata as well as run training and configuration pipelines. This API allows both versions to share the `/dive-common` and `/src` folders while handling these calls differently.
105+
To reuse as much code as possible between the Desktop and Web versions there is a unified API which provides the capability to export/import/save/delete annotations and metadata as well as run training and configuration pipelines. This API allows both versions to share the `/dive-common` and `/src` folders while handling these calls differently.
106106

107107
## Annotation Viewer Organization
108108

client/platform/web-girder/views/Admin/AdminJobs.vue

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ export default defineComponent({
5555
const initTypes = async () => {
5656
const typesAndStatus = (await getJobTypesStatus());
5757
jobTypes.value = typesAndStatus.data.types;
58-
filterTypes.value = ['convert', 'Dive Metadata Slicer CLI Batch', 'slicer_cli_web_job', 'Slicer CLI Metadata Run'];
58+
filterTypes.value = ['convert', 'Dive Metadata Slicer CLI Batch', 'slicer_cli_web_job', 'Slicer CLI Metadata Run', 'DIVE Batch Postprocess'];
5959
};
6060
const getData = async () => {
6161
const statusNums = filterStatus.value.map(

server/dive_server/crud_rpc.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ def postprocess(
202202
additive=False,
203203
additivePrepend='',
204204
logic='replace',
205-
) -> types.GirderModel:
205+
) -> dict:
206206
"""
207207
Post-processing to be run after media/annotation import
208208
@@ -214,12 +214,18 @@ def postprocess(
214214
215215
In either case, the following may run synchronously:
216216
Conversion of CSV annotations into track JSON
217+
218+
Returns:
219+
dict: Contains 'folder' (the processed folder) and 'job_ids' (list of created job IDs)
217220
"""
218221
job_is_private = user.get(constants.UserPrivateQueueEnabledMarker, False)
219222
isClone = dsFolder.get(constants.ForeignMediaIdMarker, None) is not None
220223
# add default confidence filter threshold to folder metadata
221224
dsFolder['meta'][constants.ConfidenceFiltersMarker] = {'default': 0.1}
222225

226+
# Track job IDs for batch processing
227+
created_job_ids = []
228+
223229
# Validate user-supplied metadata fields are present
224230
if fromMeta(dsFolder, constants.FPSMarker) is None:
225231
raise RestException(f'{constants.FPSMarker} missing from metadata')
@@ -255,7 +261,8 @@ def postprocess(
255261
newjob.job[constants.JOBCONST_DATASET_ID] = str(item["folderId"])
256262
newjob.job[constants.JOBCONST_CREATOR] = str(user['_id'])
257263
Job().save(newjob.job)
258-
return dsFolder
264+
created_job_ids.append(newjob.job['_id'])
265+
return {'folder': dsFolder, 'job_ids': created_job_ids}
259266

260267
if not isClone:
261268
# transcode VIDEO if necessary
@@ -281,6 +288,7 @@ def postprocess(
281288
newjob.job[constants.JOBCONST_PRIVATE_QUEUE] = job_is_private
282289
newjob.job[constants.JOBCONST_DATASET_ID] = dsFolder["_id"]
283290
Job().save(newjob.job)
291+
created_job_ids.append(newjob.job['_id'])
284292

285293
# transcode IMAGERY if necessary
286294
imageItems = Folder().childItems(
@@ -305,11 +313,12 @@ def postprocess(
305313
newjob.job[constants.JOBCONST_PRIVATE_QUEUE] = job_is_private
306314
newjob.job[constants.JOBCONST_DATASET_ID] = dsFolder["_id"]
307315
Job().save(newjob.job)
316+
created_job_ids.append(newjob.job['_id'])
308317

309318
elif imageItems.count() > 0:
310319
dsFolder["meta"][constants.DatasetMarker] = True
311320

312321
Folder().save(dsFolder)
313322
print(f'Processing Items: {user}')
314323
process_items(dsFolder, user, additive, additivePrepend)
315-
return dsFolder
324+
return {'folder': dsFolder, 'job_ids': created_job_ids}

server/dive_server/event.py

Lines changed: 49 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,21 @@
22
import os
33

44
from bson.objectid import ObjectId
5+
import cherrypy
56
from girder import logger
7+
from girder.api.rest import getApiUrl
8+
from girder.models.collection import Collection
69
from girder.models.folder import Folder
710
from girder.models.item import Item
811
from girder.models.setting import Setting
12+
from girder.models.token import Token
913
from girder.models.user import User
1014
from girder.settings import SettingKey
1115
from girder.utility.mail_utils import renderTemplate, sendMail
16+
from girder_jobs.models.job import Job
17+
from girder_worker.girder_plugin.utils import getWorkerApiUrl
1218

19+
from dive_tasks.dive_batch_postprocess import DIVEBatchPostprocessTaskParams
1320
from dive_utils import asbool, fromMeta
1421
from dive_utils.constants import (
1522
AssetstoreSourceMarker,
@@ -22,8 +29,6 @@
2229
videoRegex,
2330
)
2431

25-
from . import crud_rpc
26-
2732

2833
def send_new_user_email(event):
2934
try:
@@ -66,11 +71,11 @@ def process_assetstore_import(event, meta: dict):
6671
userId = parentFolder['creatorId'] or parentFolder['baseParentId']
6772
user = User().findOne({'_id': ObjectId(userId)})
6873
foldername = f'Video {item["name"]}'
69-
# resuse existing folder if it already exists with same name
74+
# reuse existing folder if it already exists with same name
7075
dest = Folder().createFolder(parentFolder, foldername, creator=user, reuseExisting=True)
7176
now = datetime.now()
7277
if now - dest['created'] > timedelta(hours=1):
73-
# Remove the old referenced item, replace it with the new one.
78+
# Remove the old referenced item, replace it with the new one.
7479
oldItem = Item().findOne({'folderId': dest['_id'], 'name': item['name']})
7580
if oldItem is not None:
7681
if oldItem['meta'].get('codec', False):
@@ -108,12 +113,38 @@ def process_assetstore_import(event, meta: dict):
108113

109114

110115
def convert_video_recrusive(folder, user):
111-
subFolders = list(Folder().childFolders(folder, 'folder', user))
112-
for child in subFolders:
113-
if child.get('meta', {}).get(MarkForPostProcess, False):
114-
Folder().save(child)
115-
crud_rpc.postprocess(user, child, False, True)
116-
convert_video_recrusive(child, user)
116+
"""
117+
Start a batch postprocess job for all folders with MarkForPostProcess flag.
118+
This replaces the manual recursive postprocess calls with a single batch job.
119+
"""
120+
# Create a token for the batch job
121+
122+
token = Token().createToken(user=user, days=2)
123+
124+
dive_batch_postprocess_task_params: DIVEBatchPostprocessTaskParams = {
125+
"source_folder_id": str(folder['_id']),
126+
"skipJobs": False, # Allow jobs to run (transcoding, etc.)
127+
"skipTranscoding": True, # Skip transcoding if not needed
128+
"additive": False,
129+
"additivePrepend": '',
130+
"userId": str(user['_id']),
131+
"girderToken": str(token['_id']),
132+
"girderApiUrl": getWorkerApiUrl(),
133+
}
134+
if not Setting().get('worker.api_url'):
135+
Setting().set('worker.api_url', getApiUrl())
136+
job = Job().createLocalJob(
137+
module='dive_tasks.dive_batch_postprocess',
138+
function='batchPostProcessingTaskLauncher',
139+
kwargs={'params': dive_batch_postprocess_task_params, 'url': cherrypy.url()},
140+
title='Batch process Dive Batch Postprocess',
141+
type='DIVE Batch Postprocess',
142+
user=user,
143+
public=True,
144+
asynchronous=True,
145+
)
146+
job = Job().save(job)
147+
Job().scheduleJob(job)
117148

118149

119150
class DIVES3Imports:
@@ -132,6 +163,14 @@ def process_s3_import_after(self, event):
132163
userId = destinationFolder['creatorId'] or destinationFolder['baseParentId']
133164
user = User().findOne({'_id': ObjectId(userId)})
134165
convert_video_recrusive(destinationFolder, user)
166+
if self.destinationType == 'collection' and self.destinationId is not None:
167+
destinationCollection = Collection().findOne({"_id": ObjectId(self.destinationId)})
168+
userId = destinationCollection['creatorId'] or destinationCollection['baseParentId']
169+
user = User().findOne({'_id': ObjectId(userId)})
170+
child_folders = Folder().find({'parentId': ObjectId(self.destinationId)})
171+
for child in child_folders:
172+
convert_video_recrusive(child, user)
173+
135174
self.destinationId = None
136175
self.destinationType = None
137176

server/dive_server/views_metadata.py

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,9 @@
1414
from girder.models.file import File
1515
from girder.models.folder import Folder
1616
from girder.models.item import Item
17-
from girder.models.upload import Upload
1817
from girder.models.setting import Setting
1918
from girder.models.token import Token
19+
from girder.models.upload import Upload
2020
from girder.utility import path as path_util
2121
from girder_jobs.models.job import Job
2222
from girder_worker.girder_plugin.utils import getWorkerApiUrl
@@ -28,11 +28,11 @@
2828
DIVEMetadataClonedFilter,
2929
DIVEMetadataClonedFilterBase,
3030
DIVEMetadataFilter,
31-
DIVEMetadataMarker,
3231
DIVEMetadataHistoryMarker,
32+
DIVEMetadataMarker,
33+
csvRegex,
3334
jsonRegex,
3435
ndjsonRegex,
35-
csvRegex
3636
)
3737
from dive_utils.metadata.models import DIVE_Metadata, DIVE_MetadataKeys
3838
from dive_utils.types import DiveDatasetList, DIVEMetadataSlicerCLITaskParams
@@ -192,9 +192,7 @@ def bulk_metadata_update_process(user, rootFolder, updates, replace=False):
192192
raise RestException('Metadata Updates need either DIVEDataset or Filename', code=400)
193193
if dive_metadata:
194194
# Find the DIVE_Metadata entry for this dataset and root
195-
dataset = Folder().load(
196-
dive_metadata['DIVEDataset'], level=AccessType.READ, user=user
197-
)
195+
dataset = Folder().load(dive_metadata['DIVEDataset'], level=AccessType.READ, user=user)
198196
updated_keys = []
199197
errors = []
200198
# initial pass for all metadata keys:
@@ -1367,16 +1365,21 @@ def bulk_metadata_process_file(self, user, rootFolder, updates, replace=False):
13671365
if len(errors) > 0:
13681366
return results
13691367
# Check and find DIVEMetadataMarker folder or create it if it doesn't exist
1370-
metadata_folder = Folder().findOne({
1371-
"name": DIVEMetadataHistoryMarker,
1372-
"parent": str(rootFolder["_id"]),
1373-
f'meta.{DIVEMetadataHistoryMarker}': {'$in': TRUTHY_META_VALUES},
1374-
1375-
}, user=user, level=AccessType.WRITE)
1368+
metadata_folder = Folder().findOne(
1369+
{
1370+
"name": DIVEMetadataHistoryMarker,
1371+
"parent": str(rootFolder["_id"]),
1372+
f'meta.{DIVEMetadataHistoryMarker}': {'$in': TRUTHY_META_VALUES},
1373+
},
1374+
user=user,
1375+
level=AccessType.WRITE,
1376+
)
13761377
if not metadata_folder:
13771378
metadata_folder = Folder().createFolder(
1378-
rootFolder, DIVEMetadataHistoryMarker,
1379-
reuseExisting=True, creator=user,
1379+
rootFolder,
1380+
DIVEMetadataHistoryMarker,
1381+
reuseExisting=True,
1382+
creator=user,
13801383
)
13811384
Folder().setMetadata(metadata_folder, {DIVEMetadataHistoryMarker: True})
13821385
# now save the previous_data with a timestamp for the name in the folder
@@ -1473,10 +1476,7 @@ def bulk_update_metadata_file(self, rootFolder, replace):
14731476
required=True,
14741477
paramType="body",
14751478
requireArray=True,
1476-
default=[
1477-
{
1478-
}
1479-
],
1479+
default=[{}],
14801480
)
14811481
.param(
14821482
"replace",
@@ -1485,7 +1485,6 @@ def bulk_update_metadata_file(self, rootFolder, replace):
14851485
required=False,
14861486
default=False,
14871487
)
1488-
14891488
)
14901489
def bulk_update_metadata(self, rootFolder, updates, replace=False):
14911490
user = self.getCurrentUser()

server/dive_server/views_rpc.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ def __init__(self, resourceName):
9696
)
9797
)
9898
def postprocess(self, folder, skipJobs, skipTranscoding, additive, additivePrepend, logic):
99-
return crud_rpc.postprocess(
99+
result = crud_rpc.postprocess(
100100
self.getCurrentUser(),
101101
folder,
102102
skipJobs,
@@ -105,6 +105,8 @@ def postprocess(self, folder, skipJobs, skipTranscoding, additive, additivePrepe
105105
additivePrepend,
106106
logic,
107107
)
108+
# Return the folder for backward compatibility, but also include job_ids
109+
return result
108110

109111
@access.user
110112
@autoDescribeRoute(

server/dive_server/web_client/JobStatus.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ events.on('g:appload.before', () => {
55
const JobStatus = girder.plugins.jobs.JobStatus;
66
const jobPluginIsCancelable = JobStatus.isCancelable;
77
JobStatus.isCancelable = function (job) {
8-
if (job.get('type').startsWith('Dive Metadata Slicer CLI Batch')) {
8+
if (job.get('type').startsWith('Dive Metadata Slicer CLI Batch') || job.get('type').startsWith('DIVE Batch Postprocess')) {
99
return ![JobStatus.CANCELED, JobStatus.WORKER_CANCELING || 824,
1010
JobStatus.SUCCESS, JobStatus.ERROR].includes(job.get('status'));
1111
}

server/dive_tasks/__init__.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,4 +24,9 @@ def __init__(self, app, *args, **kwargs):
2424
def task_imports(self):
2525
# Return a list of python importable paths to the
2626
# plugin's path directory
27-
return ["dive_tasks.tasks", "dive_tasks.dive_metadata_slicer_cli", "dive_tasks.sam_tasks"]
27+
return [
28+
"dive_tasks.tasks",
29+
"dive_tasks.dive_metadata_slicer_cli",
30+
"dive_tasks.sam_tasks",
31+
"dive_tasks.dive_batch_postprocess",
32+
]

0 commit comments

Comments
 (0)