From d46c99abe8671479c48b003bf06e98eda7eb85ab Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Fri, 12 Feb 2016 00:17:24 -0800 Subject: [PATCH 1/3] std: Funnel read_to_end through to one location This pushes the implementation detail of proxying `read_to_end` through to `read_to_end_uninitialized` all the way down to the `FileDesc` and `Handle` implementations on Unix/Windows. This way intermediate layers will also be able to take advantage of this optimized implementation. This commit also adds the optimized implementation for `ChildStdout` and `ChildStderr`. --- src/libstd/fs.rs | 6 ++++-- src/libstd/io/stdio.rs | 12 ++++++++++-- src/libstd/net/tcp.rs | 5 ++--- src/libstd/process.rs | 6 ++++++ src/libstd/sys/common/net.rs | 4 ++++ src/libstd/sys/unix/fd.rs | 23 +++++++++++++++++++++-- src/libstd/sys/unix/fs.rs | 4 ++++ src/libstd/sys/unix/net.rs | 4 ++++ src/libstd/sys/unix/pipe.rs | 4 ++++ src/libstd/sys/unix/stdio.rs | 9 +++++++++ src/libstd/sys/windows/fs.rs | 5 +++++ src/libstd/sys/windows/handle.rs | 21 ++++++++++++++++++++- src/libstd/sys/windows/net.rs | 21 ++++++++++++++++++++- src/libstd/sys/windows/pipe.rs | 6 ++++++ src/libstd/sys/windows/stdio.rs | 17 +++++++++++++++++ 15 files changed, 136 insertions(+), 11 deletions(-) diff --git a/src/libstd/fs.rs b/src/libstd/fs.rs index 53384fb9b154b..6b88d498b1041 100644 --- a/src/libstd/fs.rs +++ b/src/libstd/fs.rs @@ -22,7 +22,6 @@ use ffi::OsString; use io::{self, SeekFrom, Seek, Read, Write}; use path::{Path, PathBuf}; use sys::fs as fs_imp; -use sys_common::io::read_to_end_uninitialized; use sys_common::{AsInnerMut, FromInner, AsInner, IntoInner}; use vec::Vec; use time::SystemTime; @@ -351,7 +350,7 @@ impl Read for File { self.inner.read(buf) } fn read_to_end(&mut self, buf: &mut Vec) -> io::Result { - unsafe { read_to_end_uninitialized(self, buf) } + self.inner.read_to_end(buf) } } #[stable(feature = "rust1", since = "1.0.0")] @@ -372,6 +371,9 @@ impl<'a> Read for &'a File { fn read(&mut self, buf: &mut [u8]) -> io::Result { self.inner.read(buf) } + fn read_to_end(&mut self, buf: &mut Vec) -> io::Result { + self.inner.read_to_end(buf) + } } #[stable(feature = "rust1", since = "1.0.0")] impl<'a> Write for &'a File { diff --git a/src/libstd/io/stdio.rs b/src/libstd/io/stdio.rs index cd2d5e52462bb..25309a785c45a 100644 --- a/src/libstd/io/stdio.rs +++ b/src/libstd/io/stdio.rs @@ -18,7 +18,6 @@ use io::lazy::Lazy; use io::{self, BufReader, LineWriter}; use sync::{Arc, Mutex, MutexGuard}; use sys::stdio; -use sys_common::io::{read_to_end_uninitialized}; use sys_common::remutex::{ReentrantMutex, ReentrantMutexGuard}; use thread::LocalKeyState; @@ -78,6 +77,9 @@ fn stderr_raw() -> io::Result { stdio::Stderr::new().map(StderrRaw) } impl Read for StdinRaw { fn read(&mut self, buf: &mut [u8]) -> io::Result { self.0.read(buf) } + fn read_to_end(&mut self, buf: &mut Vec) -> io::Result { + self.0.read_to_end(buf) + } } impl Write for StdoutRaw { fn write(&mut self, buf: &[u8]) -> io::Result { self.0.write(buf) } @@ -116,6 +118,12 @@ impl io::Read for Maybe { Maybe::Fake => Ok(0) } } + fn read_to_end(&mut self, buf: &mut Vec) -> io::Result { + match *self { + Maybe::Real(ref mut r) => handle_ebadf(r.read_to_end(buf), 0), + Maybe::Fake => Ok(0) + } + } } fn handle_ebadf(r: io::Result, default: T) -> io::Result { @@ -294,7 +302,7 @@ impl<'a> Read for StdinLock<'a> { self.inner.read(buf) } fn read_to_end(&mut self, buf: &mut Vec) -> io::Result { - unsafe { read_to_end_uninitialized(self, buf) } + self.inner.read_to_end(buf) } } diff --git a/src/libstd/net/tcp.rs b/src/libstd/net/tcp.rs index f8e3b58bb3e95..414696413f494 100644 --- a/src/libstd/net/tcp.rs +++ b/src/libstd/net/tcp.rs @@ -14,7 +14,6 @@ use io::prelude::*; use fmt; use io; use net::{ToSocketAddrs, SocketAddr, Shutdown}; -use sys_common::io::read_to_end_uninitialized; use sys_common::net as net_imp; use sys_common::{AsInner, FromInner, IntoInner}; use time::Duration; @@ -269,7 +268,7 @@ impl TcpStream { impl Read for TcpStream { fn read(&mut self, buf: &mut [u8]) -> io::Result { self.0.read(buf) } fn read_to_end(&mut self, buf: &mut Vec) -> io::Result { - unsafe { read_to_end_uninitialized(self, buf) } + self.0.read_to_end(buf) } } #[stable(feature = "rust1", since = "1.0.0")] @@ -281,7 +280,7 @@ impl Write for TcpStream { impl<'a> Read for &'a TcpStream { fn read(&mut self, buf: &mut [u8]) -> io::Result { self.0.read(buf) } fn read_to_end(&mut self, buf: &mut Vec) -> io::Result { - unsafe { read_to_end_uninitialized(self, buf) } + self.0.read_to_end(buf) } } #[stable(feature = "rust1", since = "1.0.0")] diff --git a/src/libstd/process.rs b/src/libstd/process.rs index 8db8ad324bea9..ec86dd062b540 100644 --- a/src/libstd/process.rs +++ b/src/libstd/process.rs @@ -134,6 +134,9 @@ impl Read for ChildStdout { fn read(&mut self, buf: &mut [u8]) -> io::Result { self.inner.read(buf) } + fn read_to_end(&mut self, buf: &mut Vec) -> io::Result { + self.inner.read_to_end(buf) + } } impl AsInner for ChildStdout { @@ -161,6 +164,9 @@ impl Read for ChildStderr { fn read(&mut self, buf: &mut [u8]) -> io::Result { self.inner.read(buf) } + fn read_to_end(&mut self, buf: &mut Vec) -> io::Result { + self.inner.read_to_end(buf) + } } impl AsInner for ChildStderr { diff --git a/src/libstd/sys/common/net.rs b/src/libstd/sys/common/net.rs index ca4f6e19882b6..aa92e5be11403 100644 --- a/src/libstd/sys/common/net.rs +++ b/src/libstd/sys/common/net.rs @@ -225,6 +225,10 @@ impl TcpStream { self.inner.read(buf) } + pub fn read_to_end(&self, buf: &mut Vec) -> io::Result { + self.inner.read_to_end(buf) + } + pub fn write(&self, buf: &[u8]) -> io::Result { let len = cmp::min(buf.len(), ::max_value() as usize) as wrlen_t; let ret = try!(cvt(unsafe { diff --git a/src/libstd/sys/unix/fd.rs b/src/libstd/sys/unix/fd.rs index 299c6ec2731d7..a00e6c3eb7254 100644 --- a/src/libstd/sys/unix/fd.rs +++ b/src/libstd/sys/unix/fd.rs @@ -8,12 +8,15 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use io; +use prelude::v1::*; + +use io::{self, Read}; use libc::{self, c_int, size_t, c_void}; use mem; +use sync::atomic::{AtomicBool, Ordering}; use sys::cvt; use sys_common::AsInner; -use sync::atomic::{AtomicBool, Ordering}; +use sys_common::io::read_to_end_uninitialized; pub struct FileDesc { fd: c_int, @@ -42,6 +45,11 @@ impl FileDesc { Ok(ret as usize) } + pub fn read_to_end(&self, buf: &mut Vec) -> io::Result { + let mut me = self; + (&mut me).read_to_end(buf) + } + pub fn write(&self, buf: &[u8]) -> io::Result { let ret = try!(cvt(unsafe { libc::write(self.fd, @@ -118,6 +126,17 @@ impl FileDesc { } } +#[unstable(reason = "not public", issue = "0", feature = "fd_read")] +impl<'a> Read for &'a FileDesc { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + (**self).read(buf) + } + + fn read_to_end(&mut self, buf: &mut Vec) -> io::Result { + unsafe { read_to_end_uninitialized(self, buf) } + } +} + impl AsInner for FileDesc { fn as_inner(&self) -> &c_int { &self.fd } } diff --git a/src/libstd/sys/unix/fs.rs b/src/libstd/sys/unix/fs.rs index d1b4b1c5c0895..3985a07470e0d 100644 --- a/src/libstd/sys/unix/fs.rs +++ b/src/libstd/sys/unix/fs.rs @@ -486,6 +486,10 @@ impl File { self.0.read(buf) } + pub fn read_to_end(&self, buf: &mut Vec) -> io::Result { + self.0.read_to_end(buf) + } + pub fn write(&self, buf: &[u8]) -> io::Result { self.0.write(buf) } diff --git a/src/libstd/sys/unix/net.rs b/src/libstd/sys/unix/net.rs index 8785da51986db..acf501d5fda88 100644 --- a/src/libstd/sys/unix/net.rs +++ b/src/libstd/sys/unix/net.rs @@ -116,6 +116,10 @@ impl Socket { self.0.read(buf) } + pub fn read_to_end(&self, buf: &mut Vec) -> io::Result { + self.0.read_to_end(buf) + } + pub fn set_timeout(&self, dur: Option, kind: libc::c_int) -> io::Result<()> { let timeout = match dur { Some(dur) => { diff --git a/src/libstd/sys/unix/pipe.rs b/src/libstd/sys/unix/pipe.rs index 667f0f9e6bf62..d88193e62273b 100644 --- a/src/libstd/sys/unix/pipe.rs +++ b/src/libstd/sys/unix/pipe.rs @@ -57,6 +57,10 @@ impl AnonPipe { self.0.read(buf) } + pub fn read_to_end(&self, buf: &mut Vec) -> io::Result { + self.0.read_to_end(buf) + } + pub fn write(&self, buf: &[u8]) -> io::Result { self.0.write(buf) } diff --git a/src/libstd/sys/unix/stdio.rs b/src/libstd/sys/unix/stdio.rs index ccbb14677c7e4..37d1d9a969ed8 100644 --- a/src/libstd/sys/unix/stdio.rs +++ b/src/libstd/sys/unix/stdio.rs @@ -8,6 +8,8 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +use prelude::v1::*; + use io; use libc; use sys::fd::FileDesc; @@ -25,6 +27,13 @@ impl Stdin { fd.into_raw(); ret } + + pub fn read_to_end(&self, buf: &mut Vec) -> io::Result { + let fd = FileDesc::new(libc::STDIN_FILENO); + let ret = fd.read_to_end(buf); + fd.into_raw(); + ret + } } impl Stdout { diff --git a/src/libstd/sys/windows/fs.rs b/src/libstd/sys/windows/fs.rs index 95fb1e7c60052..624fef097fcc5 100644 --- a/src/libstd/sys/windows/fs.rs +++ b/src/libstd/sys/windows/fs.rs @@ -8,6 +8,7 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +use prelude::v1::*; use io::prelude::*; use os::windows::prelude::*; @@ -312,6 +313,10 @@ impl File { self.handle.read(buf) } + pub fn read_to_end(&self, buf: &mut Vec) -> io::Result { + self.handle.read_to_end(buf) + } + pub fn write(&self, buf: &[u8]) -> io::Result { self.handle.write(buf) } diff --git a/src/libstd/sys/windows/handle.rs b/src/libstd/sys/windows/handle.rs index 47676a927f658..f4b8b2754c5b6 100644 --- a/src/libstd/sys/windows/handle.rs +++ b/src/libstd/sys/windows/handle.rs @@ -8,14 +8,17 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +use prelude::v1::*; + use cmp; -use io::ErrorKind; +use io::{ErrorKind, Read}; use io; use mem; use ops::Deref; use ptr; use sys::c; use sys::cvt; +use sys_common::io::read_to_end_uninitialized; use u32; /// An owned container for `HANDLE` object, closing them on Drop. @@ -87,6 +90,11 @@ impl RawHandle { } } + pub fn read_to_end(&self, buf: &mut Vec) -> io::Result { + let mut me = self; + (&mut me).read_to_end(buf) + } + pub fn write(&self, buf: &[u8]) -> io::Result { let mut amt = 0; // WriteFile takes a DWORD (u32) for the length so it only supports @@ -111,3 +119,14 @@ impl RawHandle { Ok(Handle::new(ret)) } } + +#[unstable(reason = "not public", issue = "0", feature = "fd_read")] +impl<'a> Read for &'a RawHandle { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + (**self).read(buf) + } + + fn read_to_end(&mut self, buf: &mut Vec) -> io::Result { + unsafe { read_to_end_uninitialized(self, buf) } + } +} diff --git a/src/libstd/sys/windows/net.rs b/src/libstd/sys/windows/net.rs index dfa44a651e61c..01e3a6cd8ed8f 100644 --- a/src/libstd/sys/windows/net.rs +++ b/src/libstd/sys/windows/net.rs @@ -8,8 +8,10 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +use prelude::v1::*; + use cmp; -use io; +use io::{self, Read}; use libc::{c_int, c_void, c_ulong}; use mem; use net::{SocketAddr, Shutdown}; @@ -20,6 +22,7 @@ use sync::Once; use sys::c; use sys; use sys_common::{self, AsInner, FromInner, IntoInner}; +use sys_common::io::read_to_end_uninitialized; use sys_common::net; use time::Duration; @@ -142,6 +145,11 @@ impl Socket { } } + pub fn read_to_end(&self, buf: &mut Vec) -> io::Result { + let mut me = self; + (&mut me).read_to_end(buf) + } + pub fn set_timeout(&self, dur: Option, kind: c_int) -> io::Result<()> { let timeout = match dur { @@ -206,6 +214,17 @@ impl Socket { } } +#[unstable(reason = "not public", issue = "0", feature = "fd_read")] +impl<'a> Read for &'a Socket { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + (**self).read(buf) + } + + fn read_to_end(&mut self, buf: &mut Vec) -> io::Result { + unsafe { read_to_end_uninitialized(self, buf) } + } +} + impl Drop for Socket { fn drop(&mut self) { let _ = unsafe { c::closesocket(self.0) }; diff --git a/src/libstd/sys/windows/pipe.rs b/src/libstd/sys/windows/pipe.rs index aec41885f3b87..8c3171d2470bc 100644 --- a/src/libstd/sys/windows/pipe.rs +++ b/src/libstd/sys/windows/pipe.rs @@ -8,6 +8,8 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +use prelude::v1::*; + use io; use ptr; use sys::cvt; @@ -41,6 +43,10 @@ impl AnonPipe { self.inner.read(buf) } + pub fn read_to_end(&self, buf: &mut Vec) -> io::Result { + self.inner.read_to_end(buf) + } + pub fn write(&self, buf: &[u8]) -> io::Result { self.inner.write(buf) } diff --git a/src/libstd/sys/windows/stdio.rs b/src/libstd/sys/windows/stdio.rs index 1cd05b61d25b0..5a8705bf0cb98 100644 --- a/src/libstd/sys/windows/stdio.rs +++ b/src/libstd/sys/windows/stdio.rs @@ -18,6 +18,7 @@ use sync::Mutex; use sys::c; use sys::cvt; use sys::handle::Handle; +use sys_common::io::read_to_end_uninitialized; pub struct NoClose(Option); @@ -113,6 +114,22 @@ impl Stdin { // MemReader shouldn't error here since we just filled it utf8.read(buf) } + + pub fn read_to_end(&self, buf: &mut Vec) -> io::Result { + let mut me = self; + (&mut me).read_to_end(buf) + } +} + +#[unstable(reason = "not public", issue = "0", feature = "fd_read")] +impl<'a> Read for &'a Stdin { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + (**self).read(buf) + } + + fn read_to_end(&mut self, buf: &mut Vec) -> io::Result { + unsafe { read_to_end_uninitialized(self, buf) } + } } impl Stdout { From 6afa32a2504fa90b48f74979bb4061cb397e9270 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Fri, 12 Feb 2016 10:28:03 -0800 Subject: [PATCH 2/3] std: Don't always create stdin for children For example if `Command::output` or `Command::status` is used then stdin is just immediately closed. Add an option for this so as an optimization we can avoid creating pipes entirely. This should help reduce the number of active file descriptors when spawning processes on Unix and the number of active handles on Windows. --- src/libstd/process.rs | 7 ++++--- src/libstd/sys/unix/process.rs | 13 ++++++++----- src/libstd/sys/windows/process.rs | 6 ++++-- 3 files changed, 16 insertions(+), 10 deletions(-) diff --git a/src/libstd/process.rs b/src/libstd/process.rs index ec86dd062b540..fe5e49ecb09bb 100644 --- a/src/libstd/process.rs +++ b/src/libstd/process.rs @@ -295,7 +295,7 @@ impl Command { /// By default, stdin, stdout and stderr are inherited from the parent. #[stable(feature = "process", since = "1.0.0")] pub fn spawn(&mut self) -> io::Result { - self.inner.spawn(imp::Stdio::Inherit).map(Child::from_inner) + self.inner.spawn(imp::Stdio::Inherit, true).map(Child::from_inner) } /// Executes the command as a child process, waiting for it to finish and @@ -318,7 +318,7 @@ impl Command { /// ``` #[stable(feature = "process", since = "1.0.0")] pub fn output(&mut self) -> io::Result { - self.inner.spawn(imp::Stdio::MakePipe).map(Child::from_inner) + self.inner.spawn(imp::Stdio::MakePipe, false).map(Child::from_inner) .and_then(|p| p.wait_with_output()) } @@ -340,7 +340,8 @@ impl Command { /// ``` #[stable(feature = "process", since = "1.0.0")] pub fn status(&mut self) -> io::Result { - self.spawn().and_then(|mut p| p.wait()) + self.inner.spawn(imp::Stdio::Inherit, false).map(Child::from_inner) + .and_then(|mut p| p.wait()) } } diff --git a/src/libstd/sys/unix/process.rs b/src/libstd/sys/unix/process.rs index 28475f50ce63e..5696cb2b52f73 100644 --- a/src/libstd/sys/unix/process.rs +++ b/src/libstd/sys/unix/process.rs @@ -216,7 +216,7 @@ impl Command { self.stderr = Some(stderr); } - pub fn spawn(&mut self, default: Stdio) + pub fn spawn(&mut self, default: Stdio, needs_stdin: bool) -> io::Result<(Process, StdioPipes)> { const CLOEXEC_MSG_FOOTER: &'static [u8] = b"NOEX"; @@ -225,7 +225,7 @@ impl Command { "nul byte found in provided data")); } - let (ours, theirs) = try!(self.setup_io(default)); + let (ours, theirs) = try!(self.setup_io(default, needs_stdin)); let (input, output) = try!(sys::pipe::anon_pipe()); let pid = unsafe { @@ -298,7 +298,7 @@ impl Command { "nul byte found in provided data") } - match self.setup_io(default) { + match self.setup_io(default, true) { Ok((_, theirs)) => unsafe { self.do_exec(theirs) }, Err(e) => e, } @@ -408,8 +408,11 @@ impl Command { } - fn setup_io(&self, default: Stdio) -> io::Result<(StdioPipes, ChildPipes)> { - let stdin = self.stdin.as_ref().unwrap_or(&default); + fn setup_io(&self, default: Stdio, needs_stdin: bool) + -> io::Result<(StdioPipes, ChildPipes)> { + let null = Stdio::Null; + let default_stdin = if needs_stdin {&default} else {&null}; + let stdin = self.stdin.as_ref().unwrap_or(default_stdin); let stdout = self.stdout.as_ref().unwrap_or(&default); let stderr = self.stderr.as_ref().unwrap_or(&default); let (their_stdin, our_stdin) = try!(stdin.to_child_stdio(true)); diff --git a/src/libstd/sys/windows/process.rs b/src/libstd/sys/windows/process.rs index fa118be6fe6b1..524c932eed439 100644 --- a/src/libstd/sys/windows/process.rs +++ b/src/libstd/sys/windows/process.rs @@ -123,7 +123,7 @@ impl Command { self.stderr = Some(stderr); } - pub fn spawn(&mut self, default: Stdio) + pub fn spawn(&mut self, default: Stdio, needs_stdin: bool) -> io::Result<(Process, StdioPipes)> { // To have the spawning semantics of unix/windows stay the same, we need // to read the *child's* PATH if one is provided. See #15149 for more @@ -181,7 +181,9 @@ impl Command { stdout: None, stderr: None, }; - let stdin = self.stdin.as_ref().unwrap_or(&default); + let null = Stdio::Null; + let default_stdin = if needs_stdin {&default} else {&null}; + let stdin = self.stdin.as_ref().unwrap_or(default_stdin); let stdout = self.stdout.as_ref().unwrap_or(&default); let stderr = self.stderr.as_ref().unwrap_or(&default); let stdin = try!(stdin.to_handle(c::STD_INPUT_HANDLE, &mut pipes.stdin)); From 7c3038f82477491e20c6f80c0139ddb1f1b912ca Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Fri, 12 Feb 2016 10:29:25 -0800 Subject: [PATCH 3/3] std: Don't spawn threads in `wait_with_output` Semantically there's actually no reason for us to spawn threads as part of the call to `wait_with_output`, and that's generally an incredibly heavyweight operation for just reading a few bytes (especially when stderr probably rarely has bytes!). An equivalent operation in terms of what's implemented today would be to just drain both pipes of all contents and then call `wait` on the child process itself. On Unix we can implement this through some convenient use of the `select` function, whereas on Windows we can make use of overlapped I/O. Note that on Windows this requires us to use named pipes instead of anonymous pipes, but they're semantically the same under the hood. --- src/liblibc | 2 +- src/libstd/process.rs | 36 +++-- src/libstd/sys/unix/fd.rs | 17 +- src/libstd/sys/unix/pipe.rs | 55 +++++++ src/libstd/sys/unix/process.rs | 2 +- src/libstd/sys/windows/c.rs | 38 ++++- src/libstd/sys/windows/handle.rs | 70 +++++++- src/libstd/sys/windows/net.rs | 2 + src/libstd/sys/windows/pipe.rs | 269 +++++++++++++++++++++++++++++-- src/libstd/sys/windows/stdio.rs | 2 + 10 files changed, 459 insertions(+), 34 deletions(-) diff --git a/src/liblibc b/src/liblibc index e19309c8b4e8b..2278a549559c3 160000 --- a/src/liblibc +++ b/src/liblibc @@ -1 +1 @@ -Subproject commit e19309c8b4e8bbd11f4d84dfffd75e3d1ac477fe +Subproject commit 2278a549559c38872b4338cb002ecc2a80d860dc diff --git a/src/libstd/process.rs b/src/libstd/process.rs index fe5e49ecb09bb..5813d82a315a6 100644 --- a/src/libstd/process.rs +++ b/src/libstd/process.rs @@ -20,10 +20,9 @@ use fmt; use io; use path::Path; use str; -use sys::pipe::AnonPipe; +use sys::pipe::{read2, AnonPipe}; use sys::process as imp; use sys_common::{AsInner, AsInnerMut, FromInner, IntoInner}; -use thread::{self, JoinHandle}; /// Representation of a running or exited child process. /// @@ -503,24 +502,29 @@ impl Child { #[stable(feature = "process", since = "1.0.0")] pub fn wait_with_output(mut self) -> io::Result { drop(self.stdin.take()); - fn read(mut input: R) -> JoinHandle>> - where R: Read + Send + 'static - { - thread::spawn(move || { - let mut ret = Vec::new(); - input.read_to_end(&mut ret).map(|_| ret) - }) + + let (mut stdout, mut stderr) = (Vec::new(), Vec::new()); + match (self.stdout.take(), self.stderr.take()) { + (None, None) => {} + (Some(mut out), None) => { + let res = out.read_to_end(&mut stdout); + res.unwrap(); + } + (None, Some(mut err)) => { + let res = err.read_to_end(&mut stderr); + res.unwrap(); + } + (Some(out), Some(err)) => { + let res = read2(out.inner, &mut stdout, err.inner, &mut stderr); + res.unwrap(); + } } - let stdout = self.stdout.take().map(read); - let stderr = self.stderr.take().map(read); - let status = try!(self.wait()); - let stdout = stdout.and_then(|t| t.join().unwrap().ok()); - let stderr = stderr.and_then(|t| t.join().unwrap().ok()); + let status = try!(self.wait()); Ok(Output { status: status, - stdout: stdout.unwrap_or(Vec::new()), - stderr: stderr.unwrap_or(Vec::new()), + stdout: stdout, + stderr: stderr, }) } } diff --git a/src/libstd/sys/unix/fd.rs b/src/libstd/sys/unix/fd.rs index a00e6c3eb7254..8ec073858fd21 100644 --- a/src/libstd/sys/unix/fd.rs +++ b/src/libstd/sys/unix/fd.rs @@ -8,6 +8,8 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +#![unstable(reason = "not public", issue = "0", feature = "fd")] + use prelude::v1::*; use io::{self, Read}; @@ -75,6 +77,20 @@ impl FileDesc { } } + pub fn set_nonblocking(&self, nonblocking: bool) { + unsafe { + let previous = libc::fcntl(self.fd, libc::F_GETFL); + debug_assert!(previous != -1); + let new = if nonblocking { + previous | libc::O_NONBLOCK + } else { + previous & !libc::O_NONBLOCK + }; + let ret = libc::fcntl(self.fd, libc::F_SETFL, new); + debug_assert!(ret != -1); + } + } + pub fn duplicate(&self) -> io::Result { // We want to atomically duplicate this file descriptor and set the // CLOEXEC flag, and currently that's done via F_DUPFD_CLOEXEC. This @@ -126,7 +142,6 @@ impl FileDesc { } } -#[unstable(reason = "not public", issue = "0", feature = "fd_read")] impl<'a> Read for &'a FileDesc { fn read(&mut self, buf: &mut [u8]) -> io::Result { (**self).read(buf) diff --git a/src/libstd/sys/unix/pipe.rs b/src/libstd/sys/unix/pipe.rs index d88193e62273b..e5cb37610011b 100644 --- a/src/libstd/sys/unix/pipe.rs +++ b/src/libstd/sys/unix/pipe.rs @@ -8,8 +8,12 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +use prelude::v1::*; + +use cmp; use io; use libc::{self, c_int}; +use mem; use sys::cvt_r; use sys::fd::FileDesc; @@ -68,3 +72,54 @@ impl AnonPipe { pub fn fd(&self) -> &FileDesc { &self.0 } pub fn into_fd(self) -> FileDesc { self.0 } } + +pub fn read2(p1: AnonPipe, + v1: &mut Vec, + p2: AnonPipe, + v2: &mut Vec) -> io::Result<()> { + // Set both pipes into nonblocking mode as we're gonna be reading from both + // in the `select` loop below, and we wouldn't want one to block the other! + let p1 = p1.into_fd(); + let p2 = p2.into_fd(); + p1.set_nonblocking(true); + p2.set_nonblocking(true); + + let max = cmp::max(p1.raw(), p2.raw()); + loop { + // wait for either pipe to become readable using `select` + try!(cvt_r(|| unsafe { + let mut read: libc::fd_set = mem::zeroed(); + libc::FD_SET(p1.raw(), &mut read); + libc::FD_SET(p2.raw(), &mut read); + libc::select(max + 1, &mut read, 0 as *mut _, 0 as *mut _, + 0 as *mut _) + })); + + // Read as much as we can from each pipe, ignoring EWOULDBLOCK or + // EAGAIN. If we hit EOF, then this will happen because the underlying + // reader will return Ok(0), in which case we'll see `Ok` ourselves. In + // this case we flip the other fd back into blocking mode and read + // whatever's leftover on that file descriptor. + let read = |fd: &FileDesc, dst: &mut Vec| { + match fd.read_to_end(dst) { + Ok(_) => Ok(true), + Err(e) => { + if e.raw_os_error() == Some(libc::EWOULDBLOCK) || + e.raw_os_error() == Some(libc::EAGAIN) { + Ok(false) + } else { + Err(e) + } + } + } + }; + if try!(read(&p1, v1)) { + p2.set_nonblocking(false); + return p2.read_to_end(v2).map(|_| ()); + } + if try!(read(&p2, v2)) { + p1.set_nonblocking(false); + return p1.read_to_end(v1).map(|_| ()); + } + } +} diff --git a/src/libstd/sys/unix/process.rs b/src/libstd/sys/unix/process.rs index 5696cb2b52f73..47b0ff42f9322 100644 --- a/src/libstd/sys/unix/process.rs +++ b/src/libstd/sys/unix/process.rs @@ -651,7 +651,7 @@ mod tests { cmd.stdin(Stdio::MakePipe); cmd.stdout(Stdio::MakePipe); - let (mut cat, mut pipes) = t!(cmd.spawn(Stdio::Null)); + let (mut cat, mut pipes) = t!(cmd.spawn(Stdio::Null, true)); let stdin_write = pipes.stdin.take().unwrap(); let stdout_read = pipes.stdout.take().unwrap(); diff --git a/src/libstd/sys/windows/c.rs b/src/libstd/sys/windows/c.rs index 472ffdf9e1d93..002ffc7c8685a 100644 --- a/src/libstd/sys/windows/c.rs +++ b/src/libstd/sys/windows/c.rs @@ -12,6 +12,7 @@ #![allow(bad_style)] #![cfg_attr(test, allow(dead_code))] +#![unstable(issue = "0", feature = "windows_c")] use os::raw::{c_int, c_uint, c_ulong, c_long, c_longlong, c_ushort,}; use os::raw::{c_char, c_ulonglong}; @@ -181,6 +182,7 @@ pub const ERROR_PATH_NOT_FOUND: DWORD = 3; pub const ERROR_ACCESS_DENIED: DWORD = 5; pub const ERROR_INVALID_HANDLE: DWORD = 6; pub const ERROR_NO_MORE_FILES: DWORD = 18; +pub const ERROR_HANDLE_EOF: DWORD = 38; pub const ERROR_BROKEN_PIPE: DWORD = 109; pub const ERROR_CALL_NOT_IMPLEMENTED: DWORD = 120; pub const ERROR_INSUFFICIENT_BUFFER: DWORD = 122; @@ -188,6 +190,7 @@ pub const ERROR_ALREADY_EXISTS: DWORD = 183; pub const ERROR_NO_DATA: DWORD = 232; pub const ERROR_ENVVAR_NOT_FOUND: DWORD = 203; pub const ERROR_OPERATION_ABORTED: DWORD = 995; +pub const ERROR_IO_PENDING: DWORD = 997; pub const ERROR_TIMEOUT: DWORD = 0x5B4; pub const INVALID_HANDLE_VALUE: HANDLE = !0 as HANDLE; @@ -292,6 +295,14 @@ pub const EXCEPTION_UNWIND: DWORD = EXCEPTION_UNWINDING | EXCEPTION_TARGET_UNWIND | EXCEPTION_COLLIDED_UNWIND; +pub const PIPE_ACCESS_INBOUND: DWORD = 0x00000001; +pub const FILE_FLAG_FIRST_PIPE_INSTANCE: DWORD = 0x00080000; +pub const FILE_FLAG_OVERLAPPED: DWORD = 0x40000000; +pub const PIPE_WAIT: DWORD = 0x00000000; +pub const PIPE_TYPE_BYTE: DWORD = 0x00000000; +pub const PIPE_REJECT_REMOTE_CLIENTS: DWORD = 0x00000008; +pub const PIPE_READMODE_BYTE: DWORD = 0x00000000; + #[repr(C)] #[cfg(target_arch = "x86")] pub struct WSADATA { @@ -913,10 +924,6 @@ extern "system" { nOutBufferSize: DWORD, lpBytesReturned: LPDWORD, lpOverlapped: LPOVERLAPPED) -> BOOL; - pub fn CreatePipe(hReadPipe: LPHANDLE, - hWritePipe: LPHANDLE, - lpPipeAttributes: LPSECURITY_ATTRIBUTES, - nSize: DWORD) -> BOOL; pub fn CreateThread(lpThreadAttributes: LPSECURITY_ATTRIBUTES, dwStackSize: SIZE_T, lpStartAddress: extern "system" fn(*mut c_void) @@ -1129,6 +1136,29 @@ extern "system" { OriginalContext: *const CONTEXT, HistoryTable: *const UNWIND_HISTORY_TABLE); pub fn GetSystemTimeAsFileTime(lpSystemTimeAsFileTime: LPFILETIME); + + pub fn CreateEventW(lpEventAttributes: LPSECURITY_ATTRIBUTES, + bManualReset: BOOL, + bInitialState: BOOL, + lpName: LPCWSTR) -> HANDLE; + pub fn WaitForMultipleObjects(nCount: DWORD, + lpHandles: *const HANDLE, + bWaitAll: BOOL, + dwMilliseconds: DWORD) -> DWORD; + pub fn CreateNamedPipeW(lpName: LPCWSTR, + dwOpenMode: DWORD, + dwPipeMode: DWORD, + nMaxInstances: DWORD, + nOutBufferSize: DWORD, + nInBufferSize: DWORD, + nDefaultTimeOut: DWORD, + lpSecurityAttributes: LPSECURITY_ATTRIBUTES) + -> HANDLE; + pub fn CancelIo(handle: HANDLE) -> BOOL; + pub fn GetOverlappedResult(hFile: HANDLE, + lpOverlapped: LPOVERLAPPED, + lpNumberOfBytesTransferred: LPDWORD, + bWait: BOOL) -> BOOL; } // Functions that aren't available on Windows XP, but we still use them and just diff --git a/src/libstd/sys/windows/handle.rs b/src/libstd/sys/windows/handle.rs index f4b8b2754c5b6..1396d670902bb 100644 --- a/src/libstd/sys/windows/handle.rs +++ b/src/libstd/sys/windows/handle.rs @@ -8,6 +8,8 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +#![unstable(issue = "0", feature = "windows_handle")] + use prelude::v1::*; use cmp; @@ -42,6 +44,20 @@ impl Handle { Handle(RawHandle::new(handle)) } + pub fn new_event(manual: bool, init: bool) -> io::Result { + unsafe { + let event = c::CreateEventW(0 as *mut _, + manual as c::BOOL, + init as c::BOOL, + 0 as *const _); + if event.is_null() { + Err(io::Error::last_os_error()) + } else { + Ok(Handle::new(event)) + } + } + } + pub fn into_raw(self) -> c::HANDLE { let ret = self.raw(); mem::forget(self); @@ -90,6 +106,59 @@ impl RawHandle { } } + pub unsafe fn read_overlapped(&self, + buf: &mut [u8], + overlapped: *mut c::OVERLAPPED) + -> io::Result> { + let len = cmp::min(buf.len(), ::max_value() as usize) as c::DWORD; + let mut amt = 0; + let res = cvt({ + c::ReadFile(self.0, buf.as_ptr() as c::LPVOID, + len, &mut amt, overlapped) + }); + match res { + Ok(_) => Ok(Some(amt as usize)), + Err(e) => { + if e.raw_os_error() == Some(c::ERROR_IO_PENDING as i32) { + Ok(None) + } else if e.raw_os_error() == Some(c::ERROR_BROKEN_PIPE as i32) { + Ok(Some(0)) + } else { + Err(e) + } + } + } + } + + pub fn overlapped_result(&self, + overlapped: *mut c::OVERLAPPED, + wait: bool) -> io::Result { + unsafe { + let mut bytes = 0; + let wait = if wait {c::TRUE} else {c::FALSE}; + let res = cvt({ + c::GetOverlappedResult(self.raw(), overlapped, &mut bytes, wait) + }); + match res { + Ok(_) => Ok(bytes as usize), + Err(e) => { + if e.raw_os_error() == Some(c::ERROR_HANDLE_EOF as i32) || + e.raw_os_error() == Some(c::ERROR_BROKEN_PIPE as i32) { + Ok(0) + } else { + Err(e) + } + } + } + } + } + + pub fn cancel_io(&self) -> io::Result<()> { + unsafe { + cvt(c::CancelIo(self.raw())).map(|_| ()) + } + } + pub fn read_to_end(&self, buf: &mut Vec) -> io::Result { let mut me = self; (&mut me).read_to_end(buf) @@ -120,7 +189,6 @@ impl RawHandle { } } -#[unstable(reason = "not public", issue = "0", feature = "fd_read")] impl<'a> Read for &'a RawHandle { fn read(&mut self, buf: &mut [u8]) -> io::Result { (**self).read(buf) diff --git a/src/libstd/sys/windows/net.rs b/src/libstd/sys/windows/net.rs index 01e3a6cd8ed8f..bb3c79c5a84fd 100644 --- a/src/libstd/sys/windows/net.rs +++ b/src/libstd/sys/windows/net.rs @@ -8,6 +8,8 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +#![unstable(issue = "0", feature = "windows_net")] + use prelude::v1::*; use cmp; diff --git a/src/libstd/sys/windows/pipe.rs b/src/libstd/sys/windows/pipe.rs index 8c3171d2470bc..fbe38d76e9571 100644 --- a/src/libstd/sys/windows/pipe.rs +++ b/src/libstd/sys/windows/pipe.rs @@ -9,11 +9,16 @@ // except according to those terms. use prelude::v1::*; +use os::windows::prelude::*; +use ffi::OsStr; +use path::Path; use io; -use ptr; -use sys::cvt; +use mem; +use rand::{self, Rng}; +use slice; use sys::c; +use sys::fs::{File, OpenOptions}; use sys::handle::Handle; //////////////////////////////////////////////////////////////////////////////// @@ -25,14 +30,76 @@ pub struct AnonPipe { } pub fn anon_pipe() -> io::Result<(AnonPipe, AnonPipe)> { - let mut reader = c::INVALID_HANDLE_VALUE; - let mut writer = c::INVALID_HANDLE_VALUE; - try!(cvt(unsafe { - c::CreatePipe(&mut reader, &mut writer, ptr::null_mut(), 0) - })); - let reader = Handle::new(reader); - let writer = Handle::new(writer); - Ok((AnonPipe { inner: reader }, AnonPipe { inner: writer })) + // Note that we specifically do *not* use `CreatePipe` here because + // unfortunately the anonymous pipes returned do not support overlapped + // operations. + // + // Instead, we create a "hopefully unique" name and create a named pipe + // which has overlapped operations enabled. + // + // Once we do this, we connect do it as usual via `CreateFileW`, and then we + // return those reader/writer halves. + unsafe { + let reader; + let mut name; + let mut tries = 0; + loop { + tries += 1; + let key: u64 = rand::thread_rng().gen(); + name = format!(r"\\.\pipe\__rust_anonymous_pipe1__.{}.{}", + c::GetCurrentProcessId(), + key); + let wide_name = OsStr::new(&name) + .encode_wide() + .chain(Some(0)) + .collect::>(); + + let handle = c::CreateNamedPipeW(wide_name.as_ptr(), + c::PIPE_ACCESS_INBOUND | + c::FILE_FLAG_FIRST_PIPE_INSTANCE | + c::FILE_FLAG_OVERLAPPED, + c::PIPE_TYPE_BYTE | + c::PIPE_READMODE_BYTE | + c::PIPE_WAIT | + c::PIPE_REJECT_REMOTE_CLIENTS, + 1, + 4096, + 4096, + 0, + 0 as *mut _); + + // We pass the FILE_FLAG_FIRST_PIPE_INSTANCE flag above, and we're + // also just doing a best effort at selecting a unique name. If + // ERROR_ACCESS_DENIED is returned then it could mean that we + // accidentally conflicted with an already existing pipe, so we try + // again. + // + // Don't try again too much though as this could also perhaps be a + // legit error. + if handle == c::INVALID_HANDLE_VALUE { + let err = io::Error::last_os_error(); + if tries < 10 && + err.raw_os_error() == Some(c::ERROR_ACCESS_DENIED as i32) { + continue + } + return Err(err) + } + reader = Handle::new(handle); + break + } + + // Connect to the named pipe we just created in write-only mode (also + // overlapped for async I/O below). + let mut opts = OpenOptions::new(); + opts.write(true); + opts.read(false); + opts.share_mode(0); + opts.attributes(c::FILE_FLAG_OVERLAPPED); + let writer = try!(File::open(Path::new(&name), &opts)); + let writer = AnonPipe { inner: writer.into_handle() }; + + Ok((AnonPipe { inner: reader }, AnonPipe { inner: writer.into_handle() })) + } } impl AnonPipe { @@ -51,3 +118,185 @@ impl AnonPipe { self.inner.write(buf) } } + +pub fn read2(p1: AnonPipe, + v1: &mut Vec, + p2: AnonPipe, + v2: &mut Vec) -> io::Result<()> { + let p1 = p1.into_handle(); + let p2 = p2.into_handle(); + + let mut p1 = try!(AsyncPipe::new(p1, v1)); + let mut p2 = try!(AsyncPipe::new(p2, v2)); + let objs = [p1.event.raw(), p2.event.raw()]; + + // In a loop we wait for either pipe's scheduled read operation to complete. + // If the operation completes with 0 bytes, that means EOF was reached, in + // which case we just finish out the other pipe entirely. + // + // Note that overlapped I/O is in general super unsafe because we have to + // be careful to ensure that all pointers in play are valid for the entire + // duration of the I/O operation (where tons of operations can also fail). + // The destructor for `AsyncPipe` ends up taking care of most of this. + loop { + let res = unsafe { + c::WaitForMultipleObjects(2, objs.as_ptr(), c::FALSE, c::INFINITE) + }; + if res == c::WAIT_OBJECT_0 { + if !try!(p1.result()) || !try!(p1.schedule_read()) { + return p2.finish() + } + } else if res == c::WAIT_OBJECT_0 + 1 { + if !try!(p2.result()) || !try!(p2.schedule_read()) { + return p1.finish() + } + } else { + return Err(io::Error::last_os_error()) + } + } +} + +struct AsyncPipe<'a> { + pipe: Handle, + event: Handle, + overlapped: Box, // needs a stable address + dst: &'a mut Vec, + state: State, +} + +#[derive(PartialEq, Debug)] +enum State { + NotReading, + Reading, + Read(usize), +} + +impl<'a> AsyncPipe<'a> { + fn new(pipe: Handle, dst: &'a mut Vec) -> io::Result> { + // Create an event which we'll use to coordinate our overlapped + // opreations, this event will be used in WaitForMultipleObjects + // and passed as part of the OVERLAPPED handle. + // + // Note that we do a somewhat clever thing here by flagging the + // event as being manually reset and setting it initially to the + // signaled state. This means that we'll naturally fall through the + // WaitForMultipleObjects call above for pipes created initially, + // and the only time an even will go back to "unset" will be once an + // I/O operation is successfully scheduled (what we want). + let event = try!(Handle::new_event(true, true)); + let mut overlapped: Box = unsafe { + Box::new(mem::zeroed()) + }; + overlapped.hEvent = event.raw(); + Ok(AsyncPipe { + pipe: pipe, + overlapped: overlapped, + event: event, + dst: dst, + state: State::NotReading, + }) + } + + /// Executes an overlapped read operation. + /// + /// Must not currently be reading, and returns whether the pipe is currently + /// at EOF or not. If the pipe is not at EOF then `result()` must be called + /// to complete the read later on (may block), but if the pipe is at EOF + /// then `result()` should not be called as it will just block forever. + fn schedule_read(&mut self) -> io::Result { + assert_eq!(self.state, State::NotReading); + let amt = unsafe { + let slice = slice_to_end(self.dst); + try!(self.pipe.read_overlapped(slice, &mut *self.overlapped)) + }; + + // If this read finished immediately then our overlapped event will + // remain signaled (it was signaled coming in here) and we'll progress + // down to the method below. + // + // Otherwise the I/O operation is scheduled and the system set our event + // to not signaled, so we flag ourselves into the reading state and move + // on. + self.state = match amt { + Some(0) => return Ok(false), + Some(amt) => State::Read(amt), + None => State::Reading, + }; + Ok(true) + } + + /// Wait for the result of the overlapped operation previously executed. + /// + /// Takes a parameter `wait` which indicates if this pipe is currently being + /// read whether the function should block waiting for the read to complete. + /// + /// Return values: + /// + /// * `true` - finished any pending read and the pipe is not at EOF (keep + /// going) + /// * `false` - finished any pending read and pipe is at EOF (stop issuing + /// reads) + fn result(&mut self) -> io::Result { + let amt = match self.state { + State::NotReading => return Ok(true), + State::Reading => { + try!(self.pipe.overlapped_result(&mut *self.overlapped, true)) + } + State::Read(amt) => amt, + }; + self.state = State::NotReading; + unsafe { + let len = self.dst.len(); + self.dst.set_len(len + amt); + } + Ok(amt != 0) + } + + /// Finishes out reading this pipe entirely. + /// + /// Waits for any pending and schedule read, and then calls `read_to_end` + /// if necessary to read all the remaining information. + fn finish(&mut self) -> io::Result<()> { + while try!(self.result()) && try!(self.schedule_read()) { + // ... + } + Ok(()) + } +} + +impl<'a> Drop for AsyncPipe<'a> { + fn drop(&mut self) { + match self.state { + State::Reading => {} + _ => return, + } + + // If we have a pending read operation, then we have to make sure that + // it's *done* before we actually drop this type. The kernel requires + // that the `OVERLAPPED` and buffer pointers are valid for the entire + // I/O operation. + // + // To do that, we call `CancelIo` to cancel any pending operation, and + // if that succeeds we wait for the overlapped result. + // + // If anything here fails, there's not really much we can do, so we leak + // the buffer/OVERLAPPED pointers to ensure we're at least memory safe. + if self.pipe.cancel_io().is_err() || self.result().is_err() { + let buf = mem::replace(self.dst, Vec::new()); + let overlapped = Box::new(unsafe { mem::zeroed() }); + let overlapped = mem::replace(&mut self.overlapped, overlapped); + mem::forget((buf, overlapped)); + } + } +} + +unsafe fn slice_to_end(v: &mut Vec) -> &mut [u8] { + if v.capacity() == 0 { + v.reserve(16); + } + if v.capacity() == v.len() { + v.reserve(1); + } + slice::from_raw_parts_mut(v.as_mut_ptr().offset(v.len() as isize), + v.capacity() - v.len()) +} diff --git a/src/libstd/sys/windows/stdio.rs b/src/libstd/sys/windows/stdio.rs index 5a8705bf0cb98..5883904c21d72 100644 --- a/src/libstd/sys/windows/stdio.rs +++ b/src/libstd/sys/windows/stdio.rs @@ -8,6 +8,8 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +#![unstable(issue = "0", feature = "windows_stdio")] + use prelude::v1::*; use io::prelude::*;