Revert #758 "Update async dependencies" (#768)


Merged: 6 commits, May 22, 2020
995 changes: 454 additions & 541 deletions Cargo.lock

Large diffs are not rendered by default.

18 changes: 7 additions & 11 deletions Cargo.toml
@@ -15,7 +15,7 @@ regex = "1"
structopt = "0.3"
crates-index-diff = "7"
time = "0.1"
reqwest = { version = "0.10", features = ["blocking"] }
reqwest = "0.9"
semver = "0.9"
slug = "=0.1.1"
env_logger = "0.7"
@@ -29,10 +29,15 @@ toml = "0.5"
html5ever = "0.22"
schemamama = "0.3"
schemamama_postgres = "0.2"
rusoto_s3 = "0.40"
rusoto_core = "0.40"
rusoto_credential = "0.40"
futures = "0.1"
tokio = "0.1"
systemstat = "0.1.4"
prometheus = { version = "0.7.0", default-features = false }
lazy_static = "1.0.0"
rustwide = "0.7.1"
rustwide = "=0.7.0"
mime_guess = "2"
dotenv = "0.15"

@@ -44,15 +49,6 @@ params = "0.8"
staticfile = { version = "0.4", features = [ "cache" ] }
tempfile = "3.1.0"

# Communicating with S3
rusoto_s3 = "0.43"
rusoto_core = "0.43"
rusoto_credential = "0.43"

# Async
futures-util = "0.3"
tokio = { version = "0.2", features = ["rt-threaded"] }

[target.'cfg(not(windows))'.dependencies]
libc = "0.2"

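The manifest changes above are the whole story of the revert: reqwest drops from 0.10 (where blocking support is an opt-in feature) back to the natively synchronous 0.9, rusoto from 0.43 back to 0.40, and the std-futures stack (futures-util 0.3, tokio 0.2) gives way to futures 0.1 with tokio 0.1. A minimal sketch of the rusoto 0.40 calling convention this returns to, assuming AWS credentials in the environment; the region, bucket, and key are illustrative:

```rust
use rusoto_core::Region;
use rusoto_s3::{GetObjectRequest, S3, S3Client};
use std::io::Read;

fn fetch_object(bucket: &str, key: &str) -> Result<Vec<u8>, failure::Error> {
    let client = S3Client::new(Region::UsEast1);
    let res = client
        .get_object(GetObjectRequest {
            bucket: bucket.into(),
            key: key.into(),
            ..Default::default()
        })
        // rusoto 0.40 returns a RusotoFuture, whose blocking `.sync()`
        // adapter removes the need for an explicit tokio runtime here
        .sync()?;

    // the response body is a stream; drain it through a blocking reader
    let mut content = Vec::new();
    res.body.unwrap().into_blocking_read().read_to_end(&mut content)?;
    Ok(content)
}
```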
2 changes: 1 addition & 1 deletion src/db/add_package.rs
@@ -11,8 +11,8 @@ use crate::error::Result;
use failure::err_msg;
use log::debug;
use postgres::Connection;
use reqwest::blocking::Client;
use reqwest::header::ACCEPT;
use reqwest::Client;
use rustc_serialize::json::{Json, ToJson};
use semver::Version;
use slug::slugify;
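This file only needs its import adjusted: reqwest 0.9's top-level `Client` is already blocking, so the `reqwest::blocking` path from 0.10 goes away. A hedged sketch of the 0.9 style; the URL is illustrative, not the exact request this module makes:

```rust
use reqwest::header::ACCEPT;
use reqwest::Client;

fn fetch_json(url: &str) -> Result<String, reqwest::Error> {
    let client = Client::new();
    let mut res = client
        .get(url)
        .header(ACCEPT, "application/json")
        .send()?; // blocking call in reqwest 0.9; no runtime involved
    res.text() // also blocking; reading the body needs `mut res` in 0.9
}
```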
54 changes: 24 additions & 30 deletions src/db/delete_crate.rs
@@ -1,7 +1,7 @@
use crate::storage::s3::{s3_client, S3Backend, S3_BUCKET_NAME};
use crate::storage::s3::{s3_client, S3_BUCKET_NAME};
use failure::{Error, Fail};
use postgres::Connection;
use rusoto_s3::{DeleteObjectsRequest, ListObjectsV2Request, ObjectIdentifier, S3};
use rusoto_s3::{DeleteObjectsRequest, ListObjectsV2Request, ObjectIdentifier, S3Client, S3};

/// List of directories in docs.rs's underlying storage (either the database or S3) containing a
/// subdirectory named after the crate. Those subdirectories will be deleted.
@@ -22,9 +22,8 @@ pub fn delete_crate(conn: &Connection, name: &str) -> Result<(), Error> {
};

delete_from_database(conn, name, crate_id)?;
if let Some(client) = s3_client() {
let mut backend = S3Backend::new(client, S3_BUCKET_NAME);
delete_from_s3(&mut backend, name)?;
if let Some(s3) = s3_client() {
delete_from_s3(&s3, name)?;
}

Ok(())
@@ -69,26 +68,24 @@ fn delete_from_database(conn: &Connection, name: &str, crate_id: i32) -> Result<
Ok(())
}

fn delete_from_s3(s3: &mut S3Backend<'_>, name: &str) -> Result<(), Error> {
fn delete_from_s3(s3: &S3Client, name: &str) -> Result<(), Error> {
for prefix in STORAGE_PATHS_TO_DELETE {
delete_prefix_from_s3(s3, &format!("{}/{}/", prefix, name))?;
}

Ok(())
}

fn delete_prefix_from_s3(s3: &mut S3Backend<'_>, name: &str) -> Result<(), Error> {
fn delete_prefix_from_s3(s3: &S3Client, name: &str) -> Result<(), Error> {
let mut continuation_token = None;

loop {
let list =
s3.runtime_handle()
.block_on(s3.client().list_objects_v2(ListObjectsV2Request {
bucket: S3_BUCKET_NAME.into(),
prefix: Some(name.into()),
continuation_token,
..ListObjectsV2Request::default()
}))?;
let list = s3
.list_objects_v2(ListObjectsV2Request {
bucket: S3_BUCKET_NAME.into(),
prefix: Some(name.into()),
continuation_token,
..ListObjectsV2Request::default()
})
.sync()?;

let to_delete = list
.contents
@@ -100,23 +97,20 @@ fn delete_prefix_from_s3(s3: &mut S3Backend<'_>, name: &str) -> Result<(), Error
version_id: None,
})
.collect::<Vec<_>>();

let resp =
s3.runtime_handle()
.block_on(s3.client().delete_objects(DeleteObjectsRequest {
bucket: S3_BUCKET_NAME.into(),
delete: rusoto_s3::Delete {
objects: to_delete,
quiet: None,
},
..DeleteObjectsRequest::default()
}))?;

let resp = s3
.delete_objects(DeleteObjectsRequest {
bucket: S3_BUCKET_NAME.into(),
delete: rusoto_s3::Delete {
objects: to_delete,
quiet: None,
},
..DeleteObjectsRequest::default()
})
.sync()?;
if let Some(errs) = resp.errors {
for err in &errs {
log::error!("error deleting file from s3: {:?}", err);
}

failure::bail!("uploading to s3 failed");
}

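The hunk above shows the listing and deletion calls, while the collapsed tail of the function advances the pagination state. A sketch of the full pattern against rusoto 0.40, assuming the bucket name is passed in (the real code uses the `S3_BUCKET_NAME` constant); S3 returns at most 1,000 keys per `list_objects_v2` page, hence the continuation-token loop:

```rust
use rusoto_s3::{DeleteObjectsRequest, ListObjectsV2Request, ObjectIdentifier, S3, S3Client};

fn delete_prefix(s3: &S3Client, bucket: &str, prefix: &str) -> Result<(), failure::Error> {
    let mut continuation_token = None;
    loop {
        // list one page of keys under the prefix
        let list = s3
            .list_objects_v2(ListObjectsV2Request {
                bucket: bucket.into(),
                prefix: Some(prefix.into()),
                continuation_token,
                ..ListObjectsV2Request::default()
            })
            .sync()?;

        let objects: Vec<_> = list
            .contents
            .unwrap_or_default()
            .into_iter()
            .filter_map(|obj| obj.key)
            .map(|key| ObjectIdentifier { key, version_id: None })
            .collect();

        if !objects.is_empty() {
            s3.delete_objects(DeleteObjectsRequest {
                bucket: bucket.into(),
                delete: rusoto_s3::Delete { objects, quiet: None },
                ..DeleteObjectsRequest::default()
            })
            .sync()?;
        }

        // keep going until S3 stops handing back a continuation token
        continuation_token = list.next_continuation_token;
        if continuation_token.is_none() {
            break;
        }
    }
    Ok(())
}
```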
12 changes: 6 additions & 6 deletions src/storage/mod.rs
@@ -61,7 +61,7 @@ pub fn get_file_list<P: AsRef<Path>>(path: P) -> Result<Vec<PathBuf>, Error> {

pub(crate) enum Storage<'a> {
Database(DatabaseBackend<'a>),
S3(Box<S3Backend<'a>>),
S3(S3Backend<'a>),
}

impl<'a> Storage<'a> {
@@ -72,16 +72,16 @@ impl<'a> Storage<'a> {
DatabaseBackend::new(conn).into()
}
}
pub(crate) fn get(&mut self, path: &str) -> Result<Blob, Error> {
pub(crate) fn get(&self, path: &str) -> Result<Blob, Error> {
match self {
Self::Database(db) => db.get(path),
Self::S3(s3) => s3.get(path),
}
}

fn store_batch(&mut self, batch: Vec<Blob>, trans: &Transaction) -> Result<(), Error> {
fn store_batch(&mut self, batch: &[Blob], trans: &Transaction) -> Result<(), Error> {
match self {
Self::Database(db) => db.store_batch(&batch, trans),
Self::Database(db) => db.store_batch(batch, trans),
Self::S3(s3) => s3.store_batch(batch),
}
}
@@ -141,7 +141,7 @@ impl<'a> Storage<'a> {
if batch.is_empty() {
break;
}
self.store_batch(batch, &trans)?;
self.store_batch(&batch, &trans)?;
}

trans.commit()?;
@@ -180,7 +180,7 @@ impl<'a> From<DatabaseBackend<'a>> for Storage<'a> {

impl<'a> From<S3Backend<'a>> for Storage<'a> {
fn from(db: S3Backend<'a>) -> Self {
Self::S3(Box::new(db))
Self::S3(db)
}
}

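The signature change from `Vec<Blob>` to `&[Blob]` is what lets the S3 backend below retry a failed batch: a borrowed slice can be re-submitted as-is, whereas the old signature consumed the vector on the first attempt. A toy illustration with a hypothetical `store_batch` stand-in (string items instead of real `Blob`s):

```rust
// hypothetical stand-in for the real store_batch(&[Blob], ...)
fn store_batch(batch: &[&str]) -> Result<(), ()> {
    for item in batch {
        println!("storing {}", item);
    }
    Ok(())
}

fn main() {
    let batch = vec!["a", "b", "c"];
    for _attempt in 0..3 {
        // the same borrowed batch can be retried; nothing is moved or cloned
        if store_batch(&batch).is_ok() {
            break;
        }
    }
}
```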
100 changes: 28 additions & 72 deletions src/storage/s3.rs
@@ -1,17 +1,14 @@
use super::Blob;
use failure::Error;
use futures_util::{
future::FutureExt,
stream::{FuturesUnordered, StreamExt},
};
use futures::Future;
use log::{error, warn};
use rusoto_core::region::Region;
use rusoto_credential::DefaultCredentialsProvider;
use rusoto_s3::{GetObjectRequest, PutObjectRequest, S3Client, S3};
use std::convert::TryInto;
use std::io::Read;
use time::Timespec;
use tokio::runtime::{Handle, Runtime};
use tokio::runtime::Runtime;

#[cfg(test)]
mod test;
@@ -35,23 +32,15 @@ impl<'a> S3Backend<'a> {
}
}

#[cfg(test)]
pub(crate) fn with_runtime(client: S3Client, bucket: &'a str, runtime: Runtime) -> Self {
Self {
client,
bucket,
runtime,
}
}

pub(super) fn get(&mut self, path: &str) -> Result<Blob, Error> {
pub(super) fn get(&self, path: &str) -> Result<Blob, Error> {
let res = self
.runtime
.block_on(self.client.get_object(GetObjectRequest {
.client
.get_object(GetObjectRequest {
bucket: self.bucket.to_string(),
key: path.into(),
..Default::default()
}))?;
})
.sync()?;

let mut b = res.body.unwrap().into_blocking_read();
let mut content = Vec::with_capacity(
@@ -71,16 +60,14 @@ impl<'a> S3Backend<'a> {
})
}

pub(super) fn store_batch(&mut self, mut uploads: Vec<Blob>) -> Result<(), Error> {
pub(super) fn store_batch(&mut self, batch: &[Blob]) -> Result<(), Error> {
use futures::stream::FuturesUnordered;
use futures::stream::Stream;
let mut attempts = 0;

loop {
// `FuturesUnordered` is used because the order of execution doesn't
// matter, we just want things to execute as fast as possible
let futures = FuturesUnordered::new();

// Drain uploads, filling `futures` with upload requests
for blob in uploads.drain(..) {
let mut futures = FuturesUnordered::new();
for blob in batch {
futures.push(
self.client
.put_object(PutObjectRequest {
@@ -90,53 +77,27 @@ impl<'a> S3Backend<'a> {
content_type: Some(blob.mime.clone()),
..Default::default()
})
// Drop the value returned by `put_object` because we don't need it,
// emit an error and replace the error values with the blob that failed
// to upload so that we can retry failed uploads
.map(|resp| match resp {
Ok(..) => {
// Increment the total uploaded files when a file is uploaded
crate::web::metrics::UPLOADED_FILES_TOTAL.inc_by(1);

Ok(())
}
Err(err) => {
error!("failed to upload file to s3: {:?}", err);
Err(blob)
}
.inspect(|_| {
crate::web::metrics::UPLOADED_FILES_TOTAL.inc_by(1);
}),
);
}
attempts += 1;

// Collect all the failed uploads so that we can retry them
uploads = self.runtime.block_on(
futures
.filter_map(|resp| async move { resp.err() })
.collect(),
);

// If there are no further uploads we were successful and can return
if uploads.is_empty() {
break;

// If more than three attempts to upload fail, return an error
} else if attempts >= 3 {
error!("failed to upload to s3, abandoning");
failure::bail!("Failed to upload to s3 three times, abandoning");
match self.runtime.block_on(futures.map(drop).collect()) {
// this batch was successful, start another batch if there are still more files
Ok(_) => break,
Err(err) => {
error!("failed to upload to s3: {:?}", err);
// if a futures error occurs, retry the batch
if attempts > 2 {
panic!("failed to upload 3 times, exiting");
}
}
}
}

Ok(())
}

pub fn runtime_handle(&self) -> Handle {
self.runtime.handle().clone()
}

pub fn client(&self) -> &S3Client {
&self.client
}
}

fn parse_timespec(raw: &str) -> Result<Timespec, Error> {
@@ -150,15 +111,13 @@ pub(crate) fn s3_client() -> Option<S3Client> {
if std::env::var_os("AWS_ACCESS_KEY_ID").is_none() && std::env::var_os("FORCE_S3").is_none() {
return None;
}

let creds = match DefaultCredentialsProvider::new() {
Ok(creds) => creds,
Err(err) => {
warn!("failed to retrieve AWS credentials: {}", err);
return None;
}
};

Some(S3Client::new_with(
rusoto_core::request::HttpClient::new().unwrap(),
creds,
@@ -176,6 +135,7 @@
pub(crate) mod tests {
use super::*;
use crate::test::*;
use std::slice;

#[test]
fn test_parse_timespec() {
@@ -205,7 +165,7 @@

// Add a test file to the database
let s3 = env.s3();
s3.upload(vec![blob.clone()]).unwrap();
s3.upload(slice::from_ref(&blob)).unwrap();

// Test that the proper file was returned
s3.assert_blob(&blob, "dir/foo.txt");
@@ -227,10 +187,8 @@
"b",
"a_very_long_file_name_that_has_an.extension",
"parent/child",
"h/i/
g/h/l/y/_/n/e/s/t/e/d/_/d/i/r/e/c/t/o/r/i/e/s",
"h/i/g/h/l/y/_/n/e/s/t/e/d/_/d/i/r/e/c/t/o/r/i/e/s",
];

let blobs: Vec<_> = names
.iter()
.map(|&path| Blob {
Expand All @@ -240,12 +198,10 @@ pub(crate) mod tests {
content: "Hello world!".into(),
})
.collect();

s3.upload(blobs.clone()).unwrap();
s3.upload(&blobs).unwrap();
for blob in &blobs {
s3.assert_blob(blob, &blob.path);
}

Ok(())
})
}
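For reference, the shape of the reverted upload loop in `store_batch`: futures 0.1's `FuturesUnordered` holds the in-flight uploads and a tokio 0.1 runtime drives them all to completion, retrying the whole batch on error. A self-contained sketch that substitutes `future::ok` stand-ins for the real `put_object` calls:

```rust
use futures::future;
use futures::stream::{FuturesUnordered, Stream};
use tokio::runtime::Runtime;

fn main() {
    let mut runtime = Runtime::new().unwrap();

    // order of completion doesn't matter, which is why FuturesUnordered
    // fits: it yields results as they finish rather than in push order
    let mut futures = FuturesUnordered::new();
    for i in 0..3u32 {
        futures.push(future::ok::<u32, ()>(i)); // stand-in for put_object(...)
    }

    // collect() turns the stream into a future of Vec; block_on drives it
    let results = runtime.block_on(futures.collect()).unwrap();
    assert_eq!(results.len(), 3);
}
```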