diff --git a/CHANGELOG.md b/CHANGELOG.md index 4fccb9a489e..9549979e729 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,7 @@ - [`libp2p-pnet` CHANGELOG](transports/pnet/CHANGELOG.md) - [`libp2p-quic` CHANGELOG](transports/quic/CHANGELOG.md) - [`libp2p-tcp` CHANGELOG](transports/tcp/CHANGELOG.md) +- [`libp2p-unix-stream` CHANGELOG](transports/unix-stream/CHANGELOG.md) - [`libp2p-tls` CHANGELOG](transports/tls/CHANGELOG.md) - [`libp2p-uds` CHANGELOG](transports/uds/CHANGELOG.md) - [`libp2p-wasm-ext` CHANGELOG](transports/wasm-ext/CHANGELOG.md) diff --git a/Cargo.lock b/Cargo.lock index 22c82b5f793..5f72812a163 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1199,7 +1199,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "18e4fdb82bd54a12e42fb58a800dcae6b9e13982238ce2296dc3570b92148e1f" dependencies = [ "data-encoding", - "syn 1.0.109", + "syn 2.0.100", ] [[package]] @@ -2686,6 +2686,7 @@ dependencies = [ "libp2p-tcp", "libp2p-tls", "libp2p-uds", + "libp2p-unix-stream", "libp2p-upnp", "libp2p-webrtc-websys", "libp2p-websocket", @@ -3413,6 +3414,21 @@ dependencies = [ "tracing", ] +[[package]] +name = "libp2p-unix-stream" +version = "0.43.0" +dependencies = [ + "async-io", + "async-std", + "futures", + "futures-timer", + "libp2p-core", + "percent-encoding-rfc3986", + "tokio", + "tracing", + "tracing-subscriber", +] + [[package]] name = "libp2p-upnp" version = "0.4.1" @@ -4343,6 +4359,12 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" +[[package]] +name = "percent-encoding-rfc3986" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3637c05577168127568a64e9dc5a6887da720efef07b3d9472d45f63ab191166" + [[package]] name = "pin-project" version = "1.1.10" @@ -6921,7 +6943,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.59.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 5f6059fcbb8..1194ddab5da 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -58,6 +58,7 @@ members = [ "transports/pnet", "transports/quic", "transports/tcp", + "transports/unix-stream", "transports/tls", "transports/uds", "transports/webrtc-websys", @@ -106,6 +107,7 @@ libp2p-swarm = { version = "0.47.0", path = "swarm" } libp2p-swarm-derive = { version = "=0.35.1", path = "swarm-derive" } # `libp2p-swarm-derive` may not be compatible with different `libp2p-swarm` non-breaking releases. E.g. `libp2p-swarm` might introduce a new enum variant `FromSwarm` (which is `#[non-exhaustive]`) in a non-breaking release. Older versions of `libp2p-swarm-derive` would not forward this enum variant within the `NetworkBehaviour` hierarchy. Thus the version pinning is required. libp2p-swarm-test = { version = "0.6.0", path = "swarm-test" } libp2p-tcp = { version = "0.43.0", path = "transports/tcp" } +libp2p-unix-stream = { version = "0.43.0", path = "transports/unix-stream" } libp2p-tls = { version = "0.6.2", path = "transports/tls" } libp2p-uds = { version = "0.42.0", path = "transports/uds" } libp2p-upnp = { version = "0.4.1", path = "protocols/upnp" } diff --git a/libp2p/Cargo.toml b/libp2p/Cargo.toml index 0ff11d88787..27a55421987 100644 --- a/libp2p/Cargo.toml +++ b/libp2p/Cargo.toml @@ -41,6 +41,7 @@ full = [ "secp256k1", "serde", "tcp", + "unix-stream", "tls", "tokio", "uds", @@ -53,7 +54,7 @@ full = [ "upnp", ] -async-std = ["libp2p-swarm/async-std", "libp2p-tcp?/async-io", "libp2p-dns?/async-std"] +async-std = ["libp2p-swarm/async-std", "libp2p-tcp?/async-io", "libp2p-unix-stream?/async-io", "libp2p-dns?/async-std"] autonat = ["dep:libp2p-autonat"] cbor = ["libp2p-request-response?/cbor"] dcutr = ["dep:libp2p-dcutr", "libp2p-metrics?/dcutr"] @@ -82,8 +83,9 @@ rsa = ["libp2p-identity/rsa"] secp256k1 = ["libp2p-identity/secp256k1"] serde = ["libp2p-core/serde", "libp2p-kad?/serde", "libp2p-gossipsub?/serde"] tcp = ["dep:libp2p-tcp"] +unix-stream = ["dep:libp2p-unix-stream"] tls = ["dep:libp2p-tls"] -tokio = ["libp2p-swarm/tokio", "libp2p-mdns?/tokio", "libp2p-tcp?/tokio", "libp2p-dns?/tokio", "libp2p-quic?/tokio", "libp2p-upnp?/tokio"] +tokio = ["libp2p-swarm/tokio", "libp2p-mdns?/tokio", "libp2p-tcp?/tokio", "libp2p-unix-stream?/tokio", "libp2p-dns?/tokio", "libp2p-quic?/tokio", "libp2p-upnp?/tokio"] uds = ["dep:libp2p-uds"] wasm-bindgen = ["futures-timer/wasm-bindgen", "getrandom/js", "libp2p-swarm/wasm-bindgen", "libp2p-gossipsub?/wasm-bindgen"] webrtc-websys = ['dep:libp2p-webrtc-websys'] @@ -137,6 +139,7 @@ libp2p-mdns = { workspace = true, optional = true } libp2p-memory-connection-limits = { workspace = true, optional = true } libp2p-quic = { workspace = true, optional = true } libp2p-tcp = { workspace = true, optional = true } +libp2p-unix-stream = { workspace = true, optional = true } libp2p-tls = { workspace = true, optional = true } libp2p-uds = { workspace = true, optional = true } libp2p-upnp = { workspace = true, optional = true } diff --git a/libp2p/src/lib.rs b/libp2p/src/lib.rs index 1b02b7523b3..87bb4e9f58f 100644 --- a/libp2p/src/lib.rs +++ b/libp2p/src/lib.rs @@ -112,6 +112,11 @@ pub use libp2p_swarm as swarm; #[cfg_attr(docsrs, doc(cfg(feature = "tcp")))] #[doc(inline)] pub use libp2p_tcp as tcp; +#[cfg(feature = "unix-stream")] +#[cfg(unix)] +#[cfg_attr(docsrs, doc(cfg(feature = "unix-stream")))] +#[doc(inline)] +pub use libp2p_unix_stream as unix_stream; #[cfg(feature = "tls")] #[cfg_attr(docsrs, doc(cfg(feature = "tls")))] #[cfg(not(target_arch = "wasm32"))] diff --git a/transports/tcp/src/lib.rs b/transports/tcp/src/lib.rs index 7eb25c81e92..4529de3c9e4 100644 --- a/transports/tcp/src/lib.rs +++ b/transports/tcp/src/lib.rs @@ -402,8 +402,7 @@ where (Err(err), _) => return Err(err), }; - let stream = T::new_stream(socket.into()).await?; - Ok(stream) + T::new_stream(socket.into()).await } .boxed()) } diff --git a/transports/unix-stream/CHANGELOG.md b/transports/unix-stream/CHANGELOG.md new file mode 100644 index 00000000000..e69de29bb2d diff --git a/transports/unix-stream/Cargo.toml b/transports/unix-stream/Cargo.toml new file mode 100644 index 00000000000..b38f75bca6c --- /dev/null +++ b/transports/unix-stream/Cargo.toml @@ -0,0 +1,38 @@ +[package] +name = "libp2p-unix-stream" +edition.workspace = true +rust-version = { workspace = true } +description = "UNIX-domain stream socket transport protocol for libp2p" +version = "0.43.0" +authors = ["Parity Technologies "] +license = "MIT" +repository = "https://github.com/libp2p/rust-libp2p" +keywords = ["peer-to-peer", "libp2p", "networking"] +categories = ["network-programming", "asynchronous"] + +[dependencies] +async-io = { version = "2.3.3", optional = true } +futures = { workspace = true } +futures-timer = "3.0" +libp2p-core = { workspace = true } +percent-encoding-rfc3986 = "0.1" +tokio = { workspace = true, default-features = false, features = ["net"], optional = true } +tracing = { workspace = true } + +[features] +tokio = ["dep:tokio"] +async-io = ["dep:async-io"] + +[dev-dependencies] +async-std = { version = "1.6.5", features = ["attributes"] } +tokio = { workspace = true, features = ["full"] } +tracing-subscriber = { workspace = true, features = ["env-filter"] } + +# Passing arguments to the docsrs builder in order to properly document cfg's. +# More information: https://docs.rs/about/builds#cross-compiling +[package.metadata.docs.rs] +all-features = true + + +[lints] +workspace = true diff --git a/transports/unix-stream/src/lib.rs b/transports/unix-stream/src/lib.rs new file mode 100644 index 00000000000..dea4b23947f --- /dev/null +++ b/transports/unix-stream/src/lib.rs @@ -0,0 +1,638 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! Implementation of the libp2p [`libp2p_core::Transport`] trait for UNIX-domain stream sockets. +//! +//! # Usage +//! +//! This crate provides a [`async_io::Transport`] and [`tokio::Transport`], depending on +//! the enabled features, which implement the [`libp2p_core::Transport`] trait for use as a +//! transport with `libp2p-core` or `libp2p-swarm`. + +#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] +#![cfg(unix)] + +mod provider; + +#[cfg(any(target_os = "android", target_os = "linux"))] +use std::os::linux::net::SocketAddrExt; +use std::{ + borrow::Cow, + collections::VecDeque, + ffi::OsStr, + io, + os::unix::{ + ffi::OsStrExt, + net::{SocketAddr, UnixListener, UnixStream}, + }, + pin::Pin, + task::{Context, Poll, Waker}, + time::Duration, +}; + +use futures::{future::Ready, prelude::*, stream::SelectAll}; +use futures_timer::Delay; +use libp2p_core::{ + multiaddr::{Multiaddr, Protocol}, + transport::{DialOpts, ListenerId, TransportError, TransportEvent}, +}; +use percent_encoding_rfc3986::{percent_decode_str, percent_encode}; +#[cfg(feature = "async-io")] +pub use provider::async_io; +#[cfg(feature = "tokio")] +pub use provider::tokio; +use provider::{Incoming, Provider}; + +/// An abstract [`libp2p_core::Transport`] implementation. +/// +/// You shouldn't need to use this type directly. Use one of the following instead: +/// +/// - [`tokio::Transport`] +/// - [`async_io::Transport`] +pub struct Transport +where + T: Provider + Send, +{ + /// All the active listeners. + /// The [`ListenStream`] struct contains a stream that we want to be pinned. Since the + /// `VecDeque` can be resized, the only way is to use a `Pin>`. + listeners: SelectAll>, + /// Pending transport events to return from [`libp2p_core::Transport::poll`]. + pending_events: + VecDeque::ListenerUpgrade, io::Error>>, +} + +impl Transport +where + T: Provider + Send, +{ + /// Create a new instance of [`Transport`]. + /// + /// It is best to call this function through one of the type-aliases of this type: + /// + /// - [`tokio::Transport::new`] + /// - [`async_io::Transport::new`] + pub fn new() -> Self { + Default::default() + } + + fn do_listen( + &mut self, + id: ListenerId, + socket_addr: &SocketAddr, + ) -> io::Result> { + let listener = UnixListener::bind_addr(socket_addr)?; + listener.set_nonblocking(true)?; + + let listen_addr = socketaddr_to_multiaddr(socket_addr).ok_or_else(|| { + io::Error::new( + io::ErrorKind::AddrNotAvailable, + format!("{:?}", socket_addr), + ) + })?; + self.pending_events.push_back(TransportEvent::NewAddress { + listener_id: id, + listen_addr, + }); + ListenStream::::new(id, listener) + } +} + +impl Default for Transport +where + T: Provider + Send, +{ + /// Creates a [`Transport`] with reasonable defaults. + /// + /// This transport will have port-reuse disabled. + fn default() -> Self { + Transport { + listeners: SelectAll::new(), + pending_events: VecDeque::new(), + } + } +} + +impl libp2p_core::Transport for Transport +where + T: Provider + Send + 'static, + T::Listener: Unpin, + T::Stream: Unpin, +{ + type Output = T::Stream; + type Error = io::Error; + type Dial = Pin> + Send>>; + type ListenerUpgrade = Ready>; + + fn listen_on( + &mut self, + id: ListenerId, + addr: Multiaddr, + ) -> Result<(), TransportError> { + let socket_addr = multiaddr_to_socketaddr(addr.clone()) + .map_err(|_| TransportError::MultiaddrNotSupported(addr))?; + tracing::debug!("listening on {:?}", socket_addr); + let listener = self + .do_listen(id, &socket_addr) + .map_err(TransportError::Other)?; + self.listeners.push(listener); + Ok(()) + } + + fn remove_listener(&mut self, id: ListenerId) -> bool { + if let Some(listener) = self.listeners.iter_mut().find(|l| l.listener_id == id) { + listener.close(Ok(())); + true + } else { + false + } + } + + fn dial( + &mut self, + addr: Multiaddr, + _: DialOpts, + ) -> Result> { + let Ok(socket_addr) = multiaddr_to_socketaddr(addr.clone()) else { + return Err(TransportError::MultiaddrNotSupported(addr)); + }; + tracing::debug!(address=?socket_addr, "dialing address"); + + // [`Transport::dial`] should do no work unless the returned [`Future`] is polled. Thus + // do the `connect` call within the [`Future`]. + Ok(async move { + // We can't set non-blocking mode before connect(2) in Rust; this always completes + // instantly + let socket = UnixStream::connect_addr(&socket_addr)?; + socket.set_nonblocking(true)?; + + T::new_stream(socket.into()).await + } + .boxed()) + } + + /// Poll all listeners. + #[tracing::instrument(level = "trace", name = "Transport::poll", skip(self, cx))] + fn poll( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + // Return pending events from closed listeners. + if let Some(event) = self.pending_events.pop_front() { + return Poll::Ready(event); + } + + match self.listeners.poll_next_unpin(cx) { + Poll::Ready(Some(transport_event)) => Poll::Ready(transport_event), + _ => Poll::Pending, + } + } +} + +/// A stream of incoming connections on one or more interfaces. +struct ListenStream +where + T: Provider, +{ + /// The ID of this listener. + listener_id: ListenerId, + /// The async listening socket for incoming connections. + listener: T::Listener, + /// How long to sleep after a (non-fatal) error while trying + /// to accept a new connection. + sleep_on_error: Duration, + /// The current pause, if any. + pause: Option, + /// Pending event to reported. + pending_event: Option<::Item>, + /// The listener can be manually closed with + /// [`Transport::remove_listener`](libp2p_core::Transport::remove_listener). + is_closed: bool, + /// The stream must be awaken after it has been closed to deliver the last event. + close_listener_waker: Option, +} + +impl ListenStream +where + T: Provider, +{ + /// Constructs a [`ListenStream`] for incoming connections around + /// the given [`UnixListener`]. + fn new(listener_id: ListenerId, listener: UnixListener) -> io::Result { + let listener = T::new_listener(listener)?; + + Ok(ListenStream { + listener, + listener_id, + pause: None, + sleep_on_error: Duration::from_millis(100), + pending_event: None, + is_closed: false, + close_listener_waker: None, + }) + } + + /// Close the listener. + /// + /// This will create a [`TransportEvent::ListenerClosed`] and + /// terminate the stream once the event has been reported. + fn close(&mut self, reason: Result<(), io::Error>) { + if self.is_closed { + return; + } + self.pending_event = Some(TransportEvent::ListenerClosed { + listener_id: self.listener_id, + reason, + }); + self.is_closed = true; + + // Wake the stream to deliver the last event. + if let Some(waker) = self.close_listener_waker.take() { + waker.wake(); + } + } +} + +impl Stream for ListenStream +where + T: Provider, + T::Listener: Unpin, + T::Stream: Unpin, +{ + type Item = TransportEvent>, io::Error>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + if let Some(mut pause) = self.pause.take() { + match pause.poll_unpin(cx) { + Poll::Ready(_) => {} + Poll::Pending => { + self.pause = Some(pause); + return Poll::Pending; + } + } + } + + if let Some(event) = self.pending_event.take() { + return Poll::Ready(Some(event)); + } + + if self.is_closed { + // Terminate the stream if the listener closed + // and all remaining events have been reported. + return Poll::Ready(None); + } + + // Take the pending connection from the backlog. + match T::poll_accept(&mut self.listener, cx) { + Poll::Ready(Ok(Incoming { + local_addr, + remote_addr, + stream, + })) => { + let local_addr = socketaddr_to_multiaddr(&local_addr) + .expect("can't be anonymous address on accept"); + let remote_addr = + socketaddr_to_multiaddr(&remote_addr).unwrap_or(Multiaddr::empty()); // this will, more often than not, be anonymous + + tracing::debug!( + remote_address=?remote_addr, + local_address=?local_addr, + "Incoming connection from remote at local" + ); + + return Poll::Ready(Some(TransportEvent::Incoming { + listener_id: self.listener_id, + upgrade: future::ok(stream), + local_addr, + send_back_addr: remote_addr, + })); + } + Poll::Ready(Err(error)) => { + // These errors are non-fatal for the listener stream. + self.pause = Some(Delay::new(self.sleep_on_error)); + return Poll::Ready(Some(TransportEvent::ListenerError { + listener_id: self.listener_id, + error, + })); + } + Poll::Pending => {} + } + + self.close_listener_waker = Some(cx.waker().clone()); + Poll::Pending + } +} + +/// The valid characters in a /unix/ are ALPHA DIGIT -._~ !$&'()*+,;= :@ +const MULTI_UNIX: percent_encoding_rfc3986::AsciiSet = percent_encoding_rfc3986::NON_ALPHANUMERIC + .remove(b'-') + .remove(b'.') + .remove(b'_') + .remove(b'~') + .remove(b'!') + .remove(b'$') + .remove(b'&') + .remove(b'\'') + .remove(b'(') + .remove(b')') + .remove(b'*') + .remove(b'+') + .remove(b',') + .remove(b';') + .remove(b'=') + .remove(b':') + .remove(b'@'); + +/// Extracts a `SocketAddr` from a given `Multiaddr`. +/// +/// Fails if the given `Multiaddr` does not begin with a `/unix` +/// or if that `/unix` lists an invalid path or unusable address +/// (i.e. is abstract on non-linux). +fn multiaddr_to_socketaddr(mut addr: Multiaddr) -> Result { + // "Pop" the UNIX path from the end of the address, + // ignoring a `/p2p/...` suffix as well as any prefix of possibly + // outer protocols, if present. + while let Some(proto) = addr.pop() { + match proto { + Protocol::Unix(path) => { + let bytes: Cow<_> = percent_decode_str(&path).map_err(|_| ())?.into(); + if bytes.starts_with(b"\0") { + #[cfg(any(target_os = "android", target_os = "linux"))] + return SocketAddr::from_abstract_name(&bytes[1..]).map_err(|_| ()); + #[allow(unreachable_code)] + return Err(()); + } else { + return SocketAddr::from_pathname(OsStr::from_bytes(&bytes)).map_err(|_| ()); + } + } + Protocol::P2p(_) => {} + _ => return Err(()), + } + } + Err(()) +} + +// Create a [`Multiaddr`] from the given IP address and port number. +fn socketaddr_to_multiaddr(sock: &SocketAddr) -> Option { + let mut path: Option> = sock + .as_pathname() + .map(|p| percent_encode(p.as_os_str().as_bytes(), &MULTI_UNIX).into()); + + if path.is_none() { + if let Some(name) = sock.as_abstract_name() { + path = Some(format!("%00{}", percent_encode(name, &MULTI_UNIX)).into()); + } + } + + Some(Multiaddr::empty().with(Protocol::Unix(path?))) +} + +#[cfg(test)] +mod tests { + use futures::channel::mpsc; + use libp2p_core::{transport::PortUse, Endpoint, Transport as _}; + + use super::*; + + #[test] + fn multiaddr_to_unix_conversion() { + // SocketAddr is !Display, but its Debug implementations contain all the interesting data + fn d(sa: SocketAddr) -> String { + format!("{:?}", sa) + } + + assert!( + multiaddr_to_socketaddr("/ip4/127.0.0.1/udp/1234".parse::().unwrap()) + .is_err() + ); + + assert_eq!( + multiaddr_to_socketaddr( + "/unix/%2Frun%2Fsystemd%2Fnotify" + .parse::() + .unwrap() + ) + .map(d), + Ok(SocketAddr::from_pathname("/run/systemd/notify").unwrap()).map(d) + ); + assert_eq!( + multiaddr_to_socketaddr("/unix/notify".parse::().unwrap()).map(d), + Ok(SocketAddr::from_pathname("notify").unwrap()).map(d) + ); + #[cfg(any(target_os = "android", target_os = "linux"))] + assert_eq!( + multiaddr_to_socketaddr( + "/unix/%009e48095dfe839881%2Fbus%2Fsystemd-timesyn%2Fbus-api-timesync" + .parse::() + .unwrap() + ) + .map(d), + Ok(SocketAddr::from_abstract_name( + "9e48095dfe839881/bus/systemd-timesyn/bus-api-timesync" + ) + .unwrap()) + .map(d) + ); + #[cfg(not(any(target_os = "android", target_os = "linux")))] + assert!(multiaddr_to_socketaddr( + "%009e48095dfe839881%2Fbus%2Fsystemd-timesyn%2Fbus-api-timesync" + .parse::() + .unwrap() + ) + .is_err()); + } + + // https://github.com/multiformats/multiaddr/pull/174#issuecomment-2964331099 + #[test] + fn socketaddr_to_multiaddr_conversion() { + use libp2p_core::multiaddr::multiaddr; + assert_eq!( + socketaddr_to_multiaddr(&SocketAddr::from_pathname("/run/systemd/notify").unwrap()), + Some(multiaddr!(Unix(Cow::from("%2Frun%2Fsystemd%2Fnotify")))), + ); + assert_eq!( + socketaddr_to_multiaddr(&SocketAddr::from_pathname("notify").unwrap()), + Some(multiaddr!(Unix(Cow::from("notify")))), + ); + #[cfg(any(target_os = "android", target_os = "linux"))] + assert_eq!( + socketaddr_to_multiaddr( + &SocketAddr::from_abstract_name( + "9e48095dfe839881/bus/systemd-timesyn/bus-api-timesync" + ) + .unwrap() + ), + Some(multiaddr!(Unix(Cow::from( + "%009e48095dfe839881%2Fbus%2Fsystemd-timesyn%2Fbus-api-timesync" + )))), + ); + } + + #[test] + fn communicating_between_dialer_and_listener() { + let _ = tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .try_init(); + + async fn listener(addr: Multiaddr, mut ready_tx: mpsc::Sender) { + let mut tcp = Transport::::default().boxed(); + tcp.listen_on(ListenerId::next(), addr).unwrap(); + loop { + match tcp.select_next_some().await { + TransportEvent::NewAddress { listen_addr, .. } => { + ready_tx.send(listen_addr).await.unwrap(); + } + TransportEvent::Incoming { upgrade, .. } => { + let mut upgrade = upgrade.await.unwrap(); + let mut buf = [0u8; 3]; + upgrade.read_exact(&mut buf).await.unwrap(); + assert_eq!(buf, [1, 2, 3]); + upgrade.write_all(&[4, 5, 6]).await.unwrap(); + return; + } + e => panic!("Unexpected transport event: {e:?}"), + } + } + } + + async fn dialer(mut ready_rx: mpsc::Receiver) { + let addr = ready_rx.next().await.unwrap(); + let mut tcp = Transport::::default(); + + // Obtain a future socket through dialing + let mut socket = tcp + .dial( + addr.clone(), + DialOpts { + role: Endpoint::Dialer, + port_use: PortUse::Reuse, + }, + ) + .unwrap() + .await + .unwrap(); + socket.write_all(&[0x1, 0x2, 0x3]).await.unwrap(); + + let mut buf = [0u8; 3]; + socket.read_exact(&mut buf).await.unwrap(); + assert_eq!(buf, [4, 5, 6]); + } + + fn test(addr: Multiaddr, rm: Option<&str>) { + rm.and_then(|r| std::fs::remove_file(r).ok()); + #[cfg(feature = "async-io")] + { + let (ready_tx, ready_rx) = mpsc::channel(1); + let listener = listener::(addr.clone(), ready_tx); + let dialer = dialer::(ready_rx); + let listener = async_std::task::spawn(listener); + async_std::task::block_on(dialer); + async_std::task::block_on(listener); + } + + rm.and_then(|r| std::fs::remove_file(r).ok()); + #[cfg(feature = "tokio")] + { + let (ready_tx, ready_rx) = mpsc::channel(1); + let listener = listener::(addr, ready_tx); + let dialer = dialer::(ready_rx); + let rt = ::tokio::runtime::Builder::new_current_thread() + .enable_io() + .build() + .unwrap(); + let tasks = ::tokio::task::LocalSet::new(); + let listener = tasks.spawn_local(listener); + tasks.block_on(&rt, dialer); + tasks.block_on(&rt, listener).unwrap(); + } + rm.and_then(|r| std::fs::remove_file(r).ok()); + } + test( + "/unix/communicating_between_dialer_and_listener" + .parse() + .unwrap(), + Some("communicating_between_dialer_and_listener"), + ); + #[cfg(any(target_os = "android", target_os = "linux"))] + test( + "/unix/%00communicating_between_dialer_and_listener" + .parse() + .unwrap(), + None, + ); + } + + #[test] + fn listen_invalid_addr() { + let _ = tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .try_init(); + + fn test(addr: Multiaddr) { + #[cfg(feature = "async-io")] + { + let mut tcp = async_io::Transport::default(); + assert!(tcp.listen_on(ListenerId::next(), addr.clone()).is_err()); + } + + #[cfg(feature = "tokio")] + { + let mut tcp = tokio::Transport::default(); + assert!(tcp.listen_on(ListenerId::next(), addr).is_err()); + } + } + + test("/unix/".parse().unwrap()); // "empty" (len=0) address is the only really invalid one + } + + #[test] + fn test_remove_listener() { + let _ = tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .try_init(); + + async fn cycle_listeners() -> bool { + let mut tcp = Transport::::default().boxed(); + let listener_id = ListenerId::next(); + tcp.listen_on(listener_id, "/unix/cycle_listeners".parse().unwrap()) + .unwrap(); + tcp.remove_listener(listener_id) + } + + let _ = std::fs::remove_file("cycle_listeners"); + #[cfg(feature = "async-io")] + { + assert!(async_std::task::block_on(cycle_listeners::< + async_io::UnixStrm, + >())); + } + + let _ = std::fs::remove_file("cycle_listeners"); + #[cfg(feature = "tokio")] + { + let rt = ::tokio::runtime::Builder::new_current_thread() + .enable_io() + .build() + .unwrap(); + assert!(rt.block_on(cycle_listeners::())); + } + let _ = std::fs::remove_file("cycle_listeners"); + } +} diff --git a/transports/unix-stream/src/provider.rs b/transports/unix-stream/src/provider.rs new file mode 100644 index 00000000000..5e4cc1b28f3 --- /dev/null +++ b/transports/unix-stream/src/provider.rs @@ -0,0 +1,71 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! The interface for providers of non-blocking UNIX-domain stream socket implementations. + +#[cfg(feature = "async-io")] +pub mod async_io; + +#[cfg(feature = "tokio")] +pub mod tokio; + +use std::{ + fmt, io, + os::unix::net::{SocketAddr, UnixListener, UnixStream}, + task::{Context, Poll}, +}; + +use futures::{ + future::BoxFuture, + io::{AsyncRead, AsyncWrite}, +}; + +/// An incoming connection returned from [`Provider::poll_accept()`]. +pub struct Incoming { + pub stream: S, + pub local_addr: SocketAddr, + pub remote_addr: SocketAddr, +} + +/// The interface for non-blocking UNIX-domain stream socket I/O providers. +pub trait Provider: Clone + Send + 'static { + /// The type of UNIX-domain stream sockets obtained from [`Provider::new_stream`] + /// and [`Provider::poll_accept`]. + type Stream: AsyncRead + AsyncWrite + Send + Unpin + fmt::Debug; + /// The type of UNIX-domain stream socket listeners obtained from [`Provider::new_listener`]. + type Listener: Send + Unpin; + + /// Creates a new listener wrapping the given [`UnixListener`] that + /// can be polled for incoming connections via [`Self::poll_accept()`]. + fn new_listener(_: UnixListener) -> io::Result; + + /// Creates a new stream for an outgoing connection, wrapping the + /// given [`UnixStream`]. The given `UnixStream` is initiating a + /// connection, but implementations must wait for the connection + /// setup to complete, i.e. for the stream to be writable. + fn new_stream(_: UnixStream) -> BoxFuture<'static, io::Result>; + + /// Polls a [`Self::Listener`] for an incoming connection, ensuring a task wakeup, + /// if necessary. + fn poll_accept( + _: &mut Self::Listener, + _: &mut Context<'_>, + ) -> Poll>>; +} diff --git a/transports/unix-stream/src/provider/async_io.rs b/transports/unix-stream/src/provider/async_io.rs new file mode 100644 index 00000000000..e2b8e0fdddf --- /dev/null +++ b/transports/unix-stream/src/provider/async_io.rs @@ -0,0 +1,122 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use std::{ + io, + os::unix::net, + task::{Context, Poll}, +}; + +use async_io::Async; +use futures::future::{BoxFuture, FutureExt}; + +use super::{Incoming, Provider}; + +/// A UNIX-domain stream [`Transport`](libp2p_core::Transport) that works with the `async-std` +/// ecosystem. +/// +/// # Example +/// +/// ```rust +/// # use libp2p_unix_stream as unix_stream; +/// # use libp2p_core::{Transport, transport::ListenerId}; +/// # use futures::future; +/// # use std::pin::Pin; +/// # +/// # #[async_std::main] +/// # async fn main() { +/// let mut transport = unix_stream::async_io::Transport::new(); +/// let id = ListenerId::next(); +/// let _ = std::fs::remove_file("socket"); +/// transport +/// .listen_on(id, "/unix/socket".parse().unwrap()) +/// .unwrap(); +/// +/// let addr = future::poll_fn(|cx| Pin::new(&mut transport).poll(cx)) +/// .await +/// .into_new_address() +/// .unwrap(); +/// +/// println!("Listening on {addr}"); +/// # let _ = std::fs::remove_file("socket"); +/// # } +/// ``` +pub type Transport = crate::Transport; + +#[derive(Copy, Clone)] +#[doc(hidden)] +pub enum UnixStrm {} + +impl Provider for UnixStrm { + type Stream = UnixStream; + type Listener = Async; + + fn new_listener(l: net::UnixListener) -> io::Result { + Async::new(l) + } + + fn new_stream(s: net::UnixStream) -> BoxFuture<'static, io::Result> { + async move { + // Taken from [`Async::connect`]. + + let stream = Async::new(s)?; + + // The stream becomes writable when connected. + stream.writable().await?; + + // Check if there was an error while connecting. + match stream.get_ref().take_error()? { + None => Ok(stream), + Some(err) => Err(err), + } + } + .boxed() + } + + fn poll_accept( + l: &mut Self::Listener, + cx: &mut Context<'_>, + ) -> Poll>> { + let (stream, remote_addr) = loop { + match l.poll_readable(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), + Poll::Ready(Ok(())) => match l.accept().now_or_never() { + Some(Err(e)) => return Poll::Ready(Err(e)), + Some(Ok(res)) => break res, + None => { + // Since it doesn't do any harm, account for false positives of + // `poll_readable` just in case, i.e. try again. + } + }, + } + }; + + let local_addr = stream.get_ref().local_addr()?; + + Poll::Ready(Ok(Incoming { + stream, + local_addr, + remote_addr, + })) + } +} + +pub type UnixStream = Async; diff --git a/transports/unix-stream/src/provider/tokio.rs b/transports/unix-stream/src/provider/tokio.rs new file mode 100644 index 00000000000..7b34e3029d4 --- /dev/null +++ b/transports/unix-stream/src/provider/tokio.rs @@ -0,0 +1,171 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use std::{ + io, + os::unix::net, + pin::Pin, + task::{Context, Poll}, +}; + +use futures::{ + future::{BoxFuture, FutureExt}, + prelude::*, +}; + +use super::{Incoming, Provider}; + +/// A UNIX-domain stream [`Transport`](libp2p_core::Transport) that works with the `tokio` +/// ecosystem. +/// +/// # Example +/// +/// ```rust +/// # use libp2p_unix_stream as unix_stream; +/// # use libp2p_core::{Transport, transport::ListenerId}; +/// # use futures::future; +/// # use std::pin::Pin; +/// # +/// # #[tokio::main] +/// # async fn main() { +/// let mut transport = unix_stream::tokio::Transport::new(); +/// let _ = std::fs::remove_file("socket"); +/// let id = transport +/// .listen_on(ListenerId::next(), "/unix/socket".parse().unwrap()) +/// .unwrap(); +/// +/// let addr = future::poll_fn(|cx| Pin::new(&mut transport).poll(cx)) +/// .await +/// .into_new_address() +/// .unwrap(); +/// +/// println!("Listening on {addr}"); +/// # let _ = std::fs::remove_file("socket"); +/// # } +/// ``` +pub type Transport = crate::Transport; + +#[derive(Copy, Clone)] +#[doc(hidden)] +pub enum UnixStrm {} + +impl Provider for UnixStrm { + type Stream = UnixStream; + type Listener = tokio::net::UnixListener; + + fn new_listener(l: net::UnixListener) -> io::Result { + tokio::net::UnixListener::try_from(l) + } + + fn new_stream(s: net::UnixStream) -> BoxFuture<'static, io::Result> { + async move { + // Taken from [`tokio::net::UnixStream::connect_mio`]. + + let stream = tokio::net::UnixStream::try_from(s)?; + + // Once we've connected, wait for the stream to be writable as + // that's when the actual connection has been initiated. Once we're + // writable we check for `take_socket_error` to see if the connect + // actually hit an error or not. + // + // If all that succeeded then we ship everything on up. + stream.writable().await?; + + if let Some(e) = stream.take_error()? { + return Err(e); + } + + Ok(UnixStream(stream)) + } + .boxed() + } + + fn poll_accept( + l: &mut Self::Listener, + cx: &mut Context<'_>, + ) -> Poll>> { + let (stream, remote_addr) = match l.poll_accept(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), + Poll::Ready(Ok((stream, remote_addr))) => (stream, remote_addr.into()), + }; + + let local_addr = stream.local_addr()?.into(); + let stream = UnixStream(stream); + + Poll::Ready(Ok(Incoming { + stream, + local_addr, + remote_addr, + })) + } +} + +/// A [`tokio::net::UnixStream`] that implements [`AsyncRead`] and [`AsyncWrite`]. +#[derive(Debug)] +pub struct UnixStream(pub tokio::net::UnixStream); + +impl From for tokio::net::UnixStream { + fn from(t: UnixStream) -> tokio::net::UnixStream { + t.0 + } +} + +impl AsyncRead for UnixStream { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context, + buf: &mut [u8], + ) -> Poll> { + let mut read_buf = tokio::io::ReadBuf::new(buf); + futures::ready!(tokio::io::AsyncRead::poll_read( + Pin::new(&mut self.0), + cx, + &mut read_buf + ))?; + Poll::Ready(Ok(read_buf.filled().len())) + } +} + +impl AsyncWrite for UnixStream { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context, + buf: &[u8], + ) -> Poll> { + tokio::io::AsyncWrite::poll_write(Pin::new(&mut self.0), cx, buf) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + tokio::io::AsyncWrite::poll_flush(Pin::new(&mut self.0), cx) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + tokio::io::AsyncWrite::poll_shutdown(Pin::new(&mut self.0), cx) + } + + fn poll_write_vectored( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[io::IoSlice<'_>], + ) -> Poll> { + tokio::io::AsyncWrite::poll_write_vectored(Pin::new(&mut self.0), cx, bufs) + } +}