Skip to content
7 changes: 3 additions & 4 deletions tokio/src/runtime/io/driver/signal.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use crate::signal::unix::pipe;

use super::{Driver, Handle, TOKEN_SIGNAL};

use std::io;

impl Handle {
pub(crate) fn register_signal_receiver(
&self,
receiver: &mut mio::net::UnixStream,
) -> io::Result<()> {
pub(crate) fn register_signal_receiver(&self, receiver: &mut pipe::Receiver) -> io::Result<()> {
self.registry
.register(receiver, TOKEN_SIGNAL, mio::Interest::READABLE)?;
Ok(())
Expand Down
32 changes: 6 additions & 26 deletions tokio/src/runtime/signal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@

use crate::runtime::{driver, io};
use crate::signal::registry::globals;
use crate::signal::unix::pipe;

use mio::net::UnixStream;
use std::io::{self as std_io, Read};
use std::io::{self as std_io};
Comment thread
ipetkov marked this conversation as resolved.
Outdated
use std::sync::{Arc, Weak};
use std::time::Duration;

Expand All @@ -21,7 +21,7 @@ pub(crate) struct Driver {
io: io::Driver,

/// A pipe for receiving wake events from the signal handler
receiver: UnixStream,
receiver: pipe::Receiver,

/// Shared state. The driver keeps a strong ref and the handle keeps a weak
/// ref. The weak ref is used to check if the driver is still active before
Expand All @@ -41,9 +41,6 @@ pub(crate) struct Handle {
impl Driver {
/// Creates a new signal `Driver` instance that delegates wakeups to `park`.
pub(crate) fn new(io: io::Driver, io_handle: &io::Handle) -> std_io::Result<Self> {
use std::mem::ManuallyDrop;
use std::os::unix::io::{AsRawFd, FromRawFd};

// NB: We give each driver a "fresh" receiver file descriptor to avoid
// the issues described in alexcrichton/tokio-process#42.
//
Expand All @@ -63,14 +60,7 @@ impl Driver {
// safe as each dup is registered with separate reactors **and** we
// only expect at least one dup to receive the notification.

// Manually drop as we don't actually own this instance of UnixStream.
let receiver_fd = globals().receiver.as_raw_fd();

// safety: there is nothing unsafe about this, but the `from_raw_fd` fn is marked as unsafe.
let original =
ManuallyDrop::new(unsafe { std::os::unix::net::UnixStream::from_raw_fd(receiver_fd) });
let mut receiver = UnixStream::from_std(original.try_clone()?);

let mut receiver = globals().receiver()?;
io_handle.register_signal_receiver(&mut receiver)?;

Ok(Self {
Expand Down Expand Up @@ -109,18 +99,8 @@ impl Driver {
return;
}

// Drain the pipe completely so we can receive a new readiness event
// if another signal has come in.
let mut buf = [0; 128];
#[allow(clippy::unused_io_amount)]
loop {
match self.receiver.read(&mut buf) {
Ok(0) => panic!("EOF on self-pipe"),
Ok(_) => continue, // Keep reading
Err(e) if e.kind() == std_io::ErrorKind::WouldBlock => break,
Err(e) => panic!("Bad read on self-pipe: {e}"),
}
}
// consume value
let _ = self.receiver.read();

// Broadcast any signals which were received
globals().broadcast();
Expand Down
107 changes: 107 additions & 0 deletions tokio/src/signal/pipe/eventfd.rs
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comparing with what mio does, this seems to not have a bunch of logic to reset the eventfd and so on... Is it missing here?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A read on fd returned by eventfd resets eventfd. Here's libc::eventfd_read in Receiver::read() function.

Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
use std::{
io,
os::fd::{AsRawFd, FromRawFd, OwnedFd},
};

use mio::{event, unix::SourceFd};

#[derive(Debug)]
pub(crate) struct Sender {
fd: OwnedFd,
}

#[derive(Debug)]
pub(crate) struct Receiver {
fd: OwnedFd,
}

impl event::Source for Receiver {
fn register(
&mut self,
registry: &mio::Registry,
token: mio::Token,
interests: mio::Interest,
) -> io::Result<()> {
SourceFd(&self.fd.as_raw_fd()).register(registry, token, interests)
}

fn reregister(
&mut self,
registry: &mio::Registry,
token: mio::Token,
interests: mio::Interest,
) -> io::Result<()> {
SourceFd(&self.fd.as_raw_fd()).reregister(registry, token, interests)
}

fn deregister(&mut self, registry: &mio::Registry) -> io::Result<()> {
SourceFd(&self.fd.as_raw_fd()).deregister(registry)
}
}

impl Sender {
pub(crate) fn new() -> std::io::Result<Self> {
// SAFETY: it's ok to call libc API
let fd = unsafe { libc::eventfd(0, libc::EFD_NONBLOCK | libc::EFD_CLOEXEC) };
if fd == -1 {
return Err(io::Error::last_os_error());
}
Ok(Sender {
// SAFETY: fd just opened by the above libc::eventfd
fd: unsafe { OwnedFd::from_raw_fd(fd) },
})
}

pub(crate) fn receiver(&self) -> std::io::Result<Receiver> {
Ok(Receiver {
fd: self.fd.try_clone()?,
})
}
}

impl Sender {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be merged with the impl above.

pub(crate) fn write(&self) -> std::io::Result<usize> {
// SAFETY: it's ok to call libc API
let r = unsafe { libc::eventfd_write(self.fd.as_raw_fd(), 1) };
if r == 0 {
Ok(0)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a small different between the eventfd and unixstream impls here.
The eventfd's Sender::write() returns 0 on success. The UnixStream returns the number of written bytes (1).
AFAIS the successful result is not used.
Maybe change eventfd to return 1 too or change both to return Result<()> ?!

} else {
Err(std::io::Error::last_os_error())
}
}
}

impl Receiver {
pub(crate) fn read(&mut self) -> std::io::Result<libc::c_int> {
let fd = self.fd.as_raw_fd();
let mut value: libc::eventfd_t = 0;

// SAFETY: it's ok to call libc API
let r = unsafe { libc::eventfd_read(fd, &mut value as *mut libc::eventfd_t) };
if r == 0 {
Ok(0)
} else {
Err(std::io::Error::last_os_error())
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since it is non-blocking it may return EAGAIN (ErrorKind::WouldBlock) which is not an error.

    let err = std::io::Error::last_os_error();
    // On a non-blocking eventfd, it is expected to get an EAGAIN
    // when the counter is 0.
    if err.kind() == std::io::ErrorKind::WouldBlock {
        Ok(0)
    } else {
        Err(err)
    }

}
}
}

pub(crate) struct OsExtraData {
sender: Sender,
}

impl OsExtraData {
pub(crate) fn new() -> std::io::Result<Self> {
Sender::new().map(|sender| Self { sender })
}
}

impl OsExtraData {
pub(crate) fn receiver(&self) -> std::io::Result<Receiver> {
self.sender.receiver()
}

pub(crate) fn sender(&self) -> &Sender {
&self.sender
}
}
95 changes: 95 additions & 0 deletions tokio/src/signal/pipe/unixstream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
use mio::net::UnixStream;
use std::io::{self, Read, Write};
use std::mem::ManuallyDrop;
use std::os::unix::io::{AsRawFd, FromRawFd};

pub(crate) struct Sender {
inner: UnixStream,
}

#[derive(Debug)]
pub(crate) struct Receiver {
inner: UnixStream,
}

impl mio::event::Source for Receiver {
fn register(
&mut self,
registry: &mio::Registry,
token: mio::Token,
interests: mio::Interest,
) -> io::Result<()> {
self.inner.register(registry, token, interests)
}

fn reregister(
&mut self,
registry: &mio::Registry,
token: mio::Token,
interests: mio::Interest,
) -> io::Result<()> {
self.inner.reregister(registry, token, interests)
}

fn deregister(&mut self, registry: &mio::Registry) -> io::Result<()> {
self.inner.deregister(registry)
}
}

impl Sender {
pub(crate) fn write(&self) -> std::io::Result<usize> {
(&self.inner).write(&[1])
}
}

impl Receiver {
pub(crate) fn read(&mut self) -> std::io::Result<libc::c_int> {
// Drain the pipe completely so we can receive a new readiness event
// if another signal has come in.
let mut buf = [0; 128];
#[allow(clippy::unused_io_amount)]
loop {
match self.inner.read(&mut buf) {
Ok(0) => panic!("EOF on self-pipe"),
Ok(_) => continue, // Keep reading
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => break,
Err(e) => {
return Err(e);
}
}
}
Ok(0)
}
}

pub(crate) fn channel() -> std::io::Result<(Sender, Receiver)> {
let (sender, receiver) = UnixStream::pair()?;
Ok((Sender { inner: sender }, Receiver { inner: receiver }))
}

pub(crate) struct OsExtraData {
sender: Sender,
receiver: Receiver,
}

impl OsExtraData {
pub(crate) fn new() -> std::io::Result<Self> {
let (sender, receiver) = channel()?;
Ok(Self { sender, receiver })
}
}

impl OsExtraData {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This impl could be merged with the one above

pub(crate) fn receiver(&self) -> std::io::Result<Receiver> {
let receiver_fd = self.receiver.inner.as_raw_fd();
// SAFETY: fd owned by receiver is opened
let original =
ManuallyDrop::new(unsafe { std::os::unix::net::UnixStream::from_raw_fd(receiver_fd) });
let inner = UnixStream::from_std(original.try_clone()?);
Ok(Receiver { inner })
}

pub(crate) fn sender(&self) -> &Sender {
&self.sender
}
}
6 changes: 3 additions & 3 deletions tokio/src/signal/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,18 +144,18 @@ impl Globals {

fn globals_init() -> Globals
where
OsExtraData: 'static + Send + Sync + Default,
OsExtraData: 'static + Send + Sync,
OsStorage: 'static + Send + Sync + Default,
{
Globals {
extra: OsExtraData::default(),
extra: OsExtraData::new().expect("failed to initialize OsExtraData"),
Comment thread
ipetkov marked this conversation as resolved.
Outdated
registry: Registry::new(OsStorage::default()),
}
}

pub(crate) fn globals() -> &'static Globals
where
OsExtraData: 'static + Send + Sync + Default,
OsExtraData: 'static + Send + Sync,
OsStorage: 'static + Send + Sync + Default,
{
static GLOBALS: OnceLock<Globals> = OnceLock::new();
Expand Down
45 changes: 27 additions & 18 deletions tokio/src/signal/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,36 @@ use crate::signal::registry::{globals, EventId, EventInfo, Globals, Storage};
use crate::signal::RxFuture;
use crate::sync::watch;

use mio::net::UnixStream;
use std::io::{self, Error, ErrorKind, Write};
use std::io::{self, Error, ErrorKind};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Once;
use std::task::{Context, Poll};

#[cfg_attr(
any(
target_os = "android",
target_os = "espidf",
target_os = "fuchsia",
target_os = "hermit",
target_os = "illumos",
target_os = "linux",
),
path = "pipe/eventfd.rs"
)]
#[cfg_attr(
not(any(
target_os = "android",
target_os = "espidf",
target_os = "fuchsia",
target_os = "hermit",
target_os = "illumos",
target_os = "linux",
)),
path = "pipe/unixstream.rs"
)]
pub(crate) mod pipe;
pub(crate) use pipe::OsExtraData;

#[cfg(not(any(target_os = "linux", target_os = "illumos")))]
pub(crate) struct OsStorage([SignalInfo; 33]);

Expand Down Expand Up @@ -61,20 +85,6 @@ impl Storage for OsStorage {
}
}

#[derive(Debug)]
pub(crate) struct OsExtraData {
sender: UnixStream,
pub(crate) receiver: UnixStream,
}

impl Default for OsExtraData {
fn default() -> Self {
let (receiver, sender) = UnixStream::pair().expect("failed to create UnixStream");

Self { sender, receiver }
}
}

/// Represents the specific kind of signal to listen for.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
pub struct SignalKind(libc::c_int);
Expand Down Expand Up @@ -268,8 +278,7 @@ fn action(globals: &'static Globals, signal: libc::c_int) {

// Send a wakeup, ignore any errors (anything reasonably possible is
// full pipe and then it will wake up anyway).
let mut sender = &globals.sender;
drop(sender.write(&[1]));
let _ = globals.sender().write();
}

/// Enables this module to receive signal notifications for the `signal`
Expand Down
6 changes: 6 additions & 0 deletions tokio/src/signal/windows/sys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ impl Storage for OsStorage {
#[derive(Debug, Default)]
pub(crate) struct OsExtraData {}

impl OsExtraData {
pub(crate) fn new() -> std::io::Result<Self> {
Ok(OsExtraData {})
}
}

fn global_init() -> io::Result<()> {
static INIT: Once = Once::new();

Expand Down