Skip to content

proc_macro: use crossbeam channels for the proc_macro cross-thread bridge #99123

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -3885,6 +3885,7 @@ dependencies = [
name = "rustc_expand"
version = "0.0.0"
dependencies = [
"crossbeam-channel",
"rustc_ast",
"rustc_ast_passes",
"rustc_ast_pretty",
Expand Down
1 change: 1 addition & 0 deletions compiler/rustc_expand/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,4 @@ rustc_parse = { path = "../rustc_parse" }
rustc_session = { path = "../rustc_session" }
smallvec = { version = "1.8.1", features = ["union", "may_dangle"] }
rustc_ast = { path = "../rustc_ast" }
crossbeam-channel = "0.5.0"
44 changes: 37 additions & 7 deletions compiler/rustc_expand/src/proc_macro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,37 @@ use rustc_ast::tokenstream::{TokenStream, TokenTree};
use rustc_data_structures::sync::Lrc;
use rustc_errors::ErrorGuaranteed;
use rustc_parse::parser::ForceCollect;
use rustc_session::config::ProcMacroExecutionStrategy;
use rustc_span::profiling::SpannedEventArgRecorder;
use rustc_span::{Span, DUMMY_SP};

const EXEC_STRATEGY: pm::bridge::server::SameThread = pm::bridge::server::SameThread;
struct CrossbeamMessagePipe<T> {
tx: crossbeam_channel::Sender<T>,
rx: crossbeam_channel::Receiver<T>,
}

impl<T> pm::bridge::server::MessagePipe<T> for CrossbeamMessagePipe<T> {
fn new() -> (Self, Self) {
let (tx1, rx1) = crossbeam_channel::bounded(1);
let (tx2, rx2) = crossbeam_channel::bounded(1);
(CrossbeamMessagePipe { tx: tx1, rx: rx2 }, CrossbeamMessagePipe { tx: tx2, rx: rx1 })
}

fn send(&mut self, value: T) {
self.tx.send(value).unwrap();
}

fn recv(&mut self) -> Option<T> {
self.rx.recv().ok()
}
}

fn exec_strategy(ecx: &ExtCtxt<'_>) -> impl pm::bridge::server::ExecutionStrategy {
pm::bridge::server::MaybeCrossThread::<CrossbeamMessagePipe<_>>::new(
ecx.sess.opts.unstable_opts.proc_macro_execution_strategy
== ProcMacroExecutionStrategy::CrossThread,
)
}

pub struct BangProcMacro {
pub client: pm::bridge::client::Client<pm::TokenStream, pm::TokenStream>,
Expand All @@ -30,8 +57,9 @@ impl base::BangProcMacro for BangProcMacro {
});

let proc_macro_backtrace = ecx.ecfg.proc_macro_backtrace;
let strategy = exec_strategy(ecx);
let server = proc_macro_server::Rustc::new(ecx);
self.client.run(&EXEC_STRATEGY, server, input, proc_macro_backtrace).map_err(|e| {
self.client.run(&strategy, server, input, proc_macro_backtrace).map_err(|e| {
let mut err = ecx.struct_span_err(span, "proc macro panicked");
if let Some(s) = e.as_str() {
err.help(&format!("message: {}", s));
Expand Down Expand Up @@ -59,16 +87,17 @@ impl base::AttrProcMacro for AttrProcMacro {
});

let proc_macro_backtrace = ecx.ecfg.proc_macro_backtrace;
let strategy = exec_strategy(ecx);
let server = proc_macro_server::Rustc::new(ecx);
self.client
.run(&EXEC_STRATEGY, server, annotation, annotated, proc_macro_backtrace)
.map_err(|e| {
self.client.run(&strategy, server, annotation, annotated, proc_macro_backtrace).map_err(
|e| {
let mut err = ecx.struct_span_err(span, "custom attribute panicked");
if let Some(s) = e.as_str() {
err.help(&format!("message: {}", s));
}
err.emit()
})
},
)
}
}

Expand Down Expand Up @@ -105,8 +134,9 @@ impl MultiItemModifier for DeriveProcMacro {
recorder.record_arg_with_span(ecx.expansion_descr(), span);
});
let proc_macro_backtrace = ecx.ecfg.proc_macro_backtrace;
let strategy = exec_strategy(ecx);
let server = proc_macro_server::Rustc::new(ecx);
match self.client.run(&EXEC_STRATEGY, server, input, proc_macro_backtrace) {
match self.client.run(&strategy, server, input, proc_macro_backtrace) {
Ok(stream) => stream,
Err(e) => {
let mut err = ecx.struct_span_err(span, "proc-macro derive panicked");
Expand Down
3 changes: 2 additions & 1 deletion compiler/rustc_interface/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use rustc_session::config::{
};
use rustc_session::config::{
BranchProtection, Externs, OomStrategy, OutputType, OutputTypes, PAuthKey, PacRet,
SymbolManglingVersion, WasiExecModel,
ProcMacroExecutionStrategy, SymbolManglingVersion, WasiExecModel,
};
use rustc_session::config::{CFGuard, ExternEntry, LinkerPluginLto, LtoCli, SwitchWithOptPath};
use rustc_session::lint::Level;
Expand Down Expand Up @@ -685,6 +685,7 @@ fn test_unstable_options_tracking_hash() {
untracked!(print_mono_items, Some(String::from("abc")));
untracked!(print_type_sizes, true);
untracked!(proc_macro_backtrace, true);
untracked!(proc_macro_execution_strategy, ProcMacroExecutionStrategy::CrossThread);
untracked!(query_dep_graph, true);
untracked!(save_analysis, true);
untracked!(self_profile, SwitchWithOptPath::Enabled(None));
Expand Down
10 changes: 10 additions & 0 deletions compiler/rustc_session/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2959,3 +2959,13 @@ impl OomStrategy {
}
}
}

/// How to run proc-macro code when building this crate
#[derive(Clone, Copy, PartialEq, Hash, Debug)]
pub enum ProcMacroExecutionStrategy {
/// Run the proc-macro code on the same thread as the server.
SameThread,

/// Run the proc-macro code on a different thread.
CrossThread,
}
17 changes: 17 additions & 0 deletions compiler/rustc_session/src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,8 @@ mod desc {
"one of (`none` (default), `basic`, `strong`, or `all`)";
pub const parse_branch_protection: &str =
"a `,` separated combination of `bti`, `b-key`, `pac-ret`, or `leaf`";
pub const parse_proc_macro_execution_strategy: &str =
"one of supported execution strategies (`same-thread`, or `cross-thread`)";
}

mod parse {
Expand Down Expand Up @@ -1062,6 +1064,18 @@ mod parse {
}
true
}

pub(crate) fn parse_proc_macro_execution_strategy(
slot: &mut ProcMacroExecutionStrategy,
v: Option<&str>,
) -> bool {
*slot = match v {
Some("same-thread") => ProcMacroExecutionStrategy::SameThread,
Some("cross-thread") => ProcMacroExecutionStrategy::CrossThread,
_ => return false,
};
true
}
}

options! {
Expand Down Expand Up @@ -1457,6 +1471,9 @@ options! {
"print layout information for each type encountered (default: no)"),
proc_macro_backtrace: bool = (false, parse_bool, [UNTRACKED],
"show backtraces for panics during proc-macro execution (default: no)"),
proc_macro_execution_strategy: ProcMacroExecutionStrategy = (ProcMacroExecutionStrategy::SameThread,
parse_proc_macro_execution_strategy, [UNTRACKED],
"how to run proc-macro code (default: same-thread)"),
Comment on lines +1474 to +1476
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pnkfelix I'm running out of ideas... what if you move this to the end? (as in, maybe the order of the fields or their offsets, in this huge struct, is causing weird things)

It could even be its presence - the flag could be removed and exec_strategy (the sole user of the flag, in compiler/rustc_expand/src/proc_macro.rs) made to, idk, use std::env::var instead? (or just hardcode the choice, but that might cause changes in optimizations)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wacky! I moved the field to the end of the struct, and that made the two methods with the two highest instruction counts (parse_tt and MacroRulesMacroExpander::expand) each have lower instruction-counts (Ir) according to cachegrind, but the overall instruction-count increased:

Instruction Counts 8f68c43c bd84c73f field-at-end
TOTAL 988,908,196 (100.0%) 1,002,719,736 (100.0%) 1,138,908,984 (100.0%)
parse_tt 134,903,906 (13.64%) 149,152,914 (14.87%) 99,794,157 (8.76%)
MacroRules
MacroExpander
::expand
56,387,220 (5.70%) 56,379,944 (5.62%) 50,969,953 (4.48%)
SipHasher128::finish128 5,975,227 (0.60%) 5,975,227 (0.60%) 36,508,882 (3.21%)
token_name_eq (not in report) (not in report) 29,133,905 (2.56%)
try_mark_previous_green 53,862,272 (5.45%) 53,872,906 (5.37%) 25,344,946 (2.23%)

I don't really want to spend too much more trying to dissect this thing that is probably just an LLVM codegen oddity, but I will at least try eliminating the field entirely as you suggest.

(As an aside, the fact that this kind of field shuffling had such an enormous impact on parse_tt does make me wonder if there's some kind of low-hanging fruit embedded in the code there, e.g. maybe the code is accessing the session when it should be locally stashing an unchanging value? But isn't that the kind of thing you'd like to believe your compiler is doing for you automatically? 😆 )

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suspect that the code is hot, and also that register spilling is happening in a different way or inlining choices are a little different. I.e. the kind of thing that's hard to control, and even if you manage to get it in the faster state now a tiny change later on might make it go back to the slower state.

profile: bool = (false, parse_bool, [TRACKED],
"insert profiling code (default: no)"),
profile_closures: bool = (false, parse_no_flag, [UNTRACKED],
Expand Down
136 changes: 64 additions & 72 deletions library/proc_macro/src/bridge/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

use super::*;

use std::marker::PhantomData;

// FIXME(eddyb) generate the definition of `HandleStore` in `server.rs`.
use super::client::HandleStore;

Expand Down Expand Up @@ -143,6 +145,41 @@ pub trait ExecutionStrategy {
) -> Buffer;
}

pub struct MaybeCrossThread<P> {
cross_thread: bool,
marker: PhantomData<P>,
}

impl<P> MaybeCrossThread<P> {
pub const fn new(cross_thread: bool) -> Self {
MaybeCrossThread { cross_thread, marker: PhantomData }
}
}

impl<P> ExecutionStrategy for MaybeCrossThread<P>
where
P: MessagePipe<Buffer> + Send + 'static,
{
fn run_bridge_and_client(
&self,
dispatcher: &mut impl DispatcherTrait,
input: Buffer,
run_client: extern "C" fn(BridgeConfig<'_>) -> Buffer,
force_show_panics: bool,
) -> Buffer {
if self.cross_thread {
<CrossThread<P>>::new().run_bridge_and_client(
dispatcher,
input,
run_client,
force_show_panics,
)
} else {
SameThread.run_bridge_and_client(dispatcher, input, run_client, force_show_panics)
}
}
}

pub struct SameThread;

impl ExecutionStrategy for SameThread {
Expand All @@ -164,28 +201,31 @@ impl ExecutionStrategy for SameThread {
}
}

// NOTE(eddyb) Two implementations are provided, the second one is a bit
// faster but neither is anywhere near as fast as same-thread execution.
pub struct CrossThread<P>(PhantomData<P>);

pub struct CrossThread1;
impl<P> CrossThread<P> {
pub const fn new() -> Self {
CrossThread(PhantomData)
}
}

impl ExecutionStrategy for CrossThread1 {
impl<P> ExecutionStrategy for CrossThread<P>
where
P: MessagePipe<Buffer> + Send + 'static,
{
fn run_bridge_and_client(
&self,
dispatcher: &mut impl DispatcherTrait,
input: Buffer,
run_client: extern "C" fn(BridgeConfig<'_>) -> Buffer,
force_show_panics: bool,
) -> Buffer {
use std::sync::mpsc::channel;

let (req_tx, req_rx) = channel();
let (res_tx, res_rx) = channel();
let (mut server, mut client) = P::new();

let join_handle = thread::spawn(move || {
let mut dispatch = |buf| {
req_tx.send(buf).unwrap();
res_rx.recv().unwrap()
let mut dispatch = |b: Buffer| -> Buffer {
client.send(b);
client.recv().expect("server died while client waiting for reply")
};

run_client(BridgeConfig {
Expand All @@ -196,75 +236,27 @@ impl ExecutionStrategy for CrossThread1 {
})
});

for b in req_rx {
res_tx.send(dispatcher.dispatch(b)).unwrap();
while let Some(b) = server.recv() {
server.send(dispatcher.dispatch(b));
}

join_handle.join().unwrap()
}
}

pub struct CrossThread2;
/// A message pipe used for communicating between server and client threads.
pub trait MessagePipe<T>: Sized {
/// Create a new pair of endpoints for the message pipe.
fn new() -> (Self, Self);

impl ExecutionStrategy for CrossThread2 {
fn run_bridge_and_client(
&self,
dispatcher: &mut impl DispatcherTrait,
input: Buffer,
run_client: extern "C" fn(BridgeConfig<'_>) -> Buffer,
force_show_panics: bool,
) -> Buffer {
use std::sync::{Arc, Mutex};

enum State<T> {
Req(T),
Res(T),
}

let mut state = Arc::new(Mutex::new(State::Res(Buffer::new())));

let server_thread = thread::current();
let state2 = state.clone();
let join_handle = thread::spawn(move || {
let mut dispatch = |b| {
*state2.lock().unwrap() = State::Req(b);
server_thread.unpark();
loop {
thread::park();
if let State::Res(b) = &mut *state2.lock().unwrap() {
break b.take();
}
}
};

let r = run_client(BridgeConfig {
input,
dispatch: (&mut dispatch).into(),
force_show_panics,
_marker: marker::PhantomData,
});

// Wake up the server so it can exit the dispatch loop.
drop(state2);
server_thread.unpark();

r
});

// Check whether `state2` was dropped, to know when to stop.
while Arc::get_mut(&mut state).is_none() {
thread::park();
let mut b = match &mut *state.lock().unwrap() {
State::Req(b) => b.take(),
_ => continue,
};
b = dispatcher.dispatch(b.take());
*state.lock().unwrap() = State::Res(b);
join_handle.thread().unpark();
}
/// Send a message to the other endpoint of this pipe.
fn send(&mut self, value: T);

join_handle.join().unwrap()
}
/// Receive a message from the other endpoint of this pipe.
///
/// Returns `None` if the other end of the pipe has been destroyed, and no
/// message was received.
fn recv(&mut self) -> Option<T>;
}

fn run_server<
Expand Down
1 change: 1 addition & 0 deletions src/test/rustdoc-ui/z-help.stdout
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@
-Z print-mono-items=val -- print the result of the monomorphization collection pass
-Z print-type-sizes=val -- print layout information for each type encountered (default: no)
-Z proc-macro-backtrace=val -- show backtraces for panics during proc-macro execution (default: no)
-Z proc-macro-execution-strategy=val -- how to run proc-macro code (default: same-thread)
-Z profile=val -- insert profiling code (default: no)
-Z profile-closures=val -- profile size of closures
-Z profile-emit=val -- file path to emit profiling data at runtime when using 'profile' (default based on relative source path)
Expand Down