
Parallel s3 uploads #393


Merged (1 commit) on Nov 5, 2019

Conversation

@miller-time (Contributor)

No description provided.

@miller-time (Contributor, Author)

oops a commit snuck in from my other branch #392

@Mark-Simulacrum (Member) left a comment

Thinking about this a bit more, we'll probably want to do block_on during the main loop once we get to, say, 1000 futures in the queue. Based on previous testing there's not really any serious gain in performance from having more than that in the queue anyway. As is, this'll probably cause pretty serious memory growth, since we'll be storing ~200,000 fairly large objects in memory for some of the larger crates.

@miller-time (Contributor, Author) commented on Aug 2, 2019

> during the main loop

Can you point me to that?

@miller-time force-pushed the parallel-s3-uploads branch from c34a1f0 to 783e2f3 on August 2, 2019

@Mark-Simulacrum (Member)

Ah, the directory reading loop -- in that same function. Right after pushing onto the vector of futures we can check if we've exceeded some constant amount of futures and dispatch based on that.
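
For illustration, a minimal sketch of that check, using names that appear later in this thread (futures, rt, MAX_CONCURRENT_UPLOADS) rather than the exact code from the PR:

// Inside the directory-reading loop, right after pushing this file's
// put_object future onto `futures`:
if futures.len() >= MAX_CONCURRENT_UPLOADS {
    // Dispatch the queued uploads before reading more files, so we never
    // hold more than MAX_CONCURRENT_UPLOADS request bodies in memory.
    let batch = std::mem::replace(&mut futures, Vec::new());
    if rt.block_on(::futures::future::join_all(batch)).is_err() {
        panic!("upload to s3 failed");
    }
}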

@Mark-Simulacrum (Member) left a comment

With this last change I think this is pretty much good to go. It'd be good to drop the Vagrant commit (since it's not related to this PR) and I'll hand this off to r? @QuietMisdreavus for another round of review before we can merge.

@QuietMisdreavus (Member) left a comment

Looks fine, just some small nits.

@Mark-Simulacrum (Member)

@QuietMisdreavus This has been updated and looks ready to go from my side; I can take care of the deploy and post-deploy monitoring if you sign off on the code.

@QuietMisdreavus (Member) left a comment

This is mostly okay, just a couple more comments.

src/db/file.rs Outdated
}

if rt.block_on(::futures::future::join_all(futures)).is_err() {
    panic!("upload to s3 failed");

(Member)

If we're adding error information to the panic message, we should add it here as well.
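
A minimal sketch of what that could look like, assuming the surrounding code from the excerpt above (join_all resolves to the first upload error here):

if let Err(e) = rt.block_on(::futures::future::join_all(futures)) {
    panic!("upload to s3 failed: {:?}", e);
}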

@miller-time force-pushed the parallel-s3-uploads branch 2 times, most recently from c4f67a1 to f297719 on October 15, 2019

@QuietMisdreavus (Member)

This looks great! One thing I noticed is that this doesn't include the "refresh the client" logic. Do we think this will get around that issue? Should we create a new client when we execute a batch of uploads?

@Mark-Simulacrum (Member)

Oh, yeah, we probably want a new client for each batch -- that should be low enough overhead-wise. I imagine that's not too difficult to add?

@miller-time (Contributor, Author) commented on Oct 22, 2019

The way it was working before, I believe the client was only refreshed if an error occurred. It seems like now we call s3_client() for each file. So I think, if anything, we might be refreshing the client too much?

I honestly don't remember why I changed that.

@QuietMisdreavus (Member)

> The way it was working before, I believe the client was only refreshed if an error occurred. It seems like now we call s3_client() for each file. So I think, if anything, we might be refreshing the client too much?

Oh, you're right! In this case, I think this will be fine. I don't think creating a new client each time is that heavyweight, but I haven't looked into it that closely. @Mark-Simulacrum, will this work, or do you want to bring back the old logic of "save the client outside the loop and overwrite it after a batch is sent"?

@Mark-Simulacrum (Member)

Oh, yeah -- I think a new client for each future is probably not what we want (it seems likely to cause problems of some kind, or be generally slower) -- I'm not sure what a good way to structure this would be though. Maybe we can do a RwLock that we share between all the futures, and then replace it on error? If that proves pretty complicated, then we can likely get away with just creating a new client for every batch and re-running the batch if it was unsuccessful (so, kinda similar to before, but less fine-grained, I guess).

@miller-time (Contributor, Author)

I feel like I'm getting pretty close...

diff --git src/db/file.rs src/db/file.rs
index faf66d2..2465141 100644
--- src/db/file.rs
+++ src/db/file.rs
@@ -10,6 +10,7 @@ use postgres::Connection;
 use rustc_serialize::json::{Json, ToJson};
 use std::fs;
 use std::io::Read;
+use std::sync::RwLock;
 use error::Result;
 use failure::err_msg;
 use rusoto_s3::{S3, PutObjectRequest, GetObjectRequest, S3Client};
@@ -148,6 +149,7 @@ pub fn add_path_into_database<P: AsRef<Path>>(conn: &Connection,
     try!(cookie.load::<&str>(&[]));
 
     let trans = try!(conn.transaction());
+    let client = s3_client().map(|c| RwLock::new(c));
     let mut file_list_with_mimes: Vec<(String, PathBuf)> = Vec::new();
 
     let mut rt = ::tokio::runtime::Runtime::new().unwrap();
@@ -188,11 +190,13 @@ pub fn add_path_into_database<P: AsRef<Path>>(conn: &Connection,
             }
         };
 
-        if let Some(client) = s3_client() {
+        if let Some(client) = client {
             let bucket_path = bucket_path.clone();
             let content = content.clone();
             let mime = mime.clone();
 
+            let mut client = client.write().unwrap();
+
             futures.push(client.put_object(PutObjectRequest {
                 bucket: "rust-docs-rs".into(),
                 key: bucket_path.clone(),
@@ -201,6 +205,11 @@ pub fn add_path_into_database<P: AsRef<Path>>(conn: &Connection,
                 ..Default::default()
             }).map_err(move |e| {
                 log::error!("failed to upload to {}: {:?}", bucket_path, e);
+                // Get a new client, in case the old one's connection is stale.
+                // AWS will kill our connection if it's alive for too long; this avoids
+                // that preventing us from building the crate entirely.
+                *client = s3_client().unwrap();
+
                 client.put_object(PutObjectRequest {
                     bucket: "rust-docs-rs".into(),
                     key: bucket_path,

...but

error[E0277]: `std::sync::RwLockWriteGuard<'_, rusoto_s3::S3Client>` cannot be sent between threads safely
   --> src/db/file.rs:225:23
    |
225 |                 if rt.block_on(::futures::future::join_all(futures)).is_err() {
    |                       ^^^^^^^^ `std::sync::RwLockWriteGuard<'_, rusoto_s3::S3Client>` cannot be sent between threads safely
    |

Also, acquiring a write lock for each file upload seems like a bad idea... but because of the closure with the "retry" in it, we're moving client during each iteration of the loop. Here's an attempt to only acquire the write lock for the "client refresh":

diff --git src/db/file.rs src/db/file.rs
index faf66d2..5f8c252 100644
--- src/db/file.rs
+++ src/db/file.rs
@@ -10,6 +10,7 @@ use postgres::Connection;
 use rustc_serialize::json::{Json, ToJson};
 use std::fs;
 use std::io::Read;
+use std::sync::RwLock;
 use error::Result;
 use failure::err_msg;
 use rusoto_s3::{S3, PutObjectRequest, GetObjectRequest, S3Client};
@@ -148,6 +149,7 @@ pub fn add_path_into_database<P: AsRef<Path>>(conn: &Connection,
     try!(cookie.load::<&str>(&[]));
 
     let trans = try!(conn.transaction());
+    let mut client = s3_client().map(|c| RwLock::new(c));
     let mut file_list_with_mimes: Vec<(String, PathBuf)> = Vec::new();
 
     let mut rt = ::tokio::runtime::Runtime::new().unwrap();
@@ -188,12 +190,12 @@ pub fn add_path_into_database<P: AsRef<Path>>(conn: &Connection,
             }
         };
 
-        if let Some(client) = s3_client() {
+        if let Some(client) = client {
             let bucket_path = bucket_path.clone();
             let content = content.clone();
             let mime = mime.clone();
 
-            futures.push(client.put_object(PutObjectRequest {
+            futures.push(client.read().unwrap().put_object(PutObjectRequest {
                 bucket: "rust-docs-rs".into(),
                 key: bucket_path.clone(),
                 body: Some(content.clone().into()),
@@ -201,7 +203,15 @@ pub fn add_path_into_database<P: AsRef<Path>>(conn: &Connection,
                 ..Default::default()
             }).map_err(move |e| {
                 log::error!("failed to upload to {}: {:?}", bucket_path, e);
-                client.put_object(PutObjectRequest {
+                // Get a new client, in case the old one's connection is stale.
+                // AWS will kill our connection if it's alive for too long; this avoids
+                // that preventing us from building the crate entirely.
+                {
+                    let mut c = client.write().unwrap();
+                    *c = s3_client().unwrap();
+                }
+
+                client.read().unwrap().put_object(PutObjectRequest {
                     bucket: "rust-docs-rs".into(),
                     key: bucket_path,
                     body: Some(content.into()),

As expected, the move prevents this from working:

error[E0382]: use of moved value
   --> src/db/file.rs:193:21
    |
193 |         if let Some(client) = client {
    |                     ^^^^^^ value moved here, in previous iteration of loop
    |
error[E0505]: cannot move out of `client` because it is borrowed
   --> src/db/file.rs:204:24
    |
198 |             futures.push(client.read().unwrap().put_object(PutObjectRequest {
    |                          ----------------------
    |                          |
    |                          borrow of `client` occurs here
    |                          a temporary with access to the borrow is created here ...
...
204 |             }).map_err(move |e| {
    |                        ^^^^^^^^ move out of `client` occurs here
...
210 |                     let mut c = client.write().unwrap();
    |                                 ------ move occurs due to use in closure
...
223 |             }));
    |                - ... and the borrow might be used here, when that temporary is dropped and runs the `Drop` code for type `std::sync::RwLockReadGuard`

I'll keep poking at this, but any help or insight greatly appreciated! 😀

src/db/file.rs Outdated
// Get a new client, in case the old one's connection is stale.
// AWS will kill our connection if it's alive for too long; this avoids
// that preventing us from building the crate entirely.
let mut c = retry_lock.write().unwrap();

(Member)

I think this may result in lots of new clients being written to the RwLock in rapid succession in case several fail at once (which is probably what's going to happen since we're going to be running up to 1000 at a time). I feel like we may need some kind of connection pool or manager or something that can make sure that we only generate one new client when things start to fail, and the individual uploads only request "a new client" instead of making it themselves. I can try to sketch something out, but if anyone else knows something better, that would be ideal.
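
For illustration only, a rough sketch of such a manager; these names are made up (not from the PR), and it assumes the existing s3_client() helper and rusoto's S3Client. The idea is that uploads hand back the generation of the client they failed with, so only the first failure per generation actually builds a replacement:

use std::sync::{Arc, Mutex};
use rusoto_s3::S3Client;

struct ClientManager {
    // (current client, generation counter)
    state: Mutex<(Arc<S3Client>, u64)>,
}

impl ClientManager {
    /// Hand out the current client together with its generation.
    fn current(&self) -> (Arc<S3Client>, u64) {
        self.state.lock().unwrap().clone()
    }

    /// Called by an upload that failed while holding a client from `seen_gen`.
    /// Only the first caller for a given generation rebuilds the client.
    fn refresh(&self, seen_gen: u64) -> Arc<S3Client> {
        let mut state = self.state.lock().unwrap();
        if state.1 == seen_gen {
            state.0 = Arc::new(s3_client().expect("failed to create S3 client"));
            state.1 += 1;
        }
        state.0.clone()
    }
}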

(Member)

We should only acquire the lock for writing outside the "inner loop" of 1000(?) futures; if any of those fail we can then re-create the client.

(Member)

If that happens, how do we know which files to re-upload? Aren't the futures consumed in the call to join_all?

(Member)

We can reupload all the files; we might duplicate work but it shouldn't really matter (i.e., the files are the same).

(Contributor, Author)

There are two places where we might want to re-create the client (two different rt.block_on(::futures::future::join_all(futures)) calls):

  • if futures.len() > MAX_CONCURRENT_UPLOADS - this is actually inside the for file_path in ... loop. After blocking on the initial futures, it clears the futures vec and continues with the next file (is this what you're calling the "completion of the inner loop"?)
  • immediately following the for file_path in ... loop

Were you talking about re-creating the client in both cases?

(Contributor, Author)

When MAX_CONCURRENT_UPLOADS is reached, there's a partially-completed iterator that represents the files we tried to upload (of which one or more failed). It's not immediately obvious to me how we would retry those uploads...

(Contributor, Author)

Also, we currently have a retry for each file. It's unclear how we'd perform a reupload of all the files. It seems like we basically need to start over iterating through the file paths. Am I missing something?

(Member)

This looks like it doesn't match the code precisely -- but this is what I was envisioning in an "abstract" sense (a rough sketch follows the list):

  • we have a list of all the file paths we need to upload (to_upload)
  • we remove MAX_CONCURRENT_UPLOADS elements off of this list (currently_uploading)
  • create a fresh client here (every time)
  • loop through these, creating a future for each upload
  • block on the completion of all these futures
    • if any fail, re-create the futures from the list currently_uploading (this may need an extra copy or so but that's fine) -- from the "create a fresh client" step basically
  • repeat the last several steps until we're done (i.e., to_upload is empty)
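
A condensed sketch of that flow (not the actual PR code; path_to_bucket_key and read_file_bytes are hypothetical stand-ins for the existing path and content handling, and the real function also records mimes and writes database rows):

let mut to_upload = try!(get_file_list(&path));
let mut currently_uploading: Vec<_> = to_upload
    .drain(..std::cmp::min(to_upload.len(), MAX_CONCURRENT_UPLOADS))
    .collect();

while !currently_uploading.is_empty() {
    // Create a fresh client for every batch, so a stale connection can't
    // poison more than one attempt.
    let client = s3_client().expect("failed to create S3 client");

    let futures: Vec<_> = currently_uploading
        .iter()
        .map(|file_path| {
            client.put_object(PutObjectRequest {
                bucket: "rust-docs-rs".into(),
                key: path_to_bucket_key(file_path),            // hypothetical helper
                body: Some(read_file_bytes(file_path).into()), // hypothetical helper
                ..Default::default()
            })
        })
        .collect();

    if rt.block_on(::futures::future::join_all(futures)).is_ok() {
        // This batch succeeded: pull the next chunk of paths off `to_upload`.
        currently_uploading = to_upload
            .drain(..std::cmp::min(to_upload.len(), MAX_CONCURRENT_UPLOADS))
            .collect();
    }
    // On error, `currently_uploading` is left alone, so the same batch is
    // rebuilt with a fresh client on the next loop iteration.
}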

(Contributor, Author)

That makes sense, I gave it a shot. It was a bit of a rewrite so hopefully it's correct 😄

@Mark-Simulacrum (Member)

The diff is pretty horrible to look at (indentation changed and things moved around), but based on a high-level read-through of https://github.com/rust-lang/docs.rs/blob/d9beb9e6a5e85348811cee5f5884dc977b02b4c5/src/db/file.rs this looks good.

@QuietMisdreavus (Member)

This looks like it will retry forever if files fail to upload for reasons other than a stale connection, i.e. now there's no panic!() branch. Do we want to have a failure counter and panic on 2-3 failed upload batches?

@miller-time (Contributor, Author)

Oops, good catch! Yeah I think

            if rt.block_on(::futures::future::join_all(futures)).is_ok() {
                // this batch was successful, start another batch if there are still more files
                currently_uploading = to_upload.drain(..MAX_CONCURRENT_UPLOADS).collect();

could set a counter to 0, and

            } else {
                // if any futures error, leave `currently_uploading` intact so that we can retry the batch

could increment the counter.
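
Putting that together, a rough sketch (MAX_UPLOAD_ATTEMPTS and next_batch are made-up names; the batch-building is the same as in the current code):

let mut attempts = 0;

while !currently_uploading.is_empty() {
    // ... create a fresh client and build the batch of `futures`, as before ...

    if rt.block_on(::futures::future::join_all(futures)).is_ok() {
        attempts = 0;
        // this batch was successful, start another batch if there are still more files
        currently_uploading = next_batch(&mut to_upload);
    } else {
        attempts += 1;
        if attempts >= MAX_UPLOAD_ATTEMPTS {
            panic!("failed to upload a batch of docs to S3");
        }
        // otherwise leave `currently_uploading` intact so the batch is retried
    }
}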

@Mark-Simulacrum (Member)

Seems quite reasonable. We might also want to log the error somewhere (ideally in a way that metrics can pick up), but that can be left for future work, I think.

src/db/file.rs Outdated

let content = content.expect("content never None if client is None");
file_list_with_mimes.push((mime.clone(), file_path.clone()));

(Contributor, Author)

I think if this happens during a retry, it will duplicate items in the file_list_with_mimes vec.

This might need to be done only when the put_object is successful?

Something like

client.put_object(PutObjectRequest {
  // ...
}).and_then(move |_resp| {
  file_list_with_mimes.push((mime.clone(), file_path.clone()));
  Ok(())
});

except I think now file_list_with_mimes needs to be in an Arc (really an Arc<Mutex<_>> or similar, since the closures need to mutate it)

(Member)

Might be simpler to deduplicate after uploading by file path.
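
A sketch of that approach, assuming file_list_with_mimes stays the Vec<(String, PathBuf)> of (mime, path) pairs from the PR:

// Sort by path, then drop consecutive entries with the same path, so a
// retried batch can't produce duplicate rows later.
file_list_with_mimes.sort_by(|a, b| a.1.cmp(&b.1));
file_list_with_mimes.dedup_by(|a, b| a.1 == b.1);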

(Contributor, Author)

Or prevent duplicates during insert?

src/db/file.rs Outdated
let mut rt = ::tokio::runtime::Runtime::new().unwrap();

let mut to_upload = try!(get_file_list(&path));
let mut currently_uploading: Vec<_> = to_upload.drain(..MAX_CONCURRENT_UPLOADS).collect();

(Member)

This call to drain will panic on the last batch of a crate upload, since it's unlikely that a crate will have an exact multiple of MAX_CONCURRENT_UPLOADS files, and drain panics if the range is outside the Vec's bounds. The end of the range should be min(to_upload.len(), MAX_CONCURRENT_UPLOADS).
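
In code, the bounds-safe version looks roughly like:

let batch_size = std::cmp::min(to_upload.len(), MAX_CONCURRENT_UPLOADS);
let mut currently_uploading: Vec<_> = to_upload.drain(..batch_size).collect();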

(Contributor, Author)

Nice catch -- I should have read the drain docs more closely 😞

@QuietMisdreavus (Member) left a comment

I think this is in a good position now. If @Mark-Simulacrum is good with this, I think we're ready to merge.

@Mark-Simulacrum (Member)

Yes, I'm happy with this overall (a few nits, but nothing we need to fix here; I might try to fix them up when I work on compression this weekend, presuming this lands).

Let's merge and deploy (cc @pietroalbini). I'd like this to be deployed quickly after merging and monitored for a few uploads, so I'm not doing that myself.

(Ideally we'd collect metrics on upload time, so we'd know whether this is actually an improvement.)

@miller-time (Contributor, Author)

Squash first?

@Mark-Simulacrum (Member)

Oh, yes, it'd be good to squash this into one commit.

@pietroalbini (Member)

> Let's merge and deploy (cc @pietroalbini). I'd like this to be deployed quickly after merging and monitored for a few uploads, so I'm not doing that myself.
>
> (Ideally we'd collect metrics on upload time, so we'd know whether this is actually an improvement.)

Should we add metrics for a day or two and then merge this PR? Adding a new one for uploaded files shouldn't be hard.

@Mark-Simulacrum (Member)

I don't know; we can, I suppose -- I'm not sure how hard that would be. I'm unlikely to get a chance to do it soon, unfortunately.

@miller-time (Contributor, Author)

Is that something I can help out with?

@miller-time (Contributor, Author) commented on Nov 1, 2019

@pietroalbini does this look right to you? (I just rebased after your merge of pull request #457)

crate::web::metrics::UPLOADED_FILES_TOTAL.inc_by(batch_size as i64);

@pietroalbini (Member)

@miller-time yeah, it's fine. It'd be great to increment by 1 each time a single future completes, but that's not critical and if you want it can be implemented in a later PR. I think merging the speedups first is more important than slightly more accurate metrics.

Huge thanks for the work you put into this! 🎉
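
For reference, incrementing once per completed future (as suggested above) could look roughly like this; a sketch, not code from the PR, where request stands in for the PutObjectRequest built earlier:

futures.push(
    client
        .put_object(request)
        .inspect(|_| crate::web::metrics::UPLOADED_FILES_TOTAL.inc())
        .map_err(move |e| log::error!("failed to upload to {}: {:?}", bucket_path, e)),
);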

@pietroalbini merged commit 5d22713 into rust-lang:master on Nov 5, 2019