Skip to content

Commit e38b941

Browse files
authored
Merge pull request rust-lang#4915 from WhySoBad/unblock-all-exceeded-threads
Unblock all threads with exceeded timeouts at once
2 parents b584d4d + 69fcf26 commit e38b941

1 file changed

Lines changed: 66 additions & 79 deletions

File tree

src/tools/miri/src/concurrency/thread.rs

Lines changed: 66 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -656,64 +656,10 @@ impl<'tcx> ThreadManager<'tcx> {
656656
// We should only switch stacks between steps.
657657
self.yield_active_thread = true;
658658
}
659-
660-
/// Get the wait time for the next timeout, or `None` if no timeout is pending.
661-
fn next_callback_wait_time(&self, clock: &MonotonicClock) -> Option<Duration> {
662-
self.threads
663-
.iter()
664-
.filter_map(|t| {
665-
match &t.state {
666-
ThreadState::Blocked { timeout: Some(timeout), .. } =>
667-
Some(timeout.get_wait_time(clock)),
668-
_ => None,
669-
}
670-
})
671-
.min()
672-
}
673659
}
674660

675661
impl<'tcx> EvalContextPrivExt<'tcx> for MiriInterpCx<'tcx> {}
676662
trait EvalContextPrivExt<'tcx>: MiriInterpCxExt<'tcx> {
677-
/// Execute a timeout callback on the callback's thread.
678-
#[inline]
679-
fn run_timeout_callback(&mut self) -> InterpResult<'tcx> {
680-
let this = self.eval_context_mut();
681-
let mut found_callback = None;
682-
// Find a blocked thread that has timed out.
683-
for (id, thread) in this.machine.threads.threads.iter_enumerated_mut() {
684-
match &thread.state {
685-
ThreadState::Blocked { timeout: Some(timeout), .. }
686-
if timeout.get_wait_time(&this.machine.monotonic_clock) == Duration::ZERO =>
687-
{
688-
let old_state = mem::replace(&mut thread.state, ThreadState::Enabled);
689-
let ThreadState::Blocked { callback, .. } = old_state else { unreachable!() };
690-
found_callback = Some((id, callback));
691-
// Run the fallback (after the loop because borrow-checking).
692-
break;
693-
}
694-
_ => {}
695-
}
696-
}
697-
if let Some((thread, callback)) = found_callback {
698-
// This back-and-forth with `set_active_thread` is here because of two
699-
// design decisions:
700-
// 1. Make the caller and not the callback responsible for changing
701-
// thread.
702-
// 2. Make the scheduler the only place that can change the active
703-
// thread.
704-
let old_thread = this.machine.threads.set_active_thread_id(thread);
705-
callback.call(this, UnblockKind::TimedOut)?;
706-
this.machine.threads.set_active_thread_id(old_thread);
707-
}
708-
// found_callback can remain None if the computer's clock
709-
// was shifted after calling the scheduler and before the call
710-
// to get_ready_callback (see issue
711-
// https://github.com/rust-lang/miri/issues/1763). In this case,
712-
// just do nothing, which effectively just returns to the
713-
// scheduler.
714-
interp_ok(())
715-
}
716-
717663
#[inline]
718664
fn run_on_stack_empty(&mut self) -> InterpResult<'tcx, Poll<()>> {
719665
let this = self.eval_context_mut();
@@ -790,19 +736,12 @@ trait EvalContextPrivExt<'tcx>: MiriInterpCxExt<'tcx> {
790736
this.poll_and_unblock(Some(Duration::ZERO))?;
791737
}
792738

793-
let thread_manager = &this.machine.threads;
794-
let clock = &this.machine.monotonic_clock;
795-
796739
// We also check timeouts before running any other thread, to ensure that timeouts
797740
// "in the past" fire before any other thread can take an action. This ensures that for
798741
// `pthread_cond_timedwait`, "an error is returned if [...] the absolute time specified by
799742
// abstime has already been passed at the time of the call".
800743
// <https://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_cond_timedwait.html>
801-
let potential_sleep_time = thread_manager.next_callback_wait_time(clock);
802-
if potential_sleep_time == Some(Duration::ZERO) {
803-
// The timeout exceeded for some thread so we unblock the thread and execute its timeout callback.
804-
this.run_timeout_callback()?;
805-
}
744+
let potential_sleep_time = this.unblock_expired_timeouts()?;
806745

807746
let thread_manager = &mut this.machine.threads;
808747
let rng = this.machine.rng.get_mut();
@@ -868,6 +807,71 @@ trait EvalContextPrivExt<'tcx>: MiriInterpCxExt<'tcx> {
868807
throw_machine_stop!(TerminationInfo::GlobalDeadlock);
869808
}
870809
}
810+
811+
/// Poll for I/O events until either an I/O event happened or the timeout expired.
812+
/// The different timeout values are described in [`BlockingIoManager::poll`].
813+
fn poll_and_unblock(&mut self, timeout: Option<Duration>) -> InterpResult<'tcx> {
814+
let this = self.eval_context_mut();
815+
816+
let ready = match this.machine.blocking_io.poll(timeout) {
817+
Ok(ready) => ready,
818+
// We can ignore errors originating from interrupts; that's just a spurious wakeup.
819+
Err(e) if e.kind() == io::ErrorKind::Interrupted => return interp_ok(()),
820+
// For other errors we panic. On Linux and BSD hosts this should only be
821+
// reachable when a system resource error (e.g. ENOMEM or ENOSPC) occurred.
822+
Err(e) => panic!("unexpected error while polling: {e}"),
823+
};
824+
825+
ready.into_iter().try_for_each(|thread_id| this.unblock_thread(thread_id, BlockReason::IO))
826+
}
827+
828+
/// Find all threads with expired timeouts, unblock them and execute their timeout callbacks.
829+
///
830+
/// This method returns the minimum duration until the next thread timeout expires.
831+
/// If all ready threads have no timeout set, [`None`] is returned.
832+
fn unblock_expired_timeouts(&mut self) -> InterpResult<'tcx, Option<Duration>> {
833+
let this = self.eval_context_mut();
834+
let clock = &this.machine.monotonic_clock;
835+
836+
let mut min_wait_time = Option::<Duration>::None;
837+
let mut callbacks = Vec::new();
838+
839+
for (id, thread) in this.machine.threads.threads.iter_enumerated_mut() {
840+
match &thread.state {
841+
ThreadState::Blocked { timeout: Some(timeout), .. } => {
842+
let wait_time = timeout.get_wait_time(clock);
843+
if wait_time.is_zero() {
844+
// The timeout expired for this thread.
845+
let old_state = mem::replace(&mut thread.state, ThreadState::Enabled);
846+
let ThreadState::Blocked { callback, .. } = old_state else {
847+
unreachable!()
848+
};
849+
// Add callback to list to be run after this loop because of borrow-checking.
850+
callbacks.push((id, callback));
851+
} else {
852+
// Update `min_wait_time` to contain the smallest duration until
853+
// the next timeout expires.
854+
min_wait_time = Some(wait_time.min(min_wait_time.unwrap_or(Duration::MAX)));
855+
}
856+
}
857+
_ => {}
858+
}
859+
}
860+
861+
for (thread, callback) in callbacks {
862+
// This back-and-forth with `set_active_thread` is here because of two
863+
// design decisions:
864+
// 1. Make the caller and not the callback responsible for changing
865+
// thread.
866+
// 2. Make the scheduler the only place that can change the active
867+
// thread.
868+
let old_thread = this.machine.threads.set_active_thread_id(thread);
869+
callback.call(this, UnblockKind::TimedOut)?;
870+
this.machine.threads.set_active_thread_id(old_thread);
871+
}
872+
873+
interp_ok(min_wait_time)
874+
}
871875
}
872876

873877
// Public interface to thread management.
@@ -1348,21 +1352,4 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
13481352
}
13491353
}
13501354
}
1351-
1352-
/// Poll for I/O events until either an I/O event happened or the timeout expired.
1353-
/// The different timeout values are described in [`BlockingIoManager::poll`].
1354-
fn poll_and_unblock(&mut self, timeout: Option<Duration>) -> InterpResult<'tcx> {
1355-
let this = self.eval_context_mut();
1356-
1357-
let ready = match this.machine.blocking_io.poll(timeout) {
1358-
Ok(ready) => ready,
1359-
// We can ignore errors originating from interrupts; that's just a spurious wakeup.
1360-
Err(e) if e.kind() == io::ErrorKind::Interrupted => return interp_ok(()),
1361-
// For other errors we panic. On Linux and BSD hosts this should only be
1362-
// reachable when a system resource error (e.g. ENOMEM or ENOSPC) occurred.
1363-
Err(e) => panic!("{e}"),
1364-
};
1365-
1366-
ready.into_iter().try_for_each(|thread_id| this.unblock_thread(thread_id, BlockReason::IO))
1367-
}
13681355
}

0 commit comments

Comments
 (0)