Skip to content

Commit 26ae888

Browse files
committed
Initial implementation of Promises.all, .any, .none, and .one class methods.
1 parent df20680 commit 26ae888

File tree

3 files changed

+432
-124
lines changed

3 files changed

+432
-124
lines changed

doc/promise.md

Lines changed: 0 additions & 123 deletions
This file was deleted.

lib/concurrent/promise.rb

Lines changed: 205 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,131 @@
55

66
module Concurrent
77

8-
# {include:file:doc/promise.md}
8+
PromiseExecutionError = Class.new(StandardError)
9+
10+
# Promises are inspired by the JavaScript [Promises/A](http://wiki.commonjs.org/wiki/Promises/A) and [Promises/A+](http://promises-aplus.github.io/promises-spec/) specifications.
11+
#
12+
# > A promise represents the eventual value returned from the single completion of an operation.
13+
#
14+
# Promises are similar to futures and share many of the same behaviours. Promises are far more robust, however. Promises can be chained in a tree structure where each promise may have zero or more children. Promises are chained using the `then` method. The result of a call to `then` is always another promise. Promises are resolved asynchronously (with respect to the main thread) but in a strict order: parents are guaranteed to be resolved before their children, children before their younger siblings. The `then` method takes two parameters: an optional block to be executed upon parent resolution and an optional callable to be executed upon parent failure. The result of each promise is passed to each of its children upon resolution. When a promise is rejected all its children will be summarily rejected and will receive the reason.
15+
#
16+
# Promises have four possible states: *unscheduled*, *pending*, *rejected*, and *fulfilled*. A Promise created using `.new` will be *unscheduled*. It is scheduled by calling the `execute` method. Upon execution the Promise and all its children will be set to *pending*. When a promise is *pending* it will remain in that state until processing is complete. A completed Promise is either *rejected*, indicating that an exception was thrown during processing, or *fulfilled*, indicating it succeeded. If a Promise is *fulfilled* its `value` will be updated to reflect the result of the operation. If *rejected* the `reason` will be updated with a reference to the thrown exception. The predicate methods `unscheduled?`, `pending?`, `rejected?`, and `fulfilled?` can be called at any time to obtain the state of the Promise, as can the `state` method, which returns a symbol. A Promise created using `.execute` will be *pending*, a Promise created using `.fulfill(value)` will be *fulfilled* with the given value and a Promise created using `.reject(reason)` will be *rejected* with the given reason.
17+
#
18+
# Retrieving the value of a promise is done through the `value` (alias: `deref`) method. Obtaining the value of a promise is a potentially blocking operation. When a promise is *rejected* a call to `value` will return `nil` immediately. When a promise is *fulfilled* a call to `value` will immediately return the current value. When a promise is *pending* a call to `value` will block until the promise is either *rejected* or *fulfilled*. A *timeout* value can be passed to `value` to limit how long the call will block. If `nil` the call will block indefinitely. If `0` the call will not block. Any other integer or float value will indicate the maximum number of seconds to block.
19+
#
20+
# Promises run on the global thread pool.
21+
#
22+
# ### Examples
23+
#
24+
# Start by requiring promises
25+
#
26+
# ```ruby
27+
# require 'concurrent'
28+
# ```
29+
#
30+
# Then create one
31+
#
32+
# ```ruby
33+
# p = Promise.execute do
34+
# # do something
35+
# 42
36+
# end
37+
# ```
38+
#
39+
# Promises can be chained using the `then` method. The `then` method accepts a block, to be executed on fulfillment, and a callable argument to be executed on rejection. The result of the each promise is passed as the block argument to chained promises.
40+
#
41+
# ```ruby
42+
# p = Concurrent::Promise.new{10}.then{|x| x * 2}.then{|result| result - 10 }.execute
43+
# ```
44+
#
45+
# And so on, and so on, and so on...
46+
#
47+
# ```ruby
48+
# p = Concurrent::Promise.fulfill(20).
49+
# then{|result| result - 10 }.
50+
# then{|result| result * 3 }.
51+
# then{|result| result % 5 }.execute
52+
# ```
53+
#
54+
# The initial state of a newly created Promise depends on the state of its parent:
55+
# - if parent is *unscheduled* the child will be *unscheduled*
56+
# - if parent is *pending* the child will be *pending*
57+
# - if parent is *fulfilled* the child will be *pending*
58+
# - if parent is *rejected* the child will be *pending* (but will ultimately be *rejected*)
59+
#
60+
# Promises are executed asynchronously from the main thread. By the time a child Promise finishes initialization it may be in a different state that its parent (by the time a child is created its parent may have completed execution and changed state). Despite being asynchronous, however, the order of execution of Promise objects in a chain (or tree) is strictly defined.
61+
#
62+
# There are multiple ways to create and execute a new `Promise`. Both ways provide identical behavior:
63+
#
64+
# ```ruby
65+
# # create, operate, then execute
66+
# p1 = Concurrent::Promise.new{ "Hello World!" }
67+
# p1.state #=> :unscheduled
68+
# p1.execute
69+
#
70+
# # create and immediately execute
71+
# p2 = Concurrent::Promise.new{ "Hello World!" }.execute
72+
#
73+
# # execute during creation
74+
# p3 = Concurrent::Promise.execute{ "Hello World!" }
75+
# ```
76+
#
77+
# Once the `execute` method is called a `Promise` becomes `pending`:
78+
#
79+
# ```ruby
80+
# p = Concurrent::Promise.execute{ "Hello, world!" }
81+
# p.state #=> :pending
82+
# p.pending? #=> true
83+
# ```
84+
#
85+
# Wait a little bit, and the promise will resolve and provide a value:
86+
#
87+
# ```ruby
88+
# p = Concurrent::Promise.execute{ "Hello, world!" }
89+
# sleep(0.1)
90+
#
91+
# p.state #=> :fulfilled
92+
# p.fulfilled? #=> true
93+
# p.value #=> "Hello, world!"
94+
# ```
95+
#
96+
# If an exception occurs, the promise will be rejected and will provide
97+
# a reason for the rejection:
98+
#
99+
# ```ruby
100+
# p = Concurrent::Promise.execute{ raise StandardError.new("Here comes the Boom!") }
101+
# sleep(0.1)
102+
#
103+
# p.state #=> :rejected
104+
# p.rejected? #=> true
105+
# p.reason #=> "#<StandardError: Here comes the Boom!>"
106+
# ```
107+
#
108+
# #### Rejection
109+
#
110+
# When a promise is rejected all its children will be rejected and will receive the rejection `reason` as the rejection callable parameter:
111+
#
112+
# ```ruby
113+
# p = [ Concurrent::Promise.execute{ Thread.pass; raise StandardError } ]
114+
#
115+
# c1 = p.then(Proc.new{ |reason| 42 })
116+
# c2 = p.then(Proc.new{ |reason| raise 'Boom!' })
117+
#
118+
# sleep(0.1)
119+
#
120+
# c1.state #=> :rejected
121+
# c2.state #=> :rejected
122+
# ```
123+
#
124+
# Once a promise is rejected it will continue to accept children that will receive immediately rejection (they will be executed asynchronously).
125+
#
126+
# #### Aliases
127+
#
128+
# The `then` method is the most generic alias: it accepts a block to be executed upon parent fulfillment and a callable to be executed upon parent rejection. At least one of them should be passed. The default block is `{ |result| result }` that fulfills the child with the parent value. The default callable is `{ |reason| raise reason }` that rejects the child with the parent reason.
129+
#
130+
# - `on_success { |result| ... }` is the same as `then {|result| ... }`
131+
# - `rescue { |reason| ... }` is the same as `then(Proc.new { |reason| ... } )`
132+
# - `rescue` is aliased by `catch` and `on_error`
9133
class Promise
10134
# TODO unify promise and future to single class, with dataflow
11135
include Obligation
@@ -168,6 +292,86 @@ def zip(*others)
168292
self.class.zip(self, *others)
169293
end
170294

295+
# Aggregate a collection of zero or more promises under a composite promise,
296+
# execute the aggregated promises and collect them into a standard Ruby array,
297+
# call the given Ruby `Ennnumerable` predicate (such as `any?`, `all?`, `none?`,
298+
# or `one?`) on the collection checking for the success or failure of each,
299+
# then executing the composite's `#then` handlers if the predicate returns
300+
# `true` or executing the composite's `#rescue` handlers if the predicate
301+
# returns false.
302+
#
303+
# @!macro [attach] promise_self_aggregate
304+
#
305+
# The returned promise will not yet have been executed. Additional `#then`
306+
# and `#rescue` handlers may still be provided. Once the returned promise
307+
# is execute the aggregate promises will be also be executed (if they have
308+
# not been executed already). The results of the aggregate promises will
309+
# be checked upon completion. The necessary `#then` and `#rescue` blocks
310+
# on the aggregating promise will then be executed as appropriate. If the
311+
# `#rescue` handlers are executed the raises exception will be
312+
# `Concurrent::PromiseExecutionError`.
313+
#
314+
# @param [Array] promises Zero or more promises to aggregate
315+
# @return [Promise] an unscheduled (not executed) promise that aggregates
316+
# the promises given as arguments
317+
def self.aggregate(method, *promises)
318+
composite = Promise.new do
319+
completed = promises.collect do |promise|
320+
promise.execute if promise.unscheduled?
321+
promise.wait
322+
promise
323+
end
324+
unless completed.empty? || completed.send(method){|promise| promise.fulfilled? }
325+
raise PromiseExecutionError
326+
end
327+
end
328+
composite
329+
end
330+
331+
# Aggregates a collection of promises and executes the `then` condition
332+
# if all aggregated promises succeed. Executes the `rescue` handler with
333+
# a `Concurrent::PromiseExecutionError` if any of the aggregated promises
334+
# fail. Upon execution will execute any of the aggregate promises that
335+
# were not already executed.
336+
#
337+
# @!macro promise_self_aggregate
338+
def self.all(*promises)
339+
aggregate(:all?, *promises)
340+
end
341+
342+
# Aggregates a collection of promises and executes the `then` condition
343+
# if any aggregated promises succeed. Executes the `rescue` handler with
344+
# a `Concurrent::PromiseExecutionError` if any of the aggregated promises
345+
# fail. Upon execution will execute any of the aggregate promises that
346+
# were not already executed.
347+
#
348+
# @!macro promise_self_aggregate
349+
def self.any(*promises)
350+
aggregate(:any?, *promises)
351+
end
352+
353+
# Aggregates a collection of promises and executes the `then` condition
354+
# if all aggregated promises fail. Executes the `rescue` handler with
355+
# a `Concurrent::PromiseExecutionError` if any of the aggregated promises
356+
# succeed. Upon execution will execute any of the aggregate promises that
357+
# were not already executed.
358+
#
359+
# @!macro promise_self_aggregate
360+
def self.none(*promises)
361+
aggregate(:none?, *promises)
362+
end
363+
364+
# Aggregates a collection of promises and executes the `then` condition
365+
# if one and only one of the aggregated promises succeeds. Executes the
366+
# `rescue` handler with a `Concurrent::PromiseExecutionError` more than one
367+
# of the aggregated promises succeed. Upon execution will execute any of
368+
# the aggregate promises that were not already executed.
369+
#
370+
# @!macro promise_self_aggregate
371+
def self.one(*promises)
372+
aggregate(:one?, *promises)
373+
end
374+
171375
protected
172376

173377
def set_pending

0 commit comments

Comments
 (0)