Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion tokio/src/fs/open_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,10 @@ impl OpenOptions {
let handle = crate::runtime::Handle::current();
let driver_handle = handle.inner.driver().io();

if driver_handle.check_and_init(io_uring::opcode::OpenAt::CODE)? {
if driver_handle
.check_and_init(io_uring::opcode::OpenAt::CODE)
.await?
{
Op::open(path.as_ref(), opts)?.await
} else {
let opts = opts.clone().into();
Expand Down
5 changes: 4 additions & 1 deletion tokio/src/fs/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,10 @@ pub async fn read(path: impl AsRef<Path>) -> io::Result<Vec<u8>> {

let handle = crate::runtime::Handle::current();
let driver_handle = handle.inner.driver().io();
if driver_handle.check_and_init(io_uring::opcode::Read::CODE)? {
if driver_handle
.check_and_init(io_uring::opcode::Read::CODE)
.await?
{
return read_uring(&path).await;
}
}
Expand Down
5 changes: 4 additions & 1 deletion tokio/src/fs/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ pub async fn write(path: impl AsRef<Path>, contents: impl AsRef<[u8]>) -> io::Re
{
let handle = crate::runtime::Handle::current();
let driver_handle = handle.inner.driver().io();
if driver_handle.check_and_init(io_uring::opcode::Write::CODE)? {
if driver_handle
.check_and_init(io_uring::opcode::Write::CODE)
.await?
{
return write_uring(path, contents).await;
}
}
Expand Down
6 changes: 3 additions & 3 deletions tokio/src/runtime/io/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ cfg_signal_internal_and_unix! {
cfg_io_uring! {
mod uring;
use uring::UringContext;
use crate::loom::sync::atomic::AtomicUsize;
use crate::sync::OnceCell;
}

use crate::io::interest::Interest;
Expand Down Expand Up @@ -67,7 +67,7 @@ pub(crate) struct Handle {
feature = "fs",
target_os = "linux",
))]
pub(crate) uring_state: AtomicUsize,
pub(crate) uring_probe: OnceCell<Option<io_uring::Probe>>,
}

#[derive(Debug)]
Expand Down Expand Up @@ -150,7 +150,7 @@ impl Driver {
feature = "fs",
target_os = "linux",
))]
uring_state: AtomicUsize::new(0),
uring_probe: OnceCell::new(),
};

Ok((driver, handle))
Expand Down
99 changes: 31 additions & 68 deletions tokio/src/runtime/io/driver/uring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use io_uring::{squeue::Entry, IoUring, Probe};
use mio::unix::SourceFd;
use slab::Slab;

use crate::loom::sync::atomic::Ordering;
use crate::runtime::driver::op::{Cancellable, Lifecycle};
use crate::{io::Interest, loom::sync::Mutex};

Expand All @@ -13,32 +12,8 @@ use std::{io, mem, task::Waker};

const DEFAULT_RING_SIZE: u32 = 256;

#[repr(usize)]
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
enum State {
Uninitialized = 0,
Initialized = 1,
Unsupported = 2,
}

impl State {
fn as_usize(&self) -> usize {
*self as usize
}

fn from_usize(value: usize) -> Self {
match value {
0 => State::Uninitialized,
1 => State::Initialized,
2 => State::Unsupported,
_ => unreachable!("invalid Uring state: {}", value),
}
}
}

pub(crate) struct UringContext {
pub(crate) uring: Option<io_uring::IoUring>,
pub(crate) probe: io_uring::Probe,
pub(crate) ops: slab::Slab<Lifecycle>,
}

Expand All @@ -47,7 +22,6 @@ impl UringContext {
Self {
ops: Slab::new(),
uring: None,
probe: Probe::new(),
}
}

Expand All @@ -59,24 +33,20 @@ impl UringContext {
self.uring.as_mut().expect("io_uring not initialized")
}

pub(crate) fn is_opcode_supported(&self, opcode: u8) -> bool {
self.probe.is_supported(opcode)
}

/// Perform `io_uring_setup` system call, and Returns true if this
/// actually initialized the io_uring.
///
/// If the machine doesn't support io_uring, then this will return an
/// `ENOSYS` error.
pub(crate) fn try_init(&mut self) -> io::Result<bool> {
pub(crate) fn try_init(&mut self, probe: &mut Probe) -> io::Result<bool> {
if self.uring.is_some() {
// Already initialized.
return Ok(false);
}

let uring = IoUring::new(DEFAULT_RING_SIZE)?;

match uring.submitter().register_probe(&mut self.probe) {
match uring.submitter().register_probe(probe) {
Ok(_) => {}
Err(e) if e.raw_os_error() == Some(libc::EINVAL) => {
// The kernel does not support IORING_REGISTER_PROBE.
Expand Down Expand Up @@ -194,10 +164,6 @@ impl Handle {
&self.uring_context
}

fn set_uring_state(&self, state: State) {
self.uring_state.store(state.as_usize(), Ordering::Release);
}

/// Check if the io_uring context is initialized. If not, it will try to initialize it.
/// Then, check if the provided opcode is supported.
///
Expand All @@ -206,42 +172,42 @@ impl Handle {
/// If either io_uring is unsupported or the opcode is unsupported,
/// this returns `Ok(false)`.
/// An error is returned if an io_uring syscall returns an unexpected error value.
pub(crate) fn check_and_init(&self, opcode: u8) -> io::Result<bool> {
match State::from_usize(self.uring_state.load(Ordering::Acquire)) {
State::Uninitialized => match self.try_init_and_check_opcode(opcode) {
Ok(opcode_supported) => {
self.set_uring_state(State::Initialized);
Ok(opcode_supported)
}
// If the system doesn't support io_uring, we set the state to Unsupported.
Err(e) if e.raw_os_error() == Some(libc::ENOSYS) => {
self.set_uring_state(State::Unsupported);
Ok(false)
}
// If we get EPERM, io-uring syscalls may be blocked (for example, by seccomp).
// In this case, we try to fall back to spawn_blocking for this and future operations.
// See also: https://github.com/tokio-rs/tokio/issues/7691
Err(e) if e.raw_os_error() == Some(libc::EPERM) => {
self.set_uring_state(State::Unsupported);
Ok(false)
///
/// TODO: This would like to be a synchronous function,
/// but we require `OnceLock::get_or_try_init`.
/// <https://github.com/rust-lang/rust/issues/109737>
pub(crate) async fn check_and_init(&self, opcode: u8) -> io::Result<bool> {
let probe = self
.uring_probe
.get_or_try_init(|| async {
let mut probe = Probe::new();
match self.try_init(&mut probe) {
Ok(()) => Ok(Some(probe)),
// If the system doesn't support io_uring, we set the probe to `None`.
Err(e) if e.raw_os_error() == Some(libc::ENOSYS) => Ok(None),
// If we get EPERM, io-uring syscalls may be blocked (for example, by seccomp).
// In this case, we try to fall back to spawn_blocking for this and future operations.
// See also: https://github.com/tokio-rs/tokio/issues/7691
Err(e) if e.raw_os_error() == Some(libc::EPERM) => Ok(None),
// For other system errors, we just return it.
Err(e) => Err(e),
}
// For other system errors, we just return it.
Err(e) => Err(e),
},
State::Unsupported => Ok(false),
State::Initialized => Ok(self.get_uring().lock().is_opcode_supported(opcode)),
}
})
.await?;

Ok(probe
.as_ref()
.is_some_and(|probe| probe.is_supported(opcode)))
}

/// Initialize the io_uring context if it hasn't been initialized yet.
/// Then, check whether the given opcode is supported.
fn try_init_and_check_opcode(&self, opcode: u8) -> io::Result<bool> {
fn try_init(&self, probe: &mut Probe) -> io::Result<()> {
let mut guard = self.get_uring().lock();
if guard.try_init()? {
if guard.try_init(probe)? {
self.add_uring_source(guard.ring().as_raw_fd())?;
}

Ok(guard.is_opcode_supported(opcode))
Ok(())
}

/// Register an operation with the io_uring.
Expand All @@ -255,10 +221,7 @@ impl Handle {
/// Callers must ensure that parameters of the entry (such as buffer) are valid and will
/// be valid for the entire duration of the operation, otherwise it may cause memory problems.
pub(crate) unsafe fn register_op(&self, entry: Entry, waker: Waker) -> io::Result<usize> {
// Note: Maybe this check can be removed if upstream callers consistently use `check_and_init`.
if !self.check_and_init(entry.get_opcode() as u8)? {
return Err(io::Error::from_raw_os_error(libc::ENOSYS));
}
assert!(self.uring_probe.initialized());
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you want to preserve the opcode check, you can access the OnceCell without async using e.g. get.


// Uring is initialized.

Expand Down