From 003b8ad365a69a39410c0c404526464742872a8b Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Thu, 23 Oct 2014 16:40:43 +0200 Subject: [PATCH 01/12] First draft IVar-Future-Promise-Probe unification --- lib/concurrent/next.rb | 468 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 468 insertions(+) create mode 100644 lib/concurrent/next.rb diff --git a/lib/concurrent/next.rb b/lib/concurrent/next.rb new file mode 100644 index 000000000..a2391fb21 --- /dev/null +++ b/lib/concurrent/next.rb @@ -0,0 +1,468 @@ +require 'concurrent' + +module Concurrent + + # TODO Dereferencable + # TODO document new global pool setting: no overflow, user has to buffer when there is too many tasks + module Next + + + # executors do not allocate the threads immediately so they can be constants + # all thread pools are configured to never reject the job + # TODO optional auto termination + module Executors + + IMMEDIATE_EXECUTOR = ImmediateExecutor.new + + # Only non-blocking and short tasks can go into this pool, otherwise it can starve or deadlock + FAST_EXECUTOR = Concurrent::FixedThreadPool.new( + [2, Concurrent.processor_count].max, + idletime: 60, # 1 minute same as Java pool default + max_queue: 0 # unlimited + ) + + # IO and blocking jobs should be executed on this pool + IO_EXECUTOR = Concurrent::ThreadPoolExecutor.new( + min_threads: [2, Concurrent.processor_count].max, + max_threads: Concurrent.processor_count * 100, + idletime: 60, # 1 minute same as Java pool default + max_queue: 0 # unlimited + ) + + def executor(which) + case which + when :immediate, :immediately + IMMEDIATE_EXECUTOR + when :fast + FAST_EXECUTOR + when :io + IO_EXECUTOR + when Executor + which + else + raise TypeError + end + end + end + + extend Executors + + module Shortcuts + + # @return true + def post(executor = :fast, &job) + Next.executor(executor).post &job + end + + # @return [Future] + def future(executor = :fast, &block) + Future.execute executor, &block + end + + alias_method :async, :future + end + + extend Shortcuts + + # TODO benchmark java implementation, is it faster as expected? + class SynchronizedObject + + engine = defined?(RUBY_ENGINE) && RUBY_ENGINE + + case engine + when 'jruby' + require 'jruby' + + def initialize + end + + def synchronize + JRuby.reference0(self).synchronized { yield } + end + + def wait(timeout) + JRuby.reference0(self).wait(timeout ? timeout * 1000 : nil) + end + + def notify_all + JRuby.reference0(self).notifyAll + end + + when 'rbx' + + raise NotImplementedError # TODO + + # def synchronize + # Rubinius.lock(self) + # yield + # ensure + # Rubinius.unlock(self) + # end + + else + + def initialize + @mutex = Mutex.new + @condition = Concurrent::Condition.new + end + + def synchronize + if @mutex.owned? + yield + else + @mutex.synchronize { yield } + end + end + + def wait(timeout) + @condition.wait @mutex, timeout + end + + def notify + @condition.signal + end + + def notify_all + @condition.broadcast + end + + end + end + + module FutureHelpers + # fails on first error + # does not block a thread + # @return [Future] + def join(*futures) + countdown = Concurrent::AtomicFixnum.new futures.size + promise = Promise.new # TODO add injectable executor + futures.each do |future| + future.on_completion! do |success, _, reason| + if success + promise.success futures.map(&:value) if countdown.decrement.zero? + else + promise.try_fail reason + end + end + end + promise.future + end + + # @return [Future] + def execute(executor = :fast, &block) + promise = Promise.new(executor) + Next.executor(executor).post { promise.evaluate_to &block } + promise.future + end + end + + class Future < SynchronizedObject + extend FutureHelpers + + singleton_class.send :alias_method, :dataflow, :join + + def initialize(default_executor = :fast) + super() + synchronize do + @value = nil + @reason = nil + @state = :pending + @callbacks = [] + @default_executor = default_executor + end + end + + # Has the obligation been success? + # @return [Boolean] + def success? + state == :success + end + + # Has the obligation been failed? + # @return [Boolean] + def failed? + state == :failed + end + + # Is obligation completion still pending? + # @return [Boolean] + def pending? + state == :pending + end + + alias_method :incomplete?, :pending? + + def completed? + [:success, :failed].include? state + end + + # @return [Object] see Dereferenceable#deref + def value(timeout = nil) + wait timeout + synchronize { @value } + end + + # wait until Obligation is #complete? + # @param [Numeric] timeout the maximum time in second to wait. + # @return [Obligation] self + def wait(timeout = nil) + synchronize do + # TODO interruptions ? + super timeout if incomplete? + self + end + end + + # wait until Obligation is #complete? + # @param [Numeric] timeout the maximum time in second to wait. + # @return [Obligation] self + # @raise [Exception] when #failed? it raises #reason + def no_error!(timeout = nil) + wait(timeout).tap { raise self if failed? } + end + + # @raise [Exception] when #failed? it raises #reason + # @return [Object] see Dereferenceable#deref + def value!(timeout = nil) + val = value(timeout) + if failed? + raise self + else + val + end + end + + def state + synchronize { @state } + end + + def reason + synchronize { @reason } + end + + def default_executor + synchronize { @default_executor } + end + + # @example allows Obligation to be risen + # failed_ivar = Ivar.new.fail + # raise failed_ivar + def exception(*args) + raise 'obligation is not failed' unless failed? + reason.exception(*args) + end + + # @yield [success, value, reason] of the parent + def chain(executor = default_executor, &callback) + on_completion executor, &with_promise(promise = Promise.new(default_executor), &callback) + promise.future + end + + # @yield [value] executed only on parent success + def then(executor = default_executor, &callback) + on_completion executor, + &with_promise(promise = Promise.new(default_executor), + &-> success, value, reason { success ? callback.call(value) : raise(reason) }) + promise.future + end + + # @yield [reason] executed only on parent failure + def rescue(executor = default_executor, &callback) + on_completion executor, + &with_promise(promise = Promise.new(default_executor), + &-> success, value, reason { callback.call(reason) unless success }) + promise.future + end + + # @yield [success, value, reason] executed async on `executor` when completed + # @return self + def on_completion(executor = default_executor, &callback) + on_completion! &with_async(executor, &callback) + end + + # @yield [value] executed async on `executor` when success + # @return self + def on_success(executor = default_executor, &callback) + on_success! &with_async(executor, &callback) + end + + # @yield [reason] executed async on `executor` when failed? + # @return self + def on_failure(executor = default_executor, &callback) + on_failure! &with_async(executor, &callback) + end + + # @yield [success, value, reason] executed sync when completed + # @return self + def on_completion!(&callback) + add_callback { callback.call success?, value, reason } + end + + # @yield [value] executed sync when success + # @return self + def on_success!(&callback) + add_callback { callback.call value if success? } + end + + # @yield [reason] executed sync when failed? + # @return self + def on_failure!(&callback) + add_callback { callback.call reason if failed? } + end + + # @api private + def complete(success, value, reason, raise = true) # :nodoc: + callbacks = synchronize do + if completed? + if raise + raise MultipleAssignmentError.new('multiple assignment') + else + return nil + end + end + if success + @value = value + @state = :success + else + @reason = reason + @state = :failed + end + notify_all + @callbacks + end + + callbacks.each &:call + callbacks.clear + + self + end + + private + + def with_async(executor, &block) + -> *args { Next.executor(executor).post { block.call *args } } + end + + def with_promise(promise, &block) + -> *args { promise.evaluate_to { block.call *args } } + end + + def add_callback(&callback) + synchronize do + if completed? + callback.call + else + @callbacks << callback + end + end + self + end + + end + + class Promise < SynchronizedObject + def initialize(executor = :fast) + super() + synchronize { @future = Future.new(executor) } + end + + def future + synchronize { @future } + end + + # Set the `IVar` to a value and wake or notify all threads waiting on it. + # + # @param [Object] value the value to store in the `IVar` + # @raise [Concurrent::MultipleAssignmentError] if the `IVar` has already been set or otherwise completed + # @return [Future] + def success(value) + future.complete(true, value, nil) + end + + def try_success(value) + future.complete(true, value, nil, false) + end + + # Set the `IVar` to failed due to some error and wake or notify all threads waiting on it. + # + # @param [Object] reason for the failure + # @raise [Concurrent::MultipleAssignmentError] if the `IVar` has already been set or otherwise completed + # @return [Future] + def fail(reason = StandardError.new) + future.complete(false, nil, reason) + end + + def try_fail(reason = StandardError.new) + !!future.complete(false, nil, reason, false) + end + + # @return [Future] + def evaluate_to(&block) + success block.call + rescue => error + fail error + end + + # @return [Future] + def evaluate_to!(&block) + evaluate_to(&block).no_error! + end + + # @return [Future] + def connect_to(future) + future.on_completion! { |success, value, reason| self.future.complete success, value, reason } + self.future + end + + end + + end +end + +include Concurrent::Next +include Concurrent::Next::Shortcuts + +puts '-- asynchronous task without Future' +q = Queue.new +post { q << 'a' } +p q.pop + +puts '-- asynchronous task with Future' +p future = future { 1 + 1 } +p future.value + +puts '-- sync and async callbacks on futures' +future = future { 'value' } # executed on FAST_EXECUTOR pool by default +future.on_completion(:io) { p 'async' } # async callback overridden to execute on IO_EXECUTOR pool +future.on_completion! { p 'sync' } # sync callback executed right after completion in the same thread-pool +p future.value +# it should usually print "sync"\n"async"\n"value" + +sleep 0.1 + +puts '-- future chaining' +future0 = future { 1 }.then { |v| v + 2 } # both executed on default FAST_EXECUTOR +future1 = future0.then(:io) { raise 'boo' } # executed on IO_EXECUTOR +future2 = future1.then { |v| v + 1 } # will fail with 'boo' error, executed on default FAST_EXECUTOR +future3 = future1.rescue { |err| err.message } # executed on default FAST_EXECUTOR +future4 = future0.chain { |success, value, reason| success } # executed on default FAST_EXECUTOR +future5 = Promise.new(:io).connect_to(future3) +future6 = future5.then(&:capitalize) # executes on IO_EXECUTOR because default was set to :io on future5 +future7 = Future.join(future0, future3) + +futures = [future0, future1, future2, future3, future4, future5, future6, future7].each &:wait + + +puts 'index success value reason pool' +futures.each_with_index { |f, i| puts '%5i %7s %10s %6s %4s' % [i, f.success?, f.value, f.reason, f.default_executor] } +# index success value reason pool +# 0 true 3 fast +# 1 false boo fast +# 2 false boo fast +# 3 true boo fast +# 4 true true fast +# 5 true boo io +# 6 true Boo io +# 7 true [3, "boo"] fast + + From e2a6ab69a93bdd48f57660c8afad3fdefb35c75c Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Mon, 27 Oct 2014 22:27:42 +0100 Subject: [PATCH 02/12] Adding Delay and benchmark of the synchronization --- lib/concurrent/next.rb | 200 ++++++++++++++++++++++++++++++++--------- 1 file changed, 158 insertions(+), 42 deletions(-) diff --git a/lib/concurrent/next.rb b/lib/concurrent/next.rb index a2391fb21..8ec594950 100644 --- a/lib/concurrent/next.rb +++ b/lib/concurrent/next.rb @@ -59,20 +59,21 @@ def future(executor = :fast, &block) Future.execute executor, &block end + # @return [Delay] + def delay(executor = :fast, &block) + Delay.new(executor, &block) + end + alias_method :async, :future end extend Shortcuts - # TODO benchmark java implementation, is it faster as expected? - class SynchronizedObject - - engine = defined?(RUBY_ENGINE) && RUBY_ENGINE - - case engine - when 'jruby' - require 'jruby' + begin + require 'jruby' + # roughly more than 2x faster + class JavaSynchronizedObject def initialize end @@ -81,51 +82,59 @@ def synchronize end def wait(timeout) - JRuby.reference0(self).wait(timeout ? timeout * 1000 : nil) + if timeout + JRuby.reference0(self).wait(timeout * 1000) + else + JRuby.reference0(self).wait + end end def notify_all JRuby.reference0(self).notifyAll end + end + rescue LoadError + # ignore + end - when 'rbx' - - raise NotImplementedError # TODO + class RubySynchronizedObject + def initialize + @mutex = Mutex.new + @condition = Concurrent::Condition.new + end - # def synchronize - # Rubinius.lock(self) + def synchronize + # if @mutex.owned? # yield - # ensure - # Rubinius.unlock(self) + # else + @mutex.synchronize { yield } + rescue ThreadError + yield # end + end - else - - def initialize - @mutex = Mutex.new - @condition = Concurrent::Condition.new - end - - def synchronize - if @mutex.owned? - yield - else - @mutex.synchronize { yield } - end - end - - def wait(timeout) - @condition.wait @mutex, timeout - end + def wait(timeout) + @condition.wait @mutex, timeout + end - def notify - @condition.signal - end + def notify + @condition.signal + end - def notify_all - @condition.broadcast - end + def notify_all + @condition.broadcast + end + end + engine = defined?(RUBY_ENGINE) && RUBY_ENGINE + case engine + when 'jruby' + class SynchronizedObject < JavaSynchronizedObject + end + when 'rbx' + raise NotImplementedError # TODO + else + class SynchronizedObject < RubySynchronizedObject end end @@ -161,6 +170,7 @@ class Future < SynchronizedObject singleton_class.send :alias_method, :dataflow, :join + # @api private def initialize(default_executor = :fast) super() synchronize do @@ -361,9 +371,15 @@ def add_callback(&callback) end class Promise < SynchronizedObject - def initialize(executor = :fast) + # @api private + def initialize(executor_or_future = :fast) super() - synchronize { @future = Future.new(executor) } + future = if Future === executor_or_future + executor_or_future + else + Future.new(executor_or_future) + end + synchronize { @future = future } end def future @@ -416,6 +432,33 @@ def connect_to(future) end + class Delay < Future + + def initialize(default_executor = :fast, &block) + super(default_executor) + raise ArgumentError.new('no block given') unless block_given? + synchronize do + @computing = false + @task = block + end + end + + def wait(timeout = nil) + execute_task_once + super timeout + end + + private + + def execute_task_once + execute, task = synchronize do + [(@computing = true unless @computing), @task] + end + + Next.executor(default_executor).post { Promise.new(self).evaluate_to &task } if execute + end + end + end end @@ -465,4 +508,77 @@ def connect_to(future) # 6 true Boo io # 7 true [3, "boo"] fast +puts '-- delay' + +# evaluated on #wait, #value +delay = delay { 1 + 1 } +p delay.completed?, delay.value + +puts '-- promise like tree' + +# if head of the tree is not constructed with #future but with #delay it does not start execute, +# it's triggered later by `head.wait` +head = delay { 1 } +tree = head.then(&:succ).then(&:succ).then(&:succ) +thread = Thread.new { p tree.value } # prints 4 +head.wait +thread.join + +puts '-- bench' +require 'benchmark' + +Benchmark.bmbm(20) do |b| + + parents = [RubySynchronizedObject, (JavaSynchronizedObject if defined? JavaSynchronizedObject)].compact + classes = parents.map do |parent| + klass = Class.new(parent) do + def initialize + super + synchronize do + @q = [] + end + end + + def add(v) + synchronize do + @q << v + if @q.size > 100 + @q.clear + end + end + end + end + [parent, klass] + end + + classes.each do |parent, klass| + b.report(parent) do + s = klass.new + 2.times.map do + Thread.new do + 5_000_000.times { s.add :a } + end + end.each &:join + end + + end + +end +# MRI +# Rehearsal ---------------------------------------------------------------------------- +# Concurrent::Next::RubySynchronizedObject 8.010000 6.290000 14.300000 ( 12.197402) +# ------------------------------------------------------------------ total: 14.300000sec +# +# user system total real +# Concurrent::Next::RubySynchronizedObject 8.950000 9.320000 18.270000 ( 15.053220) +# +# JRuby +# Rehearsal ---------------------------------------------------------------------------- +# Concurrent::Next::RubySynchronizedObject 10.500000 6.440000 16.940000 ( 10.640000) +# Concurrent::Next::JavaSynchronizedObject 8.410000 0.050000 8.460000 ( 4.132000) +# ------------------------------------------------------------------ total: 25.400000sec +# +# user system total real +# Concurrent::Next::RubySynchronizedObject 9.090000 6.640000 15.730000 ( 10.690000) +# Concurrent::Next::JavaSynchronizedObject 8.200000 0.030000 8.230000 ( 4.141000) From 6511328f893009ec1bffddf9ff6dbadbb078ce8b Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Tue, 4 Nov 2014 10:48:35 +0100 Subject: [PATCH 03/12] Various improvements - untangle callback into methods for better readability - promise tree starting with delay can be now also triggered by calling wait or value on any of the children - better inspectability - track what blocks what - better to_s and inspect support --- lib/concurrent/next.rb | 246 +++++++++++++++++++++++++++++++---------- 1 file changed, 189 insertions(+), 57 deletions(-) diff --git a/lib/concurrent/next.rb b/lib/concurrent/next.rb index 8ec594950..df59613ef 100644 --- a/lib/concurrent/next.rb +++ b/lib/concurrent/next.rb @@ -49,9 +49,9 @@ def executor(which) module Shortcuts - # @return true def post(executor = :fast, &job) Next.executor(executor).post &job + self end # @return [Future] @@ -144,16 +144,8 @@ module FutureHelpers # @return [Future] def join(*futures) countdown = Concurrent::AtomicFixnum.new futures.size - promise = Promise.new # TODO add injectable executor - futures.each do |future| - future.on_completion! do |success, _, reason| - if success - promise.success futures.map(&:value) if countdown.decrement.zero? - else - promise.try_fail reason - end - end - end + promise = Promise.new.add_blocked_by(*futures) # TODO add injectable executor + futures.each { |future| future.add_callback :join, countdown, promise, *futures } promise.future end @@ -171,9 +163,10 @@ class Future < SynchronizedObject singleton_class.send :alias_method, :dataflow, :join # @api private - def initialize(default_executor = :fast) + def initialize(promise, default_executor = :fast) super() synchronize do + @promise = promise @value = nil @reason = nil @state = :pending @@ -206,6 +199,10 @@ def completed? [:success, :failed].include? state end + def promise + synchronize { @promise } + end + # @return [Object] see Dereferenceable#deref def value(timeout = nil) wait timeout @@ -217,12 +214,17 @@ def value(timeout = nil) # @return [Obligation] self def wait(timeout = nil) synchronize do + touch # TODO interruptions ? super timeout if incomplete? self end end + def touch + promise.touch + end + # wait until Obligation is #complete? # @param [Numeric] timeout the maximum time in second to wait. # @return [Obligation] self @@ -262,62 +264,75 @@ def exception(*args) reason.exception(*args) end + # TODO add #then_delay { ... } and such to be able to chain delayed evaluations + # @yield [success, value, reason] of the parent def chain(executor = default_executor, &callback) - on_completion executor, &with_promise(promise = Promise.new(default_executor), &callback) + add_callback :chain_callback, executor, promise = Promise.new(default_executor).add_blocked_by(self), callback promise.future end # @yield [value] executed only on parent success def then(executor = default_executor, &callback) - on_completion executor, - &with_promise(promise = Promise.new(default_executor), - &-> success, value, reason { success ? callback.call(value) : raise(reason) }) + add_callback :then_callback, executor, promise = Promise.new(default_executor).add_blocked_by(self), callback promise.future end # @yield [reason] executed only on parent failure def rescue(executor = default_executor, &callback) - on_completion executor, - &with_promise(promise = Promise.new(default_executor), - &-> success, value, reason { callback.call(reason) unless success }) + add_callback :rescue_callback, executor, promise = Promise.new(default_executor).add_blocked_by(self), callback promise.future end # @yield [success, value, reason] executed async on `executor` when completed # @return self def on_completion(executor = default_executor, &callback) - on_completion! &with_async(executor, &callback) + add_callback :async_callback_on_completion, executor, callback end # @yield [value] executed async on `executor` when success # @return self def on_success(executor = default_executor, &callback) - on_success! &with_async(executor, &callback) + add_callback :async_callback_on_success, executor, callback end # @yield [reason] executed async on `executor` when failed? # @return self def on_failure(executor = default_executor, &callback) - on_failure! &with_async(executor, &callback) + add_callback :async_callback_on_failure, executor, callback end # @yield [success, value, reason] executed sync when completed # @return self def on_completion!(&callback) - add_callback { callback.call success?, value, reason } + add_callback :callback_on_completion, callback end # @yield [value] executed sync when success # @return self def on_success!(&callback) - add_callback { callback.call value if success? } + add_callback :callback_on_success, callback end # @yield [reason] executed sync when failed? # @return self def on_failure!(&callback) - add_callback { callback.call reason if failed? } + add_callback :callback_on_failure, callback + end + + # @return [Array] + def blocks + synchronize { @callbacks }.each_with_object([]) do |callback, promises| + promises.push *callback.select { |v| v.is_a? Promise } + end + end + + def to_s + "<##{self.class}:0x#{'%x' % (object_id << 1)} #{state}>" + end + + def inspect + "#{to_s[0..-2]} blocks:[#{blocks.map(&:to_s).join(', ')}]>" end # @api private @@ -341,33 +356,103 @@ def complete(success, value, reason, raise = true) # :nodoc: @callbacks end - callbacks.each &:call + callbacks.each { |method, *args| call_callback method, *args } callbacks.clear self end + # @api private + # just for inspection + def callbacks + synchronize { @callbacks }.clone.freeze + end + + # @api private + def add_callback(method, *args) + synchronize do + if completed? + call_callback method, *args + else + @callbacks << [method, *args] + end + end + self + end + private - def with_async(executor, &block) - -> *args { Next.executor(executor).post { block.call *args } } + def set_promise_on_completion(promise) + promise.complete success?, value, reason + end + + def join(countdown, promise, *futures) + if success? + promise.success futures.map(&:value) if countdown.decrement.zero? + else + promise.try_fail reason + end end def with_promise(promise, &block) - -> *args { promise.evaluate_to { block.call *args } } + promise.evaluate_to &block end - def add_callback(&callback) - synchronize do - if completed? - callback.call - else - @callbacks << callback + def chain_callback(executor, promise, callback) + with_async(executor) do + with_promise(promise) do + callback_on_completion callback end end - self end + def then_callback(executor, promise, callback) + with_async(executor) do + with_promise(promise) do + success? ? callback.call(value) : raise(reason) + end + end + end + + def rescue_callback(executor, promise, callback) + with_async(executor) do + with_promise(promise) do + callback_on_failure callback + end + end + end + + def with_async(executor) + Next.executor(executor).post { yield } + end + + def async_callback_on_completion(executor, callback) + with_async(executor) { callback_on_completion callback } + end + + def async_callback_on_success(executor, callback) + with_async(executor) { callback_on_success callback } + end + + def async_callback_on_failure(executor, callback) + with_async(executor) { callback_on_failure callback } + end + + def callback_on_completion(callback) + callback.call success?, value, reason + end + + def callback_on_success(callback) + callback.call value if success? + end + + def callback_on_failure(callback) + callback.call reason if failed? + end + + def call_callback(method, *args) + self.send method, *args + end end class Promise < SynchronizedObject @@ -377,26 +462,35 @@ def initialize(executor_or_future = :fast) future = if Future === executor_or_future executor_or_future else - Future.new(executor_or_future) + Future.new(self, executor_or_future) end - synchronize { @future = future } + + synchronize do + @future = future + @blocked_by = [] + @touched = false + end end def future synchronize { @future } end + def blocked_by + synchronize { @blocked_by } + end + # Set the `IVar` to a value and wake or notify all threads waiting on it. # # @param [Object] value the value to store in the `IVar` # @raise [Concurrent::MultipleAssignmentError] if the `IVar` has already been set or otherwise completed # @return [Future] def success(value) - future.complete(true, value, nil) + complete(true, value, nil) end def try_success(value) - future.complete(true, value, nil, false) + complete(true, value, nil, false) end # Set the `IVar` to failed due to some error and wake or notify all threads waiting on it. @@ -405,11 +499,20 @@ def try_success(value) # @raise [Concurrent::MultipleAssignmentError] if the `IVar` has already been set or otherwise completed # @return [Future] def fail(reason = StandardError.new) - future.complete(false, nil, reason) + complete(false, nil, reason) end def try_fail(reason = StandardError.new) - !!future.complete(false, nil, reason, false) + !!complete(false, nil, reason, false) + end + + def complete(success, value, reason, raise = true) + future.complete(success, value, reason, raise) + synchronize { @blocked_by.clear } + end + + def state + future.state end # @return [Future] @@ -426,16 +529,34 @@ def evaluate_to!(&block) # @return [Future] def connect_to(future) - future.on_completion! { |success, value, reason| self.future.complete success, value, reason } + add_blocked_by future + future.add_callback :set_promise_on_completion, self self.future end + def touch + blocked_by.each(&:touch) if synchronize { @touched ? false : (@touched = true) } + end + + def to_s + "<##{self.class}:0x#{'%x' % (object_id << 1)} #{state}>" + end + + def inspect + "#{to_s[0..-2]} blocked_by:[#{synchronize { @blocked_by }.map(&:to_s).join(', ')}]>" + end + + # @api private + def add_blocked_by(*futures) + synchronize { @blocked_by += futures } + self + end end class Delay < Future def initialize(default_executor = :fast, &block) - super(default_executor) + super(Promise.new(self), default_executor) raise ArgumentError.new('no block given') unless block_given? synchronize do @computing = false @@ -444,18 +565,18 @@ def initialize(default_executor = :fast, &block) end def wait(timeout = nil) - execute_task_once + touch super timeout end - private - - def execute_task_once + # starts executing the value without blocking + def touch execute, task = synchronize do [(@computing = true unless @computing), @task] end - Next.executor(default_executor).post { Promise.new(self).evaluate_to &task } if execute + Next.executor(default_executor).post { promise.evaluate_to &task } if execute + self end end @@ -493,7 +614,11 @@ def execute_task_once future6 = future5.then(&:capitalize) # executes on IO_EXECUTOR because default was set to :io on future5 future7 = Future.join(future0, future3) -futures = [future0, future1, future2, future3, future4, future5, future6, future7].each &:wait +p future3, future5 +p future3.callbacks, future5.callbacks + +futures = [future0, future1, future2, future3, future4, future5, future6, future7] +futures.each &:wait puts 'index success value reason pool' @@ -517,12 +642,17 @@ def execute_task_once puts '-- promise like tree' # if head of the tree is not constructed with #future but with #delay it does not start execute, -# it's triggered later by `head.wait` -head = delay { 1 } -tree = head.then(&:succ).then(&:succ).then(&:succ) -thread = Thread.new { p tree.value } # prints 4 -head.wait -thread.join +# it's triggered later by calling wait or value on any of the depedent futures or the delay itself +tree = (head = delay { 1 }).then { |v| v.succ }.then(&:succ).then(&:succ) + +# meaningful to_s and inspect defined for Future and Promise +puts head +# <#Concurrent::Next::Delay:7f89b4bccc68 pending> +p head +# <#Concurrent::Next::Delay:7f89b4bccc68 pending [<#Concurrent::Next::Promise:7f89b4bccb00 pending>]]> +p head.callbacks +# [[:then_callback, :fast, <#Concurrent::Next::Promise:0x7fa54b31d218 pending [<#Concurrent::Next::Delay:0x7fa54b31d380 pending>]>, #]] +p tree.value puts '-- bench' require 'benchmark' @@ -551,12 +681,14 @@ def add(v) [parent, klass] end + count = 5_000_000 + classes.each do |parent, klass| b.report(parent) do s = klass.new 2.times.map do Thread.new do - 5_000_000.times { s.add :a } + count.times { s.add :a } end end.each &:join end From 96a4f0db0d84d40f19739ef0ce458e3d0146d940 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Tue, 4 Nov 2014 22:13:49 +0100 Subject: [PATCH 04/12] Allows to build promise trees with lazy evaluated branches - Adds #chain_delay, #then_delay, #rescue_delay which are same as #chain, #then, #rescue but are not evaluated automatically but only when requested by #value. - Restructure class hierarchy. Only one Future with Multiple Promise implementations which are hidden to the user. Provides better encapsulation. - Delay is now implemented as a Promise descendant. --- lib/concurrent/next.rb | 266 ++++++++++++++++++++++++++++------------- 1 file changed, 180 insertions(+), 86 deletions(-) diff --git a/lib/concurrent/next.rb b/lib/concurrent/next.rb index df59613ef..f8aff1f2f 100644 --- a/lib/concurrent/next.rb +++ b/lib/concurrent/next.rb @@ -56,12 +56,12 @@ def post(executor = :fast, &job) # @return [Future] def future(executor = :fast, &block) - Future.execute executor, &block + Immediate.new(executor, &block).future end # @return [Delay] def delay(executor = :fast, &block) - Delay.new(executor, &block) + Delay.new(nil, executor, &block).future end alias_method :async, :future @@ -144,21 +144,15 @@ module FutureHelpers # @return [Future] def join(*futures) countdown = Concurrent::AtomicFixnum.new futures.size - promise = Promise.new.add_blocked_by(*futures) # TODO add injectable executor + promise = ExternalPromise.new(futures) futures.each { |future| future.add_callback :join, countdown, promise, *futures } promise.future end - - # @return [Future] - def execute(executor = :fast, &block) - promise = Promise.new(executor) - Next.executor(executor).post { promise.evaluate_to &block } - promise.future - end end class Future < SynchronizedObject extend FutureHelpers + extend Shortcuts singleton_class.send :alias_method, :dataflow, :join @@ -264,26 +258,47 @@ def exception(*args) reason.exception(*args) end - # TODO add #then_delay { ... } and such to be able to chain delayed evaluations + # TODO needs better name + def connect(executor = default_executor) + ConnectedPromise.new(self, executor).future + end # @yield [success, value, reason] of the parent def chain(executor = default_executor, &callback) - add_callback :chain_callback, executor, promise = Promise.new(default_executor).add_blocked_by(self), callback + add_callback :chain_callback, executor, promise = ExternalPromise.new([self], default_executor), callback promise.future end # @yield [value] executed only on parent success def then(executor = default_executor, &callback) - add_callback :then_callback, executor, promise = Promise.new(default_executor).add_blocked_by(self), callback + add_callback :then_callback, executor, promise = ExternalPromise.new([self], default_executor), callback promise.future end # @yield [reason] executed only on parent failure def rescue(executor = default_executor, &callback) - add_callback :rescue_callback, executor, promise = Promise.new(default_executor).add_blocked_by(self), callback + add_callback :rescue_callback, executor, promise = ExternalPromise.new([self], default_executor), callback promise.future end + # lazy version of #chain + def chain_delay(executor = default_executor, &callback) + delay = Delay.new(self, executor) { callback_on_completion callback } + delay.future + end + + # lazy version of #then + def then_delay(executor = default_executor, &callback) + delay = Delay.new(self, executor) { conditioned_callback callback } + delay.future + end + + # lazy version of #rescue + def rescue_delay(executor = default_executor, &callback) + delay = Delay.new(self, executor) { callback_on_failure callback } + delay.future + end + # @yield [success, value, reason] executed async on `executor` when completed # @return self def on_completion(executor = default_executor, &callback) @@ -399,27 +414,15 @@ def with_promise(promise, &block) end def chain_callback(executor, promise, callback) - with_async(executor) do - with_promise(promise) do - callback_on_completion callback - end - end + with_async(executor) { with_promise(promise) { callback_on_completion callback } } end def then_callback(executor, promise, callback) - with_async(executor) do - with_promise(promise) do - success? ? callback.call(value) : raise(reason) - end - end + with_async(executor) { with_promise(promise) { conditioned_callback callback } } end def rescue_callback(executor, promise, callback) - with_async(executor) do - with_promise(promise) do - callback_on_failure callback - end - end + with_async(executor) { with_promise(promise) { callback_on_failure callback } } end def with_async(executor) @@ -450,6 +453,10 @@ def callback_on_failure(callback) callback.call reason if failed? end + def conditioned_callback(callback) + self.success? ? callback.call(value) : raise(reason) + end + def call_callback(method, *args) self.send method, *args end @@ -457,13 +464,9 @@ def call_callback(method, *args) class Promise < SynchronizedObject # @api private - def initialize(executor_or_future = :fast) + def initialize(executor = :fast) super() - future = if Future === executor_or_future - executor_or_future - else - Future.new(self, executor_or_future) - end + future = Future.new(self, executor) synchronize do @future = future @@ -480,6 +483,48 @@ def blocked_by synchronize { @blocked_by } end + def state + future.state + end + + def touch + blocked_by.each(&:touch) if synchronize { @touched ? false : (@touched = true) } + end + + def to_s + "<##{self.class}:0x#{'%x' % (object_id << 1)} #{state}>" + end + + def inspect + "#{to_s[0..-2]} blocked_by:[#{synchronize { @blocked_by }.map(&:to_s).join(', ')}]>" + end + + private + + def add_blocked_by(*futures) + synchronize { @blocked_by += futures } + self + end + + def complete(success, value, reason, raise = true) + future.complete(success, value, reason, raise) + synchronize { @blocked_by.clear } + end + + # @return [Future] + def evaluate_to(&block) # TODO for parent + complete true, block.call, nil + rescue => error + complete false, nil, error + end + end + + class ExternalPromise < Promise + def initialize(blocked_by_futures, executor = :fast) + super executor + add_blocked_by *blocked_by_futures + end + # Set the `IVar` to a value and wake or notify all threads waiting on it. # # @param [Object] value the value to store in the `IVar` @@ -506,26 +551,24 @@ def try_fail(reason = StandardError.new) !!complete(false, nil, reason, false) end - def complete(success, value, reason, raise = true) - future.complete(success, value, reason, raise) - synchronize { @blocked_by.clear } - end - - def state - future.state - end - - # @return [Future] - def evaluate_to(&block) - success block.call - rescue => error - fail error - end + public :evaluate_to # @return [Future] def evaluate_to!(&block) evaluate_to(&block).no_error! end + end + + class ConnectedPromise < Promise + def initialize(future, executor = :fast) + super(executor) + connect_to future + end + + # @api private + public :complete + + private # @return [Future] def connect_to(future) @@ -533,49 +576,44 @@ def connect_to(future) future.add_callback :set_promise_on_completion, self self.future end + end - def touch - blocked_by.each(&:touch) if synchronize { @touched ? false : (@touched = true) } - end - - def to_s - "<##{self.class}:0x#{'%x' % (object_id << 1)} #{state}>" - end - - def inspect - "#{to_s[0..-2]} blocked_by:[#{synchronize { @blocked_by }.map(&:to_s).join(', ')}]>" - end - - # @api private - def add_blocked_by(*futures) - synchronize { @blocked_by += futures } - self + class Immediate < Promise + def initialize(executor = :fast, &task) + super(executor) + Next.executor(executor).post { evaluate_to &task } end end - class Delay < Future - - def initialize(default_executor = :fast, &block) - super(Promise.new(self), default_executor) - raise ArgumentError.new('no block given') unless block_given? + class Delay < Promise + def initialize(blocked_by_future, executor = :fast, &task) + super(executor) synchronize do + @task = task @computing = false - @task = block end + add_blocked_by blocked_by_future if blocked_by_future end - def wait(timeout = nil) - touch - super timeout + def touch + if blocked_by.all?(&:completed?) + execute_once + else + blocked_by.each { |f| f.on_success! { self.touch } unless synchronize { @touched } } + super + end end - # starts executing the value without blocking - def touch + private + + def execute_once execute, task = synchronize do [(@computing = true unless @computing), @task] end - Next.executor(default_executor).post { promise.evaluate_to &task } if execute + if execute + Next.executor(future.default_executor).post { evaluate_to &task } + end self end end @@ -610,7 +648,7 @@ def touch future2 = future1.then { |v| v + 1 } # will fail with 'boo' error, executed on default FAST_EXECUTOR future3 = future1.rescue { |err| err.message } # executed on default FAST_EXECUTOR future4 = future0.chain { |success, value, reason| success } # executed on default FAST_EXECUTOR -future5 = Promise.new(:io).connect_to(future3) +future5 = future3.connect(:io) # connects new future with different executor, the new future is completed when future3 is future6 = future5.then(&:capitalize) # executes on IO_EXECUTOR because default was set to :io on future5 future7 = Future.join(future0, future3) @@ -642,22 +680,78 @@ def touch puts '-- promise like tree' # if head of the tree is not constructed with #future but with #delay it does not start execute, -# it's triggered later by calling wait or value on any of the depedent futures or the delay itself -tree = (head = delay { 1 }).then { |v| v.succ }.then(&:succ).then(&:succ) +# it's triggered later by calling wait or value on any of the dependent futures or the delay itself +three = (head = delay { 1 }).then { |v| v.succ }.then(&:succ) +four = three.then_delay(&:succ) # meaningful to_s and inspect defined for Future and Promise puts head -# <#Concurrent::Next::Delay:7f89b4bccc68 pending> +# <#Concurrent::Next::Future:0x7fb9dcabacc8 pending> p head -# <#Concurrent::Next::Delay:7f89b4bccc68 pending [<#Concurrent::Next::Promise:7f89b4bccb00 pending>]]> +# <#Concurrent::Next::Future:0x7fb9dcabacc8 pending blocks:[<#Concurrent::Next::ExternalPromise:0x7fb9dcabaac0 pending>]> p head.callbacks -# [[:then_callback, :fast, <#Concurrent::Next::Promise:0x7fa54b31d218 pending [<#Concurrent::Next::Delay:0x7fa54b31d380 pending>]>, #]] -p tree.value +# [[:then_callback, :fast, <#Concurrent::Next::ExternalPromise:0x7fb9dcabaac0 pending blocked_by:[<#Concurrent::Next::Future:0x7fb9dcabacc8 pending>]>, +# #]] + + +# evaluates only up to three, four is left unevaluated +p three.value # 3 +p four, four.promise +# until value is called on four +p four.value # 4 + +# futures hidden behind two delays trigger evaluation of both +double_delay = delay { 1 }.then_delay(&:succ) +p double_delay.value # 2 + +puts '-- graph' + +head = future { 1 } +branch1 = head.then(&:succ).then(&:succ) +branch2 = head.then(&:succ).then_delay(&:succ) +result = Future.join(branch1, branch2).then { |b1, b2| b1 + b2 } + +sleep 0.1 +p branch1.completed?, branch2.completed? # true, false +# force evaluation of whole graph +p result.value # 6 puts '-- bench' require 'benchmark' -Benchmark.bmbm(20) do |b| +module Benchmark + def self.bmbmbm(rehearsals, width) + job = Job.new(width) + yield(job) + width = job.width + 1 + sync = STDOUT.sync + STDOUT.sync = true + + # rehearsal + rehearsals.times do + puts 'Rehearsal '.ljust(width+CAPTION.length, '-') + ets = job.list.inject(Tms.new) { |sum, (label, item)| + print label.ljust(width) + res = Benchmark.measure(&item) + print res.format + sum + res + }.format("total: %tsec") + print " #{ets}\n\n".rjust(width+CAPTION.length+2, '-') + end + + # take + print ' '*width + CAPTION + job.list.map { |label, item| + GC.start + print label.ljust(width) + Benchmark.measure(label, &item).tap { |res| print res } + } + ensure + STDOUT.sync = sync unless sync.nil? + end +end + +Benchmark.bmbmbm(20, 20) do |b| parents = [RubySynchronizedObject, (JavaSynchronizedObject if defined? JavaSynchronizedObject)].compact classes = parents.map do |parent| From 440f0a2de0e186d1b7689c9a60ddc33b46c0c260 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Thu, 6 Nov 2014 09:38:37 +0100 Subject: [PATCH 05/12] Adding more documentation --- lib/concurrent/next.rb | 1045 +++++++++++++++++++++------------------- 1 file changed, 542 insertions(+), 503 deletions(-) diff --git a/lib/concurrent/next.rb b/lib/concurrent/next.rb index f8aff1f2f..a467dd40a 100644 --- a/lib/concurrent/next.rb +++ b/lib/concurrent/next.rb @@ -1,640 +1,655 @@ require 'concurrent' -module Concurrent - - # TODO Dereferencable - # TODO document new global pool setting: no overflow, user has to buffer when there is too many tasks - module Next - - - # executors do not allocate the threads immediately so they can be constants - # all thread pools are configured to never reject the job - # TODO optional auto termination - module Executors - - IMMEDIATE_EXECUTOR = ImmediateExecutor.new - - # Only non-blocking and short tasks can go into this pool, otherwise it can starve or deadlock - FAST_EXECUTOR = Concurrent::FixedThreadPool.new( - [2, Concurrent.processor_count].max, - idletime: 60, # 1 minute same as Java pool default - max_queue: 0 # unlimited - ) - - # IO and blocking jobs should be executed on this pool - IO_EXECUTOR = Concurrent::ThreadPoolExecutor.new( - min_threads: [2, Concurrent.processor_count].max, - max_threads: Concurrent.processor_count * 100, - idletime: 60, # 1 minute same as Java pool default - max_queue: 0 # unlimited - ) - - def executor(which) - case which - when :immediate, :immediately - IMMEDIATE_EXECUTOR - when :fast - FAST_EXECUTOR - when :io - IO_EXECUTOR - when Executor - which - else - raise TypeError - end +# TODO Dereferencable +# TODO document new global pool setting: no overflow, user has to buffer when there is too many tasks + +# different name just not to collide +module ConcurrentNext + + # executors do not allocate the threads immediately so they can be constants + # all thread pools are configured to never reject the job + # TODO optional auto termination + module Executors + + IMMEDIATE_EXECUTOR = Concurrent::ImmediateExecutor.new + + # Only non-blocking and short tasks can go into this pool, otherwise it can starve or deadlock + FAST_EXECUTOR = Concurrent::FixedThreadPool.new( + [2, Concurrent.processor_count].max, + idletime: 60, # 1 minute same as Java pool default + max_queue: 0 # unlimited + ) + + # IO and blocking jobs should be executed on this pool + IO_EXECUTOR = Concurrent::ThreadPoolExecutor.new( + min_threads: [2, Concurrent.processor_count].max, + max_threads: Concurrent.processor_count * 100, + idletime: 60, # 1 minute same as Java pool default + max_queue: 0 # unlimited + ) + + def executor(which) + case which + when :immediate, :immediately + IMMEDIATE_EXECUTOR + when :fast + FAST_EXECUTOR + when :io + IO_EXECUTOR + when Executor + which + else + raise TypeError end end + end - extend Executors + extend Executors - module Shortcuts + begin + require 'jruby' - def post(executor = :fast, &job) - Next.executor(executor).post &job - self + # roughly more than 2x faster + class JavaSynchronizedObject + def initialize end - # @return [Future] - def future(executor = :fast, &block) - Immediate.new(executor, &block).future + def synchronize + JRuby.reference0(self).synchronized { yield } end - # @return [Delay] - def delay(executor = :fast, &block) - Delay.new(nil, executor, &block).future + def wait(timeout) + if timeout + JRuby.reference0(self).wait(timeout * 1000) + else + JRuby.reference0(self).wait + end end - alias_method :async, :future + def notify_all + JRuby.reference0(self).notifyAll + end end + rescue LoadError + # ignore + end - extend Shortcuts - - begin - require 'jruby' + class RubySynchronizedObject + def initialize + @mutex = Mutex.new + @condition = Concurrent::Condition.new + end - # roughly more than 2x faster - class JavaSynchronizedObject - def initialize - end + def synchronize + # if @mutex.owned? + # yield + # else + @mutex.synchronize { yield } + rescue ThreadError + yield + # end + end - def synchronize - JRuby.reference0(self).synchronized { yield } - end + def wait(timeout) + @condition.wait @mutex, timeout + end - def wait(timeout) - if timeout - JRuby.reference0(self).wait(timeout * 1000) - else - JRuby.reference0(self).wait - end - end + def notify + @condition.signal + end - def notify_all - JRuby.reference0(self).notifyAll - end - end - rescue LoadError - # ignore + def notify_all + @condition.broadcast end + end - class RubySynchronizedObject - def initialize - @mutex = Mutex.new - @condition = Concurrent::Condition.new - end + case defined?(RUBY_ENGINE) && RUBY_ENGINE + when 'jruby' + # @abstract + class SynchronizedObject < JavaSynchronizedObject + end + when 'rbx' + raise NotImplementedError # TODO + else + # @abstract + class SynchronizedObject < RubySynchronizedObject + end + end - def synchronize - # if @mutex.owned? - # yield - # else - @mutex.synchronize { yield } - rescue ThreadError - yield - # end + class Future < SynchronizedObject + module Shortcuts + def post(executor = :fast, &job) + ConcurrentNext.executor(executor).post &job + self end - def wait(timeout) - @condition.wait @mutex, timeout + # @return [Future] + def future(executor = :fast, &block) + ConcurrentNext::Immediate.new(executor, &block).future end - def notify - @condition.signal - end + alias_method :async, :future - def notify_all - @condition.broadcast + # @return [Delay] + def delay(executor = :fast, &block) + ConcurrentNext::Delay.new(nil, executor, &block).future end - end - engine = defined?(RUBY_ENGINE) && RUBY_ENGINE - case engine - when 'jruby' - class SynchronizedObject < JavaSynchronizedObject + def promise(executor = :fast) + ConcurrentNext::OuterPromise.new([], executor) end - when 'rbx' - raise NotImplementedError # TODO - else - class SynchronizedObject < RubySynchronizedObject - end - end - module FutureHelpers # fails on first error # does not block a thread # @return [Future] def join(*futures) countdown = Concurrent::AtomicFixnum.new futures.size - promise = ExternalPromise.new(futures) + promise = OuterPromise.new(futures) futures.each { |future| future.add_callback :join, countdown, promise, *futures } promise.future end end - class Future < SynchronizedObject - extend FutureHelpers - extend Shortcuts + extend Shortcuts - singleton_class.send :alias_method, :dataflow, :join + singleton_class.send :alias_method, :dataflow, :join - # @api private - def initialize(promise, default_executor = :fast) - super() - synchronize do - @promise = promise - @value = nil - @reason = nil - @state = :pending - @callbacks = [] - @default_executor = default_executor - end + # @api private + def initialize(promise, default_executor = :fast) + super() + synchronize do + @promise = promise + @value = nil + @reason = nil + @state = :pending + @callbacks = [] + @default_executor = default_executor end + end - # Has the obligation been success? - # @return [Boolean] - def success? - state == :success - end + # Has the obligation been success? + # @return [Boolean] + def success? + state == :success + end - # Has the obligation been failed? - # @return [Boolean] - def failed? - state == :failed - end + # Has the obligation been failed? + # @return [Boolean] + def failed? + state == :failed + end - # Is obligation completion still pending? - # @return [Boolean] - def pending? - state == :pending - end + # Is obligation completion still pending? + # @return [Boolean] + def pending? + state == :pending + end - alias_method :incomplete?, :pending? + alias_method :incomplete?, :pending? - def completed? - [:success, :failed].include? state - end + def completed? + [:success, :failed].include? state + end - def promise - synchronize { @promise } - end + # @return [Object] see Dereferenceable#deref + def value(timeout = nil) + wait timeout + synchronize { @value } + end - # @return [Object] see Dereferenceable#deref - def value(timeout = nil) - wait timeout - synchronize { @value } + # wait until Obligation is #complete? + # @param [Numeric] timeout the maximum time in second to wait. + # @return [Obligation] self + def wait(timeout = nil) + synchronize do + touch + # TODO interruptions ? + super timeout if incomplete? + self end + end - # wait until Obligation is #complete? - # @param [Numeric] timeout the maximum time in second to wait. - # @return [Obligation] self - def wait(timeout = nil) - synchronize do - touch - # TODO interruptions ? - super timeout if incomplete? - self - end - end + def touch + promise.touch + end - def touch - promise.touch - end + # wait until Obligation is #complete? + # @param [Numeric] timeout the maximum time in second to wait. + # @return [Obligation] self + # @raise [Exception] when #failed? it raises #reason + def no_error!(timeout = nil) + wait(timeout).tap { raise self if failed? } + end - # wait until Obligation is #complete? - # @param [Numeric] timeout the maximum time in second to wait. - # @return [Obligation] self - # @raise [Exception] when #failed? it raises #reason - def no_error!(timeout = nil) - wait(timeout).tap { raise self if failed? } + # @raise [Exception] when #failed? it raises #reason + # @return [Object] see Dereferenceable#deref + def value!(timeout = nil) + val = value(timeout) + if failed? + raise self + else + val end + end - # @raise [Exception] when #failed? it raises #reason - # @return [Object] see Dereferenceable#deref - def value!(timeout = nil) - val = value(timeout) - if failed? - raise self - else - val - end - end + def state + synchronize { @state } + end - def state - synchronize { @state } - end + def reason + synchronize { @reason } + end - def reason - synchronize { @reason } - end + def default_executor + synchronize { @default_executor } + end - def default_executor - synchronize { @default_executor } - end + # @example allows Obligation to be risen + # failed_ivar = Ivar.new.fail + # raise failed_ivar + def exception(*args) + raise 'obligation is not failed' unless failed? + reason.exception(*args) + end - # @example allows Obligation to be risen - # failed_ivar = Ivar.new.fail - # raise failed_ivar - def exception(*args) - raise 'obligation is not failed' unless failed? - reason.exception(*args) - end + def with_default_executor(executor = default_executor) + ConnectedPromise.new(self, executor).future + end - # TODO needs better name - def connect(executor = default_executor) - ConnectedPromise.new(self, executor).future - end + alias_method :new_connected, :with_default_executor - # @yield [success, value, reason] of the parent - def chain(executor = default_executor, &callback) - add_callback :chain_callback, executor, promise = ExternalPromise.new([self], default_executor), callback - promise.future - end + # @yield [success, value, reason] of the parent + def chain(executor = default_executor, &callback) + add_callback :chain_callback, executor, promise = OuterPromise.new([self], default_executor), callback + promise.future + end - # @yield [value] executed only on parent success - def then(executor = default_executor, &callback) - add_callback :then_callback, executor, promise = ExternalPromise.new([self], default_executor), callback - promise.future - end + # @yield [value] executed only on parent success + def then(executor = default_executor, &callback) + add_callback :then_callback, executor, promise = OuterPromise.new([self], default_executor), callback + promise.future + end - # @yield [reason] executed only on parent failure - def rescue(executor = default_executor, &callback) - add_callback :rescue_callback, executor, promise = ExternalPromise.new([self], default_executor), callback - promise.future - end + # @yield [reason] executed only on parent failure + def rescue(executor = default_executor, &callback) + add_callback :rescue_callback, executor, promise = OuterPromise.new([self], default_executor), callback + promise.future + end - # lazy version of #chain - def chain_delay(executor = default_executor, &callback) - delay = Delay.new(self, executor) { callback_on_completion callback } - delay.future - end + # lazy version of #chain + def chain_delay(executor = default_executor, &callback) + delay = Delay.new(self, executor) { callback_on_completion callback } + delay.future + end - # lazy version of #then - def then_delay(executor = default_executor, &callback) - delay = Delay.new(self, executor) { conditioned_callback callback } - delay.future - end + # lazy version of #then + def then_delay(executor = default_executor, &callback) + delay = Delay.new(self, executor) { conditioned_callback callback } + delay.future + end - # lazy version of #rescue - def rescue_delay(executor = default_executor, &callback) - delay = Delay.new(self, executor) { callback_on_failure callback } - delay.future - end + # lazy version of #rescue + def rescue_delay(executor = default_executor, &callback) + delay = Delay.new(self, executor) { callback_on_failure callback } + delay.future + end - # @yield [success, value, reason] executed async on `executor` when completed - # @return self - def on_completion(executor = default_executor, &callback) - add_callback :async_callback_on_completion, executor, callback - end + # @yield [success, value, reason] executed async on `executor` when completed + # @return self + def on_completion(executor = default_executor, &callback) + add_callback :async_callback_on_completion, executor, callback + end - # @yield [value] executed async on `executor` when success - # @return self - def on_success(executor = default_executor, &callback) - add_callback :async_callback_on_success, executor, callback - end + # @yield [value] executed async on `executor` when success + # @return self + def on_success(executor = default_executor, &callback) + add_callback :async_callback_on_success, executor, callback + end - # @yield [reason] executed async on `executor` when failed? - # @return self - def on_failure(executor = default_executor, &callback) - add_callback :async_callback_on_failure, executor, callback - end + # @yield [reason] executed async on `executor` when failed? + # @return self + def on_failure(executor = default_executor, &callback) + add_callback :async_callback_on_failure, executor, callback + end - # @yield [success, value, reason] executed sync when completed - # @return self - def on_completion!(&callback) - add_callback :callback_on_completion, callback - end + # @yield [success, value, reason] executed sync when completed + # @return self + def on_completion!(&callback) + add_callback :callback_on_completion, callback + end - # @yield [value] executed sync when success - # @return self - def on_success!(&callback) - add_callback :callback_on_success, callback - end + # @yield [value] executed sync when success + # @return self + def on_success!(&callback) + add_callback :callback_on_success, callback + end - # @yield [reason] executed sync when failed? - # @return self - def on_failure!(&callback) - add_callback :callback_on_failure, callback - end + # @yield [reason] executed sync when failed? + # @return self + def on_failure!(&callback) + add_callback :callback_on_failure, callback + end - # @return [Array] - def blocks - synchronize { @callbacks }.each_with_object([]) do |callback, promises| - promises.push *callback.select { |v| v.is_a? Promise } - end + # @return [Array] + def blocks + synchronize { @callbacks }.each_with_object([]) do |callback, promises| + promises.push *callback.select { |v| v.is_a? Promise } end + end - def to_s - "<##{self.class}:0x#{'%x' % (object_id << 1)} #{state}>" - end + def to_s + "<##{self.class}:0x#{'%x' % (object_id << 1)} #{state}>" + end - def inspect - "#{to_s[0..-2]} blocks:[#{blocks.map(&:to_s).join(', ')}]>" - end + def inspect + "#{to_s[0..-2]} blocks:[#{blocks.map(&:to_s).join(', ')}]>" + end - # @api private - def complete(success, value, reason, raise = true) # :nodoc: - callbacks = synchronize do - if completed? - if raise - raise MultipleAssignmentError.new('multiple assignment') - else - return nil - end - end - if success - @value = value - @state = :success + # @api private + def complete(success, value, reason, raise = true) # :nodoc: + callbacks = synchronize do + if completed? + if raise + raise Concurrent::MultipleAssignmentError.new('multiple assignment') else - @reason = reason - @state = :failed + return nil end - notify_all - @callbacks end + if success + @value = value + @state = :success + else + @reason = reason + @state = :failed + end + notify_all + @callbacks + end - callbacks.each { |method, *args| call_callback method, *args } - callbacks.clear + callbacks.each { |method, *args| call_callback method, *args } + callbacks.clear - self - end + self + end - # @api private - # just for inspection - def callbacks - synchronize { @callbacks }.clone.freeze - end + # @api private + # just for inspection + def callbacks + synchronize { @callbacks }.clone.freeze + end - # @api private - def add_callback(method, *args) - synchronize do - if completed? - call_callback method, *args - else - @callbacks << [method, *args] - end + # @api private + def add_callback(method, *args) + synchronize do + if completed? + call_callback method, *args + else + @callbacks << [method, *args] end - self end + self + end - private + # @api private, only for inspection + def promise + synchronize { @promise } + end - def set_promise_on_completion(promise) - promise.complete success?, value, reason - end + private - def join(countdown, promise, *futures) - if success? - promise.success futures.map(&:value) if countdown.decrement.zero? - else - promise.try_fail reason - end - end + def set_promise_on_completion(promise) + promise.complete success?, value, reason + end - def with_promise(promise, &block) - promise.evaluate_to &block + def join(countdown, promise, *futures) + if success? + promise.success futures.map(&:value) if countdown.decrement.zero? + else + promise.try_fail reason end + end - def chain_callback(executor, promise, callback) - with_async(executor) { with_promise(promise) { callback_on_completion callback } } - end + def with_promise(promise, &block) + promise.evaluate_to &block + end - def then_callback(executor, promise, callback) - with_async(executor) { with_promise(promise) { conditioned_callback callback } } - end + def chain_callback(executor, promise, callback) + with_async(executor) { with_promise(promise) { callback_on_completion callback } } + end - def rescue_callback(executor, promise, callback) - with_async(executor) { with_promise(promise) { callback_on_failure callback } } - end + def then_callback(executor, promise, callback) + with_async(executor) { with_promise(promise) { conditioned_callback callback } } + end - def with_async(executor) - Next.executor(executor).post { yield } - end + def rescue_callback(executor, promise, callback) + with_async(executor) { with_promise(promise) { callback_on_failure callback } } + end - def async_callback_on_completion(executor, callback) - with_async(executor) { callback_on_completion callback } - end + def with_async(executor) + ConcurrentNext.executor(executor).post { yield } + end - def async_callback_on_success(executor, callback) - with_async(executor) { callback_on_success callback } - end + def async_callback_on_completion(executor, callback) + with_async(executor) { callback_on_completion callback } + end - def async_callback_on_failure(executor, callback) - with_async(executor) { callback_on_failure callback } - end + def async_callback_on_success(executor, callback) + with_async(executor) { callback_on_success callback } + end - def callback_on_completion(callback) - callback.call success?, value, reason - end + def async_callback_on_failure(executor, callback) + with_async(executor) { callback_on_failure callback } + end - def callback_on_success(callback) - callback.call value if success? - end + def callback_on_completion(callback) + callback.call success?, value, reason + end - def callback_on_failure(callback) - callback.call reason if failed? - end + def callback_on_success(callback) + callback.call value if success? + end - def conditioned_callback(callback) - self.success? ? callback.call(value) : raise(reason) - end + def callback_on_failure(callback) + callback.call reason if failed? + end - def call_callback(method, *args) - self.send method, *args - end + def conditioned_callback(callback) + self.success? ? callback.call(value) : raise(reason) end - class Promise < SynchronizedObject - # @api private - def initialize(executor = :fast) - super() - future = Future.new(self, executor) + def call_callback(method, *args) + self.send method, *args + end + end - synchronize do - @future = future - @blocked_by = [] - @touched = false - end - end + extend Future::Shortcuts + include Future::Shortcuts - def future - synchronize { @future } - end + # @abstract + class Promise < SynchronizedObject + # @api private + def initialize(executor = :fast) + super() + future = Future.new(self, executor) - def blocked_by - synchronize { @blocked_by } + synchronize do + @future = future + @blocked_by = [] + @touched = false end + end - def state - future.state - end + def future + synchronize { @future } + end - def touch - blocked_by.each(&:touch) if synchronize { @touched ? false : (@touched = true) } - end + def blocked_by + synchronize { @blocked_by } + end - def to_s - "<##{self.class}:0x#{'%x' % (object_id << 1)} #{state}>" - end + def state + future.state + end - def inspect - "#{to_s[0..-2]} blocked_by:[#{synchronize { @blocked_by }.map(&:to_s).join(', ')}]>" - end + def touch + blocked_by.each(&:touch) if synchronize { @touched ? false : (@touched = true) } + end - private + def to_s + "<##{self.class}:0x#{'%x' % (object_id << 1)} #{state}>" + end - def add_blocked_by(*futures) - synchronize { @blocked_by += futures } - self - end + def inspect + "#{to_s[0..-2]} blocked_by:[#{synchronize { @blocked_by }.map(&:to_s).join(', ')}]>" + end - def complete(success, value, reason, raise = true) - future.complete(success, value, reason, raise) - synchronize { @blocked_by.clear } - end + private - # @return [Future] - def evaluate_to(&block) # TODO for parent - complete true, block.call, nil - rescue => error - complete false, nil, error - end + def add_blocked_by(*futures) + synchronize { @blocked_by += futures } + self end - class ExternalPromise < Promise - def initialize(blocked_by_futures, executor = :fast) - super executor - add_blocked_by *blocked_by_futures - end + def complete(success, value, reason, raise = true) + future.complete(success, value, reason, raise) + synchronize { @blocked_by.clear } + end - # Set the `IVar` to a value and wake or notify all threads waiting on it. - # - # @param [Object] value the value to store in the `IVar` - # @raise [Concurrent::MultipleAssignmentError] if the `IVar` has already been set or otherwise completed - # @return [Future] - def success(value) - complete(true, value, nil) - end + # @return [Future] + def evaluate_to(&block) + complete true, block.call, nil + rescue => error + complete false, nil, error + end - def try_success(value) - complete(true, value, nil, false) - end + # @return [Future] + def connect_to(future) + add_blocked_by future + future.add_callback :set_promise_on_completion, self + self.future + end + end - # Set the `IVar` to failed due to some error and wake or notify all threads waiting on it. - # - # @param [Object] reason for the failure - # @raise [Concurrent::MultipleAssignmentError] if the `IVar` has already been set or otherwise completed - # @return [Future] - def fail(reason = StandardError.new) - complete(false, nil, reason) - end + # @note Be careful not to fullfill the promise twice + # @example initialization + # ConcurrentNext.promise + class OuterPromise < Promise + def initialize(blocked_by_futures, executor = :fast) + super executor + add_blocked_by *blocked_by_futures + end - def try_fail(reason = StandardError.new) - !!complete(false, nil, reason, false) - end + # Set the `IVar` to a value and wake or notify all threads waiting on it. + # + # @param [Object] value the value to store in the `IVar` + # @raise [Concurrent::MultipleAssignmentError] if the `IVar` has already been set or otherwise completed + # @return [Future] + def success(value) + complete(true, value, nil) + end - public :evaluate_to + def try_success(value) + complete(true, value, nil, false) + end - # @return [Future] - def evaluate_to!(&block) - evaluate_to(&block).no_error! - end + # Set the `IVar` to failed due to some error and wake or notify all threads waiting on it. + # + # @param [Object] reason for the failure + # @raise [Concurrent::MultipleAssignmentError] if the `IVar` has already been set or otherwise completed + # @return [Future] + def fail(reason = StandardError.new) + complete(false, nil, reason) end - class ConnectedPromise < Promise - def initialize(future, executor = :fast) - super(executor) - connect_to future - end + def try_fail(reason = StandardError.new) + !!complete(false, nil, reason, false) + end - # @api private - public :complete + public :evaluate_to - private + # @return [Future] + def evaluate_to!(&block) + evaluate_to(&block).no_error! + end - # @return [Future] - def connect_to(future) - add_blocked_by future - future.add_callback :set_promise_on_completion, self - self.future - end + public :connect_to + + # @api private + public :complete + end + + # used internally to support #with_default_executor + class ConnectedPromise < Promise + def initialize(future, executor = :fast) + super(executor) + connect_to future end - class Immediate < Promise - def initialize(executor = :fast, &task) - super(executor) - Next.executor(executor).post { evaluate_to &task } - end + # @api private + public :complete + end + + # will be immediately evaluated to task + class Immediate < Promise + def initialize(executor = :fast, &task) + super(executor) + ConcurrentNext.executor(executor).post { evaluate_to &task } end + end - class Delay < Promise - def initialize(blocked_by_future, executor = :fast, &task) - super(executor) - synchronize do - @task = task - @computing = false - end - add_blocked_by blocked_by_future if blocked_by_future + # will be evaluated to task when first requested + class Delay < Promise + def initialize(blocked_by_future, executor = :fast, &task) + super(executor) + synchronize do + @task = task + @computing = false end + add_blocked_by blocked_by_future if blocked_by_future + end - def touch - if blocked_by.all?(&:completed?) - execute_once - else - blocked_by.each { |f| f.on_success! { self.touch } unless synchronize { @touched } } - super - end + def touch + if blocked_by.all?(&:completed?) + execute_once + else + blocked_by.each { |f| f.on_success! { self.touch } unless synchronize { @touched } } + super end + end - private + private - def execute_once - execute, task = synchronize do - [(@computing = true unless @computing), @task] - end + def execute_once + execute, task = synchronize do + [(@computing = true unless @computing), @task] + end - if execute - Next.executor(future.default_executor).post { evaluate_to &task } - end - self + if execute + ConcurrentNext.executor(future.default_executor).post { evaluate_to &task } end + self end - end + end -include Concurrent::Next -include Concurrent::Next::Shortcuts +logger = Logger.new($stderr) +logger.level = Logger::DEBUG +Concurrent.configuration.logger = lambda do |level, progname, message = nil, &block| + logger.add level, message, progname, &block +end puts '-- asynchronous task without Future' q = Queue.new -post { q << 'a' } +ConcurrentNext.post { q << 'a' } p q.pop puts '-- asynchronous task with Future' -p future = future { 1 + 1 } +p future = ConcurrentNext.future { 1 + 1 } p future.value puts '-- sync and async callbacks on futures' -future = future { 'value' } # executed on FAST_EXECUTOR pool by default +future = ConcurrentNext.future { 'value' } # executed on FAST_EXECUTOR pool by default future.on_completion(:io) { p 'async' } # async callback overridden to execute on IO_EXECUTOR pool future.on_completion! { p 'sync' } # sync callback executed right after completion in the same thread-pool p future.value @@ -643,14 +658,14 @@ def execute_once sleep 0.1 puts '-- future chaining' -future0 = future { 1 }.then { |v| v + 2 } # both executed on default FAST_EXECUTOR +future0 = ConcurrentNext.future { 1 }.then { |v| v + 2 } # both executed on default FAST_EXECUTOR future1 = future0.then(:io) { raise 'boo' } # executed on IO_EXECUTOR future2 = future1.then { |v| v + 1 } # will fail with 'boo' error, executed on default FAST_EXECUTOR future3 = future1.rescue { |err| err.message } # executed on default FAST_EXECUTOR future4 = future0.chain { |success, value, reason| success } # executed on default FAST_EXECUTOR -future5 = future3.connect(:io) # connects new future with different executor, the new future is completed when future3 is +future5 = future3.with_default_executor(:io) # connects new future with different executor, the new future is completed when future3 is future6 = future5.then(&:capitalize) # executes on IO_EXECUTOR because default was set to :io on future5 -future7 = Future.join(future0, future3) +future7 = ConcurrentNext.join(future0, future3) p future3, future5 p future3.callbacks, future5.callbacks @@ -674,23 +689,23 @@ def execute_once puts '-- delay' # evaluated on #wait, #value -delay = delay { 1 + 1 } +delay = ConcurrentNext.delay { 1 + 1 } p delay.completed?, delay.value puts '-- promise like tree' # if head of the tree is not constructed with #future but with #delay it does not start execute, # it's triggered later by calling wait or value on any of the dependent futures or the delay itself -three = (head = delay { 1 }).then { |v| v.succ }.then(&:succ) +three = (head = ConcurrentNext.delay { 1 }).then { |v| v.succ }.then(&:succ) four = three.then_delay(&:succ) # meaningful to_s and inspect defined for Future and Promise puts head -# <#Concurrent::Next::Future:0x7fb9dcabacc8 pending> +# <#ConcurrentNext::Future:0x7fb9dcabacc8 pending> p head -# <#Concurrent::Next::Future:0x7fb9dcabacc8 pending blocks:[<#Concurrent::Next::ExternalPromise:0x7fb9dcabaac0 pending>]> +# <#ConcurrentNext::Future:0x7fb9dcabacc8 pending blocks:[<#ConcurrentNext::ExternalPromise:0x7fb9dcabaac0 pending>]> p head.callbacks -# [[:then_callback, :fast, <#Concurrent::Next::ExternalPromise:0x7fb9dcabaac0 pending blocked_by:[<#Concurrent::Next::Future:0x7fb9dcabacc8 pending>]>, +# [[:then_callback, :fast, <#ConcurrentNext::ExternalPromise:0x7fb9dcabaac0 pending blocked_by:[<#ConcurrentNext::Future:0x7fb9dcabacc8 pending>]>, # #]] @@ -701,24 +716,48 @@ def execute_once p four.value # 4 # futures hidden behind two delays trigger evaluation of both -double_delay = delay { 1 }.then_delay(&:succ) +double_delay = ConcurrentNext.delay { 1 }.then_delay(&:succ) p double_delay.value # 2 puts '-- graph' -head = future { 1 } +head = ConcurrentNext.future { 1 } branch1 = head.then(&:succ).then(&:succ) branch2 = head.then(&:succ).then_delay(&:succ) -result = Future.join(branch1, branch2).then { |b1, b2| b1 + b2 } +result = ConcurrentNext.join(branch1, branch2).then { |b1, b2| b1 + b2 } sleep 0.1 p branch1.completed?, branch2.completed? # true, false # force evaluation of whole graph p result.value # 6 +puts '-- connecting existing promises' + +source = ConcurrentNext.delay { 1 } +promise = ConcurrentNext.promise +promise.connect_to source +p promise.future.value # 1 +# or just +p ConcurrentNext.promise.connect_to(source).value + +puts '-- using shortcuts' + +include ConcurrentNext # includes Future::Shortcuts + +# now methods on ConcurrentNext are accessible directly + +p delay { 1 }.value, future { 1 }.value # => 1\n1 + +promise = promise() +promise.connect_to(future { 3 }) +p promise.future.value # 3 + puts '-- bench' require 'benchmark' +count = 5_000_000 +count = 5_000 + module Benchmark def self.bmbmbm(rehearsals, width) job = Job.new(width) @@ -751,9 +790,10 @@ def self.bmbmbm(rehearsals, width) end end -Benchmark.bmbmbm(20, 20) do |b| +Benchmark.bmbmbm(rehersals, 20) do |b| - parents = [RubySynchronizedObject, (JavaSynchronizedObject if defined? JavaSynchronizedObject)].compact + parents = [ConcurrentNext::RubySynchronizedObject, + (ConcurrentNext::JavaSynchronizedObject if defined? ConcurrentNext::JavaSynchronizedObject)].compact classes = parents.map do |parent| klass = Class.new(parent) do def initialize @@ -775,7 +815,6 @@ def add(v) [parent, klass] end - count = 5_000_000 classes.each do |parent, klass| b.report(parent) do @@ -793,18 +832,18 @@ def add(v) # MRI # Rehearsal ---------------------------------------------------------------------------- -# Concurrent::Next::RubySynchronizedObject 8.010000 6.290000 14.300000 ( 12.197402) +# ConcurrentNext::RubySynchronizedObject 8.010000 6.290000 14.300000 ( 12.197402) # ------------------------------------------------------------------ total: 14.300000sec # -# user system total real -# Concurrent::Next::RubySynchronizedObject 8.950000 9.320000 18.270000 ( 15.053220) +# user system total real +# ConcurrentNext::RubySynchronizedObject 8.950000 9.320000 18.270000 ( 15.053220) # # JRuby # Rehearsal ---------------------------------------------------------------------------- -# Concurrent::Next::RubySynchronizedObject 10.500000 6.440000 16.940000 ( 10.640000) -# Concurrent::Next::JavaSynchronizedObject 8.410000 0.050000 8.460000 ( 4.132000) +# ConcurrentNext::RubySynchronizedObject 10.500000 6.440000 16.940000 ( 10.640000) +# ConcurrentNext::JavaSynchronizedObject 8.410000 0.050000 8.460000 ( 4.132000) # ------------------------------------------------------------------ total: 25.400000sec # -# user system total real -# Concurrent::Next::RubySynchronizedObject 9.090000 6.640000 15.730000 ( 10.690000) -# Concurrent::Next::JavaSynchronizedObject 8.200000 0.030000 8.230000 ( 4.141000) +# user system total real +# ConcurrentNext::RubySynchronizedObject 9.090000 6.640000 15.730000 ( 10.690000) +# ConcurrentNext::JavaSynchronizedObject 8.200000 0.030000 8.230000 ( 4.141000) From d155f3fd6725c3a1130d569554e69248bf8c555c Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Sat, 8 Nov 2014 16:05:11 +0100 Subject: [PATCH 06/12] Add join method on instance, with alias :+ --- lib/concurrent/next.rb | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/lib/concurrent/next.rb b/lib/concurrent/next.rb index a467dd40a..49ec034c1 100644 --- a/lib/concurrent/next.rb +++ b/lib/concurrent/next.rb @@ -145,7 +145,7 @@ def promise(executor = :fast) def join(*futures) countdown = Concurrent::AtomicFixnum.new futures.size promise = OuterPromise.new(futures) - futures.each { |future| future.add_callback :join, countdown, promise, *futures } + futures.each { |future| future.add_callback :join_callback, countdown, promise, *futures } promise.future end end @@ -345,6 +345,12 @@ def inspect "#{to_s[0..-2]} blocks:[#{blocks.map(&:to_s).join(', ')}]>" end + def join(*futures) + ConcurrentNext.join self, *futures + end + + alias_method :+, :join + # @api private def complete(success, value, reason, raise = true) # :nodoc: callbacks = synchronize do @@ -401,7 +407,7 @@ def set_promise_on_completion(promise) promise.complete success?, value, reason end - def join(countdown, promise, *futures) + def join_callback(countdown, promise, *futures) if success? promise.success futures.map(&:value) if countdown.decrement.zero? else @@ -409,20 +415,16 @@ def join(countdown, promise, *futures) end end - def with_promise(promise, &block) - promise.evaluate_to &block - end - def chain_callback(executor, promise, callback) - with_async(executor) { with_promise(promise) { callback_on_completion callback } } + with_async(executor) { promise.evaluate_to { callback_on_completion callback } } end def then_callback(executor, promise, callback) - with_async(executor) { with_promise(promise) { conditioned_callback callback } } + with_async(executor) { promise.evaluate_to { conditioned_callback callback } } end def rescue_callback(executor, promise, callback) - with_async(executor) { with_promise(promise) { callback_on_failure callback } } + with_async(executor) { promise.evaluate_to { callback_on_failure callback } } end def with_async(executor) @@ -725,6 +727,9 @@ def execute_once branch1 = head.then(&:succ).then(&:succ) branch2 = head.then(&:succ).then_delay(&:succ) result = ConcurrentNext.join(branch1, branch2).then { |b1, b2| b1 + b2 } +# other variants +result = branch1.join(branch2).then { |b1, b2| b1 + b2 } +result = (branch1 + branch2).then { |b1, b2| b1 + b2 } sleep 0.1 p branch1.completed?, branch2.completed? # true, false From dad8ba9332556b962e9e4789b05cd64bcc0692d4 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Mon, 10 Nov 2014 21:50:04 +0100 Subject: [PATCH 07/12] Move #post method to different Shortcut module --- doc/future-promise.md | 12 ++++++++++++ lib/concurrent/next.rb | 18 +++++++++++------- 2 files changed, 23 insertions(+), 7 deletions(-) create mode 100644 doc/future-promise.md diff --git a/doc/future-promise.md b/doc/future-promise.md new file mode 100644 index 000000000..5be5a6983 --- /dev/null +++ b/doc/future-promise.md @@ -0,0 +1,12 @@ +# Futures and Promises + +New implementation added in version 0.8 differs from previous versions and has little in common. +{Future} represents a value which will become {#completed?} in future, it'll contain {#value} if {#success?} or a {#reason} if {#failed?}. It cannot be directly completed, there are implementations of abstract {Promise} class for that, so {Promise}'s only purpose is to complete a given {Future} object. They are always constructed as a Pair even in chaining methods like {#then}, {#rescue}, {#then_delay}, etc. + +There is few {Promise} implementations: + +- OuterPromise - only Promise used by users, can be completed by outer code. Constructed with {Concurrent::Next.promise} helper method. +- Immediate - internal implementation of Promise used to represent immediate evaluation of a block. Constructed with {Concurrent::Next.future} helper method. +- Delay - internal implementation of Promise used to represent delayed evaluation of a block. Constructed with {Concurrent::Next.delay} helper method. +- ConnectedPromise - used internally to support {Future#with_default_executor} + diff --git a/lib/concurrent/next.rb b/lib/concurrent/next.rb index 49ec034c1..f5ed5c54a 100644 --- a/lib/concurrent/next.rb +++ b/lib/concurrent/next.rb @@ -2,8 +2,9 @@ # TODO Dereferencable # TODO document new global pool setting: no overflow, user has to buffer when there is too many tasks +# TODO behaviour with Interrupt exceptions is undefined, use Signal.trap to avoid issues -# different name just not to collide +# @note different name just not to collide for now module ConcurrentNext # executors do not allocate the threads immediately so they can be constants @@ -42,9 +43,17 @@ def executor(which) raise TypeError end end + + module Shortcuts + def post(executor = :fast, &job) + ConcurrentNext.executor(executor).post &job + end + end end extend Executors + extend Executors::Shortcuts + include Executors::Shortcuts begin require 'jruby' @@ -91,7 +100,7 @@ def synchronize end def wait(timeout) - @condition.wait @mutex, timeout + synchronize { @condition.wait @mutex, timeout } end def notify @@ -118,10 +127,6 @@ class SynchronizedObject < RubySynchronizedObject class Future < SynchronizedObject module Shortcuts - def post(executor = :fast, &job) - ConcurrentNext.executor(executor).post &job - self - end # @return [Future] def future(executor = :fast, &block) @@ -203,7 +208,6 @@ def value(timeout = nil) def wait(timeout = nil) synchronize do touch - # TODO interruptions ? super timeout if incomplete? self end From 587ee6eafce312cfc5cf0c53130a3e12473b86e4 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Mon, 10 Nov 2014 22:06:55 +0100 Subject: [PATCH 08/12] Adding doc for shortcut methods. --- lib/concurrent/next.rb | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/lib/concurrent/next.rb b/lib/concurrent/next.rb index f5ed5c54a..85bc59e87 100644 --- a/lib/concurrent/next.rb +++ b/lib/concurrent/next.rb @@ -128,6 +128,7 @@ class SynchronizedObject < RubySynchronizedObject class Future < SynchronizedObject module Shortcuts + # Constructs new Future which will be completed after block is evaluated on executor. Evaluation begins immediately. # @return [Future] def future(executor = :fast, &block) ConcurrentNext::Immediate.new(executor, &block).future @@ -135,11 +136,16 @@ def future(executor = :fast, &block) alias_method :async, :future + # Constructs new Future which will be completed after block is evaluated on executor. Evaluation is delays until + # requested by {Future#wait} method, {Future#value} and {Future#value!} methods are calling {Future#wait} internally. # @return [Delay] def delay(executor = :fast, &block) ConcurrentNext::Delay.new(nil, executor, &block).future end + # Constructs {Promise} which helds its {Future} in {Promise#future} method. Intended for completion by user. + # User is responsible not to complete the Promise twice. + # @return [Promise] in this case instance of {OuterPromise} def promise(executor = :fast) ConcurrentNext::OuterPromise.new([], executor) end From 87628e63fe9c19bef3eae9467dd9494e9af33bbf Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Mon, 10 Nov 2014 22:08:25 +0100 Subject: [PATCH 09/12] Add Scheduled a Promise implementation replacing ScheduledTask. --- lib/concurrent/next.rb | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/lib/concurrent/next.rb b/lib/concurrent/next.rb index 85bc59e87..670fdcf4a 100644 --- a/lib/concurrent/next.rb +++ b/lib/concurrent/next.rb @@ -150,6 +150,12 @@ def promise(executor = :fast) ConcurrentNext::OuterPromise.new([], executor) end + # Schedules the block to be executed on executor in given intended_time. + # @return [Future] + def schedule(intended_time, executor = :fast, &task) + Scheduled.new(intended_time, executor, &task).future + end + # fails on first error # does not block a thread # @return [Future] @@ -609,6 +615,24 @@ def initialize(executor = :fast, &task) end end + class Scheduled < Promise + def initialize(intended_time, executor = :fast, &task) + super(executor) + schedule_time = synchronize do + @schedule_time = Concurrent::TimerSet.calculate_schedule_time(intended_time) + end + + # TODO review + Concurrent::timer(schedule_time.to_f - Time.now.to_f) do + ConcurrentNext.executor(executor).post { evaluate_to &task } + end + end + + def schedule_time + synchronize { @schedule_time } + end + end + # will be evaluated to task when first requested class Delay < Promise def initialize(blocked_by_future, executor = :fast, &task) @@ -755,6 +779,12 @@ def execute_once # or just p ConcurrentNext.promise.connect_to(source).value +puts '-- scheduled' + +start = Time.now.to_f +ConcurrentNext.schedule(0.1) { 1 + 1 }.then { |v| p v, Time.now.to_f - start} +sleep 0.2 + puts '-- using shortcuts' include ConcurrentNext # includes Future::Shortcuts From 58e578dc5fba829044bed1d579f122f103f737bb Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Wed, 12 Nov 2014 19:08:27 +0100 Subject: [PATCH 10/12] Fix rescue behavior to be like Scala's Future#recover --- lib/concurrent/next.rb | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/lib/concurrent/next.rb b/lib/concurrent/next.rb index 670fdcf4a..9d360b632 100644 --- a/lib/concurrent/next.rb +++ b/lib/concurrent/next.rb @@ -160,6 +160,7 @@ def schedule(intended_time, executor = :fast, &task) # does not block a thread # @return [Future] def join(*futures) + # TODO consider renaming to zip as in scala countdown = Concurrent::AtomicFixnum.new futures.size promise = OuterPromise.new(futures) futures.each { |future| future.add_callback :join_callback, countdown, promise, *futures } @@ -300,13 +301,13 @@ def chain_delay(executor = default_executor, &callback) # lazy version of #then def then_delay(executor = default_executor, &callback) - delay = Delay.new(self, executor) { conditioned_callback callback } + delay = Delay.new(self, executor) { conditioned_success_callback callback } delay.future end # lazy version of #rescue def rescue_delay(executor = default_executor, &callback) - delay = Delay.new(self, executor) { callback_on_failure callback } + delay = Delay.new(self, executor) { conditioned_failure_callback callback } delay.future end @@ -436,11 +437,11 @@ def chain_callback(executor, promise, callback) end def then_callback(executor, promise, callback) - with_async(executor) { promise.evaluate_to { conditioned_callback callback } } + with_async(executor) { promise.evaluate_to { conditioned_success_callback callback } } end def rescue_callback(executor, promise, callback) - with_async(executor) { promise.evaluate_to { callback_on_failure callback } } + with_async(executor) { promise.evaluate_to { conditioned_failure_callback callback } } end def with_async(executor) @@ -471,10 +472,14 @@ def callback_on_failure(callback) callback.call reason if failed? end - def conditioned_callback(callback) + def conditioned_success_callback(callback) self.success? ? callback.call(value) : raise(reason) end + def conditioned_failure_callback(callback) + self.failed? ? callback.call(reason) : value + end + def call_callback(method, *args) self.send method, *args end @@ -615,6 +620,7 @@ def initialize(executor = :fast, &task) end end + # will be evaluated to task in intended_time class Scheduled < Promise def initialize(intended_time, executor = :fast, &task) super(executor) @@ -702,11 +708,12 @@ def execute_once future5 = future3.with_default_executor(:io) # connects new future with different executor, the new future is completed when future3 is future6 = future5.then(&:capitalize) # executes on IO_EXECUTOR because default was set to :io on future5 future7 = ConcurrentNext.join(future0, future3) +future8 = future0.rescue { raise 'never happens' } # future0 succeeds so future8'll have same value as future 0 p future3, future5 p future3.callbacks, future5.callbacks -futures = [future0, future1, future2, future3, future4, future5, future6, future7] +futures = [future0, future1, future2, future3, future4, future5, future6, future7, future8] futures.each &:wait @@ -721,6 +728,7 @@ def execute_once # 5 true boo io # 6 true Boo io # 7 true [3, "boo"] fast +# 8 true 3 fast puts '-- delay' @@ -782,7 +790,7 @@ def execute_once puts '-- scheduled' start = Time.now.to_f -ConcurrentNext.schedule(0.1) { 1 + 1 }.then { |v| p v, Time.now.to_f - start} +ConcurrentNext.schedule(0.1) { 1 + 1 }.then { |v| p v, Time.now.to_f - start } sleep 0.2 puts '-- using shortcuts' From 770dc0262be1f796a5dc8c5b04ff9f3f4411b03c Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Wed, 3 Dec 2014 20:53:35 +0100 Subject: [PATCH 11/12] First attempt to break classes down adding Scala's flatMap --- lib/concurrent/next.rb | 521 +++++++++++++++++++---------------- spec/concurrent/next_spec.rb | 165 +++++++++++ 2 files changed, 442 insertions(+), 244 deletions(-) create mode 100644 spec/concurrent/next_spec.rb diff --git a/lib/concurrent/next.rb b/lib/concurrent/next.rb index 9d360b632..99bb58f75 100644 --- a/lib/concurrent/next.rb +++ b/lib/concurrent/next.rb @@ -90,13 +90,13 @@ def initialize end def synchronize - # if @mutex.owned? - # yield - # else - @mutex.synchronize { yield } - rescue ThreadError - yield - # end + if @mutex.owned? + yield + else + @mutex.synchronize { yield } + # rescue ThreadError + # yield + end end def wait(timeout) @@ -140,7 +140,7 @@ def future(executor = :fast, &block) # requested by {Future#wait} method, {Future#value} and {Future#value!} methods are calling {Future#wait} internally. # @return [Delay] def delay(executor = :fast, &block) - ConcurrentNext::Delay.new(nil, executor, &block).future + ConcurrentNext::Delay.new([], executor, executor, &block).future end # Constructs {Promise} which helds its {Future} in {Promise#future} method. Intended for completion by user. @@ -153,7 +153,7 @@ def promise(executor = :fast) # Schedules the block to be executed on executor in given intended_time. # @return [Future] def schedule(intended_time, executor = :fast, &task) - Scheduled.new(intended_time, executor, &task).future + Scheduled.new(intended_time, [], executor, &task).future end # fails on first error @@ -161,17 +161,15 @@ def schedule(intended_time, executor = :fast, &task) # @return [Future] def join(*futures) # TODO consider renaming to zip as in scala - countdown = Concurrent::AtomicFixnum.new futures.size - promise = OuterPromise.new(futures) - futures.each { |future| future.add_callback :join_callback, countdown, promise, *futures } - promise.future + # TODO what about executor configuration + JoiningPromise.new(futures).future end + + # TODO add any(*futures) end extend Shortcuts - singleton_class.send :alias_method, :dataflow, :join - # @api private def initialize(promise, default_executor = :fast) super() @@ -270,45 +268,36 @@ def exception(*args) end def with_default_executor(executor = default_executor) - ConnectedPromise.new(self, executor).future + JoiningPromise.new([self], executor).future end alias_method :new_connected, :with_default_executor # @yield [success, value, reason] of the parent def chain(executor = default_executor, &callback) - add_callback :chain_callback, executor, promise = OuterPromise.new([self], default_executor), callback - promise.future + ChainPromise.new([self], default_executor, executor, &callback).future end # @yield [value] executed only on parent success def then(executor = default_executor, &callback) - add_callback :then_callback, executor, promise = OuterPromise.new([self], default_executor), callback - promise.future + ThenPromise.new([self], default_executor, executor, &callback).future end # @yield [reason] executed only on parent failure def rescue(executor = default_executor, &callback) - add_callback :rescue_callback, executor, promise = OuterPromise.new([self], default_executor), callback - promise.future + RescuePromise.new([self], default_executor, executor, &callback).future end - # lazy version of #chain - def chain_delay(executor = default_executor, &callback) - delay = Delay.new(self, executor) { callback_on_completion callback } - delay.future + def delay(executor = default_executor, &task) + Delay.new([self], default_executor, executor, &task).future end - # lazy version of #then - def then_delay(executor = default_executor, &callback) - delay = Delay.new(self, executor) { conditioned_success_callback callback } - delay.future + def flat + FlattingPromise.new([self], default_executor).future end - # lazy version of #rescue - def rescue_delay(executor = default_executor, &callback) - delay = Delay.new(self, executor) { conditioned_failure_callback callback } - delay.future + def schedule(intended_time) + Scheduled.new(intended_time, [self], default_executor).future end # @yield [success, value, reason] executed async on `executor` when completed @@ -363,7 +352,7 @@ def inspect end def join(*futures) - ConcurrentNext.join self, *futures + JoiningPromise.new([self, *futures], default_executor).future end alias_method :+, :join @@ -389,6 +378,7 @@ def complete(success, value, reason, raise = true) # :nodoc: @callbacks end + # TODO pass in local vars to avoid syncing callbacks.each { |method, *args| call_callback method, *args } callbacks.clear @@ -424,26 +414,6 @@ def set_promise_on_completion(promise) promise.complete success?, value, reason end - def join_callback(countdown, promise, *futures) - if success? - promise.success futures.map(&:value) if countdown.decrement.zero? - else - promise.try_fail reason - end - end - - def chain_callback(executor, promise, callback) - with_async(executor) { promise.evaluate_to { callback_on_completion callback } } - end - - def then_callback(executor, promise, callback) - with_async(executor) { promise.evaluate_to { conditioned_success_callback callback } } - end - - def rescue_callback(executor, promise, callback) - with_async(executor) { promise.evaluate_to { conditioned_failure_callback callback } } - end - def with_async(executor) ConcurrentNext.executor(executor).post { yield } end @@ -472,12 +442,8 @@ def callback_on_failure(callback) callback.call reason if failed? end - def conditioned_success_callback(callback) - self.success? ? callback.call(value) : raise(reason) - end - - def conditioned_failure_callback(callback) - self.failed? ? callback.call(reason) : value + def notify_blocked(promise) + promise.done self end def call_callback(method, *args) @@ -491,15 +457,21 @@ def call_callback(method, *args) # @abstract class Promise < SynchronizedObject # @api private - def initialize(executor = :fast) + def initialize(blocked_by_futures, default_executor = :fast) super() - future = Future.new(self, executor) + future = Future.new(self, default_executor) synchronize do @future = future @blocked_by = [] @touched = false end + + add_blocked_by blocked_by_futures + end + + def default_executor + future.default_executor end def future @@ -515,7 +487,7 @@ def state end def touch - blocked_by.each(&:touch) if synchronize { @touched ? false : (@touched = true) } + propagate_touch if synchronize { @touched ? false : (@touched = true) } end def to_s @@ -528,8 +500,8 @@ def inspect private - def add_blocked_by(*futures) - synchronize { @blocked_by += futures } + def add_blocked_by(futures) # TODO move to BlockedPromise + synchronize { @blocked_by += Array(futures) } self end @@ -539,28 +511,27 @@ def complete(success, value, reason, raise = true) end # @return [Future] - def evaluate_to(&block) - complete true, block.call, nil + def evaluate_to(*args, &block) + complete true, block.call(*args), nil rescue => error complete false, nil, error end # @return [Future] def connect_to(future) - add_blocked_by future future.add_callback :set_promise_on_completion, self self.future end + + def propagate_touch + blocked_by.each(&:touch) + end end # @note Be careful not to fullfill the promise twice # @example initialization # ConcurrentNext.promise class OuterPromise < Promise - def initialize(blocked_by_futures, executor = :fast) - super executor - add_blocked_by *blocked_by_futures - end # Set the `IVar` to a value and wake or notify all threads waiting on it. # @@ -591,225 +562,287 @@ def try_fail(reason = StandardError.new) public :evaluate_to # @return [Future] - def evaluate_to!(&block) - evaluate_to(&block).no_error! + def evaluate_to!(*args, &block) + evaluate_to(*args, &block).no_error! end - public :connect_to + # TODO remove + def connect_to(future) + add_blocked_by future + super future + end # @api private public :complete end - # used internally to support #with_default_executor - class ConnectedPromise < Promise - def initialize(future, executor = :fast) - super(executor) - connect_to future + # @abstract + class InnerPromise < Promise + def initialize(blocked_by_futures, default_executor = :fast, executor = default_executor, &task) + super blocked_by_futures, default_executor + synchronize do + @task = task + @executor = executor + @countdown = Concurrent::AtomicFixnum.new blocked_by_futures.size + end + + inner_initialization + + blocked_by_futures.each { |f| f.add_callback :notify_blocked, self } + resolvable if blocked_by_futures.empty? + end + + def executor + synchronize { @executor } end # @api private - public :complete - end + def done(future) # TODO pass in success/value/reason to avoid locking + # futures could be deleted from blocked_by one by one here, but that would too expensive, + # it's done once when all are done to free the reference + resolvable if synchronize { @countdown }.decrement.zero? + end - # will be immediately evaluated to task - class Immediate < Promise - def initialize(executor = :fast, &task) - super(executor) - ConcurrentNext.executor(executor).post { evaluate_to &task } + private + + def inner_initialization + end + + def resolvable + resolve + end + + def resolve + complete_task(*synchronize { [@executor, @task] }) + end + + def complete_task(executor, task) + if task + ConcurrentNext.executor(executor).post { completion task } + else + completion nil + end + end + + def completion(task) + raise NotImplementedError end end - # will be evaluated to task in intended_time - class Scheduled < Promise - def initialize(intended_time, executor = :fast, &task) - super(executor) - schedule_time = synchronize do - @schedule_time = Concurrent::TimerSet.calculate_schedule_time(intended_time) + # used internally to support #with_default_executor + class JoiningPromise < InnerPromise + private + + def completion(task) + if blocked_by.all?(&:success?) + params = blocked_by.map(&:value) + if task + evaluate_to *params, &task + else + complete(true, params.size == 1 ? params.first : params, nil) + end + else + # TODO what about other reasons? + complete false, nil, blocked_by.find(&:failed?).reason end + end + end - # TODO review - Concurrent::timer(schedule_time.to_f - Time.now.to_f) do - ConcurrentNext.executor(executor).post { evaluate_to &task } + class FlattingPromise < InnerPromise + def initialize(blocked_by_futures, default_executor = :fast) + raise ArgumentError, 'requires one blocked_by_future' unless blocked_by_futures.size == 1 + super(blocked_by_futures, default_executor, default_executor, &nil) + end + + def done(future) + value = future.value + if value.is_a? Future + synchronize { @countdown }.increment + add_blocked_by value # TODO DRY + value.add_callback :notify_blocked, self # TODO DRY end + super future end - def schedule_time - synchronize { @schedule_time } + def completion(task) + future = blocked_by.last + complete future.success?, future.value, future.reason end end - # will be evaluated to task when first requested - class Delay < Promise - def initialize(blocked_by_future, executor = :fast, &task) - super(executor) - synchronize do - @task = task - @computing = false + module RequiredTask + def initialize(*args, &task) + raise ArgumentError, 'no block given' unless block_given? + super(*args, &task) + end + end + + module ZeroOrOneBlockingFuture + def initialize(blocked_by_futures, *args, &task) + raise ArgumentError, 'only zero or one blocking future' unless (0..1).cover?(blocked_by_futures.size) + super(blocked_by_futures, *args, &task) + end + end + + module BlockingFutureOrTask + def initialize(blocked_by_futures, *args, &task) + raise ArgumentError, 'has to have task or blocked by future' if blocked_by_futures.empty? && task.nil? + super(blocked_by_futures, *args, &task) + end + + private + + def completion(task) + future = blocked_by.first + if task + if future + evaluate_to future.success?, future.value, future.reason, &task + else + evaluate_to &task + end + else + if future + complete future.success?, future.value, future.reason + else + raise + end end - add_blocked_by blocked_by_future if blocked_by_future end + end - def touch - if blocked_by.all?(&:completed?) - execute_once + class ThenPromise < InnerPromise + include RequiredTask + include ZeroOrOneBlockingFuture + + private + + def completion(task) + future = blocked_by.first + if future.success? + evaluate_to future.value, &task else - blocked_by.each { |f| f.on_success! { self.touch } unless synchronize { @touched } } - super + complete false, nil, future.reason end end + end + + class RescuePromise < InnerPromise + include RequiredTask + include ZeroOrOneBlockingFuture private - def execute_once - execute, task = synchronize do - [(@computing = true unless @computing), @task] + def completion(task) + future = blocked_by.first + if future.failed? + evaluate_to future.reason, &task + else + complete true, future.value, nil end + end + end + + class ChainPromise < InnerPromise + include RequiredTask + include ZeroOrOneBlockingFuture + + private - if execute - ConcurrentNext.executor(future.default_executor).post { evaluate_to &task } + def completion(task) + future = blocked_by.first + evaluate_to future.success?, future.value, future.reason, &task + end + end + + # will be immediately evaluated to task + class Immediate < InnerPromise + def initialize(default_executor = :fast, executor = default_executor, &task) + super([], default_executor, executor, &task) + end + + private + + def completion(task) + evaluate_to &task + end + end + + # will be evaluated to task in intended_time + class Scheduled < InnerPromise + include RequiredTask + include BlockingFutureOrTask + + def initialize(intended_time, blocked_by_futures, default_executor = :fast, executor = default_executor, &task) + @intended_time = intended_time + super(blocked_by_futures, default_executor, executor, &task) + synchronize { @intended_time = intended_time } + end + + def intended_time + synchronize { @intended_time } + end + + private + + def inner_initialization(*args) + super *args + synchronize { @intended_time = intended_time } + end + + def resolvable + in_seconds = synchronize do + now = Time.now + schedule_time = if @intended_time.is_a? Time + @intended_time + else + now + @intended_time + end + [0, schedule_time.to_f - now.to_f].max end - self + + Concurrent::timer(in_seconds) { resolve } end end -end + class Delay < InnerPromise + include ZeroOrOneBlockingFuture + include BlockingFutureOrTask -logger = Logger.new($stderr) -logger.level = Logger::DEBUG -Concurrent.configuration.logger = lambda do |level, progname, message = nil, &block| - logger.add level, message, progname, &block -end + def touch + if synchronize { @touched ? false : (@touched = true) } + propagate_touch + resolve + end + end -puts '-- asynchronous task without Future' -q = Queue.new -ConcurrentNext.post { q << 'a' } -p q.pop - -puts '-- asynchronous task with Future' -p future = ConcurrentNext.future { 1 + 1 } -p future.value - -puts '-- sync and async callbacks on futures' -future = ConcurrentNext.future { 'value' } # executed on FAST_EXECUTOR pool by default -future.on_completion(:io) { p 'async' } # async callback overridden to execute on IO_EXECUTOR pool -future.on_completion! { p 'sync' } # sync callback executed right after completion in the same thread-pool -p future.value -# it should usually print "sync"\n"async"\n"value" - -sleep 0.1 - -puts '-- future chaining' -future0 = ConcurrentNext.future { 1 }.then { |v| v + 2 } # both executed on default FAST_EXECUTOR -future1 = future0.then(:io) { raise 'boo' } # executed on IO_EXECUTOR -future2 = future1.then { |v| v + 1 } # will fail with 'boo' error, executed on default FAST_EXECUTOR -future3 = future1.rescue { |err| err.message } # executed on default FAST_EXECUTOR -future4 = future0.chain { |success, value, reason| success } # executed on default FAST_EXECUTOR -future5 = future3.with_default_executor(:io) # connects new future with different executor, the new future is completed when future3 is -future6 = future5.then(&:capitalize) # executes on IO_EXECUTOR because default was set to :io on future5 -future7 = ConcurrentNext.join(future0, future3) -future8 = future0.rescue { raise 'never happens' } # future0 succeeds so future8'll have same value as future 0 - -p future3, future5 -p future3.callbacks, future5.callbacks - -futures = [future0, future1, future2, future3, future4, future5, future6, future7, future8] -futures.each &:wait - - -puts 'index success value reason pool' -futures.each_with_index { |f, i| puts '%5i %7s %10s %6s %4s' % [i, f.success?, f.value, f.reason, f.default_executor] } -# index success value reason pool -# 0 true 3 fast -# 1 false boo fast -# 2 false boo fast -# 3 true boo fast -# 4 true true fast -# 5 true boo io -# 6 true Boo io -# 7 true [3, "boo"] fast -# 8 true 3 fast - -puts '-- delay' - -# evaluated on #wait, #value -delay = ConcurrentNext.delay { 1 + 1 } -p delay.completed?, delay.value - -puts '-- promise like tree' - -# if head of the tree is not constructed with #future but with #delay it does not start execute, -# it's triggered later by calling wait or value on any of the dependent futures or the delay itself -three = (head = ConcurrentNext.delay { 1 }).then { |v| v.succ }.then(&:succ) -four = three.then_delay(&:succ) - -# meaningful to_s and inspect defined for Future and Promise -puts head -# <#ConcurrentNext::Future:0x7fb9dcabacc8 pending> -p head -# <#ConcurrentNext::Future:0x7fb9dcabacc8 pending blocks:[<#ConcurrentNext::ExternalPromise:0x7fb9dcabaac0 pending>]> -p head.callbacks -# [[:then_callback, :fast, <#ConcurrentNext::ExternalPromise:0x7fb9dcabaac0 pending blocked_by:[<#ConcurrentNext::Future:0x7fb9dcabacc8 pending>]>, -# #]] - - -# evaluates only up to three, four is left unevaluated -p three.value # 3 -p four, four.promise -# until value is called on four -p four.value # 4 - -# futures hidden behind two delays trigger evaluation of both -double_delay = ConcurrentNext.delay { 1 }.then_delay(&:succ) -p double_delay.value # 2 - -puts '-- graph' - -head = ConcurrentNext.future { 1 } -branch1 = head.then(&:succ).then(&:succ) -branch2 = head.then(&:succ).then_delay(&:succ) -result = ConcurrentNext.join(branch1, branch2).then { |b1, b2| b1 + b2 } -# other variants -result = branch1.join(branch2).then { |b1, b2| b1 + b2 } -result = (branch1 + branch2).then { |b1, b2| b1 + b2 } - -sleep 0.1 -p branch1.completed?, branch2.completed? # true, false -# force evaluation of whole graph -p result.value # 6 - -puts '-- connecting existing promises' - -source = ConcurrentNext.delay { 1 } -promise = ConcurrentNext.promise -promise.connect_to source -p promise.future.value # 1 -# or just -p ConcurrentNext.promise.connect_to(source).value - -puts '-- scheduled' - -start = Time.now.to_f -ConcurrentNext.schedule(0.1) { 1 + 1 }.then { |v| p v, Time.now.to_f - start } -sleep 0.2 - -puts '-- using shortcuts' + private + + def inner_initialization + super + synchronize { @resolvable = false } + end + + def resolvable + synchronize { @resolvable = true } + resolve + end -include ConcurrentNext # includes Future::Shortcuts + def resolve + super if synchronize { @resolvable && @touched } + end -# now methods on ConcurrentNext are accessible directly + end +end -p delay { 1 }.value, future { 1 }.value # => 1\n1 - -promise = promise() -promise.connect_to(future { 3 }) -p promise.future.value # 3 +__END__ puts '-- bench' require 'benchmark' count = 5_000_000 +rehersals = 20 count = 5_000 +rehersals = 1 module Benchmark def self.bmbmbm(rehearsals, width) diff --git a/spec/concurrent/next_spec.rb b/spec/concurrent/next_spec.rb new file mode 100644 index 000000000..3b15182e1 --- /dev/null +++ b/spec/concurrent/next_spec.rb @@ -0,0 +1,165 @@ +require 'concurrent' +require 'concurrent/next' + +logger = Logger.new($stderr) +logger.level = Logger::DEBUG +Concurrent.configuration.logger = lambda do |level, progname, message = nil, &block| + logger.add level, message, progname, &block +end + +describe 'ConcurrentNext' do + + describe '.post' do + it 'executes tasks asynchronously' do + queue = Queue.new + value = 12 + ConcurrentNext.post { queue << value } + ConcurrentNext.post(:io) { queue << value } + expect(queue.pop).to eq value + expect(queue.pop).to eq value + + # TODO test correct executor + end + end + + describe '.future' do + it 'executes' do + future = ConcurrentNext.future { 1 + 1 } + expect(future.value).to eq 2 + end + end + + describe '.delay' do + it 'delays execution' do + delay = ConcurrentNext.delay { 1 + 1 } + expect(delay.completed?).to eq false + expect(delay.value).to eq 2 + end + end + + describe '.schedule' do + it 'scheduled execution' do + start = Time.now.to_f + queue = Queue.new + f = ConcurrentNext.schedule(0.1) { 1 + 1 }.then { |v| queue << v << Time.now.to_f - start } + + expect(f.value).to eq queue + expect(queue.pop).to eq 2 + expect(queue.pop).to be_between(0.1, 0.15) + end + end + + describe 'Future' do + it 'has sync and async callbacks' do + queue = Queue.new + future = ConcurrentNext.future { :value } # executed on FAST_EXECUTOR pool by default + future.on_completion(:io) { queue << :async } # async callback overridden to execute on IO_EXECUTOR pool + future.on_completion! { queue << :sync } # sync callback executed right after completion in the same thread-pool + + expect(future.value).to eq :value + expect(queue.pop).to eq :sync + expect(queue.pop).to eq :async + end + + it 'chains' do + future0 = ConcurrentNext.future { 1 }.then { |v| v + 2 } # both executed on default FAST_EXECUTOR + future1 = future0.then(:io) { raise 'boo' } # executed on IO_EXECUTOR + future2 = future1.then { |v| v + 1 } # will fail with 'boo' error, executed on default FAST_EXECUTOR + future3 = future1.rescue { |err| err.message } # executed on default FAST_EXECUTOR + future4 = future0.chain { |success, value, reason| success } # executed on default FAST_EXECUTOR + future5 = future3.with_default_executor(:io) # connects new future with different executor, the new future is completed when future3 is + future6 = future5.then(&:capitalize) # executes on IO_EXECUTOR because default was set to :io on future5 + future7 = ConcurrentNext.join(future0, future3) + future8 = future0.rescue { raise 'never happens' } # future0 succeeds so future8'll have same value as future 0 + + futures = [future0, future1, future2, future3, future4, future5, future6, future7, future8] + futures.each &:wait + + table = futures.each_with_index.map do |f, i| + '%5i %7s %10s %6s %4s %6s' % [i, f.success?, f.value, f.reason, f.default_executor, f.promise.executor] + end.unshift('index success value reason pool d.pool') + + expect(table.join("\n")).to eq <<-TABLE.gsub(/^\s+\|/, '').strip + |index success value reason pool d.pool + | 0 true 3 fast fast + | 1 false boo fast io + | 2 false boo fast fast + | 3 true boo fast fast + | 4 true true fast fast + | 5 true boo io io + | 6 true Boo io io + | 7 true [3, "boo"] fast fast + | 8 true 3 fast fast + TABLE + end + + it 'constructs promise like tree' do + # if head of the tree is not constructed with #future but with #delay it does not start execute, + # it's triggered later by calling wait or value on any of the dependent futures or the delay itself + three = (head = ConcurrentNext.delay { 1 }).then { |v| v.succ }.then(&:succ) + four = three.delay.then(&:succ) + + # meaningful to_s and inspect defined for Future and Promise + expect(head.to_s).to match /<#ConcurrentNext::Future:0x[\da-f]{12} pending>/ + expect(head.inspect).to( + match(/<#ConcurrentNext::Future:0x[\da-f]{12} pending blocks:\[<#ConcurrentNext::ThenPromise:0x[\da-f]{12} pending>\]>/)) + + # evaluates only up to three, four is left unevaluated + expect(three.value).to eq 3 + expect(four).not_to be_completed + + expect(four.value).to eq 4 + + # futures hidden behind two delays trigger evaluation of both + double_delay = ConcurrentNext.delay { 1 }.delay.then(&:succ) + expect(double_delay.value).to eq 2 + end + + it 'allows graphs' do + head = ConcurrentNext.future { 1 } + branch1 = head.then(&:succ).then(&:succ) + branch2 = head.then(&:succ).delay.then(&:succ) + results = [ + ConcurrentNext.join(branch1, branch2).then { |b1, b2| b1 + b2 }, + branch1.join(branch2).then { |b1, b2| b1 + b2 }, + (branch1 + branch2).then { |b1, b2| b1 + b2 }] + + sleep 0.1 + expect(branch1).to be_completed + expect(branch2).not_to be_completed + + expect(results.map(&:value)).to eq [6, 6, 6] + end + + it 'has flat map' do + f = ConcurrentNext.future { ConcurrentNext.future { 1 } }.flat.then(&:succ) + expect(f.value).to eq 2 + end + end + +end + +__END__ + +puts '-- connecting existing promises' + +source = ConcurrentNext.delay { 1 } +promise = ConcurrentNext.promise +promise.connect_to source +p promise.future.value # 1 +# or just +p ConcurrentNext.promise.connect_to(source).value + + +puts '-- using shortcuts' + +include ConcurrentNext # includes Future::Shortcuts + +# now methods on ConcurrentNext are accessible directly + +p delay { 1 }.value, future { 1 }.value # => 1\n1 + +promise = promise() +promise.connect_to(future { 3 }) +p promise.future.value # 3 + From 500b96cb3e5dd8fdf99d76339d4eda837e719440 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Fri, 5 Dec 2014 10:28:12 +0100 Subject: [PATCH 12/12] Better break down of Promises to classes supports all, any, flattening --- lib/concurrent/next.rb | 718 ++++++++++++++++++++--------------- spec/concurrent/next_spec.rb | 86 ++++- 2 files changed, 481 insertions(+), 323 deletions(-) diff --git a/lib/concurrent/next.rb b/lib/concurrent/next.rb index 99bb58f75..eb11ab012 100644 --- a/lib/concurrent/next.rb +++ b/lib/concurrent/next.rb @@ -1,6 +1,6 @@ require 'concurrent' -# TODO Dereferencable +# TODO support Dereferencable ? # TODO document new global pool setting: no overflow, user has to buffer when there is too many tasks # TODO behaviour with Interrupt exceptions is undefined, use Signal.trap to avoid issues @@ -125,13 +125,194 @@ class SynchronizedObject < RubySynchronizedObject end end - class Future < SynchronizedObject + # FIXME turn callbacks into objects + + class Event < SynchronizedObject + # @api private + def initialize(promise, default_executor = :fast) + super() + synchronize do + @promise = promise + @state = :pending + @callbacks = [] + @default_executor = default_executor + end + end + + # Is obligation completion still pending? + # @return [Boolean] + def pending? + state == :pending + end + + alias_method :incomplete?, :pending? + + def completed? + state == :completed + end + + # wait until Obligation is #complete? + # @param [Numeric] timeout the maximum time in second to wait. + # @return [Obligation] self + def wait(timeout = nil) + synchronize do + touch + super timeout if incomplete? + self + end + end + + def touch + promise.touch + end + + def state + synchronize { @state } + end + + def default_executor + synchronize { @default_executor } + end + + # @yield [success, value, reason] of the parent + def chain(executor = default_executor, &callback) + ChainPromise.new(self, default_executor, executor, &callback).future + end + + def then(*args, &callback) + raise + chain(*args, &callback) + end + + def delay + self.join(Delay.new(default_executor).future) + end + + def schedule(intended_time) + self.chain { Scheduled.new(intended_time).future.join(self) }.flat + end + + # @yield [success, value, reason] executed async on `executor` when completed + # @return self + def on_completion(executor = default_executor, &callback) + add_callback :async_callback_on_completion, executor, callback + end + + # @yield [success, value, reason] executed sync when completed + # @return self + def on_completion!(&callback) + add_callback :callback_on_completion, callback + end + + # @return [Array] + def blocks + synchronize { @callbacks }.each_with_object([]) do |callback, promises| + promises.push *callback.select { |v| v.is_a? Promise } + end + end + + def to_s + "<##{self.class}:0x#{'%x' % (object_id << 1)} #{state}>" + end + + def inspect + "#{to_s[0..-2]} blocks:[#{blocks.map(&:to_s).join(', ')}]>" + end + + def join(*futures) + AllPromise.new([self, *futures], default_executor).future + end + + alias_method :+, :join + alias_method :and, :join + + # @api private + def complete(raise = true) + callbacks = synchronize do + check_multiple_assignment raise + complete_state + notify_all + @callbacks + end + + call_callbacks callbacks + + self + end + + # @api private + # just for inspection + def callbacks + synchronize { @callbacks }.clone.freeze + end + + # @api private + def add_callback(method, *args) + synchronize do + if completed? + call_callback method, *args + else + @callbacks << [method, *args] + end + end + self + end + + # @api private, only for inspection + def promise + synchronize { @promise } + end + + private + + def complete_state + @state = :completed + end + + def check_multiple_assignment(raise) + if completed? + if raise + raise Concurrent::MultipleAssignmentError.new('multiple assignment') + else + return nil + end + end + end + + def with_async(executor) + ConcurrentNext.executor(executor).post { yield } + end + + def async_callback_on_completion(executor, callback) + with_async(executor) { callback_on_completion callback } + end + + def callback_on_completion(callback) + callback.call + end + + def notify_blocked(promise) + promise.done self + end + + def call_callback(method, *args) + self.send method, *args + end + + def call_callbacks(callbacks) + # FIXME pass in local vars to avoid syncing + callbacks.each { |method, *args| call_callback method, *args } + synchronize { callbacks.clear } + end + end + + class Future < Event module Shortcuts # Constructs new Future which will be completed after block is evaluated on executor. Evaluation begins immediately. # @return [Future] - def future(executor = :fast, &block) - ConcurrentNext::Immediate.new(executor, &block).future + def future(default_executor = :fast, &task) + ConcurrentNext::Immediate.new(default_executor).future.chain(&task) end alias_method :async, :future @@ -139,47 +320,48 @@ def future(executor = :fast, &block) # Constructs new Future which will be completed after block is evaluated on executor. Evaluation is delays until # requested by {Future#wait} method, {Future#value} and {Future#value!} methods are calling {Future#wait} internally. # @return [Delay] - def delay(executor = :fast, &block) - ConcurrentNext::Delay.new([], executor, executor, &block).future + def delay(default_executor = :fast, &task) + ConcurrentNext::Delay.new(default_executor).future.chain(&task) end # Constructs {Promise} which helds its {Future} in {Promise#future} method. Intended for completion by user. # User is responsible not to complete the Promise twice. # @return [Promise] in this case instance of {OuterPromise} - def promise(executor = :fast) - ConcurrentNext::OuterPromise.new([], executor) + def promise(default_executor = :fast) + ConcurrentNext::OuterPromise.new(default_executor) end # Schedules the block to be executed on executor in given intended_time. # @return [Future] - def schedule(intended_time, executor = :fast, &task) - Scheduled.new(intended_time, [], executor, &task).future + def schedule(intended_time, default_executor = :fast, &task) + Scheduled.new(intended_time, default_executor).future.chain(&task) end # fails on first error # does not block a thread # @return [Future] def join(*futures) - # TODO consider renaming to zip as in scala - # TODO what about executor configuration - JoiningPromise.new(futures).future + AllPromise.new(futures).future end - # TODO add any(*futures) + # TODO pick names for join, any on class/instance + # consider renaming to zip as in scala + alias_method :all, :join + alias_method :zip, :join + + def any(*futures) + AnyPromise.new(futures).future + end end extend Shortcuts # @api private def initialize(promise, default_executor = :fast) - super() + super(promise, default_executor) synchronize do - @promise = promise - @value = nil - @reason = nil - @state = :pending - @callbacks = [] - @default_executor = default_executor + @value = nil + @reason = nil end end @@ -213,6 +395,11 @@ def value(timeout = nil) synchronize { @value } end + def reason(timeout = nil) + wait timeout + synchronize { @reason } + end + # wait until Obligation is #complete? # @param [Numeric] timeout the maximum time in second to wait. # @return [Obligation] self @@ -247,18 +434,6 @@ def value!(timeout = nil) end end - def state - synchronize { @state } - end - - def reason - synchronize { @reason } - end - - def default_executor - synchronize { @default_executor } - end - # @example allows Obligation to be risen # failed_ivar = Ivar.new.fail # raise failed_ivar @@ -268,43 +443,33 @@ def exception(*args) end def with_default_executor(executor = default_executor) - JoiningPromise.new([self], executor).future + AllPromise.new([self], executor).future end - alias_method :new_connected, :with_default_executor - # @yield [success, value, reason] of the parent def chain(executor = default_executor, &callback) - ChainPromise.new([self], default_executor, executor, &callback).future + ChainPromise.new(self, default_executor, executor, &callback).future end # @yield [value] executed only on parent success def then(executor = default_executor, &callback) - ThenPromise.new([self], default_executor, executor, &callback).future + ThenPromise.new(self, default_executor, executor, &callback).future end # @yield [reason] executed only on parent failure def rescue(executor = default_executor, &callback) - RescuePromise.new([self], default_executor, executor, &callback).future - end - - def delay(executor = default_executor, &task) - Delay.new([self], default_executor, executor, &task).future + RescuePromise.new(self, default_executor, executor, &callback).future end def flat - FlattingPromise.new([self], default_executor).future + FlattingPromise.new(self, default_executor).future end - def schedule(intended_time) - Scheduled.new(intended_time, [self], default_executor).future + def or(*futures) + AnyPromise.new([self, *futures], default_executor).future end - # @yield [success, value, reason] executed async on `executor` when completed - # @return self - def on_completion(executor = default_executor, &callback) - add_callback :async_callback_on_completion, executor, callback - end + alias_method :|, :or # @yield [value] executed async on `executor` when success # @return self @@ -318,12 +483,6 @@ def on_failure(executor = default_executor, &callback) add_callback :async_callback_on_failure, executor, callback end - # @yield [success, value, reason] executed sync when completed - # @return self - def on_completion!(&callback) - add_callback :callback_on_completion, callback - end - # @yield [value] executed sync when success # @return self def on_success!(&callback) @@ -336,51 +495,16 @@ def on_failure!(&callback) add_callback :callback_on_failure, callback end - # @return [Array] - def blocks - synchronize { @callbacks }.each_with_object([]) do |callback, promises| - promises.push *callback.select { |v| v.is_a? Promise } - end - end - - def to_s - "<##{self.class}:0x#{'%x' % (object_id << 1)} #{state}>" - end - - def inspect - "#{to_s[0..-2]} blocks:[#{blocks.map(&:to_s).join(', ')}]>" - end - - def join(*futures) - JoiningPromise.new([self, *futures], default_executor).future - end - - alias_method :+, :join - # @api private - def complete(success, value, reason, raise = true) # :nodoc: + def complete(success, value, reason, raise = true) callbacks = synchronize do - if completed? - if raise - raise Concurrent::MultipleAssignmentError.new('multiple assignment') - else - return nil - end - end - if success - @value = value - @state = :success - else - @reason = reason - @state = :failed - end + check_multiple_assignment raise + complete_state success, value, reason notify_all @callbacks end - # TODO pass in local vars to avoid syncing - callbacks.each { |method, *args| call_callback method, *args } - callbacks.clear + call_callbacks callbacks self end @@ -410,16 +534,14 @@ def promise private - def set_promise_on_completion(promise) - promise.complete success?, value, reason - end - - def with_async(executor) - ConcurrentNext.executor(executor).post { yield } - end - - def async_callback_on_completion(executor, callback) - with_async(executor) { callback_on_completion callback } + def complete_state(success, value, reason) + if success + @value = value + @state = :success + else + @reason = reason + @state = :failed + end end def async_callback_on_success(executor, callback) @@ -430,10 +552,6 @@ def async_callback_on_failure(executor, callback) with_async(executor) { callback_on_failure callback } end - def callback_on_completion(callback) - callback.call success?, value, reason - end - def callback_on_success(callback) callback.call value if success? end @@ -442,32 +560,25 @@ def callback_on_failure(callback) callback.call reason if failed? end - def notify_blocked(promise) - promise.done self - end - - def call_callback(method, *args) - self.send method, *args + def callback_on_completion(callback) + callback.call success?, value, reason end end extend Future::Shortcuts include Future::Shortcuts + # TODO modularize blocked_by and notify blocked + # @abstract class Promise < SynchronizedObject # @api private - def initialize(blocked_by_futures, default_executor = :fast) + def initialize(future) super() - future = Future.new(self, default_executor) - synchronize do - @future = future - @blocked_by = [] - @touched = false + @future = future + @touched = false end - - add_blocked_by blocked_by_futures end def default_executor @@ -478,16 +589,11 @@ def future synchronize { @future } end - def blocked_by - synchronize { @blocked_by } - end - def state future.state end def touch - propagate_touch if synchronize { @touched ? false : (@touched = true) } end def to_s @@ -495,19 +601,13 @@ def to_s end def inspect - "#{to_s[0..-2]} blocked_by:[#{synchronize { @blocked_by }.map(&:to_s).join(', ')}]>" + to_s end private - def add_blocked_by(futures) # TODO move to BlockedPromise - synchronize { @blocked_by += Array(futures) } - self - end - - def complete(success, value, reason, raise = true) - future.complete(success, value, reason, raise) - synchronize { @blocked_by.clear } + def complete(*args) + future.complete(*args) end # @return [Future] @@ -516,22 +616,16 @@ def evaluate_to(*args, &block) rescue => error complete false, nil, error end - - # @return [Future] - def connect_to(future) - future.add_callback :set_promise_on_completion, self - self.future - end - - def propagate_touch - blocked_by.each(&:touch) - end end # @note Be careful not to fullfill the promise twice # @example initialization # ConcurrentNext.promise + # @note TODO consider to allow being blocked_by class OuterPromise < Promise + def initialize(default_executor = :fast) + super Future.new(self, default_executor) + end # Set the `IVar` to a value and wake or notify all threads waiting on it. # @@ -566,272 +660,266 @@ def evaluate_to!(*args, &block) evaluate_to(*args, &block).no_error! end - # TODO remove - def connect_to(future) - add_blocked_by future - super future - end - # @api private public :complete end # @abstract class InnerPromise < Promise - def initialize(blocked_by_futures, default_executor = :fast, executor = default_executor, &task) - super blocked_by_futures, default_executor - synchronize do - @task = task - @executor = executor - @countdown = Concurrent::AtomicFixnum.new blocked_by_futures.size - end - - inner_initialization + end - blocked_by_futures.each { |f| f.add_callback :notify_blocked, self } - resolvable if blocked_by_futures.empty? + # @abstract + class BlockedPromise < InnerPromise + def self.new(*args) + promise = super(*args) + promise.blocked_by.each { |f| f.add_callback :notify_blocked, promise } + promise end - def executor - synchronize { @executor } + def initialize(future, blocked_by_futures) + super future + synchronize do + @blocked_by = Array(blocked_by_futures) + @countdown = Concurrent::AtomicFixnum.new @blocked_by.size + @touched = false + end end # @api private - def done(future) # TODO pass in success/value/reason to avoid locking + def done(future) # FIXME pass in success/value/reason to avoid locking # futures could be deleted from blocked_by one by one here, but that would too expensive, # it's done once when all are done to free the reference - resolvable if synchronize { @countdown }.decrement.zero? - end - - private - - def inner_initialization + completable if synchronize { @countdown }.decrement.zero? end - def resolvable - resolve - end - - def resolve - complete_task(*synchronize { [@executor, @task] }) + def touch + propagate_touch if synchronize { @touched ? false : (@touched = true) } end - def complete_task(executor, task) - if task - ConcurrentNext.executor(executor).post { completion task } - else - completion nil - end + # @api private + # for inspection only + def blocked_by + synchronize { @blocked_by } end - def completion(task) - raise NotImplementedError + def inspect + "#{to_s[0..-2]} blocked_by:[#{synchronize { @blocked_by }.map(&:to_s).join(', ')}]>" end - end - # used internally to support #with_default_executor - class JoiningPromise < InnerPromise private - def completion(task) - if blocked_by.all?(&:success?) - params = blocked_by.map(&:value) - if task - evaluate_to *params, &task - else - complete(true, params.size == 1 ? params.first : params, nil) - end - else - # TODO what about other reasons? - complete false, nil, blocked_by.find(&:failed?).reason - end - end - end - - class FlattingPromise < InnerPromise - def initialize(blocked_by_futures, default_executor = :fast) - raise ArgumentError, 'requires one blocked_by_future' unless blocked_by_futures.size == 1 - super(blocked_by_futures, default_executor, default_executor, &nil) + def completable + raise NotImplementedError end - def done(future) - value = future.value - if value.is_a? Future - synchronize { @countdown }.increment - add_blocked_by value # TODO DRY - value.add_callback :notify_blocked, self # TODO DRY - end - super future + def propagate_touch + blocked_by.each(&:touch) end - def completion(task) - future = blocked_by.last - complete future.success?, future.value, future.reason + def complete(*args) + super *args + synchronize { @blocked_by.clear } end end - module RequiredTask - def initialize(*args, &task) + # @abstract + class BlockedTaskPromise < BlockedPromise + def initialize(blocked_by_future, default_executor = :fast, executor = default_executor, &task) raise ArgumentError, 'no block given' unless block_given? - super(*args, &task) + super Future.new(self, default_executor), [blocked_by_future] + synchronize do + @task = task + @executor = executor + end end - end - module ZeroOrOneBlockingFuture - def initialize(blocked_by_futures, *args, &task) - raise ArgumentError, 'only zero or one blocking future' unless (0..1).cover?(blocked_by_futures.size) - super(blocked_by_futures, *args, &task) + def executor + synchronize { @executor } end + end - module BlockingFutureOrTask - def initialize(blocked_by_futures, *args, &task) - raise ArgumentError, 'has to have task or blocked by future' if blocked_by_futures.empty? && task.nil? - super(blocked_by_futures, *args, &task) + class ThenPromise < BlockedTaskPromise + def initialize(blocked_by_future, default_executor = :fast, executor = default_executor, &task) + blocked_by_future.is_a? Future or + raise ArgumentError, 'only Future can be appended with then' + super(blocked_by_future, default_executor, executor, &task) end private - def completion(task) + def completable future = blocked_by.first - if task - if future - evaluate_to future.success?, future.value, future.reason, &task - else - evaluate_to &task - end + if future.success? + ConcurrentNext.post(executor) { evaluate_to future.value, &synchronize { @task } } else - if future - complete future.success?, future.value, future.reason - else - raise - end + complete false, nil, future.reason end end end - class ThenPromise < InnerPromise - include RequiredTask - include ZeroOrOneBlockingFuture + class RescuePromise < BlockedTaskPromise + def initialize(blocked_by_future, default_executor = :fast, executor = default_executor, &task) + blocked_by_future.is_a? Future or + raise ArgumentError, 'only Future can be rescued' + super(blocked_by_future, default_executor, executor, &task) + end private - def completion(task) + def completable future = blocked_by.first - if future.success? - evaluate_to future.value, &task + if future.failed? + ConcurrentNext.post(executor) { evaluate_to future.reason, &synchronize { @task } } else - complete false, nil, future.reason + complete true, future.value, nil end end end - class RescuePromise < InnerPromise - include RequiredTask - include ZeroOrOneBlockingFuture - + class ChainPromise < BlockedTaskPromise private - def completion(task) + def completable future = blocked_by.first - if future.failed? - evaluate_to future.reason, &task + if Future === future + ConcurrentNext.post(executor) do + evaluate_to future.success?, future.value, future.reason, &synchronize { @task } + end else - complete true, future.value, nil + ConcurrentNext.post(executor) { evaluate_to &synchronize { @task } } end end end - class ChainPromise < InnerPromise - include RequiredTask - include ZeroOrOneBlockingFuture + # will be immediately completed + class Immediate < InnerPromise + def self.new(*args) + promise = super(*args) + ConcurrentNext.post { promise.future.complete } + promise + end - private - def completion(task) - future = blocked_by.first - evaluate_to future.success?, future.value, future.reason, &task + def initialize(default_executor = :fast) + super Event.new(self, default_executor) end end - # will be immediately evaluated to task - class Immediate < InnerPromise - def initialize(default_executor = :fast, executor = default_executor, &task) - super([], default_executor, executor, &task) + # @note TODO add support for levels + class FlattingPromise < BlockedPromise + def initialize(blocked_by_future, default_executor = :fast) + blocked_by_future.is_a? Future or + raise ArgumentError, 'only Future can be flatten' + super(Future.new(self, default_executor), [blocked_by_future]) + end + + def done(future) + value = future.value + case value + when Future + synchronize do + @countdown.increment + @blocked_by << value + end + value.add_callback :notify_blocked, self + when Event + raise TypeError, 'cannot flatten to Event' + else + # nothing we are done flattening + end + super future end private - def completion(task) - evaluate_to &task + def completable + future = blocked_by.last + complete future.success?, future.value, future.reason end end - # will be evaluated to task in intended_time - class Scheduled < InnerPromise - include RequiredTask - include BlockingFutureOrTask - - def initialize(intended_time, blocked_by_futures, default_executor = :fast, executor = default_executor, &task) - @intended_time = intended_time - super(blocked_by_futures, default_executor, executor, &task) - synchronize { @intended_time = intended_time } + # used internally to support #with_default_executor + class AllPromise < BlockedPromise + def initialize(blocked_by_futures, default_executor = :fast) + klass = blocked_by_futures.any? { |f| f.is_a?(Future) } ? Future : Event + super(klass.new(self, default_executor), blocked_by_futures) end - def intended_time - synchronize { @intended_time } + private + + def completable + results = blocked_by.select { |f| f.is_a?(Future) } + if results.empty? + complete + else + if results.all?(&:success?) + params = results.map(&:value) + complete(true, params.size == 1 ? params.first : params, nil) + else + # TODO what about other reasons? + complete false, nil, results.find(&:failed?).reason + end + end end + end - private + class AnyPromise < BlockedPromise + def initialize(blocked_by_futures, default_executor = :fast) + blocked_by_futures.all? { |f| f.is_a? Future } or + raise ArgumentError, 'accepts only Futures not Events' + super(Future.new(self, default_executor), blocked_by_futures) + end - def inner_initialization(*args) - super *args - synchronize { @intended_time = intended_time } + def done(future) + completable(future) end - def resolvable - in_seconds = synchronize do - now = Time.now - schedule_time = if @intended_time.is_a? Time - @intended_time - else - now + @intended_time - end - [0, schedule_time.to_f - now.to_f].max - end + private - Concurrent::timer(in_seconds) { resolve } + def completable(future) + complete future.success?, future.value, future.reason, false end end class Delay < InnerPromise - include ZeroOrOneBlockingFuture - include BlockingFutureOrTask + def initialize(default_executor = :fast) + super Event.new(self, default_executor) + synchronize { @touched = false } + end def touch - if synchronize { @touched ? false : (@touched = true) } - propagate_touch - resolve - end + complete if synchronize { @touched ? false : (@touched = true) } end + end - private + # will be evaluated to task in intended_time + class Scheduled < InnerPromise + def initialize(intended_time, default_executor = :fast) + super Event.new(self, default_executor) + in_seconds = synchronize do + @intended_time = intended_time + now = Time.now + schedule_time = if intended_time.is_a? Time + intended_time + else + now + intended_time + end + [0, schedule_time.to_f - now.to_f].max + end - def inner_initialization - super - synchronize { @resolvable = false } + Concurrent::timer(in_seconds) { complete } end - def resolvable - synchronize { @resolvable = true } - resolve + def intended_time + synchronize { @intended_time } end - def resolve - super if synchronize { @resolvable && @touched } + def inspect + "#{to_s[0..-2]} intended_time:[#{synchronize { @intended_time }}>" end - end + end __END__ diff --git a/spec/concurrent/next_spec.rb b/spec/concurrent/next_spec.rb index 3b15182e1..faac5899c 100644 --- a/spec/concurrent/next_spec.rb +++ b/spec/concurrent/next_spec.rb @@ -39,14 +39,47 @@ describe '.schedule' do it 'scheduled execution' do - start = Time.now.to_f - queue = Queue.new - f = ConcurrentNext.schedule(0.1) { 1 + 1 }.then { |v| queue << v << Time.now.to_f - start } + start = Time.now.to_f + queue = Queue.new + future = ConcurrentNext.schedule(0.1) { 1 + 1 }.then { |v| queue << v << Time.now.to_f - start } - expect(f.value).to eq queue + expect(future.value).to eq queue expect(queue.pop).to eq 2 expect(queue.pop).to be_between(0.1, 0.15) end + + it 'scheduled execution in graph' do + start = Time.now.to_f + queue = Queue.new + future = ConcurrentNext. + future { sleep 0.1; 1 }. + schedule(0.1). + then { |v| v + 1 }. + then { |v| queue << v << Time.now.to_f - start } + + expect(future.value).to eq queue + expect(queue.pop).to eq 2 + expect(queue.pop).to be_between(0.2, 0.25) + end + end + + describe '.any' do + it 'continues on first result' do + queue = Queue.new + f1 = ConcurrentNext.future(:io) { queue.pop } + f2 = ConcurrentNext.future(:io) { queue.pop } + + queue << 1 << 2 + + anys = [ConcurrentNext.any(f1, f2), + f1 | f2, + f1.or(f2)] + + anys.each do |any| + expect(any.value.to_s).to match /1|2/ + end + + end end describe 'Future' do @@ -76,19 +109,21 @@ futures.each &:wait table = futures.each_with_index.map do |f, i| - '%5i %7s %10s %6s %4s %6s' % [i, f.success?, f.value, f.reason, f.default_executor, f.promise.executor] + '%5i %7s %10s %6s %4s %6s' % [i, f.success?, f.value, f.reason, + (f.promise.executor if f.promise.respond_to?(:executor)), + f.default_executor] end.unshift('index success value reason pool d.pool') expect(table.join("\n")).to eq <<-TABLE.gsub(/^\s+\|/, '').strip |index success value reason pool d.pool | 0 true 3 fast fast - | 1 false boo fast io + | 1 false boo io fast | 2 false boo fast fast | 3 true boo fast fast | 4 true true fast fast - | 5 true boo io io + | 5 true boo io | 6 true Boo io io - | 7 true [3, "boo"] fast fast + | 7 true [3, "boo"] fast | 8 true 3 fast fast TABLE end @@ -137,6 +172,41 @@ end end + it 'interoperability' do + actor = Concurrent::Actor::Utils::AdHoc.spawn :doubler do + -> v { v * 2 } + end + + # convert ivar to future + Concurrent::IVar.class_eval do + def to_future + ConcurrentNext.promise.tap do |p| + with_observer { p.complete fulfilled?, value, reason } + end.future + end + end + + expect(ConcurrentNext. + future { 2 }. + then { |v| actor.ask(v).to_future }. + flat. + then { |v| v + 2 }. + value).to eq 6 + + # possible simplification with helper + ConcurrentNext::Future.class_eval do + def then_ask(actor) + self.then { |v| actor.ask(v).to_future }.flat + end + end + + expect(ConcurrentNext. + future { 2 }. + then_ask(actor). + then { |v| v + 2 }. + value).to eq 6 + end + end __END__