Skip to content

Commit cacb3d8

Browse files
committed
Allows to build promise trees with lazy evaluated branches
- Adds #chain_delay, #then_delay, #rescue_delay which are same as #chain, #then, #rescue but are not evaluated automatically but only when requested by #value. - Restructure class hierarchy. Only one Future with Multiple Promise implementations which are hidden to the user. Provides better encapsulation. - Delay is now implemented as a Promise descendant.
1 parent 16783f3 commit cacb3d8

File tree

1 file changed

+175
-83
lines changed

1 file changed

+175
-83
lines changed

lib/concurrent/next.rb

Lines changed: 175 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -56,12 +56,12 @@ def post(executor = :fast, &job)
5656

5757
# @return [Future]
5858
def future(executor = :fast, &block)
59-
Future.execute executor, &block
59+
Immediate.new(executor, &block).future
6060
end
6161

6262
# @return [Delay]
6363
def delay(executor = :fast, &block)
64-
Delay.new(executor, &block)
64+
Delay.new(nil, executor, &block).future
6565
end
6666

6767
alias_method :async, :future
@@ -144,21 +144,15 @@ module FutureHelpers
144144
# @return [Future]
145145
def join(*futures)
146146
countdown = Concurrent::AtomicFixnum.new futures.size
147-
promise = Promise.new.add_blocked_by(*futures) # TODO add injectable executor
147+
promise = ExternalPromise.new(futures)
148148
futures.each { |future| future.add_callback :join, countdown, promise, *futures }
149149
promise.future
150150
end
151-
152-
# @return [Future]
153-
def execute(executor = :fast, &block)
154-
promise = Promise.new(executor)
155-
Next.executor(executor).post { promise.evaluate_to &block }
156-
promise.future
157-
end
158151
end
159152

160153
class Future < SynchronizedObject
161154
extend FutureHelpers
155+
extend Shortcuts
162156

163157
singleton_class.send :alias_method, :dataflow, :join
164158

@@ -264,26 +258,47 @@ def exception(*args)
264258
reason.exception(*args)
265259
end
266260

267-
# TODO add #then_delay { ... } and such to be able to chain delayed evaluations
261+
# TODO needs better name
262+
def connect(executor = default_executor)
263+
ConnectedPromise.new(self, executor).future
264+
end
268265

269266
# @yield [success, value, reason] of the parent
270267
def chain(executor = default_executor, &callback)
271-
add_callback :chain_callback, executor, promise = Promise.new(default_executor).add_blocked_by(self), callback
268+
add_callback :chain_callback, executor, promise = ExternalPromise.new([self], default_executor), callback
272269
promise.future
273270
end
274271

275272
# @yield [value] executed only on parent success
276273
def then(executor = default_executor, &callback)
277-
add_callback :then_callback, executor, promise = Promise.new(default_executor).add_blocked_by(self), callback
274+
add_callback :then_callback, executor, promise = ExternalPromise.new([self], default_executor), callback
278275
promise.future
279276
end
280277

281278
# @yield [reason] executed only on parent failure
282279
def rescue(executor = default_executor, &callback)
283-
add_callback :rescue_callback, executor, promise = Promise.new(default_executor).add_blocked_by(self), callback
280+
add_callback :rescue_callback, executor, promise = ExternalPromise.new([self], default_executor), callback
284281
promise.future
285282
end
286283

284+
# lazy version of #chain
285+
def chain_delay(executor = default_executor, &callback)
286+
delay = Delay.new(self, executor) { callback_on_completion callback }
287+
delay.future
288+
end
289+
290+
# lazy version of #then
291+
def then_delay(executor = default_executor, &callback)
292+
delay = Delay.new(self, executor) { conditioned_callback callback }
293+
delay.future
294+
end
295+
296+
# lazy version of #rescue
297+
def rescue_delay(executor = default_executor, &callback)
298+
delay = Delay.new(self, executor) { callback_on_failure callback }
299+
delay.future
300+
end
301+
287302
# @yield [success, value, reason] executed async on `executor` when completed
288303
# @return self
289304
def on_completion(executor = default_executor, &callback)
@@ -399,27 +414,15 @@ def with_promise(promise, &block)
399414
end
400415

401416
def chain_callback(executor, promise, callback)
402-
with_async(executor) do
403-
with_promise(promise) do
404-
callback_on_completion callback
405-
end
406-
end
417+
with_async(executor) { with_promise(promise) { callback_on_completion callback } }
407418
end
408419

409420
def then_callback(executor, promise, callback)
410-
with_async(executor) do
411-
with_promise(promise) do
412-
success? ? callback.call(value) : raise(reason)
413-
end
414-
end
421+
with_async(executor) { with_promise(promise) { conditioned_callback callback } }
415422
end
416423

417424
def rescue_callback(executor, promise, callback)
418-
with_async(executor) do
419-
with_promise(promise) do
420-
callback_on_failure callback
421-
end
422-
end
425+
with_async(executor) { with_promise(promise) { callback_on_failure callback } }
423426
end
424427

425428
def with_async(executor)
@@ -450,20 +453,20 @@ def callback_on_failure(callback)
450453
callback.call reason if failed?
451454
end
452455

456+
def conditioned_callback(callback)
457+
self.success? ? callback.call(value) : raise(reason)
458+
end
459+
453460
def call_callback(method, *args)
454461
self.send method, *args
455462
end
456463
end
457464

458465
class Promise < SynchronizedObject
459466
# @api private
460-
def initialize(executor_or_future = :fast)
467+
def initialize(executor = :fast)
461468
super()
462-
future = if Future === executor_or_future
463-
executor_or_future
464-
else
465-
Future.new(self, executor_or_future)
466-
end
469+
future = Future.new(self, executor)
467470

468471
synchronize do
469472
@future = future
@@ -480,6 +483,48 @@ def blocked_by
480483
synchronize { @blocked_by }
481484
end
482485

486+
def state
487+
future.state
488+
end
489+
490+
def touch
491+
blocked_by.each(&:touch) if synchronize { @touched ? false : (@touched = true) }
492+
end
493+
494+
def to_s
495+
"<##{self.class}:0x#{'%x' % (object_id << 1)} #{state}>"
496+
end
497+
498+
def inspect
499+
"#{to_s[0..-2]} blocked_by:[#{synchronize { @blocked_by }.map(&:to_s).join(', ')}]>"
500+
end
501+
502+
private
503+
504+
def add_blocked_by(*futures)
505+
synchronize { @blocked_by += futures }
506+
self
507+
end
508+
509+
def complete(success, value, reason, raise = true)
510+
future.complete(success, value, reason, raise)
511+
synchronize { @blocked_by.clear }
512+
end
513+
514+
# @return [Future]
515+
def evaluate_to(&block) # TODO for parent
516+
complete true, block.call, nil
517+
rescue => error
518+
complete false, nil, error
519+
end
520+
end
521+
522+
class ExternalPromise < Promise
523+
def initialize(blocked_by_futures, executor = :fast)
524+
super executor
525+
add_blocked_by *blocked_by_futures
526+
end
527+
483528
# Set the `IVar` to a value and wake or notify all threads waiting on it.
484529
#
485530
# @param [Object] value the value to store in the `IVar`
@@ -506,76 +551,69 @@ def try_fail(reason = StandardError.new)
506551
!!complete(false, nil, reason, false)
507552
end
508553

509-
def complete(success, value, reason, raise = true)
510-
future.complete(success, value, reason, raise)
511-
synchronize { @blocked_by.clear }
512-
end
513-
514-
def state
515-
future.state
516-
end
517-
518-
# @return [Future]
519-
def evaluate_to(&block)
520-
success block.call
521-
rescue => error
522-
fail error
523-
end
554+
public :evaluate_to
524555

525556
# @return [Future]
526557
def evaluate_to!(&block)
527558
evaluate_to(&block).no_error!
528559
end
560+
end
561+
562+
class ConnectedPromise < Promise
563+
def initialize(future, executor = :fast)
564+
super(executor)
565+
connect_to future
566+
end
567+
568+
# @api private
569+
public :complete
570+
571+
private
529572

530573
# @return [Future]
531574
def connect_to(future)
532575
add_blocked_by future
533576
future.add_callback :set_promise_on_completion, self
534577
self.future
535578
end
579+
end
536580

537-
def touch
538-
blocked_by.each(&:touch) if synchronize { @touched ? false : (@touched = true) }
539-
end
540-
541-
def to_s
542-
"<##{self.class}:0x#{'%x' % (object_id << 1)} #{state}>"
543-
end
544-
545-
def inspect
546-
"#{to_s[0..-2]} blocked_by:[#{synchronize { @blocked_by }.map(&:to_s).join(', ')}]>"
547-
end
548-
549-
# @api private
550-
def add_blocked_by(*futures)
551-
synchronize { @blocked_by += futures }
552-
self
581+
class Immediate < Promise
582+
def initialize(executor = :fast, &task)
583+
super(executor)
584+
Next.executor(executor).post { evaluate_to &task }
553585
end
554586
end
555587

556-
class Delay < Future
557-
558-
def initialize(default_executor = :fast, &block)
559-
super(Promise.new(self), default_executor)
560-
raise ArgumentError.new('no block given') unless block_given?
588+
class Delay < Promise
589+
def initialize(blocked_by_future, executor = :fast, &task)
590+
super(executor)
561591
synchronize do
592+
@task = task
562593
@computing = false
563-
@task = block
564594
end
595+
add_blocked_by blocked_by_future if blocked_by_future
565596
end
566597

567-
def wait(timeout = nil)
568-
touch
569-
super timeout
598+
def touch
599+
if blocked_by.all?(&:completed?)
600+
execute_once
601+
else
602+
blocked_by.each { |f| f.on_success! { self.touch } unless synchronize { @touched } }
603+
super
604+
end
570605
end
571606

572-
# starts executing the value without blocking
573-
def touch
607+
private
608+
609+
def execute_once
574610
execute, task = synchronize do
575611
[(@computing = true unless @computing), @task]
576612
end
577613

578-
Next.executor(default_executor).post { promise.evaluate_to &task } if execute
614+
if execute
615+
Next.executor(future.default_executor).post { evaluate_to &task }
616+
end
579617
self
580618
end
581619
end
@@ -610,7 +648,7 @@ def touch
610648
future2 = future1.then { |v| v + 1 } # will fail with 'boo' error, executed on default FAST_EXECUTOR
611649
future3 = future1.rescue { |err| err.message } # executed on default FAST_EXECUTOR
612650
future4 = future0.chain { |success, value, reason| success } # executed on default FAST_EXECUTOR
613-
future5 = Promise.new(:io).connect_to(future3)
651+
future5 = future3.connect(:io) # connects new future with different executor, the new future is completed when future3 is
614652
future6 = future5.then(&:capitalize) # executes on IO_EXECUTOR because default was set to :io on future5
615653
future7 = Future.join(future0, future3)
616654

@@ -642,8 +680,9 @@ def touch
642680
puts '-- promise like tree'
643681

644682
# if head of the tree is not constructed with #future but with #delay it does not start execute,
645-
# it's triggered later by calling wait or value on any of the depedent futures or the delay itself
646-
tree = (head = delay { 1 }).then { |v| v.succ }.then(&:succ).then(&:succ)
683+
# it's triggered later by calling wait or value on any of the dependent futures or the delay itself
684+
three = (head = delay { 1 }).then { |v| v.succ }.then(&:succ)
685+
four = three.then_delay(&:succ)
647686

648687
# meaningful to_s and inspect defined for Future and Promise
649688
puts head
@@ -652,12 +691,65 @@ def touch
652691
# <#Concurrent::Next::Delay:7f89b4bccc68 pending [<#Concurrent::Next::Promise:7f89b4bccb00 pending>]]>
653692
p head.callbacks
654693
# [[:then_callback, :fast, <#Concurrent::Next::Promise:0x7fa54b31d218 pending [<#Concurrent::Next::Delay:0x7fa54b31d380 pending>]>, #<Proc:0x007fa54b31d290>]]
655-
p tree.value
694+
695+
# evaluates only up to three, four is left unevaluated
696+
p three.value # 3
697+
p four, four.promise
698+
# until value is called on four
699+
p four.value # 4
700+
701+
# futures hidden behind two delays trigger evaluation of both
702+
double_delay = delay { 1 }.then_delay(&:succ)
703+
p double_delay.value # 2
704+
705+
puts '-- graph'
706+
707+
head = future { 1 }
708+
branch1 = head.then(&:succ).then(&:succ)
709+
branch2 = head.then(&:succ).then_delay(&:succ)
710+
result = Future.join(branch1, branch2).then { |b1, b2| b1 + b2 }
711+
712+
sleep 0.1
713+
p branch1.completed?, branch2.completed? # true, false
714+
# force evaluation of whole graph
715+
p result.value # 6
656716

657717
puts '-- bench'
658718
require 'benchmark'
659719

660-
Benchmark.bmbm(20) do |b|
720+
module Benchmark
721+
def self.bmbmbm(rehearsals, width)
722+
job = Job.new(width)
723+
yield(job)
724+
width = job.width + 1
725+
sync = STDOUT.sync
726+
STDOUT.sync = true
727+
728+
# rehearsal
729+
rehearsals.times do
730+
puts 'Rehearsal '.ljust(width+CAPTION.length, '-')
731+
ets = job.list.inject(Tms.new) { |sum, (label, item)|
732+
print label.ljust(width)
733+
res = Benchmark.measure(&item)
734+
print res.format
735+
sum + res
736+
}.format("total: %tsec")
737+
print " #{ets}\n\n".rjust(width+CAPTION.length+2, '-')
738+
end
739+
740+
# take
741+
print ' '*width + CAPTION
742+
job.list.map { |label, item|
743+
GC.start
744+
print label.ljust(width)
745+
Benchmark.measure(label, &item).tap { |res| print res }
746+
}
747+
ensure
748+
STDOUT.sync = sync unless sync.nil?
749+
end
750+
end
751+
752+
Benchmark.bmbmbm(20, 20) do |b|
661753

662754
parents = [RubySynchronizedObject, (JavaSynchronizedObject if defined? JavaSynchronizedObject)].compact
663755
classes = parents.map do |parent|

0 commit comments

Comments
 (0)