diff --git a/tokio/src/fs/open_options.rs b/tokio/src/fs/open_options.rs index aaab2378f4b..27e98b5b234 100644 --- a/tokio/src/fs/open_options.rs +++ b/tokio/src/fs/open_options.rs @@ -531,7 +531,10 @@ impl OpenOptions { let handle = crate::runtime::Handle::current(); let driver_handle = handle.inner.driver().io(); - if driver_handle.check_and_init(io_uring::opcode::OpenAt::CODE)? { + if driver_handle + .check_and_init(io_uring::opcode::OpenAt::CODE) + .await? + { Op::open(path.as_ref(), opts)?.await } else { let opts = opts.clone().into(); diff --git a/tokio/src/fs/read.rs b/tokio/src/fs/read.rs index 8844e4c148e..aabc994e95f 100644 --- a/tokio/src/fs/read.rs +++ b/tokio/src/fs/read.rs @@ -68,7 +68,10 @@ pub async fn read(path: impl AsRef) -> io::Result> { let handle = crate::runtime::Handle::current(); let driver_handle = handle.inner.driver().io(); - if driver_handle.check_and_init(io_uring::opcode::Read::CODE)? { + if driver_handle + .check_and_init(io_uring::opcode::Read::CODE) + .await? + { return read_uring(&path).await; } } diff --git a/tokio/src/fs/write.rs b/tokio/src/fs/write.rs index 6a7be979c2c..a2b22fd8cbf 100644 --- a/tokio/src/fs/write.rs +++ b/tokio/src/fs/write.rs @@ -37,7 +37,10 @@ pub async fn write(path: impl AsRef, contents: impl AsRef<[u8]>) -> io::Re { let handle = crate::runtime::Handle::current(); let driver_handle = handle.inner.driver().io(); - if driver_handle.check_and_init(io_uring::opcode::Write::CODE)? { + if driver_handle + .check_and_init(io_uring::opcode::Write::CODE) + .await? + { return write_uring(path, contents).await; } } diff --git a/tokio/src/runtime/io/driver.rs b/tokio/src/runtime/io/driver.rs index 04540cf2b13..48766215364 100644 --- a/tokio/src/runtime/io/driver.rs +++ b/tokio/src/runtime/io/driver.rs @@ -5,7 +5,7 @@ cfg_signal_internal_and_unix! { cfg_io_uring! { mod uring; use uring::UringContext; - use crate::loom::sync::atomic::AtomicUsize; + use crate::sync::OnceCell; } use crate::io::interest::Interest; @@ -67,7 +67,7 @@ pub(crate) struct Handle { feature = "fs", target_os = "linux", ))] - pub(crate) uring_state: AtomicUsize, + pub(crate) uring_probe: OnceCell>, } #[derive(Debug)] @@ -150,7 +150,7 @@ impl Driver { feature = "fs", target_os = "linux", ))] - uring_state: AtomicUsize::new(0), + uring_probe: OnceCell::new(), }; Ok((driver, handle)) diff --git a/tokio/src/runtime/io/driver/uring.rs b/tokio/src/runtime/io/driver/uring.rs index 5c61739b299..89c97826bdf 100644 --- a/tokio/src/runtime/io/driver/uring.rs +++ b/tokio/src/runtime/io/driver/uring.rs @@ -2,7 +2,6 @@ use io_uring::{squeue::Entry, IoUring, Probe}; use mio::unix::SourceFd; use slab::Slab; -use crate::loom::sync::atomic::Ordering; use crate::runtime::driver::op::{Cancellable, Lifecycle}; use crate::{io::Interest, loom::sync::Mutex}; @@ -13,32 +12,8 @@ use std::{io, mem, task::Waker}; const DEFAULT_RING_SIZE: u32 = 256; -#[repr(usize)] -#[derive(Debug, PartialEq, Eq, Copy, Clone)] -enum State { - Uninitialized = 0, - Initialized = 1, - Unsupported = 2, -} - -impl State { - fn as_usize(&self) -> usize { - *self as usize - } - - fn from_usize(value: usize) -> Self { - match value { - 0 => State::Uninitialized, - 1 => State::Initialized, - 2 => State::Unsupported, - _ => unreachable!("invalid Uring state: {}", value), - } - } -} - pub(crate) struct UringContext { pub(crate) uring: Option, - pub(crate) probe: io_uring::Probe, pub(crate) ops: slab::Slab, } @@ -47,7 +22,6 @@ impl UringContext { Self { ops: Slab::new(), uring: None, - probe: Probe::new(), } } @@ -59,16 +33,12 @@ impl UringContext { self.uring.as_mut().expect("io_uring not initialized") } - pub(crate) fn is_opcode_supported(&self, opcode: u8) -> bool { - self.probe.is_supported(opcode) - } - /// Perform `io_uring_setup` system call, and Returns true if this /// actually initialized the io_uring. /// /// If the machine doesn't support io_uring, then this will return an /// `ENOSYS` error. - pub(crate) fn try_init(&mut self) -> io::Result { + pub(crate) fn try_init(&mut self, probe: &mut Probe) -> io::Result { if self.uring.is_some() { // Already initialized. return Ok(false); @@ -76,7 +46,7 @@ impl UringContext { let uring = IoUring::new(DEFAULT_RING_SIZE)?; - match uring.submitter().register_probe(&mut self.probe) { + match uring.submitter().register_probe(probe) { Ok(_) => {} Err(e) if e.raw_os_error() == Some(libc::EINVAL) => { // The kernel does not support IORING_REGISTER_PROBE. @@ -194,10 +164,6 @@ impl Handle { &self.uring_context } - fn set_uring_state(&self, state: State) { - self.uring_state.store(state.as_usize(), Ordering::Release); - } - /// Check if the io_uring context is initialized. If not, it will try to initialize it. /// Then, check if the provided opcode is supported. /// @@ -206,42 +172,42 @@ impl Handle { /// If either io_uring is unsupported or the opcode is unsupported, /// this returns `Ok(false)`. /// An error is returned if an io_uring syscall returns an unexpected error value. - pub(crate) fn check_and_init(&self, opcode: u8) -> io::Result { - match State::from_usize(self.uring_state.load(Ordering::Acquire)) { - State::Uninitialized => match self.try_init_and_check_opcode(opcode) { - Ok(opcode_supported) => { - self.set_uring_state(State::Initialized); - Ok(opcode_supported) - } - // If the system doesn't support io_uring, we set the state to Unsupported. - Err(e) if e.raw_os_error() == Some(libc::ENOSYS) => { - self.set_uring_state(State::Unsupported); - Ok(false) - } - // If we get EPERM, io-uring syscalls may be blocked (for example, by seccomp). - // In this case, we try to fall back to spawn_blocking for this and future operations. - // See also: https://github.com/tokio-rs/tokio/issues/7691 - Err(e) if e.raw_os_error() == Some(libc::EPERM) => { - self.set_uring_state(State::Unsupported); - Ok(false) + /// + /// TODO: This would like to be a synchronous function, + /// but we require `OnceLock::get_or_try_init`. + /// + pub(crate) async fn check_and_init(&self, opcode: u8) -> io::Result { + let probe = self + .uring_probe + .get_or_try_init(|| async { + let mut probe = Probe::new(); + match self.try_init(&mut probe) { + Ok(()) => Ok(Some(probe)), + // If the system doesn't support io_uring, we set the probe to `None`. + Err(e) if e.raw_os_error() == Some(libc::ENOSYS) => Ok(None), + // If we get EPERM, io-uring syscalls may be blocked (for example, by seccomp). + // In this case, we try to fall back to spawn_blocking for this and future operations. + // See also: https://github.com/tokio-rs/tokio/issues/7691 + Err(e) if e.raw_os_error() == Some(libc::EPERM) => Ok(None), + // For other system errors, we just return it. + Err(e) => Err(e), } - // For other system errors, we just return it. - Err(e) => Err(e), - }, - State::Unsupported => Ok(false), - State::Initialized => Ok(self.get_uring().lock().is_opcode_supported(opcode)), - } + }) + .await?; + + Ok(probe + .as_ref() + .is_some_and(|probe| probe.is_supported(opcode))) } /// Initialize the io_uring context if it hasn't been initialized yet. - /// Then, check whether the given opcode is supported. - fn try_init_and_check_opcode(&self, opcode: u8) -> io::Result { + fn try_init(&self, probe: &mut Probe) -> io::Result<()> { let mut guard = self.get_uring().lock(); - if guard.try_init()? { + if guard.try_init(probe)? { self.add_uring_source(guard.ring().as_raw_fd())?; } - Ok(guard.is_opcode_supported(opcode)) + Ok(()) } /// Register an operation with the io_uring. @@ -255,10 +221,7 @@ impl Handle { /// Callers must ensure that parameters of the entry (such as buffer) are valid and will /// be valid for the entire duration of the operation, otherwise it may cause memory problems. pub(crate) unsafe fn register_op(&self, entry: Entry, waker: Waker) -> io::Result { - // Note: Maybe this check can be removed if upstream callers consistently use `check_and_init`. - if !self.check_and_init(entry.get_opcode() as u8)? { - return Err(io::Error::from_raw_os_error(libc::ENOSYS)); - } + assert!(self.uring_probe.initialized()); // Uring is initialized.