Skip to content

Switch from sync to a tokio runtime #3367

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 49 commits into the base branch on May 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
616ba09
Convert main to using a tokio runtime always
rbtcollins May 29, 2023
cf615ba
Make maybe_trace_rustup async
rbtcollins May 29, 2023
56853bd
Remove maybe_trace_rustup runtime setup
rbtcollins May 29, 2023
d1f5c55
Make run_rustup async
rbtcollins May 29, 2023
2277b8b
Convert run_rustup_inner to async
rbtcollins May 29, 2023
d8e7293
Make rustup_mode::main async
rbtcollins May 29, 2023
da291bb
Make download_with_backend async
rbtcollins May 29, 2023
eab53a5
Make download_to_path_with_backend async
rbtcollins May 29, 2023
0db19d1
Make download_file_ async
rbtcollins May 29, 2023
0fe714e
Make download_file_with_resume async
rbtcollins May 29, 2023
53b17f7
Make DownloadCfg::download async
rbtcollins May 29, 2023
b51b7c4
Make download_file async
rbtcollins May 30, 2023
ca0c797
Make try_install_msvc async
rbtcollins May 30, 2023
7f2123a
Make self_update::install async
rbtcollins May 30, 2023
b82e5f1
Make setup_mode::main async
rbtcollins Aug 15, 2023
b6cf2d7
Make get_available_rustup_version async
rbtcollins Aug 15, 2023
ab7346d
Make prepare_update async
rbtcollins Aug 15, 2023
4dde84b
Make check_rustup_update async
rbtcollins Aug 15, 2023
e639114
Make self_update::update async
rbtcollins Aug 15, 2023
dd0c9a2
Make DownloadCfg::download_hash async
rbtcollins Aug 15, 2023
e21aa20
Make DownloadCfg::download_and_check async
rbtcollins Aug 15, 2023
dcd0b36
Make download retries async
rbtcollins Aug 15, 2023
8fe45fb
Make manifestation::update async
rbtcollins Aug 15, 2023
5fd593c
Make common::self_update async
rbtcollins Aug 15, 2023
9f37164
Make dist::dl_*_manifest async
rbtcollins Aug 15, 2023
9a08ea9
Make update_v1 async
rbtcollins Aug 15, 2023
2a173cc
Make try_update_from_dist_ async
rbtcollins Aug 15, 2023
e70d7bc
Make update_from_dist_ async
rbtcollins Aug 15, 2023
c8db16c
Make update_from_dist async
rbtcollins Aug 15, 2023
e6b6f20
Make InstallMethod::run async
rbtcollins Aug 15, 2023
f4bd1a9
Make InstallMethod::install async
rbtcollins Aug 15, 2023
7edc2d2
Make show_dist_version async
rbtcollins Aug 15, 2023
7d0c892
Make update_extra async
rbtcollins Aug 15, 2023
825b1dd
Make toolchain.update async
rbtcollins Aug 15, 2023
0401846
Make DistributableToolChain::install async
rbtcollins Aug 16, 2023
a0e3acc
Make DistributableTool::install_if_not_installed async
rbtcollins Aug 16, 2023
0c68c56
Make DistributableToolchain::add_component async
rbtcollins Aug 16, 2023
584464f
Make DistributableToolchain::remove_component async
rbtcollins Aug 16, 2023
512e613
Make override_add async
rbtcollins Aug 16, 2023
9ba20eb
Make toolchain_link async
rbtcollins May 30, 2024
364b5ed
Make update_all_channels async
rbtcollins May 30, 2024
c7c7682
Make component_remove async
rbtcollins Aug 16, 2023
c1e51de
Make component_add async
rbtcollins Aug 16, 2023
e89952d
Make target_remove async
rbtcollins Aug 16, 2023
47477e2
Make target_add async
rbtcollins Aug 16, 2023
6a79501
Make check_updates async
rbtcollins Aug 16, 2023
5dc8bf9
Make default_ async
rbtcollins Aug 16, 2023
b2f1e21
Make update async
rbtcollins Aug 16, 2023
d14b3e8
Make manifestation test update_from_dist async
rbtcollins Aug 16, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 6 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ otel = [
"dep:tracing-subscriber",
"dep:opentelemetry",
"dep:opentelemetry_sdk",
"dep:tokio",
"dep:tracing",
]

# Exports code dependent on private interfaces for the integration test suite
Expand Down Expand Up @@ -85,11 +83,12 @@ tempfile.workspace = true
termcolor.workspace = true
thiserror.workspace = true
threadpool = "1"
tokio = { workspace = true, optional = true }
tokio-retry.workspace = true
tokio.workspace = true
toml = "0.8"
tracing-opentelemetry = { workspace = true, optional = true }
tracing-subscriber = { workspace = true, optional = true, features = ["env-filter"] }
tracing = { workspace = true, optional = true }
tracing.workspace = true
url.workspace = true
wait-timeout = "0.2"
walkdir = { workspace = true, optional = true }
Expand Down Expand Up @@ -145,7 +144,9 @@ rustup-macros = { path = "rustup-macros" }
tempfile = "3.8"
termcolor = "1.2"
thiserror = "1.0"
tokio = { version = "1.26.0", default-features = false, features = ["rt-multi-thread"] }
tokio = { version = "1.26.0", default-features = false, features = ["macros", "rt-multi-thread"] }
tokio-retry = { version = "0.3.0" }
tokio-stream = { version = "0.1.14" }
tracing = "0.1"
tracing-opentelemetry = "0.24"
tracing-subscriber = "0.3.16"
Expand Down
5 changes: 3 additions & 2 deletions download/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ anyhow.workspace = true
curl = { version = "0.4.44", optional = true }
env_proxy = { version = "0.4.1", optional = true }
once_cell = { workspace = true, optional = true }
reqwest = { version = "0.12", default-features = false, features = ["blocking", "gzip", "socks"], optional = true }
reqwest = { version = "0.12", default-features = false, features = ["blocking", "gzip", "socks", "stream"], optional = true }
thiserror.workspace = true
tokio = { workspace = true, default-features = false, features = ["sync"] }
tokio-stream.workspace = true
url.workspace = true

[dev-dependencies]
http-body-util = "0.1.0"
hyper = { version = "1.0", default-features = false, features = ["server", "http1"] }
hyper-util = { version = "0.1.1", features = ["tokio"] }
tempfile.workspace = true
tokio = { workspace = true, default-features = false, features = ["sync"] }
196 changes: 104 additions & 92 deletions download/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::path::Path;

use anyhow::Context;
pub use anyhow::Result;
use std::fs::remove_file;
use url::Url;

mod errors;
Expand Down Expand Up @@ -49,108 +50,124 @@ pub enum Event<'a> {

type DownloadCallback<'a> = &'a dyn Fn(Event<'_>) -> Result<()>;

fn download_with_backend(
async fn download_with_backend(
backend: Backend,
url: &Url,
resume_from: u64,
callback: DownloadCallback<'_>,
) -> Result<()> {
match backend {
Backend::Curl => curl::download(url, resume_from, callback),
Backend::Reqwest(tls) => reqwest_be::download(url, resume_from, callback, tls),
Backend::Reqwest(tls) => reqwest_be::download(url, resume_from, callback, tls).await,
}
}

pub fn download_to_path_with_backend(
pub async fn download_to_path_with_backend(
backend: Backend,
url: &Url,
path: &Path,
resume_from_partial: bool,
callback: Option<DownloadCallback<'_>>,
) -> Result<()> {
let Err(err) =
download_to_path_with_backend_(backend, url, path, resume_from_partial, callback).await
else {
return Ok(());
};

// TODO: We currently clear up the cached download on any error, should we restrict it to a subset?
Err(
if let Err(file_err) = remove_file(path).context("cleaning up cached downloads") {
file_err.context(err)
} else {
err
},
)
}

pub async fn download_to_path_with_backend_(
backend: Backend,
url: &Url,
path: &Path,
resume_from_partial: bool,
callback: Option<DownloadCallback<'_>>,
) -> Result<()> {
use std::cell::RefCell;
use std::fs::remove_file;
use std::fs::OpenOptions;
use std::io::{Read, Seek, SeekFrom, Write};

|| -> Result<()> {
let (file, resume_from) = if resume_from_partial {
let possible_partial = OpenOptions::new().read(true).open(path);

let downloaded_so_far = if let Ok(mut partial) = possible_partial {
if let Some(cb) = callback {
cb(Event::ResumingPartialDownload)?;

let mut buf = vec![0; 32768];
let mut downloaded_so_far = 0;
loop {
let n = partial.read(&mut buf)?;
downloaded_so_far += n as u64;
if n == 0 {
break;
}
cb(Event::DownloadDataReceived(&buf[..n]))?;
let (file, resume_from) = if resume_from_partial {
// TODO: blocking call
let possible_partial = OpenOptions::new().read(true).open(path);

let downloaded_so_far = if let Ok(mut partial) = possible_partial {
if let Some(cb) = callback {
cb(Event::ResumingPartialDownload)?;

let mut buf = vec![0; 32768];
let mut downloaded_so_far = 0;
loop {
let n = partial.read(&mut buf)?;
downloaded_so_far += n as u64;
if n == 0 {
break;
}

downloaded_so_far
} else {
let file_info = partial.metadata()?;
file_info.len()
cb(Event::DownloadDataReceived(&buf[..n]))?;
}
} else {
0
};

let mut possible_partial = OpenOptions::new()
.write(true)
.create(true)
.truncate(false)
.open(path)
.context("error opening file for download")?;

possible_partial.seek(SeekFrom::End(0))?;

(possible_partial, downloaded_so_far)
downloaded_so_far
} else {
let file_info = partial.metadata()?;
file_info.len()
}
} else {
(
OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(path)
.context("error creating file for download")?,
0,
)
0
};

let file = RefCell::new(file);
// TODO: blocking call
let mut possible_partial = OpenOptions::new()
.write(true)
.create(true)
.truncate(false)
.open(path)
.context("error opening file for download")?;

download_with_backend(backend, url, resume_from, &|event| {
if let Event::DownloadDataReceived(data) = event {
file.borrow_mut()
.write_all(data)
.context("unable to write download to disk")?;
}
match callback {
Some(cb) => cb(event),
None => Ok(()),
}
})?;
possible_partial.seek(SeekFrom::End(0))?;

file.borrow_mut()
.sync_data()
.context("unable to sync download to disk")?;

Ok(())
}()
.map_err(|e| {
// TODO: We currently clear up the cached download on any error, should we restrict it to a subset?
if let Err(file_err) = remove_file(path).context("cleaning up cached downloads") {
file_err.context(e)
} else {
e
(possible_partial, downloaded_so_far)
} else {
(
OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(path)
.context("error creating file for download")?,
0,
)
};

let file = RefCell::new(file);

// TODO: the sync callback will stall the async runtime if IO calls block, which is OS dependent. Rearrange.
download_with_backend(backend, url, resume_from, &|event| {
if let Event::DownloadDataReceived(data) = event {
file.borrow_mut()
.write_all(data)
.context("unable to write download to disk")?;
}
match callback {
Some(cb) => cb(event),
None => Ok(()),
}
})
.await?;

file.borrow_mut()
.sync_data()
.context("unable to sync download to disk")?;

Ok::<(), anyhow::Error>(())
}

#[cfg(all(not(feature = "reqwest-backend"), not(feature = "curl-backend")))]
Expand Down Expand Up @@ -285,15 +302,15 @@ pub mod reqwest_be {
use anyhow::{anyhow, Context, Result};
#[cfg(any(feature = "reqwest-rustls-tls", feature = "reqwest-default-tls"))]
use once_cell::sync::Lazy;
use reqwest::blocking::{Client, ClientBuilder, Response};
use reqwest::{header, Proxy};
use reqwest::{header, Client, ClientBuilder, Proxy, Response};
use tokio_stream::StreamExt;
use url::Url;

use super::Event;
use super::TlsBackend;
use crate::errors::*;

pub fn download(
pub async fn download(
url: &Url,
resume_from: u64,
callback: &dyn Fn(Event<'_>) -> Result<()>,
Expand All @@ -304,31 +321,26 @@ pub mod reqwest_be {
return Ok(());
}

let mut res = request(url, resume_from, tls).context("failed to make network request")?;
let res = request(url, resume_from, tls)
.await
.context("failed to make network request")?;

if !res.status().is_success() {
let code: u16 = res.status().into();
return Err(anyhow!(DownloadError::HttpStatus(u32::from(code))));
}

let buffer_size = 0x10000;
let mut buffer = vec![0u8; buffer_size];

if let Some(len) = res.headers().get(header::CONTENT_LENGTH) {
// TODO possible issues during unwrap?
let len = len.to_str().unwrap().parse::<u64>().unwrap() + resume_from;
if let Some(len) = res.content_length() {
let len = len + resume_from;
callback(Event::DownloadContentLengthReceived(len))?;
}

loop {
let bytes_read = io::Read::read(&mut res, &mut buffer)?;

if bytes_read != 0 {
callback(Event::DownloadDataReceived(&buffer[0..bytes_read]))?;
} else {
return Ok(());
}
let mut stream = res.bytes_stream();
while let Some(item) = stream.next().await {
let bytes = item?;
callback(Event::DownloadDataReceived(&bytes))?;
}
Ok(())
}

fn client_generic() -> ClientBuilder {
Expand Down Expand Up @@ -377,7 +389,7 @@ pub mod reqwest_be {
env_proxy::for_url(url).to_url()
}

fn request(
async fn request(
url: &Url,
resume_from: u64,
backend: TlsBackend,
Expand All @@ -402,7 +414,7 @@ pub mod reqwest_be {
req = req.header(header::RANGE, format!("bytes={resume_from}-"));
}

Ok(req.send()?)
Ok(req.send().await?)
}

fn download_from_file_url(
Expand Down
Loading
Loading