Parallel s3 uploads #393


Merged 1 commit on Nov 5, 2019

src/db/file.rs: 121 changes (62 additions, 59 deletions)
@@ -8,6 +8,7 @@
use std::path::{PathBuf, Path};
use postgres::Connection;
use rustc_serialize::json::{Json, ToJson};
use std::cmp;
use std::fs;
use std::io::Read;
use error::Result;
@@ -17,6 +18,9 @@ use rusoto_core::region::Region;
use rusoto_credential::DefaultCredentialsProvider;


// Upper bound on how many files are uploaded to S3 in a single batch.
const MAX_CONCURRENT_UPLOADS: usize = 1000;


fn get_file_list_from_dir<P: AsRef<Path>>(path: P,
files: &mut Vec<PathBuf>)
-> Result<()> {
@@ -145,11 +149,23 @@ pub fn add_path_into_database<P: AsRef<Path>>(conn: &Connection,
try!(cookie.load::<&str>(&[]));

let trans = try!(conn.transaction());
let mut client = s3_client();
let mut file_list_with_mimes: Vec<(String, PathBuf)> = Vec::new();

for file_path in try!(get_file_list(&path)) {
let (path, content, mime) = {
use std::collections::HashMap;
let mut file_paths_and_mimes: HashMap<PathBuf, String> = HashMap::new();

// The S3 uploads are futures; a tokio runtime is created once and used to block on each batch.
let mut rt = ::tokio::runtime::Runtime::new().unwrap();

let mut to_upload = try!(get_file_list(&path));
let mut batch_size = cmp::min(to_upload.len(), MAX_CONCURRENT_UPLOADS);
let mut currently_uploading: Vec<_> = to_upload.drain(..batch_size).collect();
let mut attempts = 0;

while !to_upload.is_empty() || !currently_uploading.is_empty() {
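// Each pass uploads one batch of at most MAX_CONCURRENT_UPLOADS files; a new batch is
// only drained from `to_upload` once the current one has gone through.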

let mut futures = Vec::new();
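// A fresh client is created for every batch so a connection that AWS has already closed
// (it drops connections that stay alive too long) is never reused.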
let client = s3_client();

for file_path in &currently_uploading {
let path = Path::new(path.as_ref()).join(&file_path);
// Some files have insufficient permissions (like .lock file created by cargo in
// documentation directory). We are skipping these files.
@@ -182,73 +198,60 @@ }
}
};

let content: Option<Vec<u8>> = if let Some(client) = &mut client {
let mut attempts = 0;
loop {
let s3_res = client.put_object(PutObjectRequest {
bucket: "rust-docs-rs".into(),
key: bucket_path.clone(),
body: Some(content.clone().into()),
content_type: Some(mime.clone()),
..Default::default()
}).sync();
attempts += 1;
match s3_res {
// we've successfully uploaded the content, so steal it;
// we don't want to put it in the DB
Ok(_) => {
crate::web::metrics::UPLOADED_FILES_TOTAL.inc_by(1);
break None;
},
// Since s3 was configured, we want to panic on failure to upload.
Err(e) => {
log::error!("failed to upload to {}: {:?}", bucket_path, e);
// Get a new client, in case the old one's connection is stale.
// AWS will kill our connection if it's alive for too long; this avoids
// that preventing us from building the crate entirely.
*client = s3_client().unwrap();
if attempts > 3 {
panic!("failed to upload 3 times, exiting");
} else {
continue;
}
},
}
}
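// With S3 configured, queue this file's upload as a future; the whole batch is awaited below.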
if let Some(client) = &client {
futures.push(client.put_object(PutObjectRequest {
bucket: "rust-docs-rs".into(),
key: bucket_path.clone(),
body: Some(content.clone().into()),
content_type: Some(mime.clone()),
..Default::default()
}));
} else {
Some(content.clone().into())
};

file_list_with_mimes.push((mime.clone(), file_path.clone()));
// If AWS credentials are configured, don't insert/update the database
// check if file already exists in database
let rows = try!(conn.query("SELECT COUNT(*) FROM files WHERE path = $1", &[&bucket_path]));

(
bucket_path,
content,
mime,
)
};
if rows.get(0).get::<usize, i64>(0) == 0 {
try!(trans.query("INSERT INTO files (path, mime, content) VALUES ($1, $2, $3)",
&[&bucket_path, &mime, &content]));
} else {
try!(trans.query("UPDATE files SET mime = $2, content = $3, date_updated = NOW() \
WHERE path = $1",
&[&bucket_path, &mime, &content]));
}
}

// If AWS credentials are configured, don't insert/update the database
if client.is_none() {
// check if file already exists in database
let rows = try!(conn.query("SELECT COUNT(*) FROM files WHERE path = $1", &[&path]));
file_paths_and_mimes.insert(file_path.clone(), mime.clone());
}

let content = content.expect("content never None if client is None");
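// Wait for every upload in the batch at once; on success move on to the next batch,
// otherwise keep the batch around for a retry.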
if !futures.is_empty() {
attempts += 1;

if rows.get(0).get::<usize, i64>(0) == 0 {
try!(trans.query("INSERT INTO files (path, mime, content) VALUES ($1, $2, $3)",
&[&path, &mime, &content]));
if rt.block_on(::futures::future::join_all(futures)).is_ok() {
// this batch was successful, start another batch if there are still more files
crate::web::metrics::UPLOADED_FILES_TOTAL.inc_by(batch_size as i64);
batch_size = cmp::min(to_upload.len(), MAX_CONCURRENT_UPLOADS);
currently_uploading = to_upload.drain(..batch_size).collect();
attempts = 0;
} else {
try!(trans.query("UPDATE files SET mime = $2, content = $3, date_updated = NOW() \
WHERE path = $1",
&[&path, &mime, &content]));
// if any future errors, leave `currently_uploading` intact so that we can retry the batch
if attempts > 2 {
panic!("failed to upload 3 times, exiting");
}
}
crate::web::metrics::UPLOADED_FILES_TOTAL.inc_by(1);
} else {
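// Nothing was queued for S3 (no client configured), so this batch was already written
// to the database above; just advance to the next batch.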
crate::web::metrics::UPLOADED_FILES_TOTAL.inc_by(batch_size as i64);
batch_size = cmp::min(to_upload.len(), MAX_CONCURRENT_UPLOADS);
currently_uploading = to_upload.drain(..batch_size).collect();
}
}

try!(trans.commit());

let file_list_with_mimes: Vec<(String, PathBuf)> = file_paths_and_mimes
.into_iter()
.map(|(file_path, mime)| (mime, file_path))
.collect();
file_list_to_json(file_list_with_mimes)
}
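
For reference, the batching and retry control flow introduced by this change reduces to the loop below. This is a minimal, dependency-free sketch: `upload_all` and the `upload_batch` stub are illustrative names standing in for `add_path_into_database` and `rt.block_on(::futures::future::join_all(futures))`, not code taken from the PR.

use std::cmp;

const MAX_CONCURRENT_UPLOADS: usize = 1000;

// Stand-in for driving one batch of S3 uploads to completion;
// returns Err if any upload in the batch failed.
fn upload_batch(batch: &[String]) -> Result<(), ()> {
    println!("uploading {} files", batch.len());
    Ok(())
}

fn upload_all(mut to_upload: Vec<String>) {
    let mut batch_size = cmp::min(to_upload.len(), MAX_CONCURRENT_UPLOADS);
    let mut currently_uploading: Vec<_> = to_upload.drain(..batch_size).collect();
    let mut attempts = 0;

    while !to_upload.is_empty() || !currently_uploading.is_empty() {
        attempts += 1;
        if upload_batch(&currently_uploading).is_ok() {
            // Batch succeeded: reset the retry counter and drain the next batch.
            batch_size = cmp::min(to_upload.len(), MAX_CONCURRENT_UPLOADS);
            currently_uploading = to_upload.drain(..batch_size).collect();
            attempts = 0;
        } else if attempts > 2 {
            // Same bail-out as the PR: give up after three failed attempts on one batch.
            panic!("failed to upload 3 times, exiting");
        }
        // On failure with attempts remaining, `currently_uploading` is left intact,
        // so the next iteration retries the same batch.
    }
}

fn main() {
    let files: Vec<String> = (0..2500).map(|i| format!("file-{}.html", i)).collect();
    upload_all(files);
}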
