Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/cargo/util/network/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub struct HandleConfiguration {
useragent: String,
ssl_version: Option<SslVersion>,
ssl_min_max_version: Option<(SslVersion, SslVersion)>,
timeout: HttpTimeout,
pub timeout: HttpTimeout,
pub verbose: bool,
pub multiplexing: bool,
}
Expand Down Expand Up @@ -226,7 +226,6 @@ impl HandleConfiguration {
handle.verbose(true)?;
tracing::debug!(target: "network", "{:#?}", curl::Version::get());
}
self.timeout.configure2(handle)?;

// Enable HTTP/2 if possible.
crate::try_old_curl_http2_pipewait!(self.multiplexing, handle);
Expand Down Expand Up @@ -285,6 +284,7 @@ pub(crate) fn debug(kind: InfoType, data: &[u8]) {
}

#[must_use]
#[derive(Clone)]
pub struct HttpTimeout {
pub dur: Duration,
pub low_speed_limit: u32,
Expand Down
161 changes: 150 additions & 11 deletions src/cargo/util/network/http_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,31 @@
//! a worker thread that is owned by the Client.

use std::collections::HashMap;
use std::io::{Cursor, Read};
use std::io::Cursor;
use std::io::Read;
use std::str::FromStr;
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::Arc;
use std::sync::atomic::AtomicI64;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::mpsc;
use std::sync::mpsc::Receiver;
use std::sync::mpsc::Sender;
use std::thread::JoinHandle;
use std::time::Duration;
use std::time::Instant;

use curl::easy::Easy2;
use curl::easy::Handler;
use curl::easy::InfoType;
use curl::easy::WriteError;
use curl::easy::{Easy2, Handler, InfoType};
use curl::multi::{Easy2Handle, Multi};
use curl::multi::Easy2Handle;
use curl::multi::Multi;
use futures::channel::oneshot;
use tracing::{debug, error, trace, warn};

use crate::util::network::http::HandleConfiguration;
use futures::channel::oneshot;
use tracing::{debug, error, trace};
use crate::util::network::http::HttpTimeout;

type Response = http::Response<Vec<u8>>;
type Request = http::Request<Vec<u8>>;
Expand All @@ -31,6 +43,16 @@ pub enum Error {
#[error(transparent)]
Easy(#[from] curl::Error),

#[error(
"transfer too slow: failed to transfer more than {low_speed_limit} bytes in {}s (transferred {transferred} bytes)",
timeout_dur.as_secs()
)]
TooSlow {
low_speed_limit: u32,
timeout_dur: Duration,
transferred: u64,
},

#[error("failed to convert header value of `{name}` to string: {bytes:?}")]
BadHeader { name: String, bytes: Vec<u8> },
}
Expand All @@ -40,31 +62,44 @@ struct Message {
sender: oneshot::Sender<HttpResult<Response>>,
}

/// Shared, thread-safe download statistics, updated from curl's
/// transfer callbacks (`Collector`) and read by `Client::bytes_pending`
/// and the worker's low-speed-timeout check.
#[derive(Default)]
struct Stats {
    /// Bytes still expected across all active transfers.
    /// Signed because per-transfer deltas are applied incrementally;
    /// NOTE(review): the `Collector` bookkeeping appears to keep the
    /// total non-negative (`bytes_pending` unwraps the conversion) —
    /// confirm that invariant.
    dl_remaining: AtomicI64,
    /// Total bytes downloaded so far across all transfers.
    dl_transferred: AtomicU64,
}

/// HTTP Client. Creating a new client spawns a cURL `Multi` and
/// thread that is used for all HTTP requests by this client.
pub struct Client {
channel: Option<Sender<Message>>,
thread_handle: Option<JoinHandle<()>>,
handle_config: HandleConfiguration,
stats: Arc<Stats>,
}

impl Client {
/// Spawns a new worker thread where HTTP request execute.
pub fn new(handle_config: HandleConfiguration) -> Client {
let (tx, rx) = mpsc::channel();
let handle = std::thread::spawn(move || WorkerServer::run(rx, handle_config.multiplexing));
let stats = Arc::new(Stats::default());
let timeout = handle_config.timeout.clone();
let worker_stats = stats.clone();
let handle = std::thread::spawn(move || {
WorkerServer::run(rx, handle_config.multiplexing, timeout, worker_stats)
});
Client {
channel: Some(tx),
thread_handle: Some(handle),
handle_config,
stats,
}
}

/// Perform an HTTP request using this client.
pub async fn request(&self, request: Request) -> HttpResult<Response> {
let url = request.uri().to_string();
debug!(target: "network::fetch", url);
let mut collector = Collector::new();
let mut collector = Collector::new(self.stats.clone());
let (parts, body) = request.into_parts();
let body_len = body.len();
collector.request_body = Cursor::new(body);
Expand All @@ -74,6 +109,7 @@ impl Client {

handle.url(&url)?;
handle.follow_location(true)?;
handle.progress(true)?;

match parts.method {
http::Method::HEAD => handle.nobody(true)?,
Expand Down Expand Up @@ -114,6 +150,15 @@ impl Client {
self.channel.as_ref().unwrap().send(req).unwrap();
receiver.await.unwrap()
}

    /// Returns the number of pending (not yet downloaded) bytes across
    /// all active transfers, as accumulated from curl progress callbacks.
    ///
    /// # Panics
    ///
    /// Panics if the shared `dl_remaining` counter is negative.
    /// NOTE(review): the `Collector` progress/drop bookkeeping appears
    /// to keep each transfer's contribution non-negative — confirm this
    /// holds under concurrent updates.
    pub fn bytes_pending(&self) -> u64 {
        self.stats
            .dl_remaining
            .load(Ordering::Acquire)
            .try_into()
            .unwrap()
    }
}

impl Drop for Client {
Expand All @@ -134,20 +179,37 @@ impl std::fmt::Debug for Client {
/// Manages the cURL `Multi`. Processes incoming work sent over the
/// channel, and returns responses.
struct WorkerServer {
    /// Channel on which the `Client` sends new requests to execute.
    incoming_work: Receiver<Message>,
    /// curl multi interface driving all concurrent transfers.
    multi: Multi,
    /// Map of token to curl handle and the oneshot channel on which the
    /// finished response (or error) is delivered back to the requester.
    handles: HashMap<
        usize,
        (
            Easy2Handle<Collector>,
            oneshot::Sender<HttpResult<Response>>,
        ),
    >,
    /// Next token to hand out for a newly enqueued transfer.
    token: usize,
    /// Global timeout configuration (window duration and minimum bytes
    /// per window) used by the low-speed check.
    timeout: HttpTimeout,
    /// Global transfer statistics, shared with the `Client` and each
    /// transfer's `Collector`.
    stats: Arc<Stats>,
    /// Instant when the current low speed window started
    low_speed_window_start: Instant,
    /// Amount of total bytes transferred when the current low speed window started
    low_speed_window_initial: u64,
}

impl WorkerServer {
fn run(incoming_work: Receiver<Message>, multiplex: bool) {
fn run(
incoming_work: Receiver<Message>,
multiplex: bool,
timeout: HttpTimeout,
stats: Arc<Stats>,
) {
let mut multi = Multi::new();
// let's not flood the server with connections
if let Err(e) = multi.set_max_host_connections(2) {
Expand All @@ -162,16 +224,53 @@ impl WorkerServer {
multi,
handles: HashMap::new(),
token: 0,
timeout,
stats,
low_speed_window_start: Instant::now(),
low_speed_window_initial: 0,
};
worker.worker_loop();
}

    /// Fails every outstanding request with a clone of `e` and clears
    /// the handle map.
    ///
    /// NOTE(review): dropping each `Easy2Handle` here should detach the
    /// transfer from the multi handle and abort it — confirm against the
    /// curl crate's drop semantics.
    fn fail_and_drain(&mut self, e: &Error) {
        warn!(
            target: "network",
            "failing all outstanding HTTP requests: {e}"
        );
        // Receivers may already have hung up; ignore send failures.
        for (_token, (_handle, sender)) in self.handles.drain() {
            let _ = sender.send(Err(e.clone()));
        }
    }

    /// Marks the start of a new timeout window, snapshotting the current
    /// total transferred byte count as the baseline against which
    /// `check_low_speed_timeout` measures progress.
    fn reset_low_speed_timeout(&mut self) {
        self.low_speed_window_start = Instant::now();
        self.low_speed_window_initial = self.stats.dl_transferred.load(Ordering::Acquire);
    }

    /// Returns an error if we're at the end of a timeout window and we
    /// haven't made enough progress.
    ///
    /// "Enough progress" means at least `timeout.low_speed_limit` bytes
    /// transferred — summed across *all* active transfers — within a
    /// window of `timeout.dur`. Whenever a window has elapsed, a fresh
    /// window is started regardless of the outcome.
    fn check_low_speed_timeout(&mut self) -> Option<Error> {
        // Make sure we've waited for the timeout duration.
        if Instant::now().duration_since(self.low_speed_window_start) < self.timeout.dur {
            return None;
        }

        // Calculate how much we've transferred since the last check.
        let current = self.stats.dl_transferred.load(Ordering::Acquire);
        // saturating_sub guards against the counter appearing to move
        // backwards between the snapshot and this read.
        let transferred = current.saturating_sub(self.low_speed_window_initial);
        self.reset_low_speed_timeout();
        if transferred < self.timeout.low_speed_limit.into() {
            Some(Error::TooSlow {
                low_speed_limit: self.timeout.low_speed_limit,
                timeout_dur: self.timeout.dur,
                transferred,
            })
        } else {
            None
        }
    }

fn worker_loop(&mut self) {
const INITIAL_DELAY: Duration = Duration::from_millis(1);
let mut wait_backoff = INITIAL_DELAY;
Expand Down Expand Up @@ -219,6 +318,12 @@ impl WorkerServer {
});

if running > 0 {
// Check for low speed timeout.
if let Some(timeout_error) = self.check_low_speed_timeout() {
self.fail_and_drain(&timeout_error);
continue;
}

let max_timeout = Duration::from_millis(1000);
let mut timeout = self
.multi
Expand Down Expand Up @@ -260,6 +365,7 @@ impl WorkerServer {
match self.incoming_work.recv() {
Ok(msg) => {
trace!("resuming work");
self.reset_low_speed_timeout();
self.enqueue_request(msg);
wait_backoff = INITIAL_DELAY;
}
Expand Down Expand Up @@ -291,24 +397,36 @@ impl WorkerServer {

/// Interface that cURL (`Easy2`) uses to make progress.
struct Collector {
/// The response being built
response: Response,
/// The body to transmit
request_body: Cursor<Vec<u8>>,
/// Whether we're in debug mode
debug: bool,
/// Global transfer statistics.
global_stats: Arc<Stats>,
/// How much has this particular transfer added to global `dl_remaining` stats.
dl_remaining_delta: i64,
}

impl Collector {
fn new() -> Self {
    /// Creates an empty collector that reports its transfer statistics
    /// into the shared `stats`.
    fn new(stats: Arc<Stats>) -> Self {
        Collector {
            response: Response::new(Vec::new()),
            request_body: Cursor::new(Vec::new()),
            debug: false,
            global_stats: stats,
            // This transfer has not contributed to `dl_remaining` yet.
            dl_remaining_delta: 0,
        }
    }
}

impl Handler for Collector {
    /// Receives a chunk of response body from curl: appends it to the
    /// buffered response and bumps the global downloaded-bytes counter.
    fn write(&mut self, data: &[u8]) -> Result<usize, WriteError> {
        self.response.body_mut().extend_from_slice(data);
        self.global_stats
            .dl_transferred
            .fetch_add(data.len() as u64, Ordering::Release);
        // Report the whole chunk as consumed so curl keeps going.
        Ok(data.len())
    }

Expand All @@ -332,11 +450,32 @@ impl Handler for Collector {
}
}

fn progress(&mut self, _dltotal: f64, _dlnow: f64, _ultotal: f64, _ulnow: f64) -> bool {
    /// curl progress callback: updates this transfer's contribution to
    /// the global `dl_remaining` counter. Returning `true` lets the
    /// transfer continue.
    fn progress(&mut self, dltotal: f64, dlnow: f64, _ultotal: f64, _ulnow: f64) -> bool {
        // Skip the update rather than record a negative remainder —
        // e.g. curl reports dltotal as 0 while the total size is not
        // yet known.
        if dlnow > dltotal {
            return true;
        }
        let dl_total = dltotal as i64;
        let dl_current = dlnow as i64;

        let remaining = dl_total - dl_current;

        // Apply only the change since the previous callback, so the
        // global counter remains the sum of every transfer's current
        // remainder.
        self.global_stats
            .dl_remaining
            .fetch_add(remaining - self.dl_remaining_delta, Ordering::Release);
        self.dl_remaining_delta = remaining;
        true
    }
}

/// When a transfer finishes (or is aborted), its remaining-bytes
/// contribution is removed from the global counter so completed
/// transfers no longer count as pending.
impl Drop for Collector {
    fn drop(&mut self) {
        // Zero out this transfer's contribution to the global dl_remaining.
        self.global_stats
            .dl_remaining
            .fetch_add(-self.dl_remaining_delta, Ordering::Release);
    }
}

/// Additional fields on an [`http::Response`].
#[derive(Clone)]
struct Extensions {
Expand Down
33 changes: 22 additions & 11 deletions src/cargo/util/network/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
//! - <https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Retry-After>

use crate::util::errors::{GitCliError, HttpNotSuccessful};
use crate::util::network::http_async;
use crate::{CargoResult, GlobalContext};
use anyhow::Error;
use rand::RngExt;
Expand Down Expand Up @@ -185,6 +186,26 @@ impl<'a> Retry<'a> {
}

fn maybe_spurious(err: &Error) -> bool {
    /// Whether a curl error is plausibly transient — connection or
    /// DNS/proxy resolution failure, timeout, send/receive error,
    /// HTTP/2 or TLS handshake issue, or a truncated transfer — and
    /// therefore worth retrying.
    fn maybe_spurious_curl(curl_err: &curl::Error) -> bool {
        curl_err.is_couldnt_connect()
            || curl_err.is_couldnt_resolve_proxy()
            || curl_err.is_couldnt_resolve_host()
            || curl_err.is_operation_timedout()
            || curl_err.is_recv_error()
            || curl_err.is_send_error()
            || curl_err.is_http2_error()
            || curl_err.is_http2_stream_error()
            || curl_err.is_ssl_connect_error()
            || curl_err.is_partial_file()
    }
if let Some(async_http_error) = err.downcast_ref::<http_async::Error>() {
match async_http_error {
http_async::Error::Easy(error) => return maybe_spurious_curl(error),
http_async::Error::TooSlow { .. } => return true,
http_async::Error::Multi(_) => {}
http_async::Error::BadHeader { .. } => {}
}
}
if let Some(git_err) = err.downcast_ref::<git2::Error>() {
match git_err.class() {
git2::ErrorClass::Net
Expand All @@ -195,17 +216,7 @@ fn maybe_spurious(err: &Error) -> bool {
}
}
if let Some(curl_err) = err.downcast_ref::<curl::Error>() {
if curl_err.is_couldnt_connect()
|| curl_err.is_couldnt_resolve_proxy()
|| curl_err.is_couldnt_resolve_host()
|| curl_err.is_operation_timedout()
|| curl_err.is_recv_error()
|| curl_err.is_send_error()
|| curl_err.is_http2_error()
|| curl_err.is_http2_stream_error()
|| curl_err.is_ssl_connect_error()
|| curl_err.is_partial_file()
{
if maybe_spurious_curl(curl_err) {
return true;
}
}
Expand Down