From 509beee7726a259c9ff8fa276ef982ed352d935f Mon Sep 17 00:00:00 2001 From: Arlo Siemsen Date: Fri, 17 Apr 2026 16:59:44 -0500 Subject: [PATCH 1/3] Add global HTTP timeout tracking to http_async This brings http_async up to match how the .crate downloader works in terms of having a global HTTP timeout handled by cargo rather than setting the parameters on each curl handle. --- src/cargo/util/network/http.rs | 4 +- src/cargo/util/network/http_async.rs | 123 ++++++++++++++++++++++++--- 2 files changed, 115 insertions(+), 12 deletions(-) diff --git a/src/cargo/util/network/http.rs b/src/cargo/util/network/http.rs index ba089b676db..6937a91fd9c 100644 --- a/src/cargo/util/network/http.rs +++ b/src/cargo/util/network/http.rs @@ -53,7 +53,7 @@ pub struct HandleConfiguration { useragent: String, ssl_version: Option, ssl_min_max_version: Option<(SslVersion, SslVersion)>, - timeout: HttpTimeout, + pub timeout: HttpTimeout, pub verbose: bool, pub multiplexing: bool, } @@ -226,7 +226,6 @@ impl HandleConfiguration { handle.verbose(true)?; tracing::debug!(target: "network", "{:#?}", curl::Version::get()); } - self.timeout.configure2(handle)?; // Enable HTTP/2 if possible. crate::try_old_curl_http2_pipewait!(self.multiplexing, handle); @@ -285,6 +284,7 @@ pub(crate) fn debug(kind: InfoType, data: &[u8]) { } #[must_use] +#[derive(Clone)] pub struct HttpTimeout { pub dur: Duration, pub low_speed_limit: u32, diff --git a/src/cargo/util/network/http_async.rs b/src/cargo/util/network/http_async.rs index d17aba65121..ea12bd67651 100644 --- a/src/cargo/util/network/http_async.rs +++ b/src/cargo/util/network/http_async.rs @@ -4,19 +4,30 @@ //! a worker thread that is owned by the Client. 
use std::collections::HashMap; -use std::io::{Cursor, Read}; +use std::io::Cursor; +use std::io::Read; use std::str::FromStr; -use std::sync::mpsc::{self, Receiver, Sender}; +use std::sync::Arc; +use std::sync::atomic::AtomicU64; +use std::sync::atomic::Ordering; +use std::sync::mpsc; +use std::sync::mpsc::Receiver; +use std::sync::mpsc::Sender; use std::thread::JoinHandle; use std::time::Duration; +use std::time::Instant; +use curl::easy::Easy2; +use curl::easy::Handler; +use curl::easy::InfoType; use curl::easy::WriteError; -use curl::easy::{Easy2, Handler, InfoType}; -use curl::multi::{Easy2Handle, Multi}; +use curl::multi::Easy2Handle; +use curl::multi::Multi; +use futures::channel::oneshot; +use tracing::{debug, error, trace, warn}; use crate::util::network::http::HandleConfiguration; -use futures::channel::oneshot; -use tracing::{debug, error, trace}; +use crate::util::network::http::HttpTimeout; type Response = http::Response>; type Request = http::Request>; @@ -31,6 +42,16 @@ pub enum Error { #[error(transparent)] Easy(#[from] curl::Error), + #[error( + "transfer too slow: failed to transfer more than {low_speed_limit} bytes in {}s (transferred {transferred} bytes)", + timeout_dur.as_secs() + )] + TooSlow { + low_speed_limit: u32, + timeout_dur: Duration, + transferred: u64, + }, + #[error("failed to convert header value of `{name}` to string: {bytes:?}")] BadHeader { name: String, bytes: Vec }, } @@ -40,23 +61,35 @@ struct Message { sender: oneshot::Sender>, } +#[derive(Default)] +struct Stats { + dl_transferred: AtomicU64, +} + /// HTTP Client. Creating a new client spawns a cURL `Multi` and /// thread that is used for all HTTP requests by this client. pub struct Client { channel: Option>, thread_handle: Option>, handle_config: HandleConfiguration, + stats: Arc, } impl Client { /// Spawns a new worker thread where HTTP request execute. 
pub fn new(handle_config: HandleConfiguration) -> Client { let (tx, rx) = mpsc::channel(); - let handle = std::thread::spawn(move || WorkerServer::run(rx, handle_config.multiplexing)); + let stats = Arc::new(Stats::default()); + let timeout = handle_config.timeout.clone(); + let worker_stats = stats.clone(); + let handle = std::thread::spawn(move || { + WorkerServer::run(rx, handle_config.multiplexing, timeout, worker_stats) + }); Client { channel: Some(tx), thread_handle: Some(handle), handle_config, + stats, } } @@ -64,7 +97,7 @@ impl Client { pub async fn request(&self, request: Request) -> HttpResult { let url = request.uri().to_string(); debug!(target: "network::fetch", url); - let mut collector = Collector::new(); + let mut collector = Collector::new(self.stats.clone()); let (parts, body) = request.into_parts(); let body_len = body.len(); collector.request_body = Cursor::new(body); @@ -134,8 +167,11 @@ impl std::fmt::Debug for Client { /// Manages the cURL `Multi`. Processes incoming work sent over the /// channel, and returns responses. 
struct WorkerServer { + /// Channel to receive new work incoming_work: Receiver, + /// curl multi interface multi: Multi, + /// Map of token to curl handle and response channel handles: HashMap< usize, ( @@ -143,11 +179,25 @@ struct WorkerServer { oneshot::Sender>, ), >, + /// Next token to use token: usize, + /// Global timeout configuration + timeout: HttpTimeout, + /// Global transfer statistics + stats: Arc, + /// Instant when the current low speed window started + low_speed_window_start: Instant, + /// Amount of total bytes transferred when the current low speed window started + low_speed_window_initial: u64, } impl WorkerServer { - fn run(incoming_work: Receiver, multiplex: bool) { + fn run( + incoming_work: Receiver, + multiplex: bool, + timeout: HttpTimeout, + stats: Arc, + ) { let mut multi = Multi::new(); // let's not flood the server with connections if let Err(e) = multi.set_max_host_connections(2) { @@ -162,16 +212,53 @@ impl WorkerServer { multi, handles: HashMap::new(), token: 0, + timeout, + stats, + low_speed_window_start: Instant::now(), + low_speed_window_initial: 0, }; worker.worker_loop(); } fn fail_and_drain(&mut self, e: &Error) { + warn!( + target: "network", + "failing all outstanding HTTP requests: {e}" + ); for (_token, (_handle, sender)) in self.handles.drain() { let _ = sender.send(Err(e.clone())); } } + /// Marks the start of a new timeout window. + fn reset_low_speed_timeout(&mut self) { + self.low_speed_window_start = Instant::now(); + self.low_speed_window_initial = self.stats.dl_transferred.load(Ordering::Acquire); + } + + /// Return an error if we're at the end of a timeout window and we haven't + /// made enough progress. + fn check_low_speed_timeout(&mut self) -> Option { + // Make sure we've waited for the timeout duration + if Instant::now().duration_since(self.low_speed_window_start) < self.timeout.dur { + return None; + } + + // Calculate how much we've transferred since the last check. 
+ let current = self.stats.dl_transferred.load(Ordering::Acquire); + let transferred = current.saturating_sub(self.low_speed_window_initial); + self.reset_low_speed_timeout(); + if transferred < self.timeout.low_speed_limit.into() { + Some(Error::TooSlow { + low_speed_limit: self.timeout.low_speed_limit, + timeout_dur: self.timeout.dur, + transferred, + }) + } else { + None + } + } + fn worker_loop(&mut self) { const INITIAL_DELAY: Duration = Duration::from_millis(1); let mut wait_backoff = INITIAL_DELAY; @@ -219,6 +306,12 @@ impl WorkerServer { }); if running > 0 { + // Check for low speed timeout. + if let Some(timeout_error) = self.check_low_speed_timeout() { + self.fail_and_drain(&timeout_error); + continue; + } + let max_timeout = Duration::from_millis(1000); let mut timeout = self .multi @@ -260,6 +353,7 @@ impl WorkerServer { match self.incoming_work.recv() { Ok(msg) => { trace!("resuming work"); + self.reset_low_speed_timeout(); self.enqueue_request(msg); wait_backoff = INITIAL_DELAY; } @@ -291,17 +385,23 @@ impl WorkerServer { /// Interface that cURL (`Easy2`) uses to make progress. struct Collector { + /// The response being built response: Response, + /// The body to transmit request_body: Cursor>, + /// Whether we're in debug mode debug: bool, + /// Global transfer statistics. 
+ global_stats: Arc, } impl Collector { - fn new() -> Self { + fn new(stats: Arc) -> Self { Collector { response: Response::new(Vec::new()), request_body: Cursor::new(Vec::new()), debug: false, + global_stats: stats, } } } @@ -309,6 +409,9 @@ impl Collector { impl Handler for Collector { fn write(&mut self, data: &[u8]) -> Result { self.response.body_mut().extend_from_slice(data); + self.global_stats + .dl_transferred + .fetch_add(data.len() as u64, Ordering::Release); Ok(data.len()) } From 963787c7591d9f59694de695e367dd0040ac070d Mon Sep 17 00:00:00 2001 From: Arlo Siemsen Date: Fri, 17 Apr 2026 17:01:04 -0500 Subject: [PATCH 2/3] fix: add http_async types to spurious error detection This is a fix, since we were not correctly retrying errors from curl when using http_async. --- src/cargo/util/network/retry.rs | 33 ++++++++++++++++++++++----------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/src/cargo/util/network/retry.rs b/src/cargo/util/network/retry.rs index 196592b8c4b..5dca3c62fe1 100644 --- a/src/cargo/util/network/retry.rs +++ b/src/cargo/util/network/retry.rs @@ -42,6 +42,7 @@ //! - use crate::util::errors::{GitCliError, HttpNotSuccessful}; +use crate::util::network::http_async; use crate::{CargoResult, GlobalContext}; use anyhow::Error; use rand::RngExt; @@ -185,6 +186,26 @@ impl<'a> Retry<'a> { } fn maybe_spurious(err: &Error) -> bool { + fn maybe_spurious_curl(curl_err: &curl::Error) -> bool { + curl_err.is_couldnt_connect() + || curl_err.is_couldnt_resolve_proxy() + || curl_err.is_couldnt_resolve_host() + || curl_err.is_operation_timedout() + || curl_err.is_recv_error() + || curl_err.is_send_error() + || curl_err.is_http2_error() + || curl_err.is_http2_stream_error() + || curl_err.is_ssl_connect_error() + || curl_err.is_partial_file() + } + if let Some(async_http_error) = err.downcast_ref::() { + match async_http_error { + http_async::Error::Easy(error) => return maybe_spurious_curl(error), + http_async::Error::TooSlow { .. 
} => return true, + http_async::Error::Multi(_) => {} + http_async::Error::BadHeader { .. } => {} + } + } if let Some(git_err) = err.downcast_ref::() { match git_err.class() { git2::ErrorClass::Net @@ -195,17 +216,7 @@ fn maybe_spurious(err: &Error) -> bool { } } if let Some(curl_err) = err.downcast_ref::() { - if curl_err.is_couldnt_connect() - || curl_err.is_couldnt_resolve_proxy() - || curl_err.is_couldnt_resolve_host() - || curl_err.is_operation_timedout() - || curl_err.is_recv_error() - || curl_err.is_send_error() - || curl_err.is_http2_error() - || curl_err.is_http2_stream_error() - || curl_err.is_ssl_connect_error() - || curl_err.is_partial_file() - { + if maybe_spurious_curl(curl_err) { return true; } } From b42cd8b7075b07767e0cc200fe83271abf5cc55c Mon Sep 17 00:00:00 2001 From: Arlo Siemsen Date: Fri, 17 Apr 2026 17:04:24 -0500 Subject: [PATCH 3/3] Add bytes_pending function to http_async --- src/cargo/util/network/http_async.rs | 38 +++++++++++++++++++++++++++- 1 file changed, 37 insertions(+), 1 deletion(-) diff --git a/src/cargo/util/network/http_async.rs b/src/cargo/util/network/http_async.rs index ea12bd67651..970d36b2d88 100644 --- a/src/cargo/util/network/http_async.rs +++ b/src/cargo/util/network/http_async.rs @@ -8,6 +8,7 @@ use std::io::Cursor; use std::io::Read; use std::str::FromStr; use std::sync::Arc; +use std::sync::atomic::AtomicI64; use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; use std::sync::mpsc; @@ -63,6 +64,7 @@ struct Message { #[derive(Default)] struct Stats { + dl_remaining: AtomicI64, dl_transferred: AtomicU64, } @@ -107,6 +109,7 @@ impl Client { handle.url(&url)?; handle.follow_location(true)?; + handle.progress(true)?; match parts.method { http::Method::HEAD => handle.nobody(true)?, @@ -147,6 +150,15 @@ impl Client { self.channel.as_ref().unwrap().send(req).unwrap(); receiver.await.unwrap() } + + /// Returns the number of pending bytes across all active transfers. 
+ pub fn bytes_pending(&self) -> u64 { + self.stats + .dl_remaining + .load(Ordering::Acquire) + .try_into() + .unwrap() + } } impl Drop for Client { @@ -393,6 +405,8 @@ struct Collector { debug: bool, /// Global transfer statistics. global_stats: Arc, + /// How much has this particular transfer added to global `dl_remaining` stats. + dl_remaining_delta: i64, } impl Collector { @@ -402,6 +416,7 @@ impl Collector { request_body: Cursor::new(Vec::new()), debug: false, global_stats: stats, + dl_remaining_delta: 0, } } } @@ -435,11 +450,32 @@ impl Handler for Collector { } } - fn progress(&mut self, _dltotal: f64, _dlnow: f64, _ultotal: f64, _ulnow: f64) -> bool { + fn progress(&mut self, dltotal: f64, dlnow: f64, _ultotal: f64, _ulnow: f64) -> bool { + if dlnow > dltotal { + return true; + } + let dl_total = dltotal as i64; + let dl_current = dlnow as i64; + + let remaining = dl_total - dl_current; + + self.global_stats + .dl_remaining + .fetch_add(remaining - self.dl_remaining_delta, Ordering::Release); + self.dl_remaining_delta = remaining; true } } +impl Drop for Collector { + fn drop(&mut self) { + // Zero out this transfer's contribution to the global dl_remaining. + self.global_stats + .dl_remaining + .fetch_add(-self.dl_remaining_delta, Ordering::Release); + } +} + /// Additional fields on an [`http::Response`]. #[derive(Clone)] struct Extensions {