Commit 5d22713

Merge pull request #393 from miller-time/parallel-s3-uploads
Parallel s3 uploads
2 parents c3d3ed3 + c95b8bf commit 5d22713
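
In outline, this change replaces the old one-file-at-a-time, blocking `.sync()` upload with batched concurrent uploads: the file list is drained in batches of at most `MAX_CONCURRENT_UPLOADS` (1000) files, one rusoto `put_object` future is pushed per file, the whole batch is driven to completion on a tokio runtime, and a failed batch is retried as a unit. A minimal, std-only sketch of the batching loop, with the actual S3 work elided to a `println!`:

```rust
use std::cmp;

const MAX_CONCURRENT_UPLOADS: usize = 1000;

fn main() {
    // Stand-in for `try!(get_file_list(&path))` in the real code.
    let mut to_upload: Vec<u32> = (0..2500).collect();

    // Pull the first batch off the front of the work list.
    let mut batch_size = cmp::min(to_upload.len(), MAX_CONCURRENT_UPLOADS);
    let mut currently_uploading: Vec<_> = to_upload.drain(..batch_size).collect();

    while !to_upload.is_empty() || !currently_uploading.is_empty() {
        // ...build one S3 future per file and await them all here...
        println!("uploading a batch of {} files", currently_uploading.len());

        // Refill the batch from whatever remains.
        batch_size = cmp::min(to_upload.len(), MAX_CONCURRENT_UPLOADS);
        currently_uploading = to_upload.drain(..batch_size).collect();
    }
}
```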

1 file changed, +62 -59 lines changed

src/db/file.rs

@@ -8,6 +8,7 @@
 use std::path::{PathBuf, Path};
 use postgres::Connection;
 use rustc_serialize::json::{Json, ToJson};
+use std::cmp;
 use std::fs;
 use std::io::Read;
 use error::Result;
@@ -17,6 +18,9 @@ use rusoto_core::region::Region;
 use rusoto_credential::DefaultCredentialsProvider;
 
 
+const MAX_CONCURRENT_UPLOADS: usize = 1000;
+
+
 fn get_file_list_from_dir<P: AsRef<Path>>(path: P,
                                           files: &mut Vec<PathBuf>)
                                           -> Result<()> {
@@ -145,11 +149,23 @@ pub fn add_path_into_database<P: AsRef<Path>>(conn: &Connection,
     try!(cookie.load::<&str>(&[]));
 
     let trans = try!(conn.transaction());
-    let mut client = s3_client();
-    let mut file_list_with_mimes: Vec<(String, PathBuf)> = Vec::new();
 
-    for file_path in try!(get_file_list(&path)) {
-        let (path, content, mime) = {
+    use std::collections::HashMap;
+    let mut file_paths_and_mimes: HashMap<PathBuf, String> = HashMap::new();
+
+    let mut rt = ::tokio::runtime::Runtime::new().unwrap();
+
+    let mut to_upload = try!(get_file_list(&path));
+    let mut batch_size = cmp::min(to_upload.len(), MAX_CONCURRENT_UPLOADS);
+    let mut currently_uploading: Vec<_> = to_upload.drain(..batch_size).collect();
+    let mut attempts = 0;
+
+    while !to_upload.is_empty() || !currently_uploading.is_empty() {
+
+        let mut futures = Vec::new();
+        let client = s3_client();
+
+        for file_path in &currently_uploading {
         let path = Path::new(path.as_ref()).join(&file_path);
         // Some files have insufficient permissions (like .lock file created by cargo in
         // documentation directory). We are skipping these files.
@@ -182,73 +198,60 @@ pub fn add_path_into_database<P: AsRef<Path>>(conn: &Connection,
             }
         };
 
-        let content: Option<Vec<u8>> = if let Some(client) = &mut client {
-            let mut attempts = 0;
-            loop {
-                let s3_res = client.put_object(PutObjectRequest {
-                    bucket: "rust-docs-rs".into(),
-                    key: bucket_path.clone(),
-                    body: Some(content.clone().into()),
-                    content_type: Some(mime.clone()),
-                    ..Default::default()
-                }).sync();
-                attempts += 1;
-                match s3_res {
-                    // we've successfully uploaded the content, so steal it;
-                    // we don't want to put it in the DB
-                    Ok(_) => {
-                        crate::web::metrics::UPLOADED_FILES_TOTAL.inc_by(1);
-                        break None;
-                    },
-                    // Since s3 was configured, we want to panic on failure to upload.
-                    Err(e) => {
-                        log::error!("failed to upload to {}: {:?}", bucket_path, e);
-                        // Get a new client, in case the old one's connection is stale.
-                        // AWS will kill our connection if it's alive for too long; this avoids
-                        // that preventing us from building the crate entirely.
-                        *client = s3_client().unwrap();
-                        if attempts > 3 {
-                            panic!("failed to upload 3 times, exiting");
-                        } else {
-                            continue;
-                        }
-                    },
-                }
-            }
+            if let Some(client) = &client {
+                futures.push(client.put_object(PutObjectRequest {
+                    bucket: "rust-docs-rs".into(),
+                    key: bucket_path.clone(),
+                    body: Some(content.clone().into()),
+                    content_type: Some(mime.clone()),
+                    ..Default::default()
+                }));
             } else {
-            Some(content.clone().into())
-        };
-
-        file_list_with_mimes.push((mime.clone(), file_path.clone()));
+                // If AWS credentials are configured, don't insert/update the database
+                // check if file already exists in database
+                let rows = try!(conn.query("SELECT COUNT(*) FROM files WHERE path = $1", &[&bucket_path]));
 
-        (
-            bucket_path,
-            content,
-            mime,
-        )
-    };
+                if rows.get(0).get::<usize, i64>(0) == 0 {
+                    try!(trans.query("INSERT INTO files (path, mime, content) VALUES ($1, $2, $3)",
+                                     &[&bucket_path, &mime, &content]));
+                } else {
+                    try!(trans.query("UPDATE files SET mime = $2, content = $3, date_updated = NOW() \
+                                      WHERE path = $1",
+                                     &[&bucket_path, &mime, &content]));
+                }
+            }
 
-        // If AWS credentials are configured, don't insert/update the database
-        if client.is_none() {
-            // check if file already exists in database
-            let rows = try!(conn.query("SELECT COUNT(*) FROM files WHERE path = $1", &[&path]));
+            file_paths_and_mimes.insert(file_path.clone(), mime.clone());
+        }
 
-            let content = content.expect("content never None if client is None");
+        if !futures.is_empty() {
+            attempts += 1;
 
-            if rows.get(0).get::<usize, i64>(0) == 0 {
-                try!(trans.query("INSERT INTO files (path, mime, content) VALUES ($1, $2, $3)",
-                                 &[&path, &mime, &content]));
+            if rt.block_on(::futures::future::join_all(futures)).is_ok() {
+                // this batch was successful, start another batch if there are still more files
+                crate::web::metrics::UPLOADED_FILES_TOTAL.inc_by(batch_size as i64);
+                batch_size = cmp::min(to_upload.len(), MAX_CONCURRENT_UPLOADS);
+                currently_uploading = to_upload.drain(..batch_size).collect();
+                attempts = 0;
             } else {
-                try!(trans.query("UPDATE files SET mime = $2, content = $3, date_updated = NOW() \
-                                  WHERE path = $1",
-                                 &[&path, &mime, &content]));
+                // if any futures error, leave `currently_uploading` intact so that we can retry the batch
+                if attempts > 2 {
+                    panic!("failed to upload 3 times, exiting");
+                }
             }
-            crate::web::metrics::UPLOADED_FILES_TOTAL.inc_by(1);
+        } else {
+            crate::web::metrics::UPLOADED_FILES_TOTAL.inc_by(batch_size as i64);
+            batch_size = cmp::min(to_upload.len(), MAX_CONCURRENT_UPLOADS);
+            currently_uploading = to_upload.drain(..batch_size).collect();
         }
     }
 
     try!(trans.commit());
 
+    let file_list_with_mimes: Vec<(String, PathBuf)> = file_paths_and_mimes
+        .into_iter()
+        .map(|(file_path, mime)| (mime, file_path))
+        .collect();
     file_list_to_json(file_list_with_mimes)
 }
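
Each batch is all-or-nothing: with the futures 0.1-era API used above, `futures::future::join_all` resolves to an error as soon as any of its futures fails, so `rt.block_on(...).is_ok()` is true only when every upload in the batch succeeded, and any failure causes the whole batch to be retried (panicking after three failed attempts). A minimal sketch of that join-and-retry policy, written against the futures 0.3 names (`try_join_all` is the 0.3 equivalent of 0.1's fail-fast `join_all`); `fake_upload` is a hypothetical stand-in for the rusoto `put_object` call:

```rust
use futures::executor::block_on;
use futures::future::try_join_all;

// Hypothetical stand-in for the rusoto `put_object` future.
async fn fake_upload(_id: usize) -> Result<(), String> {
    Ok(())
}

// Upload one batch concurrently; the batch fails if any single upload fails.
fn upload_batch(batch: &[usize]) -> Result<(), String> {
    let uploads: Vec<_> = batch.iter().map(|&id| fake_upload(id)).collect();
    block_on(try_join_all(uploads)).map(|_| ())
}

fn main() {
    let batch: Vec<usize> = (0..10).collect();
    let mut attempts = 0;
    loop {
        attempts += 1;
        if upload_batch(&batch).is_ok() {
            break;
        }
        // Mirrors the diff: `attempts > 2` means three failed tries.
        if attempts > 2 {
            panic!("failed to upload 3 times, exiting");
        }
    }
}
```

Note that retrying a batch re-uploads files that already succeeded within it; the commit accepts that cost in exchange for a simple retry loop. The switch from the old `Vec` to a `HashMap` keyed by path also means a retried batch re-inserts the same keys rather than appending duplicate entries to the final file list.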