Skip to content

Commit b544735

Browse files
committed
Promise extends IVar
1 parent 635c748 commit b544735

File tree

2 files changed

+82
-31
lines changed

2 files changed

+82
-31
lines changed

lib/concurrent/promise.rb

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
require 'thread'
22

3+
require 'concurrent/ivar'
34
require 'concurrent/obligation'
45
require 'concurrent/executor/executor_options'
56

@@ -181,8 +182,7 @@ module Concurrent
181182
# - `on_success { |result| ... }` is the same as `then {|result| ... }`
182183
# - `rescue { |reason| ... }` is the same as `then(Proc.new { |reason| ... } )`
183184
# - `rescue` is aliased by `catch` and `on_error`
184-
class Promise
185-
include Obligation
185+
class Promise < IVar
186186
include ExecutorOptions
187187

188188
# Initialize a new Promise with the provided options.
@@ -203,6 +203,7 @@ class Promise
203203
# @see http://promises-aplus.github.io/promises-spec/
204204
def initialize(opts = {}, &block)
205205
opts.delete_if { |k, v| v.nil? }
206+
super(IVar::NO_VALUE, opts)
206207

207208
@executor = get_executor_from(opts) || Concurrent.global_io_executor
208209
@args = get_arguments_from(opts)
@@ -214,17 +215,13 @@ def initialize(opts = {}, &block)
214215
@promise_body = block || Proc.new { |result| result }
215216
@state = :unscheduled
216217
@children = []
217-
218-
init_obligation
219-
set_deref_options(opts)
220218
end
221219

222220
# @return [Promise]
223221
def self.fulfill(value, opts = {})
224222
Promise.new(opts).tap { |p| p.send(:synchronized_set_state!, true, value, nil) }
225223
end
226224

227-
228225
# @return [Promise]
229226
def self.reject(reason, opts = {})
230227
Promise.new(opts).tap { |p| p.send(:synchronized_set_state!, false, nil, reason) }
@@ -388,6 +385,18 @@ def self.any?(*promises)
388385
aggregate(:any?, *promises)
389386
end
390387

388+
def set(value)
389+
raise PromiseExecutionError.new('supported only on root promises') unless root?
390+
super
391+
end
392+
393+
def fail(reason = StandardError.new)
394+
raise PromiseExecutionError.new('supported only on root promises') unless root?
395+
super
396+
end
397+
398+
protected :complete
399+
391400
protected
392401

393402
# Aggregate a collection of zero or more promises under a composite promise,
@@ -444,17 +453,21 @@ def notify_child(child)
444453
if_state(:rejected) { child.on_reject(@reason) }
445454
end
446455

456+
# @!visibility private
457+
def complete(success, value, reason)
458+
children_to_notify = mutex.synchronize do
459+
set_state!(success, value, reason)
460+
@children.dup
461+
end
462+
463+
children_to_notify.each { |child| notify_child(child) }
464+
end
465+
447466
# @!visibility private
448467
def realize(task)
449468
@executor.post do
450469
success, value, reason = SafeTaskExecutor.new(task).execute(*@args)
451-
452-
children_to_notify = mutex.synchronize do
453-
set_state!(success, value, reason)
454-
@children.dup
455-
end
456-
457-
children_to_notify.each { |child| notify_child(child) }
470+
complete(success, value, reason)
458471
end
459472
end
460473

spec/concurrent/promise_spec.rb

Lines changed: 56 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -374,41 +374,41 @@ def get_ivar_from_args(opts)
374374

375375
composite = Promise.all?(promise1, promise2, promise3).
376376
then { counter.up; latch.count_down }.
377-
rescue { counter.down; latch.count_down }.
378-
execute
377+
rescue { counter.down; latch.count_down }.
378+
execute
379379

380380
latch.wait(1)
381381

382382
expect(counter.value).to eq 1
383-
end
383+
end
384384

385385
it 'executes the #then condition when no promises are given' do
386386
counter = Concurrent::AtomicFixnum.new(0)
387387
latch = Concurrent::CountDownLatch.new(1)
388388

389389
composite = Promise.all?.
390390
then { counter.up; latch.count_down }.
391-
rescue { counter.down; latch.count_down }.
392-
execute
391+
rescue { counter.down; latch.count_down }.
392+
execute
393393

394394
latch.wait(1)
395395

396396
expect(counter.value).to eq 1
397-
end
397+
end
398398

399399
it 'executes the #rescue handler if even one component fails' do
400400
counter = Concurrent::AtomicFixnum.new(0)
401401
latch = Concurrent::CountDownLatch.new(1)
402402

403403
composite = Promise.all?(promise1, promise2, rejected_subject, promise3).
404404
then { counter.up; latch.count_down }.
405-
rescue { counter.down; latch.count_down }.
406-
execute
405+
rescue { counter.down; latch.count_down }.
406+
execute
407407

408408
latch.wait(1)
409409

410410
expect(counter.value).to eq -1
411-
end
411+
end
412412
end
413413

414414
describe '.any?' do
@@ -429,46 +429,84 @@ def get_ivar_from_args(opts)
429429

430430
composite = Promise.any?(promise1, promise2, rejected_subject, promise3).
431431
then { counter.up; latch.count_down }.
432-
rescue { counter.down; latch.count_down }.
433-
execute
432+
rescue { counter.down; latch.count_down }.
433+
execute
434434

435435
latch.wait(1)
436436

437437
expect(counter.value).to eq 1
438-
end
438+
end
439439

440440
it 'executes the #then condition when no promises are given' do
441441
counter = Concurrent::AtomicFixnum.new(0)
442442
latch = Concurrent::CountDownLatch.new(1)
443443

444444
composite = Promise.any?.
445445
then { counter.up; latch.count_down }.
446-
rescue { counter.down; latch.count_down }.
447-
execute
446+
rescue { counter.down; latch.count_down }.
447+
execute
448448

449449
latch.wait(1)
450450

451451
expect(counter.value).to eq 1
452-
end
452+
end
453453

454454
it 'executes the #rescue handler if all componenst fail' do
455455
counter = Concurrent::AtomicFixnum.new(0)
456456
latch = Concurrent::CountDownLatch.new(1)
457457

458458
composite = Promise.any?(rejected_subject, rejected_subject, rejected_subject, rejected_subject).
459459
then { counter.up; latch.count_down }.
460-
rescue { counter.down; latch.count_down }.
461-
execute
460+
rescue { counter.down; latch.count_down }.
461+
execute
462462

463463
latch.wait(1)
464464

465465
expect(counter.value).to eq -1
466-
end
466+
end
467467
end
468468
end
469469

470470
context 'fulfillment' do
471471

472+
context '#set' do
473+
474+
it '#can only be called on the root promise' do
475+
root = Promise.new{ :foo }
476+
child = root.then{ :bar }
477+
478+
expect { child.set('foo') }.to raise_error PromiseExecutionError
479+
expect { root.set('foo') }.not_to raise_error
480+
end
481+
482+
it 'triggers children' do
483+
expected = nil
484+
root = Promise.new(executor: :immediate){ nil }
485+
root.then{ |result| expected = result }
486+
root.set(20)
487+
expect(expected).to eq 20
488+
end
489+
end
490+
491+
context '#fail' do
492+
493+
it 'can only be called on the root promise' do
494+
root = Promise.new{ :foo }
495+
child = root.then{ :bar }
496+
497+
expect { child.fail }.to raise_error PromiseExecutionError
498+
expect { root.fail }.not_to raise_error
499+
end
500+
501+
it 'rejects children' do
502+
expected = nil
503+
root = Promise.new(executor: :immediate)
504+
root.then(Proc.new{ |reason| expected = reason })
505+
root.fail(ArgumentError.new('simulated error'))
506+
expect(expected).to be_a ArgumentError
507+
end
508+
end
509+
472510
it 'passes the result of each block to all its children' do
473511
expected = nil
474512
Promise.new(executor: executor){ 20 }.then{ |result| expected = result }.execute

0 commit comments

Comments
 (0)