From c152fb2682e2acc20adf5f1e834af96ae46b2c5c Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Fri, 12 Feb 2016 00:17:24 -0800 Subject: [PATCH 1/3] std: Funnel read_to_end through to one location This pushes the implementation detail of proxying `read_to_end` through to `read_to_end_uninitialized` all the way down to the `FileDesc` and `Handle` implementations on Unix/Windows. This way intermediate layers will also be able to take advantage of this optimized implementation. This commit also adds the optimized implementation for `ChildStdout` and `ChildStderr`. --- src/libstd/fs.rs | 6 ++++-- src/libstd/io/stdio.rs | 12 ++++++++++-- src/libstd/net/tcp.rs | 5 ++--- src/libstd/process.rs | 6 ++++++ src/libstd/sys/common/net.rs | 4 ++++ src/libstd/sys/unix/fd.rs | 23 +++++++++++++++++++++-- src/libstd/sys/unix/fs.rs | 4 ++++ src/libstd/sys/unix/net.rs | 4 ++++ src/libstd/sys/unix/pipe.rs | 4 ++++ src/libstd/sys/unix/stdio.rs | 9 +++++++++ src/libstd/sys/windows/fs.rs | 5 +++++ src/libstd/sys/windows/handle.rs | 21 ++++++++++++++++++++- src/libstd/sys/windows/net.rs | 21 ++++++++++++++++++++- src/libstd/sys/windows/pipe.rs | 6 ++++++ src/libstd/sys/windows/stdio.rs | 17 +++++++++++++++++ 15 files changed, 136 insertions(+), 11 deletions(-) diff --git a/src/libstd/fs.rs b/src/libstd/fs.rs index d42b948918049..db2e951e08f2a 100644 --- a/src/libstd/fs.rs +++ b/src/libstd/fs.rs @@ -22,7 +22,6 @@ use ffi::OsString; use io::{self, SeekFrom, Seek, Read, Write}; use path::{Path, PathBuf}; use sys::fs as fs_imp; -use sys_common::io::read_to_end_uninitialized; use sys_common::{AsInnerMut, FromInner, AsInner, IntoInner}; use vec::Vec; use time::SystemTime; @@ -351,7 +350,7 @@ impl Read for File { self.inner.read(buf) } fn read_to_end(&mut self, buf: &mut Vec) -> io::Result { - unsafe { read_to_end_uninitialized(self, buf) } + self.inner.read_to_end(buf) } } #[stable(feature = "rust1", since = "1.0.0")] @@ -372,6 +371,9 @@ impl<'a> Read for &'a File { fn read(&mut self, buf: &mut [u8]) -> io::Result { self.inner.read(buf) } + fn read_to_end(&mut self, buf: &mut Vec) -> io::Result { + self.inner.read_to_end(buf) + } } #[stable(feature = "rust1", since = "1.0.0")] impl<'a> Write for &'a File { diff --git a/src/libstd/io/stdio.rs b/src/libstd/io/stdio.rs index cd2d5e52462bb..25309a785c45a 100644 --- a/src/libstd/io/stdio.rs +++ b/src/libstd/io/stdio.rs @@ -18,7 +18,6 @@ use io::lazy::Lazy; use io::{self, BufReader, LineWriter}; use sync::{Arc, Mutex, MutexGuard}; use sys::stdio; -use sys_common::io::{read_to_end_uninitialized}; use sys_common::remutex::{ReentrantMutex, ReentrantMutexGuard}; use thread::LocalKeyState; @@ -78,6 +77,9 @@ fn stderr_raw() -> io::Result { stdio::Stderr::new().map(StderrRaw) } impl Read for StdinRaw { fn read(&mut self, buf: &mut [u8]) -> io::Result { self.0.read(buf) } + fn read_to_end(&mut self, buf: &mut Vec) -> io::Result { + self.0.read_to_end(buf) + } } impl Write for StdoutRaw { fn write(&mut self, buf: &[u8]) -> io::Result { self.0.write(buf) } @@ -116,6 +118,12 @@ impl io::Read for Maybe { Maybe::Fake => Ok(0) } } + fn read_to_end(&mut self, buf: &mut Vec) -> io::Result { + match *self { + Maybe::Real(ref mut r) => handle_ebadf(r.read_to_end(buf), 0), + Maybe::Fake => Ok(0) + } + } } fn handle_ebadf(r: io::Result, default: T) -> io::Result { @@ -294,7 +302,7 @@ impl<'a> Read for StdinLock<'a> { self.inner.read(buf) } fn read_to_end(&mut self, buf: &mut Vec) -> io::Result { - unsafe { read_to_end_uninitialized(self, buf) } + self.inner.read_to_end(buf) } } diff --git a/src/libstd/net/tcp.rs b/src/libstd/net/tcp.rs index f9c38c3845847..0ebbd1cb07b68 100644 --- a/src/libstd/net/tcp.rs +++ b/src/libstd/net/tcp.rs @@ -14,7 +14,6 @@ use io::prelude::*; use fmt; use io; use net::{ToSocketAddrs, SocketAddr, Shutdown}; -use sys_common::io::read_to_end_uninitialized; use sys_common::net as net_imp; use sys_common::{AsInner, FromInner, IntoInner}; use time::Duration; @@ -186,7 +185,7 @@ impl TcpStream { impl Read for TcpStream { fn read(&mut self, buf: &mut [u8]) -> io::Result { self.0.read(buf) } fn read_to_end(&mut self, buf: &mut Vec) -> io::Result { - unsafe { read_to_end_uninitialized(self, buf) } + self.0.read_to_end(buf) } } #[stable(feature = "rust1", since = "1.0.0")] @@ -198,7 +197,7 @@ impl Write for TcpStream { impl<'a> Read for &'a TcpStream { fn read(&mut self, buf: &mut [u8]) -> io::Result { self.0.read(buf) } fn read_to_end(&mut self, buf: &mut Vec) -> io::Result { - unsafe { read_to_end_uninitialized(self, buf) } + self.0.read_to_end(buf) } } #[stable(feature = "rust1", since = "1.0.0")] diff --git a/src/libstd/process.rs b/src/libstd/process.rs index 8db8ad324bea9..ec86dd062b540 100644 --- a/src/libstd/process.rs +++ b/src/libstd/process.rs @@ -134,6 +134,9 @@ impl Read for ChildStdout { fn read(&mut self, buf: &mut [u8]) -> io::Result { self.inner.read(buf) } + fn read_to_end(&mut self, buf: &mut Vec) -> io::Result { + self.inner.read_to_end(buf) + } } impl AsInner for ChildStdout { @@ -161,6 +164,9 @@ impl Read for ChildStderr { fn read(&mut self, buf: &mut [u8]) -> io::Result { self.inner.read(buf) } + fn read_to_end(&mut self, buf: &mut Vec) -> io::Result { + self.inner.read_to_end(buf) + } } impl AsInner for ChildStderr { diff --git a/src/libstd/sys/common/net.rs b/src/libstd/sys/common/net.rs index 7e05895b2ccbd..75bd491ab8ce4 100644 --- a/src/libstd/sys/common/net.rs +++ b/src/libstd/sys/common/net.rs @@ -197,6 +197,10 @@ impl TcpStream { self.inner.read(buf) } + pub fn read_to_end(&self, buf: &mut Vec) -> io::Result { + self.inner.read_to_end(buf) + } + pub fn write(&self, buf: &[u8]) -> io::Result { let ret = try!(cvt(unsafe { c::send(*self.inner.as_inner(), diff --git a/src/libstd/sys/unix/fd.rs b/src/libstd/sys/unix/fd.rs index 299c6ec2731d7..a00e6c3eb7254 100644 --- a/src/libstd/sys/unix/fd.rs +++ b/src/libstd/sys/unix/fd.rs @@ -8,12 +8,15 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use io; +use prelude::v1::*; + +use io::{self, Read}; use libc::{self, c_int, size_t, c_void}; use mem; +use sync::atomic::{AtomicBool, Ordering}; use sys::cvt; use sys_common::AsInner; -use sync::atomic::{AtomicBool, Ordering}; +use sys_common::io::read_to_end_uninitialized; pub struct FileDesc { fd: c_int, @@ -42,6 +45,11 @@ impl FileDesc { Ok(ret as usize) } + pub fn read_to_end(&self, buf: &mut Vec) -> io::Result { + let mut me = self; + (&mut me).read_to_end(buf) + } + pub fn write(&self, buf: &[u8]) -> io::Result { let ret = try!(cvt(unsafe { libc::write(self.fd, @@ -118,6 +126,17 @@ impl FileDesc { } } +#[unstable(reason = "not public", issue = "0", feature = "fd_read")] +impl<'a> Read for &'a FileDesc { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + (**self).read(buf) + } + + fn read_to_end(&mut self, buf: &mut Vec) -> io::Result { + unsafe { read_to_end_uninitialized(self, buf) } + } +} + impl AsInner for FileDesc { fn as_inner(&self) -> &c_int { &self.fd } } diff --git a/src/libstd/sys/unix/fs.rs b/src/libstd/sys/unix/fs.rs index ee81c516e33d6..68993e6b8c524 100644 --- a/src/libstd/sys/unix/fs.rs +++ b/src/libstd/sys/unix/fs.rs @@ -470,6 +470,10 @@ impl File { self.0.read(buf) } + pub fn read_to_end(&self, buf: &mut Vec) -> io::Result { + self.0.read_to_end(buf) + } + pub fn write(&self, buf: &[u8]) -> io::Result { self.0.write(buf) } diff --git a/src/libstd/sys/unix/net.rs b/src/libstd/sys/unix/net.rs index 507cc0f4ea461..83b3bd2dd098a 100644 --- a/src/libstd/sys/unix/net.rs +++ b/src/libstd/sys/unix/net.rs @@ -116,6 +116,10 @@ impl Socket { self.0.read(buf) } + pub fn read_to_end(&self, buf: &mut Vec) -> io::Result { + self.0.read_to_end(buf) + } + pub fn set_timeout(&self, dur: Option, kind: libc::c_int) -> io::Result<()> { let timeout = match dur { Some(dur) => { diff --git a/src/libstd/sys/unix/pipe.rs b/src/libstd/sys/unix/pipe.rs index 667f0f9e6bf62..d88193e62273b 100644 --- a/src/libstd/sys/unix/pipe.rs +++ b/src/libstd/sys/unix/pipe.rs @@ -57,6 +57,10 @@ impl AnonPipe { self.0.read(buf) } + pub fn read_to_end(&self, buf: &mut Vec) -> io::Result { + self.0.read_to_end(buf) + } + pub fn write(&self, buf: &[u8]) -> io::Result { self.0.write(buf) } diff --git a/src/libstd/sys/unix/stdio.rs b/src/libstd/sys/unix/stdio.rs index ccbb14677c7e4..37d1d9a969ed8 100644 --- a/src/libstd/sys/unix/stdio.rs +++ b/src/libstd/sys/unix/stdio.rs @@ -8,6 +8,8 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +use prelude::v1::*; + use io; use libc; use sys::fd::FileDesc; @@ -25,6 +27,13 @@ impl Stdin { fd.into_raw(); ret } + + pub fn read_to_end(&self, buf: &mut Vec) -> io::Result { + let fd = FileDesc::new(libc::STDIN_FILENO); + let ret = fd.read_to_end(buf); + fd.into_raw(); + ret + } } impl Stdout { diff --git a/src/libstd/sys/windows/fs.rs b/src/libstd/sys/windows/fs.rs index 3062d38f8c259..8c9f9383786f8 100644 --- a/src/libstd/sys/windows/fs.rs +++ b/src/libstd/sys/windows/fs.rs @@ -8,6 +8,7 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +use prelude::v1::*; use io::prelude::*; use os::windows::prelude::*; @@ -318,6 +319,10 @@ impl File { self.handle.read(buf) } + pub fn read_to_end(&self, buf: &mut Vec) -> io::Result { + self.handle.read_to_end(buf) + } + pub fn write(&self, buf: &[u8]) -> io::Result { self.handle.write(buf) } diff --git a/src/libstd/sys/windows/handle.rs b/src/libstd/sys/windows/handle.rs index cb41b05daaea2..61708db880db2 100644 --- a/src/libstd/sys/windows/handle.rs +++ b/src/libstd/sys/windows/handle.rs @@ -8,13 +8,16 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use io::ErrorKind; +use prelude::v1::*; + +use io::{ErrorKind, Read}; use io; use mem; use ops::Deref; use ptr; use sys::c; use sys::cvt; +use sys_common::io::read_to_end_uninitialized; /// An owned container for `HANDLE` object, closing them on Drop. /// @@ -83,6 +86,11 @@ impl RawHandle { } } + pub fn read_to_end(&self, buf: &mut Vec) -> io::Result { + let mut me = self; + (&mut me).read_to_end(buf) + } + pub fn write(&self, buf: &[u8]) -> io::Result { let mut amt = 0; try!(cvt(unsafe { @@ -105,3 +113,14 @@ impl RawHandle { Ok(Handle::new(ret)) } } + +#[unstable(reason = "not public", issue = "0", feature = "fd_read")] +impl<'a> Read for &'a RawHandle { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + (**self).read(buf) + } + + fn read_to_end(&mut self, buf: &mut Vec) -> io::Result { + unsafe { read_to_end_uninitialized(self, buf) } + } +} diff --git a/src/libstd/sys/windows/net.rs b/src/libstd/sys/windows/net.rs index 3e69902dcb6bd..dde50026ae45c 100644 --- a/src/libstd/sys/windows/net.rs +++ b/src/libstd/sys/windows/net.rs @@ -8,7 +8,9 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use io; +use prelude::v1::*; + +use io::{self, Read}; use libc::{c_int, c_void}; use mem; use net::{SocketAddr, Shutdown}; @@ -19,6 +21,7 @@ use sync::Once; use sys::c; use sys; use sys_common::{self, AsInner, FromInner, IntoInner}; +use sys_common::io::read_to_end_uninitialized; use sys_common::net; use time::Duration; @@ -141,6 +144,11 @@ impl Socket { } } + pub fn read_to_end(&self, buf: &mut Vec) -> io::Result { + let mut me = self; + (&mut me).read_to_end(buf) + } + pub fn set_timeout(&self, dur: Option, kind: c_int) -> io::Result<()> { let timeout = match dur { @@ -186,6 +194,17 @@ impl Socket { } } +#[unstable(reason = "not public", issue = "0", feature = "fd_read")] +impl<'a> Read for &'a Socket { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + (**self).read(buf) + } + + fn read_to_end(&mut self, buf: &mut Vec) -> io::Result { + unsafe { read_to_end_uninitialized(self, buf) } + } +} + impl Drop for Socket { fn drop(&mut self) { let _ = unsafe { c::closesocket(self.0) }; diff --git a/src/libstd/sys/windows/pipe.rs b/src/libstd/sys/windows/pipe.rs index aec41885f3b87..8c3171d2470bc 100644 --- a/src/libstd/sys/windows/pipe.rs +++ b/src/libstd/sys/windows/pipe.rs @@ -8,6 +8,8 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +use prelude::v1::*; + use io; use ptr; use sys::cvt; @@ -41,6 +43,10 @@ impl AnonPipe { self.inner.read(buf) } + pub fn read_to_end(&self, buf: &mut Vec) -> io::Result { + self.inner.read_to_end(buf) + } + pub fn write(&self, buf: &[u8]) -> io::Result { self.inner.write(buf) } diff --git a/src/libstd/sys/windows/stdio.rs b/src/libstd/sys/windows/stdio.rs index 8f37dc02e87af..812bf4b8e03bc 100644 --- a/src/libstd/sys/windows/stdio.rs +++ b/src/libstd/sys/windows/stdio.rs @@ -18,6 +18,7 @@ use sync::Mutex; use sys::c; use sys::cvt; use sys::handle::Handle; +use sys_common::io::read_to_end_uninitialized; pub struct NoClose(Option); @@ -113,6 +114,22 @@ impl Stdin { // MemReader shouldn't error here since we just filled it utf8.read(buf) } + + pub fn read_to_end(&self, buf: &mut Vec) -> io::Result { + let mut me = self; + (&mut me).read_to_end(buf) + } +} + +#[unstable(reason = "not public", issue = "0", feature = "fd_read")] +impl<'a> Read for &'a Stdin { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + (**self).read(buf) + } + + fn read_to_end(&mut self, buf: &mut Vec) -> io::Result { + unsafe { read_to_end_uninitialized(self, buf) } + } } impl Stdout { From 616ca6ae8f7d2d01222a72ee6af17ca6a1b2e84f Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Fri, 12 Feb 2016 10:28:03 -0800 Subject: [PATCH 2/3] std: Don't always create stdin for children For example if `Command::output` or `Command::status` is used then stdin is just immediately closed. Add an option for this so as an optimization we can avoid creating pipes entirely. This should help reduce the number of active file descriptors when spawning processes on Unix and the number of active handles on Windows. --- src/libstd/process.rs | 7 ++++--- src/libstd/sys/unix/process.rs | 13 ++++++++----- src/libstd/sys/windows/process.rs | 6 ++++-- 3 files changed, 16 insertions(+), 10 deletions(-) diff --git a/src/libstd/process.rs b/src/libstd/process.rs index ec86dd062b540..fe5e49ecb09bb 100644 --- a/src/libstd/process.rs +++ b/src/libstd/process.rs @@ -295,7 +295,7 @@ impl Command { /// By default, stdin, stdout and stderr are inherited from the parent. #[stable(feature = "process", since = "1.0.0")] pub fn spawn(&mut self) -> io::Result { - self.inner.spawn(imp::Stdio::Inherit).map(Child::from_inner) + self.inner.spawn(imp::Stdio::Inherit, true).map(Child::from_inner) } /// Executes the command as a child process, waiting for it to finish and @@ -318,7 +318,7 @@ impl Command { /// ``` #[stable(feature = "process", since = "1.0.0")] pub fn output(&mut self) -> io::Result { - self.inner.spawn(imp::Stdio::MakePipe).map(Child::from_inner) + self.inner.spawn(imp::Stdio::MakePipe, false).map(Child::from_inner) .and_then(|p| p.wait_with_output()) } @@ -340,7 +340,8 @@ impl Command { /// ``` #[stable(feature = "process", since = "1.0.0")] pub fn status(&mut self) -> io::Result { - self.spawn().and_then(|mut p| p.wait()) + self.inner.spawn(imp::Stdio::Inherit, false).map(Child::from_inner) + .and_then(|mut p| p.wait()) } } diff --git a/src/libstd/sys/unix/process.rs b/src/libstd/sys/unix/process.rs index 28475f50ce63e..5696cb2b52f73 100644 --- a/src/libstd/sys/unix/process.rs +++ b/src/libstd/sys/unix/process.rs @@ -216,7 +216,7 @@ impl Command { self.stderr = Some(stderr); } - pub fn spawn(&mut self, default: Stdio) + pub fn spawn(&mut self, default: Stdio, needs_stdin: bool) -> io::Result<(Process, StdioPipes)> { const CLOEXEC_MSG_FOOTER: &'static [u8] = b"NOEX"; @@ -225,7 +225,7 @@ impl Command { "nul byte found in provided data")); } - let (ours, theirs) = try!(self.setup_io(default)); + let (ours, theirs) = try!(self.setup_io(default, needs_stdin)); let (input, output) = try!(sys::pipe::anon_pipe()); let pid = unsafe { @@ -298,7 +298,7 @@ impl Command { "nul byte found in provided data") } - match self.setup_io(default) { + match self.setup_io(default, true) { Ok((_, theirs)) => unsafe { self.do_exec(theirs) }, Err(e) => e, } @@ -408,8 +408,11 @@ impl Command { } - fn setup_io(&self, default: Stdio) -> io::Result<(StdioPipes, ChildPipes)> { - let stdin = self.stdin.as_ref().unwrap_or(&default); + fn setup_io(&self, default: Stdio, needs_stdin: bool) + -> io::Result<(StdioPipes, ChildPipes)> { + let null = Stdio::Null; + let default_stdin = if needs_stdin {&default} else {&null}; + let stdin = self.stdin.as_ref().unwrap_or(default_stdin); let stdout = self.stdout.as_ref().unwrap_or(&default); let stderr = self.stderr.as_ref().unwrap_or(&default); let (their_stdin, our_stdin) = try!(stdin.to_child_stdio(true)); diff --git a/src/libstd/sys/windows/process.rs b/src/libstd/sys/windows/process.rs index fa118be6fe6b1..524c932eed439 100644 --- a/src/libstd/sys/windows/process.rs +++ b/src/libstd/sys/windows/process.rs @@ -123,7 +123,7 @@ impl Command { self.stderr = Some(stderr); } - pub fn spawn(&mut self, default: Stdio) + pub fn spawn(&mut self, default: Stdio, needs_stdin: bool) -> io::Result<(Process, StdioPipes)> { // To have the spawning semantics of unix/windows stay the same, we need // to read the *child's* PATH if one is provided. See #15149 for more @@ -181,7 +181,9 @@ impl Command { stdout: None, stderr: None, }; - let stdin = self.stdin.as_ref().unwrap_or(&default); + let null = Stdio::Null; + let default_stdin = if needs_stdin {&default} else {&null}; + let stdin = self.stdin.as_ref().unwrap_or(default_stdin); let stdout = self.stdout.as_ref().unwrap_or(&default); let stderr = self.stderr.as_ref().unwrap_or(&default); let stdin = try!(stdin.to_handle(c::STD_INPUT_HANDLE, &mut pipes.stdin)); From 05c188ddaed90a62dfbb364c89caec5b78461348 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Fri, 12 Feb 2016 10:29:25 -0800 Subject: [PATCH 3/3] std: Don't spawn threads in `wait_with_output` Semantically there's actually no reason for us to spawn threads as part of the call to `wait_with_output`, and that's generally an incredibly heavyweight operation for just reading a few bytes (especially when stderr probably rarely has bytes!). An equivalent operation in terms of what's implemented today would be to just drain both pipes of all contents and then call `wait` on the child process itself. On Unix we can implement this through some convenient use of the `select` function, whereas on Windows we can make use of overlapped I/O. Note that on Windows this requires us to use named pipes instead of anonymous pipes, but they're semantically the same under the hood. --- src/libstd/process.rs | 36 ++--- src/libstd/sys/unix/fd.rs | 14 ++ src/libstd/sys/unix/pipe.rs | 55 ++++++++ src/libstd/sys/unix/process.rs | 2 +- src/libstd/sys/windows/c.rs | 32 +++++ src/libstd/sys/windows/handle.rs | 53 +++++++- src/libstd/sys/windows/pipe.rs | 218 +++++++++++++++++++++++++++++-- 7 files changed, 381 insertions(+), 29 deletions(-) diff --git a/src/libstd/process.rs b/src/libstd/process.rs index fe5e49ecb09bb..277e4ec3eaac7 100644 --- a/src/libstd/process.rs +++ b/src/libstd/process.rs @@ -20,10 +20,9 @@ use fmt; use io; use path::Path; use str; -use sys::pipe::AnonPipe; +use sys::pipe::{read2, AnonPipe}; use sys::process as imp; use sys_common::{AsInner, AsInnerMut, FromInner, IntoInner}; -use thread::{self, JoinHandle}; /// Representation of a running or exited child process. /// @@ -503,24 +502,29 @@ impl Child { #[stable(feature = "process", since = "1.0.0")] pub fn wait_with_output(mut self) -> io::Result { drop(self.stdin.take()); - fn read(mut input: R) -> JoinHandle>> - where R: Read + Send + 'static - { - thread::spawn(move || { - let mut ret = Vec::new(); - input.read_to_end(&mut ret).map(|_| ret) - }) + + let (mut stdout, mut stderr) = (Vec::new(), Vec::new()); + match (self.stdout.take(), self.stderr.take()) { + (None, None) => {} + (Some(mut out), None) => { + let res = out.read_to_end(&mut stdout); + debug_assert!(res.is_ok()); + } + (None, Some(mut err)) => { + let res = err.read_to_end(&mut stderr); + debug_assert!(res.is_ok()); + } + (Some(out), Some(err)) => { + let res = read2(out.inner, &mut stdout, err.inner, &mut stderr); + debug_assert!(res.is_ok()); + } } - let stdout = self.stdout.take().map(read); - let stderr = self.stderr.take().map(read); - let status = try!(self.wait()); - let stdout = stdout.and_then(|t| t.join().unwrap().ok()); - let stderr = stderr.and_then(|t| t.join().unwrap().ok()); + let status = try!(self.wait()); Ok(Output { status: status, - stdout: stdout.unwrap_or(Vec::new()), - stderr: stderr.unwrap_or(Vec::new()), + stdout: stdout, + stderr: stderr, }) } } diff --git a/src/libstd/sys/unix/fd.rs b/src/libstd/sys/unix/fd.rs index a00e6c3eb7254..0de87f862f05b 100644 --- a/src/libstd/sys/unix/fd.rs +++ b/src/libstd/sys/unix/fd.rs @@ -75,6 +75,20 @@ impl FileDesc { } } + pub fn set_nonblocking(&self, nonblocking: bool) { + unsafe { + let previous = libc::fcntl(self.fd, libc::F_GETFL); + debug_assert!(previous != -1); + let new = if nonblocking { + previous | libc::O_NONBLOCK + } else { + previous & !libc::O_NONBLOCK + }; + let ret = libc::fcntl(self.fd, libc::F_SETFL, new); + debug_assert!(ret != -1); + } + } + pub fn duplicate(&self) -> io::Result { // We want to atomically duplicate this file descriptor and set the // CLOEXEC flag, and currently that's done via F_DUPFD_CLOEXEC. This diff --git a/src/libstd/sys/unix/pipe.rs b/src/libstd/sys/unix/pipe.rs index d88193e62273b..02e5882fccafd 100644 --- a/src/libstd/sys/unix/pipe.rs +++ b/src/libstd/sys/unix/pipe.rs @@ -8,8 +8,12 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +use prelude::v1::*; + +use cmp; use io; use libc::{self, c_int}; +use mem; use sys::cvt_r; use sys::fd::FileDesc; @@ -68,3 +72,54 @@ impl AnonPipe { pub fn fd(&self) -> &FileDesc { &self.0 } pub fn into_fd(self) -> FileDesc { self.0 } } + +pub fn read2(p1: AnonPipe, + v1: &mut Vec, + p2: AnonPipe, + v2: &mut Vec) -> io::Result<()> { + // Set both pipes into nonblocking mode as we're gonna be reading from both + // in the `select` loop below, and we wouldn't want one to block the other! + let p1 = p1.into_fd(); + let p2 = p2.into_fd(); + p1.set_nonblocking(true); + p2.set_nonblocking(true); + + let max = cmp::max(p1.raw(), p2.raw()); + loop { + // wait for either pipe to become readable using `select` + try!(cvt_r(|| unsafe { + let mut read: libc::fd_set = mem::zeroed(); + libc::FD_SET(p1.raw(), &mut read); + libc::FD_SET(p2.raw(), &mut read); + libc::select(max + 1, &mut read, 0 as *mut _, 0 as *mut _, + 0 as *mut _) + })); + + // Read as much as we can from each pipe, ignoring EWOULDBLOCK or + // EAGAIN. If we hit EOF, then this will happen because the underlying + // reader will return Ok(0), in which case we'll see `Ok` ourselves. In + // this case we flip the other fd back into blocking mode and read + // whatever's leftover on that file descriptor. + let read = |fd: &FileDesc, dst: &mut Vec| { + match fd.read_to_end(dst) { + Ok(_) => Ok(true), + Err(e) => { + if e.raw_os_error() == Some(libc::EWOULDBLOCK) || + e.raw_os_error() == Some(libc::EAGAIN) { + Ok(false) + } else { + Err(e) + } + } + } + }; + if try!(read(&p1, v1)) { + p2.set_nonblocking(false); + return p2.read_to_end(v2); + } + if try!(read(&p2, v2)) { + p1.set_nonblocking(false); + return p1.read_to_end(v1); + } + } +} diff --git a/src/libstd/sys/unix/process.rs b/src/libstd/sys/unix/process.rs index 5696cb2b52f73..47b0ff42f9322 100644 --- a/src/libstd/sys/unix/process.rs +++ b/src/libstd/sys/unix/process.rs @@ -651,7 +651,7 @@ mod tests { cmd.stdin(Stdio::MakePipe); cmd.stdout(Stdio::MakePipe); - let (mut cat, mut pipes) = t!(cmd.spawn(Stdio::Null)); + let (mut cat, mut pipes) = t!(cmd.spawn(Stdio::Null, true)); let stdin_write = pipes.stdin.take().unwrap(); let stdout_read = pipes.stdout.take().unwrap(); diff --git a/src/libstd/sys/windows/c.rs b/src/libstd/sys/windows/c.rs index 9fdeb0aef14c8..d1fcd5bf880ec 100644 --- a/src/libstd/sys/windows/c.rs +++ b/src/libstd/sys/windows/c.rs @@ -268,6 +268,7 @@ pub const ERROR_PATH_NOT_FOUND: DWORD = 3; pub const ERROR_ACCESS_DENIED: DWORD = 5; pub const ERROR_INVALID_HANDLE: DWORD = 6; pub const ERROR_NO_MORE_FILES: DWORD = 18; +pub const ERROR_HANDLE_EOF: DWORD = 38; pub const ERROR_BROKEN_PIPE: DWORD = 109; pub const ERROR_DISK_FULL: DWORD = 112; pub const ERROR_CALL_NOT_IMPLEMENTED: DWORD = 120; @@ -361,6 +362,14 @@ pub const EXCEPTION_UNWIND: DWORD = EXCEPTION_UNWINDING | EXCEPTION_TARGET_UNWIND | EXCEPTION_COLLIDED_UNWIND; +pub const PIPE_ACCESS_INBOUND: DWORD = 0x00000001; +pub const FILE_FLAG_FIRST_PIPE_INSTANCE: DWORD = 0x00080000; +pub const FILE_FLAG_OVERLAPPED: DWORD = 0x40000000; +pub const PIPE_WAIT: DWORD = 0x00000000; +pub const PIPE_TYPE_BYTE: DWORD = 0x00000000; +pub const PIPE_REJECT_REMOTE_CLIENTS: DWORD = 0x00000008; +pub const PIPE_READMODE_BYTE: DWORD = 0x00000000; + #[repr(C)] #[cfg(target_arch = "x86")] pub struct WSADATA { @@ -1261,6 +1270,29 @@ extern "system" { OriginalContext: *const CONTEXT, HistoryTable: *const UNWIND_HISTORY_TABLE); pub fn GetSystemTimeAsFileTime(lpSystemTimeAsFileTime: LPFILETIME); + + pub fn CreateEventW(lpEventAttributes: LPSECURITY_ATTRIBUTES, + bManualReset: BOOL, + bInitialState: BOOL, + lpName: LPCWSTR) -> HANDLE; + pub fn WaitForMultipleObjects(nCount: DWORD, + lpHandles: *const HANDLE, + bWaitAll: BOOL, + dwMilliseconds: DWORD) -> DWORD; + pub fn CreateNamedPipeW(lpName: LPCWSTR, + dwOpenMode: DWORD, + dwPipeMode: DWORD, + nMaxInstances: DWORD, + nOutBufferSize: DWORD, + nInBufferSize: DWORD, + nDefaultTimeOut: DWORD, + lpSecurityAttributes: LPSECURITY_ATTRIBUTES) + -> HANDLE; + pub fn CancelIo(handle: HANDLE) -> BOOL; + pub fn GetOverlappedResult(hFile: HANDLE, + lpOverlapped: LPOVERLAPPED, + lpNumberOfBytesTransferred: LPDWORD, + bWait: BOOL) -> BOOL; } // Functions that aren't available on Windows XP, but we still use them and just diff --git a/src/libstd/sys/windows/handle.rs b/src/libstd/sys/windows/handle.rs index 61708db880db2..7c8796196a53e 100644 --- a/src/libstd/sys/windows/handle.rs +++ b/src/libstd/sys/windows/handle.rs @@ -69,8 +69,8 @@ impl RawHandle { let mut read = 0; let res = cvt(unsafe { c::ReadFile(self.0, buf.as_ptr() as c::LPVOID, - buf.len() as c::DWORD, &mut read, - ptr::null_mut()) + buf.len() as c::DWORD, &mut read, + 0 as *mut _) }); match res { @@ -86,6 +86,55 @@ impl RawHandle { } } + pub unsafe fn read_overlapped(&self, + buf: &mut [u8], + overlapped: *mut c::OVERLAPPED) + -> io::Result { + let res = cvt({ + c::ReadFile(self.0, buf.as_ptr() as c::LPVOID, + buf.len() as c::DWORD, 0 as *mut _, + overlapped) + }); + match res { + Ok(_) => Ok(true), + Err(e) => { + if e.raw_os_error() == Some(c::ERROR_IO_PENDING as i32) { + Ok(false) + } else { + Err(e) + } + } + } + } + + pub fn overlapped_result(&self, + overlapped: *mut c::OVERLAPPED, + wait: bool) -> io::Result { + unsafe { + let mut bytes = 0; + let wait = if wait {c::TRUE} else {c::FALSE}; + let res = cvt({ + c::GetOverlappedResult(self.raw(), overlapped, &mut bytes, wait) + }); + match res { + Ok(_) => Ok(bytes as usize), + Err(e) => { + if e.raw_os_error() == Some(c::ERROR_HANDLE_EOF as i32) { + Ok(0) + } else { + Err(e) + } + } + } + } + } + + pub fn cancel_io(&self) -> io::Result<()> { + unsafe { + cvt(c::CancelIo(self.raw())).map(|_| ()) + } + } + pub fn read_to_end(&self, buf: &mut Vec) -> io::Result { let mut me = self; (&mut me).read_to_end(buf) diff --git a/src/libstd/sys/windows/pipe.rs b/src/libstd/sys/windows/pipe.rs index 8c3171d2470bc..a32b3d1df28cc 100644 --- a/src/libstd/sys/windows/pipe.rs +++ b/src/libstd/sys/windows/pipe.rs @@ -9,11 +9,16 @@ // except according to those terms. use prelude::v1::*; +use os::windows::prelude::*; +use ffi::OsStr; +use path::Path; use io; -use ptr; -use sys::cvt; +use mem; +use rand::{self, Rng}; +use slice; use sys::c; +use sys::fs::{File, OpenOptions}; use sys::handle::Handle; //////////////////////////////////////////////////////////////////////////////// @@ -25,14 +30,52 @@ pub struct AnonPipe { } pub fn anon_pipe() -> io::Result<(AnonPipe, AnonPipe)> { - let mut reader = c::INVALID_HANDLE_VALUE; - let mut writer = c::INVALID_HANDLE_VALUE; - try!(cvt(unsafe { - c::CreatePipe(&mut reader, &mut writer, ptr::null_mut(), 0) - })); - let reader = Handle::new(reader); - let writer = Handle::new(writer); - Ok((AnonPipe { inner: reader }, AnonPipe { inner: writer })) + // Note that we specifically do *not* use `CreatePipe` here because + // unfortunately the anonymous pipes returned do not support overlapped + // operations. + // + // Instead, we create a "hopefully unique" name and create a named pipe + // which has overlapped operations enabled. + // + // Once we do this, we connect do it as usual via `CreateFileW`, and then we + // return thos reader/writer halves. + unsafe { + let key: u64 = rand::thread_rng().gen(); + let name = format!(r"\\.\pipe\__rust_anonymous_pipe1__.{}.{}", + c::GetCurrentProcessId(), + key); + let wide_name = OsStr::new(&name) + .encode_wide() + .chain(Some(0)) + .collect::>(); + + let reader = c::CreateNamedPipeW(wide_name.as_ptr(), + c::PIPE_ACCESS_INBOUND | + c::FILE_FLAG_FIRST_PIPE_INSTANCE | + c::FILE_FLAG_OVERLAPPED, + c::PIPE_TYPE_BYTE | + c::PIPE_READMODE_BYTE | + c::PIPE_WAIT | + c::PIPE_REJECT_REMOTE_CLIENTS, + 1, + 4096, + 4096, + 0, + 0 as *mut _); + if reader == c::INVALID_HANDLE_VALUE { + return Err(io::Error::last_os_error()) + } + let reader = AnonPipe { inner: Handle::new(reader) }; + + let mut opts = OpenOptions::new(); + opts.write(true); + opts.read(false); + opts.attributes(c::FILE_FLAG_OVERLAPPED); + let writer = try!(File::open(Path::new(&name), &opts)); + let writer = AnonPipe { inner: writer.into_handle() }; + + Ok((reader, writer)) + } } impl AnonPipe { @@ -51,3 +94,158 @@ impl AnonPipe { self.inner.write(buf) } } + +pub fn read2(p1: AnonPipe, + v1: &mut Vec, + p2: AnonPipe, + v2: &mut Vec) -> io::Result<()> { + let p1 = p1.into_handle(); + let p2 = p2.into_handle(); + + let mut p1 = try!(AsyncPipe::new(&p1, v1)); + let mut p2 = try!(AsyncPipe::new(&p2, v2)); + let objs = [p1.event.raw(), p2.event.raw()]; + + try!(p1.schedule_read()); + try!(p2.schedule_read()); + + // In a loop we wait for either pipe's scheduled read operation to complete. + // If the operation completes with 0 bytes, that means EOF was reached, in + // which case we just finish out the other pipe entirely. + // + // Note that overlapped I/O is in general super unsafe because we have to + // be careful to ensure that all pointers in play are valid for the entire + // duration of the I/O operation (where tons of operations can also fail). + // The destructor for `AsyncPipe` ends up taking care of most of this. + loop { + let res = unsafe { + c::WaitForMultipleObjects(2, objs.as_ptr(), c::FALSE, c::INFINITE) + }; + if res == c::WAIT_OBJECT_0 { + if try!(p1.result()) == 0 { + return p2.finish() + } + try!(p1.schedule_read()); + } else if res == c::WAIT_OBJECT_0 + 1 { + if try!(p2.result()) == 0 { + return p1.finish() + } + try!(p2.schedule_read()); + } else { + return Err(io::Error::last_os_error()) + } + } +} + +struct AsyncPipe<'a> { + pipe: &'a Handle, + event: Handle, + overlapped: Box, // needs a stable address + dst: &'a mut Vec, + reading: bool, +} + +impl<'a> AsyncPipe<'a> { + fn new(pipe: &'a Handle, dst: &'a mut Vec) -> io::Result> { + unsafe { + // Create an event which we'll use to coordinate our overlapped + // opreations, this event will be used in WaitForMultipleObjects + // and passed as part of the OVERLAPPED handle. + let event = c::CreateEventW(0 as *mut _, c::FALSE, c::FALSE, + 0 as *const _); + let event = if event.is_null() { + return Err(io::Error::last_os_error()) + } else { + Handle::new(event) + }; + let mut overlapped: Box = Box::new(mem::zeroed()); + overlapped.hEvent = event.raw(); + Ok(AsyncPipe { + pipe: pipe, + overlapped: overlapped, + event: event, + dst: dst, + reading: false, + }) + } + } + + /// Executes an overlapped read operation, returning whether the operation + /// was successfully issued. + /// + /// Must not currently be reading, and once the read is done `result` must + /// be called to figure out how the read went. + fn schedule_read(&mut self) -> io::Result<()> { + assert!(!self.reading); + unsafe { + let slice = slice_to_end(self.dst); + try!(self.pipe.read_overlapped(slice, &mut *self.overlapped)); + } + self.reading = true; + Ok(()) + } + + /// Wait for the result of the overlapped operation previously executed. + /// + /// If this pipe is being read, this will wait for the scheduled overlapped + /// operation to be completed. Returns how many bytes were read from the + /// operation. + fn result(&mut self) -> io::Result { + if !self.reading { + return Ok(0) + } + let amt = try!(self.pipe.overlapped_result(&mut *self.overlapped, true)); + self.reading = false; + unsafe { + let len = self.dst.len(); + self.dst.set_len(len + amt); + } + Ok(amt) + } + + /// Finishes out reading this pipe entirely. + /// + /// Waits for any pending and schedule read, and then calls `read_to_end` + /// if necessary to read all the remaining information. + fn finish(&mut self) -> io::Result<()> { + while try!(self.result()) != 0 { + try!(self.schedule_read()); + } + Ok(()) + } +} + +impl<'a> Drop for AsyncPipe<'a> { + fn drop(&mut self) { + if !self.reading { + return + } + // If we have a pending read operation, then we have to make sure that + // it's *done* before we actually drop this type. The kernel requires + // that the `OVERLAPPED` and buffer pointers are valid for the entire + // I/O operation. + // + // To do that, we call `CancelIo` to cancel any pending operation, and + // if that succeeds we wait for the overlapped result. + // + // If anything here fails, there's not really much we can do, so we leak + // the buffer/OVERLAPPED pointers to ensure we're at least memory safe. + if self.pipe.cancel_io().is_err() || self.result().is_err() { + let buf = mem::replace(self.dst, Vec::new()); + let overlapped = Box::new(unsafe { mem::zeroed() }); + let overlapped = mem::replace(&mut self.overlapped, overlapped); + mem::forget((buf, overlapped)); + } + } +} + +unsafe fn slice_to_end(v: &mut Vec) -> &mut [u8] { + if v.capacity() == 0 { + v.reserve(16); + } + if v.capacity() == v.len() { + v.reserve(1); + } + slice::from_raw_parts_mut(v.as_mut_ptr().offset(v.len() as isize), + v.capacity() - v.len()) +}