Skip to content

Commit cdca978

Browse files
authored
Merge pull request rust-lang#4892 from WhySoBad/network-socket-io-polling
Handle blocking I/O using Mio, add `accept` and `connect` network socket shims
2 parents f3d49c8 + d7706e5 commit cdca978

19 files changed

Lines changed: 806 additions & 155 deletions

src/tools/miri/Cargo.lock

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -779,9 +779,9 @@ checksum = "db13adb97ab515a3691f56e4dbab09283d0b86cb45abd991d8634a9d6f501760"
779779

780780
[[package]]
781781
name = "libc"
782-
version = "0.2.177"
782+
version = "0.2.182"
783783
source = "registry+https://github.com/rust-lang/crates.io-index"
784-
checksum = "2874a2af47a2325c2001a6e6fad9b16a53b802102b528163885171cf92b15976"
784+
checksum = "6800badb6cb2082ffd7b6a67e6125bb39f18782f793520caee8cb8846be06112"
785785

786786
[[package]]
787787
name = "libffi"
@@ -923,11 +923,12 @@ dependencies = [
923923

924924
[[package]]
925925
name = "mio"
926-
version = "1.1.0"
926+
version = "1.1.1"
927927
source = "registry+https://github.com/rust-lang/crates.io-index"
928-
checksum = "69d83b0086dc8ecf3ce9ae2874b2d1290252e2a30720bea58a5c6639b0092873"
928+
checksum = "a69bcab0ad47271a0234d9422b131806bf3968021e5dc9328caf2d4cd58557fc"
929929
dependencies = [
930930
"libc",
931+
"log",
931932
"wasi",
932933
"windows-sys 0.61.2",
933934
]
@@ -950,6 +951,7 @@ dependencies = [
950951
"libffi",
951952
"libloading",
952953
"measureme",
954+
"mio",
953955
"nix",
954956
"rand",
955957
"regex",

src/tools/miri/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ chrono-tz = "0.10"
2828
directories = "6"
2929
bitflags = "2.6"
3030
serde_json = { version = "1.0", optional = true }
31+
mio = { version = "1.1.1", features = ["os-poll", "net"] }
3132

3233
[target.'cfg(unix)'.dependencies]
3334
libc = "0.2"
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
use std::io;
2+
use std::time::Duration;
3+
4+
use mio::event::Source;
5+
use mio::{Events, Interest, Poll, Token};
6+
use rustc_data_structures::fx::FxHashMap;
7+
8+
use crate::*;
9+
10+
/// Capacity of the event queue which can be polled at a time.
11+
/// Since we don't expect many simultaneous blocking I/O events
12+
/// this value can be set rather low.
13+
const IO_EVENT_CAPACITY: usize = 16;
14+
15+
/// Trait for values that contain a mio [`Source`].
16+
pub trait WithSource {
17+
/// Invoke `f` on the source inside `self`.
18+
fn with_source(&self, f: &mut dyn FnMut(&mut dyn Source) -> io::Result<()>) -> io::Result<()>;
19+
}
20+
21+
/// Manager for managing blocking host I/O in a non-blocking manner.
22+
/// We use [`Poll`] to poll for new I/O events from the OS for sources
23+
/// registered using this manager.
24+
///
25+
/// Since blocking host I/O is inherently non-deterministic, no method on this
26+
/// manager should be called when isolation is enabled. The only exception is
27+
/// the [`BlockingIoManager::new`] function to create the manager. Everywhere else,
28+
/// we assert that isolation is disabled!
29+
pub struct BlockingIoManager {
30+
/// Poll instance to monitor I/O events from the OS.
31+
/// This is only [`None`] when Miri is run with isolation enabled.
32+
poll: Option<Poll>,
33+
/// Buffer used to store the ready I/O events when calling [`Poll::poll`].
34+
/// This is not part of the state and only stored to avoid allocating a
35+
/// new buffer for every poll.
36+
events: Events,
37+
/// Map between threads which are currently blocked and the
38+
/// underlying I/O source.
39+
sources: FxHashMap<ThreadId, Box<dyn WithSource>>,
40+
}
41+
42+
impl BlockingIoManager {
43+
/// Create a new blocking I/O manager instance based on the availability
44+
/// of communication with the host.
45+
pub fn new(communicate: bool) -> Result<Self, io::Error> {
46+
let manager = Self {
47+
poll: communicate.then_some(Poll::new()?),
48+
events: Events::with_capacity(IO_EVENT_CAPACITY),
49+
sources: FxHashMap::default(),
50+
};
51+
Ok(manager)
52+
}
53+
54+
/// Poll for new I/O events from the OS or wait until the timeout expired.
55+
///
56+
/// - If the timeout is [`Some`] and contains [`Duration::ZERO`], the poll doesn't block and just
57+
/// reads all events since the last poll.
58+
/// - If the timeout is [`Some`] and contains a non-zero duration, it blocks at most for the
59+
/// specified duration.
60+
/// - If the timeout is [`None`] the poll blocks indefinitely until an event occurs.
61+
///
62+
/// Returns all threads that are ready because they received an I/O event.
63+
pub fn poll(&mut self, timeout: Option<Duration>) -> Result<Vec<ThreadId>, io::Error> {
64+
let poll =
65+
self.poll.as_mut().expect("Blocking I/O should not be called with isolation enabled");
66+
67+
// Poll for new I/O events from OS and store them in the events buffer.
68+
poll.poll(&mut self.events, timeout)?;
69+
70+
let ready = self
71+
.events
72+
.iter()
73+
.map(|event| {
74+
let token = event.token();
75+
ThreadId::new_unchecked(token.0.try_into().unwrap())
76+
})
77+
.collect::<Vec<_>>();
78+
79+
// Deregister all ready sources as we only want to receive one event per thread.
80+
ready.iter().for_each(|thread_id| self.deregister(*thread_id));
81+
82+
Ok(ready)
83+
}
84+
85+
/// Register a blocking I/O source for a thread together with it's poll interests.
86+
///
87+
/// The source will be deregistered automatically once an event for it is received.
88+
///
89+
/// As the OS can always produce spurious wake-ups, it's the callers responsibility to
90+
/// verify the requested I/O interests are really ready and to register again if they're not.
91+
pub fn register(&mut self, source: Box<dyn WithSource>, thread: ThreadId, interests: Interest) {
92+
let poll =
93+
self.poll.as_ref().expect("Blocking I/O should not be called with isolation enabled");
94+
95+
let token = Token(thread.to_u32().to_usize());
96+
97+
// Treat errors from registering as fatal. On UNIX hosts this can only
98+
// fail due to system resource errors (e.g. ENOMEM or ENOSPC).
99+
source
100+
.with_source(&mut |source| source.register(poll.registry(), token, interests))
101+
.unwrap();
102+
self.sources
103+
.try_insert(thread, source)
104+
.unwrap_or_else(|_| panic!("A thread cannot be registered twice at the same time"));
105+
}
106+
107+
/// Deregister the event source for a thread. Returns the kind of I/O the thread was
108+
/// blocked on.
109+
fn deregister(&mut self, thread: ThreadId) {
110+
let poll =
111+
self.poll.as_ref().expect("Blocking I/O should not be called with isolation enabled");
112+
113+
let Some(source) = self.sources.remove(&thread) else {
114+
panic!("Attempt to deregister a token which isn't registered")
115+
};
116+
117+
// Treat errors from deregistering as fatal. On UNIX hosts this can only
118+
// fail due to system resource errors (e.g. ENOMEM or ENOSPC).
119+
source.with_source(&mut |source| source.deregister(poll.registry())).unwrap();
120+
}
121+
}
122+
123+
impl<'tcx> EvalContextExt<'tcx> for MiriInterpCx<'tcx> {}
124+
pub trait EvalContextExt<'tcx>: MiriInterpCxExt<'tcx> {
125+
/// Block the current thread until some interests on an I/O source
126+
/// are fulfilled or the optional timeout exceeded.
127+
/// The callback will be invoked when the thread gets unblocked.
128+
///
129+
/// There can be spurious wake-ups by the OS and thus it's the callers
130+
/// responsibility to verify that the requested I/O interests are
131+
/// really ready and to block again if they're not.
132+
#[inline]
133+
fn block_thread_for_io(
134+
&mut self,
135+
source: impl WithSource + 'static,
136+
interests: Interest,
137+
timeout: Option<(TimeoutClock, TimeoutAnchor, Duration)>,
138+
callback: DynUnblockCallback<'tcx>,
139+
) {
140+
let this = self.eval_context_mut();
141+
this.machine.blocking_io.register(
142+
Box::new(source),
143+
this.machine.threads.active_thread(),
144+
interests,
145+
);
146+
this.block_thread(BlockReason::IO, timeout, callback);
147+
}
148+
}

src/tools/miri/src/concurrency/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
pub mod blocking_io;
12
pub mod cpu_affinity;
23
pub mod data_race;
34
mod data_race_handler;

src/tools/miri/src/concurrency/thread.rs

Lines changed: 74 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
//! Implements threads.
22
3-
use std::mem;
43
use std::sync::atomic::Ordering::Relaxed;
54
use std::task::Poll;
65
use std::time::{Duration, SystemTime};
6+
use std::{io, mem};
77

88
use rand::seq::IteratorRandom;
99
use rustc_abi::ExternAbi;
@@ -25,10 +25,11 @@ use crate::*;
2525
enum SchedulingAction {
2626
/// Execute step on the active thread.
2727
ExecuteStep,
28-
/// Execute a timeout callback.
29-
ExecuteTimeoutCallback,
30-
/// Wait for a bit, until there is a timeout to be called.
31-
Sleep(Duration),
28+
/// Wait for a bit, but at most as long as the duration specified.
29+
/// We wake up early if an I/O event happened.
30+
/// If the duration is [`None`], we sleep indefinitely. This is
31+
/// only allowed when isolation is disabled and there are threads waiting for I/O!
32+
SleepAndWaitForIo(Option<Duration>),
3233
}
3334

3435
/// What to do with TLS allocations from terminated threads
@@ -111,6 +112,8 @@ pub enum BlockReason {
111112
Eventfd,
112113
/// Blocked on unnamed_socket.
113114
UnnamedSocket,
115+
/// Blocked on an IO operation.
116+
IO,
114117
/// Blocked for any reason related to GenMC, such as `assume` statements (GenMC mode only).
115118
/// Will be implicitly unblocked when GenMC schedules this thread again.
116119
Genmc,
@@ -765,26 +768,45 @@ trait EvalContextPrivExt<'tcx>: MiriInterpCxExt<'tcx> {
765768
}
766769

767770
// We are not in GenMC mode, so we control the scheduling.
768-
let thread_manager = &mut this.machine.threads;
769-
let clock = &this.machine.monotonic_clock;
770-
let rng = this.machine.rng.get_mut();
771+
let thread_manager = &this.machine.threads;
771772
// This thread and the program can keep going.
772773
if thread_manager.threads[thread_manager.active_thread].state.is_enabled()
773774
&& !thread_manager.yield_active_thread
774775
{
775776
// The currently active thread is still enabled, just continue with it.
776777
return interp_ok(SchedulingAction::ExecuteStep);
777778
}
778-
// The active thread yielded or got terminated. Let's see if there are any timeouts to take
779-
// care of. We do this *before* running any other thread, to ensure that timeouts "in the
780-
// past" fire before any other thread can take an action. This ensures that for
779+
780+
// The active thread yielded or got terminated. Let's see if there are any I/O events
781+
// or timeouts to take care of.
782+
783+
if this.machine.communicate() {
784+
// When isolation is disabled we need to check for events for
785+
// threads which are blocked on host I/O.
786+
// We do this before running any other threads such that the threads
787+
// which received events are available for scheduling afterwards.
788+
789+
// Perform a non-blocking poll for newly available I/O events from the OS.
790+
this.poll_and_unblock(Some(Duration::ZERO))?;
791+
}
792+
793+
let thread_manager = &this.machine.threads;
794+
let clock = &this.machine.monotonic_clock;
795+
796+
// We also check timeouts before running any other thread, to ensure that timeouts
797+
// "in the past" fire before any other thread can take an action. This ensures that for
781798
// `pthread_cond_timedwait`, "an error is returned if [...] the absolute time specified by
782799
// abstime has already been passed at the time of the call".
783800
// <https://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_cond_timedwait.html>
784801
let potential_sleep_time = thread_manager.next_callback_wait_time(clock);
785802
if potential_sleep_time == Some(Duration::ZERO) {
786-
return interp_ok(SchedulingAction::ExecuteTimeoutCallback);
803+
// The timeout exceeded for some thread so we unblock the thread and execute its timeout callback.
804+
this.run_timeout_callback()?;
787805
}
806+
807+
let thread_manager = &mut this.machine.threads;
808+
let rng = this.machine.rng.get_mut();
809+
788810
// No callbacks immediately scheduled, pick a regular thread to execute.
789811
// The active thread blocked or yielded. So we go search for another enabled thread.
790812
// We build the list of threads by starting with the threads after the current one, followed by
@@ -832,7 +854,16 @@ trait EvalContextPrivExt<'tcx>: MiriInterpCxExt<'tcx> {
832854
// All threads are currently blocked, but we have unexecuted
833855
// timeout_callbacks, which may unblock some of the threads. Hence,
834856
// sleep until the first callback.
835-
interp_ok(SchedulingAction::Sleep(sleep_time))
857+
interp_ok(SchedulingAction::SleepAndWaitForIo(Some(sleep_time)))
858+
} else if thread_manager
859+
.threads
860+
.iter()
861+
.any(|thread| thread.state.is_blocked_on(BlockReason::IO))
862+
{
863+
// At least one thread is blocked on host I/O but doesn't
864+
// have a timeout set. Hence, we sleep indefinitely in the
865+
// hope that eventually an I/O event for this thread happens.
866+
interp_ok(SchedulingAction::SleepAndWaitForIo(None))
836867
} else {
837868
throw_machine_stop!(TerminationInfo::GlobalDeadlock);
838869
}
@@ -1300,13 +1331,38 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
13001331
}
13011332
}
13021333
}
1303-
SchedulingAction::ExecuteTimeoutCallback => {
1304-
this.run_timeout_callback()?;
1305-
}
1306-
SchedulingAction::Sleep(duration) => {
1307-
this.machine.monotonic_clock.sleep(duration);
1334+
SchedulingAction::SleepAndWaitForIo(duration) => {
1335+
if this.machine.communicate() {
1336+
// When we're running with isolation disabled, instead of
1337+
// strictly sleeping the duration we allow waking up
1338+
// early for I/O events from the OS.
1339+
1340+
this.poll_and_unblock(duration)?;
1341+
} else {
1342+
let duration = duration.expect(
1343+
"Infinite sleep should not be triggered when isolation is enabled",
1344+
);
1345+
this.machine.monotonic_clock.sleep(duration);
1346+
}
13081347
}
13091348
}
13101349
}
13111350
}
1351+
1352+
/// Poll for I/O events until either an I/O event happened or the timeout expired.
1353+
/// The different timeout values are described in [`BlockingIoManager::poll`].
1354+
fn poll_and_unblock(&mut self, timeout: Option<Duration>) -> InterpResult<'tcx> {
1355+
let this = self.eval_context_mut();
1356+
1357+
let ready = match this.machine.blocking_io.poll(timeout) {
1358+
Ok(ready) => ready,
1359+
// We can ignore errors originating from interrupts; that's just a spurious wakeup.
1360+
Err(e) if e.kind() == io::ErrorKind::Interrupted => return interp_ok(()),
1361+
// For other errors we panic. On Linux and BSD hosts this should only be
1362+
// reachable when a system resource error (e.g. ENOMEM or ENOSPC) occurred.
1363+
Err(e) => panic!("{e}"),
1364+
};
1365+
1366+
ready.into_iter().try_for_each(|thread_id| this.unblock_thread(thread_id, BlockReason::IO))
1367+
}
13121368
}

src/tools/miri/src/diagnostics.rs

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -141,13 +141,6 @@ pub enum NonHaltingDiagnostic {
141141
effective_failure_ordering: AtomicReadOrd,
142142
},
143143
FileInProcOpened,
144-
SocketListenUnsupportedBacklog {
145-
details: bool,
146-
/// Unsupported backlog value provided the by the program.
147-
provided: i32,
148-
/// Supported backlog value by Miri.
149-
supported: i32,
150-
},
151144
}
152145

153146
/// Level of Miri specific diagnostics
@@ -650,8 +643,6 @@ impl<'tcx> MiriMachine<'tcx> {
650643
| WeakMemoryOutdatedLoad { .. } =>
651644
("tracking was triggered here".to_string(), DiagLevel::Note),
652645
FileInProcOpened => ("open a file in `/proc`".to_string(), DiagLevel::Warning),
653-
SocketListenUnsupportedBacklog { .. } =>
654-
("call to `listen` with unsupported backlog value".to_string(), DiagLevel::Warning),
655646
};
656647

657648
let title = match &e {
@@ -700,17 +691,12 @@ impl<'tcx> MiriMachine<'tcx> {
700691
format!("GenMC currently does not model the failure ordering for `compare_exchange`. {was_upgraded_msg}. Miri with GenMC might miss bugs related to this memory access.")
701692
}
702693
FileInProcOpened => format!("files in `/proc` can bypass the Abstract Machine and might not work properly in Miri"),
703-
SocketListenUnsupportedBacklog { provided, supported, .. } => format!("called `listen` on socket with backlog value of {provided} but only {supported} is supported"),
704694
};
705695

706696
let notes = match &e {
707697
ProgressReport { block_count } => {
708698
vec![note!("so far, {block_count} basic blocks have been executed")]
709699
}
710-
SocketListenUnsupportedBacklog { details: true, supported, .. } =>
711-
vec![note!(
712-
"the given value will be ignored and a backlog of {supported} will be used instead"
713-
)],
714700
_ => vec![],
715701
};
716702

0 commit comments

Comments
 (0)