diff --git a/CHANGELOG.md b/CHANGELOG.md index ad4aa543a64..5e51a391527 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +# Next Version + +- Use varints instead of fixed sized (4 byte) integers to delimit plaintext 2.0 messages to align implementation with the specification. + # Version 0.13.0 (2019-11-05) - Reworked the transport upgrade API. See https://github.com/libp2p/rust-libp2p/pull/1240 for more information. diff --git a/protocols/plaintext/Cargo.toml b/protocols/plaintext/Cargo.toml index c6db29d8bac..1632f8e8a4a 100644 --- a/protocols/plaintext/Cargo.toml +++ b/protocols/plaintext/Cargo.toml @@ -10,11 +10,18 @@ keywords = ["peer-to-peer", "libp2p", "networking"] categories = ["network-programming", "asynchronous"] [dependencies] -futures-preview = "0.3.0-alpha.18" -libp2p-core = { version = "0.13.0", path = "../../core" } bytes = "0.4.12" +futures = "0.3.1" +futures_codec = "0.3.1" +libp2p-core = { version = "0.13.0", path = "../../core" } log = "0.4.8" -void = "1.0.2" -tokio-io = "0.1.12" protobuf = "2.8.1" rw-stream-sink = { version = "0.1.1", path = "../../misc/rw-stream-sink" } +unsigned-varint = { version = "0.2.3", features = ["futures-codec"] } +void = "1.0.2" + +[dev-dependencies] +env_logger = "0.7.1" +quickcheck = "0.9.0" +rand = "0.7" +futures-timer = "2.0" diff --git a/protocols/plaintext/src/handshake.rs b/protocols/plaintext/src/handshake.rs index 8b073937c62..9a295766a3f 100644 --- a/protocols/plaintext/src/handshake.rs +++ b/protocols/plaintext/src/handshake.rs @@ -18,21 +18,18 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +use crate::PlainText2Config; +use crate::error::PlainTextError; +use crate::pb::structs::Exchange; + use bytes::BytesMut; -use std::io::{Error as IoError, ErrorKind as IoErrorKind}; -use futures::Future; -use futures::future; -use futures::sink::Sink; -use futures::stream::Stream; +use futures::prelude::*; +use futures_codec::Framed; use libp2p_core::{PublicKey, PeerId}; use log::{debug, trace}; -use crate::pb::structs::Exchange; -use tokio_io::{AsyncRead, AsyncWrite}; -use tokio_io::codec::length_delimited; -use tokio_io::codec::length_delimited::Framed; use protobuf::Message; -use crate::error::PlainTextError; -use crate::PlainText2Config; +use std::io::{Error as IoError, ErrorKind as IoErrorKind}; +use unsigned_varint::codec::UviBytes; struct HandshakeContext { config: PlainText2Config, @@ -68,7 +65,9 @@ impl HandshakeContext { }) } - fn with_remote(self, exchange_bytes: BytesMut) -> Result, PlainTextError> { + fn with_remote(self, exchange_bytes: BytesMut) + -> Result, PlainTextError> + { let mut prop = match protobuf::parse_from_bytes::(&exchange_bytes) { Ok(prop) => prop, Err(e) => { @@ -95,7 +94,7 @@ impl HandshakeContext { // Check the validity of the remote's `Exchange`. if peer_id != public_key.clone().into_peer_id() { - debug!("The remote's `PeerId` of the exchange isn't consist with the remote public key"); + debug!("the remote's `PeerId` isn't consistent with the remote's public key"); return Err(PlainTextError::InvalidPeerId) } @@ -109,45 +108,30 @@ impl HandshakeContext { } } -pub fn handshake(socket: S, config: PlainText2Config) - -> impl Future, Remote), Error = PlainTextError> +pub async fn handshake(socket: S, config: PlainText2Config) + -> Result<(Framed>, Remote), PlainTextError> where - S: AsyncRead + AsyncWrite + Send, + S: AsyncRead + AsyncWrite + Send + Unpin, { - let socket = length_delimited::Builder::new() - .big_endian() - .length_field_length(4) - .new_framed(socket); - - future::ok::<_, PlainTextError>(()) - .and_then(|_| { - trace!("starting handshake"); - Ok(HandshakeContext::new(config)?) - }) - // Send our local `Exchange`. - .and_then(|context| { - trace!("sending exchange to remote"); - socket.send(BytesMut::from(context.state.exchange_bytes.clone())) - .from_err() - .map(|s| (s, context)) - }) - // Receive the remote's `Exchange`. - .and_then(move |(socket, context)| { - trace!("receiving the remote's exchange"); - socket.into_future() - .map_err(|(e, _)| e.into()) - .and_then(move |(prop_raw, socket)| { - let context = match prop_raw { - Some(p) => context.with_remote(p)?, - None => { - debug!("unexpected eof while waiting for remote's exchange"); - let err = IoError::new(IoErrorKind::BrokenPipe, "unexpected eof"); - return Err(err.into()); - } - }; - - trace!("received exchange from remote; pubkey = {:?}", context.state.public_key); - Ok((socket, context.state)) - }) - }) + // The handshake messages all start with a variable-length integer indicating the size. + let mut socket = Framed::new(socket, UviBytes::default()); + + trace!("starting handshake"); + let context = HandshakeContext::new(config)?; + + trace!("sending exchange to remote"); + socket.send(BytesMut::from(context.state.exchange_bytes.clone())).await?; + + trace!("receiving the remote's exchange"); + let context = match socket.next().await { + Some(p) => context.with_remote(p?)?, + None => { + debug!("unexpected eof while waiting for remote's exchange"); + let err = IoError::new(IoErrorKind::BrokenPipe, "unexpected eof"); + return Err(err.into()); + } + }; + + trace!("received exchange from remote; pubkey = {:?}", context.state.public_key); + Ok((socket, context.state)) } diff --git a/protocols/plaintext/src/lib.rs b/protocols/plaintext/src/lib.rs index 8013e199f68..985ff0e3a93 100644 --- a/protocols/plaintext/src/lib.rs +++ b/protocols/plaintext/src/lib.rs @@ -18,16 +18,29 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use futures::future::{self, Ready}; -use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, upgrade::Negotiated}; -use std::iter; -use tokio_io::{AsyncRead, AsyncWrite}; -use tokio_io::codec::length_delimited::Framed; use crate::error::PlainTextError; -use void::Void; -use futures::future::FutureResult; use crate::handshake::Remote; +use bytes::BytesMut; +use futures::future::{self, Ready}; +use futures::prelude::*; +use futures::{future::BoxFuture, Sink, Stream}; +use futures_codec::Framed; +use libp2p_core::{ + identity, + InboundUpgrade, + OutboundUpgrade, + UpgradeInfo, + upgrade::Negotiated, + PeerId, + PublicKey, +}; +use log::debug; +use rw_stream_sink::RwStreamSink; +use std::{io, iter, pin::Pin, task::{Context, Poll}}; +use unsigned_varint::codec::UviBytes; +use void::Void; + mod error; mod handshake; mod pb; @@ -108,144 +121,138 @@ impl UpgradeInfo for PlainText2Config { impl InboundUpgrade for PlainText2Config where - C: AsyncRead + AsyncWrite + Send + 'static + C: AsyncRead + AsyncWrite + Send + Unpin + 'static { type Output = (PeerId, PlainTextOutput>); type Error = PlainTextError; - type Future = Box + Send>; + type Future = BoxFuture<'static, Result>; fn upgrade_inbound(self, socket: Negotiated, _: Self::Info) -> Self::Future { - Box::new(self.handshake(socket)) + Box::pin(self.handshake(socket)) } } impl OutboundUpgrade for PlainText2Config where - C: AsyncRead + AsyncWrite + Send + 'static + C: AsyncRead + AsyncWrite + Send + Unpin + 'static { type Output = (PeerId, PlainTextOutput>); type Error = PlainTextError; - type Future = Box + Send>; + type Future = BoxFuture<'static, Result>; fn upgrade_outbound(self, socket: Negotiated, _: Self::Info) -> Self::Future { - Box::new(self.handshake(socket)) + Box::pin(self.handshake(socket)) } } impl PlainText2Config { - fn handshake(self, socket: T) -> impl Future), Error = PlainTextError> + async fn handshake(self, socket: T) -> Result<(PeerId, PlainTextOutput), PlainTextError> where - T: AsyncRead + AsyncWrite + Send + 'static + T: AsyncRead + AsyncWrite + Send + Unpin + 'static { debug!("Starting plaintext upgrade"); - PlainTextMiddleware::handshake(socket, self) - .map(|(stream_sink, remote)| { - let mapped = stream_sink.map_err(map_err as fn(_) -> _); - ( - remote.peer_id, - PlainTextOutput { - stream: RwStreamSink::new(mapped), - remote_key: remote.public_key, - } - ) - }) + let (stream_sink, remote) = PlainTextMiddleware::handshake(socket, self).await?; + let mapped = stream_sink.map_err(map_err as fn(_) -> _); + Ok(( + remote.peer_id, + PlainTextOutput { + stream: RwStreamSink::new(mapped), + remote_key: remote.public_key, + } + )) } } -#[inline] fn map_err(err: io::Error) -> io::Error { debug!("error during plaintext handshake {:?}", err); io::Error::new(io::ErrorKind::InvalidData, err) } pub struct PlainTextMiddleware { - inner: Framed, + inner: Framed>, } impl PlainTextMiddleware where - S: AsyncRead + AsyncWrite + Send, + S: AsyncRead + AsyncWrite + Send + Unpin, { - fn handshake(socket: S, config: PlainText2Config) - -> impl Future, Remote), Error = PlainTextError> + async fn handshake(socket: S, config: PlainText2Config) + -> Result<(PlainTextMiddleware, Remote), PlainTextError> { - handshake::handshake(socket, config).map(|(inner, remote)| { - (PlainTextMiddleware { inner }, remote) - }) + let (inner, remote) = handshake::handshake(socket, config).await?; + Ok((PlainTextMiddleware { inner }, remote)) } } -impl Sink for PlainTextMiddleware +impl Sink for PlainTextMiddleware where - S: AsyncRead + AsyncWrite, + S: AsyncRead + AsyncWrite + Unpin, { - type SinkItem = BytesMut; - type SinkError = io::Error; + type Error = io::Error; + + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + Sink::poll_ready(Pin::new(&mut self.inner), cx) + } - #[inline] - fn start_send(&mut self, item: Self::SinkItem) -> StartSend { - self.inner.start_send(item) + fn start_send(mut self: Pin<&mut Self>, item: BytesMut) -> Result<(), Self::Error> { + Sink::start_send(Pin::new(&mut self.inner), item) } - #[inline] - fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { - self.inner.poll_complete() + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + Sink::poll_flush(Pin::new(&mut self.inner), cx) } - #[inline] - fn close(&mut self) -> Poll<(), Self::SinkError> { - self.inner.close() + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + Sink::poll_close(Pin::new(&mut self.inner), cx) } } impl Stream for PlainTextMiddleware where - S: AsyncRead + AsyncWrite, + S: AsyncRead + AsyncWrite + Unpin, { - type Item = BytesMut; - type Error = io::Error; + type Item = Result; - #[inline] - fn poll(&mut self) -> Poll, Self::Error> { - self.inner.poll() + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Stream::poll_next(Pin::new(&mut self.inner), cx) } } /// Output of the plaintext protocol. pub struct PlainTextOutput where - S: AsyncRead + AsyncWrite, + S: AsyncRead + AsyncWrite + Unpin, { /// The plaintext stream. - pub stream: RwStreamSink, fn(io::Error) -> io::Error>>, + pub stream: RwStreamSink, fn(io::Error) -> io::Error>>, /// The public key of the remote. pub remote_key: PublicKey, } -impl std::io::Read for PlainTextOutput { - fn read(&mut self, buf: &mut [u8]) -> std::io::Result { - self.stream.read(buf) - } -} - -impl AsyncRead for PlainTextOutput { - unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { - self.stream.prepare_uninitialized_buffer(buf) +impl AsyncRead for PlainTextOutput { + fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) + -> Poll> + { + AsyncRead::poll_read(Pin::new(&mut self.stream), cx, buf) } } -impl std::io::Write for PlainTextOutput { - fn write(&mut self, buf: &[u8]) -> std::io::Result { - self.stream.write(buf) +impl AsyncWrite for PlainTextOutput { + fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) + -> Poll> + { + AsyncWrite::poll_write(Pin::new(&mut self.stream), cx, buf) } - fn flush(&mut self) -> std::io::Result<()> { - self.stream.flush() + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) + -> Poll> + { + AsyncWrite::poll_flush(Pin::new(&mut self.stream), cx) } -} -impl AsyncWrite for PlainTextOutput { - fn shutdown(&mut self) -> Poll<(), io::Error> { - self.stream.shutdown() + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) + -> Poll> + { + AsyncWrite::poll_close(Pin::new(&mut self.stream), cx) } } diff --git a/protocols/plaintext/tests/smoke.rs b/protocols/plaintext/tests/smoke.rs new file mode 100644 index 00000000000..aedbda21239 --- /dev/null +++ b/protocols/plaintext/tests/smoke.rs @@ -0,0 +1,121 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use futures::io::{AsyncWriteExt, AsyncReadExt}; +use futures::stream::TryStreamExt; +use libp2p_core::{ + identity, + multiaddr::Multiaddr, + transport::{Transport, ListenerEvent}, + upgrade, +}; +use libp2p_plaintext::PlainText2Config; +use log::debug; +use quickcheck::QuickCheck; + +#[test] +fn variable_msg_length() { + let _ = env_logger::try_init(); + + fn prop(msg: Vec) { + let mut msg_to_send = msg.clone(); + let msg_to_receive = msg; + + let server_id = identity::Keypair::generate_ed25519(); + let server_id_public = server_id.public(); + + let client_id = identity::Keypair::generate_ed25519(); + let client_id_public = client_id.public(); + + futures::executor::block_on(async { + let server_transport = libp2p_core::transport::MemoryTransport{}.and_then( + move |output, endpoint| { + upgrade::apply( + output, + PlainText2Config{local_public_key: server_id_public}, + endpoint, + libp2p_core::upgrade::Version::V1, + ) + } + ); + + let client_transport = libp2p_core::transport::MemoryTransport{}.and_then( + move |output, endpoint| { + upgrade::apply( + output, + PlainText2Config{local_public_key: client_id_public}, + endpoint, + libp2p_core::upgrade::Version::V1, + ) + } + ); + + + let server_address: Multiaddr = format!( + "/memory/{}", + std::cmp::Ord::max(1, rand::random::()) + ).parse().unwrap(); + + let mut server = server_transport.listen_on(server_address.clone()).unwrap(); + + // Ignore server listen address event. + let _ = server.try_next() + .await + .expect("some event") + .expect("no error") + .into_new_address() + .expect("listen address"); + + let client_fut = async { + debug!("dialing {:?}", server_address); + let (received_server_id, mut client_channel) = client_transport.dial(server_address).unwrap().await.unwrap(); + assert_eq!(received_server_id, server_id.public().into_peer_id()); + + debug!("Client: writing message."); + client_channel.write_all(&mut msg_to_send).await.expect("no error"); + debug!("Client: flushing channel."); + client_channel.flush().await.expect("no error"); + }; + + let server_fut = async { + let mut server_channel = server.try_next() + .await + .expect("some event") + .map(ListenerEvent::into_upgrade) + .expect("no error") + .map(|client| client.0) + .expect("listener upgrade xyz") + .await + .map(|(_, session)| session) + .expect("no error"); + + let mut server_buffer = vec![0; msg_to_receive.len()]; + debug!("Server: reading message."); + server_channel.read_exact(&mut server_buffer).await.expect("reading client message"); + + assert_eq!(server_buffer, msg_to_receive); + }; + + futures::future::join(server_fut, client_fut).await; + }) + } + + QuickCheck::new().max_tests(30).quickcheck(prop as fn(Vec)) +} diff --git a/protocols/secio/src/codec/mod.rs b/protocols/secio/src/codec/mod.rs index 73c06e09581..e02bd00b27a 100644 --- a/protocols/secio/src/codec/mod.rs +++ b/protocols/secio/src/codec/mod.rs @@ -103,7 +103,7 @@ impl Hmac { } /// Takes control of `socket`. Returns an object that implements `future::Sink` and -/// `future::Stream`. The `Stream` and `Sink` produce and accept `BytesMut` objects. +/// `future::Stream`. The `Stream` and `Sink` produce and accept `Vec` objects. /// /// The conversion between the stream/sink items and the socket is done with the given cipher and /// hash algorithm (which are generally decided during the handshake).