Skip to content

Commit ef347a8

Browse files
committed
fs: implement std::fs::symlink via io_uring
1 parent f1cb007 commit ef347a8

File tree

5 files changed

+240
-0
lines changed

5 files changed

+240
-0
lines changed

tokio/src/fs/symlink.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,20 @@ pub async fn symlink(original: impl AsRef<Path>, link: impl AsRef<Path>) -> io::
1212
let original = original.as_ref().to_owned();
1313
let link = link.as_ref().to_owned();
1414

15+
#[cfg(all(
16+
tokio_unstable,
17+
feature = "io-uring",
18+
feature = "rt",
19+
feature = "fs",
20+
target_os = "linux"
21+
))]
22+
{
23+
let handle = crate::runtime::Handle::current();
24+
let driver_handle = handle.inner.driver().io();
25+
if driver_handle.check_and_init(io_uring::opcode::SymlinkAt::CODE)? {
26+
return crate::runtime::driver::op::Op::symlink(&original, &link)?.await;
27+
}
28+
}
29+
1530
asyncify(move || std::os::unix::fs::symlink(original, link)).await
1631
}

tokio/src/io/uring/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
pub(crate) mod open;
22
pub(crate) mod read;
3+
pub(crate) mod symlink;
34
pub(crate) mod utils;
45
pub(crate) mod write;

tokio/src/io/uring/symlink.rs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
use super::utils::cstr;
2+
3+
use crate::runtime::driver::op::{CancelData, Cancellable, Completable, CqeResult, Op};
4+
5+
use io_uring::{opcode, types};
6+
use std::ffi::CString;
7+
use std::io;
8+
use std::io::Error;
9+
use std::path::Path;
10+
11+
#[derive(Debug)]
12+
pub(crate) struct Symlink {
13+
/// This field will be read by the kernel during the operation, so we
14+
/// need to ensure it is valid for the entire duration of the operation.
15+
#[allow(dead_code)]
16+
original: CString,
17+
/// This field will be read by the kernel during the operation, so we
18+
/// need to ensure it is valid for the entire duration of the operation.
19+
#[allow(dead_code)]
20+
link: CString,
21+
}
22+
23+
impl Completable for Symlink {
24+
type Output = io::Result<()>;
25+
26+
fn complete(self, cqe: CqeResult) -> Self::Output {
27+
cqe.result.map(|_| ())
28+
}
29+
30+
fn complete_with_error(self, err: Error) -> Self::Output {
31+
Err(err)
32+
}
33+
}
34+
35+
impl Cancellable for Symlink {
36+
fn cancel(self) -> CancelData {
37+
CancelData::Symlink(self)
38+
}
39+
}
40+
41+
impl Op<Symlink> {
42+
/// Submit a request to create a symbolic link.
43+
pub(crate) fn symlink(original: &Path, link: &Path) -> io::Result<Self> {
44+
let original = cstr(original)?;
45+
let link = cstr(link)?;
46+
47+
let symlink_op =
48+
opcode::SymlinkAt::new(types::Fd(libc::AT_FDCWD), original.as_ptr(), link.as_ptr())
49+
.build();
50+
51+
// SAFETY: Parameters are valid for the entire duration of the operation
52+
Ok(unsafe { Op::new(symlink_op, Symlink { original, link }) })
53+
}
54+
}

tokio/src/runtime/driver/op.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::io::uring::open::Open;
22
use crate::io::uring::read::Read;
3+
use crate::io::uring::symlink::Symlink;
34
use crate::io::uring::write::Write;
45
use crate::runtime::Handle;
56

@@ -19,6 +20,7 @@ pub(crate) enum CancelData {
1920
Open(Open),
2021
Write(Write),
2122
Read(Read),
23+
Symlink(Symlink),
2224
}
2325

2426
#[derive(Debug)]

tokio/tests/fs_uring_symlink.rs

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
//! Uring symlink operations tests.
2+
3+
#![cfg(all(
4+
tokio_unstable,
5+
feature = "io-uring",
6+
feature = "rt",
7+
feature = "fs",
8+
target_os = "linux"
9+
))]
10+
11+
use futures::FutureExt;
12+
use std::future::poll_fn;
13+
use std::future::Future;
14+
use std::path::PathBuf;
15+
use std::pin::pin;
16+
use std::sync::mpsc;
17+
use std::task::Poll;
18+
use std::time::Duration;
19+
use tempfile::TempDir;
20+
use tokio::runtime::{Builder, Runtime};
21+
use tokio::task::JoinSet;
22+
use tokio_test::assert_pending;
23+
use tokio_util::task::TaskTracker;
24+
25+
fn multi_rt(n: usize) -> Box<dyn Fn() -> Runtime> {
26+
Box::new(move || {
27+
Builder::new_multi_thread()
28+
.worker_threads(n)
29+
.enable_all()
30+
.build()
31+
.unwrap()
32+
})
33+
}
34+
35+
fn current_rt() -> Box<dyn Fn() -> Runtime> {
36+
Box::new(|| Builder::new_current_thread().enable_all().build().unwrap())
37+
}
38+
39+
fn rt_combinations() -> Vec<Box<dyn Fn() -> Runtime>> {
40+
vec![
41+
current_rt(),
42+
multi_rt(1),
43+
multi_rt(2),
44+
multi_rt(8),
45+
multi_rt(64),
46+
multi_rt(256),
47+
]
48+
}
49+
50+
#[test]
51+
fn shutdown_runtime_while_performing_io_uring_ops() {
52+
fn run(rt: Runtime) {
53+
let (done_tx, done_rx) = mpsc::channel();
54+
let (workdir, target) = create_tmp_dir();
55+
56+
rt.spawn(async move {
57+
// spawning a bunch of uring operations.
58+
for i in 0..usize::MAX {
59+
let link = workdir.path().join(&format!("{i}"));
60+
let target = target.clone();
61+
tokio::spawn(async move {
62+
let mut fut = pin!(tokio::fs::symlink(target, &link));
63+
64+
poll_fn(|cx| {
65+
assert_pending!(fut.as_mut().poll(cx));
66+
Poll::<()>::Pending
67+
})
68+
.await;
69+
70+
fut.await.unwrap();
71+
});
72+
73+
// Avoid busy looping.
74+
tokio::task::yield_now().await;
75+
}
76+
});
77+
78+
std::thread::spawn(move || {
79+
rt.shutdown_timeout(Duration::from_millis(300));
80+
done_tx.send(()).unwrap();
81+
});
82+
83+
done_rx.recv().unwrap();
84+
}
85+
86+
for rt in rt_combinations() {
87+
run(rt());
88+
}
89+
}
90+
91+
#[test]
92+
fn symlink_many_files() {
93+
fn run(rt: Runtime) {
94+
let (workdir, target) = create_tmp_dir();
95+
96+
rt.block_on(async move {
97+
const N_LINKS: usize = 10_000;
98+
99+
let tracker = TaskTracker::new();
100+
101+
for i in 0..N_LINKS {
102+
let target = target.clone();
103+
let link = workdir.path().join(&format!("{i}"));
104+
tracker.spawn(async move {
105+
tokio::fs::symlink(&target, &link).await.unwrap();
106+
});
107+
}
108+
tracker.close();
109+
tracker.wait().await;
110+
111+
let mut resolve_tasks = JoinSet::new();
112+
for i in 0..N_LINKS {
113+
let link = workdir.path().join(&format!("{i}"));
114+
resolve_tasks.spawn(async move { tokio::fs::read_link(&link).await.unwrap() });
115+
}
116+
117+
while let Some(resolve_result) = resolve_tasks.join_next().await {
118+
assert_eq!(&resolve_result.unwrap(), &target);
119+
}
120+
});
121+
}
122+
123+
for rt in rt_combinations() {
124+
run(rt());
125+
}
126+
}
127+
128+
#[tokio::test]
129+
async fn cancel_op_future() {
130+
let (workdir, target) = create_tmp_dir();
131+
132+
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
133+
let handle = tokio::spawn(async move {
134+
poll_fn(|cx| {
135+
let link = workdir.path().join("link");
136+
let fut = tokio::fs::symlink(&target, &link);
137+
138+
// If io_uring is enabled (and not falling back to the thread pool),
139+
// the first poll should return Pending.
140+
let _pending = pin!(fut).poll_unpin(cx);
141+
142+
tx.send(()).unwrap();
143+
144+
Poll::<()>::Pending
145+
})
146+
.await;
147+
});
148+
149+
// Wait for the first poll
150+
rx.recv().await.unwrap();
151+
152+
handle.abort();
153+
154+
let res = handle.await.unwrap_err();
155+
assert!(res.is_cancelled());
156+
}
157+
158+
fn create_tmp_dir() -> (TempDir, PathBuf) {
159+
let workdir = tempfile::tempdir().unwrap();
160+
let target = workdir.path().join("target");
161+
std::fs::OpenOptions::new()
162+
.create_new(true)
163+
.write(true)
164+
.open(&target)
165+
.unwrap();
166+
167+
(workdir, target)
168+
}

0 commit comments

Comments
 (0)