Skip to content

Initial implementation of Promises.all? and Promise.any? class methods. #198

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Dec 11, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 0 additions & 123 deletions doc/promise.md

This file was deleted.

220 changes: 219 additions & 1 deletion lib/concurrent/promise.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,167 @@

module Concurrent

# {include:file:doc/promise.md}
PromiseExecutionError = Class.new(StandardError)

# Promises are inspired by the JavaScript [Promises/A](http://wiki.commonjs.org/wiki/Promises/A)
# and [Promises/A+](http://promises-aplus.github.io/promises-spec/) specifications.
#
# > A promise represents the eventual value returned from the single completion of an operation.
#
# Promises are similar to futures and share many of the same behaviours. Promises are far more
# robust, however. Promises can be chained in a tree structure where each promise may have zero
# or more children. Promises are chained using the `then` method. The result of a call to `then`
# is always another promise. Promises are resolved asynchronously (with respect to the main thread)
# but in a strict order: parents are guaranteed to be resolved before their children, children
# before their younger siblings. The `then` method takes two parameters: an optional block to
# be executed upon parent resolution and an optional callable to be executed upon parent failure.
# The result of each promise is passed to each of its children upon resolution. When a promise
# is rejected all its children will be summarily rejected and will receive the reason.
#
# Promises have four possible states: *unscheduled*, *pending*, *rejected*, and *fulfilled*. A
# Promise created using `.new` will be *unscheduled*. It is scheduled by calling the `execute`
# method. Upon execution the Promise and all its children will be set to *pending*. When a promise
# is *pending* it will remain in that state until processing is complete. A completed Promise is
# either *rejected*, indicating that an exception was thrown during processing, or *fulfilled*,
# indicating it succeeded. If a Promise is *fulfilled* its `value` will be updated to reflect
# the result of the operation. If *rejected* the `reason` will be updated with a reference to
# the thrown exception. The predicate methods `unscheduled?`, `pending?`, `rejected?`, and
# `fulfilled?` can be called at any time to obtain the state of the Promise, as can the `state`
# method, which returns a symbol. A Promise created using `.execute` will be *pending*, a Promise
# created using `.fulfill(value)` will be *fulfilled* with the given value and a Promise created
# using `.reject(reason)` will be *rejected* with the given reason.
#
# Retrieving the value of a promise is done through the `value` (alias: `deref`) method. Obtaining
# the value of a promise is a potentially blocking operation. When a promise is *rejected* a call
# to `value` will return `nil` immediately. When a promise is *fulfilled* a call to `value` will
# immediately return the current value. When a promise is *pending* a call to `value` will block
# until the promise is either *rejected* or *fulfilled*. A *timeout* value can be passed to `value`
# to limit how long the call will block. If `nil` the call will block indefinitely. If `0` the call
# will not block. Any other integer or float value will indicate the maximum number of seconds to block.
#
# Promises run on the global thread pool.
#
# ### Examples
#
# Start by requiring promises
#
# ```ruby
# require 'concurrent'
# ```
#
# Then create one
#
# ```ruby
# p = Concurrent::Promise.execute do
# # do something
# 42
# end
# ```
#
# Promises can be chained using the `then` method. The `then` method accepts a block, to be executed
# on fulfillment, and a callable argument to be executed on rejection. The result of the each promise
# is passed as the block argument to chained promises.
#
# ```ruby
# p = Concurrent::Promise.new{10}.then{|x| x * 2}.then{|result| result - 10 }.execute
# ```
#
# And so on, and so on, and so on...
#
# ```ruby
# p = Concurrent::Promise.fulfill(20).
# then{|result| result - 10 }.
# then{|result| result * 3 }.
# then{|result| result % 5 }.execute
# ```
#
# The initial state of a newly created Promise depends on the state of its parent:
# - if parent is *unscheduled* the child will be *unscheduled*
# - if parent is *pending* the child will be *pending*
# - if parent is *fulfilled* the child will be *pending*
# - if parent is *rejected* the child will be *pending* (but will ultimately be *rejected*)
#
# Promises are executed asynchronously from the main thread. By the time a child Promise finishes
# nitialization it may be in a different state that its parent (by the time a child is created its parent
# may have completed execution and changed state). Despite being asynchronous, however, the order of
# execution of Promise objects in a chain (or tree) is strictly defined.
#
# There are multiple ways to create and execute a new `Promise`. Both ways provide identical behavior:
#
# ```ruby
# # create, operate, then execute
# p1 = Concurrent::Promise.new{ "Hello World!" }
# p1.state #=> :unscheduled
# p1.execute
#
# # create and immediately execute
# p2 = Concurrent::Promise.new{ "Hello World!" }.execute
#
# # execute during creation
# p3 = Concurrent::Promise.execute{ "Hello World!" }
# ```
#
# Once the `execute` method is called a `Promise` becomes `pending`:
#
# ```ruby
# p = Concurrent::Promise.execute{ "Hello, world!" }
# p.state #=> :pending
# p.pending? #=> true
# ```
#
# Wait a little bit, and the promise will resolve and provide a value:
#
# ```ruby
# p = Concurrent::Promise.execute{ "Hello, world!" }
# sleep(0.1)
#
# p.state #=> :fulfilled
# p.fulfilled? #=> true
# p.value #=> "Hello, world!"
# ```
#
# If an exception occurs, the promise will be rejected and will provide
# a reason for the rejection:
#
# ```ruby
# p = Concurrent::Promise.execute{ raise StandardError.new("Here comes the Boom!") }
# sleep(0.1)
#
# p.state #=> :rejected
# p.rejected? #=> true
# p.reason #=> "#<StandardError: Here comes the Boom!>"
# ```
#
# #### Rejection
#
# When a promise is rejected all its children will be rejected and will receive the rejection `reason`
# as the rejection callable parameter:
#
# ```ruby
# p = [ Concurrent::Promise.execute{ Thread.pass; raise StandardError } ]
#
# c1 = p.then(Proc.new{ |reason| 42 })
# c2 = p.then(Proc.new{ |reason| raise 'Boom!' })
#
# sleep(0.1)
#
# c1.state #=> :rejected
# c2.state #=> :rejected
# ```
#
# Once a promise is rejected it will continue to accept children that will receive immediately rejection
# (they will be executed asynchronously).
#
# #### Aliases
#
# The `then` method is the most generic alias: it accepts a block to be executed upon parent fulfillment
# and a callable to be executed upon parent rejection. At least one of them should be passed. The default
# block is `{ |result| result }` that fulfills the child with the parent value. The default callable is
# `{ |reason| raise reason }` that rejects the child with the parent reason.
#
# - `on_success { |result| ... }` is the same as `then {|result| ... }`
# - `rescue { |reason| ... }` is the same as `then(Proc.new { |reason| ... } )`
# - `rescue` is aliased by `catch` and `on_error`
class Promise
# TODO unify promise and future to single class, with dataflow
include Obligation
Expand Down Expand Up @@ -168,8 +328,66 @@ def zip(*others)
self.class.zip(self, *others)
end

# Aggregates a collection of promises and executes the `then` condition
# if all aggregated promises succeed. Executes the `rescue` handler with
# a `Concurrent::PromiseExecutionError` if any of the aggregated promises
# fail. Upon execution will execute any of the aggregate promises that
# were not already executed.
#
# @!macro [attach] promise_self_aggregate
#
# The returned promise will not yet have been executed. Additional `#then`
# and `#rescue` handlers may still be provided. Once the returned promise
# is execute the aggregate promises will be also be executed (if they have
# not been executed already). The results of the aggregate promises will
# be checked upon completion. The necessary `#then` and `#rescue` blocks
# on the aggregating promise will then be executed as appropriate. If the
# `#rescue` handlers are executed the raises exception will be
# `Concurrent::PromiseExecutionError`.
#
# @param [Array] promises Zero or more promises to aggregate
# @return [Promise] an unscheduled (not executed) promise that aggregates
# the promises given as arguments
def self.all?(*promises)
aggregate(:all?, *promises)
end

# Aggregates a collection of promises and executes the `then` condition
# if any aggregated promises succeed. Executes the `rescue` handler with
# a `Concurrent::PromiseExecutionError` if any of the aggregated promises
# fail. Upon execution will execute any of the aggregate promises that
# were not already executed.
#
# @!macro promise_self_aggregate
def self.any?(*promises)
aggregate(:any?, *promises)
end

protected

# Aggregate a collection of zero or more promises under a composite promise,
# execute the aggregated promises and collect them into a standard Ruby array,
# call the given Ruby `Ennnumerable` predicate (such as `any?`, `all?`, `none?`,
# or `one?`) on the collection checking for the success or failure of each,
# then executing the composite's `#then` handlers if the predicate returns
# `true` or executing the composite's `#rescue` handlers if the predicate
# returns false.
#
# @!macro promise_self_aggregate
def self.aggregate(method, *promises)
composite = Promise.new do
completed = promises.collect do |promise|
promise.execute if promise.unscheduled?
promise.wait
promise
end
unless completed.empty? || completed.send(method){|promise| promise.fulfilled? }
raise PromiseExecutionError
end
end
composite
end

def set_pending
mutex.synchronize do
@state = :pending
Expand Down
Loading