From b27158c1961435a72777b5f47c8a288b74999939 Mon Sep 17 00:00:00 2001 From: Mahmut Bulut Date: Thu, 15 Aug 2019 21:18:39 +0200 Subject: [PATCH] Stream::cycle implementation --- examples/stream-cycle.rs | 25 ++++++++++++++++++ src/stream/cycle.rs | 57 ++++++++++++++++++++++++++++++++++++++++ src/stream/mod.rs | 2 ++ 3 files changed, 84 insertions(+) create mode 100644 examples/stream-cycle.rs create mode 100644 src/stream/cycle.rs diff --git a/examples/stream-cycle.rs b/examples/stream-cycle.rs new file mode 100644 index 000000000..66050c665 --- /dev/null +++ b/examples/stream-cycle.rs @@ -0,0 +1,25 @@ +//! Repeats given stream values and sum them + +#![feature(async_await)] + +use async_std::io; +use async_std::prelude::*; +use async_std::stream; +use async_std::task; + +fn main() -> io::Result<()> { + task::block_on(async { + let mut s = stream::cycle(vec![6, 7, 8]); + let mut total = 0; + + while let Some(v) = s.next().await { + total += v; + if total == 42 { + println!("Found {} the meaning of life!", total); + break; + } + } + + Ok(()) + }) +} diff --git a/src/stream/cycle.rs b/src/stream/cycle.rs new file mode 100644 index 000000000..6be84fcb2 --- /dev/null +++ b/src/stream/cycle.rs @@ -0,0 +1,57 @@ +use std::pin::Pin; +use std::sync::Mutex; + +use crate::task::{Context, Poll}; + +/// Creates a stream that yields the given elements continually. +/// +/// # Examples +/// +/// ``` +/// # #![feature(async_await)] +/// # fn main() { async_std::task::block_on(async { +/// # +/// use async_std::prelude::*; +/// use async_std::stream; +/// +/// let mut s = stream::cycle(vec![1, 2, 3]); +/// +/// assert_eq!(s.next().await, Some(1)); +/// assert_eq!(s.next().await, Some(2)); +/// assert_eq!(s.next().await, Some(3)); +/// assert_eq!(s.next().await, Some(1)); +/// assert_eq!(s.next().await, Some(2)); +/// # +/// # }) } +/// ``` +pub fn cycle(items: Vec) -> Cycle +where + T: Clone, +{ + Cycle { + items, + cursor: Mutex::new(0_usize), + } +} + +/// A stream that yields the given elements continually. +/// +/// This stream is constructed by the [`cycle`] function. +/// +/// [`cycle`]: fn.cycle.html +#[derive(Debug)] +pub struct Cycle { + items: Vec, + cursor: Mutex, +} + +impl futures::Stream for Cycle { + type Item = T; + + fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + let cursor = &mut *self.cursor.lock().unwrap(); + let p = Poll::Ready(self.items.get(*cursor).map(|x| x.to_owned())); + *cursor = (*cursor + 1_usize) % self.items.len(); + p + } +} diff --git a/src/stream/mod.rs b/src/stream/mod.rs index a760a9b7a..9d268bd06 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -22,11 +22,13 @@ //! # }) } //! ``` +pub use cycle::{cycle, Cycle}; pub use empty::{empty, Empty}; pub use once::{once, Once}; pub use repeat::{repeat, Repeat}; pub use stream::{Stream, Take}; +mod cycle; mod empty; mod once; mod repeat;