diff --git a/src/liblibc/lib.rs b/src/liblibc/lib.rs index a4593c1cb5afc..5f67be22068db 100644 --- a/src/liblibc/lib.rs +++ b/src/liblibc/lib.rs @@ -118,7 +118,7 @@ pub use consts::os::bsd44::{SOL_SOCKET, SO_KEEPALIVE, SO_ERROR}; pub use consts::os::bsd44::{SO_REUSEADDR, SO_BROADCAST, SHUT_WR, IP_MULTICAST_LOOP}; pub use consts::os::bsd44::{IP_ADD_MEMBERSHIP, IP_DROP_MEMBERSHIP}; pub use consts::os::bsd44::{IPV6_ADD_MEMBERSHIP, IPV6_DROP_MEMBERSHIP}; -pub use consts::os::bsd44::{IP_MULTICAST_TTL, IP_TTL}; +pub use consts::os::bsd44::{IP_MULTICAST_TTL, IP_TTL, SHUT_RD}; pub use funcs::c95::ctype::{isalnum, isalpha, iscntrl, isdigit}; pub use funcs::c95::ctype::{islower, isprint, ispunct, isspace}; @@ -226,6 +226,8 @@ pub use funcs::bsd43::{shutdown}; #[cfg(windows)] pub use consts::os::extra::{FILE_WRITE_ATTRIBUTES, FILE_READ_ATTRIBUTES}; #[cfg(windows)] pub use consts::os::extra::{ERROR_PIPE_BUSY, ERROR_IO_PENDING}; #[cfg(windows)] pub use consts::os::extra::{ERROR_PIPE_CONNECTED, WAIT_OBJECT_0}; +#[cfg(windows)] pub use consts::os::extra::{ERROR_NOT_FOUND}; +#[cfg(windows)] pub use consts::os::extra::{ERROR_OPERATION_ABORTED}; #[cfg(windows)] pub use types::os::common::bsd44::{SOCKET}; #[cfg(windows)] pub use types::os::common::posix01::{stat, utimbuf}; #[cfg(windows)] pub use types::os::arch::extra::{HANDLE, BOOL, LPSECURITY_ATTRIBUTES}; @@ -1740,8 +1742,10 @@ pub mod consts { pub static ERROR_NO_DATA: c_int = 232; pub static ERROR_INVALID_ADDRESS : c_int = 487; pub static ERROR_PIPE_CONNECTED: c_int = 535; + pub static ERROR_OPERATION_ABORTED: c_int = 995; pub static ERROR_IO_PENDING: c_int = 997; pub static ERROR_FILE_INVALID : c_int = 1006; + pub static ERROR_NOT_FOUND: c_int = 1168; pub static INVALID_HANDLE_VALUE : c_int = -1; pub static DELETE : DWORD = 0x00010000; diff --git a/src/libnative/io/c_win32.rs b/src/libnative/io/c_win32.rs index 6c84424e97a0d..4fdd05a8b42f8 100644 --- a/src/libnative/io/c_win32.rs +++ b/src/libnative/io/c_win32.rs @@ -61,4 +61,6 @@ extern "system" { optlen: *mut libc::c_int) -> libc::c_int; pub fn CancelIo(hFile: libc::HANDLE) -> libc::BOOL; + pub fn CancelIoEx(hFile: libc::HANDLE, + lpOverlapped: libc::LPOVERLAPPED) -> libc::BOOL; } diff --git a/src/libnative/io/file_unix.rs b/src/libnative/io/file_unix.rs index 2727f9a0b095a..84ea0d29434be 100644 --- a/src/libnative/io/file_unix.rs +++ b/src/libnative/io/file_unix.rs @@ -12,12 +12,12 @@ use libc::{c_int, c_void}; use libc; -use std::sync::arc::UnsafeArc; use std::c_str::CString; use std::io::IoError; use std::io; use std::mem; use std::rt::rtio; +use std::sync::arc::UnsafeArc; use io::{IoResult, retry, keep_going}; @@ -178,6 +178,17 @@ impl rtio::RtioPipe for FileDesc { fn clone(&self) -> Box { box FileDesc { inner: self.inner.clone() } as Box } + + // Only supported on named pipes currently. Note that this doesn't have an + // impact on the std::io primitives, this is never called via + // std::io::PipeStream. If the functionality is exposed in the future, then + // these methods will need to be implemented. + fn close_read(&mut self) -> Result<(), IoError> { + Err(io::standard_error(io::InvalidInput)) + } + fn close_write(&mut self) -> Result<(), IoError> { + Err(io::standard_error(io::InvalidInput)) + } } impl rtio::RtioTTY for FileDesc { diff --git a/src/libnative/io/file_win32.rs b/src/libnative/io/file_win32.rs index 018907303b84e..c2acd91d476c4 100644 --- a/src/libnative/io/file_win32.rs +++ b/src/libnative/io/file_win32.rs @@ -210,6 +210,17 @@ impl rtio::RtioPipe for FileDesc { fn clone(&self) -> Box { box FileDesc { inner: self.inner.clone() } as Box } + + // Only supported on named pipes currently. Note that this doesn't have an + // impact on the std::io primitives, this is never called via + // std::io::PipeStream. If the functionality is exposed in the future, then + // these methods will need to be implemented. + fn close_read(&mut self) -> IoResult<()> { + Err(io::standard_error(io::InvalidInput)) + } + fn close_write(&mut self) -> IoResult<()> { + Err(io::standard_error(io::InvalidInput)) + } } impl rtio::RtioTTY for FileDesc { diff --git a/src/libnative/io/net.rs b/src/libnative/io/net.rs index 880cbaabaf80e..a54fe911ae0bd 100644 --- a/src/libnative/io/net.rs +++ b/src/libnative/io/net.rs @@ -357,9 +357,10 @@ impl rtio::RtioTcpStream for TcpStream { } as Box } fn close_write(&mut self) -> IoResult<()> { - super::mkerr_libc(unsafe { - libc::shutdown(self.fd(), libc::SHUT_WR) - }) + super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_WR) }) + } + fn close_read(&mut self) -> IoResult<()> { + super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) }) } } diff --git a/src/libnative/io/pipe_unix.rs b/src/libnative/io/pipe_unix.rs index 65e9c7448c2d5..94aca1ef748b4 100644 --- a/src/libnative/io/pipe_unix.rs +++ b/src/libnative/io/pipe_unix.rs @@ -149,6 +149,13 @@ impl rtio::RtioPipe for UnixStream { inner: self.inner.clone(), } as Box } + + fn close_write(&mut self) -> IoResult<()> { + super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_WR) }) + } + fn close_read(&mut self) -> IoResult<()> { + super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) }) + } } //////////////////////////////////////////////////////////////////////////////// diff --git a/src/libnative/io/pipe_win32.rs b/src/libnative/io/pipe_win32.rs index f1239285434fa..8050123cedcfb 100644 --- a/src/libnative/io/pipe_win32.rs +++ b/src/libnative/io/pipe_win32.rs @@ -84,13 +84,17 @@ //! the test suite passing (the suite is in libstd), and that's good enough for //! me! -use std::c_str::CString; use libc; +use std::c_str::CString; +use std::intrinsics; +use std::io; use std::os::win32::as_utf16_p; +use std::os; use std::ptr; use std::rt::rtio; use std::sync::arc::UnsafeArc; -use std::intrinsics; +use std::sync::atomics; +use std::unstable::mutex; use super::IoResult; use super::c; @@ -124,6 +128,20 @@ impl Drop for Event { struct Inner { handle: libc::HANDLE, + lock: mutex::NativeMutex, + read_closed: atomics::AtomicBool, + write_closed: atomics::AtomicBool, +} + +impl Inner { + fn new(handle: libc::HANDLE) -> Inner { + Inner { + handle: handle, + lock: unsafe { mutex::NativeMutex::new() }, + read_closed: atomics::AtomicBool::new(false), + write_closed: atomics::AtomicBool::new(false), + } + } } impl Drop for Inner { @@ -218,7 +236,7 @@ impl UnixStream { loop { match UnixStream::try_connect(p) { Some(handle) => { - let inner = Inner { handle: handle }; + let inner = Inner::new(handle); let mut mode = libc::PIPE_TYPE_BYTE | libc::PIPE_READMODE_BYTE | libc::PIPE_WAIT; @@ -275,6 +293,24 @@ impl UnixStream { } fn handle(&self) -> libc::HANDLE { unsafe { (*self.inner.get()).handle } } + + fn read_closed(&self) -> bool { + unsafe { (*self.inner.get()).read_closed.load(atomics::SeqCst) } + } + + fn write_closed(&self) -> bool { + unsafe { (*self.inner.get()).write_closed.load(atomics::SeqCst) } + } + + fn cancel_io(&self) -> IoResult<()> { + match unsafe { c::CancelIoEx(self.handle(), ptr::mut_null()) } { + 0 if os::errno() == libc::ERROR_NOT_FOUND as uint => { + Ok(()) + } + 0 => Err(super::last_error()), + _ => Ok(()) + } + } } impl rtio::RtioPipe for UnixStream { @@ -287,6 +323,18 @@ impl rtio::RtioPipe for UnixStream { let mut overlapped: libc::OVERLAPPED = unsafe { intrinsics::init() }; overlapped.hEvent = self.read.get_ref().handle(); + // Pre-flight check to see if the reading half has been closed. This + // must be done before issuing the ReadFile request, but after we + // acquire the lock. + // + // See comments in close_read() about why this lock is necessary. + let guard = unsafe { (*self.inner.get()).lock.lock() }; + if self.read_closed() { + return Err(io::standard_error(io::EndOfFile)) + } + + // Issue a nonblocking requests, succeeding quickly if it happened to + // succeed. let ret = unsafe { libc::ReadFile(self.handle(), buf.as_ptr() as libc::LPVOID, @@ -294,24 +342,41 @@ impl rtio::RtioPipe for UnixStream { &mut bytes_read, &mut overlapped) }; - if ret == 0 { - let err = unsafe { libc::GetLastError() }; - if err == libc::ERROR_IO_PENDING as libc::DWORD { - let ret = unsafe { - libc::GetOverlappedResult(self.handle(), - &mut overlapped, - &mut bytes_read, - libc::TRUE) - }; - if ret == 0 { - return Err(super::last_error()) - } - } else { + if ret != 0 { return Ok(bytes_read as uint) } + + // If our errno doesn't say that the I/O is pending, then we hit some + // legitimate error and reeturn immediately. + if os::errno() != libc::ERROR_IO_PENDING as uint { + return Err(super::last_error()) + } + + // Now that we've issued a successful nonblocking request, we need to + // wait for it to finish. This can all be done outside the lock because + // we'll see any invocation of CancelIoEx. We also call this in a loop + // because we're woken up if the writing half is closed, we just need to + // realize that the reading half wasn't closed and we go right back to + // sleep. + drop(guard); + loop { + let ret = unsafe { + libc::GetOverlappedResult(self.handle(), + &mut overlapped, + &mut bytes_read, + libc::TRUE) + }; + // If we succeeded, or we failed for some reason other than + // CancelIoEx, return immediately + if ret != 0 { return Ok(bytes_read as uint) } + if os::errno() != libc::ERROR_OPERATION_ABORTED as uint { return Err(super::last_error()) } - } - Ok(bytes_read as uint) + // If the reading half is now closed, then we're done. If we woke up + // because the writing half was closed, keep trying. + if self.read_closed() { + return Err(io::standard_error(io::EndOfFile)) + } + } } fn write(&mut self, buf: &[u8]) -> IoResult<()> { @@ -325,6 +390,17 @@ impl rtio::RtioPipe for UnixStream { while offset < buf.len() { let mut bytes_written = 0; + + // This sequence below is quite similar to the one found in read(). + // Some careful looping is done to ensure that if close_write() is + // invoked we bail out early, and if close_read() is invoked we keep + // going after we woke up. + // + // See comments in close_read() about why this lock is necessary. + let guard = unsafe { (*self.inner.get()).lock.lock() }; + if self.write_closed() { + return Err(io::standard_error(io::BrokenPipe)) + } let ret = unsafe { libc::WriteFile(self.handle(), buf.slice_from(offset).as_ptr() as libc::LPVOID, @@ -332,20 +408,29 @@ impl rtio::RtioPipe for UnixStream { &mut bytes_written, &mut overlapped) }; + drop(guard); + if ret == 0 { - let err = unsafe { libc::GetLastError() }; - if err == libc::ERROR_IO_PENDING as libc::DWORD { - let ret = unsafe { - libc::GetOverlappedResult(self.handle(), - &mut overlapped, - &mut bytes_written, - libc::TRUE) - }; - if ret == 0 { + if os::errno() != libc::ERROR_IO_PENDING as uint { + return Err(super::last_error()) + } + let ret = unsafe { + libc::GetOverlappedResult(self.handle(), + &mut overlapped, + &mut bytes_written, + libc::TRUE) + }; + // If we weren't aborted, this was a legit error, if we were + // aborted, then check to see if the write half was actually + // closed or whether we woke up from the read half closing. + if ret == 0 { + if os::errno() != libc::ERROR_OPERATION_ABORTED as uint { return Err(super::last_error()) } - } else { - return Err(super::last_error()) + if self.write_closed() { + return Err(io::standard_error(io::BrokenPipe)) + } + continue; // retry } } offset += bytes_written as uint; @@ -360,6 +445,36 @@ impl rtio::RtioPipe for UnixStream { write: None, } as Box } + + fn close_read(&mut self) -> IoResult<()> { + // On windows, there's no actual shutdown() method for pipes, so we're + // forced to emulate the behavior manually at the application level. To + // do this, we need to both cancel any pending requests, as well as + // prevent all future requests from succeeding. These two operations are + // not atomic with respect to one another, so we must use a lock to do + // so. + // + // The read() code looks like: + // + // 1. Make sure the pipe is still open + // 2. Submit a read request + // 3. Wait for the read request to finish + // + // The race this lock is preventing is if another thread invokes + // close_read() between steps 1 and 2. By atomically executing steps 1 + // and 2 with a lock with respect to close_read(), we're guaranteed that + // no thread will erroneously sit in a read forever. + let _guard = unsafe { (*self.inner.get()).lock.lock() }; + unsafe { (*self.inner.get()).read_closed.store(true, atomics::SeqCst) } + self.cancel_io() + } + + fn close_write(&mut self) -> IoResult<()> { + // see comments in close_read() for why this lock is necessary + let _guard = unsafe { (*self.inner.get()).lock.lock() }; + unsafe { (*self.inner.get()).write_closed.store(true, atomics::SeqCst) } + self.cancel_io() + } } //////////////////////////////////////////////////////////////////////////////// @@ -520,7 +635,7 @@ impl UnixAcceptor { // Transfer ownership of our handle into this stream Ok(UnixStream { - inner: UnsafeArc::new(Inner { handle: handle }), + inner: UnsafeArc::new(Inner::new(handle)), read: None, write: None, }) diff --git a/src/librustuv/access.rs b/src/librustuv/access.rs index fbacf1ca314fc..f96fa1e5be6e6 100644 --- a/src/librustuv/access.rs +++ b/src/librustuv/access.rs @@ -33,6 +33,7 @@ pub struct Guard<'a> { struct Inner { queue: Vec, held: bool, + closed: bool, } impl Access { @@ -41,6 +42,7 @@ impl Access { inner: UnsafeArc::new(Inner { queue: vec![], held: false, + closed: false, }) } } @@ -64,6 +66,15 @@ impl Access { Guard { access: self, missile: Some(missile) } } + + pub fn close(&self, _missile: &HomingMissile) { + // This unsafety is OK because with a homing missile we're guaranteed to + // be the only task looking at the `closed` flag (and are therefore + // allowed to modify it). Additionally, no atomics are necessary because + // everyone's running on the same thread and has already done the + // necessary synchronization to be running on this thread. + unsafe { (*self.inner.get()).closed = true; } + } } impl Clone for Access { @@ -72,6 +83,14 @@ impl Clone for Access { } } +impl<'a> Guard<'a> { + pub fn is_closed(&self) -> bool { + // See above for why this unsafety is ok, it just applies to the read + // instead of the write. + unsafe { (*self.access.inner.get()).closed } + } +} + #[unsafe_destructor] impl<'a> Drop for Guard<'a> { fn drop(&mut self) { diff --git a/src/librustuv/net.rs b/src/librustuv/net.rs index a2701a57ca917..0ddf50921fdd2 100644 --- a/src/librustuv/net.rs +++ b/src/librustuv/net.rs @@ -11,6 +11,7 @@ use libc::{size_t, ssize_t, c_int, c_void, c_uint}; use libc; use std::cast; +use std::io; use std::io::{IoError, IoResult}; use std::io::net::ip; use std::mem; @@ -411,7 +412,13 @@ impl rtio::RtioSocket for TcpWatcher { impl rtio::RtioTcpStream for TcpWatcher { fn read(&mut self, buf: &mut [u8]) -> Result { let m = self.fire_homing_missile(); - let _g = self.read_access.grant(m); + let access = self.read_access.grant(m); + + // see comments in close_read about this check + if access.is_closed() { + return Err(io::standard_error(io::EndOfFile)) + } + self.stream.read(buf).map_err(uv_error_to_io_error) } @@ -466,36 +473,17 @@ impl rtio::RtioTcpStream for TcpWatcher { } as Box } - fn close_write(&mut self) -> Result<(), IoError> { - struct Ctx { - slot: Option, - status: c_int, - } - let mut req = Request::new(uvll::UV_SHUTDOWN); - - return match unsafe { - uvll::uv_shutdown(req.handle, self.handle, shutdown_cb) - } { - 0 => { - req.defuse(); // uv callback now owns this request - let mut cx = Ctx { slot: None, status: 0 }; - - wait_until_woken_after(&mut cx.slot, &self.uv_loop(), || { - req.set_data(&cx); - }); - - status_to_io_result(cx.status) - } - n => Err(uv_error_to_io_error(UvError(n))) - }; + fn close_read(&mut self) -> Result<(), IoError> { + // see comments in PipeWatcher::close_read + let m = self.fire_homing_missile(); + self.read_access.close(&m); + self.stream.cancel_read(m); + Ok(()) + } - extern fn shutdown_cb(req: *uvll::uv_shutdown_t, status: libc::c_int) { - let req = Request::wrap(req); - assert!(status != uvll::ECANCELED); - let cx: &mut Ctx = unsafe { req.get_data() }; - cx.status = status; - wakeup(&mut cx.slot); - } + fn close_write(&mut self) -> Result<(), IoError> { + let _m = self.fire_homing_missile(); + shutdown(self.handle, &self.uv_loop()) } } @@ -704,7 +692,7 @@ impl rtio::RtioUdpSocket for UdpWatcher { let m = self.fire_homing_missile(); let _g = self.read_access.grant(m); - let a = match unsafe { + return match unsafe { uvll::uv_udp_recv_start(self.handle, alloc_cb, recv_cb) } { 0 => { @@ -725,14 +713,12 @@ impl rtio::RtioUdpSocket for UdpWatcher { } n => Err(uv_error_to_io_error(UvError(n))) }; - return a; extern fn alloc_cb(handle: *uvll::uv_udp_t, _suggested_size: size_t, buf: *mut Buf) { unsafe { - let cx: &mut Ctx = - cast::transmute(uvll::get_data_for_uv_handle(handle)); + let cx = &mut *(uvll::get_data_for_uv_handle(handle) as *mut Ctx); *buf = cx.buf.take().expect("recv alloc_cb called more than once") } } @@ -740,8 +726,8 @@ impl rtio::RtioUdpSocket for UdpWatcher { extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, buf: *Buf, addr: *libc::sockaddr, _flags: c_uint) { assert!(nread != uvll::ECANCELED as ssize_t); - let cx: &mut Ctx = unsafe { - cast::transmute(uvll::get_data_for_uv_handle(handle)) + let cx = unsafe { + &mut *(uvll::get_data_for_uv_handle(handle) as *mut Ctx) }; // When there's no data to read the recv callback can be a no-op. @@ -752,13 +738,7 @@ impl rtio::RtioUdpSocket for UdpWatcher { return } - unsafe { - assert_eq!(uvll::uv_udp_recv_stop(handle), 0) - } - - let cx: &mut Ctx = unsafe { - cast::transmute(uvll::get_data_for_uv_handle(handle)) - }; + unsafe { assert_eq!(uvll::uv_udp_recv_stop(handle), 0) } let addr = if addr == ptr::null() { None } else { @@ -900,6 +880,40 @@ impl Drop for UdpWatcher { } } +//////////////////////////////////////////////////////////////////////////////// +// Shutdown helper +//////////////////////////////////////////////////////////////////////////////// + +pub fn shutdown(handle: *uvll::uv_stream_t, loop_: &Loop) -> Result<(), IoError> { + struct Ctx { + slot: Option, + status: c_int, + } + let mut req = Request::new(uvll::UV_SHUTDOWN); + + return match unsafe { uvll::uv_shutdown(req.handle, handle, shutdown_cb) } { + 0 => { + req.defuse(); // uv callback now owns this request + let mut cx = Ctx { slot: None, status: 0 }; + + wait_until_woken_after(&mut cx.slot, loop_, || { + req.set_data(&cx); + }); + + status_to_io_result(cx.status) + } + n => Err(uv_error_to_io_error(UvError(n))) + }; + + extern fn shutdown_cb(req: *uvll::uv_shutdown_t, status: libc::c_int) { + let req = Request::wrap(req); + assert!(status != uvll::ECANCELED); + let cx: &mut Ctx = unsafe { req.get_data() }; + cx.status = status; + wakeup(&mut cx.slot); + } +} + #[cfg(test)] mod test { use std::rt::rtio::{RtioTcpStream, RtioTcpListener, RtioTcpAcceptor, diff --git a/src/librustuv/pipe.rs b/src/librustuv/pipe.rs index 0edc13afcf538..7fec4051761de 100644 --- a/src/librustuv/pipe.rs +++ b/src/librustuv/pipe.rs @@ -11,6 +11,7 @@ use libc; use std::c_str::CString; use std::io::IoError; +use std::io; use std::rt::rtio::{RtioPipe, RtioUnixListener, RtioUnixAcceptor}; use access::Access; @@ -111,7 +112,13 @@ impl PipeWatcher { impl RtioPipe for PipeWatcher { fn read(&mut self, buf: &mut [u8]) -> Result { let m = self.fire_homing_missile(); - let _g = self.read_access.grant(m); + let access = self.read_access.grant(m); + + // see comments in close_read about this check + if access.is_closed() { + return Err(io::standard_error(io::EndOfFile)) + } + self.stream.read(buf).map_err(uv_error_to_io_error) } @@ -131,6 +138,35 @@ impl RtioPipe for PipeWatcher { write_access: self.write_access.clone(), } as Box } + + fn close_read(&mut self) -> Result<(), IoError> { + // The current uv_shutdown method only shuts the writing half of the + // connection, and no method is provided to shut down the reading half + // of the connection. With a lack of method, we emulate shutting down + // the reading half of the connection by manually returning early from + // all future calls to `read`. + // + // Note that we must be careful to ensure that *all* cloned handles see + // the closing of the read half, so we stored the "is closed" bit in the + // Access struct, not in our own personal watcher. Additionally, the + // homing missile is used as a locking mechanism to ensure there is no + // contention over this bit. + // + // To shutdown the read half, we must first flag the access as being + // closed, and then afterwards we cease any pending read. Note that this + // ordering is crucial because we could in theory be rescheduled during + // the uv_read_stop which means that another read invocation could leak + // in before we set the flag. + let m = self.fire_homing_missile(); + self.read_access.close(&m); + self.stream.cancel_read(m); + Ok(()) + } + + fn close_write(&mut self) -> Result<(), IoError> { + let _m = self.fire_homing_missile(); + net::shutdown(self.stream.handle, &self.uv_loop()) + } } impl HomingIO for PipeWatcher { diff --git a/src/librustuv/stream.rs b/src/librustuv/stream.rs index 1fb61c15b830b..a1b606709d87d 100644 --- a/src/librustuv/stream.rs +++ b/src/librustuv/stream.rs @@ -14,6 +14,7 @@ use std::ptr; use std::rt::task::BlockedTask; use Loop; +use homing::HomingMissile; use super::{UvError, Buf, slice_to_uv_buf, Request, wait_until_woken_after, ForbidUnwind, wakeup}; use uvll; @@ -57,6 +58,7 @@ impl StreamWatcher { // Wrappers should ensure to always reset the field to an appropriate value // if they rely on the field to perform an action. pub fn new(stream: *uvll::uv_stream_t) -> StreamWatcher { + unsafe { uvll::set_data_for_uv_handle(stream, 0 as *int) } StreamWatcher { handle: stream, last_write_req: None, @@ -70,7 +72,9 @@ impl StreamWatcher { let mut rcx = ReadContext { buf: Some(slice_to_uv_buf(buf)), - result: 0, + // if the read is canceled, we'll see eof, otherwise this will get + // overwritten + result: uvll::EOF as ssize_t, task: None, }; // When reading a TTY stream on windows, libuv will invoke alloc_cb @@ -78,13 +82,11 @@ impl StreamWatcher { // we must be ready for this to happen (by setting the data in the uv // handle). In theory this otherwise doesn't need to happen until after // the read is succesfully started. - unsafe { - uvll::set_data_for_uv_handle(self.handle, &rcx) - } + unsafe { uvll::set_data_for_uv_handle(self.handle, &rcx) } // Send off the read request, but don't block until we're sure that the // read request is queued. - match unsafe { + let ret = match unsafe { uvll::uv_read_start(self.handle, alloc_cb, read_cb) } { 0 => { @@ -96,6 +98,29 @@ impl StreamWatcher { } } n => Err(UvError(n)) + }; + // Make sure a read cancellation sees that there's no pending read + unsafe { uvll::set_data_for_uv_handle(self.handle, 0 as *int) } + return ret; + } + + pub fn cancel_read(&mut self, m: HomingMissile) { + // When we invoke uv_read_stop, it cancels the read and alloc + // callbacks. We need to manually wake up a pending task (if one was + // present). Note that we wake up the task *outside* the homing missile + // to ensure that we don't switch schedulers when we're not supposed to. + assert_eq!(unsafe { uvll::uv_read_stop(self.handle) }, 0); + let data = unsafe { + let data = uvll::get_data_for_uv_handle(self.handle); + if data.is_null() { return } + uvll::set_data_for_uv_handle(self.handle, 0 as *int); + &mut *(data as *mut ReadContext) + }; + let task = data.task.take(); + drop(m); + match task { + Some(task) => { let _ = task.wake().map(|t| t.reawaken()); } + None => {} } } diff --git a/src/libstd/io/net/tcp.rs b/src/libstd/io/net/tcp.rs index a2cd69da5aef3..d07b2e556d612 100644 --- a/src/libstd/io/net/tcp.rs +++ b/src/libstd/io/net/tcp.rs @@ -32,7 +32,7 @@ use rt::rtio::{RtioTcpAcceptor, RtioTcpStream}; /// /// # Example /// -/// ```rust +/// ```no_run /// # #![allow(unused_must_use)] /// use std::io::net::tcp::TcpStream; /// use std::io::net::ip::{Ipv4Addr, SocketAddr}; @@ -109,6 +109,48 @@ impl TcpStream { None => self.obj.letdie(), } } + + /// Closes the reading half of this connection. + /// + /// This method will close the reading portion of this connection, causing + /// all pending and future reads to immediately return with an error. + /// + /// # Example + /// + /// ```no_run + /// # #![allow(unused_must_use)] + /// use std::io::timer; + /// use std::io::net::tcp::TcpStream; + /// use std::io::net::ip::{Ipv4Addr, SocketAddr}; + /// + /// let addr = SocketAddr { ip: Ipv4Addr(127, 0, 0, 1), port: 34254 }; + /// let mut stream = TcpStream::connect(addr).unwrap(); + /// let stream2 = stream.clone(); + /// + /// spawn(proc() { + /// // close this stream after one second + /// timer::sleep(1000); + /// let mut stream = stream2; + /// stream.close_read(); + /// }); + /// + /// // wait for some data, will get canceled after one second + /// let mut buf = [0]; + /// stream.read(buf); + /// ``` + /// + /// Note that this method affects all cloned handles associated with this + /// stream, not just this one handle. + pub fn close_read(&mut self) -> IoResult<()> { self.obj.close_read() } + + /// Closes the writing half of this connection. + /// + /// This method will close the writing portion of this connection, causing + /// all future writes to immediately return with an error. + /// + /// Note that this method affects all cloned handles associated with this + /// stream, not just this one handle. + pub fn close_write(&mut self) -> IoResult<()> { self.obj.close_write() } } impl Clone for TcpStream { @@ -839,7 +881,11 @@ mod test { // Also make sure that even though the timeout is expired that we will // continue to receive any pending connections. - let l = TcpStream::connect(addr).unwrap(); + let (tx, rx) = channel(); + spawn(proc() { + tx.send(TcpStream::connect(addr).unwrap()); + }); + let l = rx.recv(); for i in range(0, 1001) { match a.accept() { Ok(..) => break, @@ -853,8 +899,69 @@ mod test { // Unset the timeout and make sure that this always blocks. a.set_timeout(None); spawn(proc() { - drop(TcpStream::connect(addr)); + drop(TcpStream::connect(addr).unwrap()); }); a.accept().unwrap(); }) + + iotest!(fn close_readwrite_smoke() { + let addr = next_test_ip4(); + let a = TcpListener::bind(addr).listen().unwrap(); + let (_tx, rx) = channel::<()>(); + spawn(proc() { + let mut a = a; + let _s = a.accept().unwrap(); + let _ = rx.recv_opt(); + }); + + let mut b = [0]; + let mut s = TcpStream::connect(addr).unwrap(); + let mut s2 = s.clone(); + + // closing should prevent reads/writes + s.close_write().unwrap(); + assert!(s.write([0]).is_err()); + s.close_read().unwrap(); + assert!(s.read(b).is_err()); + + // closing should affect previous handles + assert!(s2.write([0]).is_err()); + assert!(s2.read(b).is_err()); + + // closing should affect new handles + let mut s3 = s.clone(); + assert!(s3.write([0]).is_err()); + assert!(s3.read(b).is_err()); + + // make sure these don't die + let _ = s2.close_read(); + let _ = s2.close_write(); + let _ = s3.close_read(); + let _ = s3.close_write(); + }) + + iotest!(fn close_read_wakes_up() { + let addr = next_test_ip4(); + let a = TcpListener::bind(addr).listen().unwrap(); + let (_tx, rx) = channel::<()>(); + spawn(proc() { + let mut a = a; + let _s = a.accept().unwrap(); + let _ = rx.recv_opt(); + }); + + let mut s = TcpStream::connect(addr).unwrap(); + let s2 = s.clone(); + let (tx, rx) = channel(); + spawn(proc() { + let mut s2 = s2; + assert!(s2.read([0]).is_err()); + tx.send(()); + }); + // this should wake up the child task + s.close_read().unwrap(); + + // this test will never finish if the child doesn't wake up + rx.recv(); + }) } diff --git a/src/libstd/io/net/unix.rs b/src/libstd/io/net/unix.rs index f6e985dc27806..bbe39885c030c 100644 --- a/src/libstd/io/net/unix.rs +++ b/src/libstd/io/net/unix.rs @@ -28,7 +28,6 @@ use prelude::*; use c_str::ToCStr; use clone::Clone; -use io::pipe::PipeStream; use io::{Listener, Acceptor, Reader, Writer, IoResult}; use kinds::Send; use owned::Box; @@ -37,14 +36,10 @@ use rt::rtio::{RtioUnixAcceptor, RtioPipe}; /// A stream which communicates over a named pipe. pub struct UnixStream { - obj: PipeStream, + obj: Box, } impl UnixStream { - fn new(obj: Box) -> UnixStream { - UnixStream { obj: PipeStream::new(obj) } - } - /// Connect to a pipe named by `path`. This will attempt to open a /// connection to the underlying socket. /// @@ -62,7 +57,7 @@ impl UnixStream { /// ``` pub fn connect(path: &P) -> IoResult { LocalIo::maybe_raise(|io| { - io.unix_connect(&path.to_c_str(), None).map(UnixStream::new) + io.unix_connect(&path.to_c_str(), None).map(|p| UnixStream { obj: p }) }) } @@ -86,9 +81,28 @@ impl UnixStream { timeout_ms: u64) -> IoResult { LocalIo::maybe_raise(|io| { let s = io.unix_connect(&path.to_c_str(), Some(timeout_ms)); - s.map(UnixStream::new) + s.map(|p| UnixStream { obj: p }) }) } + + + /// Closes the reading half of this connection. + /// + /// This method will close the reading portion of this connection, causing + /// all pending and future reads to immediately return with an error. + /// + /// Note that this method affects all cloned handles associated with this + /// stream, not just this one handle. + pub fn close_read(&mut self) -> IoResult<()> { self.obj.close_read() } + + /// Closes the writing half of this connection. + /// + /// This method will close the writing portion of this connection, causing + /// all pending and future writes to immediately return with an error. + /// + /// Note that this method affects all cloned handles associated with this + /// stream, not just this one handle. + pub fn close_write(&mut self) -> IoResult<()> { self.obj.close_write() } } impl Clone for UnixStream { @@ -174,7 +188,7 @@ impl UnixAcceptor { impl Acceptor for UnixAcceptor { fn accept(&mut self) -> IoResult { - self.obj.accept().map(UnixStream::new) + self.obj.accept().map(|s| UnixStream { obj: s }) } } @@ -431,7 +445,12 @@ mod tests { // Also make sure that even though the timeout is expired that we will // continue to receive any pending connections. - let l = UnixStream::connect(&addr).unwrap(); + let (tx, rx) = channel(); + let addr2 = addr.clone(); + spawn(proc() { + tx.send(UnixStream::connect(&addr2).unwrap()); + }); + let l = rx.recv(); for i in range(0, 1001) { match a.accept() { Ok(..) => break, @@ -446,7 +465,7 @@ mod tests { a.set_timeout(None); let addr2 = addr.clone(); spawn(proc() { - drop(UnixStream::connect(&addr2)); + drop(UnixStream::connect(&addr2).unwrap()); }); a.accept().unwrap(); }) @@ -461,4 +480,65 @@ mod tests { let _a = UnixListener::bind(&addr).unwrap().listen().unwrap(); assert!(UnixStream::connect_timeout(&addr, 100).is_ok()); }) + + iotest!(fn close_readwrite_smoke() { + let addr = next_test_unix(); + let a = UnixListener::bind(&addr).listen().unwrap(); + let (_tx, rx) = channel::<()>(); + spawn(proc() { + let mut a = a; + let _s = a.accept().unwrap(); + let _ = rx.recv_opt(); + }); + + let mut b = [0]; + let mut s = UnixStream::connect(&addr).unwrap(); + let mut s2 = s.clone(); + + // closing should prevent reads/writes + s.close_write().unwrap(); + assert!(s.write([0]).is_err()); + s.close_read().unwrap(); + assert!(s.read(b).is_err()); + + // closing should affect previous handles + assert!(s2.write([0]).is_err()); + assert!(s2.read(b).is_err()); + + // closing should affect new handles + let mut s3 = s.clone(); + assert!(s3.write([0]).is_err()); + assert!(s3.read(b).is_err()); + + // make sure these don't die + let _ = s2.close_read(); + let _ = s2.close_write(); + let _ = s3.close_read(); + let _ = s3.close_write(); + }) + + iotest!(fn close_read_wakes_up() { + let addr = next_test_unix(); + let a = UnixListener::bind(&addr).listen().unwrap(); + let (_tx, rx) = channel::<()>(); + spawn(proc() { + let mut a = a; + let _s = a.accept().unwrap(); + let _ = rx.recv_opt(); + }); + + let mut s = UnixStream::connect(&addr).unwrap(); + let s2 = s.clone(); + let (tx, rx) = channel(); + spawn(proc() { + let mut s2 = s2; + assert!(s2.read([0]).is_err()); + tx.send(()); + }); + // this should wake up the child task + s.close_read().unwrap(); + + // this test will never finish if the child doesn't wake up + rx.recv(); + }) } diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs index fe9f4932a2a20..c5afe7887adc5 100644 --- a/src/libstd/rt/rtio.rs +++ b/src/libstd/rt/rtio.rs @@ -221,6 +221,7 @@ pub trait RtioTcpStream : RtioSocket { fn letdie(&mut self) -> IoResult<()>; fn clone(&self) -> Box; fn close_write(&mut self) -> IoResult<()>; + fn close_read(&mut self) -> IoResult<()>; } pub trait RtioSocket { @@ -274,6 +275,9 @@ pub trait RtioPipe { fn read(&mut self, buf: &mut [u8]) -> IoResult; fn write(&mut self, buf: &[u8]) -> IoResult<()>; fn clone(&self) -> Box; + + fn close_write(&mut self) -> IoResult<()>; + fn close_read(&mut self) -> IoResult<()>; } pub trait RtioUnixListener {