11import shutil
22from concurrent .futures import ThreadPoolExecutor
33from functools import partial
4- from threading import Lock
54from typing import Any , Dict , List , Set
65
76from django .conf import settings
109
1110from scpca_portal import common , metadata_file , s3
1211from scpca_portal .config .logging import get_and_configure_logger
13- from scpca_portal .models import ComputedFile , Contact , ExternalAccession , Project , Publication
12+ from scpca_portal .models import (
13+ ComputedFile ,
14+ Contact ,
15+ ExternalAccession ,
16+ Project ,
17+ Publication ,
18+ Sample ,
19+ )
1420
1521logger = get_and_configure_logger (__name__ )
1622
@@ -136,25 +142,55 @@ def create_project(
136142 return project
137143
138144
139- def _create_computed_file (future , * , update_s3 : bool , clean_up_output_data : bool ) -> None :
145+ def _create_computed_file (
146+ computed_file : ComputedFile , update_s3 : bool , clean_up_output_data : bool
147+ ) -> None :
140148 """
141149 Save computed file returned from future to the db.
142150 Upload file to s3 and clean up output data depending on passed options.
143151 """
144- if computed_file := future .result ():
145-
146- # Only upload and clean up projects and the last sample if multiplexed
147- if computed_file .project or computed_file .sample .is_last_multiplexed_sample :
148- if update_s3 :
149- s3 .upload_output_file (computed_file .s3_key , computed_file .s3_bucket )
150- if clean_up_output_data :
151- computed_file .clean_up_local_computed_file ()
152+ if update_s3 :
153+ s3 .upload_output_file (computed_file .s3_key , computed_file .s3_bucket )
154+ if clean_up_output_data :
155+ computed_file .clean_up_local_computed_file ()
156+
157+ if computed_file .sample and computed_file .has_multiplexed_data :
158+ computed_files = computed_file .get_multiplexed_computed_files ()
159+ ComputedFile .objects .bulk_create (computed_files )
160+ else :
152161 computed_file .save ()
153162
163+
164+ def _create_computed_file_callback (future , * , update_s3 : bool , clean_up_output_data : bool ) -> None :
165+ """
166+ Wrap computed file saving and uploading to s3 in a way that accommodates multiprocessing.
167+ """
168+ if computed_file := future .result ():
169+ _create_computed_file (computed_file , update_s3 , clean_up_output_data )
170+
154171 # Close DB connection for each thread.
155172 connection .close ()
156173
157174
175+ def generate_computed_file (
176+ * ,
177+ download_config : Dict ,
178+ project : Project | None = None ,
179+ sample : Sample | None = None ,
180+ update_s3 : bool = True ,
181+ ) -> None :
182+
183+ # Purge old computed file
184+ if old_computed_file := (project or sample ).get_computed_file (download_config ):
185+ old_computed_file .purge (update_s3 )
186+
187+ if project and (computed_file := ComputedFile .get_project_file (project , download_config )):
188+ _create_computed_file (computed_file , update_s3 , clean_up_output_data = False )
189+ if sample and (computed_file := ComputedFile .get_sample_file (sample , download_config )):
190+ _create_computed_file (computed_file , update_s3 , clean_up_output_data = False )
191+ sample .project .update_downloadable_sample_count ()
192+
193+
158194def generate_computed_files (
159195 project : Project ,
160196 max_workers : int ,
@@ -170,33 +206,27 @@ def generate_computed_files(
170206
171207 # Prep callback function
172208 on_get_file = partial (
173- _create_computed_file ,
209+ _create_computed_file_callback ,
174210 update_s3 = update_s3 ,
175211 clean_up_output_data = clean_up_output_data ,
176212 )
177- # Prepare a threading.Lock for each sample, with the chief purpose being to protect
178- # multiplexed samples that share a zip file.
179- locks = {}
213+
180214 with ThreadPoolExecutor (max_workers = max_workers ) as tasks :
181215 # Generated project computed files
182216 for config in common .GENERATED_PROJECT_DOWNLOAD_CONFIGS :
183217 tasks .submit (
184218 ComputedFile .get_project_file ,
185219 project ,
186220 config ,
187- project .get_output_file_name (config ),
188221 ).add_done_callback (on_get_file )
189222
190223 # Generated sample computed files
191- for sample in project .samples . all () :
224+ for sample in project .samples_to_generate :
192225 for config in common .GENERATED_SAMPLE_DOWNLOAD_CONFIGS :
193- sample_lock = locks .setdefault (sample .get_config_identifier (config ), Lock ())
194226 tasks .submit (
195227 ComputedFile .get_sample_file ,
196228 sample ,
197229 config ,
198- sample .get_output_file_name (config ),
199- sample_lock ,
200230 ).add_done_callback (on_get_file )
201231
202232 project .update_downloadable_sample_count ()
0 commit comments