From c95b8bf6e9ba64816b4ede94a4ece8bade6dbaaf Mon Sep 17 00:00:00 2001
From: Russell Cousineau
Date: Fri, 2 Aug 2019 16:38:13 -0700
Subject: [PATCH] perform s3 uploads in parallel

---
 src/db/file.rs | 121 +++++++++++++++++++++++++------------------------
 1 file changed, 62 insertions(+), 59 deletions(-)

diff --git a/src/db/file.rs b/src/db/file.rs
index 98f57f026..4687f9be1 100644
--- a/src/db/file.rs
+++ b/src/db/file.rs
@@ -8,6 +8,7 @@ use std::path::{PathBuf, Path};
 use postgres::Connection;
 use rustc_serialize::json::{Json, ToJson};
+use std::cmp;
 use std::fs;
 use std::io::Read;
 use error::Result;
@@ -17,6 +18,9 @@ use rusoto_core::region::Region;
 use rusoto_credential::DefaultCredentialsProvider;
 
+const MAX_CONCURRENT_UPLOADS: usize = 1000;
+
+
 fn get_file_list_from_dir<P: AsRef<Path>>(path: P, files: &mut Vec<PathBuf>) -> Result<()> {
@@ -145,11 +149,23 @@ pub fn add_path_into_database<P: AsRef<Path>>(conn: &Connection,
     try!(cookie.load::<&str>(&[]));
 
     let trans = try!(conn.transaction());
-    let mut client = s3_client();
-    let mut file_list_with_mimes: Vec<(String, PathBuf)> = Vec::new();
 
-    for file_path in try!(get_file_list(&path)) {
-        let (path, content, mime) = {
+    use std::collections::HashMap;
+    let mut file_paths_and_mimes: HashMap<PathBuf, String> = HashMap::new();
+
+    let mut rt = ::tokio::runtime::Runtime::new().unwrap();
+
+    let mut to_upload = try!(get_file_list(&path));
+    let mut batch_size = cmp::min(to_upload.len(), MAX_CONCURRENT_UPLOADS);
+    let mut currently_uploading: Vec<_> = to_upload.drain(..batch_size).collect();
+    let mut attempts = 0;
+
+    while !to_upload.is_empty() || !currently_uploading.is_empty() {
+
+        let mut futures = Vec::new();
+        let client = s3_client();
+
+        for file_path in &currently_uploading {
             let path = Path::new(path.as_ref()).join(&file_path);
             // Some files have insufficient permissions (like .lock file created by cargo in
             // documentation directory). We are skipping this files.
@@ -182,73 +198,60 @@ pub fn add_path_into_database<P: AsRef<Path>>(conn: &Connection,
                 }
             };
 
-            let content: Option<Vec<u8>> = if let Some(client) = &mut client {
-                let mut attempts = 0;
-                loop {
-                    let s3_res = client.put_object(PutObjectRequest {
-                        bucket: "rust-docs-rs".into(),
-                        key: bucket_path.clone(),
-                        body: Some(content.clone().into()),
-                        content_type: Some(mime.clone()),
-                        ..Default::default()
-                    }).sync();
-                    attempts += 1;
-                    match s3_res {
-                        // we've successfully uploaded the content, so steal it;
-                        // we don't want to put it in the DB
-                        Ok(_) => {
-                            crate::web::metrics::UPLOADED_FILES_TOTAL.inc_by(1);
-                            break None;
-                        },
-                        // Since s3 was configured, we want to panic on failure to upload.
-                        Err(e) => {
-                            log::error!("failed to upload to {}: {:?}", bucket_path, e);
-                            // Get a new client, in case the old one's connection is stale.
-                            // AWS will kill our connection if it's alive for too long; this avoids
-                            // that preventing us from building the crate entirely.
-                            *client = s3_client().unwrap();
-                            if attempts > 3 {
-                                panic!("failed to upload 3 times, exiting");
-                            } else {
-                                continue;
-                            }
-                        },
-                    }
-                }
+            if let Some(client) = &client {
+                futures.push(client.put_object(PutObjectRequest {
+                    bucket: "rust-docs-rs".into(),
+                    key: bucket_path.clone(),
+                    body: Some(content.clone().into()),
+                    content_type: Some(mime.clone()),
+                    ..Default::default()
+                }));
             } else {
-                Some(content.clone().into())
-            };
-
-            file_list_with_mimes.push((mime.clone(), file_path.clone()));
+                // If AWS credentials are configured, don't insert/update the database
+                // check if file already exists in database
+                let rows = try!(conn.query("SELECT COUNT(*) FROM files WHERE path = $1", &[&bucket_path]));
 
-            (
-                bucket_path,
-                content,
-                mime,
-            )
-        };
+                if rows.get(0).get::<usize, i64>(0) == 0 {
+                    try!(trans.query("INSERT INTO files (path, mime, content) VALUES ($1, $2, $3)",
+                                     &[&bucket_path, &mime, &content]));
+                } else {
+                    try!(trans.query("UPDATE files SET mime = $2, content = $3, date_updated = NOW() \
+                                      WHERE path = $1",
+                                     &[&bucket_path, &mime, &content]));
+                }
+            }
 
-        // If AWS credentials are configured, don't insert/update the database
-        if client.is_none() {
-            // check if file already exists in database
-            let rows = try!(conn.query("SELECT COUNT(*) FROM files WHERE path = $1", &[&path]));
+            file_paths_and_mimes.insert(file_path.clone(), mime.clone());
+        }
 
-            let content = content.expect("content never None if client is None");
+        if !futures.is_empty() {
+            attempts += 1;
 
-            if rows.get(0).get::<usize, i64>(0) == 0 {
-                try!(trans.query("INSERT INTO files (path, mime, content) VALUES ($1, $2, $3)",
-                                 &[&path, &mime, &content]));
+            if rt.block_on(::futures::future::join_all(futures)).is_ok() {
+                // this batch was successful, start another batch if there are still more files
+                crate::web::metrics::UPLOADED_FILES_TOTAL.inc_by(batch_size as i64);
+                batch_size = cmp::min(to_upload.len(), MAX_CONCURRENT_UPLOADS);
+                currently_uploading = to_upload.drain(..batch_size).collect();
+                attempts = 0;
             } else {
-                try!(trans.query("UPDATE files SET mime = $2, content = $3, date_updated = NOW() \
-                                  WHERE path = $1",
-                                 &[&path, &mime, &content]));
+                // if any futures error, leave `currently_uploading` in tact so that we can retry the batch
+                if attempts > 2 {
+                    panic!("failed to upload 3 times, exiting");
+                }
             }
-            crate::web::metrics::UPLOADED_FILES_TOTAL.inc_by(1);
+        } else {
+            crate::web::metrics::UPLOADED_FILES_TOTAL.inc_by(batch_size as i64);
+            batch_size = cmp::min(to_upload.len(), MAX_CONCURRENT_UPLOADS);
+            currently_uploading = to_upload.drain(..batch_size).collect();
         }
     }
 
     try!(trans.commit());
 
+    let file_list_with_mimes: Vec<(String, PathBuf)> = file_paths_and_mimes
+        .into_iter()
+        .map(|(file_path, mime)| (mime, file_path))
+        .collect();
     file_list_to_json(file_list_with_mimes)
 }