@@ -160,6 +160,7 @@ def schedule(intended_time, executor = :fast, &task)
160
160
# does not block a thread
161
161
# @return [Future]
162
162
def join ( *futures )
163
+ # TODO consider renaming to zip as in scala
163
164
countdown = Concurrent ::AtomicFixnum . new futures . size
164
165
promise = OuterPromise . new ( futures )
165
166
futures . each { |future | future . add_callback :join_callback , countdown , promise , *futures }
@@ -300,13 +301,13 @@ def chain_delay(executor = default_executor, &callback)
300
301
301
302
# lazy version of #then
302
303
def then_delay ( executor = default_executor , &callback )
303
- delay = Delay . new ( self , executor ) { conditioned_callback callback }
304
+ delay = Delay . new ( self , executor ) { conditioned_success_callback callback }
304
305
delay . future
305
306
end
306
307
307
308
# lazy version of #rescue
308
309
def rescue_delay ( executor = default_executor , &callback )
309
- delay = Delay . new ( self , executor ) { callback_on_failure callback }
310
+ delay = Delay . new ( self , executor ) { conditioned_failure_callback callback }
310
311
delay . future
311
312
end
312
313
@@ -436,11 +437,11 @@ def chain_callback(executor, promise, callback)
436
437
end
437
438
438
439
def then_callback ( executor , promise , callback )
439
- with_async ( executor ) { promise . evaluate_to { conditioned_callback callback } }
440
+ with_async ( executor ) { promise . evaluate_to { conditioned_success_callback callback } }
440
441
end
441
442
442
443
def rescue_callback ( executor , promise , callback )
443
- with_async ( executor ) { promise . evaluate_to { callback_on_failure callback } }
444
+ with_async ( executor ) { promise . evaluate_to { conditioned_failure_callback callback } }
444
445
end
445
446
446
447
def with_async ( executor )
@@ -471,10 +472,14 @@ def callback_on_failure(callback)
471
472
callback . call reason if failed?
472
473
end
473
474
474
- def conditioned_callback ( callback )
475
+ def conditioned_success_callback ( callback )
475
476
self . success? ? callback . call ( value ) : raise ( reason )
476
477
end
477
478
479
+ def conditioned_failure_callback ( callback )
480
+ self . failed? ? callback . call ( reason ) : value
481
+ end
482
+
478
483
def call_callback ( method , *args )
479
484
self . send method , *args
480
485
end
@@ -615,6 +620,7 @@ def initialize(executor = :fast, &task)
615
620
end
616
621
end
617
622
623
+ # will be evaluated to task in intended_time
618
624
class Scheduled < Promise
619
625
def initialize ( intended_time , executor = :fast , &task )
620
626
super ( executor )
@@ -702,11 +708,12 @@ def execute_once
702
708
future5 = future3 . with_default_executor ( :io ) # connects new future with different executor, the new future is completed when future3 is
703
709
future6 = future5 . then ( &:capitalize ) # executes on IO_EXECUTOR because default was set to :io on future5
704
710
future7 = ConcurrentNext . join ( future0 , future3 )
711
+ future8 = future0 . rescue { raise 'never happens' } # future0 succeeds so future8'll have same value as future 0
705
712
706
713
p future3 , future5
707
714
p future3 . callbacks , future5 . callbacks
708
715
709
- futures = [ future0 , future1 , future2 , future3 , future4 , future5 , future6 , future7 ]
716
+ futures = [ future0 , future1 , future2 , future3 , future4 , future5 , future6 , future7 , future8 ]
710
717
futures . each &:wait
711
718
712
719
@@ -721,6 +728,7 @@ def execute_once
721
728
# 5 true boo io
722
729
# 6 true Boo io
723
730
# 7 true [3, "boo"] fast
731
+ # 8 true 3 fast
724
732
725
733
puts '-- delay'
726
734
@@ -782,7 +790,7 @@ def execute_once
782
790
puts '-- scheduled'
783
791
784
792
start = Time . now . to_f
785
- ConcurrentNext . schedule ( 0.1 ) { 1 + 1 } . then { |v | p v , Time . now . to_f - start }
793
+ ConcurrentNext . schedule ( 0.1 ) { 1 + 1 } . then { |v | p v , Time . now . to_f - start }
786
794
sleep 0.2
787
795
788
796
puts '-- using shortcuts'
0 commit comments