From 79c77be2fcd3010ac7259ae80198ead89ce02661 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Sat, 25 Sep 2021 00:48:57 +0200 Subject: [PATCH 1/6] get the base shape working --- Cargo.toml | 6 +- src/future.rs | 47 ++++++++++++++ src/lib.rs | 154 +++------------------------------------------ src/stop_source.rs | 137 ++++++++++++++++++++++++++++++++++++++++ src/stream.rs | 45 +++++++++++++ 5 files changed, 244 insertions(+), 145 deletions(-) create mode 100644 src/future.rs create mode 100644 src/stop_source.rs create mode 100644 src/stream.rs diff --git a/Cargo.toml b/Cargo.toml index 8ec3215..5811f5e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,4 +10,8 @@ description = "Experimental cooperative cancellation for async-std" [dependencies] pin-project-lite = "0.2.0" -async-std = "1.8" +async-channel = "1.6.1" +futures-core = "0.3.17" + +[dev-dependencies] +async-std = "1.10.0" diff --git a/src/future.rs b/src/future.rs new file mode 100644 index 0000000..16eb281 --- /dev/null +++ b/src/future.rs @@ -0,0 +1,47 @@ +//! Extension methods and types for the `Future` trait. + +use crate::StopToken; +use core::future::Future; +use core::pin::Pin; + +use pin_project_lite::pin_project; +use std::task::{Context, Poll}; + +pub trait FutureExt: Future { + /// Applies the token to the `future`, such that the resulting future + /// completes with `None` if the token is cancelled. + fn until(self, deadline: StopToken) -> StopFuture + where + Self: Sized, + { + StopFuture { + deadline, + future: self, + } + } +} + +pin_project! { + #[derive(Debug)] + pub struct StopFuture { + #[pin] + deadline: StopToken, + #[pin] + future: F, + } +} + +impl Future for StopFuture { + type Output = Option; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + if let Poll::Ready(()) = this.deadline.poll(cx) { + return Poll::Ready(None); + } + match this.future.poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(it) => Poll::Ready(Some(it)), + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 48d1476..cffc9b9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,15 +5,6 @@ //! Experimental. The library works as is, breaking changes will bump major //! version, but there are no guarantees of long-term support. //! -//! Additionally, this library uses unstable cargo feature feature of `async-std` and, for -//! this reason, should be used like this: -//! -//! ```toml -//! [dependencies.stop-token] -//! version = "0.1.0" -//! features = [ "unstable" ] -//! ``` -//! //! # Motivation //! //! Rust futures come with a build-in cancellation mechanism: dropping a future @@ -47,7 +38,7 @@ //! //! ``` //! use async_std::prelude::*; -//! use stop_token::StopToken; +//! use stop_token::prelude::*; //! //! struct Event; //! @@ -65,142 +56,17 @@ //! # Lineage //! //! The cancellation system is a subset of `C#` [`CancellationToken / CancellationTokenSource`](https://docs.microsoft.com/en-us/dotnet/standard/threading/cancellation-in-managed-threads). -//! The `StopToken / StopTokenSource` terminology is borrowed from C++ paper P0660: https://wg21.link/p0660. - -use std::pin::Pin; -use std::task::{Context, Poll}; - -use async_std::prelude::*; - -use async_std::channel::{self, Receiver, Sender}; -use pin_project_lite::pin_project; - -enum Never {} - -/// `StopSource` produces `StopToken` and cancels all of its tokens on drop. -/// -/// # Example: -/// -/// ```ignore -/// let stop_source = StopSource::new(); -/// let stop_token = stop_source.stop_token(); -/// schedule_some_work(stop_token); -/// drop(stop_source); // At this point, scheduled work notices that it is canceled. -/// ``` -#[derive(Debug)] -pub struct StopSource { - /// Solely for `Drop`. - _chan: Sender, - stop_token: StopToken, -} - -/// `StopToken` is a future which completes when the associated `StopSource` is dropped. -#[derive(Debug, Clone)] -pub struct StopToken { - chan: Receiver, -} - -impl Default for StopSource { - fn default() -> StopSource { - let (sender, receiver) = channel::bounded::(1); - - StopSource { - _chan: sender, - stop_token: StopToken { chan: receiver }, - } - } -} +//! The `StopToken / StopTokenSource` terminology is borrowed from [C++ paper P0660](https://wg21.link/p0660). -impl StopSource { - /// Creates a new `StopSource`. - pub fn new() -> StopSource { - StopSource::default() - } +pub mod future; +pub mod stream; - /// Produces a new `StopToken`, associated with this source. - /// - /// Once the source is destroyed, `StopToken` future completes. - pub fn stop_token(&self) -> StopToken { - self.stop_token.clone() - } -} - -impl Future for StopToken { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { - let chan = Pin::new(&mut self.chan); - match Stream::poll_next(chan, cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(Some(never)) => match never {}, - Poll::Ready(None) => Poll::Ready(()), - } - } -} - -impl StopToken { - /// Applies the token to the `stream`, such that the resulting stream - /// produces no more items once the token becomes cancelled. - pub fn stop_stream(&self, stream: S) -> StopStream { - StopStream { - stop_token: self.clone(), - stream, - } - } - - /// Applies the token to the `future`, such that the resulting future - /// completes with `None` if the token is cancelled. - pub fn stop_future(&self, future: F) -> StopFuture { - StopFuture { - stop_token: self.clone(), - future, - } - } -} - -pin_project! { - #[derive(Debug)] - pub struct StopStream { - #[pin] - stop_token: StopToken, - #[pin] - stream: S, - } -} - -impl Stream for StopStream { - type Item = S::Item; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.project(); - if let Poll::Ready(()) = this.stop_token.poll(cx) { - return Poll::Ready(None); - } - this.stream.poll_next(cx) - } -} - -pin_project! { - #[derive(Debug)] - pub struct StopFuture { - #[pin] - stop_token: StopToken, - #[pin] - future: F, - } -} +mod stop_source; -impl Future for StopFuture { - type Output = Option; +pub use stop_source::{StopSource, StopToken}; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.project(); - if let Poll::Ready(()) = this.stop_token.poll(cx) { - return Poll::Ready(None); - } - match this.future.poll(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(it) => Poll::Ready(Some(it)), - } - } +/// A prelude for `stop-token`. +pub mod prelude { + pub use crate::future::FutureExt; + pub use crate::stream::StreamExt; } diff --git a/src/stop_source.rs b/src/stop_source.rs new file mode 100644 index 0000000..28d3cfe --- /dev/null +++ b/src/stop_source.rs @@ -0,0 +1,137 @@ +use core::future::Future; +use core::pin::Pin; +use core::task::{Context, Poll}; + +use async_channel::{bounded, Receiver, Sender}; +use futures_core::stream::Stream; +use pin_project_lite::pin_project; + +enum Never {} + +/// `StopSource` produces `StopToken` and cancels all of its tokens on drop. +/// +/// # Example: +/// +/// ```ignore +/// let stop_source = StopSource::new(); +/// let stop_token = stop_source.stop_token(); +/// schedule_some_work(stop_token); +/// drop(stop_source); // At this point, scheduled work notices that it is canceled. +/// ``` +#[derive(Debug)] +pub struct StopSource { + /// Solely for `Drop`. + _chan: Sender, + stop_token: StopToken, +} + +/// `StopToken` is a future which completes when the associated `StopSource` is dropped. +#[derive(Debug, Clone)] +pub struct StopToken { + chan: Receiver, +} + +impl Default for StopSource { + fn default() -> StopSource { + let (sender, receiver) = bounded::(1); + + StopSource { + _chan: sender, + stop_token: StopToken { chan: receiver }, + } + } +} + +impl StopSource { + /// Creates a new `StopSource`. + pub fn new() -> StopSource { + StopSource::default() + } + + /// Produces a new `StopToken`, associated with this source. + /// + /// Once the source is destroyed, `StopToken` future completes. + pub fn stop_token(&self) -> StopToken { + self.stop_token.clone() + } +} + +impl Future for StopToken { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + let chan = Pin::new(&mut self.chan); + match Stream::poll_next(chan, cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(Some(never)) => match never {}, + Poll::Ready(None) => Poll::Ready(()), + } + } +} + +impl StopToken { + /// Applies the token to the `stream`, such that the resulting stream + /// produces no more items once the token becomes cancelled. + pub fn stop_stream(&self, stream: S) -> StopStream { + StopStream { + stop_token: self.clone(), + stream, + } + } + + /// Applies the token to the `future`, such that the resulting future + /// completes with `None` if the token is cancelled. + pub fn stop_future(&self, future: F) -> StopFuture { + StopFuture { + stop_token: self.clone(), + future, + } + } +} + +pin_project! { + #[derive(Debug)] + pub struct StopStream { + #[pin] + stop_token: StopToken, + #[pin] + stream: S, + } +} + +impl Stream for StopStream { + type Item = S::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + if let Poll::Ready(()) = this.stop_token.poll(cx) { + return Poll::Ready(None); + } + this.stream.poll_next(cx) + } +} + +pin_project! { + #[derive(Debug)] + pub struct StopFuture { + #[pin] + stop_token: StopToken, + #[pin] + future: F, + } +} + +impl Future for StopFuture { + type Output = Option; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + if let Poll::Ready(()) = this.stop_token.poll(cx) { + return Poll::Ready(None); + } + match this.future.poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(it) => Poll::Ready(Some(it)), + } + } +} diff --git a/src/stream.rs b/src/stream.rs new file mode 100644 index 0000000..704e096 --- /dev/null +++ b/src/stream.rs @@ -0,0 +1,45 @@ +//! Extension methods and types for the `Stream` trait. + +use crate::StopToken; +use core::future::Future; +use core::pin::Pin; + +use futures_core::Stream; +use pin_project_lite::pin_project; +use std::task::{Context, Poll}; + +pub trait StreamExt: Stream { + /// Applies the token to the `stream`, such that the resulting stream + /// produces no more items once the token becomes cancelled. + fn until(self, deadline: StopToken) -> StopStream + where + Self: Sized, + { + StopStream { + stop_token: deadline, + stream: self, + } + } +} + +pin_project! { + #[derive(Debug)] + pub struct StopStream { + #[pin] + stop_token: StopToken, + #[pin] + stream: S, + } +} + +impl Stream for StopStream { + type Item = S::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + if let Poll::Ready(()) = this.stop_token.poll(cx) { + return Poll::Ready(None); + } + this.stream.poll_next(cx) + } +} From 34358edbbbd2bc1f189a79d19c4153c19624c53d Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Sat, 25 Sep 2021 01:20:25 +0200 Subject: [PATCH 2/6] Add IntoDeadline --- src/deadline.rs | 13 ++++++++ src/future.rs | 19 +++++++----- src/lib.rs | 11 ++++--- src/stop_source.rs | 76 +++++----------------------------------------- src/stream.rs | 23 +++++++++----- tests/tests.rs | 7 +++-- 6 files changed, 59 insertions(+), 90 deletions(-) create mode 100644 src/deadline.rs diff --git a/src/deadline.rs b/src/deadline.rs new file mode 100644 index 0000000..68bf1e7 --- /dev/null +++ b/src/deadline.rs @@ -0,0 +1,13 @@ +use std::future::Future; + +/// Conversion into a deadline. +/// +/// A deadline is a future which elapses after a certain period or event, and +/// returns `()`. +pub trait IntoDeadline { + /// Which kind of future are we turning this into? + type Deadline: Future; + + /// Creates a deadline from a value. + fn into_deadline(self) -> Self::Deadline; +} diff --git a/src/future.rs b/src/future.rs index 16eb281..43290d3 100644 --- a/src/future.rs +++ b/src/future.rs @@ -1,6 +1,6 @@ //! Extension methods and types for the `Future` trait. -use crate::StopToken; +use crate::IntoDeadline; use core::future::Future; use core::pin::Pin; @@ -10,12 +10,13 @@ use std::task::{Context, Poll}; pub trait FutureExt: Future { /// Applies the token to the `future`, such that the resulting future /// completes with `None` if the token is cancelled. - fn until(self, deadline: StopToken) -> StopFuture + fn until(self, target: T) -> StopFuture where Self: Sized, + T: IntoDeadline, { StopFuture { - deadline, + deadline: target.into_deadline(), future: self, } } @@ -23,15 +24,19 @@ pub trait FutureExt: Future { pin_project! { #[derive(Debug)] - pub struct StopFuture { - #[pin] - deadline: StopToken, + pub struct StopFuture { #[pin] future: F, + #[pin] + deadline: D, } } -impl Future for StopFuture { +impl Future for StopFuture +where + F: Future, + D: Future, +{ type Output = Option; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { diff --git a/src/lib.rs b/src/lib.rs index cffc9b9..77567d0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -39,11 +39,12 @@ //! ``` //! use async_std::prelude::*; //! use stop_token::prelude::*; +//! use stop_token::StopToken; //! //! struct Event; //! -//! async fn do_work(work: impl Stream + Unpin, stop_token: StopToken) { -//! let mut work = stop_token.stop_stream(work); +//! async fn do_work(work: impl Stream + Unpin, stop: StopToken) { +//! let mut work = work.until(stop); //! while let Some(event) = work.next().await { //! process_event(event).await //! } @@ -61,12 +62,14 @@ pub mod future; pub mod stream; +mod deadline; mod stop_source; +pub use deadline::IntoDeadline; pub use stop_source::{StopSource, StopToken}; /// A prelude for `stop-token`. pub mod prelude { - pub use crate::future::FutureExt; - pub use crate::stream::StreamExt; + pub use crate::future::FutureExt as _; + pub use crate::stream::StreamExt as _; } diff --git a/src/stop_source.rs b/src/stop_source.rs index 28d3cfe..4358dcf 100644 --- a/src/stop_source.rs +++ b/src/stop_source.rs @@ -4,7 +4,6 @@ use core::task::{Context, Poll}; use async_channel::{bounded, Receiver, Sender}; use futures_core::stream::Stream; -use pin_project_lite::pin_project; enum Never {} @@ -56,6 +55,14 @@ impl StopSource { } } +impl super::IntoDeadline for StopToken { + type Deadline = Self; + + fn into_deadline(self) -> Self::Deadline { + self + } +} + impl Future for StopToken { type Output = (); @@ -68,70 +75,3 @@ impl Future for StopToken { } } } - -impl StopToken { - /// Applies the token to the `stream`, such that the resulting stream - /// produces no more items once the token becomes cancelled. - pub fn stop_stream(&self, stream: S) -> StopStream { - StopStream { - stop_token: self.clone(), - stream, - } - } - - /// Applies the token to the `future`, such that the resulting future - /// completes with `None` if the token is cancelled. - pub fn stop_future(&self, future: F) -> StopFuture { - StopFuture { - stop_token: self.clone(), - future, - } - } -} - -pin_project! { - #[derive(Debug)] - pub struct StopStream { - #[pin] - stop_token: StopToken, - #[pin] - stream: S, - } -} - -impl Stream for StopStream { - type Item = S::Item; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.project(); - if let Poll::Ready(()) = this.stop_token.poll(cx) { - return Poll::Ready(None); - } - this.stream.poll_next(cx) - } -} - -pin_project! { - #[derive(Debug)] - pub struct StopFuture { - #[pin] - stop_token: StopToken, - #[pin] - future: F, - } -} - -impl Future for StopFuture { - type Output = Option; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.project(); - if let Poll::Ready(()) = this.stop_token.poll(cx) { - return Poll::Ready(None); - } - match this.future.poll(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(it) => Poll::Ready(Some(it)), - } - } -} diff --git a/src/stream.rs b/src/stream.rs index 704e096..cd6482e 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -1,6 +1,6 @@ //! Extension methods and types for the `Stream` trait. -use crate::StopToken; +use crate::IntoDeadline; use core::future::Future; use core::pin::Pin; @@ -11,33 +11,40 @@ use std::task::{Context, Poll}; pub trait StreamExt: Stream { /// Applies the token to the `stream`, such that the resulting stream /// produces no more items once the token becomes cancelled. - fn until(self, deadline: StopToken) -> StopStream + fn until(self, target: T) -> StopStream where Self: Sized, + T: IntoDeadline, { StopStream { - stop_token: deadline, stream: self, + deadline: target.into_deadline(), } } } +impl StreamExt for S {} + pin_project! { #[derive(Debug)] - pub struct StopStream { - #[pin] - stop_token: StopToken, + pub struct StopStream { #[pin] stream: S, + #[pin] + deadline: D, } } -impl Stream for StopStream { +impl Stream for StopStream +where + S: Stream, + D: Future, +{ type Item = S::Item; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.project(); - if let Poll::Ready(()) = this.stop_token.poll(cx) { + if let Poll::Ready(()) = this.deadline.poll(cx) { return Poll::Ready(None); } this.stream.poll_next(cx) diff --git a/tests/tests.rs b/tests/tests.rs index 2fceda8..9039d17 100644 --- a/tests/tests.rs +++ b/tests/tests.rs @@ -1,8 +1,9 @@ use std::time::Duration; use async_std::prelude::*; +use stop_token::prelude::*; -use async_std::channel; +use async_channel::bounded; use async_std::task; use stop_token::StopSource; @@ -10,14 +11,14 @@ use stop_token::StopSource; #[test] fn smoke() { task::block_on(async { - let (sender, receiver) = channel::bounded::(10); + let (sender, receiver) = bounded::(10); let stop_source = StopSource::new(); let task = task::spawn({ let stop_token = stop_source.stop_token(); let receiver = receiver.clone(); async move { let mut xs = Vec::new(); - let mut stream = stop_token.stop_stream(receiver); + let mut stream = receiver.until(stop_token); while let Some(x) = stream.next().await { xs.push(x) } From 60664b7f1470cff09e23d6309a274800cd4367a8 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Sat, 2 Oct 2021 13:34:04 +0200 Subject: [PATCH 3/6] Add TimeoutError in all APIs --- src/deadline.rs | 29 ++++++++++++++++++++++++++++- src/future.rs | 10 +++++----- src/lib.rs | 2 +- src/stop_source.rs | 2 +- src/stream.rs | 8 ++++---- tests/tests.rs | 2 +- 6 files changed, 40 insertions(+), 13 deletions(-) diff --git a/src/deadline.rs b/src/deadline.rs index 68bf1e7..ef80342 100644 --- a/src/deadline.rs +++ b/src/deadline.rs @@ -1,4 +1,31 @@ -use std::future::Future; +use core::fmt; +use std::{error::Error, future::Future}; + +/// An error returned when a future times out. +#[derive(Clone, Copy, Eq, PartialEq, PartialOrd, Ord)] +pub struct TimeoutError { + _private: (), +} + +impl fmt::Debug for TimeoutError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("TimeoutError").finish() + } +} + +impl TimeoutError { + pub(crate) fn new() -> Self { + Self { _private: () } + } +} + +impl Error for TimeoutError {} + +impl fmt::Display for TimeoutError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + "Future has timed out".fmt(f) + } +} /// Conversion into a deadline. /// diff --git a/src/future.rs b/src/future.rs index 43290d3..4b9e022 100644 --- a/src/future.rs +++ b/src/future.rs @@ -1,6 +1,6 @@ //! Extension methods and types for the `Future` trait. -use crate::IntoDeadline; +use crate::{deadline::TimeoutError, IntoDeadline}; use core::future::Future; use core::pin::Pin; @@ -37,16 +37,16 @@ where F: Future, D: Future, { - type Output = Option; + type Output = Result; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); if let Poll::Ready(()) = this.deadline.poll(cx) { - return Poll::Ready(None); + return Poll::Ready(Err(TimeoutError::new())); } match this.future.poll(cx) { Poll::Pending => Poll::Pending, - Poll::Ready(it) => Poll::Ready(Some(it)), + Poll::Ready(it) => Poll::Ready(Ok(it)), } } } diff --git a/src/lib.rs b/src/lib.rs index 77567d0..76639f9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -45,7 +45,7 @@ //! //! async fn do_work(work: impl Stream + Unpin, stop: StopToken) { //! let mut work = work.until(stop); -//! while let Some(event) = work.next().await { +//! while let Some(Ok(event)) = work.next().await { //! process_event(event).await //! } //! } diff --git a/src/stop_source.rs b/src/stop_source.rs index 4358dcf..4476c4e 100644 --- a/src/stop_source.rs +++ b/src/stop_source.rs @@ -66,7 +66,7 @@ impl super::IntoDeadline for StopToken { impl Future for StopToken { type Output = (); - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let chan = Pin::new(&mut self.chan); match Stream::poll_next(chan, cx) { Poll::Pending => Poll::Pending, diff --git a/src/stream.rs b/src/stream.rs index cd6482e..b63df5d 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -1,6 +1,6 @@ //! Extension methods and types for the `Stream` trait. -use crate::IntoDeadline; +use crate::{deadline::TimeoutError, IntoDeadline}; use core::future::Future; use core::pin::Pin; @@ -40,13 +40,13 @@ where S: Stream, D: Future, { - type Item = S::Item; + type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.project(); if let Poll::Ready(()) = this.deadline.poll(cx) { - return Poll::Ready(None); + return Poll::Ready(Some(Err(TimeoutError::new()))); } - this.stream.poll_next(cx) + this.stream.poll_next(cx).map(|el| el.map(|el| Ok(el))) } } diff --git a/tests/tests.rs b/tests/tests.rs index 9039d17..7bd4139 100644 --- a/tests/tests.rs +++ b/tests/tests.rs @@ -19,7 +19,7 @@ fn smoke() { async move { let mut xs = Vec::new(); let mut stream = receiver.until(stop_token); - while let Some(x) = stream.next().await { + while let Some(Ok(x)) = stream.next().await { xs.push(x) } xs From 8bcb139b3ea940c15c4bc156def39c35f592e95f Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Sat, 2 Oct 2021 14:15:40 +0200 Subject: [PATCH 4/6] Add async-io timeout support --- .github/workflows/ci.yml | 2 +- Cargo.toml | 9 +++++ src/deadline.rs | 21 ++++++---- src/future.rs | 22 +++++----- src/lib.rs | 3 +- src/stream.rs | 10 +++-- src/time.rs | 87 ++++++++++++++++++++++++++++++++++++++++ 7 files changed, 132 insertions(+), 22 deletions(-) create mode 100644 src/time.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c4c4b2b..8b5e9ec 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -28,4 +28,4 @@ jobs: - name: tests uses: actions-rs/cargo@v1 with: - command: test + command: test --features async-io diff --git a/Cargo.toml b/Cargo.toml index 5811f5e..a7330ea 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,10 +8,19 @@ repository = "https://github.com/async-rs/stop-token" description = "Experimental cooperative cancellation for async-std" +[package.metadata.docs.rs] +features = ["docs"] +rustdoc-args = ["--cfg", "feature=\"docs\""] + +[features] +docs = ["async-io"] + [dependencies] pin-project-lite = "0.2.0" async-channel = "1.6.1" futures-core = "0.3.17" +tokio = { version = "1.12.0", optional = true } +async-io = { version = "1.6.0", optional = true } [dev-dependencies] async-std = "1.10.0" diff --git a/src/deadline.rs b/src/deadline.rs index ef80342..df80381 100644 --- a/src/deadline.rs +++ b/src/deadline.rs @@ -1,27 +1,33 @@ use core::fmt; -use std::{error::Error, future::Future}; +use std::{error::Error, future::Future, io}; /// An error returned when a future times out. #[derive(Clone, Copy, Eq, PartialEq, PartialOrd, Ord)] -pub struct TimeoutError { +pub struct TimedOutError { _private: (), } -impl fmt::Debug for TimeoutError { +impl fmt::Debug for TimedOutError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("TimeoutError").finish() } } -impl TimeoutError { +impl TimedOutError { pub(crate) fn new() -> Self { Self { _private: () } } } -impl Error for TimeoutError {} +impl Error for TimedOutError {} -impl fmt::Display for TimeoutError { +impl Into for TimedOutError { + fn into(self) -> io::Error { + io::Error::new(io::ErrorKind::TimedOut, "Future has timed out") + } +} + +impl fmt::Display for TimedOutError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { "Future has timed out".fmt(f) } @@ -29,8 +35,7 @@ impl fmt::Display for TimeoutError { /// Conversion into a deadline. /// -/// A deadline is a future which elapses after a certain period or event, and -/// returns `()`. +/// A deadline is a future which resolves after a certain period or event. pub trait IntoDeadline { /// Which kind of future are we turning this into? type Deadline: Future; diff --git a/src/future.rs b/src/future.rs index 4b9e022..ef4729e 100644 --- a/src/future.rs +++ b/src/future.rs @@ -1,21 +1,21 @@ //! Extension methods and types for the `Future` trait. -use crate::{deadline::TimeoutError, IntoDeadline}; +use crate::{deadline::TimedOutError, IntoDeadline}; use core::future::Future; use core::pin::Pin; use pin_project_lite::pin_project; use std::task::{Context, Poll}; +/// Extend the `Future` trait with the `until` method. pub trait FutureExt: Future { - /// Applies the token to the `future`, such that the resulting future - /// completes with `None` if the token is cancelled. - fn until(self, target: T) -> StopFuture + /// Run a future until it resolves, or until a deadline is hit. + fn until(self, target: T) -> Stop where Self: Sized, T: IntoDeadline, { - StopFuture { + Stop { deadline: target.into_deadline(), future: self, } @@ -23,8 +23,12 @@ pub trait FutureExt: Future { } pin_project! { + /// Run a future until it resolves, or until a deadline is hit. + /// + /// This method is returned by [`FutureExt::deadline`]. + #[must_use = "Futures do nothing unless polled or .awaited"] #[derive(Debug)] - pub struct StopFuture { + pub struct Stop { #[pin] future: F, #[pin] @@ -32,17 +36,17 @@ pin_project! { } } -impl Future for StopFuture +impl Future for Stop where F: Future, D: Future, { - type Output = Result; + type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); if let Poll::Ready(()) = this.deadline.poll(cx) { - return Poll::Ready(Err(TimeoutError::new())); + return Poll::Ready(Err(TimedOutError::new())); } match this.future.poll(cx) { Poll::Pending => Poll::Pending, diff --git a/src/lib.rs b/src/lib.rs index 76639f9..9ae16d1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -61,11 +61,12 @@ pub mod future; pub mod stream; +pub mod time; mod deadline; mod stop_source; -pub use deadline::IntoDeadline; +pub use deadline::{IntoDeadline, TimedOutError}; pub use stop_source::{StopSource, StopToken}; /// A prelude for `stop-token`. diff --git a/src/stream.rs b/src/stream.rs index b63df5d..66d4c27 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -1,6 +1,6 @@ //! Extension methods and types for the `Stream` trait. -use crate::{deadline::TimeoutError, IntoDeadline}; +use crate::{deadline::TimedOutError, IntoDeadline}; use core::future::Future; use core::pin::Pin; @@ -26,6 +26,10 @@ pub trait StreamExt: Stream { impl StreamExt for S {} pin_project! { + /// Run a future until it resolves, or until a deadline is hit. + /// + /// This method is returned by [`FutureExt::deadline`]. + #[must_use = "Futures do nothing unless polled or .awaited"] #[derive(Debug)] pub struct StopStream { #[pin] @@ -40,12 +44,12 @@ where S: Stream, D: Future, { - type Item = Result; + type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.project(); if let Poll::Ready(()) = this.deadline.poll(cx) { - return Poll::Ready(Some(Err(TimeoutError::new()))); + return Poll::Ready(Some(Err(TimedOutError::new()))); } this.stream.poll_next(cx).map(|el| el.map(|el| Ok(el))) } diff --git a/src/time.rs b/src/time.rs new file mode 100644 index 0000000..4f8090c --- /dev/null +++ b/src/time.rs @@ -0,0 +1,87 @@ +//! Create deadlines from `Duration` and `Instant` types. +//! +//! # Features +//! +//! This module is empty when no features are enabled. To implement deadlines +//! for `Instant` and `Duration` you can enable one of the following features: +//! +//! - `async-io`: use this when using the `async-std` or `smol` runtimes. +//! - `tokio`: use this when using the `tokio` runtime. +//! +//! # Examples +//! +//! ``` +//! use std::time::Instant; +//! use async_std::prelude::*; +//! use stop_token::prelude::*; +//! use stop_token::StopToken; +//! +//! struct Event; +//! +//! async fn do_work(work: impl Stream + Unpin, until: Instant) { +//! let mut work = work.until(until); +//! while let Some(Ok(event)) = work.next().await { +//! process_event(event).await +//! } +//! } +//! +//! async fn process_event(_event: Event) { +//! } +//! ``` + +#[cfg(feature = "async-io")] +pub use asyncio::*; + +#[cfg(any(feature = "async-io", feature = "docs"))] +mod asyncio { + use async_io::Timer; + use std::future::Future; + use std::pin::Pin; + use std::task::{Context, Poll}; + + use crate::IntoDeadline; + + use pin_project_lite::pin_project; + + impl IntoDeadline for std::time::Duration { + type Deadline = Deadline; + + fn into_deadline(self) -> Self::Deadline { + Deadline { + delay: Timer::after(self), + } + } + } + + pin_project! { + /// A future that times out after a duration of time. + #[must_use = "Futures do nothing unless polled or .awaited"] + #[derive(Debug)] + pub struct Deadline { + #[pin] + delay: Timer, + } + } + + impl Future for Deadline { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + match this.delay.poll(cx) { + Poll::Ready(_) => Poll::Ready(()), + Poll::Pending => Poll::Pending, + } + } + } + + impl IntoDeadline for std::time::Instant { + type Deadline = Deadline; + + fn into_deadline(self) -> Self::Deadline { + Deadline { + delay: Timer::at(self), + } + } + } +} From c0fb376c0d7a2941b31552e4a21d0679862a521d Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Sat, 2 Oct 2021 14:46:16 +0200 Subject: [PATCH 5/6] Enable tokio support --- .github/workflows/ci.yml | 8 ++++- Cargo.toml | 3 +- src/lib.rs | 8 +++++ src/time.rs | 72 ++++++++++++++++++++++++++++++++++------ tests/tests.rs | 56 +++++++++++++++++++++++++++++++ 5 files changed, 135 insertions(+), 12 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8b5e9ec..206dd6f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -25,7 +25,13 @@ jobs: toolchain: nightly override: true - - name: tests + - name: tests async-io uses: actions-rs/cargo@v1 with: command: test --features async-io + + - name: tests tokio + uses: actions-rs/cargo@v1 + with: + command: test --features tokio + diff --git a/Cargo.toml b/Cargo.toml index a7330ea..4e2e6db 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,8 +19,9 @@ docs = ["async-io"] pin-project-lite = "0.2.0" async-channel = "1.6.1" futures-core = "0.3.17" -tokio = { version = "1.12.0", optional = true } +tokio = { version = "1.12.0", features = ["time"], optional = true } async-io = { version = "1.6.0", optional = true } [dev-dependencies] async-std = "1.10.0" +tokio = { version = "1.12.0", features = ["rt", "macros"] } diff --git a/src/lib.rs b/src/lib.rs index 9ae16d1..b125fdc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -54,6 +54,14 @@ //! } //! ``` //! +//! # Features +//! +//! The `time` submodule is empty when no features are enabled. To implement [`Deadline`] +//! for `Instant` and `Duration` you can enable one of the following features: +//! +//! - `async-io`: for use with the `async-std` or `smol` runtimes. +//! - `tokio`: for use with the `tokio` runtime. +//! //! # Lineage //! //! The cancellation system is a subset of `C#` [`CancellationToken / CancellationTokenSource`](https://docs.microsoft.com/en-us/dotnet/standard/threading/cancellation-in-managed-threads). diff --git a/src/time.rs b/src/time.rs index 4f8090c..17a3ba0 100644 --- a/src/time.rs +++ b/src/time.rs @@ -43,16 +43,6 @@ mod asyncio { use pin_project_lite::pin_project; - impl IntoDeadline for std::time::Duration { - type Deadline = Deadline; - - fn into_deadline(self) -> Self::Deadline { - Deadline { - delay: Timer::after(self), - } - } - } - pin_project! { /// A future that times out after a duration of time. #[must_use = "Futures do nothing unless polled or .awaited"] @@ -75,6 +65,16 @@ mod asyncio { } } + impl IntoDeadline for std::time::Duration { + type Deadline = Deadline; + + fn into_deadline(self) -> Self::Deadline { + Deadline { + delay: Timer::after(self), + } + } + } + impl IntoDeadline for std::time::Instant { type Deadline = Deadline; @@ -85,3 +85,55 @@ mod asyncio { } } } + +#[cfg(feature = "tokio")] +pub use tokiooo::*; + +#[cfg(any(feature = "tokio", feature = "docs"))] +mod tokiooo { + use std::future::{pending, Future, Pending}; + use std::pin::Pin; + use std::task::{Context, Poll}; + use tokio::time::{timeout, timeout_at, Instant as TokioInstant, Timeout}; + + use crate::IntoDeadline; + + /// A future that times out after a duration of time. + #[must_use = "Futures do nothing unless polled or .awaited"] + #[derive(Debug)] + pub struct Deadline { + delay: Pin>>>, + } + + impl Future for Deadline { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match Pin::new(&mut self.delay).poll(cx) { + Poll::Ready(_) => Poll::Ready(()), + Poll::Pending => Poll::Pending, + } + } + } + + impl IntoDeadline for std::time::Duration { + type Deadline = Deadline; + + fn into_deadline(self) -> Self::Deadline { + Deadline { + delay: Box::pin(timeout(self, pending())), + } + } + } + + impl IntoDeadline for std::time::Instant { + type Deadline = Deadline; + + fn into_deadline(self) -> Self::Deadline { + let instant = TokioInstant::from(self); + Deadline { + delay: Box::pin(timeout_at(instant, pending())), + } + } + } +} diff --git a/tests/tests.rs b/tests/tests.rs index 7bd4139..f302230 100644 --- a/tests/tests.rs +++ b/tests/tests.rs @@ -39,3 +39,59 @@ fn smoke() { assert_eq!(task.await, vec![1, 2, 3]); }) } + +#[cfg(feature = "async-io")] +#[test] +fn async_io_time() { + task::block_on(async { + let (sender, receiver) = bounded::(10); + let task = task::spawn({ + let receiver = receiver.clone(); + async move { + let mut xs = Vec::new(); + let mut stream = receiver.until(Duration::from_millis(200)); + while let Some(Ok(x)) = stream.next().await { + xs.push(x) + } + xs + } + }); + sender.send(1).await.unwrap(); + sender.send(2).await.unwrap(); + sender.send(3).await.unwrap(); + + task::sleep(Duration::from_millis(250)).await; + + sender.send(4).await.unwrap(); + sender.send(5).await.unwrap(); + sender.send(6).await.unwrap(); + assert_eq!(task.await, vec![1, 2, 3]); + }) +} + +#[cfg(feature = "tokio")] +#[tokio::test] +async fn tokio_time() { + let (sender, receiver) = bounded::(10); + let task = tokio::task::spawn({ + let receiver = receiver.clone(); + async move { + let mut xs = Vec::new(); + let mut stream = receiver.until(Duration::from_millis(200)); + while let Some(Ok(x)) = stream.next().await { + xs.push(x) + } + xs + } + }); + sender.send(1).await.unwrap(); + sender.send(2).await.unwrap(); + sender.send(3).await.unwrap(); + + task::sleep(Duration::from_millis(250)).await; + + sender.send(4).await.unwrap(); + sender.send(5).await.unwrap(); + sender.send(6).await.unwrap(); + assert_eq!(task.await.unwrap(), vec![1, 2, 3]); +} From 3082b37ec955d23c8e816290b5c8cd15716ae015 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Sat, 2 Oct 2021 15:00:22 +0200 Subject: [PATCH 6/6] fix ci --- .github/workflows/ci.yml | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 206dd6f..a10d225 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -28,10 +28,12 @@ jobs: - name: tests async-io uses: actions-rs/cargo@v1 with: - command: test --features async-io + command: test + args: --features async-io - name: tests tokio uses: actions-rs/cargo@v1 with: - command: test --features tokio + command: test + args: --features tokio