From d260c2df67024a7a67c0ee3b76f1058bed8cf956 Mon Sep 17 00:00:00 2001 From: james7132 Date: Tue, 9 Apr 2024 22:01:31 -0700 Subject: [PATCH 01/29] Base implementation --- Cargo.toml | 3 + src/lib.rs | 209 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 212 insertions(+) diff --git a/Cargo.toml b/Cargo.toml index 6920042..c4c8ce5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,9 @@ keywords = ["asynchronous", "executor", "single", "multi", "spawn"] categories = ["asynchronous", "concurrency"] exclude = ["/.*"] +[features] +leak = [] + [dependencies] async-task = "4.4.0" concurrent-queue = "2.0.0" diff --git a/src/lib.rs b/src/lib.rs index 0c51fd5..4641049 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -425,6 +425,25 @@ impl<'a> Executor<'a> { } } +impl Executor<'static> { + /// Consumes the [`Executor`] and intentionally leaks it. + /// + /// Largely equivalent to calling `Box::leak(Box::new(executor))`, but the produced + /// [`LeakedExecutor`]'s functions are optimized to require fewer synchronizing operations + /// when spawning, running, and finishing tasks. + #[cfg(feature = "leak")] + pub fn leak(self) -> LeakedExecutor { + let ptr = self.state_ptr().cast(); + std::mem::forget(self); + LeakedExecutor { + // SAFETY: So long as an Executor lives, it's state pointer will always be valid + // when accessed through state_ptr. This executor will live for the full 'static + // lifetime so this isn't an arbitrary lifetime extension. + state: unsafe { &*ptr }, + } + } +} + impl Drop for Executor<'_> { fn drop(&mut self) { let ptr = *self.state.get_mut(); @@ -681,6 +700,191 @@ impl<'a> Default for LocalExecutor<'a> { } } +/// A leaked async [`Executor`]. +/// +/// Largely equivalent to calling `Box::leak(Box::new(executor))`, but several +/// operations are optimized to require fewer operations when spawning, running, and +/// finishing tasks. +/// +/// As this type does not implement `Drop`, losing the handle to the executor or failing +/// to consistently drive the executor with [`tick`] or [`run`] will cause the all spawned +/// tasks to permanently leak. Any tasks at the time will not be cancelled. +/// +/// Unlike [`Executor`], this type trivially implements both [`Clone`] and [`Copy`]> +/// +/// This type *cannot* be converted back to a `Executor`. +#[derive(Copy, Clone)] +#[cfg(feature = "leak")] +pub struct LeakedExecutor { + state: &'static State, +} + +// SAFETY: Executor stores no thread local state that can be accessed via other thread. +#[cfg(feature = "leak")] +unsafe impl Send for LeakedExecutor {} +// SAFETY: Executor internally synchronizes all of it's operations internally. +#[cfg(feature = "leak")] +unsafe impl Sync for LeakedExecutor {} + +#[cfg(feature = "leak")] +impl UnwindSafe for LeakedExecutor {} +#[cfg(feature = "leak")] +impl RefUnwindSafe for LeakedExecutor {} + +#[cfg(feature = "leak")] +impl fmt::Debug for LeakedExecutor { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + debug_state(self.state, "Executor", f) + } +} + +#[cfg(feature = "leak")] +impl LeakedExecutor { + /// Spawns a task onto the executor. + /// + /// # Examples + /// + /// ``` + /// use async_executor::Executor; + /// + /// let ex = Executor::new(); + /// + /// let task = ex.spawn(async { + /// println!("Hello world"); + /// }); + /// ``` + pub fn spawn( + &self, + future: impl Future + Send + 'static, + ) -> Task { + // Create the task and register it in the set of active tasks. + // + // SAFETY: + // + // If `future` is not `Send`, this must be a `LocalExecutor` as per this + // function's unsafe precondition. Since `LocalExecutor` is `!Sync`, + // `try_tick`, `tick` and `run` can only be called from the origin + // thread of the `LocalExecutor`. Similarly, `spawn` can only be called + // from the origin thread, ensuring that `future` and the executor share + // the same origin thread. The `Runnable` can be scheduled from other + // threads, but because of the above `Runnable` can only be called or + // dropped on the origin thread. + // + // `future` is `'static`. + // + // `self.schedule()` is `Send`, `Sync` and `'static`, as checked below. + // Therefore we do not need to worry about what is done with the + // `Waker`. + let (runnable, task) = unsafe { + Builder::new() + .propagate_panic(true) + .spawn_unchecked(|()| future, self.schedule()) + }; + runnable.schedule(); + task + } + + /// Attempts to run a task if at least one is scheduled. + /// + /// Running a scheduled task means simply polling its future once. + /// + /// # Examples + /// + /// ``` + /// use async_executor::Executor; + /// + /// let ex = Executor::new(); + /// assert!(!ex.try_tick()); // no tasks to run + /// + /// let task = ex.spawn(async { + /// println!("Hello world"); + /// }); + /// assert!(ex.try_tick()); // a task was found + /// ``` + pub fn try_tick(&self) -> bool { + match self.state.queue.pop() { + Err(_) => false, + Ok(runnable) => { + // Notify another ticker now to pick up where this ticker left off, just in case + // running the task takes a long time. + self.state.notify(); + + // Run the task. + runnable.run(); + true + } + } + } + + /// Runs a single task. + /// + /// Running a task means simply polling its future once. + /// + /// If no tasks are scheduled when this method is called, it will wait until one is scheduled. + /// + /// # Examples + /// + /// ``` + /// use async_executor::Executor; + /// use futures_lite::future; + /// + /// let ex = Executor::new(); + /// + /// let task = ex.spawn(async { + /// println!("Hello world"); + /// }); + /// future::block_on(ex.tick()); // runs the task + /// ``` + pub async fn tick(&self) { + let runnable = Ticker::new(self.state).runnable().await; + runnable.run(); + } + + /// Runs the executor until the given future completes. + /// + /// # Examples + /// + /// ``` + /// use async_executor::Executor; + /// use futures_lite::future; + /// + /// let ex = Executor::new(); + /// + /// let task = ex.spawn(async { 1 + 2 }); + /// let res = future::block_on(ex.run(async { task.await * 2 })); + /// + /// assert_eq!(res, 6); + /// ``` + pub async fn run(&self, future: impl Future) -> T { + let mut runner = Runner::new(self.state); + let mut rng = fastrand::Rng::new(); + + // A future that runs tasks forever. + let run_forever = async { + loop { + for _ in 0..200 { + let runnable = runner.runnable(&mut rng).await; + runnable.run(); + } + future::yield_now().await; + } + }; + + // Run `future` and `run_forever` concurrently until `future` completes. + future.or(run_forever).await + } + + /// Returns a function that schedules a runnable task when it gets woken up. + fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static { + let state = self.state; + // TODO: If possible, push into the current local queue and notify the ticker. + move |runnable| { + state.queue.push(runnable).unwrap(); + state.notify(); + } + } +} + /// The state of a executor. struct State { /// The global queue. @@ -1068,6 +1272,11 @@ fn debug_executor(executor: &Executor<'_>, name: &str, f: &mut fmt::Formatter<'_ // in state_ptr. let state = unsafe { &*ptr }; + debug_state(state, name, f) +} + +/// Debug implementation for `Executor` and `LocalExecutor`. +fn debug_state(state: &State, name: &str, f: &mut fmt::Formatter<'_>) -> fmt::Result { /// Debug wrapper for the number of active tasks. struct ActiveTasks<'a>(&'a Mutex>); From 99c737d508fc0c4de9109ed6cb6957ae0e7f3d4d Mon Sep 17 00:00:00 2001 From: james7132 Date: Tue, 9 Apr 2024 23:08:22 -0700 Subject: [PATCH 02/29] Add benchmarks --- Cargo.toml | 1 + benches/executor.rs | 301 ++++++++++++++++++++++++++++++-------------- 2 files changed, 209 insertions(+), 93 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c4c8ce5..89c2531 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,3 +40,4 @@ once_cell = "1.16.0" [[bench]] name = "executor" harness = false +required-features = ["leak"] diff --git a/benches/executor.rs b/benches/executor.rs index 791610f..78b439d 100644 --- a/benches/executor.rs +++ b/benches/executor.rs @@ -1,6 +1,6 @@ use std::thread::available_parallelism; -use async_executor::Executor; +use async_executor::{Executor, LeakedExecutor}; use criterion::{criterion_group, criterion_main, Criterion}; use futures_lite::{future, prelude::*}; @@ -26,6 +26,22 @@ fn run(f: impl FnOnce(), multithread: bool) { }); } +fn run_leaked(executor: LeakedExecutor, f: impl FnOnce(), multithread: bool) { + let limit = if multithread { + available_parallelism().unwrap().get() + } else { + 1 + }; + + let (s, r) = async_channel::bounded::<()>(1); + easy_parallel::Parallel::new() + .each(0..limit, |_| future::block_on(executor.run(r.recv()))) + .finish(move || { + let _s = s; + f() + }); +} + fn create(c: &mut Criterion) { c.bench_function("executor::create", |b| { b.iter(|| { @@ -37,108 +53,207 @@ fn create(c: &mut Criterion) { } fn running_benches(c: &mut Criterion) { - for (group_name, multithread) in [("single_thread", false), ("multi_thread", true)].iter() { - let mut group = c.benchmark_group(group_name.to_string()); - - group.bench_function("executor::spawn_one", |b| { - run( - || { - b.iter(|| { - future::block_on(async { EX.spawn(async {}).await }); - }); - }, - *multithread, - ); - }); + let leaked = Executor::new().leak(); + for with_leaked in [false, true] { + for (group_name, multithread) in [("single_thread", false), ("multi_thread", true)].iter() { + let prefix = if with_leaked { + "leaked_executor" + } else { + "executor" + }; + let mut group = c.benchmark_group(group_name.to_string()); - group.bench_function("executor::spawn_batch", |b| { - run( - || { - let mut handles = vec![]; + group.bench_function(format!("{}::spawn_one", prefix), |b| { + if with_leaked { + run_leaked( + leaked, + || { + b.iter(|| { + future::block_on(async { leaked.spawn(async {}).await }); + }); + }, + *multithread, + ); + } else { + run( + || { + b.iter(|| { + future::block_on(async { EX.spawn(async {}).await }); + }); + }, + *multithread, + ); + } + }); - b.iter(|| { - EX.spawn_many((0..250).map(|_| future::yield_now()), &mut handles); - }); + if !with_leaked { + group.bench_function("executor::spawn_batch", |b| { + run( + || { + let mut handles = vec![]; - handles.clear(); - }, - *multithread, - ) - }); + b.iter(|| { + EX.spawn_many((0..250).map(|_| future::yield_now()), &mut handles); + }); - group.bench_function("executor::spawn_many_local", |b| { - run( - || { - b.iter(move || { - future::block_on(async { - let mut tasks = Vec::new(); - for _ in 0..LIGHT_TASKS { - tasks.push(EX.spawn(async {})); - } - for task in tasks { - task.await; - } - }); - }); - }, - *multithread, - ); - }); + handles.clear(); + }, + *multithread, + ) + }); + } + + group.bench_function(format!("{}::spawn_many_local", prefix), |b| { + if with_leaked { + run_leaked( + leaked, + || { + b.iter(move || { + future::block_on(async { + let mut tasks = Vec::new(); + for _ in 0..LIGHT_TASKS { + tasks.push(leaked.spawn(async {})); + } + for task in tasks { + task.await; + } + }); + }); + }, + *multithread, + ); + } else { + run( + || { + b.iter(move || { + future::block_on(async { + let mut tasks = Vec::new(); + for _ in 0..LIGHT_TASKS { + tasks.push(EX.spawn(async {})); + } + for task in tasks { + task.await; + } + }); + }); + }, + *multithread, + ); + } + }); - group.bench_function("executor::spawn_recursively", |b| { - #[allow(clippy::manual_async_fn)] - fn go(i: usize) -> impl Future + Send + 'static { - async move { - if i != 0 { - EX.spawn(async move { - let fut = go(i - 1).boxed(); - fut.await; - }) - .await; + group.bench_function(format!("{}::spawn_recursively", prefix),|b| { + #[allow(clippy::manual_async_fn)] + fn go(i: usize) -> impl Future + Send + 'static { + async move { + if i != 0 { + EX.spawn(async move { + let fut = go(i - 1).boxed(); + fut.await; + }) + .await; + } } } - } - run( - || { - b.iter(move || { - future::block_on(async { - let mut tasks = Vec::new(); - for _ in 0..TASKS { - tasks.push(EX.spawn(go(STEPS))); - } - for task in tasks { - task.await; - } - }); - }); - }, - *multithread, - ); - }); + #[allow(clippy::manual_async_fn)] + fn go_leaked(executor: LeakedExecutor, i: usize) -> impl Future + Send + 'static { + async move { + if i != 0 { + executor.spawn(async move { + let fut = go_leaked(executor, i - 1).boxed(); + fut.await; + }) + .await; + } + } + } - group.bench_function("executor::yield_now", |b| { - run( - || { - b.iter(move || { - future::block_on(async { - let mut tasks = Vec::new(); - for _ in 0..TASKS { - tasks.push(EX.spawn(async move { - for _ in 0..STEPS { - future::yield_now().await; + if with_leaked { + run_leaked( + leaked, + || { + b.iter(move || { + future::block_on(async { + let mut tasks = Vec::new(); + for _ in 0..TASKS { + tasks.push(leaked.spawn(go_leaked(leaked, STEPS))); } - })); - } - for task in tasks { - task.await; - } - }); - }); - }, - *multithread, - ); - }); + for task in tasks { + task.await; + } + }); + }); + }, + *multithread + ); + } else { + run( + || { + b.iter(move || { + future::block_on(async { + let mut tasks = Vec::new(); + for _ in 0..TASKS { + tasks.push(EX.spawn(go(STEPS))); + } + for task in tasks { + task.await; + } + }); + }); + }, + *multithread, + ); + } + }); + + group.bench_function(format!("{}::yield_now", prefix), |b| { + if with_leaked { + run_leaked( + leaked, + || { + b.iter(move || { + future::block_on(async { + let mut tasks = Vec::new(); + for _ in 0..TASKS { + tasks.push(leaked.spawn(async move { + for _ in 0..STEPS { + future::yield_now().await; + } + })); + } + for task in tasks { + task.await; + } + }); + }); + }, + *multithread + ); + } else { + run( + || { + b.iter(move || { + future::block_on(async { + let mut tasks = Vec::new(); + for _ in 0..TASKS { + tasks.push(EX.spawn(async move { + for _ in 0..STEPS { + future::yield_now().await; + } + })); + } + for task in tasks { + task.await; + } + }); + }); + }, + *multithread, + ); + } + }); + } } } From dd81438914a28d67424128b47219c1beb22a16dc Mon Sep 17 00:00:00 2001 From: james7132 Date: Tue, 9 Apr 2024 23:23:38 -0700 Subject: [PATCH 03/29] Cleanup lifetimes and safety comments --- benches/executor.rs | 22 +++++++++++++--------- src/lib.rs | 27 ++++++++++----------------- 2 files changed, 23 insertions(+), 26 deletions(-) diff --git a/benches/executor.rs b/benches/executor.rs index 78b439d..f7261f0 100644 --- a/benches/executor.rs +++ b/benches/executor.rs @@ -142,7 +142,7 @@ fn running_benches(c: &mut Criterion) { } }); - group.bench_function(format!("{}::spawn_recursively", prefix),|b| { + group.bench_function(format!("{}::spawn_recursively", prefix), |b| { #[allow(clippy::manual_async_fn)] fn go(i: usize) -> impl Future + Send + 'static { async move { @@ -157,14 +157,18 @@ fn running_benches(c: &mut Criterion) { } #[allow(clippy::manual_async_fn)] - fn go_leaked(executor: LeakedExecutor, i: usize) -> impl Future + Send + 'static { + fn go_leaked( + executor: LeakedExecutor, + i: usize, + ) -> impl Future + Send + 'static { async move { if i != 0 { - executor.spawn(async move { - let fut = go_leaked(executor, i - 1).boxed(); - fut.await; - }) - .await; + executor + .spawn(async move { + let fut = go_leaked(executor, i - 1).boxed(); + fut.await; + }) + .await; } } } @@ -185,7 +189,7 @@ fn running_benches(c: &mut Criterion) { }); }); }, - *multithread + *multithread, ); } else { run( @@ -228,7 +232,7 @@ fn running_benches(c: &mut Criterion) { }); }); }, - *multithread + *multithread, ); } else { run( diff --git a/src/lib.rs b/src/lib.rs index 4641049..27d5859 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -433,7 +433,7 @@ impl Executor<'static> { /// when spawning, running, and finishing tasks. #[cfg(feature = "leak")] pub fn leak(self) -> LeakedExecutor { - let ptr = self.state_ptr().cast(); + let ptr = self.state_ptr(); std::mem::forget(self); LeakedExecutor { // SAFETY: So long as an Executor lives, it's state pointer will always be valid @@ -753,28 +753,21 @@ impl LeakedExecutor { /// println!("Hello world"); /// }); /// ``` - pub fn spawn( + pub fn spawn<'a, T: Send + 'a>( &self, - future: impl Future + Send + 'static, + future: impl Future + Send + 'a, ) -> Task { // Create the task and register it in the set of active tasks. // // SAFETY: // - // If `future` is not `Send`, this must be a `LocalExecutor` as per this - // function's unsafe precondition. Since `LocalExecutor` is `!Sync`, - // `try_tick`, `tick` and `run` can only be called from the origin - // thread of the `LocalExecutor`. Similarly, `spawn` can only be called - // from the origin thread, ensuring that `future` and the executor share - // the same origin thread. The `Runnable` can be scheduled from other - // threads, but because of the above `Runnable` can only be called or - // dropped on the origin thread. - // - // `future` is `'static`. - // - // `self.schedule()` is `Send`, `Sync` and `'static`, as checked below. - // Therefore we do not need to worry about what is done with the - // `Waker`. + // - `future` is `Send` + // - `future` is not `'static`, but we make sure that the `Runnable` does + // not outlive `'a`. The executor cannot be dropped, and thus the `Runnable` + // must outlive 'a. + // - `self.schedule()` is `Send`, `Sync` and `'static`, as checked below. + // Therefore we do not need to worry about what is done with the + // `Waker`. let (runnable, task) = unsafe { Builder::new() .propagate_panic(true) From b60069a36b060f84c9ecc98c26ac90f488b2b06e Mon Sep 17 00:00:00 2001 From: james7132 Date: Thu, 11 Apr 2024 00:19:08 -0700 Subject: [PATCH 04/29] Cleanup docs --- src/lib.rs | 42 ++++++++++++++++++++++++++++-------------- 1 file changed, 28 insertions(+), 14 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 27d5859..cdacc9a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -702,15 +702,16 @@ impl<'a> Default for LocalExecutor<'a> { /// A leaked async [`Executor`]. /// -/// Largely equivalent to calling `Box::leak(Box::new(executor))`, but several -/// operations are optimized to require fewer operations when spawning, running, and -/// finishing tasks. +/// Largely equivalent to calling `Box::leak(Box::new(executor))`, but spawning, running, and +/// finishing tasks are optimized with the assumption that the executor will never be `Drop`'ed. +/// A leaked executor may require signficantly less overhead in both single-threaded and +/// mulitthreaded use cases. /// /// As this type does not implement `Drop`, losing the handle to the executor or failing /// to consistently drive the executor with [`tick`] or [`run`] will cause the all spawned /// tasks to permanently leak. Any tasks at the time will not be cancelled. /// -/// Unlike [`Executor`], this type trivially implements both [`Clone`] and [`Copy`]> +/// Unlike [`Executor`], this type trivially implements both [`Clone`] and [`Copy`]. /// /// This type *cannot* be converted back to a `Executor`. #[derive(Copy, Clone)] @@ -747,24 +748,37 @@ impl LeakedExecutor { /// ``` /// use async_executor::Executor; /// - /// let ex = Executor::new(); + /// let ex = Executor::new().leak(); /// /// let task = ex.spawn(async { /// println!("Hello world"); /// }); /// ``` - pub fn spawn<'a, T: Send + 'a>( + pub fn spawn( + &self, + future: impl Future + Send + 'static, + ) -> Task { + let (runnable, task) = Builder::new() + .propagate_panic(true) + .spawn(|()| future, self.schedule()); + runnable.schedule(); + task + } + + /// Spawns a non-`'static` task onto the executor. + /// + /// ## Safety + /// The caller must ensure that the returned task terminates + /// or is cancelled before the end of 'a. + pub unsafe fn spawn_scoped<'a, T: Send + 'static>( &self, future: impl Future + Send + 'a, ) -> Task { - // Create the task and register it in the set of active tasks. - // // SAFETY: // // - `future` is `Send` - // - `future` is not `'static`, but we make sure that the `Runnable` does - // not outlive `'a`. The executor cannot be dropped, and thus the `Runnable` - // must outlive 'a. + // - `future` is not `'static`, but the caller guarantees that the + // task, and thus its `Runnable` must not live longer than `'a`. // - `self.schedule()` is `Send`, `Sync` and `'static`, as checked below. // Therefore we do not need to worry about what is done with the // `Waker`. @@ -786,7 +800,7 @@ impl LeakedExecutor { /// ``` /// use async_executor::Executor; /// - /// let ex = Executor::new(); + /// let ex = Executor::new().leak(); /// assert!(!ex.try_tick()); // no tasks to run /// /// let task = ex.spawn(async { @@ -821,7 +835,7 @@ impl LeakedExecutor { /// use async_executor::Executor; /// use futures_lite::future; /// - /// let ex = Executor::new(); + /// let ex = Executor::new().leak(); /// /// let task = ex.spawn(async { /// println!("Hello world"); @@ -841,7 +855,7 @@ impl LeakedExecutor { /// use async_executor::Executor; /// use futures_lite::future; /// - /// let ex = Executor::new(); + /// let ex = Executor::new().leak(); /// /// let task = ex.spawn(async { 1 + 2 }); /// let res = future::block_on(ex.run(async { task.await * 2 })); From 8f03deb9c6360e573c5dbc8a0b5b2ce4c365d1cb Mon Sep 17 00:00:00 2001 From: james7132 Date: Thu, 11 Apr 2024 00:31:34 -0700 Subject: [PATCH 05/29] Extend CI to check all features --- .github/workflows/ci.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index be05945..50eae7c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -44,7 +44,7 @@ jobs: - name: Run cargo check (without dev-dependencies to catch missing feature flags) if: startsWith(matrix.rust, 'nightly') run: cargo check -Z features=dev_dep - - run: cargo test + - run: cargo test --all-features - run: cargo check --all --all-features --target wasm32-unknown-unknown - run: cargo hack build --all --all-features --target wasm32-unknown-unknown --no-dev-deps @@ -78,7 +78,7 @@ jobs: - uses: actions/checkout@v4 - name: Install Rust run: rustup toolchain install nightly --component miri && rustup default nightly - - run: cargo miri test + - run: cargo miri test --all-features env: MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-symbolic-alignment-check -Zmiri-disable-isolation RUSTFLAGS: ${{ env.RUSTFLAGS }} -Z randomize-layout From 900a95ee5fb4977b27e1fb0683103b8c6e0f8ed7 Mon Sep 17 00:00:00 2001 From: james7132 Date: Thu, 11 Apr 2024 00:32:20 -0700 Subject: [PATCH 06/29] Reorder safety comments --- src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index cdacc9a..43abee7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -720,11 +720,11 @@ pub struct LeakedExecutor { state: &'static State, } -// SAFETY: Executor stores no thread local state that can be accessed via other thread. #[cfg(feature = "leak")] +// SAFETY: Executor stores no thread local state that can be accessed via other thread. unsafe impl Send for LeakedExecutor {} -// SAFETY: Executor internally synchronizes all of it's operations internally. #[cfg(feature = "leak")] +// SAFETY: Executor internally synchronizes all of it's operations internally. unsafe impl Sync for LeakedExecutor {} #[cfg(feature = "leak")] From 22ffdce7456e0c93b40b40f14443c2178676b0f7 Mon Sep 17 00:00:00 2001 From: james7132 Date: Thu, 11 Apr 2024 00:34:07 -0700 Subject: [PATCH 07/29] Formatting --- src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 43abee7..d076799 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -702,9 +702,9 @@ impl<'a> Default for LocalExecutor<'a> { /// A leaked async [`Executor`]. /// -/// Largely equivalent to calling `Box::leak(Box::new(executor))`, but spawning, running, and +/// Largely equivalent to calling `Box::leak(Box::new(executor))`, but spawning, running, and /// finishing tasks are optimized with the assumption that the executor will never be `Drop`'ed. -/// A leaked executor may require signficantly less overhead in both single-threaded and +/// A leaked executor may require signficantly less overhead in both single-threaded and /// mulitthreaded use cases. /// /// As this type does not implement `Drop`, losing the handle to the executor or failing From 1c1493d7b59a6207b289ddd8f716958238cf3d09 Mon Sep 17 00:00:00 2001 From: james7132 Date: Thu, 11 Apr 2024 01:06:47 -0700 Subject: [PATCH 08/29] Allow leaks only when testing the leak feature --- .github/workflows/ci.yml | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 50eae7c..6626825 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -78,10 +78,14 @@ jobs: - uses: actions/checkout@v4 - name: Install Rust run: rustup toolchain install nightly --component miri && rustup default nightly - - run: cargo miri test --all-features + - run: cargo miri test env: MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-symbolic-alignment-check -Zmiri-disable-isolation RUSTFLAGS: ${{ env.RUSTFLAGS }} -Z randomize-layout + - run: cargo miri test --all-features + env: + MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-symbolic-alignment-check -Zmiri-disable-isolation -Zmiri-allow-leaks + RUSTFLAGS: ${{ env.RUSTFLAGS }} -Z randomize-layout security_audit: permissions: From f5f0871719786915e1f9a4106ec269d4ed365c44 Mon Sep 17 00:00:00 2001 From: james7132 Date: Thu, 11 Apr 2024 01:11:27 -0700 Subject: [PATCH 09/29] Use the right flags --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 6626825..7fecc95 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -84,7 +84,7 @@ jobs: RUSTFLAGS: ${{ env.RUSTFLAGS }} -Z randomize-layout - run: cargo miri test --all-features env: - MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-symbolic-alignment-check -Zmiri-disable-isolation -Zmiri-allow-leaks + MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-symbolic-alignment-check -Zmiri-disable-isolation -Zmiri-ignore-leaks RUSTFLAGS: ${{ env.RUSTFLAGS }} -Z randomize-layout security_audit: From 4ed3f9496665a4955f5e137457ebd002ea4025e4 Mon Sep 17 00:00:00 2001 From: James Liu Date: Thu, 11 Apr 2024 18:03:28 -0700 Subject: [PATCH 10/29] Apply suggestions from code review Co-authored-by: John Nunley --- src/lib.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index d076799..510d5c5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -735,7 +735,7 @@ impl RefUnwindSafe for LeakedExecutor {} #[cfg(feature = "leak")] impl fmt::Debug for LeakedExecutor { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - debug_state(self.state, "Executor", f) + debug_state(self.state, "LeakedExecutor", f) } } @@ -768,6 +768,7 @@ impl LeakedExecutor { /// Spawns a non-`'static` task onto the executor. /// /// ## Safety + /// /// The caller must ensure that the returned task terminates /// or is cancelled before the end of 'a. pub unsafe fn spawn_scoped<'a, T: Send + 'static>( From 354223cda31c88b35991e3a96cc3959b73984a6c Mon Sep 17 00:00:00 2001 From: james7132 Date: Thu, 11 Apr 2024 18:11:11 -0700 Subject: [PATCH 11/29] Move common function impls onto State --- src/lib.rs | 108 +++++++++++++++++++++++------------------------------ 1 file changed, 46 insertions(+), 62 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 510d5c5..6f28392 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -292,18 +292,7 @@ impl<'a> Executor<'a> { /// assert!(ex.try_tick()); // a task was found /// ``` pub fn try_tick(&self) -> bool { - match self.state().queue.pop() { - Err(_) => false, - Ok(runnable) => { - // Notify another ticker now to pick up where this ticker left off, just in case - // running the task takes a long time. - self.state().notify(); - - // Run the task. - runnable.run(); - true - } - } + self.state().try_tick() } /// Runs a single task. @@ -326,9 +315,7 @@ impl<'a> Executor<'a> { /// future::block_on(ex.tick()); // runs the task /// ``` pub async fn tick(&self) { - let state = self.state(); - let runnable = Ticker::new(state).runnable().await; - runnable.run(); + self.state().tick().await; } /// Runs the executor until the given future completes. @@ -347,22 +334,7 @@ impl<'a> Executor<'a> { /// assert_eq!(res, 6); /// ``` pub async fn run(&self, future: impl Future) -> T { - let mut runner = Runner::new(self.state()); - let mut rng = fastrand::Rng::new(); - - // A future that runs tasks forever. - let run_forever = async { - loop { - for _ in 0..200 { - let runnable = runner.runnable(&mut rng).await; - runnable.run(); - } - future::yield_now().await; - } - }; - - // Run `future` and `run_forever` concurrently until `future` completes. - future.or(run_forever).await + self.state().run(future).await } /// Returns a function that schedules a runnable task when it gets woken up. @@ -768,7 +740,7 @@ impl LeakedExecutor { /// Spawns a non-`'static` task onto the executor. /// /// ## Safety - /// + /// /// The caller must ensure that the returned task terminates /// or is cancelled before the end of 'a. pub unsafe fn spawn_scoped<'a, T: Send + 'static>( @@ -810,18 +782,7 @@ impl LeakedExecutor { /// assert!(ex.try_tick()); // a task was found /// ``` pub fn try_tick(&self) -> bool { - match self.state.queue.pop() { - Err(_) => false, - Ok(runnable) => { - // Notify another ticker now to pick up where this ticker left off, just in case - // running the task takes a long time. - self.state.notify(); - - // Run the task. - runnable.run(); - true - } - } + self.state.try_tick() } /// Runs a single task. @@ -844,8 +805,7 @@ impl LeakedExecutor { /// future::block_on(ex.tick()); // runs the task /// ``` pub async fn tick(&self) { - let runnable = Ticker::new(self.state).runnable().await; - runnable.run(); + self.state.tick().await; } /// Runs the executor until the given future completes. @@ -864,22 +824,7 @@ impl LeakedExecutor { /// assert_eq!(res, 6); /// ``` pub async fn run(&self, future: impl Future) -> T { - let mut runner = Runner::new(self.state); - let mut rng = fastrand::Rng::new(); - - // A future that runs tasks forever. - let run_forever = async { - loop { - for _ in 0..200 { - let runnable = runner.runnable(&mut rng).await; - runnable.run(); - } - future::yield_now().await; - } - }; - - // Run `future` and `run_forever` concurrently until `future` completes. - future.or(run_forever).await + self.state.run(future).await } /// Returns a function that schedules a runnable task when it gets woken up. @@ -941,6 +886,45 @@ impl State { } } } + + pub(crate) fn try_tick(&self) -> bool { + match self.queue.pop() { + Err(_) => false, + Ok(runnable) => { + // Notify another ticker now to pick up where this ticker left off, just in case + // running the task takes a long time. + self.notify(); + + // Run the task. + runnable.run(); + true + } + } + } + + pub(crate) async fn tick(&self) { + let runnable = Ticker::new(self).runnable().await; + runnable.run(); + } + + pub async fn run(&self, future: impl Future) -> T { + let mut runner = Runner::new(self); + let mut rng = fastrand::Rng::new(); + + // A future that runs tasks forever. + let run_forever = async { + loop { + for _ in 0..200 { + let runnable = runner.runnable(&mut rng).await; + runnable.run(); + } + future::yield_now().await; + } + }; + + // Run `future` and `run_forever` concurrently until `future` completes. + future.or(run_forever).await + } } /// A list of sleeping tickers. From 8c656f5cf264323bf56e4c600efcfe6a271702b3 Mon Sep 17 00:00:00 2001 From: james7132 Date: Thu, 11 Apr 2024 18:23:22 -0700 Subject: [PATCH 12/29] Move LeakedExecutor to another file --- src/leaked.rs | 184 ++++++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 190 ++------------------------------------------------ 2 files changed, 189 insertions(+), 185 deletions(-) create mode 100644 src/leaked.rs diff --git a/src/leaked.rs b/src/leaked.rs new file mode 100644 index 0000000..e1e2b41 --- /dev/null +++ b/src/leaked.rs @@ -0,0 +1,184 @@ +use crate::{debug_state, Executor, State}; +use async_task::{Builder, Runnable, Task}; +use std::{ + fmt, + future::Future, + panic::{RefUnwindSafe, UnwindSafe}, +}; + +impl Executor<'static> { + /// Consumes the [`Executor`] and intentionally leaks it. + /// + /// Largely equivalent to calling `Box::leak(Box::new(executor))`, but the produced + /// [`LeakedExecutor`]'s functions are optimized to require fewer synchronizing operations + /// when spawning, running, and finishing tasks. + pub fn leak(self) -> LeakedExecutor { + let ptr = self.state_ptr(); + std::mem::forget(self); + LeakedExecutor { + // SAFETY: So long as an Executor lives, it's state pointer will always be valid + // when accessed through state_ptr. This executor will live for the full 'static + // lifetime so this isn't an arbitrary lifetime extension. + state: unsafe { &*ptr }, + } + } +} + +/// A leaked async [`Executor`]. +/// +/// Largely equivalent to calling `Box::leak(Box::new(executor))`, but spawning, running, and +/// finishing tasks are optimized with the assumption that the executor will never be `Drop`'ed. +/// A leaked executor may require signficantly less overhead in both single-threaded and +/// mulitthreaded use cases. +/// +/// As this type does not implement `Drop`, losing the handle to the executor or failing +/// to consistently drive the executor with [`tick`] or [`run`] will cause the all spawned +/// tasks to permanently leak. Any tasks at the time will not be cancelled. +/// +/// Unlike [`Executor`], this type trivially implements both [`Clone`] and [`Copy`]. +/// +/// This type *cannot* be converted back to a `Executor`. +#[derive(Copy, Clone)] +pub struct LeakedExecutor { + state: &'static State, +} + +// SAFETY: Executor stores no thread local state that can be accessed via other thread. +unsafe impl Send for LeakedExecutor {} +// SAFETY: Executor internally synchronizes all of it's operations internally. +unsafe impl Sync for LeakedExecutor {} + +impl UnwindSafe for LeakedExecutor {} +impl RefUnwindSafe for LeakedExecutor {} + +impl fmt::Debug for LeakedExecutor { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + debug_state(self.state, "LeakedExecutor", f) + } +} + +impl LeakedExecutor { + /// Spawns a task onto the executor. + /// + /// # Examples + /// + /// ``` + /// use async_executor::Executor; + /// + /// let ex = Executor::new().leak(); + /// + /// let task = ex.spawn(async { + /// println!("Hello world"); + /// }); + /// ``` + pub fn spawn( + &self, + future: impl Future + Send + 'static, + ) -> Task { + let (runnable, task) = Builder::new() + .propagate_panic(true) + .spawn(|()| future, self.schedule()); + runnable.schedule(); + task + } + + /// Spawns a non-`'static` task onto the executor. + /// + /// ## Safety + /// + /// The caller must ensure that the returned task terminates + /// or is cancelled before the end of 'a. + pub unsafe fn spawn_scoped<'a, T: Send + 'static>( + &self, + future: impl Future + Send + 'a, + ) -> Task { + // SAFETY: + // + // - `future` is `Send` + // - `future` is not `'static`, but the caller guarantees that the + // task, and thus its `Runnable` must not live longer than `'a`. + // - `self.schedule()` is `Send`, `Sync` and `'static`, as checked below. + // Therefore we do not need to worry about what is done with the + // `Waker`. + let (runnable, task) = unsafe { + Builder::new() + .propagate_panic(true) + .spawn_unchecked(|()| future, self.schedule()) + }; + runnable.schedule(); + task + } + + /// Attempts to run a task if at least one is scheduled. + /// + /// Running a scheduled task means simply polling its future once. + /// + /// # Examples + /// + /// ``` + /// use async_executor::Executor; + /// + /// let ex = Executor::new().leak(); + /// assert!(!ex.try_tick()); // no tasks to run + /// + /// let task = ex.spawn(async { + /// println!("Hello world"); + /// }); + /// assert!(ex.try_tick()); // a task was found + /// ``` + pub fn try_tick(&self) -> bool { + self.state.try_tick() + } + + /// Runs a single task. + /// + /// Running a task means simply polling its future once. + /// + /// If no tasks are scheduled when this method is called, it will wait until one is scheduled. + /// + /// # Examples + /// + /// ``` + /// use async_executor::Executor; + /// use futures_lite::future; + /// + /// let ex = Executor::new().leak(); + /// + /// let task = ex.spawn(async { + /// println!("Hello world"); + /// }); + /// future::block_on(ex.tick()); // runs the task + /// ``` + pub async fn tick(&self) { + self.state.tick().await; + } + + /// Runs the executor until the given future completes. + /// + /// # Examples + /// + /// ``` + /// use async_executor::Executor; + /// use futures_lite::future; + /// + /// let ex = Executor::new().leak(); + /// + /// let task = ex.spawn(async { 1 + 2 }); + /// let res = future::block_on(ex.run(async { task.await * 2 })); + /// + /// assert_eq!(res, 6); + /// ``` + pub async fn run(&self, future: impl Future) -> T { + self.state.run(future).await + } + + /// Returns a function that schedules a runnable task when it gets woken up. + fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static { + let state = self.state; + // TODO: If possible, push into the current local queue and notify the ticker. + move |runnable| { + state.queue.push(runnable).unwrap(); + state.notify(); + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 6f28392..c19be34 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -51,8 +51,13 @@ use concurrent_queue::ConcurrentQueue; use futures_lite::{future, prelude::*}; use slab::Slab; +#[cfg(feature = "leak")] +mod leaked; + #[doc(no_inline)] pub use async_task::Task; +#[cfg(feature = "leak")] +pub use leaked::*; /// An async executor. /// @@ -397,25 +402,6 @@ impl<'a> Executor<'a> { } } -impl Executor<'static> { - /// Consumes the [`Executor`] and intentionally leaks it. - /// - /// Largely equivalent to calling `Box::leak(Box::new(executor))`, but the produced - /// [`LeakedExecutor`]'s functions are optimized to require fewer synchronizing operations - /// when spawning, running, and finishing tasks. - #[cfg(feature = "leak")] - pub fn leak(self) -> LeakedExecutor { - let ptr = self.state_ptr(); - std::mem::forget(self); - LeakedExecutor { - // SAFETY: So long as an Executor lives, it's state pointer will always be valid - // when accessed through state_ptr. This executor will live for the full 'static - // lifetime so this isn't an arbitrary lifetime extension. - state: unsafe { &*ptr }, - } - } -} - impl Drop for Executor<'_> { fn drop(&mut self) { let ptr = *self.state.get_mut(); @@ -672,172 +658,6 @@ impl<'a> Default for LocalExecutor<'a> { } } -/// A leaked async [`Executor`]. -/// -/// Largely equivalent to calling `Box::leak(Box::new(executor))`, but spawning, running, and -/// finishing tasks are optimized with the assumption that the executor will never be `Drop`'ed. -/// A leaked executor may require signficantly less overhead in both single-threaded and -/// mulitthreaded use cases. -/// -/// As this type does not implement `Drop`, losing the handle to the executor or failing -/// to consistently drive the executor with [`tick`] or [`run`] will cause the all spawned -/// tasks to permanently leak. Any tasks at the time will not be cancelled. -/// -/// Unlike [`Executor`], this type trivially implements both [`Clone`] and [`Copy`]. -/// -/// This type *cannot* be converted back to a `Executor`. -#[derive(Copy, Clone)] -#[cfg(feature = "leak")] -pub struct LeakedExecutor { - state: &'static State, -} - -#[cfg(feature = "leak")] -// SAFETY: Executor stores no thread local state that can be accessed via other thread. -unsafe impl Send for LeakedExecutor {} -#[cfg(feature = "leak")] -// SAFETY: Executor internally synchronizes all of it's operations internally. -unsafe impl Sync for LeakedExecutor {} - -#[cfg(feature = "leak")] -impl UnwindSafe for LeakedExecutor {} -#[cfg(feature = "leak")] -impl RefUnwindSafe for LeakedExecutor {} - -#[cfg(feature = "leak")] -impl fmt::Debug for LeakedExecutor { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - debug_state(self.state, "LeakedExecutor", f) - } -} - -#[cfg(feature = "leak")] -impl LeakedExecutor { - /// Spawns a task onto the executor. - /// - /// # Examples - /// - /// ``` - /// use async_executor::Executor; - /// - /// let ex = Executor::new().leak(); - /// - /// let task = ex.spawn(async { - /// println!("Hello world"); - /// }); - /// ``` - pub fn spawn( - &self, - future: impl Future + Send + 'static, - ) -> Task { - let (runnable, task) = Builder::new() - .propagate_panic(true) - .spawn(|()| future, self.schedule()); - runnable.schedule(); - task - } - - /// Spawns a non-`'static` task onto the executor. - /// - /// ## Safety - /// - /// The caller must ensure that the returned task terminates - /// or is cancelled before the end of 'a. - pub unsafe fn spawn_scoped<'a, T: Send + 'static>( - &self, - future: impl Future + Send + 'a, - ) -> Task { - // SAFETY: - // - // - `future` is `Send` - // - `future` is not `'static`, but the caller guarantees that the - // task, and thus its `Runnable` must not live longer than `'a`. - // - `self.schedule()` is `Send`, `Sync` and `'static`, as checked below. - // Therefore we do not need to worry about what is done with the - // `Waker`. - let (runnable, task) = unsafe { - Builder::new() - .propagate_panic(true) - .spawn_unchecked(|()| future, self.schedule()) - }; - runnable.schedule(); - task - } - - /// Attempts to run a task if at least one is scheduled. - /// - /// Running a scheduled task means simply polling its future once. - /// - /// # Examples - /// - /// ``` - /// use async_executor::Executor; - /// - /// let ex = Executor::new().leak(); - /// assert!(!ex.try_tick()); // no tasks to run - /// - /// let task = ex.spawn(async { - /// println!("Hello world"); - /// }); - /// assert!(ex.try_tick()); // a task was found - /// ``` - pub fn try_tick(&self) -> bool { - self.state.try_tick() - } - - /// Runs a single task. - /// - /// Running a task means simply polling its future once. - /// - /// If no tasks are scheduled when this method is called, it will wait until one is scheduled. - /// - /// # Examples - /// - /// ``` - /// use async_executor::Executor; - /// use futures_lite::future; - /// - /// let ex = Executor::new().leak(); - /// - /// let task = ex.spawn(async { - /// println!("Hello world"); - /// }); - /// future::block_on(ex.tick()); // runs the task - /// ``` - pub async fn tick(&self) { - self.state.tick().await; - } - - /// Runs the executor until the given future completes. - /// - /// # Examples - /// - /// ``` - /// use async_executor::Executor; - /// use futures_lite::future; - /// - /// let ex = Executor::new().leak(); - /// - /// let task = ex.spawn(async { 1 + 2 }); - /// let res = future::block_on(ex.run(async { task.await * 2 })); - /// - /// assert_eq!(res, 6); - /// ``` - pub async fn run(&self, future: impl Future) -> T { - self.state.run(future).await - } - - /// Returns a function that schedules a runnable task when it gets woken up. - fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static { - let state = self.state; - // TODO: If possible, push into the current local queue and notify the ticker. - move |runnable| { - state.queue.push(runnable).unwrap(); - state.notify(); - } - } -} - /// The state of a executor. struct State { /// The global queue. From fb494f373d2ec6a42c34cbe86c050cffd8fac720 Mon Sep 17 00:00:00 2001 From: james7132 Date: Thu, 11 Apr 2024 18:24:48 -0700 Subject: [PATCH 13/29] Backlink to Executor::leak --- src/leaked.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/leaked.rs b/src/leaked.rs index e1e2b41..8c605b7 100644 --- a/src/leaked.rs +++ b/src/leaked.rs @@ -24,7 +24,7 @@ impl Executor<'static> { } } -/// A leaked async [`Executor`]. +/// A leaked async [`Executor`] created from [`Executor::leak`]. /// /// Largely equivalent to calling `Box::leak(Box::new(executor))`, but spawning, running, and /// finishing tasks are optimized with the assumption that the executor will never be `Drop`'ed. From 5aa751ccf34a978db69cba00f008875a928b9ec6 Mon Sep 17 00:00:00 2001 From: james7132 Date: Thu, 11 Apr 2024 18:30:02 -0700 Subject: [PATCH 14/29] Ensure all active tasks are rescheduled when leaking. --- src/leaked.rs | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/src/leaked.rs b/src/leaked.rs index 8c605b7..d2df543 100644 --- a/src/leaked.rs +++ b/src/leaked.rs @@ -1,5 +1,6 @@ use crate::{debug_state, Executor, State}; use async_task::{Builder, Runnable, Task}; +use slab::Slab; use std::{ fmt, future::Future, @@ -14,13 +15,24 @@ impl Executor<'static> { /// when spawning, running, and finishing tasks. pub fn leak(self) -> LeakedExecutor { let ptr = self.state_ptr(); + // SAFETY: So long as an Executor lives, it's state pointer will always be valid + // when accessed through state_ptr. This executor will live for the full 'static + // lifetime so this isn't an arbitrary lifetime extension. + let state = unsafe { &*ptr }; + std::mem::forget(self); - LeakedExecutor { - // SAFETY: So long as an Executor lives, it's state pointer will always be valid - // when accessed through state_ptr. This executor will live for the full 'static - // lifetime so this isn't an arbitrary lifetime extension. - state: unsafe { &*ptr }, + + let mut active = state.active.lock().unwrap(); + if !active.is_empty() { + // Reschedule all of the active tasks. + for waker in active.drain() { + waker.wake(); + } + // Overwrite to ensure that the slab is deallocated. + *active = Slab::new(); } + + LeakedExecutor { state } } } From 48134c7055e71a6a52f396bdf842296a4104e28e Mon Sep 17 00:00:00 2001 From: james7132 Date: Thu, 11 Apr 2024 18:34:27 -0700 Subject: [PATCH 15/29] Add example for Executor::leak --- src/leaked.rs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/leaked.rs b/src/leaked.rs index d2df543..7efc7c0 100644 --- a/src/leaked.rs +++ b/src/leaked.rs @@ -13,6 +13,21 @@ impl Executor<'static> { /// Largely equivalent to calling `Box::leak(Box::new(executor))`, but the produced /// [`LeakedExecutor`]'s functions are optimized to require fewer synchronizing operations /// when spawning, running, and finishing tasks. + /// + /// # Example + /// + /// ``` + /// use async_executor::Executor; + /// use futures_lite::future; + /// + /// let ex = Executor::new().leak(); + /// + /// let task = ex.spawn(async { + /// println!("Hello world"); + /// }); + /// + /// future::block_on(ex.run(task)); + /// ``` pub fn leak(self) -> LeakedExecutor { let ptr = self.state_ptr(); // SAFETY: So long as an Executor lives, it's state pointer will always be valid From 0f975a477088c5da3324e310febfaa9999162a48 Mon Sep 17 00:00:00 2001 From: james7132 Date: Fri, 12 Apr 2024 21:47:21 -0700 Subject: [PATCH 16/29] Test without all features --- .github/workflows/ci.yml | 1 + src/leaked.rs | 22 +++++++++++----------- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7fecc95..db2bd1a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -44,6 +44,7 @@ jobs: - name: Run cargo check (without dev-dependencies to catch missing feature flags) if: startsWith(matrix.rust, 'nightly') run: cargo check -Z features=dev_dep + - run: cargo test - run: cargo test --all-features - run: cargo check --all --all-features --target wasm32-unknown-unknown - run: cargo hack build --all --all-features --target wasm32-unknown-unknown --no-dev-deps diff --git a/src/leaked.rs b/src/leaked.rs index 7efc7c0..735f9f8 100644 --- a/src/leaked.rs +++ b/src/leaked.rs @@ -11,7 +11,7 @@ impl Executor<'static> { /// Consumes the [`Executor`] and intentionally leaks it. /// /// Largely equivalent to calling `Box::leak(Box::new(executor))`, but the produced - /// [`LeakedExecutor`]'s functions are optimized to require fewer synchronizing operations + /// [`StaticExecutor`]'s functions are optimized to require fewer synchronizing operations /// when spawning, running, and finishing tasks. /// /// # Example @@ -28,7 +28,7 @@ impl Executor<'static> { /// /// future::block_on(ex.run(task)); /// ``` - pub fn leak(self) -> LeakedExecutor { + pub fn leak(self) -> StaticExecutor { let ptr = self.state_ptr(); // SAFETY: So long as an Executor lives, it's state pointer will always be valid // when accessed through state_ptr. This executor will live for the full 'static @@ -47,7 +47,7 @@ impl Executor<'static> { *active = Slab::new(); } - LeakedExecutor { state } + StaticExecutor { state } } } @@ -66,25 +66,25 @@ impl Executor<'static> { /// /// This type *cannot* be converted back to a `Executor`. #[derive(Copy, Clone)] -pub struct LeakedExecutor { +pub struct StaticExecutor { state: &'static State, } // SAFETY: Executor stores no thread local state that can be accessed via other thread. -unsafe impl Send for LeakedExecutor {} +unsafe impl Send for StaticExecutor {} // SAFETY: Executor internally synchronizes all of it's operations internally. -unsafe impl Sync for LeakedExecutor {} +unsafe impl Sync for StaticExecutor {} -impl UnwindSafe for LeakedExecutor {} -impl RefUnwindSafe for LeakedExecutor {} +impl UnwindSafe for StaticExecutor {} +impl RefUnwindSafe for StaticExecutor {} -impl fmt::Debug for LeakedExecutor { +impl fmt::Debug for StaticExecutor { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - debug_state(self.state, "LeakedExecutor", f) + debug_state(self.state, "StaticExecutor", f) } } -impl LeakedExecutor { +impl StaticExecutor { /// Spawns a task onto the executor. /// /// # Examples From 75ad689db1af1c156611818604dcba12b3944838 Mon Sep 17 00:00:00 2001 From: james7132 Date: Fri, 12 Apr 2024 22:05:44 -0700 Subject: [PATCH 17/29] Add a StaticLocalExecutor --- src/leaked.rs | 209 +++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 208 insertions(+), 1 deletion(-) diff --git a/src/leaked.rs b/src/leaked.rs index 735f9f8..861f710 100644 --- a/src/leaked.rs +++ b/src/leaked.rs @@ -1,10 +1,12 @@ -use crate::{debug_state, Executor, State}; +use crate::{debug_state, Executor, LocalExecutor, State}; use async_task::{Builder, Runnable, Task}; use slab::Slab; use std::{ fmt, future::Future, panic::{RefUnwindSafe, UnwindSafe}, + marker::PhantomData, + cell::UnsafeCell, }; impl Executor<'static> { @@ -51,6 +53,50 @@ impl Executor<'static> { } } +impl LocalExecutor<'static> { + /// Consumes the [`LocalExecutor`] and intentionally leaks it. + /// + /// Largely equivalent to calling `Box::leak(Box::new(executor))`, but the produced + /// [`StaticLocalExecutor`]'s functions are optimized to require fewer synchronizing operations + /// when spawning, running, and finishing tasks. + /// + /// # Example + /// + /// ``` + /// use async_executor::Executor; + /// use futures_lite::future; + /// + /// let ex = LocalExecutor::new().leak(); + /// + /// let task = ex.spawn(async { + /// println!("Hello world"); + /// }); + /// + /// future::block_on(ex.run(task)); + /// ``` + pub fn leak(self) -> StaticLocalExecutor { + let ptr = self.inner.state_ptr(); + // SAFETY: So long as an LocalExecutor lives, it's state pointer will always be valid + // when accessed through state_ptr. This executor will live for the full 'static + // lifetime so this isn't an arbitrary lifetime extension. + let state = unsafe { &*ptr }; + + std::mem::forget(self); + + let mut active = state.active.lock().unwrap(); + if !active.is_empty() { + // Reschedule all of the active tasks. + for waker in active.drain() { + waker.wake(); + } + // Overwrite to ensure that the slab is deallocated. + *active = Slab::new(); + } + + StaticLocalExecutor { state, marker_: PhantomData } + } +} + /// A leaked async [`Executor`] created from [`Executor::leak`]. /// /// Largely equivalent to calling `Box::leak(Box::new(executor))`, but spawning, running, and @@ -209,3 +255,164 @@ impl StaticExecutor { } } } + +/// A leaked async [`LocalExecutor`] created from [`LocalExecutor::leak`]. +/// +/// Largely equivalent to calling `Box::leak(Box::new(executor))`, but spawning, running, and +/// finishing tasks are optimized with the assumption that the executor will never be `Drop`'ed. +/// A leaked executor may require signficantly less overhead in both single-threaded and +/// mulitthreaded use cases. +/// +/// As this type does not implement `Drop`, losing the handle to the executor or failing +/// to consistently drive the executor with [`tick`] or [`run`] will cause the all spawned +/// tasks to permanently leak. Any tasks at the time will not be cancelled. +/// +/// Unlike [`LocalExecutor`], this type trivially implements both [`Clone`] and [`Copy`]. +/// +/// This type *cannot* be converted back to a `Executor`. +#[derive(Copy, Clone)] +pub struct StaticLocalExecutor { + state: &'static State, + marker_: PhantomData>, +} + +impl UnwindSafe for StaticLocalExecutor {} +impl RefUnwindSafe for StaticLocalExecutor {} + +impl fmt::Debug for StaticLocalExecutor { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + debug_state(self.state, "StaticLocalExecutor", f) + } +} + +impl StaticLocalExecutor { + /// Spawns a task onto the executor. + /// + /// # Examples + /// + /// ``` + /// use async_executor::Executor; + /// + /// let ex = LocalExecutor::new().leak(); + /// + /// let task = ex.spawn(async { + /// println!("Hello world"); + /// }); + /// ``` + pub fn spawn( + &self, + future: impl Future + 'static, + ) -> Task { + let (runnable, task) = Builder::new() + .propagate_panic(true) + .spawn_local(|()| future, self.schedule()); + runnable.schedule(); + task + } + + /// Spawns a non-`'static` task onto the executor. + /// + /// ## Safety + /// + /// The caller must ensure that the returned task terminates + /// or is cancelled before the end of 'a. + pub unsafe fn spawn_scoped<'a, T: 'static>( + &self, + future: impl Future + 'a, + ) -> Task { + // SAFETY: + // + // - `future` is not `Send` but `StaticLocalExecutor` is `!Sync`, + // `try_tick`, `tick` and `run` can only be called from the origin + // thread of the `StaticLocalExecutor`. Similarly, `spawn_scoped` can only + // be called from the origin thread, ensuring that `future` and the executor + // share the same origin thread. The `Runnable` can be scheduled from other + // threads, but because of the above `Runnable` can only be called or + // dropped on the origin thread. + // - `future` is not `'static`, but the caller guarantees that the + // task, and thus its `Runnable` must not live longer than `'a`. + // - `self.schedule()` is `Send`, `Sync` and `'static`, as checked below. + // Therefore we do not need to worry about what is done with the + // `Waker`. + let (runnable, task) = unsafe { + Builder::new() + .propagate_panic(true) + .spawn_unchecked(|()| future, self.schedule()) + }; + runnable.schedule(); + task + } + + /// Attempts to run a task if at least one is scheduled. + /// + /// Running a scheduled task means simply polling its future once. + /// + /// # Examples + /// + /// ``` + /// use async_executor::Executor; + /// + /// let ex = LocalExecutor::new().leak(); + /// assert!(!ex.try_tick()); // no tasks to run + /// + /// let task = ex.spawn(async { + /// println!("Hello world"); + /// }); + /// assert!(ex.try_tick()); // a task was found + /// ``` + pub fn try_tick(&self) -> bool { + self.state.try_tick() + } + + /// Runs a single task. + /// + /// Running a task means simply polling its future once. + /// + /// If no tasks are scheduled when this method is called, it will wait until one is scheduled. + /// + /// # Examples + /// + /// ``` + /// use async_executor::Executor; + /// use futures_lite::future; + /// + /// let ex = LocalExecutor::new().leak(); + /// + /// let task = ex.spawn(async { + /// println!("Hello world"); + /// }); + /// future::block_on(ex.tick()); // runs the task + /// ``` + pub async fn tick(&self) { + self.state.tick().await; + } + + /// Runs the executor until the given future completes. + /// + /// # Examples + /// + /// ``` + /// use async_executor::Executor; + /// use futures_lite::future; + /// + /// let ex = LocalExecutor::new().leak(); + /// + /// let task = ex.spawn(async { 1 + 2 }); + /// let res = future::block_on(ex.run(async { task.await * 2 })); + /// + /// assert_eq!(res, 6); + /// ``` + pub async fn run(&self, future: impl Future) -> T { + self.state.run(future).await + } + + /// Returns a function that schedules a runnable task when it gets woken up. + fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static { + let state = self.state; + // TODO: If possible, push into the current local queue and notify the ticker. + move |runnable| { + state.queue.push(runnable).unwrap(); + state.notify(); + } + } +} \ No newline at end of file From db9afd73654c9142d85e4c5c8e555eba1f8e589c Mon Sep 17 00:00:00 2001 From: james7132 Date: Fri, 12 Apr 2024 22:07:51 -0700 Subject: [PATCH 18/29] Use doc_auto_cfg on docs.rs builds --- src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/lib.rs b/src/lib.rs index 0d1632a..e20d1bb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -37,6 +37,7 @@ #![doc( html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png" )] +#[cfg_attr(docsrs, feature(doc_auto_cfg))] use std::fmt; use std::marker::PhantomData; From 281b601e86c42ed626b87f61f30e50ebbd87d5c0 Mon Sep 17 00:00:00 2001 From: james7132 Date: Fri, 12 Apr 2024 22:13:09 -0700 Subject: [PATCH 19/29] Fix benchmarks and formatting --- benches/executor.rs | 55 ++++++++++++++++++++++++--------------------- src/leaked.rs | 20 ++++++++--------- src/lib.rs | 1 - 3 files changed, 39 insertions(+), 37 deletions(-) diff --git a/benches/executor.rs b/benches/executor.rs index f7261f0..d71dad6 100644 --- a/benches/executor.rs +++ b/benches/executor.rs @@ -1,6 +1,6 @@ use std::thread::available_parallelism; -use async_executor::{Executor, LeakedExecutor}; +use async_executor::{Executor, StaticExecutor}; use criterion::{criterion_group, criterion_main, Criterion}; use futures_lite::{future, prelude::*}; @@ -26,7 +26,7 @@ fn run(f: impl FnOnce(), multithread: bool) { }); } -fn run_leaked(executor: LeakedExecutor, f: impl FnOnce(), multithread: bool) { +fn run_static(executor: StaticExecutor, f: impl FnOnce(), multithread: bool) { let limit = if multithread { available_parallelism().unwrap().get() } else { @@ -53,23 +53,23 @@ fn create(c: &mut Criterion) { } fn running_benches(c: &mut Criterion) { - let leaked = Executor::new().leak(); - for with_leaked in [false, true] { + let static_executor = Executor::new().leak(); + for with_static in [false, true] { for (group_name, multithread) in [("single_thread", false), ("multi_thread", true)].iter() { - let prefix = if with_leaked { - "leaked_executor" + let prefix = if with_static { + "static_executor" } else { "executor" }; let mut group = c.benchmark_group(group_name.to_string()); group.bench_function(format!("{}::spawn_one", prefix), |b| { - if with_leaked { - run_leaked( - leaked, + if with_static { + run_static( + static_executor, || { b.iter(|| { - future::block_on(async { leaked.spawn(async {}).await }); + future::block_on(async { static_executor.spawn(async {}).await }); }); }, *multithread, @@ -86,7 +86,7 @@ fn running_benches(c: &mut Criterion) { } }); - if !with_leaked { + if !with_static { group.bench_function("executor::spawn_batch", |b| { run( || { @@ -104,15 +104,15 @@ fn running_benches(c: &mut Criterion) { } group.bench_function(format!("{}::spawn_many_local", prefix), |b| { - if with_leaked { - run_leaked( - leaked, + if with_static { + run_static( + static_executor, || { b.iter(move || { future::block_on(async { let mut tasks = Vec::new(); for _ in 0..LIGHT_TASKS { - tasks.push(leaked.spawn(async {})); + tasks.push(static_executor.spawn(async {})); } for task in tasks { task.await; @@ -157,15 +157,15 @@ fn running_benches(c: &mut Criterion) { } #[allow(clippy::manual_async_fn)] - fn go_leaked( - executor: LeakedExecutor, + fn go_static( + executor: StaticExecutor, i: usize, ) -> impl Future + Send + 'static { async move { if i != 0 { executor .spawn(async move { - let fut = go_leaked(executor, i - 1).boxed(); + let fut = go_static(executor, i - 1).boxed(); fut.await; }) .await; @@ -173,15 +173,18 @@ fn running_benches(c: &mut Criterion) { } } - if with_leaked { - run_leaked( - leaked, + if with_static { + run_static( + static_executor, || { b.iter(move || { future::block_on(async { let mut tasks = Vec::new(); for _ in 0..TASKS { - tasks.push(leaked.spawn(go_leaked(leaked, STEPS))); + tasks.push( + static_executor + .spawn(go_static(static_executor, STEPS)), + ); } for task in tasks { task.await; @@ -212,15 +215,15 @@ fn running_benches(c: &mut Criterion) { }); group.bench_function(format!("{}::yield_now", prefix), |b| { - if with_leaked { - run_leaked( - leaked, + if with_static { + run_static( + static_executor, || { b.iter(move || { future::block_on(async { let mut tasks = Vec::new(); for _ in 0..TASKS { - tasks.push(leaked.spawn(async move { + tasks.push(static_executor.spawn(async move { for _ in 0..STEPS { future::yield_now().await; } diff --git a/src/leaked.rs b/src/leaked.rs index 861f710..33a4025 100644 --- a/src/leaked.rs +++ b/src/leaked.rs @@ -2,11 +2,11 @@ use crate::{debug_state, Executor, LocalExecutor, State}; use async_task::{Builder, Runnable, Task}; use slab::Slab; use std::{ + cell::UnsafeCell, fmt, future::Future, - panic::{RefUnwindSafe, UnwindSafe}, marker::PhantomData, - cell::UnsafeCell, + panic::{RefUnwindSafe, UnwindSafe}, }; impl Executor<'static> { @@ -93,7 +93,10 @@ impl LocalExecutor<'static> { *active = Slab::new(); } - StaticLocalExecutor { state, marker_: PhantomData } + StaticLocalExecutor { + state, + marker_: PhantomData, + } } } @@ -299,10 +302,7 @@ impl StaticLocalExecutor { /// println!("Hello world"); /// }); /// ``` - pub fn spawn( - &self, - future: impl Future + 'static, - ) -> Task { + pub fn spawn(&self, future: impl Future + 'static) -> Task { let (runnable, task) = Builder::new() .propagate_panic(true) .spawn_local(|()| future, self.schedule()); @@ -324,8 +324,8 @@ impl StaticLocalExecutor { // // - `future` is not `Send` but `StaticLocalExecutor` is `!Sync`, // `try_tick`, `tick` and `run` can only be called from the origin - // thread of the `StaticLocalExecutor`. Similarly, `spawn_scoped` can only - // be called from the origin thread, ensuring that `future` and the executor + // thread of the `StaticLocalExecutor`. Similarly, `spawn_scoped` can only + // be called from the origin thread, ensuring that `future` and the executor // share the same origin thread. The `Runnable` can be scheduled from other // threads, but because of the above `Runnable` can only be called or // dropped on the origin thread. @@ -415,4 +415,4 @@ impl StaticLocalExecutor { state.notify(); } } -} \ No newline at end of file +} diff --git a/src/lib.rs b/src/lib.rs index e20d1bb..4960549 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -38,7 +38,6 @@ html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png" )] #[cfg_attr(docsrs, feature(doc_auto_cfg))] - use std::fmt; use std::marker::PhantomData; use std::panic::{RefUnwindSafe, UnwindSafe}; From 0fc9770eaaed10b68366d3ef2a6e6f4f5aeefde1 Mon Sep 17 00:00:00 2001 From: james7132 Date: Fri, 12 Apr 2024 22:24:35 -0700 Subject: [PATCH 20/29] Fix StaticLocalExecutor doc tests --- src/leaked.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/leaked.rs b/src/leaked.rs index 33a4025..6be6329 100644 --- a/src/leaked.rs +++ b/src/leaked.rs @@ -63,7 +63,7 @@ impl LocalExecutor<'static> { /// # Example /// /// ``` - /// use async_executor::Executor; + /// use async_executor::LocalExecutor; /// use futures_lite::future; /// /// let ex = LocalExecutor::new().leak(); @@ -294,7 +294,7 @@ impl StaticLocalExecutor { /// # Examples /// /// ``` - /// use async_executor::Executor; + /// use async_executor::LocalExecutor; /// /// let ex = LocalExecutor::new().leak(); /// @@ -350,7 +350,7 @@ impl StaticLocalExecutor { /// # Examples /// /// ``` - /// use async_executor::Executor; + /// use async_executor::LocalExecutor; /// /// let ex = LocalExecutor::new().leak(); /// assert!(!ex.try_tick()); // no tasks to run @@ -373,7 +373,7 @@ impl StaticLocalExecutor { /// # Examples /// /// ``` - /// use async_executor::Executor; + /// use async_executor::LocalExecutor; /// use futures_lite::future; /// /// let ex = LocalExecutor::new().leak(); @@ -392,7 +392,7 @@ impl StaticLocalExecutor { /// # Examples /// /// ``` - /// use async_executor::Executor; + /// use async_executor::LocalExecutor; /// use futures_lite::future; /// /// let ex = LocalExecutor::new().leak(); From c4b59c50c51fef8a197f97399cddceea61b860c6 Mon Sep 17 00:00:00 2001 From: James Liu Date: Sat, 13 Apr 2024 23:21:46 -0700 Subject: [PATCH 21/29] Make the doc_auto_cfg a crate-level attribute. Co-authored-by: John Nunley --- src/lib.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index 4960549..ce6b271 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -37,7 +37,8 @@ #![doc( html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png" )] -#[cfg_attr(docsrs, feature(doc_auto_cfg))] +#![cfg_attr(docsrs, feature(doc_auto_cfg))] + use std::fmt; use std::marker::PhantomData; use std::panic::{RefUnwindSafe, UnwindSafe}; From 6e2bc3ba9d8ffb583eeba5b0ad3e30c4e16d00a0 Mon Sep 17 00:00:00 2001 From: james7132 Date: Sun, 14 Apr 2024 00:30:54 -0700 Subject: [PATCH 22/29] Use const construction to allow direct initialization in static variables --- Cargo.toml | 2 +- src/leaked.rs | 173 ++++++++++++++++++++++++++++++++++---------------- src/lib.rs | 2 +- 3 files changed, 119 insertions(+), 58 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 43f997f..153ddce 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,7 @@ leak = [] [dependencies] async-task = "4.4.0" -concurrent-queue = "2.0.0" +concurrent-queue = { git = "https://github.com/smol-rs/concurrent-queue", branch = "master" } fastrand = "2.0.0" futures-lite = { version = "2.0.0", default-features = false } slab = "0.4.4" diff --git a/src/leaked.rs b/src/leaked.rs index 6be6329..e3ae647 100644 --- a/src/leaked.rs +++ b/src/leaked.rs @@ -16,6 +16,9 @@ impl Executor<'static> { /// [`StaticExecutor`]'s functions are optimized to require fewer synchronizing operations /// when spawning, running, and finishing tasks. /// + /// `StaticExecutor` cannot be converted back into a `Executor`, so this operation is + /// irreversible without the use of unsafe. + /// /// # Example /// /// ``` @@ -30,12 +33,12 @@ impl Executor<'static> { /// /// future::block_on(ex.run(task)); /// ``` - pub fn leak(self) -> StaticExecutor { + pub fn leak(self) -> &'static StaticExecutor { let ptr = self.state_ptr(); // SAFETY: So long as an Executor lives, it's state pointer will always be valid // when accessed through state_ptr. This executor will live for the full 'static // lifetime so this isn't an arbitrary lifetime extension. - let state = unsafe { &*ptr }; + let state: &'static State = unsafe { &*ptr }; std::mem::forget(self); @@ -49,7 +52,10 @@ impl Executor<'static> { *active = Slab::new(); } - StaticExecutor { state } + // SAFETY: StaticExecutor has the same memory layout as State as it's repr(transparent). + // The lifetime is not altered: 'static -> 'static. + let static_executor: &'static StaticExecutor = unsafe { std::mem::transmute(state) }; + static_executor } } @@ -60,6 +66,9 @@ impl LocalExecutor<'static> { /// [`StaticLocalExecutor`]'s functions are optimized to require fewer synchronizing operations /// when spawning, running, and finishing tasks. /// + /// `StaticLocalExecutor` cannot be converted back into a `Executor`, so this operation is + /// irreversible without the use of unsafe. + /// /// # Example /// /// ``` @@ -74,12 +83,12 @@ impl LocalExecutor<'static> { /// /// future::block_on(ex.run(task)); /// ``` - pub fn leak(self) -> StaticLocalExecutor { + pub fn leak(self) -> &'static StaticLocalExecutor { let ptr = self.inner.state_ptr(); - // SAFETY: So long as an LocalExecutor lives, it's state pointer will always be valid + // SAFETY: So long as a LocalExecutor lives, it's state pointer will always be valid // when accessed through state_ptr. This executor will live for the full 'static // lifetime so this isn't an arbitrary lifetime extension. - let state = unsafe { &*ptr }; + let state: &'static State = unsafe { &*ptr }; std::mem::forget(self); @@ -93,30 +102,29 @@ impl LocalExecutor<'static> { *active = Slab::new(); } - StaticLocalExecutor { - state, - marker_: PhantomData, - } + // SAFETY: StaticLocalExecutor has the same memory layout as State as it's repr(transparent). + // The lifetime is not altered: 'static -> 'static. + let static_executor: &'static StaticLocalExecutor = unsafe { std::mem::transmute(state) }; + static_executor } } -/// A leaked async [`Executor`] created from [`Executor::leak`]. +/// A static-lifetimed async [`Executor`]. /// -/// Largely equivalent to calling `Box::leak(Box::new(executor))`, but spawning, running, and -/// finishing tasks are optimized with the assumption that the executor will never be `Drop`'ed. -/// A leaked executor may require signficantly less overhead in both single-threaded and -/// mulitthreaded use cases. +/// This is primarily intended to be used in [`static`] variables, or types intended to be used, or can be created in non-static +/// contexts via [`Executor::leak`]. +/// +/// Spawning, running, and finishing tasks are optimized with the assumption that the executor will never be `Drop`'ed. +/// A static executor may require signficantly less overhead in both single-threaded and mulitthreaded use cases. /// /// As this type does not implement `Drop`, losing the handle to the executor or failing /// to consistently drive the executor with [`tick`] or [`run`] will cause the all spawned /// tasks to permanently leak. Any tasks at the time will not be cancelled. /// -/// Unlike [`Executor`], this type trivially implements both [`Clone`] and [`Copy`]. -/// -/// This type *cannot* be converted back to a `Executor`. -#[derive(Copy, Clone)] +/// [`static`]: https://doc.rust-lang.org/std/keyword.static.html +#[repr(transparent)] pub struct StaticExecutor { - state: &'static State, + state: State, } // SAFETY: Executor stores no thread local state that can be accessed via other thread. @@ -129,26 +137,44 @@ impl RefUnwindSafe for StaticExecutor {} impl fmt::Debug for StaticExecutor { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - debug_state(self.state, "StaticExecutor", f) + debug_state(&self.state, "StaticExecutor", f) } } impl StaticExecutor { + /// Creates a new StaticExecutor. + /// + /// # Examples + /// + /// ``` + /// use async_executor::StaticExecutor; + /// + /// static EXECUTOR: StaticExecutor = StaticExecutor::new(); + /// ``` + pub const fn new() -> Self { + Self { + state: State::new(), + } + } + /// Spawns a task onto the executor. /// + /// Note: unlike [`Executor::spawn`], this function requires being called with a `'static` + /// borrow on the executor. + /// /// # Examples /// /// ``` - /// use async_executor::Executor; + /// use async_executor::StaticExecutor; /// - /// let ex = Executor::new().leak(); + /// static EXECUTOR: StaticExecutor = StaticExecutor::new(); /// - /// let task = ex.spawn(async { + /// let task = EXECUTOR.spawn(async { /// println!("Hello world"); /// }); /// ``` pub fn spawn( - &self, + &'static self, future: impl Future + Send + 'static, ) -> Task { let (runnable, task) = Builder::new() @@ -165,7 +191,7 @@ impl StaticExecutor { /// The caller must ensure that the returned task terminates /// or is cancelled before the end of 'a. pub unsafe fn spawn_scoped<'a, T: Send + 'static>( - &self, + &'static self, future: impl Future + Send + 'a, ) -> Task { // SAFETY: @@ -192,15 +218,17 @@ impl StaticExecutor { /// # Examples /// /// ``` - /// use async_executor::Executor; + /// use async_executor::StaticExecutor; /// - /// let ex = Executor::new().leak(); - /// assert!(!ex.try_tick()); // no tasks to run + /// static EXECUTOR: StaticExecutor = StaticExecutor::new(); /// - /// let task = ex.spawn(async { + /// assert!(!EXECUTOR.try_tick()); // no tasks to run + /// + /// let task = EXECUTOR.spawn(async { /// println!("Hello world"); /// }); - /// assert!(ex.try_tick()); // a task was found + /// + /// assert!(EXECUTOR.try_tick()); // a task was found /// ``` pub fn try_tick(&self) -> bool { self.state.try_tick() @@ -215,15 +243,16 @@ impl StaticExecutor { /// # Examples /// /// ``` - /// use async_executor::Executor; + /// use async_executor::StaticExecutor; /// use futures_lite::future; /// - /// let ex = Executor::new().leak(); + /// static EXECUTOR: StaticExecutor = StaticExecutor::new(); /// - /// let task = ex.spawn(async { + /// let task = EXECUTOR.spawn(async { /// println!("Hello world"); /// }); - /// future::block_on(ex.tick()); // runs the task + /// + /// future::block_on(EXECUTOR.tick()); // runs the task /// ``` pub async fn tick(&self) { self.state.tick().await; @@ -234,13 +263,13 @@ impl StaticExecutor { /// # Examples /// /// ``` - /// use async_executor::Executor; + /// use async_executor::StaticExecutor; /// use futures_lite::future; /// - /// let ex = Executor::new().leak(); + /// static EXECUTOR: StaticExecutor = StaticExecutor::new(); /// - /// let task = ex.spawn(async { 1 + 2 }); - /// let res = future::block_on(ex.run(async { task.await * 2 })); + /// let task = EXECUTOR.spawn(async { 1 + 2 }); + /// let res = future::block_on(EXECUTOR.run(async { task.await * 2 })); /// /// assert_eq!(res, 6); /// ``` @@ -249,8 +278,8 @@ impl StaticExecutor { } /// Returns a function that schedules a runnable task when it gets woken up. - fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static { - let state = self.state; + fn schedule(&'static self) -> impl Fn(Runnable) + Send + Sync + 'static { + let state: &'static State = &self.state; // TODO: If possible, push into the current local queue and notify the ticker. move |runnable| { state.queue.push(runnable).unwrap(); @@ -259,23 +288,28 @@ impl StaticExecutor { } } -/// A leaked async [`LocalExecutor`] created from [`LocalExecutor::leak`]. +impl Default for StaticExecutor { + fn default() -> Self { + Self::new() + } +} + +/// A static async [`LocalExecutor`] created from [`LocalExecutor::leak`]. /// -/// Largely equivalent to calling `Box::leak(Box::new(executor))`, but spawning, running, and -/// finishing tasks are optimized with the assumption that the executor will never be `Drop`'ed. -/// A leaked executor may require signficantly less overhead in both single-threaded and -/// mulitthreaded use cases. +/// This is primarily intended to be used in [`thread_local`] variables, or can be created in non-static +/// contexts via [`LocalExecutor::leak`]. +/// +/// Spawning, running, and finishing tasks are optimized with the assumption that the executor will never be `Drop`'ed. +/// A static executor may require signficantly less overhead in both single-threaded and mulitthreaded use cases. /// /// As this type does not implement `Drop`, losing the handle to the executor or failing /// to consistently drive the executor with [`tick`] or [`run`] will cause the all spawned /// tasks to permanently leak. Any tasks at the time will not be cancelled. /// -/// Unlike [`LocalExecutor`], this type trivially implements both [`Clone`] and [`Copy`]. -/// -/// This type *cannot* be converted back to a `Executor`. -#[derive(Copy, Clone)] +/// [`thread_local]: https://doc.rust-lang.org/std/macro.thread_local.html +#[repr(transparent)] pub struct StaticLocalExecutor { - state: &'static State, + state: State, marker_: PhantomData>, } @@ -284,13 +318,34 @@ impl RefUnwindSafe for StaticLocalExecutor {} impl fmt::Debug for StaticLocalExecutor { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - debug_state(self.state, "StaticLocalExecutor", f) + debug_state(&self.state, "StaticLocalExecutor", f) } } impl StaticLocalExecutor { + /// Creates a new StaticLocalExecutor. + /// + /// # Examples + /// + /// ``` + /// use async_executor::StaticLocalExecutor; + /// + /// thread_local! { + /// static EXECUTOR: StaticLocalExecutor = StaticLocalExecutor::new(); + /// } + /// ``` + pub const fn new() -> Self { + Self { + state: State::new(), + marker_: PhantomData, + } + } + /// Spawns a task onto the executor. /// + /// Note: unlike [`LocalExecutor::spawn`], this function requires being called with a `'static` + /// borrow on the executor. + /// /// # Examples /// /// ``` @@ -302,7 +357,7 @@ impl StaticLocalExecutor { /// println!("Hello world"); /// }); /// ``` - pub fn spawn(&self, future: impl Future + 'static) -> Task { + pub fn spawn(&'static self, future: impl Future + 'static) -> Task { let (runnable, task) = Builder::new() .propagate_panic(true) .spawn_local(|()| future, self.schedule()); @@ -317,7 +372,7 @@ impl StaticLocalExecutor { /// The caller must ensure that the returned task terminates /// or is cancelled before the end of 'a. pub unsafe fn spawn_scoped<'a, T: 'static>( - &self, + &'static self, future: impl Future + 'a, ) -> Task { // SAFETY: @@ -407,8 +462,8 @@ impl StaticLocalExecutor { } /// Returns a function that schedules a runnable task when it gets woken up. - fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static { - let state = self.state; + fn schedule(&'static self) -> impl Fn(Runnable) + Send + Sync + 'static { + let state: &'static State = &self.state; // TODO: If possible, push into the current local queue and notify the ticker. move |runnable| { state.queue.push(runnable).unwrap(); @@ -416,3 +471,9 @@ impl StaticLocalExecutor { } } } + +impl Default for StaticLocalExecutor { + fn default() -> Self { + Self::new() + } +} diff --git a/src/lib.rs b/src/lib.rs index ce6b271..e339227 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -679,7 +679,7 @@ struct State { impl State { /// Creates state for a new executor. - fn new() -> State { + const fn new() -> State { State { queue: ConcurrentQueue::unbounded(), local_queues: RwLock::new(Vec::new()), From eb7935dfbdbce2307d96b719dd23f7ba4035999f Mon Sep 17 00:00:00 2001 From: james7132 Date: Sun, 14 Apr 2024 00:34:08 -0700 Subject: [PATCH 23/29] Change name of modules away from leaking --- Cargo.toml | 3 ++- src/lib.rs | 8 ++++---- src/{leaked.rs => static_executors.rs} | 0 3 files changed, 6 insertions(+), 5 deletions(-) rename src/{leaked.rs => static_executors.rs} (100%) diff --git a/Cargo.toml b/Cargo.toml index 153ddce..0f62915 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,8 @@ categories = ["asynchronous", "concurrency"] exclude = ["/.*"] [features] -leak = [] +# Adds support for executors optimized for use in static variables. +static = [] [dependencies] async-task = "4.4.0" diff --git a/src/lib.rs b/src/lib.rs index e339227..7c5d49d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -52,13 +52,13 @@ use concurrent_queue::ConcurrentQueue; use futures_lite::{future, prelude::*}; use slab::Slab; -#[cfg(feature = "leak")] -mod leaked; +#[cfg(feature = "static")] +mod static_executors; #[doc(no_inline)] pub use async_task::{FallibleTask, Task}; -#[cfg(feature = "leak")] -pub use leaked::*; +#[cfg(feature = "static")] +pub use static_executors::*; /// An async executor. /// diff --git a/src/leaked.rs b/src/static_executors.rs similarity index 100% rename from src/leaked.rs rename to src/static_executors.rs From 99abde65c39a033f78ee196bba2220edbfcb0819 Mon Sep 17 00:00:00 2001 From: james7132 Date: Sun, 14 Apr 2024 00:36:49 -0700 Subject: [PATCH 24/29] Bump MSRV to 1.63 --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 0f62915..61ffd97 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,7 @@ name = "async-executor" version = "1.11.0" authors = ["Stjepan Glavina ", "John Nunley "] edition = "2021" -rust-version = "1.60" +rust-version = "1.63" description = "Async executor" license = "Apache-2.0 OR MIT" repository = "https://github.com/smol-rs/async-executor" From d3199af1af8f7b14e276273a65369643c167e39a Mon Sep 17 00:00:00 2001 From: James Liu Date: Mon, 15 Apr 2024 17:45:51 -0700 Subject: [PATCH 25/29] Fix features Co-authored-by: Alain Emilia Anna Zscheile --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 61ffd97..ff5a837 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,4 +41,4 @@ once_cell = "1.16.0" [[bench]] name = "executor" harness = false -required-features = ["leak"] +required-features = ["static"] From f9e991c02071c81163113122324920dab89a6915 Mon Sep 17 00:00:00 2001 From: james7132 Date: Mon, 15 Apr 2024 17:47:28 -0700 Subject: [PATCH 26/29] Address benches comment --- benches/executor.rs | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/benches/executor.rs b/benches/executor.rs index d71dad6..0ade74f 100644 --- a/benches/executor.rs +++ b/benches/executor.rs @@ -54,13 +54,8 @@ fn create(c: &mut Criterion) { fn running_benches(c: &mut Criterion) { let static_executor = Executor::new().leak(); - for with_static in [false, true] { + for (prefix, with_static) in [("static_executor", false), ("executor", true) ] { for (group_name, multithread) in [("single_thread", false), ("multi_thread", true)].iter() { - let prefix = if with_static { - "static_executor" - } else { - "executor" - }; let mut group = c.benchmark_group(group_name.to_string()); group.bench_function(format!("{}::spawn_one", prefix), |b| { From 17dde2400dc04fdb9940506297e8a601eba47f22 Mon Sep 17 00:00:00 2001 From: james7132 Date: Mon, 15 Apr 2024 18:22:16 -0700 Subject: [PATCH 27/29] Actually fix the benchmarks --- benches/executor.rs | 32 +++++++++++--------------------- 1 file changed, 11 insertions(+), 21 deletions(-) diff --git a/benches/executor.rs b/benches/executor.rs index 0ade74f..13da9ae 100644 --- a/benches/executor.rs +++ b/benches/executor.rs @@ -9,6 +9,7 @@ const STEPS: usize = 300; const LIGHT_TASKS: usize = 25_000; static EX: Executor<'_> = Executor::new(); +static STATIC_EX: StaticExecutor = StaticExecutor::new(); fn run(f: impl FnOnce(), multithread: bool) { let limit = if multithread { @@ -26,7 +27,7 @@ fn run(f: impl FnOnce(), multithread: bool) { }); } -fn run_static(executor: StaticExecutor, f: impl FnOnce(), multithread: bool) { +fn run_static(f: impl FnOnce(), multithread: bool) { let limit = if multithread { available_parallelism().unwrap().get() } else { @@ -35,7 +36,7 @@ fn run_static(executor: StaticExecutor, f: impl FnOnce(), multithread: bool) { let (s, r) = async_channel::bounded::<()>(1); easy_parallel::Parallel::new() - .each(0..limit, |_| future::block_on(executor.run(r.recv()))) + .each(0..limit, |_| future::block_on(STATIC_EX.run(r.recv()))) .finish(move || { let _s = s; f() @@ -53,18 +54,16 @@ fn create(c: &mut Criterion) { } fn running_benches(c: &mut Criterion) { - let static_executor = Executor::new().leak(); - for (prefix, with_static) in [("static_executor", false), ("executor", true) ] { + for (prefix, with_static) in [("executor", false), ("static_executor", true)] { for (group_name, multithread) in [("single_thread", false), ("multi_thread", true)].iter() { let mut group = c.benchmark_group(group_name.to_string()); group.bench_function(format!("{}::spawn_one", prefix), |b| { if with_static { run_static( - static_executor, || { b.iter(|| { - future::block_on(async { static_executor.spawn(async {}).await }); + future::block_on(async { STATIC_EX.spawn(async {}).await }); }); }, *multithread, @@ -101,13 +100,12 @@ fn running_benches(c: &mut Criterion) { group.bench_function(format!("{}::spawn_many_local", prefix), |b| { if with_static { run_static( - static_executor, || { b.iter(move || { future::block_on(async { let mut tasks = Vec::new(); for _ in 0..LIGHT_TASKS { - tasks.push(static_executor.spawn(async {})); + tasks.push(STATIC_EX.spawn(async {})); } for task in tasks { task.await; @@ -152,15 +150,12 @@ fn running_benches(c: &mut Criterion) { } #[allow(clippy::manual_async_fn)] - fn go_static( - executor: StaticExecutor, - i: usize, - ) -> impl Future + Send + 'static { + fn go_static(i: usize) -> impl Future + Send + 'static { async move { if i != 0 { - executor + STATIC_EX .spawn(async move { - let fut = go_static(executor, i - 1).boxed(); + let fut = go_static(i - 1).boxed(); fut.await; }) .await; @@ -170,16 +165,12 @@ fn running_benches(c: &mut Criterion) { if with_static { run_static( - static_executor, || { b.iter(move || { future::block_on(async { let mut tasks = Vec::new(); for _ in 0..TASKS { - tasks.push( - static_executor - .spawn(go_static(static_executor, STEPS)), - ); + tasks.push(STATIC_EX.spawn(go_static(STEPS))); } for task in tasks { task.await; @@ -212,13 +203,12 @@ fn running_benches(c: &mut Criterion) { group.bench_function(format!("{}::yield_now", prefix), |b| { if with_static { run_static( - static_executor, || { b.iter(move || { future::block_on(async { let mut tasks = Vec::new(); for _ in 0..TASKS { - tasks.push(static_executor.spawn(async move { + tasks.push(STATIC_EX.spawn(async move { for _ in 0..STEPS { future::yield_now().await; } From 890b82334575850214e965a73186b7b07cec94eb Mon Sep 17 00:00:00 2001 From: james7132 Date: Mon, 15 Apr 2024 21:37:04 -0700 Subject: [PATCH 28/29] Relax the lifetime constraints on T for spawn_scoped --- src/static_executors.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/static_executors.rs b/src/static_executors.rs index e3ae647..c1724e9 100644 --- a/src/static_executors.rs +++ b/src/static_executors.rs @@ -190,7 +190,7 @@ impl StaticExecutor { /// /// The caller must ensure that the returned task terminates /// or is cancelled before the end of 'a. - pub unsafe fn spawn_scoped<'a, T: Send + 'static>( + pub unsafe fn spawn_scoped<'a, T: Send + 'a>( &'static self, future: impl Future + Send + 'a, ) -> Task { @@ -371,7 +371,7 @@ impl StaticLocalExecutor { /// /// The caller must ensure that the returned task terminates /// or is cancelled before the end of 'a. - pub unsafe fn spawn_scoped<'a, T: 'static>( + pub unsafe fn spawn_scoped<'a, T: 'a>( &'static self, future: impl Future + 'a, ) -> Task { From 47007be9b9f430bf50a4ee043a8ee283d2d6bb8a Mon Sep 17 00:00:00 2001 From: James Liu Date: Thu, 9 May 2024 15:26:43 -0700 Subject: [PATCH 29/29] Update to published concurrent-queue. --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index ff5a837..16d33bc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,7 @@ static = [] [dependencies] async-task = "4.4.0" -concurrent-queue = { git = "https://github.com/smol-rs/concurrent-queue", branch = "master" } +concurrent-queue = "2.5.0" fastrand = "2.0.0" futures-lite = { version = "2.0.0", default-features = false } slab = "0.4.4"