diff --git a/spellcheck.dic b/spellcheck.dic index d05aa25774a..5dc05bf1dcc 100644 --- a/spellcheck.dic +++ b/spellcheck.dic @@ -1,4 +1,4 @@ -312 +313 & + < @@ -240,6 +240,7 @@ spawner Splitter spmc spsc +SQPOLL src stabilised startup diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index aea4de7503e..502492c2f56 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -58,6 +58,16 @@ pub struct Builder { enable_io: bool, nevents: usize, + /// Idle timeout (in milliseconds) for the io_uring SQPOLL kernel thread. + #[cfg(all( + tokio_unstable, + feature = "io-uring", + feature = "rt", + feature = "fs", + target_os = "linux" + ))] + uring_setup_sqpoll: Option, + /// Whether or not to enable the time driver enable_time: bool, @@ -275,6 +285,15 @@ impl Builder { enable_io: false, nevents: 1024, + #[cfg(all( + tokio_unstable, + feature = "io-uring", + feature = "rt", + feature = "fs", + target_os = "linux" + ))] + uring_setup_sqpoll: None, + // Time defaults to "off" enable_time: false, @@ -1598,6 +1617,19 @@ impl Builder { cfg.timer_flavor = TimerFlavor::Traditional; let (driver, driver_handle) = driver::Driver::new(cfg)?; + #[cfg(all( + tokio_unstable, + feature = "io-uring", + feature = "rt", + feature = "fs", + target_os = "linux" + ))] + if let Some(idle_timeout) = self.uring_setup_sqpoll { + if let Some(io) = driver_handle.io.as_ref() { + io.setup_uring_sqpoll(idle_timeout); + } + } + // Blocking pool let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads); let blocking_spawner = blocking_pool.spawner().clone(); @@ -1736,12 +1768,43 @@ cfg_io_uring! { /// .build() /// .unwrap(); /// ``` - #[cfg_attr(docsrs, doc(cfg(feature = "io-uring")))] + #[cfg_attr(docsrs, doc(cfg(all(feature = "io-uring", feature = "rt", feature = "fs"))))] pub fn enable_io_uring(&mut self) -> &mut Self { // Currently, the uring flag is equivalent to `enable_io`. self.enable_io = true; self } + + /// Enables SQPOLL for the io_uring driver and sets the idle timeout (in milliseconds). + /// + /// When SQPOLL is enabled, a kernel thread is created to poll the + /// submission queue. This can reduce syscall overhead. + /// + /// # Prerequisites + /// + /// SQPOLL requires Linux kernel 5.1 or later. + /// Until Linux 5.10, using this flag required the `CAP_SYS_ADMIN` capability. + /// From Linux 5.11, the `CAP_SYS_ADMIN` capability is no longer required. + /// + /// If SQPOLL is enabled but not supported by the kernel, the first I/O + /// operation using `io_uring` will fail. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime; + /// + /// let rt = runtime::Builder::new_multi_thread() + /// .enable_io_uring() + /// .uring_setup_sqpoll(2000) + /// .build() + /// .unwrap(); + /// ``` + #[cfg_attr(docsrs, doc(cfg(all(feature = "io-uring", feature = "rt", feature = "fs"))))] + pub fn uring_setup_sqpoll(&mut self, idle_timeout: u32) -> &mut Self { + self.uring_setup_sqpoll = Some(idle_timeout); + self + } } } @@ -1781,6 +1844,13 @@ cfg_rt_multi_thread! { let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?; + #[cfg(all(tokio_unstable, feature = "io-uring", feature = "rt", feature = "fs", target_os = "linux"))] + if let Some(idle_timeout) = self.uring_setup_sqpoll { + if let Some(io) = driver_handle.io.as_ref() { + io.setup_uring_sqpoll(idle_timeout); + } + } + // Create the blocking pool let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads + worker_threads); diff --git a/tokio/src/runtime/driver.rs b/tokio/src/runtime/driver.rs index 92b2350db9d..6256d783bce 100644 --- a/tokio/src/runtime/driver.rs +++ b/tokio/src/runtime/driver.rs @@ -45,7 +45,7 @@ pub(crate) struct Cfg { impl Driver { pub(crate) fn new(cfg: Cfg) -> io::Result<(Self, Handle)> { - let (io_stack, io_handle, signal_handle) = create_io_stack(cfg.enable_io, cfg.nevents)?; + let (io_stack, io_handle, signal_handle) = create_io_stack(&cfg)?; let clock = create_clock(cfg.enable_pause_time, cfg.start_paused); @@ -146,12 +146,12 @@ cfg_io_driver! { Disabled(UnparkThread), } - fn create_io_stack(enabled: bool, nevents: usize) -> io::Result<(IoStack, IoHandle, SignalHandle)> { + fn create_io_stack(cfg: &Cfg) -> io::Result<(IoStack, IoHandle, SignalHandle)> { #[cfg(loom)] - assert!(!enabled); + assert!(!cfg.enable_io); - let ret = if enabled { - let (io_driver, io_handle) = crate::runtime::io::Driver::new(nevents)?; + let ret = if cfg.enable_io { + let (io_driver, io_handle) = crate::runtime::io::Driver::new(cfg.nevents)?; let (signal_driver, signal_handle) = create_signal_driver(io_driver, &io_handle)?; let process_driver = create_process_driver(signal_driver); @@ -212,7 +212,7 @@ cfg_not_io_driver! { #[derive(Debug)] pub(crate) struct IoStack(ParkThread); - fn create_io_stack(_enabled: bool, _nevents: usize) -> io::Result<(IoStack, IoHandle, SignalHandle)> { + fn create_io_stack(_cfg: &Cfg) -> io::Result<(IoStack, IoHandle, SignalHandle)> { let park_thread = ParkThread::new(); let unpark_thread = park_thread.unpark(); Ok((IoStack(park_thread), unpark_thread, Default::default())) diff --git a/tokio/src/runtime/io/driver/uring.rs b/tokio/src/runtime/io/driver/uring.rs index 89c97826bdf..0c232106d5c 100644 --- a/tokio/src/runtime/io/driver/uring.rs +++ b/tokio/src/runtime/io/driver/uring.rs @@ -15,6 +15,7 @@ const DEFAULT_RING_SIZE: u32 = 256; pub(crate) struct UringContext { pub(crate) uring: Option, pub(crate) ops: slab::Slab, + pub(crate) sqpoll_idle: Option, } impl UringContext { @@ -22,6 +23,7 @@ impl UringContext { Self { ops: Slab::new(), uring: None, + sqpoll_idle: None, } } @@ -44,7 +46,13 @@ impl UringContext { return Ok(false); } - let uring = IoUring::new(DEFAULT_RING_SIZE)?; + let uring = if let Some(idle_timeout) = self.sqpoll_idle { + IoUring::builder() + .setup_sqpoll(idle_timeout) + .build(DEFAULT_RING_SIZE)? + } else { + IoUring::new(DEFAULT_RING_SIZE)? + }; match uring.submitter().register_probe(probe) { Ok(_) => {} @@ -97,6 +105,14 @@ impl UringContext { } pub(crate) fn submit(&mut self) -> io::Result<()> { + if self.sqpoll_idle.is_some() { + let mut sq = self.ring_mut().submission(); + sq.sync(); + if !sq.need_wakeup() { + return Ok(()); + } + } + loop { // Errors from io_uring_enter: https://man7.org/linux/man-pages/man2/io_uring_enter.2.html#ERRORS match self.ring().submit() { @@ -164,6 +180,11 @@ impl Handle { &self.uring_context } + pub(crate) fn setup_uring_sqpoll(&self, idle_timeout: u32) { + let mut guard = self.get_uring().lock(); + guard.sqpoll_idle = Some(idle_timeout); + } + /// Check if the io_uring context is initialized. If not, it will try to initialize it. /// Then, check if the provided opcode is supported. /// diff --git a/tokio/tests/fs_uring_sqpoll.rs b/tokio/tests/fs_uring_sqpoll.rs new file mode 100644 index 00000000000..5bdadfb9150 --- /dev/null +++ b/tokio/tests/fs_uring_sqpoll.rs @@ -0,0 +1,83 @@ +#![cfg(all( + tokio_unstable, + feature = "io-uring", + feature = "rt", + feature = "fs", + target_os = "linux" +))] + +use std::io::{Read, Seek, SeekFrom}; +use tempfile::NamedTempFile; +use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; +use tokio::runtime::Builder; + +#[test] +fn test_sqpoll_current_thread() { + let rt = Builder::new_current_thread() + .enable_all() + .uring_setup_sqpoll(1000) + .build() + .unwrap(); + + rt.block_on(async { + let mut temp = NamedTempFile::new().unwrap(); + let path = temp.path().to_path_buf(); + + let mut file = tokio::fs::OpenOptions::new() + .read(true) + .write(true) + .open(&path) + .await + .unwrap(); + + file.write_all(b"hello").await.unwrap(); + file.flush().await.unwrap(); + + // Check if data was actually written to the underlying file + let mut buf = vec![0; 5]; + temp.as_file_mut().seek(SeekFrom::Start(0)).unwrap(); + temp.as_file_mut().read_exact(&mut buf).unwrap(); + assert_eq!(&buf, b"hello"); + + file.seek(std::io::SeekFrom::Start(0)).await.unwrap(); + let mut buf = vec![0; 5]; + file.read_exact(&mut buf).await.unwrap(); + assert_eq!(&buf, b"hello"); + }); +} + +#[test] +fn test_sqpoll_multi_thread() { + let rt = Builder::new_multi_thread() + .worker_threads(2) + .enable_all() + .uring_setup_sqpoll(1000) + .build() + .unwrap(); + + rt.block_on(async { + let mut temp = NamedTempFile::new().unwrap(); + let path = temp.path().to_path_buf(); + + let mut file = tokio::fs::OpenOptions::new() + .read(true) + .write(true) + .open(&path) + .await + .unwrap(); + + file.write_all(b"world").await.unwrap(); + file.flush().await.unwrap(); + + // Check if data was actually written to the underlying file + let mut buf = vec![0; 5]; + temp.as_file_mut().seek(SeekFrom::Start(0)).unwrap(); + temp.as_file_mut().read_exact(&mut buf).unwrap(); + assert_eq!(&buf, b"world"); + + file.seek(std::io::SeekFrom::Start(0)).await.unwrap(); + let mut buf = vec![0; 5]; + file.read_exact(&mut buf).await.unwrap(); + assert_eq!(&buf, b"world"); + }); +}