 use std::path::{PathBuf, Path};
 use postgres::Connection;
 use rustc_serialize::json::{Json, ToJson};
+use std::cmp;
 use std::fs;
 use std::io::Read;
 use error::Result;
@@ -17,6 +18,9 @@ use rusoto_core::region::Region;
 use rusoto_credential::DefaultCredentialsProvider;
 
 
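+// Upload at most this many files in a single batch of concurrent S3 requests.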
+const MAX_CONCURRENT_UPLOADS: usize = 1000;
+
+
 fn get_file_list_from_dir<P: AsRef<Path>>(path: P,
                                           files: &mut Vec<PathBuf>)
                                           -> Result<()> {
@@ -145,11 +149,23 @@ pub fn add_path_into_database<P: AsRef<Path>>(conn: &Connection,
     try!(cookie.load::<&str>(&[]));
 
     let trans = try!(conn.transaction());
-    let mut client = s3_client();
-    let mut file_list_with_mimes: Vec<(String, PathBuf)> = Vec::new();
 
-    for file_path in try!(get_file_list(&path)) {
-        let (path, content, mime) = {
+    use std::collections::HashMap;
+    let mut file_paths_and_mimes: HashMap<PathBuf, String> = HashMap::new();
+
+    let mut rt = ::tokio::runtime::Runtime::new().unwrap();
+
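+    // Work through the file list in batches of at most MAX_CONCURRENT_UPLOADS paths.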
+    let mut to_upload = try!(get_file_list(&path));
+    let mut batch_size = cmp::min(to_upload.len(), MAX_CONCURRENT_UPLOADS);
+    let mut currently_uploading: Vec<_> = to_upload.drain(..batch_size).collect();
+    let mut attempts = 0;
+
+    while !to_upload.is_empty() || !currently_uploading.is_empty() {
+
+        let mut futures = Vec::new();
+        let client = s3_client();
+
+        for file_path in &currently_uploading {
             let path = Path::new(path.as_ref()).join(&file_path);
             // Some files have insufficient permissions (like the .lock file created by cargo in
             // the documentation directory). We are skipping these files.
@@ -182,73 +198,60 @@ pub fn add_path_into_database<P: AsRef<Path>>(conn: &Connection,
                 }
             };
 
-            let content: Option<Vec<u8>> = if let Some(client) = &mut client {
-                let mut attempts = 0;
-                loop {
-                    let s3_res = client.put_object(PutObjectRequest {
-                        bucket: "rust-docs-rs".into(),
-                        key: bucket_path.clone(),
-                        body: Some(content.clone().into()),
-                        content_type: Some(mime.clone()),
-                        ..Default::default()
-                    }).sync();
-                    attempts += 1;
-                    match s3_res {
-                        // we've successfully uploaded the content, so steal it;
-                        // we don't want to put it in the DB
-                        Ok(_) => {
-                            crate::web::metrics::UPLOADED_FILES_TOTAL.inc_by(1);
-                            break None;
-                        },
-                        // Since s3 was configured, we want to panic on failure to upload.
-                        Err(e) => {
-                            log::error!("failed to upload to {}: {:?}", bucket_path, e);
-                            // Get a new client, in case the old one's connection is stale.
-                            // AWS will kill our connection if it's alive for too long; this avoids
-                            // that preventing us from building the crate entirely.
-                            *client = s3_client().unwrap();
-                            if attempts > 3 {
-                                panic!("failed to upload 3 times, exiting");
-                            } else {
-                                continue;
-                            }
-                        },
-                    }
-                }
+            if let Some(client) = &client {
+                futures.push(client.put_object(PutObjectRequest {
+                    bucket: "rust-docs-rs".into(),
+                    key: bucket_path.clone(),
+                    body: Some(content.clone().into()),
+                    content_type: Some(mime.clone()),
+                    ..Default::default()
+                }));
             } else {
-                Some(content.clone().into())
-            };
-
-            file_list_with_mimes.push((mime.clone(), file_path.clone()));
+                // If AWS credentials are configured, don't insert/update the database
+                // check if file already exists in database
+                let rows = try!(conn.query("SELECT COUNT(*) FROM files WHERE path = $1", &[&bucket_path]));
 
-            (
-                bucket_path,
-                content,
-                mime,
-            )
-        };
+                if rows.get(0).get::<usize, i64>(0) == 0 {
+                    try!(trans.query("INSERT INTO files (path, mime, content) VALUES ($1, $2, $3)",
+                                     &[&bucket_path, &mime, &content]));
+                } else {
+                    try!(trans.query("UPDATE files SET mime = $2, content = $3, date_updated = NOW() \
+                                      WHERE path = $1",
+                                     &[&bucket_path, &mime, &content]));
+                }
+            }
 
-        // If AWS credentials are configured, don't insert/update the database
-        if client.is_none() {
-            // check if file already exists in database
-            let rows = try!(conn.query("SELECT COUNT(*) FROM files WHERE path = $1", &[&path]));
+            file_paths_and_mimes.insert(file_path.clone(), mime.clone());
+        }
 
-            let content = content.expect("content never None if client is None");
+        if !futures.is_empty() {
+            attempts += 1;
 
-            if rows.get(0).get::<usize, i64>(0) == 0 {
-                try!(trans.query("INSERT INTO files (path, mime, content) VALUES ($1, $2, $3)",
-                                 &[&path, &mime, &content]));
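+            // Wait for every upload in this batch to finish; join_all fails if any single upload fails.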
+            if rt.block_on(::futures::future::join_all(futures)).is_ok() {
+                // this batch was successful, start another batch if there are still more files
+                crate::web::metrics::UPLOADED_FILES_TOTAL.inc_by(batch_size as i64);
+                batch_size = cmp::min(to_upload.len(), MAX_CONCURRENT_UPLOADS);
+                currently_uploading = to_upload.drain(..batch_size).collect();
+                attempts = 0;
             } else {
-                try!(trans.query("UPDATE files SET mime = $2, content = $3, date_updated = NOW() \
-                                  WHERE path = $1",
-                                 &[&path, &mime, &content]));
+                // if any future fails, leave `currently_uploading` intact so that we can retry the batch
+                if attempts > 2 {
+                    panic!("failed to upload 3 times, exiting");
+                }
             }
-        crate::web::metrics::UPLOADED_FILES_TOTAL.inc_by(1);
+        } else {
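+            // Nothing was queued for S3 (e.g. S3 is not configured); count the batch and move on to the next one.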
+            crate::web::metrics::UPLOADED_FILES_TOTAL.inc_by(batch_size as i64);
+            batch_size = cmp::min(to_upload.len(), MAX_CONCURRENT_UPLOADS);
+            currently_uploading = to_upload.drain(..batch_size).collect();
         }
     }
 
     try!(trans.commit());
 
+    let file_list_with_mimes: Vec<(String, PathBuf)> = file_paths_and_mimes
+        .into_iter()
+        .map(|(file_path, mime)| (mime, file_path))
+        .collect();
     file_list_to_json(file_list_with_mimes)
 }
 
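For context, the batching and retry control flow introduced by this change can be sketched in isolation. This is a minimal sketch, not the actual docs.rs code: `try_upload_batch` is a hypothetical stand-in for pushing one `PutObjectRequest` future per file and driving them all with `rt.block_on(join_all(futures))`, and the file names are made up. It only illustrates how batches of at most `MAX_CONCURRENT_UPLOADS` files are drained, how a failed batch is retried as a whole, and how three consecutive failures abort the build.

```rust
// Minimal sketch of the batching/retry loop above, assuming a hypothetical
// `try_upload_batch` in place of the real S3 upload futures.
const MAX_CONCURRENT_UPLOADS: usize = 1000;

fn try_upload_batch(batch: &[String]) -> Result<(), String> {
    // The real code pushes one PutObjectRequest future per file and waits for
    // all of them with `rt.block_on(join_all(futures))`; here every upload is
    // simply assumed to succeed.
    println!("uploading {} files", batch.len());
    Ok(())
}

fn upload_all(mut to_upload: Vec<String>) {
    let mut batch_size = std::cmp::min(to_upload.len(), MAX_CONCURRENT_UPLOADS);
    let mut currently_uploading: Vec<String> = to_upload.drain(..batch_size).collect();
    let mut attempts = 0;

    while !to_upload.is_empty() || !currently_uploading.is_empty() {
        attempts += 1;
        if try_upload_batch(&currently_uploading).is_ok() {
            // Batch succeeded: start the next batch (possibly empty, which ends the loop).
            batch_size = std::cmp::min(to_upload.len(), MAX_CONCURRENT_UPLOADS);
            currently_uploading = to_upload.drain(..batch_size).collect();
            attempts = 0;
        } else if attempts > 2 {
            // Three failed attempts in a row for the same batch: give up, as the real code does.
            panic!("failed to upload 3 times, exiting");
        }
        // Otherwise `currently_uploading` is left untouched and retried on the next iteration.
    }
}

fn main() {
    let files: Vec<String> = (0..2500).map(|i| format!("file-{}.html", i)).collect();
    upload_all(files);
}
```

Compared with the per-file retry loop that this change removes, failures are now detected per batch, so a single failed request causes the whole batch of up to 1000 uploads to be retried rather than just the one file.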