Skip to content

Use 'self: Pin<&mut Self>' on AsyncRead/AsyncWrite #1465

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 60 additions & 37 deletions futures-io/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ mod if_std {
use std::boxed::Box;
use std::cmp;
use std::io as StdIo;
use std::pin::Pin;
use std::ptr;

// Re-export IoVec for convenience
Expand Down Expand Up @@ -108,7 +109,7 @@ mod if_std {
/// `Interrupted`. Implementations must convert `WouldBlock` into
/// `Async::Pending` and either internally retry or convert
/// `Interrupted` into another error kind.
fn poll_read(&mut self, waker: &Waker, buf: &mut [u8])
fn poll_read(self: Pin<&mut Self>, waker: &Waker, buf: &mut [u8])
-> Poll<Result<usize>>;

/// Attempt to read from the `AsyncRead` into `vec` using vectored
Expand All @@ -133,7 +134,7 @@ mod if_std {
/// `Interrupted`. Implementations must convert `WouldBlock` into
/// `Async::Pending` and either internally retry or convert
/// `Interrupted` into another error kind.
fn poll_vectored_read(&mut self, waker: &Waker, vec: &mut [&mut IoVec])
fn poll_vectored_read(self: Pin<&mut Self>, waker: &Waker, vec: &mut [&mut IoVec])
-> Poll<Result<usize>>
{
if let Some(ref mut first_iovec) = vec.get_mut(0) {
Expand Down Expand Up @@ -168,7 +169,7 @@ mod if_std {
/// `Interrupted`. Implementations must convert `WouldBlock` into
/// `Async::Pending` and either internally retry or convert
/// `Interrupted` into another error kind.
fn poll_write(&mut self, waker: &Waker, buf: &[u8])
fn poll_write(self: Pin<&mut Self>, waker: &Waker, buf: &[u8])
-> Poll<Result<usize>>;

/// Attempt to write bytes from `vec` into the object using vectored
Expand All @@ -194,7 +195,7 @@ mod if_std {
/// `Interrupted`. Implementations must convert `WouldBlock` into
/// `Async::Pending` and either internally retry or convert
/// `Interrupted` into another error kind.
fn poll_vectored_write(&mut self, waker: &Waker, vec: &[&IoVec])
fn poll_vectored_write(self: Pin<&mut Self>, waker: &Waker, vec: &[&IoVec])
-> Poll<Result<usize>>
{
if let Some(ref first_iovec) = vec.get(0) {
Expand All @@ -221,7 +222,7 @@ mod if_std {
/// `Interrupted`. Implementations must convert `WouldBlock` into
/// `Async::Pending` and either internally retry or convert
/// `Interrupted` into another error kind.
fn poll_flush(&mut self, waker: &Waker) -> Poll<Result<()>>;
fn poll_flush(self: Pin<&mut Self>, waker: &Waker) -> Poll<Result<()>>;

/// Attempt to close the object.
///
Expand All @@ -238,7 +239,7 @@ mod if_std {
/// `Interrupted`. Implementations must convert `WouldBlock` into
/// `Async::Pending` and either internally retry or convert
/// `Interrupted` into another error kind.
fn poll_close(&mut self, waker: &Waker) -> Poll<Result<()>>;
fn poll_close(self: Pin<&mut Self>, waker: &Waker) -> Poll<Result<()>>;
}

macro_rules! deref_async_read {
Expand All @@ -247,25 +248,25 @@ mod if_std {
(**self).initializer()
}

fn poll_read(&mut self, waker: &Waker, buf: &mut [u8])
fn poll_read(mut self: Pin<&mut Self>, waker: &Waker, buf: &mut [u8])
-> Poll<Result<usize>>
{
(**self).poll_read(waker, buf)
Pin::new(&mut **self).poll_read(waker, buf)
}

fn poll_vectored_read(&mut self, waker: &Waker, vec: &mut [&mut IoVec])
fn poll_vectored_read(mut self: Pin<&mut Self>, waker: &Waker, vec: &mut [&mut IoVec])
-> Poll<Result<usize>>
{
(**self).poll_vectored_read(waker, vec)
Pin::new(&mut **self).poll_vectored_read(waker, vec)
}
}
}

impl<T: ?Sized + AsyncRead> AsyncRead for Box<T> {
impl<T: ?Sized + AsyncRead + Unpin> AsyncRead for Box<T> {
deref_async_read!();
}

impl<'a, T: ?Sized + AsyncRead> AsyncRead for &'a mut T {
impl<'a, T: ?Sized + AsyncRead + Unpin> AsyncRead for &'a mut T {
deref_async_read!();
}

Expand All @@ -277,10 +278,10 @@ mod if_std {
Initializer::nop()
}

fn poll_read(&mut self, _: &Waker, buf: &mut [u8])
fn poll_read(mut self: Pin<&mut Self>, _: &Waker, buf: &mut [u8])
-> Poll<Result<usize>>
{
Poll::Ready(StdIo::Read::read(self, buf))
Poll::Ready(StdIo::Read::read(&mut *self, buf))
}
}
}
Expand All @@ -293,83 +294,105 @@ mod if_std {
unsafe_delegate_async_read_to_stdio!();
}

impl<T: AsRef<[u8]>> AsyncRead for StdIo::Cursor<T> {
impl<T: AsRef<[u8]> + Unpin> AsyncRead for StdIo::Cursor<T> {
unsafe_delegate_async_read_to_stdio!();
}

macro_rules! deref_async_write {
() => {
fn poll_write(&mut self, waker: &Waker, buf: &[u8])
fn poll_write(mut self: Pin<&mut Self>, waker: &Waker, buf: &[u8])
-> Poll<Result<usize>>
{
(**self).poll_write(waker, buf)
Pin::new(&mut **self).poll_write(waker, buf)
}

fn poll_vectored_write(&mut self, waker: &Waker, vec: &[&IoVec])
fn poll_vectored_write(mut self: Pin<&mut Self>, waker: &Waker, vec: &[&IoVec])
-> Poll<Result<usize>>
{
(**self).poll_vectored_write(waker, vec)
Pin::new(&mut **self).poll_vectored_write(waker, vec)
}

fn poll_flush(&mut self, waker: &Waker) -> Poll<Result<()>> {
(**self).poll_flush(waker)
fn poll_flush(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<Result<()>> {
Pin::new(&mut **self).poll_flush(waker)
}

fn poll_close(&mut self, waker: &Waker) -> Poll<Result<()>> {
(**self).poll_close(waker)
fn poll_close(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<Result<()>> {
Pin::new(&mut **self).poll_close(waker)
}
}
}

impl<T: ?Sized + AsyncWrite> AsyncWrite for Box<T> {
impl<T: ?Sized + AsyncWrite + Unpin> AsyncWrite for Box<T> {
deref_async_write!();
}

impl<'a, T: ?Sized + AsyncWrite> AsyncWrite for &'a mut T {
impl<'a, T: ?Sized + AsyncWrite + Unpin> AsyncWrite for &'a mut T {
deref_async_write!();
}

impl<'a, T: ?Sized + AsyncWrite> AsyncWrite for Pin<&'a mut T> {
fn poll_write(mut self: Pin<&mut Self>, waker: &Waker, buf: &[u8])
-> Poll<Result<usize>>
{
T::poll_write((*self).as_mut(), waker, buf)
}

fn poll_vectored_write(mut self: Pin<&mut Self>, waker: &Waker, vec: &[&IoVec])
-> Poll<Result<usize>>
{
T::poll_vectored_write((*self).as_mut(), waker, vec)
}

fn poll_flush(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<Result<()>> {
T::poll_flush((*self).as_mut(), waker)
}

fn poll_close(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<Result<()>> {
T::poll_close((*self).as_mut(), waker)
}
}

macro_rules! delegate_async_write_to_stdio {
() => {
fn poll_write(&mut self, _: &Waker, buf: &[u8])
fn poll_write(mut self: Pin<&mut Self>, _: &Waker, buf: &[u8])
-> Poll<Result<usize>>
{
Poll::Ready(StdIo::Write::write(self, buf))
Poll::Ready(StdIo::Write::write(&mut *self, buf))
}

fn poll_flush(&mut self, _: &Waker) -> Poll<Result<()>> {
Poll::Ready(StdIo::Write::flush(self))
fn poll_flush(mut self: Pin<&mut Self>, _: &Waker) -> Poll<Result<()>> {
Poll::Ready(StdIo::Write::flush(&mut *self))
}

fn poll_close(&mut self, waker: &Waker) -> Poll<Result<()>> {
fn poll_close(self: Pin<&mut Self>, waker: &Waker) -> Poll<Result<()>> {
self.poll_flush(waker)
}
}
}

impl<T: AsMut<[u8]>> AsyncWrite for StdIo::Cursor<T> {
impl<T: AsMut<[u8]> + Unpin> AsyncWrite for StdIo::Cursor<T> {
fn poll_write(
&mut self,
mut self: Pin<&mut Self>,
_: &Waker,
buf: &[u8],
) -> Poll<Result<usize>> {
let position = self.position();
let result = {
let out = self.get_mut().as_mut();
let out = (&mut *self).get_mut().as_mut();
let pos = cmp::min(out.len() as u64, position) as usize;
StdIo::Write::write(&mut &mut out[pos..], buf)
};
if let Ok(offset) = result {
self.set_position(position + offset as u64);
self.get_mut().set_position(position + offset as u64);
}
Poll::Ready(result)
}

fn poll_flush(&mut self, _: &Waker) -> Poll<Result<()>> {
Poll::Ready(StdIo::Write::flush(&mut self.get_mut().as_mut()))
fn poll_flush(self: Pin<&mut Self>, _: &Waker) -> Poll<Result<()>> {
Poll::Ready(StdIo::Write::flush(&mut self.get_mut().get_mut().as_mut()))
}

fn poll_close(&mut self, waker: &Waker) -> Poll<Result<()>> {
fn poll_close(self: Pin<&mut Self>, waker: &Waker) -> Poll<Result<()>> {
self.poll_flush(waker)
}
}
Expand Down
9 changes: 5 additions & 4 deletions futures-io/tests/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@ use futures::Poll;
use futures::future::lazy;
use futures::io::AsyncWrite;
use std::io::Cursor;
use std::pin::Pin;

#[test]
fn cursor_asyncwrite_asmut() {
let mut cursor = Cursor::new([0; 5]);
futures::executor::block_on(lazy(|ctx| {
assert_matches!(cursor.poll_write(ctx, &[1, 2]), Poll::Ready(Ok(2)));
assert_matches!(cursor.poll_write(ctx, &[3, 4]), Poll::Ready(Ok(2)));
assert_matches!(cursor.poll_write(ctx, &[5, 6]), Poll::Ready(Ok(1)));
assert_matches!(cursor.poll_write(ctx, &[6, 7]), Poll::Ready(Ok(0)));
assert_matches!(Pin::new(&mut cursor).poll_write(ctx, &[1, 2]), Poll::Ready(Ok(2)));
assert_matches!(Pin::new(&mut cursor).poll_write(ctx, &[3, 4]), Poll::Ready(Ok(2)));
assert_matches!(Pin::new(&mut cursor).poll_write(ctx, &[5, 6]), Poll::Ready(Ok(1)));
assert_matches!(Pin::new(&mut cursor).poll_write(ctx, &[6, 7]), Poll::Ready(Ok(0)));
}));
assert_eq!(cursor.into_inner(), [1, 2, 3, 4, 5]);
}
12 changes: 6 additions & 6 deletions futures-util/src/compat/compat01as03.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ where
mut self: Pin<&mut Self>,
waker: &Waker,
) -> task03::Poll<Option<Self::Item>> {
match self.in_notify(waker, |f| f.poll()) {
match self.in_notify(waker, Stream01::poll) {
Ok(Async01::Ready(Some(t))) => task03::Poll::Ready(Some(Ok(t))),
Ok(Async01::Ready(None)) => task03::Poll::Ready(None),
Ok(Async01::NotReady) => task03::Poll::Pending,
Expand Down Expand Up @@ -354,7 +354,7 @@ mod io {
///
/// let input = b"Hello World!";
/// let reader: impl tokio_io::AsyncRead = std::io::Cursor::new(input);
/// let mut reader: impl futures::io::AsyncRead = reader.compat();
/// let mut reader: impl futures::io::AsyncRead + Unpin = reader.compat();
///
/// let mut output = Vec::with_capacity(12);
/// await!(reader.read_to_end(&mut output)).unwrap();
Expand Down Expand Up @@ -409,27 +409,27 @@ mod io {
}
}

fn poll_read(&mut self, waker: &task03::Waker, buf: &mut [u8])
fn poll_read(mut self: Pin<&mut Self>, waker: &task03::Waker, buf: &mut [u8])
-> task03::Poll<Result<usize, Error>>
{
poll_01_to_03(self.in_notify(waker, |x| x.poll_read(buf)))
}
}

impl<W: AsyncWrite01> AsyncWrite03 for Compat01As03<W> {
fn poll_write(&mut self, waker: &task03::Waker, buf: &[u8])
fn poll_write(mut self: Pin<&mut Self>, waker: &task03::Waker, buf: &[u8])
-> task03::Poll<Result<usize, Error>>
{
poll_01_to_03(self.in_notify(waker, |x| x.poll_write(buf)))
}

fn poll_flush(&mut self, waker: &task03::Waker)
fn poll_flush(mut self: Pin<&mut Self>, waker: &task03::Waker)
-> task03::Poll<Result<(), Error>>
{
poll_01_to_03(self.in_notify(waker, AsyncWrite01::poll_flush))
}

fn poll_close(&mut self, waker: &task03::Waker)
fn poll_close(mut self: Pin<&mut Self>, waker: &task03::Waker)
-> task03::Poll<Result<(), Error>>
{
poll_01_to_03(self.in_notify(waker, AsyncWrite01::shutdown))
Expand Down
16 changes: 8 additions & 8 deletions futures-util/src/compat/compat03as01.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,15 +188,15 @@ mod io {
}
}

impl<R: AsyncRead03> std::io::Read for Compat<R> {
impl<R: AsyncRead03 + Unpin> std::io::Read for Compat<R> {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
let current = Current::new();
let waker = current.as_waker();
poll_03_to_io(self.inner.poll_read(&waker, buf))
poll_03_to_io(Pin::new(&mut self.inner).poll_read(&waker, buf))
}
}

impl<R: AsyncRead03> AsyncRead01 for Compat<R> {
impl<R: AsyncRead03 + Unpin> AsyncRead01 for Compat<R> {
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
let initializer = self.inner.initializer();
let does_init = initializer.should_initialize();
Expand All @@ -207,25 +207,25 @@ mod io {
}
}

impl<W: AsyncWrite03> std::io::Write for Compat<W> {
impl<W: AsyncWrite03 + Unpin> std::io::Write for Compat<W> {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
let current = Current::new();
let waker = current.as_waker();
poll_03_to_io(self.inner.poll_write(&waker, buf))
poll_03_to_io(Pin::new(&mut self.inner).poll_write(&waker, buf))
}

fn flush(&mut self) -> std::io::Result<()> {
let current = Current::new();
let waker = current.as_waker();
poll_03_to_io(self.inner.poll_flush(&waker))
poll_03_to_io(Pin::new(&mut self.inner).poll_flush(&waker))
}
}

impl<W: AsyncWrite03> AsyncWrite01 for Compat<W> {
impl<W: AsyncWrite03 + Unpin> AsyncWrite01 for Compat<W> {
fn shutdown(&mut self) -> std::io::Result<Async01<()>> {
let current = Current::new();
let waker = current.as_waker();
poll_03_to_01(self.inner.poll_close(&waker))
poll_03_to_01(Pin::new(&mut self.inner).poll_close(&waker))
}
}
}
Loading