diff --git a/futures-core/src/executor.rs b/futures-core/src/executor.rs deleted file mode 100644 index f505bbc47e..0000000000 --- a/futures-core/src/executor.rs +++ /dev/null @@ -1,4 +0,0 @@ -//! Executors. - -pub use core::task::{Executor, SpawnErrorKind, SpawnObjError}; - diff --git a/futures-core/src/lib.rs b/futures-core/src/lib.rs index e3a1cb3f82..b9b8e87aa6 100644 --- a/futures-core/src/lib.rs +++ b/futures-core/src/lib.rs @@ -83,5 +83,3 @@ pub use stream::Stream; pub mod task; pub use task::Poll; - -pub mod executor; diff --git a/futures-core/src/task/mod.rs b/futures-core/src/task/mod.rs index af4f5191b6..c890a48530 100644 --- a/futures-core/src/task/mod.rs +++ b/futures-core/src/task/mod.rs @@ -2,23 +2,24 @@ use Future; -pub use core::task::{UnsafeWake, Waker, LocalWaker}; +mod poll; +pub use self::poll::Poll; + +pub use core::task::{ + Context, Executor, + Waker, LocalWaker, UnsafeWake, + TaskObj, LocalTaskObj, UnsafeTask, + SpawnErrorKind, SpawnObjError, SpawnLocalObjError, +}; #[cfg(feature = "std")] pub use std::task::{Wake, local_waker, local_waker_from_nonlocal}; -pub use core::task::Context; - -mod poll; -pub use self::poll::Poll; - #[cfg_attr(feature = "nightly", cfg(target_has_atomic = "ptr"))] mod atomic_waker; #[cfg_attr(feature = "nightly", cfg(target_has_atomic = "ptr"))] pub use self::atomic_waker::AtomicWaker; -pub use core::task::{TaskObj, UnsafeTask}; - if_std! { use std::boxed::PinBox; diff --git a/futures-executor/src/local_pool.rs b/futures-executor/src/local_pool.rs index 62848c909d..ff18e4fd93 100644 --- a/futures-executor/src/local_pool.rs +++ b/futures-executor/src/local_pool.rs @@ -7,8 +7,9 @@ use std::sync::Arc; use std::thread::{self, Thread}; use futures_core::{Future, Poll, Stream}; -use futures_core::task::{self, Context, LocalWaker, TaskObj, Wake}; -use futures_core::executor::{Executor, SpawnObjError, SpawnErrorKind}; +use futures_core::task::{ + self, Context, LocalWaker, TaskObj, LocalTaskObj, Wake, + Executor, SpawnObjError, SpawnLocalObjError, SpawnErrorKind}; use futures_util::stream::FuturesUnordered; use futures_util::stream::StreamExt; @@ -27,7 +28,7 @@ use ThreadPool; /// single-threaded, it supports a special form of task spawning for non-`Send` /// futures, via [`spawn_local`](LocalExecutor::spawn_local). pub struct LocalPool { - pool: FuturesUnordered, + pool: FuturesUnordered, incoming: Rc, } @@ -38,7 +39,7 @@ pub struct LocalExecutor { incoming: Weak, } -type Incoming = RefCell>; +type Incoming = RefCell>; pub(crate) struct ThreadNotify { thread: Thread @@ -255,7 +256,7 @@ impl Iterator for BlockingStream where S: Unpin { impl Executor for LocalExecutor { fn spawn_obj(&mut self, task: TaskObj) -> Result<(), SpawnObjError> { if let Some(incoming) = self.incoming.upgrade() { - incoming.borrow_mut().push(task); + incoming.borrow_mut().push(task.into()); Ok(()) } else { Err(SpawnObjError{ task, kind: SpawnErrorKind::shutdown() }) @@ -272,15 +273,15 @@ impl Executor for LocalExecutor { } impl LocalExecutor { - /* /// Spawn a non-`Send` future onto the associated [`LocalPool`](LocalPool). - pub fn spawn_local(&mut self, f: F) -> Result<(), SpawnObjError> - where F: Future + 'static + pub fn spawn_local_obj(&mut self, task: LocalTaskObj) + -> Result<(), SpawnLocalObjError> { - self.spawn_task(Task { - fut: Box::new(f), - map: LocalMap::new(), - }) + if let Some(incoming) = self.incoming.upgrade() { + incoming.borrow_mut().push(task); + Ok(()) + } else { + Err(SpawnLocalObjError{ task, kind: SpawnErrorKind::shutdown() }) + } } - */ } diff --git a/futures-executor/src/thread_pool.rs b/futures-executor/src/thread_pool.rs index 5d95d7ae4b..7199f2e33b 100644 --- a/futures-executor/src/thread_pool.rs +++ b/futures-executor/src/thread_pool.rs @@ -8,8 +8,7 @@ use std::thread; use std::fmt; use futures_core::*; -use futures_core::task::{self, Wake, TaskObj}; -use futures_core::executor::{Executor, SpawnObjError}; +use futures_core::task::{self, Wake, TaskObj, Executor, SpawnObjError}; use enter; use num_cpus; diff --git a/futures-executor/tests/local_pool.rs b/futures-executor/tests/local_pool.rs index f286753d79..ab03b9b2c0 100755 --- a/futures-executor/tests/local_pool.rs +++ b/futures-executor/tests/local_pool.rs @@ -6,20 +6,20 @@ extern crate futures; extern crate futures_executor; extern crate futures_channel; +use std::boxed::PinBox; use std::cell::{Cell, RefCell}; -use std::sync::Arc; +use std::mem::PinMut; +use std::rc::Rc; use std::thread; use std::time::Duration; -use std::mem::PinMut; use futures::future::lazy; use futures::prelude::*; -use futures::executor::Executor; -use futures::task; +use futures::task::{self, Executor}; use futures_executor::*; use futures_channel::oneshot; -struct Pending(Arc<()>); +struct Pending(Rc<()>); impl Future for Pending { type Output = (); @@ -30,7 +30,7 @@ impl Future for Pending { } fn pending() -> Pending { - Pending(Arc::new(())) + Pending(Rc::new(())) } #[test] @@ -54,7 +54,7 @@ fn run_until_single_future() { fn run_until_ignores_spawned() { let mut pool = LocalPool::new(); let mut exec = pool.executor(); - exec.spawn_obj(Box::new(pending()).into()).unwrap(); // This test used the currently not implemented spawn_local method before + exec.spawn_local_obj(PinBox::new(pending()).into()).unwrap(); assert_eq!(pool.run_until(lazy(|_| ()), &mut exec), ()); } @@ -63,14 +63,13 @@ fn run_until_executes_spawned() { let (tx, rx) = oneshot::channel(); let mut pool = LocalPool::new(); let mut exec = pool.executor(); - exec.spawn_obj(Box::new(lazy(move |_| { // This test used the currently not implemented spawn_local method before + exec.spawn_local_obj(PinBox::new(lazy(move |_| { tx.send(()).unwrap(); () })).into()).unwrap(); pool.run_until(rx, &mut exec).unwrap(); } -/* // This test does not work because it relies on spawn_local which is not implemented #[test] fn run_executes_spawned() { let cnt = Rc::new(Cell::new(0)); @@ -80,8 +79,8 @@ fn run_executes_spawned() { let mut exec = pool.executor(); let mut exec2 = pool.executor(); - exec.spawn_local(Box::new(lazy(move |_| { - exec2.spawn_local(Box::new(lazy(move |_| { + exec.spawn_local_obj(PinBox::new(lazy(move |_| { + exec2.spawn_local_obj(PinBox::new(lazy(move |_| { cnt2.set(cnt2.get() + 1); () })).into()).unwrap(); @@ -105,10 +104,10 @@ fn run_spawn_many() { for _ in 0..ITER { let cnt = cnt.clone(); - exec.spawn_local(Box::new(lazy(move |_| { + exec.spawn_local_obj(PinBox::new(lazy(move |_| { cnt.set(cnt.get() + 1); () - }))).unwrap(); + })).into()).unwrap(); } pool.run(&mut exec); @@ -122,12 +121,11 @@ fn nesting_run() { let mut pool = LocalPool::new(); let mut exec = pool.executor(); - exec.spawn(Box::new(lazy(|_| { + exec.spawn_obj(PinBox::new(lazy(|_| { let mut pool = LocalPool::new(); let mut exec = pool.executor(); pool.run(&mut exec); - Ok(()) - }))).unwrap(); + })).into()).unwrap(); pool.run(&mut exec); } @@ -143,7 +141,7 @@ fn tasks_are_scheduled_fairly() { impl Future for Spin { type Output = (); - fn poll(mut self: PinMut, cx: &mut task::Context) -> Poll<()> { + fn poll(self: PinMut, cx: &mut task::Context) -> Poll<()> { let mut state = self.state.borrow_mut(); if self.idx == 0 { @@ -170,16 +168,16 @@ fn tasks_are_scheduled_fairly() { let mut pool = LocalPool::new(); let mut exec = pool.executor(); - exec.spawn_local(Box::new(Spin { + exec.spawn_local_obj(PinBox::new(Spin { state: state.clone(), idx: 0, - })).unwrap(); + }).into()).unwrap(); - exec.spawn_local(Box::new(Spin { + exec.spawn_local_obj(PinBox::new(Spin { state: state, idx: 1, - })).unwrap(); + }).into()).unwrap(); pool.run(&mut exec); } -*/ + diff --git a/futures-util/src/future/mod.rs b/futures-util/src/future/mod.rs index a4c93ae15e..2b80c2259a 100644 --- a/futures-util/src/future/mod.rs +++ b/futures-util/src/future/mod.rs @@ -605,7 +605,7 @@ pub trait FutureExt: Future { #[cfg(feature = "std")] fn with_executor(self, executor: E) -> WithExecutor where Self: Sized, - E: ::futures_core::executor::Executor + E: ::futures_core::task::Executor { with_executor::new(self, executor) } diff --git a/futures-util/src/future/with_executor.rs b/futures-util/src/future/with_executor.rs index 02a49e53e6..7f4ab4225e 100644 --- a/futures-util/src/future/with_executor.rs +++ b/futures-util/src/future/with_executor.rs @@ -2,8 +2,7 @@ use core::marker::Unpin; use core::mem::PinMut; use futures_core::{Future, Poll}; -use futures_core::task; -use futures_core::executor::Executor; +use futures_core::task::{self, Executor}; /// Future for the `with_executor` combinator, assigning an executor /// to be used when spawning other futures. diff --git a/futures/src/lib.rs b/futures/src/lib.rs index b2976127bb..c1ae6a4539 100644 --- a/futures/src/lib.rs +++ b/futures/src/lib.rs @@ -175,7 +175,6 @@ pub mod executor { ThreadPool, ThreadPoolBuilder, JoinHandle, block_on, block_on_stream, enter, spawn, spawn_with_handle }; - pub use futures_core::executor::{SpawnObjError, Executor}; } pub mod future { @@ -254,9 +253,6 @@ pub mod prelude { Future, TryFuture, Stream, Poll, task }; - #[cfg(feature = "std")] - pub use futures_core::executor::Executor; - #[cfg(feature = "nightly")] pub use futures_stable::{ StableFuture, @@ -381,7 +377,10 @@ pub mod task { //! executors or dealing with synchronization issues around task wakeup. pub use futures_core::task::{ - Context, Waker, UnsafeWake + Context, Waker, UnsafeWake, + Executor, + TaskObj, LocalTaskObj, + SpawnErrorKind, SpawnObjError, SpawnLocalObjError, }; #[cfg_attr(feature = "nightly", cfg(target_has_atomic = "ptr"))]