Skip to content

Commit 1df67c8

Browse files
committed
feat: Avoid spawning async-io thread if possible
This commit adds conditions that prevent the async-io thread from being spawned in certain cases. This is important because sometimes async-io is used in environments where spawning additional threads can interfere with other functionality in the program; for example, in DPDK poll loops. If there is only one async-io::block_on invocation, there is no need to spawn the "async-io" thread; we can just modify block_on to not give up the reactor if it can detect that it is the only block_on call running in the program. From there, we also need to avoid spawning the async-io thread if there are no resources left when block_on exits. This should allow async-io to not spawn additional threads in single-threaded programs where it is not needed. Signed-off-by: John Nunley <dev@notgull.net>
1 parent 576b470 commit 1df67c8

File tree

7 files changed

+214
-22
lines changed

7 files changed

+214
-22
lines changed

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,10 @@ autocfg = "1"
4545
[dev-dependencies]
4646
async-channel = "2.0.0"
4747
async-net = "2.0.0"
48+
async-task = "4.7.1"
4849
blocking = "1"
4950
criterion = { version = "0.7", default-features = false, features = ["cargo_bench_support"] }
51+
fastrand = "2.3.0"
5052
getrandom = "0.3"
5153
signal-hook = "0.3"
5254
tempfile = "3"

src/driver.rs

Lines changed: 51 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::cell::{Cell, RefCell};
22
use std::future::Future;
33
use std::pin::pin;
4-
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
4+
use std::sync::atomic::{fence, AtomicBool, AtomicUsize, Ordering};
55
use std::sync::{Arc, OnceLock};
66
use std::task::{Context, Poll, Waker};
77
use std::thread;
@@ -14,10 +14,14 @@ use crate::reactor::Reactor;
1414
/// Number of currently active `block_on()` invocations.
1515
static BLOCK_ON_COUNT: AtomicUsize = AtomicUsize::new(0);
1616

17+
/// Number of currently alive `Async` or `Timer` instances.
18+
static ALIVE_RESOURCE_COUNT: AtomicUsize = AtomicUsize::new(0);
19+
20+
/// Raw unparker for the "async-io" thread.
21+
static UNPARKER: OnceLock<parking::Unparker> = OnceLock::new();
22+
1723
/// Unparker for the "async-io" thread.
1824
fn unparker() -> &'static parking::Unparker {
19-
static UNPARKER: OnceLock<parking::Unparker> = OnceLock::new();
20-
2125
UNPARKER.get_or_init(|| {
2226
let (parker, unparker) = parking::pair();
2327

@@ -35,9 +39,35 @@ fn unparker() -> &'static parking::Unparker {
3539
})
3640
}
3741

38-
/// Initializes the "async-io" thread.
39-
pub(crate) fn init() {
40-
let _ = unparker();
42+
/// Tell if the "async-io" thread is spawned.
43+
pub fn is_async_io_thread_spawned() -> bool {
44+
UNPARKER.get().is_some()
45+
}
46+
47+
/// Decrement the number of currently alive `Async` or `Timer` instances.
48+
#[inline]
49+
pub(crate) fn decrement_alive() {
50+
ALIVE_RESOURCE_COUNT.fetch_sub(1, Ordering::Relaxed);
51+
}
52+
53+
/// Increments the number of currently alive `Async` or `Timer` instances.
54+
#[inline]
55+
pub(crate) fn increment_alive() {
56+
// If this is the first resource, spawn the `async-io` thread if necessary.
57+
if ALIVE_RESOURCE_COUNT.fetch_add(1, Ordering::Relaxed) == 0 {
58+
init();
59+
}
60+
61+
#[cold]
62+
fn init() {
63+
// Emit a fence to ensure everything is in order.
64+
fence(Ordering::SeqCst);
65+
66+
// If there are no `block_on()` calls, spawn the `async-io` thread.
67+
if BLOCK_ON_COUNT.load(Ordering::SeqCst) == 0 {
68+
unparker().unpark();
69+
}
70+
}
4171
}
4272

4373
/// The main loop for the "async-io" thread.
@@ -118,12 +148,20 @@ pub fn block_on<T>(future: impl Future<Output = T>) -> T {
118148
let _enter = span.enter();
119149

120150
// Increment `BLOCK_ON_COUNT` so that the "async-io" thread becomes less aggressive.
121-
BLOCK_ON_COUNT.fetch_add(1, Ordering::SeqCst);
151+
if BLOCK_ON_COUNT.fetch_add(1, Ordering::SeqCst) > 0 {
152+
// There are other `block_on()` calls active, "async-io" must be spawned
153+
// if it isn't already.
154+
let _ = unparker();
155+
}
122156

123157
// Make sure to decrement `BLOCK_ON_COUNT` at the end and wake the "async-io" thread.
124158
let _guard = CallOnDrop(|| {
125159
BLOCK_ON_COUNT.fetch_sub(1, Ordering::SeqCst);
126-
unparker().unpark();
160+
161+
// Only wake the `async-io` thread iff there are live resources.
162+
if ALIVE_RESOURCE_COUNT.load(Ordering::SeqCst) > 0 {
163+
unparker().unpark();
164+
}
127165
});
128166

129167
// Creates a parker and an associated waker that unparks it.
@@ -263,8 +301,11 @@ pub fn block_on<T>(future: impl Future<Output = T>) -> T {
263301
break;
264302
}
265303

266-
// Check if this thread been handling I/O events for a long time.
267-
if start.elapsed() > Duration::from_micros(500) {
304+
// Check if this thread been handling I/O events for a long time
305+
// and if there are other threads waiting for I/O events.
306+
if start.elapsed() > Duration::from_micros(500)
307+
&& BLOCK_ON_COUNT.load(Ordering::SeqCst) > 1
308+
{
268309
#[cfg(feature = "tracing")]
269310
tracing::trace!("stops hogging the reactor");
270311

src/lib.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,31 @@
2525
//! [`block_on()`] function. The "async-io" thread is therefore just a fallback mechanism
2626
//! processing I/O events in case no other threads are.
2727
//!
28+
//! This thread is only spawned under either of the following conditions:
29+
//!
30+
//! - [`Async`]s or [`Timer`]s are created without an active [`block_on`] call.
31+
//! - More than one thread is calling [`block_on`] at a time.
32+
//!
33+
//! Therefore, if you would like to prevent this thread from being spawned, ensure
34+
//! that there is only one thread calling [`block_on`] at a time. This thread
35+
//! will handle all I/O events in lieu of the "async-io" thread.
36+
//!
37+
//! **Note:** If you are using [`async-process`], keep in mind that [`async-process`]'s
38+
//! reaper thread calls [`block_on`] on Unix platforms in order to reap child
39+
//! processes. If you are using [`async-process`] and are trying to avoid spawning
40+
//! the "async-io" thread, ensure that your executor is running the [`async_process::driver()`]
41+
//! future. In addition, the executor backing [`smol::spawn()`] as well as the
42+
//! multithreaded version of [`smol_macros::main!`] use [`block_on`] as well.
43+
//!
2844
//! [epoll]: https://en.wikipedia.org/wiki/Epoll
2945
//! [kqueue]: https://en.wikipedia.org/wiki/Kqueue
3046
//! [event ports]: https://illumos.org/man/port_create
3147
//! [IOCP]: https://learn.microsoft.com/en-us/windows/win32/fileio/i-o-completion-ports
3248
//! [`polling`]: https://docs.rs/polling
49+
//! [`async-process`]: https://docs.rs/async-process
50+
//! [`async_process::driver()`]: https://docs.rs/async-process/latest/async_process/fn.driver.html
51+
//! [`smol::spawn()`]: https://docs.rs/smol/latest/smol/fn.spawn.html
52+
//! [`smol_macros::main!`]: https://docs.rs/smol-macros/latest/smol_macros/macro.main.html
3353
//!
3454
//! # Examples
3555
//!
@@ -79,6 +99,10 @@ use std::{
7999
#[cfg(windows)]
80100
use std::os::windows::io::{AsRawSocket, AsSocket, BorrowedSocket, OwnedSocket, RawSocket};
81101

102+
// Not public API.
103+
#[doc(hidden)]
104+
pub use driver::is_async_io_thread_spawned;
105+
82106
use futures_io::{AsyncRead, AsyncWrite};
83107
use futures_lite::stream::{self, Stream};
84108

@@ -192,6 +216,8 @@ impl Timer {
192216
/// # });
193217
/// ```
194218
pub fn never() -> Timer {
219+
crate::driver::increment_alive();
220+
195221
Timer {
196222
id_and_waker: None,
197223
when: None,
@@ -271,6 +297,8 @@ impl Timer {
271297
/// # });
272298
/// ```
273299
pub fn interval_at(start: Instant, period: Duration) -> Timer {
300+
crate::driver::increment_alive();
301+
274302
Timer {
275303
id_and_waker: None,
276304
when: Some(start),
@@ -458,6 +486,8 @@ impl Drop for Timer {
458486
// Deregister the timer from the reactor.
459487
Reactor::get().remove_timer(when, id);
460488
}
489+
490+
crate::driver::decrement_alive();
461491
}
462492
}
463493

@@ -680,6 +710,8 @@ impl<T: AsFd> Async<T> {
680710
/// it is not set. If not set to non-blocking mode, I/O operations may block the current thread
681711
/// and cause a deadlock in an asynchronous context.
682712
pub fn new_nonblocking(io: T) -> io::Result<Async<T>> {
713+
crate::driver::increment_alive();
714+
683715
// SAFETY: It is impossible to drop the I/O source while it is registered through
684716
// this type.
685717
let registration = unsafe { Registration::new(io.as_fd()) };
@@ -774,6 +806,8 @@ impl<T: AsSocket> Async<T> {
774806
/// it is not set. If not set to non-blocking mode, I/O operations may block the current thread
775807
/// and cause a deadlock in an asynchronous context.
776808
pub fn new_nonblocking(io: T) -> io::Result<Async<T>> {
809+
crate::driver::increment_alive();
810+
777811
// Create the registration.
778812
//
779813
// SAFETY: It is impossible to drop the I/O source while it is registered through
@@ -1165,6 +1199,8 @@ impl<T> Drop for Async<T> {
11651199
// Drop the I/O handle to close it.
11661200
self.io.take();
11671201
}
1202+
1203+
crate::driver::decrement_alive();
11681204
}
11691205
}
11701206

src/reactor.rs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -93,16 +93,13 @@ impl Reactor {
9393
pub(crate) fn get() -> &'static Reactor {
9494
static REACTOR: OnceLock<Reactor> = OnceLock::new();
9595

96-
REACTOR.get_or_init(|| {
97-
crate::driver::init();
98-
Reactor {
99-
poller: Poller::new().expect("cannot initialize I/O event notification"),
100-
ticker: AtomicUsize::new(0),
101-
sources: Mutex::new(Slab::new()),
102-
events: Mutex::new(Events::new()),
103-
timers: Mutex::new(BTreeMap::new()),
104-
timer_ops: ConcurrentQueue::bounded(TIMER_QUEUE_SIZE),
105-
}
96+
REACTOR.get_or_init(|| Reactor {
97+
poller: Poller::new().expect("cannot initialize I/O event notification"),
98+
ticker: AtomicUsize::new(0),
99+
sources: Mutex::new(Slab::new()),
100+
events: Mutex::new(Events::new()),
101+
timers: Mutex::new(BTreeMap::new()),
102+
timer_ops: ConcurrentQueue::bounded(TIMER_QUEUE_SIZE),
106103
})
107104
}
108105

tests/async.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use std::sync::Arc;
66
use std::thread;
77
use std::time::Duration;
88

9-
use async_io::{Async, Timer};
9+
use async_io::{is_async_io_thread_spawned, Async, Timer};
1010
use futures_lite::{future, prelude::*};
1111
#[cfg(unix)]
1212
use tempfile::tempdir;
@@ -36,6 +36,7 @@ fn spawn<T: Send + 'static>(
3636
fn tcp_connect() -> io::Result<()> {
3737
future::block_on(async {
3838
let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
39+
assert!(is_async_io_thread_spawned());
3940
let addr = listener.get_ref().local_addr()?;
4041
let task = spawn(async move { listener.accept().await });
4142

tests/driver_not_spawned.rs

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
use async_channel::bounded;
2+
use async_io::{block_on, is_async_io_thread_spawned, Async, Timer};
3+
use futures_lite::prelude::*;
4+
use std::net::{TcpListener, TcpStream};
5+
use std::sync::atomic;
6+
use std::thread;
7+
use std::time::Duration;
8+
9+
const TIMER_COUNT: u64 = 10_000;
10+
const IO_COUNT: usize = 200;
11+
12+
static TOTAL_IO: atomic::AtomicU64 = atomic::AtomicU64::new(0);
13+
14+
#[test]
15+
fn test_one_block_on() {
16+
// The driver thread should not be spawned.
17+
assert!(!is_async_io_thread_spawned());
18+
19+
// Running smol::block_on() should not spawn the driver thread.
20+
block_on(async {
21+
assert!(!is_async_io_thread_spawned());
22+
23+
// We should be able to handle a lot of timers and sources.
24+
let mut timers = Vec::new();
25+
for i in 1..TIMER_COUNT {
26+
timers.push(Timer::after(Duration::from_millis(i / 5)));
27+
}
28+
29+
let (spawner, executor) = async_channel::unbounded();
30+
let mut tasks = Vec::new();
31+
32+
for _ in 0..IO_COUNT {
33+
let (runnable, task) = async_task::Builder::new().propagate_panic(true).spawn(
34+
move |_| async move {
35+
let mut rng = fastrand::Rng::new();
36+
37+
// Create a TCP pipe and send bytes to and from.
38+
let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
39+
let stream1 =
40+
Async::<TcpStream>::connect(listener.get_ref().local_addr()?).await?;
41+
let stream2 = listener.accept().await?.0;
42+
43+
let mut bytes = [0u8; 64];
44+
let mut read_buffer = [0u8; 64];
45+
46+
loop {
47+
rng.fill(&mut bytes);
48+
49+
Timer::after(Duration::from_micros(rng.u64(..1_000))).await;
50+
(&stream1).write_all(&bytes).await?;
51+
Timer::after(Duration::from_micros(rng.u64(..1_000))).await;
52+
(&stream2).read_exact(&mut read_buffer).await?;
53+
54+
assert_eq!(bytes, read_buffer);
55+
TOTAL_IO.fetch_add(bytes.len() as u64, atomic::Ordering::Relaxed);
56+
futures_lite::future::yield_now().await;
57+
}
58+
59+
#[allow(unreachable_code)]
60+
std::io::Result::Ok(())
61+
},
62+
{
63+
let spawner = spawner.clone();
64+
move |task| {
65+
spawner.try_send(task).ok();
66+
}
67+
},
68+
);
69+
runnable.schedule();
70+
tasks.push(task);
71+
}
72+
73+
// Future to process timers.
74+
let process_timers = async move {
75+
for timer in timers {
76+
timer.await;
77+
}
78+
79+
// After the timer is done, cancel every task.
80+
for task in tasks {
81+
if let Some(res) = task.cancel().await {
82+
res.unwrap();
83+
}
84+
}
85+
};
86+
87+
// Future to process sources.
88+
let process_sources = async move {
89+
while let Ok(task) = executor.recv().await {
90+
task.run();
91+
futures_lite::future::yield_now().await;
92+
}
93+
};
94+
95+
process_timers.or(process_sources).await;
96+
97+
// Spawning another thread should spawn the driver thread.
98+
let (thread_spawned_signal, thread_spawned) = bounded::<()>(1);
99+
let (signal, shutdown) = bounded::<()>(1);
100+
thread::spawn(move || {
101+
block_on(async move {
102+
thread_spawned_signal.send(()).await.unwrap();
103+
shutdown.recv().await.unwrap();
104+
})
105+
});
106+
107+
thread_spawned.recv().await.unwrap();
108+
assert!(is_async_io_thread_spawned());
109+
110+
// Stopping the other thread should not stop the driver thread.
111+
signal.send(()).await.unwrap();
112+
assert!(is_async_io_thread_spawned());
113+
});
114+
}

tests/timer.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use std::sync::{Arc, Mutex};
44
use std::thread;
55
use std::time::{Duration, Instant};
66

7-
use async_io::Timer;
7+
use async_io::{is_async_io_thread_spawned, Timer};
88
use futures_lite::{future, FutureExt, StreamExt};
99

1010
fn spawn<T: Send + 'static>(
@@ -26,6 +26,7 @@ fn smoke() {
2626
future::block_on(async {
2727
let start = Instant::now();
2828
Timer::after(Duration::from_secs(1)).await;
29+
assert!(is_async_io_thread_spawned());
2930
assert!(start.elapsed() >= Duration::from_secs(1));
3031
});
3132
}

0 commit comments

Comments
 (0)