Skip to content

Add with_backlog functionality to TcpListener and UnixListener #94407

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 7 commits into from
42 changes: 40 additions & 2 deletions library/std/src/net/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -707,10 +707,48 @@ impl fmt::Debug for TcpStream {
}

impl TcpListener {
/// Default listen backlog.
const DEFAULT_BACKLOG: usize = 128;

/// Creates a new `TcpListener` which will be bound to the specified
/// address.
///
/// The returned listener is ready for accepting connections.
/// The given backlog specifies the maximum number of outstanding
/// connections that will be buffered in the OS waiting to be accepted by
/// [`TcpListener::accept`]. The backlog argument overrides the default
/// value of 128; that default is reasonable for most use cases.
///
/// This function is otherwise [`TcpListener::bind`]: see that
/// documentation for full details of operation.
///
/// # Examples
///
/// Creates a TCP listener bound to `127.0.0.1:80` with a backlog of 1000:
///
/// ```no_run
/// #![feature(bind_with_backlog)]
/// use std::net::TcpListener;
///
/// let listener = TcpListener::bind_with_backlog("127.0.0.1:80", 1000).unwrap();
/// ```
///
/// # Errors
///
/// The specified backlog may be larger than supported by the underlying
/// system. In this case an [`io::Error`] with
/// [`io::ErrorKind::InvalidData`] will be returned.
#[unstable(feature = "bind_with_backlog", issue = "94406")]
pub fn bind_with_backlog<A: ToSocketAddrs>(addr: A, backlog: usize) -> io::Result<TcpListener> {
super::each_addr(addr, move |a| net_imp::TcpListener::bind_with_backlog(a, backlog))
.map(TcpListener)
}

/// Creates a new `TcpListener` which will be bound to the specified
/// address. The returned listener is ready for accepting
/// connections.
///
/// The listener will have a backlog of 128. See the documentation for
/// [`TcpListener::bind_with_backlog`] for further information.
///
/// Binding with a port number of 0 will request that the OS assigns a port
/// to this listener. The port allocated can be queried via the
Expand Down Expand Up @@ -748,7 +786,7 @@ impl TcpListener {
/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<TcpListener> {
super::each_addr(addr, net_imp::TcpListener::bind).map(TcpListener)
Self::bind_with_backlog(addr, TcpListener::DEFAULT_BACKLOG)
}

/// Returns the local socket address of this listener.
Expand Down
32 changes: 27 additions & 5 deletions library/std/src/net/tcp/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ fn connect_error() {
#[test]
fn listen_localhost() {
let socket_addr = next_test_ip4();
let listener = t!(TcpListener::bind(&socket_addr));
let listener = t!(TcpListener::bind_with_backlog(&socket_addr, 1));

let _t = thread::spawn(move || {
let mut stream = t!(TcpStream::connect(&("localhost", socket_addr.port())));
Expand All @@ -64,7 +64,7 @@ fn listen_localhost() {
#[test]
fn connect_loopback() {
each_ip(&mut |addr| {
let acceptor = t!(TcpListener::bind(&addr));
let acceptor = t!(TcpListener::bind_with_backlog(&addr, 1));

let _t = thread::spawn(move || {
let host = match addr {
Expand All @@ -85,7 +85,7 @@ fn connect_loopback() {
#[test]
fn smoke_test() {
each_ip(&mut |addr| {
let acceptor = t!(TcpListener::bind(&addr));
let acceptor = t!(TcpListener::bind_with_backlog(&addr, 1));

let (tx, rx) = channel();
let _t = thread::spawn(move || {
Expand Down Expand Up @@ -172,11 +172,33 @@ fn multiple_connect_serial() {
})
}

#[test]
fn multiple_connect_serial_with_backlog() {
each_ip(&mut |addr| {
let max = 10;
let acceptor = t!(TcpListener::bind_with_backlog(&addr, max));

let _t = thread::spawn(move || {
for _ in 0..max {
let mut stream = t!(TcpStream::connect(&addr));
t!(stream.write(&[99]));
}
});

for stream in acceptor.incoming().take(max) {
let mut stream = t!(stream);
let mut buf = [0];
t!(stream.read(&mut buf));
assert_eq!(buf[0], 99);
}
})
}

#[test]
fn multiple_connect_interleaved_greedy_schedule() {
const MAX: usize = 10;
each_ip(&mut |addr| {
let acceptor = t!(TcpListener::bind(&addr));
let acceptor = t!(TcpListener::bind_with_backlog(&addr, MAX));

let _t = thread::spawn(move || {
let acceptor = acceptor;
Expand Down Expand Up @@ -213,7 +235,7 @@ fn multiple_connect_interleaved_greedy_schedule() {
fn multiple_connect_interleaved_lazy_schedule() {
const MAX: usize = 10;
each_ip(&mut |addr| {
let acceptor = t!(TcpListener::bind(&addr));
let acceptor = t!(TcpListener::bind_with_backlog(&addr, MAX));

let _t = thread::spawn(move || {
for stream in acceptor.incoming().take(MAX) {
Expand Down
98 changes: 96 additions & 2 deletions library/std/src/os/unix/net/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::sys::cvt;
use crate::sys::net::Socket;
use crate::sys_common::{AsInner, FromInner, IntoInner};
use crate::{fmt, io, mem};
use core::convert::TryInto;

/// A structure representing a Unix domain socket server.
///
Expand Down Expand Up @@ -53,8 +54,14 @@ impl fmt::Debug for UnixListener {
}

impl UnixListener {
/// Default backlog for `bind` and `bind_addr`.
const DEFAULT_BACKLOG: usize = 128;

/// Creates a new `UnixListener` bound to the specified socket.
///
/// The listener will have a backlog of 128. See the documentation for
/// [`UnixListener::bind_with_backlog`] for further information.
///
/// # Examples
///
/// ```no_run
Expand All @@ -70,19 +77,61 @@ impl UnixListener {
/// ```
#[stable(feature = "unix_socket", since = "1.10.0")]
pub fn bind<P: AsRef<Path>>(path: P) -> io::Result<UnixListener> {
UnixListener::bind_with_backlog(path, UnixListener::DEFAULT_BACKLOG)
}

/// Creates a new `UnixListener` bound to the specified socket.
///
/// The given backlog specifies the maximum number of outstanding
/// connections that will be buffered in the OS waiting to be accepted
/// by [`UnixListener::accept`]. The backlog argument overrides the
/// default backlog of 128; that default is reasonable for most use
/// cases.
///
/// This function is otherwise [`UnixListener::bind`]: see that
/// documentation for full details of operation.
///
/// # Examples
///
/// ```no_run
/// #![feature(bind_with_backlog)]
/// use std::os::unix::net::UnixListener;
///
/// let listener = match UnixListener::bind_with_backlog("/path/to/the/socket", 1000) {
/// Ok(sock) => sock,
/// Err(e) => {
/// println!("Couldn't connect: {:?}", e);
/// return
/// }
/// };
/// ```
///
/// # Errors
///
/// The specified backlog may be larger than supported by the underlying
/// system. In this case an [`io::Error`] with
/// [`io::ErrorKind::InvalidData`] will be returned.
#[unstable(feature = "bind_with_backlog", issue = "94406")]
pub fn bind_with_backlog<P: AsRef<Path>>(path: P, backlog: usize) -> io::Result<UnixListener> {
unsafe {
let backlog = backlog
.try_into()
.map_err(|e| crate::io::Error::new(crate::io::ErrorKind::InvalidData, e))?;
let inner = Socket::new_raw(libc::AF_UNIX, libc::SOCK_STREAM)?;
let (addr, len) = sockaddr_un(path.as_ref())?;

cvt(libc::bind(inner.as_inner().as_raw_fd(), &addr as *const _ as *const _, len as _))?;
cvt(libc::listen(inner.as_inner().as_raw_fd(), 128))?;
cvt(libc::listen(inner.as_inner().as_raw_fd(), backlog))?;

Ok(UnixListener(inner))
}
}

/// Creates a new `UnixListener` bound to the specified [`socket address`].
///
/// The listener will have a backlog of 128. See the documentation for
/// [`UnixListener::bind_addr_with_backlog`] for further information.
///
/// [`socket address`]: crate::os::unix::net::SocketAddr
///
/// # Examples
Expand All @@ -107,14 +156,59 @@ impl UnixListener {
/// ```
#[unstable(feature = "unix_socket_abstract", issue = "85410")]
pub fn bind_addr(socket_addr: &SocketAddr) -> io::Result<UnixListener> {
UnixListener::bind_addr_with_backlog(socket_addr, UnixListener::DEFAULT_BACKLOG)
}

/// Creates a new `UnixListener` bound to the specified [`socket address`].
///
/// The given backlog specifies the maximum number of outstanding
/// connections that will be buffered in the OS waiting to be accepted
/// by [`UnixListener::accept`]. The backlog argument overrides the
/// default of 128; that default is reasonable for most use cases.
///
/// This function is otherwise [`UnixListener::bind_addr`]: see that
/// documentation for full details of operation.
///
/// [`socket address`]: crate::os::unix::net::SocketAddr
///
/// # Examples
///
/// ```no_run
/// #![feature(unix_socket_abstract)]
/// #![feature(bind_with_backlog)]
/// use std::os::unix::net::{UnixListener};
///
/// fn main() -> std::io::Result<()> {
/// let listener1 = UnixListener::bind("path/to/socket")?;
/// let addr = listener1.local_addr()?;
///
/// let listener2 = match UnixListener::bind_addr_with_backlog(&addr, 1000) {
/// Ok(sock) => sock,
/// Err(err) => {
/// println!("Couldn't bind: {:?}", err);
/// return Err(err);
/// }
/// };
/// Ok(())
/// }
/// ```
//#[unstable(feature = "unix_socket_abstract", issue = "85410")]
#[unstable(feature = "bind_with_backlog", issue = "94406")]
pub fn bind_addr_with_backlog(
socket_addr: &SocketAddr,
backlog: usize,
) -> io::Result<UnixListener> {
unsafe {
let backlog = backlog
.try_into()
.map_err(|e| crate::io::Error::new(crate::io::ErrorKind::InvalidData, e))?;
let inner = Socket::new_raw(libc::AF_UNIX, libc::SOCK_STREAM)?;
cvt(libc::bind(
inner.as_raw_fd(),
&socket_addr.addr as *const _ as *const _,
socket_addr.len as _,
))?;
cvt(libc::listen(inner.as_raw_fd(), 128))?;
cvt(libc::listen(inner.as_raw_fd(), backlog))?;
Ok(UnixListener(inner))
}
}
Expand Down
10 changes: 5 additions & 5 deletions library/std/src/os/unix/net/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ fn basic() {
let msg1 = b"hello";
let msg2 = b"world!";

let listener = or_panic!(UnixListener::bind(&socket_path));
let listener = or_panic!(UnixListener::bind_with_backlog(&socket_path, 1));
let thread = thread::spawn(move || {
let mut stream = or_panic!(listener.accept()).0;
let mut buf = [0; 5];
Expand Down Expand Up @@ -111,7 +111,7 @@ fn try_clone() {
let msg1 = b"hello";
let msg2 = b"world";

let listener = or_panic!(UnixListener::bind(&socket_path));
let listener = or_panic!(UnixListener::bind_with_backlog(&socket_path, 1));
let thread = thread::spawn(move || {
let mut stream = or_panic!(listener.accept()).0;
or_panic!(stream.write_all(msg1));
Expand All @@ -135,7 +135,7 @@ fn iter() {
let dir = tmpdir();
let socket_path = dir.path().join("sock");

let listener = or_panic!(UnixListener::bind(&socket_path));
let listener = or_panic!(UnixListener::bind_with_backlog(&socket_path, 2));
let thread = thread::spawn(move || {
for stream in listener.incoming().take(2) {
let mut stream = or_panic!(stream);
Expand Down Expand Up @@ -423,7 +423,7 @@ fn test_abstract_stream_connect() {
let msg2 = b"world";

let socket_addr = or_panic!(SocketAddr::from_abstract_namespace(b"namespace"));
let listener = or_panic!(UnixListener::bind_addr(&socket_addr));
let listener = or_panic!(UnixListener::bind_addr_with_backlog(&socket_addr, 1));

let thread = thread::spawn(move || {
let mut stream = or_panic!(listener.accept()).0;
Expand Down Expand Up @@ -451,7 +451,7 @@ fn test_abstract_stream_connect() {
#[test]
fn test_abstract_stream_iter() {
let addr = or_panic!(SocketAddr::from_abstract_namespace(b"hidden"));
let listener = or_panic!(UnixListener::bind_addr(&addr));
let listener = or_panic!(UnixListener::bind_addr_with_backlog(&addr, 2));

let thread = thread::spawn(move || {
for stream in listener.incoming().take(2) {
Expand Down
4 changes: 4 additions & 0 deletions library/std/src/sys/unix/l4re.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,10 @@ pub mod net {
unimpl!();
}

pub fn bind_with_backlog(_: io::Result<&SocketAddr>, _: usize) -> io::Result<TcpListener> {
unimpl!();
}

pub fn socket(&self) -> &Socket {
&self.inner
}
Expand Down
4 changes: 4 additions & 0 deletions library/std/src/sys/unsupported/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ impl TcpListener {
unsupported()
}

pub fn bind_with_backlog(_: io::Result<&SocketAddr>, _: usize) -> io::Result<TcpListener> {
unsupported()
}

pub fn socket_addr(&self) -> io::Result<SocketAddr> {
self.0
}
Expand Down
4 changes: 4 additions & 0 deletions library/std/src/sys/wasi/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,10 @@ impl TcpListener {
unsupported()
}

pub fn bind_with_backlog(_: io::Result<&SocketAddr>, _: usize) -> io::Result<TcpListener> {
unsupported()
}

pub fn socket_addr(&self) -> io::Result<SocketAddr> {
unsupported()
}
Expand Down
12 changes: 10 additions & 2 deletions library/std/src/sys_common/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,9 +363,17 @@ pub struct TcpListener {
}

impl TcpListener {
pub fn bind(addr: io::Result<&SocketAddr>) -> io::Result<TcpListener> {
pub fn bind_with_backlog(
addr: io::Result<&SocketAddr>,
backlog: usize,
) -> io::Result<TcpListener> {
let addr = addr?;

// Type-convert the backlog
let backlog = backlog
.try_into()
.map_err(|e| crate::io::Error::new(crate::io::ErrorKind::InvalidData, e))?;

init();

let sock = Socket::new(addr, c::SOCK_STREAM)?;
Expand All @@ -385,7 +393,7 @@ impl TcpListener {
cvt(unsafe { c::bind(sock.as_raw(), addrp, len as _) })?;

// Start listening
cvt(unsafe { c::listen(sock.as_raw(), 128) })?;
cvt(unsafe { c::listen(sock.as_raw(), backlog) })?;
Ok(TcpListener { inner: sock })
}

Expand Down