diff --git a/Cargo.lock b/Cargo.lock index a89e826a378..922ba6118bf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1421,6 +1421,17 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cecba35d7ad927e23624b22ad55235f2239cfa44fd10428eecbeba6d6a717718" +[[package]] +name = "futures-macro" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e835b70203e41293343137df5c0664546da5745f82ec9b84d40be8336958447b" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "futures-sink" version = "0.3.32" @@ -1448,6 +1459,7 @@ dependencies = [ "futures-channel", "futures-core", "futures-io", + "futures-macro", "futures-sink", "futures-task", "memchr", diff --git a/Cargo.toml b/Cargo.toml index c0e6faa2d3b..f804bd83982 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,7 +49,7 @@ curl = "0.4.49" curl-sys = "0.4.87" filetime = "0.2.27" flate2 = { version = "1.1.9", default-features = false, features = ["zlib-rs"] } -futures = { version = "0.3.32", default-features = false, features = ["std", "executor"]} +futures = { version = "0.3.32", default-features = false, features = ["std", "executor", "async-await"]} futures-timer = "3.0.3" git2 = "0.20.4" git2-curl = "0.21.0" diff --git a/src/cargo/core/package.rs b/src/cargo/core/package.rs index f3ecdd66089..e0f0e5f96cf 100644 --- a/src/cargo/core/package.rs +++ b/src/cargo/core/package.rs @@ -1,18 +1,19 @@ use std::cell::OnceCell; use std::cell::{Cell, Ref, RefCell}; use std::cmp::Ordering; -use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque}; +use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; use std::fmt; use std::hash; -use std::mem; use std::path::{Path, PathBuf}; use std::rc::Rc; use std::time::{Duration, Instant}; use anyhow::Context as _; use cargo_util_schemas::manifest::{Hints, RustVersion}; -use curl::easy::Easy; -use curl::multi::{EasyHandle, 
Multi}; +use futures::FutureExt; +use futures::TryStreamExt; +use futures::stream::FuturesUnordered; +use http::Request; use semver::Version; use serde::Serialize; use tracing::debug; @@ -31,10 +32,7 @@ use crate::util::HumanBytes; use crate::util::cache_lock::{CacheLock, CacheLockMode}; use crate::util::errors::{CargoResult, HttpNotSuccessful}; use crate::util::interning::InternedString; -use crate::util::network::http::HttpTimeout; -use crate::util::network::http::http_handle_and_timeout; use crate::util::network::retry::{Retry, RetryResult}; -use crate::util::network::sleep::SleepTracker; use crate::util::{self, GlobalContext, Progress, ProgressStyle, internal}; /// Information about a package that is available somewhere in the file system. @@ -290,102 +288,293 @@ pub struct PackageSet<'gctx> { packages: HashMap>, sources: RefCell>, gctx: &'gctx GlobalContext, - multi: Multi, - /// Used to prevent reusing the `PackageSet` to download twice. - downloading: Cell, - /// Whether or not to use curl HTTP/2 multiplexing. - multiplexing: bool, } /// Helper for downloading crates. pub struct Downloads<'a, 'gctx> { set: &'a PackageSet<'gctx>, - /// When a download is started, it is added to this map. The key is a - /// "token" (see `Download::token`). It is removed once the download is - /// finished. - pending: HashMap, EasyHandle)>, - /// Set of packages currently being downloaded. This should stay in sync - /// with `pending`. - pending_ids: HashSet, - /// Downloads that have failed and are waiting to retry again later. - sleeping: SleepTracker<(Download<'gctx>, Easy)>, - /// The final result of each download. A pair `(token, result)`. This is a - /// temporary holding area, needed because curl can report multiple - /// downloads at once, but the main loop (`wait`) is written to only - /// handle one at a time. - results: VecDeque<(usize, Result<(), curl::Error>)>, - /// The next ID to use for creating a token (see `Download::token`). - next: usize, /// Progress bar. 
- progress: RefCell>>, + progress: RefCell>, + /// Flag for keeping track of whether we've printed the Downloading message. + first: Cell, + /// Size (in bytes) and package name of the largest downloaded package. + largest: Cell>, /// Number of downloads that have successfully finished. - downloads_finished: usize, + downloads_finished: Cell, /// Total bytes for all successfully downloaded packages. - downloaded_bytes: u64, - /// Size (in bytes) and package name of the largest downloaded package. - largest: (u64, InternedString), + downloaded_bytes: Cell, + /// Number of currently pending downloads. + pending: Cell, /// Time when downloading started. start: Instant, - /// Indicates *all* downloads were successful. - success: bool, - - /// Timeout management, both of timeout thresholds as well as whether or not - /// our connection has timed out (and accompanying message if it has). - /// - /// Note that timeout management is done manually here instead of in libcurl - /// because we want to apply timeouts to an entire batch of operations, not - /// any one particular single operation. - timeout: HttpTimeout, - /// Last time bytes were received. - updated_at: Cell, - /// This is a slow-speed check. It is reset to `now + timeout_duration` - /// every time at least `threshold` bytes are received. If the current - /// time ever exceeds `next_speed_check`, then give up and report a - /// timeout error. - next_speed_check: Cell, - /// This is the slow-speed threshold byte count. It starts at the - /// configured threshold value (default 10), and is decremented by the - /// number of bytes received in each chunk. If it is <= zero, the - /// threshold has been met and data is being received fast enough not to - /// trigger a timeout; reset `next_speed_check` and set this back to the - /// configured threshold. - next_speed_check_bytes_threshold: Cell, - /// Global filesystem lock to ensure only one Cargo is downloading at a - /// time. 
+ /// Global filesystem lock to ensure only one Cargo is downloading at a time. _lock: CacheLock<'gctx>, } -struct Download<'gctx> { - /// The token for this download, used as the key of the `Downloads::pending` map - /// and stored in `EasyHandle` as well. - token: usize, +impl<'a, 'gctx> Downloads<'a, 'gctx> { + pub async fn download( + set: &'a PackageSet<'gctx>, + ids: impl IntoIterator, + ) -> CargoResult> { + let progress = RefCell::new(Progress::with_style( + "Downloading", + ProgressStyle::Ratio, + set.gctx, + )); + let dl = Downloads { + set, + progress, + first: Cell::new(true), + largest: Cell::new(None), + downloads_finished: Cell::new(0), + downloaded_bytes: Cell::new(0), + pending: Cell::new(0), + start: Instant::now(), + _lock: set + .gctx + .acquire_package_cache_lock(CacheLockMode::DownloadExclusive)?, + }; + dl.run(ids).await + } - /// The package that we're downloading. - id: PackageId, + async fn run(&self, ids: impl IntoIterator) -> CargoResult> { + let mut futures: FuturesUnordered<_> = + ids.into_iter().map(|id| self.get_package(id)).collect(); - /// Actual downloaded data, updated throughout the lifetime of this download. - data: RefCell>, + // Wait for downloads to complete, or the timer to expire. + // This ensures that we call the tick function at a fast + // enough rate to give the user progress updates. + let mut out = Vec::new(); + loop { + futures::select! { + pkg = futures.try_next() => { + match pkg? { + Some(pkg) => out.push(pkg), + None => break, + } + }, + _ = futures_timer::Delay::new(Duration::from_millis(200)).fuse() => { + self.tick(WhyTick::DownloadUpdate)?; + }, + } + } + self.print_summary()?; + self.set + .gctx + .deferred_global_last_use()? + .save_no_error(self.set.gctx); + Ok(out) + } - /// HTTP headers for debugging. - headers: RefCell>, + /// Get the existing package, or find the URL to download the .crate + /// file and start the download.
+ async fn get_package(&self, id: PackageId) -> CargoResult<&'a Package> { + let slot = self + .set + .packages + .get(&id) + .ok_or_else(|| internal(format!("couldn't find `{}` in package set", id)))?; + if let Some(pkg) = slot.get() { + return CargoResult::Ok(pkg); + } + let source = self + .set + .sources + .borrow() + .get(id.source_id()) + .ok_or_else(|| internal(format!("couldn't find source for `{}`", id)))? + .clone(); + let pkg = match source + .download(id) + .await + .context("unable to get packages from source") + .with_context(|| format!("failed to download `{}`", id))? + { + MaybePackage::Ready(package) => CargoResult::Ok(package), + MaybePackage::Download { + url, + descriptor, + authorization, + } => { + let mut r = Retry::new(self.set.gctx)?; + let contents = loop { + self.tick(WhyTick::DownloadStarted)?; + self.pending.update(|v| v + 1); + let response = self + .fetch(&url, authorization.as_deref(), &descriptor, &id) + .await; + self.pending.update(|v| v - 1); + match r.r#try(|| response) { + RetryResult::Success(result) => break result, + RetryResult::Err(error) => { + debug!(target: "network", "final failure for {url}"); + return Err(error); + } + RetryResult::Retry(delay_ms) => { + debug!(target: "network", "download retry {url} for {delay_ms}ms"); + futures_timer::Delay::new(Duration::from_millis(delay_ms)).await; + } + } + }; + self.downloads_finished.update(|v| v + 1); + self.downloaded_bytes.update(|v| v + contents.len() as u64); + + // We're about to synchronously extract the crate below. While we're + // doing that our download progress won't actually be updated, nor do we + // have a great view into the progress of the extraction. Let's prepare + // the user for this CPU-heavy step if it looks like it'll take some + // time to do so. 
+ let kib_400 = 1024 * 400; + if contents.len() < kib_400 { + self.tick(WhyTick::DownloadFinished)?; + } else { + self.tick(WhyTick::Extracting(&id.name()))?; + } - /// The URL that we're downloading from, cached here for error messages and - /// reenqueuing. - url: String, + Ok(source.finish_download(id, contents).await?) + } + }?; - /// A descriptive string to print when we've finished downloading this crate. - descriptor: String, + assert!(slot.set(pkg).is_ok()); + Ok(slot.get().unwrap()) + } + + /// Perform the request to download the .crate file. + async fn fetch( + &self, + url: &str, + authorization: Option<&str>, + descriptor: &str, + id: &PackageId, + ) -> CargoResult> { + // http::Uri doesn't support file urls without an authority, even though it's optional. + // so we insert localhost here to make it work. + let mut request = if let Some(file_url) = url.strip_prefix("file:///") { + Request::get(format!("file://localhost/{file_url}")) + } else { + Request::get(url) + }; + if let Some(authorization) = authorization { + request = request.header(http::header::AUTHORIZATION, authorization); + } + let client = self + .set + .gctx + .http_async() + .with_context(|| format!("failed to download `{}`", id))?; - /// Statistics updated from the progress callback in libcurl. - total: Cell, - current: Cell, + // If the progress bar isn't enabled then it may be awhile before the + // first crate finishes downloading so we inform immediately that we're + // downloading crates here. + if self.first.get() && !self.progress.borrow().is_enabled() { + self.first.set(false); + self.set.gctx.shell().status("Downloading", "crates ...")?; + } - /// The moment we started this transfer at. - start: Instant, - timed_out: Cell>, + let response = client + .request(request.body(Vec::new())?) 
+ .await + .with_context(|| format!("failed to download from `{}`", url))?; + + let previous_largest = self.largest.get().map(|(v, _)| v).unwrap_or_default(); + let len = response.body().len() as u64; + if len > previous_largest { + self.largest.set(Some((len, id.name()))); + } + + if response.status() != http::StatusCode::OK { + return Err(HttpNotSuccessful::new_from_response(response, &url)) + .with_context(|| format!("failed to download from `{}`", url))?; + } + // If the progress bar isn't enabled then we still want to provide some + // semblance of progress of how we're downloading crates, and if the + // progress bar is enabled this provides a good log of what's happening. + // progress.clear(); + self.set.gctx.shell().status("Downloaded", descriptor)?; + + Ok(response.into_body()) + } + + fn tick(&self, why: WhyTick<'_>) -> CargoResult<()> { + let mut progress = self.progress.borrow_mut(); + + if let WhyTick::DownloadUpdate = why { + if !progress.update_allowed() { + return Ok(()); + } + } + + let pending = self.pending.get(); + let mut msg = if pending == 1 { + format!("{} crate", pending) + } else { + format!("{} crates", pending) + }; + match why { + WhyTick::Extracting(krate) => { + msg.push_str(&format!(", extracting {} ...", krate)); + } + _ => { + let remaining = self + .set + .gctx + .http_async() + .map(|c| c.bytes_pending()) + .unwrap_or_default(); + if remaining > 0 { + msg.push_str(&format!( + ", remaining bytes: {:.1}", + HumanBytes(remaining as u64) + )); + } + } + } + progress.print_now(&msg) + } - /// Logic used to track retrying this download if it's a spurious failure. - retry: Retry<'gctx>, + fn print_summary(&self) -> CargoResult<()> { + // Don't print a download summary if we're not using a progress bar, + // we've already printed lots of `Downloading...` items. 
+ if !self.progress.borrow().is_enabled() { + return Ok(()); + } + let downloads_finished = self.downloads_finished.get(); + + // If we didn't download anything, no need for a summary. + if downloads_finished == 0 { + return Ok(()); + } + + // pick the correct plural of crate(s) + let crate_string = if downloads_finished == 1 { + "crate" + } else { + "crates" + }; + let mut status = format!( + "{downloads_finished} {crate_string} ({:.1}) in {}", + HumanBytes(self.downloaded_bytes.get()), + util::elapsed(self.start.elapsed()) + ); + // print the size of largest crate if it was >1mb + // however don't print if only a single crate was downloaded + // because it is obvious that it will be the largest then + if let Some(largest) = self.largest.get() { + let mib_1 = 1024 * 1024; + if largest.0 > mib_1 && downloads_finished > 1 { + status.push_str(&format!( + " (largest was `{}` at {:.1})", + largest.1, + HumanBytes(largest.0), + )); + } + } + + // Clear progress before displaying final summary. + self.progress.borrow_mut().clear(); + self.set.gctx.shell().status("Downloaded", status)?; + Ok(()) + } } impl<'gctx> PackageSet<'gctx> { @@ -394,16 +583,7 @@ impl<'gctx> PackageSet<'gctx> { sources: SourceMap<'gctx>, gctx: &'gctx GlobalContext, ) -> CargoResult> { - // We've enabled the `http2` feature of `curl` in Cargo, so treat - // failures here as fatal as it would indicate a build-time problem. 
- let mut multi = Multi::new(); - let multiplexing = gctx.http_config()?.multiplexing.unwrap_or(true); - multi - .pipelining(false, multiplexing) - .context("failed to enable multiplexing/pipelining in curl")?; - - // let's not flood crates.io with connections - multi.set_max_host_connections(2)?; + gctx.http_config()?; Ok(PackageSet { packages: package_ids @@ -412,9 +592,6 @@ impl<'gctx> PackageSet<'gctx> { .collect(), sources: RefCell::new(sources), gctx, - multi, - downloading: Cell::new(false), - multiplexing, }) } @@ -426,36 +603,6 @@ impl<'gctx> PackageSet<'gctx> { self.packages.values().filter_map(|p| p.get()) } - pub fn enable_download<'a>(&'a self) -> CargoResult> { - assert!(!self.downloading.replace(true)); - let timeout = HttpTimeout::new(self.gctx)?; - Ok(Downloads { - start: Instant::now(), - set: self, - next: 0, - pending: HashMap::new(), - pending_ids: HashSet::new(), - sleeping: SleepTracker::new(), - results: VecDeque::new(), - progress: RefCell::new(Some(Progress::with_style( - "Downloading", - ProgressStyle::Ratio, - self.gctx, - ))), - downloads_finished: 0, - downloaded_bytes: 0, - largest: (0, "".into()), - success: false, - updated_at: Cell::new(Instant::now()), - timeout, - next_speed_check: Cell::new(Instant::now()), - next_speed_check_bytes_threshold: Cell::new(0), - _lock: self - .gctx - .acquire_package_cache_lock(CacheLockMode::DownloadExclusive)?, - }) - } - pub fn get_one(&self, id: PackageId) -> CargoResult<&Package> { if let Some(pkg) = self.packages.get(&id).and_then(|slot| slot.get()) { return Ok(pkg); @@ -464,23 +611,7 @@ impl<'gctx> PackageSet<'gctx> { } pub fn get_many(&self, ids: impl IntoIterator) -> CargoResult> { - let mut pkgs = Vec::new(); - let _lock = self - .gctx - .acquire_package_cache_lock(CacheLockMode::DownloadExclusive)?; - let mut downloads = self.enable_download()?; - for id in ids { - pkgs.extend(downloads.start(id)?); - } - while downloads.remaining() > 0 { - pkgs.push(downloads.wait()?); - } - 
downloads.success = true; - drop(downloads); - - let mut deferred = self.gctx.deferred_global_last_use()?; - deferred.save_no_error(self.gctx); - Ok(pkgs) + return crate::util::block_on(Downloads::download(self, ids)); } /// Downloads any packages accessible from the give root ids. @@ -665,8 +796,6 @@ impl<'gctx> PackageSet<'gctx> { /// Merge the given set into self. pub fn add_set(&mut self, set: PackageSet<'gctx>) { - assert!(!self.downloading.get()); - assert!(!set.downloading.get()); for (pkg_id, p_cell) in set.packages { self.packages.entry(pkg_id).or_insert(p_cell); } @@ -676,449 +805,6 @@ impl<'gctx> PackageSet<'gctx> { } } -impl<'a, 'gctx> Downloads<'a, 'gctx> { - /// Starts to download the package for the `id` specified. - /// - /// Returns `None` if the package is queued up for download and will - /// eventually be returned from `wait_for_download`. Returns `Some(pkg)` if - /// the package is ready and doesn't need to be downloaded. - #[tracing::instrument(skip_all)] - pub fn start(&mut self, id: PackageId) -> CargoResult> { - self.start_inner(id) - .with_context(|| format!("failed to download `{}`", id)) - } - - fn start_inner(&mut self, id: PackageId) -> CargoResult> { - // First up see if we've already cached this package, in which case - // there's nothing to do. - let slot = self - .set - .packages - .get(&id) - .ok_or_else(|| internal(format!("couldn't find `{}` in package set", id)))?; - if let Some(pkg) = slot.get() { - return Ok(Some(pkg)); - } - - // Ask the original source for this `PackageId` for the corresponding - // package. That may immediately come back and tell us that the package - // is ready, or it could tell us that it needs to be downloaded. 
- let sources = self.set.sources.borrow_mut(); - let source = sources - .get(id.source_id()) - .ok_or_else(|| internal(format!("couldn't find source for `{}`", id)))?; - let pkg = source - .download(id) - .context("unable to get packages from source")?; - let (url, descriptor, authorization) = match pkg { - MaybePackage::Ready(pkg) => { - debug!("{} doesn't need a download", id); - assert!(slot.set(pkg).is_ok()); - return Ok(Some(slot.get().unwrap())); - } - MaybePackage::Download { - url, - descriptor, - authorization, - } => (url, descriptor, authorization), - }; - - // Ok we're going to download this crate, so let's set up all our - // internal state and hand off an `Easy` handle to our libcurl `Multi` - // handle. This won't actually start the transfer, but later it'll - // happen during `wait_for_download` - let token = self.next; - self.next += 1; - debug!(target: "network", "downloading {} as {}", id, token); - assert!(self.pending_ids.insert(id)); - - let (mut handle, _timeout) = http_handle_and_timeout(self.set.gctx)?; - handle.get(true)?; - handle.url(&url)?; - handle.follow_location(true)?; // follow redirects - - // Add authorization header. - if let Some(authorization) = authorization { - let mut headers = curl::easy::List::new(); - headers.append(&format!("Authorization: {}", authorization))?; - handle.http_headers(headers)?; - } - - // Enable HTTP/2 if possible. - crate::try_old_curl_http2_pipewait!(self.set.multiplexing, handle); - - handle.write_function(move |buf| { - debug!(target: "network", "{} - {} bytes of data", token, buf.len()); - tls::with(|downloads| { - if let Some(downloads) = downloads { - downloads.pending[&token] - .0 - .data - .borrow_mut() - .extend_from_slice(buf); - } - }); - Ok(buf.len()) - })?; - handle.header_function(move |data| { - tls::with(|downloads| { - if let Some(downloads) = downloads { - // Headers contain trailing \r\n, trim them to make it easier - // to work with. 
- let h = String::from_utf8_lossy(data).trim().to_string(); - downloads.pending[&token].0.headers.borrow_mut().push(h); - } - }); - true - })?; - - handle.progress(true)?; - handle.progress_function(move |dl_total, dl_cur, _, _| { - tls::with(|downloads| match downloads { - Some(d) => d.progress(token, dl_total as u64, dl_cur as u64), - None => false, - }) - })?; - - // If the progress bar isn't enabled then it may be awhile before the - // first crate finishes downloading so we inform immediately that we're - // downloading crates here. - if self.downloads_finished == 0 - && self.pending.is_empty() - && !self.progress.borrow().as_ref().unwrap().is_enabled() - { - self.set.gctx.shell().status("Downloading", "crates ...")?; - } - - let dl = Download { - token, - data: RefCell::new(Vec::new()), - headers: RefCell::new(Vec::new()), - id, - url, - descriptor, - total: Cell::new(0), - current: Cell::new(0), - start: Instant::now(), - timed_out: Cell::new(None), - retry: Retry::new(self.set.gctx)?, - }; - self.enqueue(dl, handle)?; - self.tick(WhyTick::DownloadStarted)?; - - Ok(None) - } - - /// Returns the number of crates that are still downloading. - pub fn remaining(&self) -> usize { - self.pending.len() + self.sleeping.len() - } - - /// Blocks the current thread waiting for a package to finish downloading. - /// - /// This method will wait for a previously enqueued package to finish - /// downloading and return a reference to it after it's done downloading. - /// - /// # Panics - /// - /// This function will panic if there are no remaining downloads. 
- #[tracing::instrument(skip_all)] - pub fn wait(&mut self) -> CargoResult<&'a Package> { - let (dl, data) = loop { - assert_eq!(self.pending.len(), self.pending_ids.len()); - let (token, result) = self.wait_for_curl()?; - debug!(target: "network", "{} finished with {:?}", token, result); - - let (mut dl, handle) = self - .pending - .remove(&token) - .expect("got a token for a non-in-progress transfer"); - let data = mem::take(&mut *dl.data.borrow_mut()); - let headers = mem::take(&mut *dl.headers.borrow_mut()); - let mut handle = self.set.multi.remove(handle)?; - self.pending_ids.remove(&dl.id); - - // Check if this was a spurious error. If it was a spurious error - // then we want to re-enqueue our request for another attempt and - // then we wait for another request to finish. - let ret = { - let timed_out = &dl.timed_out; - let url = &dl.url; - dl.retry.r#try(|| { - if let Err(e) = result { - // If this error is "aborted by callback" then that's - // probably because our progress callback aborted due to - // a timeout. We'll find out by looking at the - // `timed_out` field, looking for a descriptive message. - // If one is found we switch the error code (to ensure - // it's flagged as spurious) and then attach our extra - // information to the error. 
- if !e.is_aborted_by_callback() { - return Err(e.into()); - } - - return Err(match timed_out.replace(None) { - Some(msg) => { - let code = curl_sys::CURLE_OPERATION_TIMEDOUT; - let mut err = curl::Error::new(code); - err.set_extra(msg); - err - } - None => e, - } - .into()); - } - - let code = handle.response_code()?; - if code != 200 && code != 0 { - return Err(HttpNotSuccessful::new_from_handle( - &mut handle, - &url, - data, - headers, - ) - .into()); - } - Ok(data) - }) - }; - match ret { - RetryResult::Success(data) => break (dl, data), - RetryResult::Err(e) => { - return Err(e.context(format!("failed to download from `{}`", dl.url))); - } - RetryResult::Retry(sleep) => { - debug!(target: "network", "download retry {} for {sleep}ms", dl.url); - self.sleeping.push(sleep, (dl, handle)); - } - } - }; - - // If the progress bar isn't enabled then we still want to provide some - // semblance of progress of how we're downloading crates, and if the - // progress bar is enabled this provides a good log of what's happening. - self.progress.borrow_mut().as_mut().unwrap().clear(); - self.set.gctx.shell().status("Downloaded", &dl.descriptor)?; - - self.downloads_finished += 1; - self.downloaded_bytes += dl.total.get(); - if dl.total.get() > self.largest.0 { - self.largest = (dl.total.get(), dl.id.name()); - } - - // We're about to synchronously extract the crate below. While we're - // doing that our download progress won't actually be updated, nor do we - // have a great view into the progress of the extraction. Let's prepare - // the user for this CPU-heavy step if it looks like it'll take some - // time to do so. - let kib_400 = 1024 * 400; - if dl.total.get() < kib_400 { - self.tick(WhyTick::DownloadFinished)?; - } else { - self.tick(WhyTick::Extracting(&dl.id.name()))?; - } - - // Inform the original source that the download is finished which - // should allow us to actually get the package and fill it in now. 
- let sources = self.set.sources.borrow_mut(); - let source = sources - .get(dl.id.source_id()) - .ok_or_else(|| internal(format!("couldn't find source for `{}`", dl.id)))?; - let start = Instant::now(); - let pkg = source.finish_download(dl.id, data)?; - - // Assume that no time has passed while we were calling - // `finish_download`, update all speed checks and timeout limits of all - // active downloads to make sure they don't fire because of a slowly - // extracted tarball. - let finish_dur = start.elapsed(); - self.updated_at.set(self.updated_at.get() + finish_dur); - self.next_speed_check - .set(self.next_speed_check.get() + finish_dur); - - let slot = &self.set.packages[&dl.id]; - assert!(slot.set(pkg).is_ok()); - Ok(slot.get().unwrap()) - } - - fn enqueue(&mut self, dl: Download<'gctx>, handle: Easy) -> CargoResult<()> { - let mut handle = self.set.multi.add(handle)?; - let now = Instant::now(); - handle.set_token(dl.token)?; - self.updated_at.set(now); - self.next_speed_check.set(now + self.timeout.dur); - self.next_speed_check_bytes_threshold - .set(u64::from(self.timeout.low_speed_limit)); - dl.timed_out.set(None); - dl.current.set(0); - dl.total.set(0); - self.pending.insert(dl.token, (dl, handle)); - Ok(()) - } - - /// Block, waiting for curl. Returns a token and a `Result` for that token - /// (`Ok` means the download successfully finished). - fn wait_for_curl(&mut self) -> CargoResult<(usize, Result<(), curl::Error>)> { - // This is the main workhorse loop. We use libcurl's portable `wait` - // method to actually perform blocking. This isn't necessarily too - // efficient in terms of fd management, but we should only be juggling - // a few anyway. - // - // Here we start off by asking the `multi` handle to do some work via - // the `perform` method. This will actually do I/O work (non-blocking) - // and attempt to make progress. 
Afterwards we ask about the `messages` - // contained in the handle which will inform us if anything has finished - // transferring. - // - // If we've got a finished transfer after all that work we break out - // and process the finished transfer at the end. Otherwise we need to - // actually block waiting for I/O to happen, which we achieve with the - // `wait` method on `multi`. - loop { - self.add_sleepers()?; - let n = tls::set(self, || { - self.set - .multi - .perform() - .context("failed to perform http requests") - })?; - debug!(target: "network", "handles remaining: {}", n); - let results = &mut self.results; - let pending = &self.pending; - self.set.multi.messages(|msg| { - let token = msg.token().expect("failed to read token"); - let handle = &pending[&token].1; - if let Some(result) = msg.result_for(handle) { - results.push_back((token, result)); - } else { - debug!(target: "network", "message without a result (?)"); - } - }); - - if let Some(pair) = results.pop_front() { - break Ok(pair); - } - assert_ne!(self.remaining(), 0); - if self.pending.is_empty() { - let delay = self.sleeping.time_to_next().unwrap(); - debug!(target: "network", "sleeping main thread for {delay:?}"); - std::thread::sleep(delay); - } else { - let min_timeout = Duration::new(1, 0); - let timeout = self.set.multi.get_timeout()?.unwrap_or(min_timeout); - let timeout = timeout.min(min_timeout); - self.set - .multi - .wait(&mut [], timeout) - .context("failed to wait on curl `Multi`")?; - } - } - } - - fn add_sleepers(&mut self) -> CargoResult<()> { - for (dl, handle) in self.sleeping.to_retry() { - self.pending_ids.insert(dl.id); - self.enqueue(dl, handle)?; - } - Ok(()) - } - - fn progress(&self, token: usize, total: u64, cur: u64) -> bool { - let dl = &self.pending[&token].0; - dl.total.set(total); - let now = Instant::now(); - if cur > dl.current.get() { - let delta = cur - dl.current.get(); - let threshold = self.next_speed_check_bytes_threshold.get(); - - dl.current.set(cur); - 
self.updated_at.set(now); - - if delta >= threshold { - self.next_speed_check.set(now + self.timeout.dur); - self.next_speed_check_bytes_threshold - .set(u64::from(self.timeout.low_speed_limit)); - } else { - self.next_speed_check_bytes_threshold.set(threshold - delta); - } - } - if self.tick(WhyTick::DownloadUpdate).is_err() { - return false; - } - - // If we've spent too long not actually receiving any data we time out. - if now > self.updated_at.get() + self.timeout.dur { - self.updated_at.set(now); - let msg = format!( - "failed to download any data for `{}` within {}s", - dl.id, - self.timeout.dur.as_secs() - ); - dl.timed_out.set(Some(msg)); - return false; - } - - // If we reached the point in time that we need to check our speed - // limit, see if we've transferred enough data during this threshold. If - // it fails this check then we fail because the download is going too - // slowly. - if now >= self.next_speed_check.get() { - self.next_speed_check.set(now + self.timeout.dur); - assert!(self.next_speed_check_bytes_threshold.get() > 0); - let msg = format!( - "download of `{}` failed to transfer more \ - than {} bytes in {}s", - dl.id, - self.timeout.low_speed_limit, - self.timeout.dur.as_secs() - ); - dl.timed_out.set(Some(msg)); - return false; - } - - true - } - - fn tick(&self, why: WhyTick<'_>) -> CargoResult<()> { - let mut progress = self.progress.borrow_mut(); - let progress = progress.as_mut().unwrap(); - - if let WhyTick::DownloadUpdate = why { - if !progress.update_allowed() { - return Ok(()); - } - } - let pending = self.remaining(); - let mut msg = if pending == 1 { - format!("{} crate", pending) - } else { - format!("{} crates", pending) - }; - match why { - WhyTick::Extracting(krate) => { - msg.push_str(&format!(", extracting {} ...", krate)); - } - _ => { - let mut dur = Duration::new(0, 0); - let mut remaining = 0; - for (dl, _) in self.pending.values() { - dur += dl.start.elapsed(); - // If the total/current look weird just throw out the 
data - // point, sounds like curl has more to learn before we have - // the true information. - if dl.total.get() >= dl.current.get() { - remaining += dl.total.get() - dl.current.get(); - } - } - if remaining > 0 && dur > Duration::from_millis(500) { - msg.push_str(&format!(", remaining bytes: {:.1}", HumanBytes(remaining))); - } - } - } - progress.print_now(&msg) - } -} - #[derive(Copy, Clone)] enum WhyTick<'a> { DownloadStarted, @@ -1126,83 +812,3 @@ enum WhyTick<'a> { DownloadFinished, Extracting(&'a str), } - -impl<'a, 'gctx> Drop for Downloads<'a, 'gctx> { - fn drop(&mut self) { - self.set.downloading.set(false); - let progress = self.progress.get_mut().take().unwrap(); - // Don't print a download summary if we're not using a progress bar, - // we've already printed lots of `Downloading...` items. - if !progress.is_enabled() { - return; - } - // If we didn't download anything, no need for a summary. - if self.downloads_finished == 0 { - return; - } - // If an error happened, let's not clutter up the output. - if !self.success { - return; - } - // pick the correct plural of crate(s) - let crate_string = if self.downloads_finished == 1 { - "crate" - } else { - "crates" - }; - let mut status = format!( - "{} {} ({:.1}) in {}", - self.downloads_finished, - crate_string, - HumanBytes(self.downloaded_bytes), - util::elapsed(self.start.elapsed()) - ); - // print the size of largest crate if it was >1mb - // however don't print if only a single crate was downloaded - // because it is obvious that it will be the largest then - let mib_1 = 1024 * 1024; - if self.largest.0 > mib_1 && self.downloads_finished > 1 { - status.push_str(&format!( - " (largest was `{}` at {:.1})", - self.largest.1, - HumanBytes(self.largest.0), - )); - } - // Clear progress before displaying final summary. 
- drop(progress); - drop(self.set.gctx.shell().status("Downloaded", status)); - } -} - -mod tls { - use std::cell::Cell; - - use super::Downloads; - - thread_local!(static PTR: Cell = const { Cell::new(0) }); - - pub(crate) fn with(f: impl FnOnce(Option<&Downloads<'_, '_>>) -> R) -> R { - let ptr = PTR.with(|p| p.get()); - if ptr == 0 { - f(None) - } else { - unsafe { f(Some(&*(ptr as *const Downloads<'_, '_>))) } - } - } - - pub(crate) fn set(dl: &Downloads<'_, '_>, f: impl FnOnce() -> R) -> R { - struct Reset<'a, T: Copy>(&'a Cell, T); - - impl<'a, T: Copy> Drop for Reset<'a, T> { - fn drop(&mut self) { - self.0.set(self.1); - } - } - - PTR.with(|p| { - let _reset = Reset(p, p.get()); - p.set(dl as *const Downloads<'_, '_> as usize); - f() - }) - } -} diff --git a/src/cargo/sources/directory.rs b/src/cargo/sources/directory.rs index c1f0e6cf5d6..4d49d581889 100644 --- a/src/cargo/sources/directory.rs +++ b/src/cargo/sources/directory.rs @@ -212,7 +212,7 @@ impl<'gctx> Source for DirectorySource<'gctx> { self.source_id } - fn download(&self, id: PackageId) -> CargoResult { + async fn download(&self, id: PackageId) -> CargoResult { self.packages .borrow() .get(&id) @@ -222,7 +222,7 @@ impl<'gctx> Source for DirectorySource<'gctx> { .ok_or_else(|| anyhow::format_err!("failed to find package with id: {}", id)) } - fn finish_download(&self, _id: PackageId, _data: Vec) -> CargoResult { + async fn finish_download(&self, _id: PackageId, _data: Vec) -> CargoResult { panic!("no downloads to do") } diff --git a/src/cargo/sources/git/source.rs b/src/cargo/sources/git/source.rs index e691f22cd6a..1db73a1b9b3 100644 --- a/src/cargo/sources/git/source.rs +++ b/src/cargo/sources/git/source.rs @@ -413,7 +413,7 @@ impl<'gctx> Source for GitSource<'gctx> { *self.source_id.borrow() } - fn download(&self, id: PackageId) -> CargoResult { + async fn download(&self, id: PackageId) -> CargoResult { trace!( "getting packages for package ID `{}` from `{:?}`", id, self.remote @@ -424,9 
+424,10 @@ impl<'gctx> Source for GitSource<'gctx> { .as_mut() .expect("BUG: `update()` must be called before `get()`") .download(id) + .await } - fn finish_download(&self, _id: PackageId, _data: Vec) -> CargoResult { + async fn finish_download(&self, _id: PackageId, _data: Vec) -> CargoResult { panic!("no download should have started") } diff --git a/src/cargo/sources/overlay.rs b/src/cargo/sources/overlay.rs index de31c2625eb..8df6ad7dd5d 100644 --- a/src/cargo/sources/overlay.rs +++ b/src/cargo/sources/overlay.rs @@ -86,32 +86,37 @@ impl<'gctx> Source for DependencyConfusionThreatOverlaySource<'gctx> { self.remote.set_quiet(quiet); } - fn download( + async fn download( &self, package: crate::core::PackageId, ) -> crate::CargoResult { let local_source = self.local.source_id(); let remote_source = self.remote.source_id(); - self.local + if let Ok(pkg) = self + .local .download(package.map_source(remote_source, local_source)) + .await .map(|maybe_pkg| match maybe_pkg { MaybePackage::Ready(pkg) => { MaybePackage::Ready(pkg.map_source(local_source, remote_source)) } x => x, }) - .or_else(|_| self.remote.download(package)) + { + return Ok(pkg); + } + self.remote.download(package).await } - fn finish_download( + async fn finish_download( &self, pkg_id: crate::core::PackageId, contents: Vec, ) -> crate::CargoResult { // The local registry should never return MaybePackage::Download from `download`, so any // downloads that need to be finished come from the remote registry. 
- self.remote.finish_download(pkg_id, contents) + self.remote.finish_download(pkg_id, contents).await } fn fingerprint(&self, pkg: &crate::core::Package) -> crate::CargoResult { diff --git a/src/cargo/sources/path.rs b/src/cargo/sources/path.rs index a2565fd3a5a..78dbe2027ee 100644 --- a/src/cargo/sources/path.rs +++ b/src/cargo/sources/path.rs @@ -171,7 +171,7 @@ impl<'gctx> Source for PathSource<'gctx> { self.source_id } - fn download(&self, id: PackageId) -> CargoResult { + async fn download(&self, id: PackageId) -> CargoResult { trace!("getting packages; id={}", id); self.load()?; let pkg = self.package.borrow(); @@ -181,7 +181,7 @@ impl<'gctx> Source for PathSource<'gctx> { .ok_or_else(|| internal(format!("failed to find {} in path source", id))) } - fn finish_download(&self, _id: PackageId, _data: Vec) -> CargoResult { + async fn finish_download(&self, _id: PackageId, _data: Vec) -> CargoResult { panic!("no download should have started") } @@ -370,7 +370,7 @@ impl<'gctx> Source for RecursivePathSource<'gctx> { self.source_id } - fn download(&self, id: PackageId) -> CargoResult { + async fn download(&self, id: PackageId) -> CargoResult { trace!("getting packages; id={}", id); self.load()?; let pkgs = self.packages.borrow(); @@ -382,7 +382,7 @@ impl<'gctx> Source for RecursivePathSource<'gctx> { .ok_or_else(|| internal(format!("failed to find {} in path source", id))) } - fn finish_download(&self, _id: PackageId, _data: Vec) -> CargoResult { + async fn finish_download(&self, _id: PackageId, _data: Vec) -> CargoResult { panic!("no download should have started") } diff --git a/src/cargo/sources/registry/http_remote.rs b/src/cargo/sources/registry/http_remote.rs index a98d2eee7ae..6019e5984c8 100644 --- a/src/cargo/sources/registry/http_remote.rs +++ b/src/cargo/sources/registry/http_remote.rs @@ -302,8 +302,8 @@ impl<'gctx> RegistryData for HttpRegistry<'gctx> { self.inner().progress.replace(None); } - fn download(&self, pkg: PackageId, checksum: &str) -> 
CargoResult { - let registry_config = crate::util::block_on(self.config())?; + async fn download(&self, pkg: PackageId, checksum: &str) -> CargoResult { + let registry_config = self.config().await?; download::download( &self.inner().crate_cache_path, &self.inner().gctx, @@ -314,7 +314,12 @@ impl<'gctx> RegistryData for HttpRegistry<'gctx> { ) } - fn finish_download(&self, pkg: PackageId, checksum: &str, data: &[u8]) -> CargoResult { + async fn finish_download( + &self, + pkg: PackageId, + checksum: &str, + data: &[u8], + ) -> CargoResult { download::finish_download( &self.inner().crate_cache_path, &self.inner().gctx, diff --git a/src/cargo/sources/registry/local.rs b/src/cargo/sources/registry/local.rs index 146ba6b57e1..f87b0709772 100644 --- a/src/cargo/sources/registry/local.rs +++ b/src/cargo/sources/registry/local.rs @@ -171,7 +171,7 @@ impl<'gctx> RegistryData for LocalRegistry<'gctx> { true } - fn download(&self, pkg: PackageId, checksum: &str) -> CargoResult { + async fn download(&self, pkg: PackageId, checksum: &str) -> CargoResult { // Note that the usage of `into_path_unlocked` here is because the local // crate files here never change in that we're not the one writing them, // so it's not our responsibility to synchronize access to them. @@ -201,7 +201,12 @@ impl<'gctx> RegistryData for LocalRegistry<'gctx> { Ok(MaybeLock::Ready(crate_file)) } - fn finish_download(&self, _pkg: PackageId, _checksum: &str, _data: &[u8]) -> CargoResult { + async fn finish_download( + &self, + _pkg: PackageId, + _checksum: &str, + _data: &[u8], + ) -> CargoResult { panic!("this source doesn't download") } } diff --git a/src/cargo/sources/registry/mod.rs b/src/cargo/sources/registry/mod.rs index 7ecc64e41bc..441278fc058 100644 --- a/src/cargo/sources/registry/mod.rs +++ b/src/cargo/sources/registry/mod.rs @@ -403,7 +403,7 @@ pub trait RegistryData { /// `finish_download`. 
For already downloaded `.crate` files, it does not /// validate the checksum, assuming the filesystem does not suffer from /// corruption or manipulation. - fn download(&self, pkg: PackageId, checksum: &str) -> CargoResult; + async fn download(&self, pkg: PackageId, checksum: &str) -> CargoResult; /// Finish a download by saving a `.crate` file to disk. /// @@ -413,7 +413,12 @@ pub trait RegistryData { /// the given data to the on-disk cache. /// /// Returns a [`File`] handle to the `.crate` file, positioned at the start. - fn finish_download(&self, pkg: PackageId, checksum: &str, data: &[u8]) -> CargoResult; + async fn finish_download( + &self, + pkg: PackageId, + checksum: &str, + data: &[u8], + ) -> CargoResult; /// Returns whether or not the `.crate` file is already downloaded. fn is_crate_downloaded(&self, _pkg: PackageId) -> bool { @@ -709,13 +714,13 @@ impl<'gctx> RegistrySource<'gctx> { /// should only be called after doing integrity check. That is to say, /// you need to call either [`RegistryData::download`] or /// [`RegistryData::finish_download`] before calling this method. - fn get_pkg(&self, package: PackageId, path: &File) -> CargoResult { + async fn get_pkg(&self, package: PackageId, path: &File) -> CargoResult { let path = self .unpack_package(package, path) .with_context(|| format!("failed to unpack package `{}`", package))?; let src = PathSource::new(&path, self.source_id, self.gctx); src.load()?; - let mut pkg = match src.download(package)? { + let mut pkg = match src.download(package).await? { MaybePackage::Ready(pkg) => pkg, MaybePackage::Download { .. } => unreachable!(), }; @@ -921,10 +926,10 @@ impl<'gctx> Source for RegistrySource<'gctx> { self.ops.set_quiet(quiet); } - fn download(&self, package: PackageId) -> CargoResult { - let hash = crate::util::block_on(self.index.hash(package, &*self.ops))?; - match self.ops.download(package, &hash)? 
{ - MaybeLock::Ready(file) => self.get_pkg(package, &file).map(MaybePackage::Ready), + async fn download(&self, package: PackageId) -> CargoResult { + let hash = self.index.hash(package, &*self.ops).await?; + match self.ops.download(package, &hash).await? { + MaybeLock::Ready(file) => self.get_pkg(package, &file).await.map(MaybePackage::Ready), MaybeLock::Download { url, descriptor, @@ -937,10 +942,10 @@ impl<'gctx> Source for RegistrySource<'gctx> { } } - fn finish_download(&self, package: PackageId, data: Vec) -> CargoResult { - let hash = crate::util::block_on(self.index.hash(package, &*self.ops))?; - let file = self.ops.finish_download(package, &hash, &data)?; - self.get_pkg(package, &file) + async fn finish_download(&self, package: PackageId, data: Vec) -> CargoResult { + let hash = self.index.hash(package, &*self.ops).await?; + let file = self.ops.finish_download(package, &hash, &data).await?; + self.get_pkg(package, &file).await } fn fingerprint(&self, pkg: &Package) -> CargoResult { diff --git a/src/cargo/sources/registry/remote.rs b/src/cargo/sources/registry/remote.rs index b12119ac786..6e829cb3943 100644 --- a/src/cargo/sources/registry/remote.rs +++ b/src/cargo/sources/registry/remote.rs @@ -423,8 +423,8 @@ impl<'gctx> RegistryData for RemoteRegistry<'gctx> { self.is_updated() } - fn download(&self, pkg: PackageId, checksum: &str) -> CargoResult { - let registry_config = crate::util::block_on(self.config())?.unwrap(); + async fn download(&self, pkg: PackageId, checksum: &str) -> CargoResult { + let registry_config = self.config().await?.unwrap(); download::download( &self.cache_path, @@ -436,7 +436,12 @@ impl<'gctx> RegistryData for RemoteRegistry<'gctx> { ) } - fn finish_download(&self, pkg: PackageId, checksum: &str, data: &[u8]) -> CargoResult { + async fn finish_download( + &self, + pkg: PackageId, + checksum: &str, + data: &[u8], + ) -> CargoResult { download::finish_download( &self.cache_path, &self.gctx, diff --git a/src/cargo/sources/replaced.rs 
b/src/cargo/sources/replaced.rs index 23eff200cdf..d0e3d5ba33c 100644 --- a/src/cargo/sources/replaced.rs +++ b/src/cargo/sources/replaced.rs @@ -96,9 +96,9 @@ impl<'gctx> Source for ReplacedSource<'gctx> { self.inner.set_quiet(quiet); } - fn download(&self, id: PackageId) -> CargoResult { + async fn download(&self, id: PackageId) -> CargoResult { let id = id.with_source_id(self.replace_with); - let pkg = self.inner.download(id).map_err(|e| { + let pkg = self.inner.download(id).await.map_err(|e| { if self.is_builtin_replacement() { e } else { @@ -116,9 +116,9 @@ impl<'gctx> Source for ReplacedSource<'gctx> { }) } - fn finish_download(&self, id: PackageId, data: Vec) -> CargoResult { + async fn finish_download(&self, id: PackageId, data: Vec) -> CargoResult { let id = id.with_source_id(self.replace_with); - let pkg = self.inner.finish_download(id, data).map_err(|e| { + let pkg = self.inner.finish_download(id, data).await.map_err(|e| { if self.is_builtin_replacement() { e } else { diff --git a/src/cargo/sources/source.rs b/src/cargo/sources/source.rs index f0673e99d31..02740231a7a 100644 --- a/src/cargo/sources/source.rs +++ b/src/cargo/sources/source.rs @@ -81,7 +81,7 @@ pub trait Source { /// In the case where [`MaybePackage::Download`] is returned, then the /// package downloader will call [`Source::finish_download`] after the /// download has finished. - fn download(&self, package: PackageId) -> CargoResult; + async fn download(&self, package: PackageId) -> CargoResult; /// Gives the source the downloaded `.crate` file. /// @@ -90,7 +90,7 @@ pub trait Source { /// the results of the download of the given URL. The source is /// responsible for saving to disk, and returning the appropriate /// [`Package`]. 
- fn finish_download(&self, pkg_id: PackageId, contents: Vec) -> CargoResult; + async fn finish_download(&self, pkg_id: PackageId, contents: Vec) -> CargoResult; /// Generates a unique string which represents the fingerprint of the /// current state of the source. @@ -214,12 +214,12 @@ impl<'a, T: Source + ?Sized + 'a> Source for &'a mut T { (**self).set_quiet(quiet) } - fn download(&self, id: PackageId) -> CargoResult { - (**self).download(id) + async fn download(&self, id: PackageId) -> CargoResult { + (**self).download(id).await } - fn finish_download(&self, id: PackageId, data: Vec) -> CargoResult { - (**self).finish_download(id, data) + async fn finish_download(&self, id: PackageId, data: Vec) -> CargoResult { + (**self).finish_download(id, data).await } fn fingerprint(&self, pkg: &Package) -> CargoResult { diff --git a/src/cargo/util/network/mod.rs b/src/cargo/util/network/mod.rs index c2785e30bad..bb20ffa8efa 100644 --- a/src/cargo/util/network/mod.rs +++ b/src/cargo/util/network/mod.rs @@ -11,7 +11,6 @@ pub mod http; pub mod http_async; pub mod proxy; pub mod retry; -pub mod sleep; /// LOCALHOST constants for both IPv4 and IPv6. pub const LOCALHOST: [SocketAddr; 2] = [ diff --git a/src/cargo/util/network/sleep.rs b/src/cargo/util/network/sleep.rs deleted file mode 100644 index cfe03953052..00000000000 --- a/src/cargo/util/network/sleep.rs +++ /dev/null @@ -1,104 +0,0 @@ -//! Utility for tracking network requests that will be retried in the future. - -use core::cmp::Ordering; -use std::collections::BinaryHeap; -use std::time::{Duration, Instant}; - -/// A tracker for network requests that have failed, and are awaiting to be -/// retried in the future. -pub struct SleepTracker { - /// This is a priority queue that tracks the time when the next sleeper - /// should awaken (based on the [`Sleeper::wakeup`] property). - heap: BinaryHeap>, -} - -/// An individual network request that is waiting to be retried in the future. 
-struct Sleeper { - /// The time when this requests should be retried. - wakeup: Instant, - /// Information about the network request. - data: T, -} - -impl PartialEq for Sleeper { - fn eq(&self, other: &Sleeper) -> bool { - self.wakeup == other.wakeup - } -} - -impl PartialOrd for Sleeper { - fn partial_cmp(&self, other: &Sleeper) -> Option { - // This reverses the comparison so that the BinaryHeap tracks the - // entry with the *lowest* wakeup time. - Some(other.wakeup.cmp(&self.wakeup)) - } -} - -impl Eq for Sleeper {} - -impl Ord for Sleeper { - fn cmp(&self, other: &Sleeper) -> Ordering { - self.wakeup.cmp(&other.wakeup) - } -} - -impl SleepTracker { - pub fn new() -> SleepTracker { - SleepTracker { - heap: BinaryHeap::new(), - } - } - - /// Adds a new download that should be retried in the future. - pub fn push(&mut self, sleep: u64, data: T) { - self.heap.push(Sleeper { - wakeup: Instant::now() - .checked_add(Duration::from_millis(sleep)) - .expect("instant should not wrap"), - data, - }); - } - - pub fn len(&self) -> usize { - self.heap.len() - } - - /// Returns any downloads that are ready to go now. - pub fn to_retry(&mut self) -> Vec { - let now = Instant::now(); - let mut result = Vec::new(); - while let Some(next) = self.heap.peek() { - if next.wakeup < now { - result.push(self.heap.pop().unwrap().data); - } else { - break; - } - } - result - } - - /// Returns the time when the next download is ready to go. - /// - /// Returns None if there are no sleepers remaining. 
- pub fn time_to_next(&self) -> Option { - self.heap - .peek() - .map(|s| s.wakeup.saturating_duration_since(Instant::now())) - } -} - -#[test] -fn returns_in_order() { - let mut s = SleepTracker::new(); - s.push(30_000, 30_000); - s.push(1, 1); - assert_eq!(s.len(), 2); - std::thread::sleep(Duration::from_millis(2)); - assert_eq!(s.to_retry(), &[1]); - assert!(s.to_retry().is_empty()); - let next = s.time_to_next().expect("should be next"); - assert!( - next < Duration::from_millis(30_000), - "{next:?} should be less than 30s" - ); -} diff --git a/tests/testsuite/alt_registry.rs b/tests/testsuite/alt_registry.rs index c9c83c78353..e7a4130f492 100644 --- a/tests/testsuite/alt_registry.rs +++ b/tests/testsuite/alt_registry.rs @@ -87,7 +87,8 @@ fn depend_on_alt_registry_depends_on_same_registry_no_index() { .publish(); p.cargo("check") - .with_stderr_data(str![[r#" + .with_stderr_data( + str![[r#" [UPDATING] `alternative` index [LOCKING] 2 packages to latest compatible versions [DOWNLOADING] crates ... @@ -98,7 +99,9 @@ fn depend_on_alt_registry_depends_on_same_registry_no_index() { [CHECKING] foo v0.0.1 ([ROOT]/foo) [FINISHED] `dev` profile [unoptimized + debuginfo] target(s) in [ELAPSED]s -"#]]) +"#]] + .unordered(), + ) .run(); } @@ -130,7 +133,8 @@ fn depend_on_alt_registry_depends_on_same_registry() { .publish(); p.cargo("check") - .with_stderr_data(str![[r#" + .with_stderr_data( + str![[r#" [UPDATING] `alternative` index [LOCKING] 2 packages to latest compatible versions [DOWNLOADING] crates ... 
@@ -141,7 +145,9 @@ fn depend_on_alt_registry_depends_on_same_registry() { [CHECKING] foo v0.0.1 ([ROOT]/foo) [FINISHED] `dev` profile [unoptimized + debuginfo] target(s) in [ELAPSED]s -"#]]) +"#]] + .unordered(), + ) .run(); } diff --git a/tests/testsuite/cargo_tree/deps.rs b/tests/testsuite/cargo_tree/deps.rs index d7e4e2b5922..d3807a3f63f 100644 --- a/tests/testsuite/cargo_tree/deps.rs +++ b/tests/testsuite/cargo_tree/deps.rs @@ -1677,7 +1677,8 @@ fn ambiguous_name() { .build(); p.cargo("tree -p dep") - .with_stderr_data(str![[r#" + .with_stderr_data( + str![[r#" [UPDATING] `dummy-registry` index [LOCKING] 3 packages to latest compatible versions [ADDING] dep v1.0.0 (available: v2.0.0) @@ -1690,7 +1691,9 @@ fn ambiguous_name() { dep@1.0.0 dep@2.0.0 -"#]]) +"#]] + .unordered(), + ) .with_status(101) .run(); } diff --git a/tests/testsuite/cargo_tree/dupe/stderr.term.svg b/tests/testsuite/cargo_tree/dupe/stderr.term.svg index 90da2719e6e..a4a72478719 100644 --- a/tests/testsuite/cargo_tree/dupe/stderr.term.svg +++ b/tests/testsuite/cargo_tree/dupe/stderr.term.svg @@ -24,11 +24,11 @@ Downloading crates ... - Downloaded a v1.0.0 (registry `dummy-registry`) + Downloaded [..] v1.0.0 (registry `dummy-registry`) - Downloaded b v1.0.0 (registry `dummy-registry`) + Downloaded [..] v1.0.0 (registry `dummy-registry`) - Downloaded c v1.0.0 (registry `dummy-registry`) + Downloaded [..] v1.0.0 (registry `dummy-registry`) diff --git a/tests/testsuite/cargo_tree/edge_kind/stderr.term.svg b/tests/testsuite/cargo_tree/edge_kind/stderr.term.svg index c257abd33fb..418f1052390 100644 --- a/tests/testsuite/cargo_tree/edge_kind/stderr.term.svg +++ b/tests/testsuite/cargo_tree/edge_kind/stderr.term.svg @@ -24,41 +24,41 @@ Downloading crates ... - Downloaded build_a v1.0.0 (registry `dummy-registry`) + Downloaded [..] v1.0.0 (registry `dummy-registry`) - Downloaded build_b v1.0.0 (registry `dummy-registry`) + Downloaded [..] 
v1.0.0 (registry `dummy-registry`) - Downloaded build_b_build_a v1.0.0 (registry `dummy-registry`) + Downloaded [..] v1.0.0 (registry `dummy-registry`) - Downloaded build_b_build_a_normal_a v1.0.0 (registry `dummy-registry`) + Downloaded [..] v1.0.0 (registry `dummy-registry`) - Downloaded build_c v1.0.0 (registry `dummy-registry`) + Downloaded [..] v1.0.0 (registry `dummy-registry`) - Downloaded build_d v1.0.0 (registry `dummy-registry`) + Downloaded [..] v1.0.0 (registry `dummy-registry`) - Downloaded dev_a v1.0.0 (registry `dummy-registry`) + Downloaded [..] v1.0.0 (registry `dummy-registry`) - Downloaded dev_b v1.0.0 (registry `dummy-registry`) + Downloaded [..] v1.0.0 (registry `dummy-registry`) - Downloaded dev_b_build_a v1.0.0 (registry `dummy-registry`) + Downloaded [..] v1.0.0 (registry `dummy-registry`) - Downloaded dev_b_build_a_normal_a v1.0.0 (registry `dummy-registry`) + Downloaded [..] v1.0.0 (registry `dummy-registry`) - Downloaded dev_c v1.0.0 (registry `dummy-registry`) + Downloaded [..] v1.0.0 (registry `dummy-registry`) - Downloaded dev_d v1.0.0 (registry `dummy-registry`) + Downloaded [..] v1.0.0 (registry `dummy-registry`) - Downloaded normal_a v1.0.0 (registry `dummy-registry`) + Downloaded [..] v1.0.0 (registry `dummy-registry`) - Downloaded normal_b v1.0.0 (registry `dummy-registry`) + Downloaded [..] v1.0.0 (registry `dummy-registry`) - Downloaded normal_b_build_a v1.0.0 (registry `dummy-registry`) + Downloaded [..] v1.0.0 (registry `dummy-registry`) - Downloaded normal_b_build_a_normal_a v1.0.0 (registry `dummy-registry`) + Downloaded [..] v1.0.0 (registry `dummy-registry`) - Downloaded normal_c v1.0.0 (registry `dummy-registry`) + Downloaded [..] v1.0.0 (registry `dummy-registry`) - Downloaded normal_d v1.0.0 (registry `dummy-registry`) + Downloaded [..] 
v1.0.0 (registry `dummy-registry`) diff --git a/tests/testsuite/cfg.rs b/tests/testsuite/cfg.rs index f7af4d0304d..c185e3c4a50 100644 --- a/tests/testsuite/cfg.rs +++ b/tests/testsuite/cfg.rs @@ -93,7 +93,8 @@ fn works_through_the_registry() { .build(); p.cargo("check") - .with_stderr_data(str![[r#" + .with_stderr_data( + str![[r#" [UPDATING] `dummy-registry` index [LOCKING] 2 packages to latest compatible versions [DOWNLOADING] crates ... @@ -104,7 +105,9 @@ fn works_through_the_registry() { [CHECKING] foo v0.0.1 ([ROOT]/foo) [FINISHED] `dev` profile [unoptimized + debuginfo] target(s) in [ELAPSED]s -"#]]) +"#]] + .unordered(), + ) .run(); } diff --git a/tests/testsuite/doc.rs b/tests/testsuite/doc.rs index 070803e42e3..29b0cea77f4 100644 --- a/tests/testsuite/doc.rs +++ b/tests/testsuite/doc.rs @@ -3961,7 +3961,7 @@ fn mergeable_info_dep_collision() { [FINISHED] documentation merge in [ELAPSED]s [GENERATED] [ROOT]/foo/target/doc/dep/index.html -"#]] +"#]].unordered() ) .run(); diff --git a/tests/testsuite/features.rs b/tests/testsuite/features.rs index 5e99a5022cc..7eaf53388a6 100644 --- a/tests/testsuite/features.rs +++ b/tests/testsuite/features.rs @@ -2263,7 +2263,8 @@ fn registry_summary_order_doesnt_matter() { .build(); p.cargo("run") - .with_stderr_data(str![[r#" + .with_stderr_data( + str![[r#" [UPDATING] `dummy-registry` index [LOCKING] 2 packages to latest compatible versions [DOWNLOADING] crates ... 
@@ -2275,7 +2276,9 @@ fn registry_summary_order_doesnt_matter() { [FINISHED] `dev` profile [unoptimized + debuginfo] target(s) in [ELAPSED]s [RUNNING] `target/debug/foo[EXE]` -"#]]) +"#]] + .unordered(), + ) .with_stdout_data(str![[r#" it works diff --git a/tests/testsuite/features_namespaced.rs b/tests/testsuite/features_namespaced.rs index d9b5de44494..2874e97a1f0 100644 --- a/tests/testsuite/features_namespaced.rs +++ b/tests/testsuite/features_namespaced.rs @@ -32,7 +32,8 @@ fn dependency_with_crate_syntax() { .build(); p.cargo("check") - .with_stderr_data(str![[r#" + .with_stderr_data( + str![[r#" [UPDATING] `dummy-registry` index [LOCKING] 2 packages to latest compatible versions [DOWNLOADING] crates ... @@ -43,7 +44,9 @@ fn dependency_with_crate_syntax() { [CHECKING] foo v0.1.0 ([ROOT]/foo) [FINISHED] `dev` profile [unoptimized + debuginfo] target(s) in [ELAPSED]s -"#]]) +"#]] + .unordered(), + ) .run(); } diff --git a/tests/testsuite/future_incompat_report.rs b/tests/testsuite/future_incompat_report.rs index 7c53cd6072b..e59a9469524 100644 --- a/tests/testsuite/future_incompat_report.rs +++ b/tests/testsuite/future_incompat_report.rs @@ -610,7 +610,7 @@ fn suggestions_for_updates() { https://doc.rust-lang.org/cargo/reference/overriding-dependencies.html#the-patch-section [NOTE] this report can be shown with `cargo report future-incompatibilities --id 1` -"#]]) +"#]].unordered()) .run(); p.cargo("report future-incompatibilities") diff --git a/tests/testsuite/inheritable_workspace_fields.rs b/tests/testsuite/inheritable_workspace_fields.rs index 14ad149ade8..64a32f027f1 100644 --- a/tests/testsuite/inheritable_workspace_fields.rs +++ b/tests/testsuite/inheritable_workspace_fields.rs @@ -1167,7 +1167,8 @@ fn inherit_dependency_features() { .build(); p.cargo("check") - .with_stderr_data(str![[r#" + .with_stderr_data( + str![[r#" [UPDATING] `dummy-registry` index [LOCKING] 2 packages to latest compatible versions [DOWNLOADING] crates ... 
@@ -1178,7 +1179,9 @@ fn inherit_dependency_features() { [CHECKING] bar v0.2.0 ([ROOT]/foo) [FINISHED] `dev` profile [unoptimized + debuginfo] target(s) in [ELAPSED]s -"#]]) +"#]] + .unordered(), + ) .run(); let lockfile = p.read_lockfile(); @@ -1546,7 +1549,7 @@ fn warn_inherit_def_feat_true_member_def_feat_false() { [CHECKING] bar v0.2.0 ([ROOT]/foo) [FINISHED] `dev` profile [unoptimized + debuginfo] target(s) in [ELAPSED]s -"#]]).run(); +"#]].unordered()).run(); } #[cargo_test] @@ -1638,7 +1641,7 @@ fn warn_inherit_simple_member_def_feat_false() { [CHECKING] bar v0.2.0 ([ROOT]/foo) [FINISHED] `dev` profile [unoptimized + debuginfo] target(s) in [ELAPSED]s -"#]]).run(); +"#]].unordered()).run(); } #[cargo_test] @@ -1719,7 +1722,8 @@ fn inherit_def_feat_false_member_def_feat_true() { .build(); p.cargo("check") - .with_stderr_data(str![[r#" + .with_stderr_data( + str![[r#" [UPDATING] `dummy-registry` index [LOCKING] 2 packages to latest compatible versions [DOWNLOADING] crates ... @@ -1730,7 +1734,9 @@ fn inherit_def_feat_false_member_def_feat_true() { [CHECKING] bar v0.2.0 ([ROOT]/foo) [FINISHED] `dev` profile [unoptimized + debuginfo] target(s) in [ELAPSED]s -"#]]) +"#]] + .unordered(), + ) .run(); } diff --git a/tests/testsuite/lints/unused_dependencies.rs b/tests/testsuite/lints/unused_dependencies.rs index dab557305b1..8c7b915d3b8 100644 --- a/tests/testsuite/lints/unused_dependencies.rs +++ b/tests/testsuite/lints/unused_dependencies.rs @@ -1230,7 +1230,8 @@ fn pinned_transitive_dep() { p.cargo("check -Zcargo-lints") .masquerade_as_nightly_cargo(&["cargo-lints"]) - .with_stderr_data(str![[r#" + .with_stderr_data( + str![[r#" [UPDATING] `dummy-registry` index [LOCKING] 2 packages to latest compatible versions [DOWNLOADING] crates ... 
@@ -1241,7 +1242,9 @@ fn pinned_transitive_dep() { [CHECKING] foo v0.1.0 ([ROOT]/foo) [FINISHED] `dev` profile [unoptimized + debuginfo] target(s) in [ELAPSED]s -"#]]) +"#]] + .unordered(), + ) .run(); } @@ -1301,7 +1304,8 @@ pub fn fun() -> &'static str { p.cargo("check -Zcargo-lints") .masquerade_as_nightly_cargo(&["cargo-lints"]) - .with_stderr_data(str![[r#" + .with_stderr_data( + str![[r#" [UPDATING] `dummy-registry` index [LOCKING] 2 packages to latest compatible versions [DOWNLOADING] crates ... @@ -1323,7 +1327,9 @@ pub fn fun() -> &'static str { | [FINISHED] `dev` profile [unoptimized + debuginfo] target(s) in [ELAPSED]s -"#]]) +"#]] + .unordered(), + ) .run(); } diff --git a/tests/testsuite/local_registry.rs b/tests/testsuite/local_registry.rs index 90c8c80c136..100e58eff5b 100644 --- a/tests/testsuite/local_registry.rs +++ b/tests/testsuite/local_registry.rs @@ -300,7 +300,8 @@ fn interdependent() { .build(); p.cargo("check") - .with_stderr_data(str![[r#" + .with_stderr_data( + str![[r#" [LOCKING] 2 packages to latest compatible versions [UNPACKING] bar v0.0.1 (registry `[ROOT]/registry`) [UNPACKING] baz v0.1.0 (registry `[ROOT]/registry`) @@ -309,7 +310,9 @@ fn interdependent() { [CHECKING] foo v0.0.1 ([ROOT]/foo) [FINISHED] `dev` profile [unoptimized + debuginfo] target(s) in [ELAPSED]s -"#]]) +"#]] + .unordered(), + ) .run(); } @@ -370,7 +373,8 @@ fn path_dep_rewritten() { .build(); p.cargo("check") - .with_stderr_data(str![[r#" + .with_stderr_data( + str![[r#" [LOCKING] 2 packages to latest compatible versions [UNPACKING] bar v0.0.1 (registry `[ROOT]/registry`) [UNPACKING] baz v0.1.0 (registry `[ROOT]/registry`) @@ -379,7 +383,9 @@ fn path_dep_rewritten() { [CHECKING] foo v0.0.1 ([ROOT]/foo) [FINISHED] `dev` profile [unoptimized + debuginfo] target(s) in [ELAPSED]s -"#]]) +"#]] + .unordered(), + ) .run(); } diff --git a/tests/testsuite/pub_priv.rs b/tests/testsuite/pub_priv.rs index 3fd697d8d91..28418b221a9 100644 --- a/tests/testsuite/pub_priv.rs 
+++ b/tests/testsuite/pub_priv.rs @@ -564,7 +564,8 @@ fn publish_package_with_public_dependency() { p.cargo("check --message-format=short") .masquerade_as_nightly_cargo(&["public-dependency"]) - .with_stderr_data(str![[r#" + .with_stderr_data( + str![[r#" [UPDATING] `dummy-registry` index [LOCKING] 2 packages to latest compatible versions [DOWNLOADING] crates ... @@ -575,7 +576,9 @@ fn publish_package_with_public_dependency() { [CHECKING] foo v0.0.1 ([ROOT]/foo) [FINISHED] `dev` profile [unoptimized + debuginfo] target(s) in [ELAPSED]s -"#]]) +"#]] + .unordered(), + ) .run(); } diff --git a/tests/testsuite/registry.rs b/tests/testsuite/registry.rs index 956ee3862a3..d2c8328745a 100644 --- a/tests/testsuite/registry.rs +++ b/tests/testsuite/registry.rs @@ -3588,6 +3588,9 @@ internal server error [LOCKING] 3 packages to latest compatible versions [..] DEBUG cargo::core::resolver::restarting: pending=0 [DOWNLOADING] crates ... + [..] DEBUG network::fetch: url="http://[..]/dl/bar/0.0.1/download" + [..] DEBUG network::fetch: url="http://[..]/dl/dep1/0.0.1/download" + [..] DEBUG network::fetch: url="http://[..]/dl/dep2/0.0.1/download" [DOWNLOADED] [..] v0.0.1 (registry `dummy-registry`) [DOWNLOADED] [..] v0.0.1 (registry `dummy-registry`) [DOWNLOADED] [..] v0.0.1 (registry `dummy-registry`) diff --git a/tests/testsuite/rust_version.rs b/tests/testsuite/rust_version.rs index 44cbf34c8a7..1d4e71609e5 100644 --- a/tests/testsuite/rust_version.rs +++ b/tests/testsuite/rust_version.rs @@ -164,7 +164,8 @@ fn lint_dep_incompatible_with_rust_version() { .run(); p.cargo("check") .with_status(101) - .with_stderr_data(str![[r#" + .with_stderr_data( + str![[r#" [DOWNLOADING] crates ... [DOWNLOADED] rustc_compatible v0.0.1 (registry `dummy-registry`) [DOWNLOADED] too_new_child v0.0.1 (registry `dummy-registry`) @@ -177,7 +178,9 @@ Either upgrade rustc or select compatible dependency versions with where `` is the latest version supporting rustc [..] 
-"#]]) +"#]] + .unordered(), + ) .run(); p.cargo("check --ignore-rust-version").run(); } diff --git a/tests/testsuite/weak_dep_features.rs b/tests/testsuite/weak_dep_features.rs index b556ec2528e..03d19661353 100644 --- a/tests/testsuite/weak_dep_features.rs +++ b/tests/testsuite/weak_dep_features.rs @@ -106,7 +106,8 @@ fn deferred() { .build(); p.cargo("check") - .with_stderr_data(str![[r#" + .with_stderr_data( + str![[r#" [UPDATING] `dummy-registry` index [LOCKING] 3 packages to latest compatible versions [DOWNLOADING] crates ... @@ -119,7 +120,9 @@ fn deferred() { [CHECKING] foo v0.1.0 ([ROOT]/foo) [FINISHED] `dev` profile [unoptimized + debuginfo] target(s) in [ELAPSED]s -"#]]) +"#]] + .unordered(), + ) .run(); } @@ -351,7 +354,8 @@ fn weak_with_host_decouple() { .build(); p.cargo("run") - .with_stderr_data(str![[r#" + .with_stderr_data( + str![[r#" [UPDATING] `dummy-registry` index [LOCKING] 3 packages to latest compatible versions [DOWNLOADING] crates ... @@ -365,7 +369,9 @@ fn weak_with_host_decouple() { [FINISHED] `dev` profile [unoptimized + debuginfo] target(s) in [ELAPSED]s [RUNNING] `target/debug/foo[EXE]` -"#]]) +"#]] + .unordered(), + ) .run(); }