Skip to content

protocols/plaintext: Move to stable futures and use unsigned varints #1306

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Nov 19, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# Next Version

- Use varints instead of fixed sized (4 byte) integers to delimit plaintext 2.0 messages to align implementation with the specification.

# Version 0.13.0 (2019-11-05)

- Reworked the transport upgrade API. See https://github.com/libp2p/rust-libp2p/pull/1240 for more information.
Expand Down
15 changes: 11 additions & 4 deletions protocols/plaintext/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,18 @@ keywords = ["peer-to-peer", "libp2p", "networking"]
categories = ["network-programming", "asynchronous"]

[dependencies]
futures-preview = "0.3.0-alpha.18"
libp2p-core = { version = "0.13.0", path = "../../core" }
bytes = "0.4.12"
futures = "0.3.1"
futures_codec = "0.3.1"
libp2p-core = { version = "0.13.0", path = "../../core" }
log = "0.4.8"
void = "1.0.2"
tokio-io = "0.1.12"
protobuf = "2.8.1"
rw-stream-sink = { version = "0.1.1", path = "../../misc/rw-stream-sink" }
unsigned-varint = { version = "0.2.3", features = ["futures-codec"] }
void = "1.0.2"

[dev-dependencies]
env_logger = "0.7.1"
quickcheck = "0.9.0"
rand = "0.7"
futures-timer = "2.0"
71 changes: 27 additions & 44 deletions protocols/plaintext/src/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,11 @@

use bytes::BytesMut;
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use futures::Future;
use futures::future;
use futures::sink::Sink;
use futures::stream::Stream;
use futures::prelude::*;
use futures_codec::Framed;
use libp2p_core::{PublicKey, PeerId};
use log::{debug, trace};
use crate::pb::structs::Exchange;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::codec::length_delimited;
use tokio_io::codec::length_delimited::Framed;
use protobuf::Message;
use crate::error::PlainTextError;
use crate::PlainText2Config;
Expand Down Expand Up @@ -109,45 +104,33 @@ impl HandshakeContext<Local> {
}
}

pub fn handshake<S>(socket: S, config: PlainText2Config)
-> impl Future<Item = (Framed<S, BytesMut>, Remote), Error = PlainTextError>
pub async fn handshake<S>(socket: S, config: PlainText2Config)
-> Result<(Framed<S, unsigned_varint::codec::UviBytes<Vec<u8>>>, Remote), PlainTextError>
Copy link
Contributor

@twittner twittner Nov 15, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would import unsigned_varint::codec::UviBytes and shorten this line. Again, I would keep using BytesMut.

where
S: AsyncRead + AsyncWrite + Send,
S: AsyncRead + AsyncWrite + Send + Unpin,
{
let socket = length_delimited::Builder::new()
.big_endian()
.length_field_length(4)
.new_framed(socket);
// The handshake messages all start with a variable-length integer indicating the size.
let mut socket = Framed::new(
socket,
unsigned_varint::codec::UviBytes::<Vec<u8>>::default()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can write Framed::new(socket, UviBytes::default()) here.

);

future::ok::<_, PlainTextError>(())
.and_then(|_| {
trace!("starting handshake");
Ok(HandshakeContext::new(config)?)
})
// Send our local `Exchange`.
.and_then(|context| {
trace!("sending exchange to remote");
socket.send(BytesMut::from(context.state.exchange_bytes.clone()))
.from_err()
.map(|s| (s, context))
})
// Receive the remote's `Exchange`.
.and_then(move |(socket, context)| {
trace!("receiving the remote's exchange");
socket.into_future()
.map_err(|(e, _)| e.into())
.and_then(move |(prop_raw, socket)| {
let context = match prop_raw {
Some(p) => context.with_remote(p)?,
None => {
debug!("unexpected eof while waiting for remote's exchange");
let err = IoError::new(IoErrorKind::BrokenPipe, "unexpected eof");
return Err(err.into());
}
};
trace!("starting handshake");
let context = HandshakeContext::new(config)?;

trace!("received exchange from remote; pubkey = {:?}", context.state.public_key);
Ok((socket, context.state))
})
})
trace!("sending exchange to remote");
socket.send(context.state.exchange_bytes.clone()).await?;

trace!("receiving the remote's exchange");
let context = match socket.next().await {
Some(p) => context.with_remote(p?)?,
None => {
debug!("unexpected eof while waiting for remote's exchange");
let err = IoError::new(IoErrorKind::BrokenPipe, "unexpected eof");
return Err(err.into());
}
};

trace!("received exchange from remote; pubkey = {:?}", context.state.public_key);
Ok((socket, context.state))
}
149 changes: 76 additions & 73 deletions protocols/plaintext/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,18 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use futures::future::{self, Ready};
use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, upgrade::Negotiated};
use std::iter;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::codec::length_delimited::Framed;
use crate::error::PlainTextError;
use void::Void;
use futures::future::FutureResult;
use unsigned_varint::codec::UviBytes;
use crate::handshake::Remote;
use futures::future::{self, Ready};
use futures::{Sink, Stream};
use futures::prelude::*;
use futures_codec::Framed;
use libp2p_core::{identity, InboundUpgrade, OutboundUpgrade, UpgradeInfo, upgrade::Negotiated, PeerId, PublicKey};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line is much longer than 100 characters (cf. .editorconfig)

use log::debug;
use rw_stream_sink::RwStreamSink;
use std::{io, iter, pin::Pin, task::{Context, Poll}};
use void::Void;

mod error;
mod handshake;
Expand Down Expand Up @@ -108,144 +111,144 @@ impl UpgradeInfo for PlainText2Config {

impl<C> InboundUpgrade<C> for PlainText2Config
where
C: AsyncRead + AsyncWrite + Send + 'static
C: AsyncRead + AsyncWrite + Send + Unpin + 'static
{
type Output = (PeerId, PlainTextOutput<Negotiated<C>>);
type Error = PlainTextError;
type Future = Box<dyn Future<Item = Self::Output, Error = Self::Error> + Send>;
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;

(assuming an use futures::future::BoxFuture;)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, cool. Wasn't aware of this type alias. Thanks.


fn upgrade_inbound(self, socket: Negotiated<C>, _: Self::Info) -> Self::Future {
Box::new(self.handshake(socket))
Box::pin(self.handshake(socket))
}
}

impl<C> OutboundUpgrade<C> for PlainText2Config
where
C: AsyncRead + AsyncWrite + Send + 'static
C: AsyncRead + AsyncWrite + Send + Unpin + 'static
{
type Output = (PeerId, PlainTextOutput<Negotiated<C>>);
type Error = PlainTextError;
type Future = Box<dyn Future<Item = Self::Output, Error = Self::Error> + Send>;
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

like above

Suggested change
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;


fn upgrade_outbound(self, socket: Negotiated<C>, _: Self::Info) -> Self::Future {
Box::new(self.handshake(socket))
Box::pin(self.handshake(socket))
}
}

impl PlainText2Config {
fn handshake<T>(self, socket: T) -> impl Future<Item = (PeerId, PlainTextOutput<T>), Error = PlainTextError>
async fn handshake<T>(self, socket: T) -> Result<(PeerId, PlainTextOutput<T>), PlainTextError>
where
T: AsyncRead + AsyncWrite + Send + 'static
T: AsyncRead + AsyncWrite + Send + Unpin + 'static
{
debug!("Starting plaintext upgrade");
PlainTextMiddleware::handshake(socket, self)
.map(|(stream_sink, remote)| {
let mapped = stream_sink.map_err(map_err as fn(_) -> _);
(
remote.peer_id,
PlainTextOutput {
stream: RwStreamSink::new(mapped),
remote_key: remote.public_key,
}
)
})
let (stream_sink, remote) = PlainTextMiddleware::handshake(socket, self).await?;
let mapped = stream_sink.map_err(map_err as fn(_) -> _);
Ok((
remote.peer_id,
PlainTextOutput {
stream: RwStreamSink::new(mapped),
remote_key: remote.public_key,
}
))
}
}

#[inline]
fn map_err(err: io::Error) -> io::Error {
debug!("error during plaintext handshake {:?}", err);
io::Error::new(io::ErrorKind::InvalidData, err)
}

pub struct PlainTextMiddleware<S> {
inner: Framed<S, BytesMut>,
inner: Framed<S, UviBytes<Vec<u8>>>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you decide to move from BytesMut to Vec<u8>?

}

impl<S> PlainTextMiddleware<S>
where
S: AsyncRead + AsyncWrite + Send,
S: AsyncRead + AsyncWrite + Send + Unpin,
{
fn handshake(socket: S, config: PlainText2Config)
-> impl Future<Item = (PlainTextMiddleware<S>, Remote), Error = PlainTextError>
async fn handshake(socket: S, config: PlainText2Config)
-> Result<(PlainTextMiddleware<S>, Remote), PlainTextError>
{
handshake::handshake(socket, config).map(|(inner, remote)| {
(PlainTextMiddleware { inner }, remote)
})
let (inner, remote) = handshake::handshake(socket, config).await?;
Ok((PlainTextMiddleware { inner }, remote))
}
}

impl<S> Sink for PlainTextMiddleware<S>
impl<S> Sink<Vec<u8>> for PlainTextMiddleware<S>
where
S: AsyncRead + AsyncWrite,
S: AsyncRead + AsyncWrite + Unpin,
{
type SinkItem = BytesMut;
type SinkError = io::Error;
type Error = io::Error;

fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
Sink::poll_ready(Pin::new(&mut self.inner), cx)
}

#[inline]
fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
self.inner.start_send(item)
fn start_send(mut self: Pin<&mut Self>, item: Vec<u8>) -> Result<(), Self::Error> {
Sink::start_send(Pin::new(&mut self.inner), item)
}

#[inline]
fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
self.inner.poll_complete()
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
Sink::poll_flush(Pin::new(&mut self.inner), cx)
}

#[inline]
fn close(&mut self) -> Poll<(), Self::SinkError> {
self.inner.close()
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
Sink::poll_close(Pin::new(&mut self.inner), cx)
}
}

impl<S> Stream for PlainTextMiddleware<S>
where
S: AsyncRead + AsyncWrite,
S: AsyncRead + AsyncWrite + Unpin,
{
type Item = BytesMut;
type Error = io::Error;
type Item = Result<Vec<u8>, io::Error>;

#[inline]
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
self.inner.poll()
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// TODO: Too much of a hack? (BytesMut -> Vec<u8>)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would stick with BytesMut instead of copying the data.

match Stream::poll_next(Pin::new(&mut self.inner), cx) {
Poll::Ready(ready) => {
Poll::Ready(ready.map(|res| res.map(|buf| buf.to_vec())))
},
Poll::Pending => Poll::Pending,
}
}
}

/// Output of the plaintext protocol.
pub struct PlainTextOutput<S>
where
S: AsyncRead + AsyncWrite,
S: AsyncRead + AsyncWrite + Unpin,
{
/// The plaintext stream.
pub stream: RwStreamSink<StreamMapErr<PlainTextMiddleware<S>, fn(io::Error) -> io::Error>>,
pub stream: RwStreamSink<futures::stream::MapErr<PlainTextMiddleware<S>, fn(io::Error) -> io::Error>>,
/// The public key of the remote.
pub remote_key: PublicKey,
}

impl<S: AsyncRead + AsyncWrite> std::io::Read for PlainTextOutput<S> {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
self.stream.read(buf)
}
}

impl<S: AsyncRead + AsyncWrite> AsyncRead for PlainTextOutput<S> {
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
self.stream.prepare_uninitialized_buffer(buf)
impl<S: AsyncRead + AsyncWrite + Unpin> AsyncRead for PlainTextOutput<S> {
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8])
-> Poll<Result<usize, io::Error>>
{
AsyncRead::poll_read(Pin::new(&mut self.stream), cx, buf)
}
}

impl<S: AsyncRead + AsyncWrite> std::io::Write for PlainTextOutput<S> {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self.stream.write(buf)
impl<S: AsyncRead + AsyncWrite + Unpin> AsyncWrite for PlainTextOutput<S> {
fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context, buf: &[u8])
-> Poll<Result<usize, io::Error>>
{
AsyncWrite::poll_write(Pin::new(&mut self.stream), cx, buf)
}

fn flush(&mut self) -> std::io::Result<()> {
self.stream.flush()
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context)
-> Poll<Result<(), io::Error>>
{
AsyncWrite::poll_flush(Pin::new(&mut self.stream), cx)
}
}

impl<S: AsyncRead + AsyncWrite> AsyncWrite for PlainTextOutput<S> {
fn shutdown(&mut self) -> Poll<(), io::Error> {
self.stream.shutdown()
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context)
-> Poll<Result<(), io::Error>>
{
AsyncWrite::poll_close(Pin::new(&mut self.stream), cx)
}
}
Loading