diff --git a/rustup-macros/src/lib.rs b/rustup-macros/src/lib.rs index 5ccb3ae5cf..70048983cd 100644 --- a/rustup-macros/src/lib.rs +++ b/rustup-macros/src/lib.rs @@ -39,11 +39,16 @@ pub fn integration_test( .into() } -/// Custom wrapper macro around `#[test]` and `#[tokio::test]` for unit tests. +/// Custom wrapper macro around `#[tokio::test]` for unit tests. /// /// Calls `rustup::test::before_test()` before the test body, and /// `rustup::test::after_test()` after, even in the event of an unwinding panic. -/// For async functions calls the async variants of these functions. +/// +/// This wrapper makes the underlying test function async even if it's sync in nature. +/// This ensures that a [`tokio`] runtime is always present during tests, +/// making it easier to setup [`tracing`] subscribers +/// (e.g. [`opentelemetry_otlp::OtlpTracePipeline`] always requires a [`tokio`] runtime to be +/// installed). #[proc_macro_attribute] pub fn unit_test( args: proc_macro::TokenStream, @@ -77,74 +82,44 @@ pub fn unit_test( .into() } -// False positive from clippy :/ -#[allow(clippy::redundant_clone)] fn test_inner(mod_path: String, mut input: ItemFn) -> syn::Result { - if input.sig.asyncness.is_some() { - let before_ident = format!("{}::before_test_async", mod_path); - let before_ident = syn::parse_str::(&before_ident)?; - let after_ident = format!("{}::after_test_async", mod_path); - let after_ident = syn::parse_str::(&after_ident)?; - - let inner = input.block; - let name = input.sig.ident.clone(); - let new_block: Block = parse_quote! { - { - #before_ident().await; - // Define a function with same name we can instrument inside the - // tracing enablement logic. - #[cfg_attr(feature = "otel", tracing::instrument(skip_all))] - async fn #name() { #inner } - // Thunk through a new thread to permit catching the panic - // without grabbing the entire state machine defined by the - // outer test function. - let result = ::std::panic::catch_unwind(||{ - let handle = tokio::runtime::Handle::current().clone(); - ::std::thread::spawn(move || handle.block_on(#name())).join().unwrap() - }); - #after_ident().await; - match result { - Ok(result) => result, - Err(err) => ::std::panic::resume_unwind(err) - } - } - }; + // Make the test function async even if it's sync. + input.sig.asyncness.get_or_insert_with(Default::default); - input.block = Box::new(new_block); + let before_ident = format!("{}::before_test_async", mod_path); + let before_ident = syn::parse_str::(&before_ident)?; + let after_ident = format!("{}::after_test_async", mod_path); + let after_ident = syn::parse_str::(&after_ident)?; - Ok(quote! { + let inner = input.block; + let name = input.sig.ident.clone(); + let new_block: Block = parse_quote! { + { + let _guard = #before_ident().await; + // Define a function with same name we can instrument inside the + // tracing enablement logic. #[cfg_attr(feature = "otel", tracing::instrument(skip_all))] - #[::tokio::test(flavor = "multi_thread", worker_threads = 1)] - #input - }) - } else { - let before_ident = format!("{}::before_test", mod_path); - let before_ident = syn::parse_str::(&before_ident)?; - let after_ident = format!("{}::after_test", mod_path); - let after_ident = syn::parse_str::(&after_ident)?; - - let inner = input.block; - let name = input.sig.ident.clone(); - let new_block: Block = parse_quote! { - { - #before_ident(); - // Define a function with same name we can instrument inside the - // tracing enablement logic. - #[cfg_attr(feature = "otel", tracing::instrument(skip_all))] - fn #name() { #inner } - let result = ::std::panic::catch_unwind(#name); - #after_ident(); - match result { - Ok(result) => result, - Err(err) => ::std::panic::resume_unwind(err) - } + async fn #name() { #inner } + // Thunk through a new thread to permit catching the panic + // without grabbing the entire state machine defined by the + // outer test function. + let result = ::std::panic::catch_unwind(||{ + let handle = tokio::runtime::Handle::current().clone(); + ::std::thread::spawn(move || handle.block_on(#name())).join().unwrap() + }); + #after_ident().await; + match result { + Ok(result) => result, + Err(err) => ::std::panic::resume_unwind(err) } - }; + } + }; - input.block = Box::new(new_block); - Ok(quote! { - #[::std::prelude::v1::test] - #input - }) - } + input.block = Box::new(new_block); + + Ok(quote! { + #[cfg_attr(feature = "otel", tracing::instrument(skip_all))] + #[::tokio::test(flavor = "multi_thread", worker_threads = 1)] + #input + }) } diff --git a/src/bin/rustup-init.rs b/src/bin/rustup-init.rs index 57b9057be5..b05cbdaade 100644 --- a/src/bin/rustup-init.rs +++ b/src/bin/rustup-init.rs @@ -60,34 +60,17 @@ async fn maybe_trace_rustup() -> Result { } #[cfg(feature = "otel")] { - use std::time::Duration; - - use opentelemetry::{global, KeyValue}; - use opentelemetry_otlp::WithExportConfig; - use opentelemetry_sdk::{ - propagation::TraceContextPropagator, - trace::{self, Sampler}, - Resource, + use tracing_subscriber::{layer::SubscriberExt, Registry}; + + let telemetry = { + use opentelemetry::global; + use opentelemetry_sdk::propagation::TraceContextPropagator; + + global::set_text_map_propagator(TraceContextPropagator::new()); + rustup::cli::log::telemetry() }; - use tracing_subscriber::{layer::SubscriberExt, EnvFilter, Registry}; - - global::set_text_map_propagator(TraceContextPropagator::new()); - let tracer = opentelemetry_otlp::new_pipeline() - .tracing() - .with_exporter( - opentelemetry_otlp::new_exporter() - .tonic() - .with_timeout(Duration::from_secs(3)), - ) - .with_trace_config( - trace::config() - .with_sampler(Sampler::AlwaysOn) - .with_resource(Resource::new(vec![KeyValue::new("service.name", "rustup")])), - ) - .install_batch(opentelemetry_sdk::runtime::Tokio)?; - let env_filter = EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("INFO")); - let telemetry = tracing_opentelemetry::layer().with_tracer(tracer); - let subscriber = Registry::default().with(env_filter).with(telemetry); + + let subscriber = Registry::default().with(telemetry); tracing::subscriber::set_global_default(subscriber)?; let result = run_rustup().await; // We're tracing, so block until all spans are exported. diff --git a/src/cli/log.rs b/src/cli/log.rs index 48f1c6d6a4..2a3c68a327 100644 --- a/src/cli/log.rs +++ b/src/cli/log.rs @@ -1,5 +1,13 @@ -use std::fmt; -use std::io::Write; +use std::{fmt, io::Write}; + +#[cfg(feature = "otel")] +use once_cell::sync::Lazy; +#[cfg(feature = "otel")] +use opentelemetry_sdk::trace::Tracer; +#[cfg(feature = "otel")] +use tracing::Subscriber; +#[cfg(feature = "otel")] +use tracing_subscriber::{registry::LookupSpan, EnvFilter, Layer}; use crate::currentprocess::{process, terminalsource}; @@ -71,3 +79,48 @@ pub(crate) fn debug_fmt(args: fmt::Arguments<'_>) { let _ = writeln!(t.lock()); } } + +/// A [`tracing::Subscriber`] [`Layer`][`tracing_subscriber::Layer`] that corresponds to Rustup's +/// optional `opentelemetry` (a.k.a. `otel`) feature. +#[cfg(feature = "otel")] +pub fn telemetry() -> impl Layer +where + S: Subscriber + for<'span> LookupSpan<'span>, +{ + // NOTE: This reads from the real environment variables instead of `process().var_os()`. + let env_filter = EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("INFO")); + tracing_opentelemetry::layer() + .with_tracer(TELEMETRY_DEFAULT_TRACER.clone()) + .with_filter(env_filter) +} + +/// The default `opentelemetry` tracer used across Rustup. +/// +/// # Note +/// The initializer function will panic if not called within the context of a [`tokio`] runtime. +#[cfg(feature = "otel")] +static TELEMETRY_DEFAULT_TRACER: Lazy = Lazy::new(|| { + use std::time::Duration; + + use opentelemetry::KeyValue; + use opentelemetry_otlp::WithExportConfig; + use opentelemetry_sdk::{ + trace::{self, Sampler}, + Resource, + }; + + opentelemetry_otlp::new_pipeline() + .tracing() + .with_exporter( + opentelemetry_otlp::new_exporter() + .tonic() + .with_timeout(Duration::from_secs(3)), + ) + .with_trace_config( + trace::config() + .with_sampler(Sampler::AlwaysOn) + .with_resource(Resource::new(vec![KeyValue::new("service.name", "rustup")])), + ) + .install_batch(opentelemetry_sdk::runtime::Tokio) + .expect("error installing `OtlpTracePipeline` in the current `tokio` runtime") +}); diff --git a/src/test.rs b/src/test.rs index 617b765cea..a7a35719e8 100644 --- a/src/test.rs +++ b/src/test.rs @@ -224,85 +224,32 @@ where f(&rustup_home) } -#[cfg(feature = "otel")] -use once_cell::sync::Lazy; - -/// A tokio runtime for the sync tests, permitting the use of tracing. This is -/// never shutdown, instead it is just dropped at end of process. -#[cfg(feature = "otel")] -static TRACE_RUNTIME: Lazy = - Lazy::new(|| tokio::runtime::Runtime::new().unwrap()); -/// A tracer for the tests. -#[cfg(feature = "otel")] -static TRACER: Lazy = Lazy::new(|| { - use std::time::Duration; - - use opentelemetry::{global, KeyValue}; - use opentelemetry_otlp::WithExportConfig; - use opentelemetry_sdk::{ - propagation::TraceContextPropagator, - trace::{self, Sampler}, - Resource, - }; - use tokio::runtime::Handle; - use tracing_subscriber::{layer::SubscriberExt, EnvFilter, Registry}; - - // Use the current runtime, or the sync test runtime otherwise. - let handle = match Handle::try_current() { - Ok(handle) => handle, - Err(_) => TRACE_RUNTIME.handle().clone(), - }; - let _guard = handle.enter(); - - let tracer = opentelemetry_otlp::new_pipeline() - .tracing() - .with_exporter( - opentelemetry_otlp::new_exporter() - .tonic() - .with_timeout(Duration::from_secs(3)), - ) - .with_trace_config( - trace::config() - .with_sampler(Sampler::AlwaysOn) - .with_resource(Resource::new(vec![KeyValue::new("service.name", "rustup")])), - ) - .install_batch(opentelemetry_sdk::runtime::Tokio) - .unwrap(); - - global::set_text_map_propagator(TraceContextPropagator::new()); - let env_filter = EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("INFO")); - let telemetry = tracing_opentelemetry::layer().with_tracer(tracer.clone()); - let subscriber = Registry::default().with(env_filter).with(telemetry); - tracing::subscriber::set_global_default(subscriber).unwrap(); - tracer -}); - -pub fn before_test() { +pub async fn before_test_async() -> Option { #[cfg(feature = "otel")] { - Lazy::force(&TRACER); - } -} + use tracing_subscriber::{layer::SubscriberExt, Registry}; -pub async fn before_test_async() { - #[cfg(feature = "otel")] - { - Lazy::force(&TRACER); - } -} + let telemetry = { + use opentelemetry::global; + use opentelemetry_sdk::propagation::TraceContextPropagator; -pub fn after_test() { - #[cfg(feature = "otel")] + global::set_text_map_propagator(TraceContextPropagator::new()); + crate::cli::log::telemetry() + }; + + let subscriber = Registry::default().with(telemetry); + Some(tracing::subscriber::set_default(subscriber)) + } + #[cfg(not(feature = "otel"))] { - let handle = TRACE_RUNTIME.handle(); - let _guard = handle.enter(); - TRACER.provider().map(|p| p.force_flush()); + None } } pub async fn after_test_async() { #[cfg(feature = "otel")] { - TRACER.provider().map(|p| p.force_flush()); + // We're tracing, so block until all spans are exported. + opentelemetry::global::shutdown_tracer_provider(); } } diff --git a/src/test/mock/clitools.rs b/src/test/mock/clitools.rs index 731e0c97e4..9c3236fcb6 100644 --- a/src/test/mock/clitools.rs +++ b/src/test/mock/clitools.rs @@ -16,11 +16,8 @@ use std::{ use enum_map::{enum_map, Enum, EnumMap}; use once_cell::sync::Lazy; -use tokio::runtime::Builder; use url::Url; -use crate::cli::rustup_mode; -use crate::currentprocess; use crate::test as rustup_test; use crate::test::const_dist_dir; use crate::test::this_host_triple; @@ -679,13 +676,8 @@ impl Config { I: IntoIterator + Clone + Debug, A: AsRef, { - let inprocess = allow_inprocess(name, args.clone()); let start = Instant::now(); - let out = if inprocess { - self.run_inprocess(name, args.clone(), env) - } else { - self.run_subprocess(name, args.clone(), env) - }; + let out = self.run_subprocess(name, args.clone(), env); let duration = Instant::now() - start; let output = SanitizedOutput { ok: matches!(out.status, Some(0)), @@ -694,7 +686,6 @@ impl Config { }; println!("ran: {} {:?}", name, args); - println!("inprocess: {inprocess}"); println!("status: {:?}", out.status); println!("duration: {:.3}s", duration.as_secs_f32()); println!("stdout:\n====\n{}\n====\n", output.stdout); @@ -703,54 +694,6 @@ impl Config { output } - #[cfg_attr(feature = "otel", tracing::instrument(skip_all))] - pub(crate) fn run_inprocess(&self, name: &str, args: I, env: &[(&str, &str)]) -> Output - where - I: IntoIterator, - A: AsRef, - { - // should we use vars_os, or skip over non-stringable vars? This is test - // code after all... - let mut vars: HashMap = HashMap::default(); - self::env(self, &mut vars); - vars.extend(env.iter().map(|(k, v)| (k.to_string(), v.to_string()))); - let mut arg_strings: Vec> = Vec::new(); - arg_strings.push(name.to_owned().into_boxed_str()); - for arg in args { - arg_strings.push( - arg.as_ref() - .to_os_string() - .into_string() - .unwrap() - .into_boxed_str(), - ); - } - let tp = currentprocess::TestProcess::new(&*self.workdir.borrow(), &arg_strings, vars, ""); - let mut builder = Builder::new_multi_thread(); - builder - .enable_all() - .worker_threads(2) - .max_blocking_threads(2); - let process_res = currentprocess::with_runtime( - tp.clone().into(), - builder, - rustup_mode::main(tp.cwd.clone()), - ); - // convert Err's into an ec - let ec = match process_res { - Ok(process_res) => process_res, - Err(e) => { - currentprocess::with(tp.clone().into(), || crate::cli::common::report_error(&e)); - utils::ExitCode(1) - } - }; - Output { - status: Some(ec.0), - stderr: tp.get_stderr(), - stdout: tp.get_stdout(), - } - } - #[track_caller] pub fn run_subprocess(&self, name: &str, args: I, env: &[(&str, &str)]) -> Output where @@ -854,43 +797,6 @@ pub fn env(config: &Config, cmd: &mut E) { config.env(cmd) } -fn allow_inprocess(name: &str, args: I) -> bool -where - I: IntoIterator, - A: AsRef, -{ - // Only the rustup alias is currently ready for in-process testing: - // - -init performs self-update which monkey with global external state. - // - proxies themselves behave appropriately the proxied output needs to be - // collected for assertions to be made on it as our tests traverse layers. - // - self update executions cannot run in-process because on windows the - // process replacement dance would replace the test process. - // - any command with --version in it is testing to see something was - // installed properly, so we have to shell out to it to be sure - if name != "rustup" { - return false; - } - let mut is_update = false; - let mut no_self_update = false; - let mut self_cmd = false; - let mut run = false; - let mut version = false; - for arg in args { - if arg.as_ref() == "update" { - is_update = true; - } else if arg.as_ref() == "--no-self-update" { - no_self_update = true; - } else if arg.as_ref() == "self" { - self_cmd = true; - } else if arg.as_ref() == "run" { - run = true; - } else if arg.as_ref() == "--version" { - version = true; - } - } - !(run || self_cmd || version || (is_update && !no_self_update)) -} - #[derive(Copy, Clone, Eq, PartialEq)] enum RlsStatus { Available,