From 78753ad09ba10d27235bbe3e0f17e60ef68c162c Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Tue, 28 Jan 2020 16:40:27 +0100 Subject: [PATCH 1/3] Implement Clone for TcpStream --- src/net/tcp/listener.rs | 7 +++---- src/net/tcp/stream.rs | 26 ++++++++++++++++---------- tests/tcp.rs | 22 ++++++++++++++++++++++ 3 files changed, 41 insertions(+), 14 deletions(-) diff --git a/src/net/tcp/listener.rs b/src/net/tcp/listener.rs index fe06a96d6..1d7e91a27 100644 --- a/src/net/tcp/listener.rs +++ b/src/net/tcp/listener.rs @@ -1,6 +1,7 @@ use std::future::Future; use std::net::SocketAddr; use std::pin::Pin; +use std::sync::Arc; use crate::future; use crate::io; @@ -75,9 +76,7 @@ impl TcpListener { /// [`local_addr`]: #method.local_addr pub async fn bind(addrs: A) -> io::Result { let mut last_err = None; - let addrs = addrs - .to_socket_addrs() - .await?; + let addrs = addrs.to_socket_addrs().await?; for addr in addrs { match mio::net::TcpListener::bind(&addr) { @@ -121,7 +120,7 @@ impl TcpListener { let mio_stream = mio::net::TcpStream::from_stream(io)?; let stream = TcpStream { - watcher: Watcher::new(mio_stream), + watcher: Arc::new(Watcher::new(mio_stream)), }; Ok((stream, addr)) } diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index 537bd4cdc..c9cdf5e6d 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -1,6 +1,7 @@ use std::io::{IoSlice, IoSliceMut, Read as _, Write as _}; use std::net::SocketAddr; use std::pin::Pin; +use std::sync::Arc; use crate::future; use crate::io::{self, Read, Write}; @@ -44,9 +45,9 @@ use crate::task::{Context, Poll}; /// # /// # Ok(()) }) } /// ``` -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct TcpStream { - pub(super) watcher: Watcher, + pub(super) watcher: Arc>, } impl TcpStream { @@ -71,9 +72,7 @@ impl TcpStream { /// ``` pub async fn connect(addrs: A) -> io::Result { let mut last_err = None; - let addrs = addrs - .to_socket_addrs() - .await?; + let addrs = addrs.to_socket_addrs().await?; for addr in addrs { // mio's TcpStream::connect is non-blocking and may just be in progress @@ -84,16 +83,20 @@ impl TcpStream { Ok(s) => Watcher::new(s), Err(e) => { last_err = Some(e); - continue + continue; } }; future::poll_fn(|cx| watcher.poll_write_ready(cx)).await; match watcher.get_ref().take_error() { - Ok(None) => return Ok(TcpStream { watcher }), + Ok(None) => { + return Ok(TcpStream { + watcher: Arc::new(watcher), + }); + } Ok(Some(e)) => last_err = Some(e), - Err(e) => last_err = Some(e) + Err(e) => last_err = Some(e), } } @@ -369,7 +372,7 @@ impl From for TcpStream { fn from(stream: std::net::TcpStream) -> TcpStream { let mio_stream = mio::net::TcpStream::from_stream(stream).unwrap(); TcpStream { - watcher: Watcher::new(mio_stream), + watcher: Arc::new(Watcher::new(mio_stream)), } } } @@ -391,7 +394,10 @@ cfg_unix! { impl IntoRawFd for TcpStream { fn into_raw_fd(self) -> RawFd { - self.watcher.into_inner().into_raw_fd() + // TODO(stjepang): This does not mean `RawFd` is now the sole owner of the file + // descriptor because it's possible that there are other clones of this `TcpStream` + // using it at the same time. We should probably document that behavior. + self.as_raw_fd() } } } diff --git a/tests/tcp.rs b/tests/tcp.rs index 00fa3a045..d92cff0db 100644 --- a/tests/tcp.rs +++ b/tests/tcp.rs @@ -94,3 +94,25 @@ fn smoke_async_stream_to_std_listener() -> io::Result<()> { Ok(()) } + +#[test] +fn cloned_streams() -> io::Result<()> { + task::block_on(async { + let listener = TcpListener::bind("127.0.0.1:0").await?; + let addr = listener.local_addr()?; + + let mut stream = TcpStream::connect(&addr).await?; + let mut cloned_stream = stream.clone(); + let mut incoming = listener.incoming(); + let mut write_stream = incoming.next().await.unwrap()?; + write_stream.write_all(b"Each your doing").await?; + + let mut buf = [0; 15]; + stream.read_exact(&mut buf[..8]).await?; + cloned_stream.read_exact(&mut buf[8..]).await?; + + assert_eq!(&buf[..15], b"Each your doing"); + + Ok(()) + }) +} From 2c56dec3c445e9000a32d26888985a41e640ee93 Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Tue, 28 Jan 2020 16:42:44 +0100 Subject: [PATCH 2/3] Update examples --- examples/deadlock.rs | 21 +++++++++++++++++++++ examples/foo.rs | 30 ++++++++++++++++++++++++++++++ examples/tcp-echo.rs | 5 +++-- examples/tcp-ipv4-and-6-echo.rs | 5 +++-- 4 files changed, 57 insertions(+), 4 deletions(-) create mode 100644 examples/deadlock.rs create mode 100644 examples/foo.rs diff --git a/examples/deadlock.rs b/examples/deadlock.rs new file mode 100644 index 000000000..7921c11af --- /dev/null +++ b/examples/deadlock.rs @@ -0,0 +1,21 @@ +use async_std::task; +use async_std::sync::channel; +use async_std::sync::Mutex; + +async fn say_hi() { + let m = Mutex::new(0); + + let guard = m.lock().await; + let guard = m.lock().await; + // let (s, r) = channel(1); + // drop(r); + // println!("send 1"); + // s.send(1).await; + // println!("send 2"); + // s.send(2).await; +} + +fn main() { + // task::block_on(task::spawn(say_hi())); + task::block_on(say_hi()); +} diff --git a/examples/foo.rs b/examples/foo.rs new file mode 100644 index 000000000..e365ddedb --- /dev/null +++ b/examples/foo.rs @@ -0,0 +1,30 @@ +use async_std::task; + +fn main() { + task::block_on(async { + let mut i = 0; + loop { + i += 1; + dbg!(i); + + task::spawn(async { + let mut v = Vec::new(); + let m = std::sync::Arc::new(async_std::sync::Mutex::new(0)); + + for _ in 0..1000 { + let m = m.clone(); + v.push(task::spawn(async move { + for _ in 0..10000 { + *m.lock().await += 1; + } + })); + } + + for t in v { + t.await; + } + }) + .await; + } + }); +} diff --git a/examples/tcp-echo.rs b/examples/tcp-echo.rs index 7c50be016..c04f07765 100644 --- a/examples/tcp-echo.rs +++ b/examples/tcp-echo.rs @@ -14,8 +14,9 @@ use async_std::task; async fn process(stream: TcpStream) -> io::Result<()> { println!("Accepted from: {}", stream.peer_addr()?); - let (reader, writer) = &mut (&stream, &stream); - io::copy(reader, writer).await?; + let mut reader = stream.clone(); + let mut writer = stream; + io::copy(&mut reader, &mut writer).await?; Ok(()) } diff --git a/examples/tcp-ipv4-and-6-echo.rs b/examples/tcp-ipv4-and-6-echo.rs index aef5e15e5..a00e1f386 100644 --- a/examples/tcp-ipv4-and-6-echo.rs +++ b/examples/tcp-ipv4-and-6-echo.rs @@ -15,8 +15,9 @@ use async_std::task; async fn process(stream: TcpStream) -> io::Result<()> { println!("Accepted from: {}", stream.peer_addr()?); - let (reader, writer) = &mut (&stream, &stream); - io::copy(reader, writer).await?; + let mut reader = stream.clone(); + let mut writer = stream; + io::copy(&mut reader, &mut writer).await?; Ok(()) } From 5ba18f9438092596c8e4a273fb3b5a619522f7e0 Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Tue, 28 Jan 2020 16:45:19 +0100 Subject: [PATCH 3/3] Remove accidentally added examples --- examples/deadlock.rs | 21 --------------------- examples/foo.rs | 30 ------------------------------ 2 files changed, 51 deletions(-) delete mode 100644 examples/deadlock.rs delete mode 100644 examples/foo.rs diff --git a/examples/deadlock.rs b/examples/deadlock.rs deleted file mode 100644 index 7921c11af..000000000 --- a/examples/deadlock.rs +++ /dev/null @@ -1,21 +0,0 @@ -use async_std::task; -use async_std::sync::channel; -use async_std::sync::Mutex; - -async fn say_hi() { - let m = Mutex::new(0); - - let guard = m.lock().await; - let guard = m.lock().await; - // let (s, r) = channel(1); - // drop(r); - // println!("send 1"); - // s.send(1).await; - // println!("send 2"); - // s.send(2).await; -} - -fn main() { - // task::block_on(task::spawn(say_hi())); - task::block_on(say_hi()); -} diff --git a/examples/foo.rs b/examples/foo.rs deleted file mode 100644 index e365ddedb..000000000 --- a/examples/foo.rs +++ /dev/null @@ -1,30 +0,0 @@ -use async_std::task; - -fn main() { - task::block_on(async { - let mut i = 0; - loop { - i += 1; - dbg!(i); - - task::spawn(async { - let mut v = Vec::new(); - let m = std::sync::Arc::new(async_std::sync::Mutex::new(0)); - - for _ in 0..1000 { - let m = m.clone(); - v.push(task::spawn(async move { - for _ in 0..10000 { - *m.lock().await += 1; - } - })); - } - - for t in v { - t.await; - } - }) - .await; - } - }); -}