diff --git a/lib/concurrent/agent.rb b/lib/concurrent/agent.rb index 4b0628e1f..8b3c67e2e 100644 --- a/lib/concurrent/agent.rb +++ b/lib/concurrent/agent.rb @@ -24,18 +24,7 @@ class Agent # # @param [Object] initial the initial value # - # @!macro [attach] executor_and_deref_options - # - # @param [Hash] opts the options used to define the behavior at update and deref - # and to specify the executor on which to perform actions - # @option opts [Executor] :executor when set use the given `Executor` instance. - # Three special values are also supported: `:task` returns the global task pool, - # `:operation` returns the global operation pool, and `:immediate` returns a new - # `ImmediateExecutor` object. - # @option opts [Boolean] :dup_on_deref (false) call `#dup` before returning the data - # @option opts [Boolean] :freeze_on_deref (false) call `#freeze` before returning the data - # @option opts [Proc] :copy_on_deref (nil) call the given `Proc` passing - # the internal value and returning the value returned from the proc + # @!macro executor_and_deref_options def initialize(initial, opts = {}) @value = initial @rescuers = [] diff --git a/lib/concurrent/async.rb b/lib/concurrent/async.rb index bbf269d82..762518140 100644 --- a/lib/concurrent/async.rb +++ b/lib/concurrent/async.rb @@ -82,14 +82,11 @@ def method_missing(method, *args, &block) self.define_singleton_method(method) do |*args2| Async::validate_argc(@delegate, method, *args2) ivar = Concurrent::IVar.new - value, reason = nil, nil @serializer.post(@executor.value) do begin - value = @delegate.send(method, *args2, &block) + ivar.set(@delegate.send(method, *args2, &block)) rescue => reason - # caught - ensure - ivar.complete(reason.nil?, value, reason) + ivar.fail(reason) end end ivar.value if @blocking diff --git a/lib/concurrent/atomic/copy_on_notify_observer_set.rb b/lib/concurrent/atomic/copy_on_notify_observer_set.rb index 21534da86..2c354a9b9 100644 --- a/lib/concurrent/atomic/copy_on_notify_observer_set.rb +++ b/lib/concurrent/atomic/copy_on_notify_observer_set.rb @@ -33,11 +33,10 @@ def add_observer(observer=nil, func=:update, &block) begin @mutex.lock @observers[observer] = func + observer ensure @mutex.unlock end - - observer end # @param [Object] observer the observer to remove @@ -45,9 +44,9 @@ def add_observer(observer=nil, func=:update, &block) def delete_observer(observer) @mutex.lock @observers.delete(observer) - @mutex.unlock - observer + ensure + @mutex.unlock end # Deletes all observers @@ -55,18 +54,17 @@ def delete_observer(observer) def delete_observers @mutex.lock @observers.clear - @mutex.unlock - self + ensure + @mutex.unlock end # @return [Integer] the observers count def count_observers @mutex.lock - result = @observers.count + @observers.count + ensure @mutex.unlock - - result end # Notifies all registered observers with optional args @@ -75,7 +73,6 @@ def count_observers def notify_observers(*args, &block) observers = duplicate_observers notify_to(observers, *args, &block) - self end @@ -86,7 +83,6 @@ def notify_observers(*args, &block) def notify_and_delete_observers(*args, &block) observers = duplicate_and_clear_observers notify_to(observers, *args, &block) - self end @@ -96,17 +92,17 @@ def duplicate_and_clear_observers @mutex.lock observers = @observers.dup @observers.clear - @mutex.unlock - observers + ensure + @mutex.unlock end def duplicate_observers @mutex.lock observers = @observers.dup - @mutex.unlock - observers + ensure + @mutex.unlock end def notify_to(observers, *args) diff --git a/lib/concurrent/channel/buffered_channel.rb b/lib/concurrent/channel/buffered_channel.rb index 8badab99c..ec4e80f3b 100644 --- a/lib/concurrent/channel/buffered_channel.rb +++ b/lib/concurrent/channel/buffered_channel.rb @@ -40,7 +40,7 @@ def select(probe) @probe_set.put(probe) true else - shift_buffer if probe.set_unless_assigned(peek_buffer, self) + shift_buffer if probe.try_set([peek_buffer, self]) end end @@ -76,7 +76,7 @@ def set_probe_or_push_into_buffer(value) push_into_buffer(value) true else - @probe_set.take.set_unless_assigned(value, self) + @probe_set.take.try_set([value, self]) end end end diff --git a/lib/concurrent/channel/channel.rb b/lib/concurrent/channel/channel.rb index f0357b9f1..0ce22befb 100644 --- a/lib/concurrent/channel/channel.rb +++ b/lib/concurrent/channel/channel.rb @@ -3,37 +3,12 @@ module Concurrent module Channel - class Probe < Concurrent::IVar - - def initialize(value = NO_VALUE, opts = {}) - super(value, opts) - end - - def set_unless_assigned(value, channel) - mutex.synchronize do - return false if [:fulfilled, :rejected].include? @state - - set_state(true, [value, channel], nil) - event.set - true - end - end - - alias_method :composite_value, :value - - def value - composite_value.nil? ? nil : composite_value[0] - end - - def channel - composite_value.nil? ? nil : composite_value[1] - end - end + Probe = IVar def self.select(*channels) probe = Probe.new channels.each { |channel| channel.select(probe) } - result = probe.composite_value + result = probe.value channels.each { |channel| channel.remove_probe(probe) } result end diff --git a/lib/concurrent/channel/unbuffered_channel.rb b/lib/concurrent/channel/unbuffered_channel.rb index da62418b6..c516fc8aa 100644 --- a/lib/concurrent/channel/unbuffered_channel.rb +++ b/lib/concurrent/channel/unbuffered_channel.rb @@ -12,8 +12,7 @@ def probe_set_size end def push(value) - # TODO set_unless_assigned define on IVar as #set_state? or #try_set_state - until @probe_set.take.set_unless_assigned(value, self) + until @probe_set.take.try_set([value, self]) end end diff --git a/lib/concurrent/delay.rb b/lib/concurrent/delay.rb index 8b3bebfe1..034c6ceed 100644 --- a/lib/concurrent/delay.rb +++ b/lib/concurrent/delay.rb @@ -3,6 +3,7 @@ require 'concurrent/obligation' require 'concurrent/executor/executor_options' require 'concurrent/executor/immediate_executor' +require 'concurrent/synchronization' module Concurrent @@ -37,7 +38,7 @@ module Concurrent # execute on the given executor, allowing the call to timeout. # # @see Concurrent::Dereferenceable - class Delay + class Delay < Synchronization::Object include Obligation include ExecutorOptions @@ -51,15 +52,27 @@ class Delay # Create a new `Delay` in the `:pending` state. # - # @yield the delayed operation to perform + # @!macro [attach] executor_and_deref_options + # + # @param [Hash] opts the options used to define the behavior at update and deref + # and to specify the executor on which to perform actions + # @option opts [Executor] :executor when set use the given `Executor` instance. + # Three special values are also supported: `:task` returns the global task pool, + # `:operation` returns the global operation pool, and `:immediate` returns a new + # `ImmediateExecutor` object. + # @option opts [Boolean] :dup_on_deref (false) call `#dup` before returning the data + # @option opts [Boolean] :freeze_on_deref (false) call `#freeze` before returning the data + # @option opts [Proc] :copy_on_deref (nil) call the given `Proc` passing + # the internal value and returning the value returned from the proc # - # @!macro executor_and_deref_options + # @yield the delayed operation to perform # # @raise [ArgumentError] if no block is given def initialize(opts = {}, &block) raise ArgumentError.new('no block given') unless block_given? - init_obligation + super() + init_obligation(self) set_deref_options(opts) @task_executor = get_executor_from(opts) @@ -145,16 +158,15 @@ def wait(timeout = nil) # @yield the delayed operation to perform # @return [true, false] if success def reconfigure(&block) - mutex.lock - raise ArgumentError.new('no block given') unless block_given? - unless @computing - @task = block - true - else - false + mutex.synchronize do + raise ArgumentError.new('no block given') unless block_given? + unless @computing + @task = block + true + else + false + end end - ensure - mutex.unlock end private @@ -163,10 +175,11 @@ def reconfigure(&block) def execute_task_once # :nodoc: # this function has been optimized for performance and # should not be modified without running new benchmarks - mutex.lock - execute = @computing = true unless @computing - task = @task - mutex.unlock + execute = task = nil + mutex.synchronize do + execute = @computing = true unless @computing + task = @task + end if execute @task_executor.post do @@ -176,10 +189,10 @@ def execute_task_once # :nodoc: rescue => ex reason = ex end - mutex.lock - set_state(success, result, reason) - event.set - mutex.unlock + mutex.synchronize do + set_state(success, result, reason) + event.set + end end end end diff --git a/lib/concurrent/dereferenceable.rb b/lib/concurrent/dereferenceable.rb index f5ee523e4..6a9cf4a6c 100644 --- a/lib/concurrent/dereferenceable.rb +++ b/lib/concurrent/dereferenceable.rb @@ -39,24 +39,17 @@ module Dereferenceable # # @return [Object] the current value of the object def value - mutex.lock - apply_deref_options(@value) - ensure - mutex.unlock + mutex.synchronize { apply_deref_options(@value) } end - alias_method :deref, :value protected # Set the internal value of this object # - # @param [Object] val the new value - def value=(val) - mutex.lock - @value = val - ensure - mutex.unlock + # @param [Object] value the new value + def value=(value) + mutex.synchronize{ @value = value } end # A mutex lock used for synchronizing thread-safe operations. Methods defined @@ -74,8 +67,8 @@ def mutex # @note This method *must* be called from within the constructor of the including class. # # @see #mutex - def init_mutex - @mutex = Mutex.new + def init_mutex(mutex = Mutex.new) + @mutex = mutex end # Set the options which define the operations #value performs before @@ -91,14 +84,13 @@ def init_mutex # @option opts [String] :copy_on_deref (nil) call the given `Proc` passing # the internal value and returning the value returned from the proc def set_deref_options(opts = {}) - mutex.lock - @dup_on_deref = opts[:dup_on_deref] || opts[:dup] - @freeze_on_deref = opts[:freeze_on_deref] || opts[:freeze] - @copy_on_deref = opts[:copy_on_deref] || opts[:copy] - @do_nothing_on_deref = !(@dup_on_deref || @freeze_on_deref || @copy_on_deref) - nil - ensure - mutex.unlock + mutex.synchronize do + @dup_on_deref = opts[:dup_on_deref] || opts[:dup] + @freeze_on_deref = opts[:freeze_on_deref] || opts[:freeze] + @copy_on_deref = opts[:copy_on_deref] || opts[:copy] + @do_nothing_on_deref = !(@dup_on_deref || @freeze_on_deref || @copy_on_deref) + nil + end end # @!visibility private diff --git a/lib/concurrent/future.rb b/lib/concurrent/future.rb index 58d29bbbe..be78e8791 100644 --- a/lib/concurrent/future.rb +++ b/lib/concurrent/future.rb @@ -76,7 +76,18 @@ def self.execute(opts = {}, &block) Future.new(opts, &block).execute end - protected :set, :fail, :complete + # @!macro ivar_set_method + def set(value = IVar::NO_VALUE, &block) + check_for_block_or_value!(block_given?, value) + mutex.synchronize do + if @state != :unscheduled + raise MultipleAssignmentError + else + @task = block || Proc.new { value } + end + end + execute + end private diff --git a/lib/concurrent/ivar.rb b/lib/concurrent/ivar.rb index 9d3d1e292..6d4531e36 100644 --- a/lib/concurrent/ivar.rb +++ b/lib/concurrent/ivar.rb @@ -3,6 +3,7 @@ require 'concurrent/errors' require 'concurrent/obligation' require 'concurrent/observable' +require 'concurrent/synchronization' module Concurrent @@ -38,8 +39,7 @@ module Concurrent # ivar.set 14 # ivar.get #=> 14 # ivar.set 2 # would now be an error - class IVar - + class IVar < Synchronization::Object include Obligation include Observable @@ -57,15 +57,13 @@ class IVar # @option opts [String] :copy_on_deref (nil) call the given `Proc` passing # the internal value and returning the value returned from the proc def initialize(value = NO_VALUE, opts = {}) - init_obligation + super(&nil) + init_obligation(self) self.observers = CopyOnWriteObserverSet.new set_deref_options(opts) + @state = :pending - if value == NO_VALUE - @state = :pending - else - set(value) - end + set(value) unless value == NO_VALUE end # Add an observer on this object that will receive notification on update. @@ -100,28 +98,59 @@ def add_observer(observer = nil, func = :update, &block) observer end - # Set the `IVar` to a value and wake or notify all threads waiting on it. - # - # @param [Object] value the value to store in the `IVar` - # @raise [Concurrent::MultipleAssignmentError] if the `IVar` has already - # been set or otherwise completed - def set(value) - complete(true, value, nil) + # @!macro [attach] ivar_set_method + # Set the `IVar` to a value and wake or notify all threads waiting on it. + # + # @!macro [attach] ivar_set_parameters_and_exceptions + # @param [Object] value the value to store in the `IVar` + # @yield A block operation to use for setting the value + # @raise [ArgumentError] if both a value and a block are given + # @raise [Concurrent::MultipleAssignmentError] if the `IVar` has already + # been set or otherwise completed + # + # @return [IVar] self + def set(value = NO_VALUE) + check_for_block_or_value!(block_given?, value) + raise MultipleAssignmentError unless compare_and_set_state(:processing, :pending) + + begin + value = yield if block_given? + complete(true, value, nil) + rescue => ex + complete(false, nil, ex) + end end - # Set the `IVar` to failed due to some error and wake or notify all threads waiting on it. - # - # @param [Object] reason for the failure - # @raise [Concurrent::MultipleAssignmentError] if the `IVar` has already - # been set or otherwise completed + # @!macro [attach] ivar_fail_method + # Set the `IVar` to failed due to some error and wake or notify all threads waiting on it. + # + # @param [Object] reason for the failure + # @raise [Concurrent::MultipleAssignmentError] if the `IVar` has already + # been set or otherwise completed + # @return [IVar] self def fail(reason = StandardError.new) complete(false, nil, reason) end + # Attempt to set the `IVar` with the given value or block. Return a + # boolean indicating the success or failure of the set operation. + # + # @!macro ivar_set_parameters_and_exceptions + # + # @return [Boolean] true if the value was set else false + def try_set(value = NO_VALUE, &block) + set(value, &block) + true + rescue MultipleAssignmentError + false + end + + protected + # @!visibility private def complete(success, value, reason) # :nodoc: mutex.synchronize do - raise MultipleAssignmentError.new('multiple assignment') if [:fulfilled, :rejected].include? @state + raise MultipleAssignmentError if [:fulfilled, :rejected].include? @state set_state(success, value, reason) event.set end @@ -130,5 +159,12 @@ def complete(success, value, reason) # :nodoc: observers.notify_and_delete_observers{ [time, self.value, reason] } self end + + # @!visibility private + def check_for_block_or_value!(block_given, value) # :nodoc: + if (block_given && value != NO_VALUE) || (! block_given && value == NO_VALUE) + raise ArgumentError.new('must set with either a value or a block') + end + end end end diff --git a/lib/concurrent/obligation.rb b/lib/concurrent/obligation.rb index f3cdf19c0..1c7441c94 100644 --- a/lib/concurrent/obligation.rb +++ b/lib/concurrent/obligation.rb @@ -59,7 +59,7 @@ def completed? # # @return [Boolean] def incomplete? - [:unscheduled, :pending].include? state + ! complete? end # The current value of the obligation. Will be `nil` while the state is @@ -113,10 +113,7 @@ def value!(timeout = nil) # # @return [Symbol] the current state def state - mutex.lock - @state - ensure - mutex.unlock + mutex.synchronize { @state } end # If an exception was raised during processing this will return the @@ -125,10 +122,7 @@ def state # # @return [Exception] the exception raised during processing or `nil` def reason - mutex.lock - @reason - ensure - mutex.unlock + mutex.synchronize { @reason } end # @example allows Obligation to be risen @@ -147,8 +141,8 @@ def get_arguments_from(opts = {}) # :nodoc: end # @!visibility private - def init_obligation # :nodoc: - init_mutex + def init_obligation(*args) # :nodoc: + init_mutex(*args) @event = Event.new end @@ -170,10 +164,7 @@ def set_state(success, value, reason) # :nodoc: # @!visibility private def state=(value) # :nodoc: - mutex.lock - @state = value - ensure - mutex.unlock + mutex.synchronize { @state = value } end # Atomic compare and set operation @@ -186,15 +177,14 @@ def state=(value) # :nodoc: # # @!visibility private def compare_and_set_state(next_state, expected_current) # :nodoc: - mutex.lock - if @state == expected_current - @state = next_state - true - else - false + mutex.synchronize do + if @state == expected_current + @state = next_state + true + else + false + end end - ensure - mutex.unlock end # executes the block within mutex if current state is included in expected_states @@ -203,16 +193,15 @@ def compare_and_set_state(next_state, expected_current) # :nodoc: # # @!visibility private def if_state(*expected_states) # :nodoc: - mutex.lock - raise ArgumentError.new('no block given') unless block_given? - - if expected_states.include? @state - yield - else - false + mutex.synchronize do + raise ArgumentError.new('no block given') unless block_given? + + if expected_states.include? @state + yield + else + false + end end - ensure - mutex.unlock end end end diff --git a/lib/concurrent/promise.rb b/lib/concurrent/promise.rb index f7cb9b1a0..0831c39cc 100644 --- a/lib/concurrent/promise.rb +++ b/lib/concurrent/promise.rb @@ -1,5 +1,6 @@ require 'thread' +require 'concurrent/ivar' require 'concurrent/obligation' require 'concurrent/executor/executor_options' @@ -181,8 +182,7 @@ module Concurrent # - `on_success { |result| ... }` is the same as `then {|result| ... }` # - `rescue { |reason| ... }` is the same as `then(Proc.new { |reason| ... } )` # - `rescue` is aliased by `catch` and `on_error` - class Promise - include Obligation + class Promise < IVar include ExecutorOptions # Initialize a new Promise with the provided options. @@ -197,12 +197,15 @@ class Promise # @option opts [object, Array] :args zero or more arguments to be passed # the task block on execution # + # @yield The block operation to be performed asynchronously. + # # @raise [ArgumentError] if no block is given # # @see http://wiki.commonjs.org/wiki/Promises/A # @see http://promises-aplus.github.io/promises-spec/ def initialize(opts = {}, &block) opts.delete_if { |k, v| v.nil? } + super(IVar::NO_VALUE, opts) @executor = get_executor_from(opts) || Concurrent.global_io_executor @args = get_arguments_from(opts) @@ -214,23 +217,39 @@ def initialize(opts = {}, &block) @promise_body = block || Proc.new { |result| result } @state = :unscheduled @children = [] - - init_obligation - set_deref_options(opts) end - # @return [Promise] + # Create a new `Promise` and fulfill it immediately. + # + # @!macro executor_and_deref_options + # + # @!macro promise_init_options + # + # @raise [ArgumentError] if no block is given + # + # @return [Promise] the newly created `Promise` def self.fulfill(value, opts = {}) Promise.new(opts).tap { |p| p.send(:synchronized_set_state!, true, value, nil) } end - - # @return [Promise] + # Create a new `Promise` and reject it immediately. + # + # @!macro executor_and_deref_options + # + # @!macro promise_init_options + # + # @raise [ArgumentError] if no block is given + # + # @return [Promise] the newly created `Promise` def self.reject(reason, opts = {}) Promise.new(opts).tap { |p| p.send(:synchronized_set_state!, false, nil, reason) } end - # @return [Promise] + # Execute an `:unscheduled` `Promise`. Immediately sets the state to `:pending` and + # passes the block to a new thread/thread pool for eventual execution. + # Does nothing if the `Promise` is in any state other than `:unscheduled`. + # + # @return [Promise] a reference to `self` def execute if root? if compare_and_set_state(:pending, :unscheduled) @@ -243,6 +262,29 @@ def execute self end + # @!macro ivar_set_method + # + # @raise [Concurrent::PromiseExecutionError] if not the root promise + def set(value = IVar::NO_VALUE, &block) + raise PromiseExecutionError.new('supported only on root promise') unless root? + check_for_block_or_value!(block_given?, value) + mutex.synchronize do + if @state != :unscheduled + raise MultipleAssignmentError + else + @promise_body = block || Proc.new { |result| value } + end + end + execute + end + + # @!macro ivar_fail_method + # + # @raise [Concurrent::PromiseExecutionError] if not the root promise + def fail(reason = StandardError.new) + set { raise reason } + end + # Create a new `Promise` object with the given block, execute it, and return the # `:pending` object. # @@ -261,6 +303,13 @@ def self.execute(opts = {}, &block) new(opts, &block).execute end + # Chain a new promise off the current promise. + # + # @param [Proc] rescuer An optional rescue block to be executed if the + # promise is rejected. + # + # @yield The block operation to be performed asynchronously. + # # @return [Promise] the new promise def then(rescuer = nil, &block) raise ArgumentError.new('rescuers and block are both missing') if rescuer.nil? && !block_given? @@ -282,13 +331,23 @@ def then(rescuer = nil, &block) child end - # @return [Promise] + # Chain onto this promise an action to be undertaken on success + # (fulfillment). + # + # @yield The block to execute + # + # @return [Promise] self def on_success(&block) raise ArgumentError.new('no block given') unless block_given? self.then(&block) end - # @return [Promise] + # Chain onto this promise an action to be undertaken on failure + # (rejection). + # + # @yield The block to execute + # + # @return [Promise] self def rescue(&block) self.then(block) end @@ -444,17 +503,22 @@ def notify_child(child) if_state(:rejected) { child.on_reject(@reason) } end + # @!visibility private + def complete(success, value, reason) + children_to_notify = mutex.synchronize do + set_state!(success, value, reason) + @children.dup + end + + children_to_notify.each { |child| notify_child(child) } + observers.notify_and_delete_observers{ [Time.now, self.value, reason] } + end + # @!visibility private def realize(task) @executor.post do success, value, reason = SafeTaskExecutor.new(task).execute(*@args) - - children_to_notify = mutex.synchronize do - set_state!(success, value, reason) - @children.dup - end - - children_to_notify.each { |child| notify_child(child) } + complete(success, value, reason) end end @@ -466,10 +530,7 @@ def set_state!(success, value, reason) # @!visibility private def synchronized_set_state!(success, value, reason) - mutex.lock - set_state!(success, value, reason) - ensure - mutex.unlock + mutex.synchronize { set_state!(success, value, reason) } end end end diff --git a/lib/concurrent/synchronization/abstract_object.rb b/lib/concurrent/synchronization/abstract_object.rb index 2dc81de73..7a3cfe52c 100644 --- a/lib/concurrent/synchronization/abstract_object.rb +++ b/lib/concurrent/synchronization/abstract_object.rb @@ -7,7 +7,9 @@ module Synchronization # Provides a single layer which can improve its implementation over time without changes needed to # the classes using it. Use {Synchronization::Object} not this abstract class. # - # @note this object does not support usage together with {Thread#wakeup} and {Thread#raise}. + # @note this object does not support usage together with + # [Thread#wakeup](http://ruby-doc.org/core-2.2.0/Thread.html#method-i-wakeup) + # and [Thread#raise](http://ruby-doc.org/core-2.2.0/Thread.html#method-i-raise). # `Thread#sleep` and `Thread#wakeup` will work as expected but mixing `Synchronization::Object#wait` and # `Thread#wakeup` will not work on all platforms. # diff --git a/spec/concurrent/channel/buffered_channel_spec.rb b/spec/concurrent/channel/buffered_channel_spec.rb index dd40b99ee..f4b4b4640 100644 --- a/spec/concurrent/channel/buffered_channel_spec.rb +++ b/spec/concurrent/channel/buffered_channel_spec.rb @@ -47,7 +47,7 @@ module Concurrent it 'should assign value to a probe if probe set is not empty' do channel.select(probe) Thread.new { sleep(0.1); channel.push 3 } - expect(probe.value).to eq 3 + expect(probe.value.first).to eq 3 end end @@ -62,14 +62,14 @@ module Concurrent channel.push 1 result = channel.pop - expect(result).to eq 1 + expect(result.first).to eq 1 end it 'removes the first value from the buffer' do channel.push 'a' channel.push 'b' - expect(channel.pop).to eq 'a' + expect(channel.pop.first).to eq 'a' expect(channel.buffer_queue_size).to eq 1 end end @@ -91,7 +91,7 @@ module Concurrent Thread.new { channel.push 82 } - expect(probe.value).to eq 82 + expect(probe.value.first).to eq 82 end end @@ -120,7 +120,7 @@ module Concurrent expect(channel.buffer_queue_size).to eq 1 - expect(channel.pop).to eq 82 + expect(channel.pop.first).to eq 82 end end diff --git a/spec/concurrent/channel/probe_spec.rb b/spec/concurrent/channel/probe_spec.rb index f11d44601..bac8ab46d 100644 --- a/spec/concurrent/channel/probe_spec.rb +++ b/spec/concurrent/channel/probe_spec.rb @@ -20,20 +20,20 @@ def trigger_observable(observable) it_should_behave_like :observable end - describe '#set_unless_assigned' do + describe '#try_set' do context 'empty probe' do it 'assigns the value' do - probe.set_unless_assigned(32, channel) - expect(probe.value).to eq 32 + probe.try_set([32, channel]) + expect(probe.value.first).to eq 32 end it 'assign the channel' do - probe.set_unless_assigned(32, channel) - expect(probe.channel).to be channel + probe.try_set([32, channel]) + expect(probe.value.last).to be channel end it 'returns true' do - expect(probe.set_unless_assigned('hi', channel)).to eq true + expect(probe.try_set(['hi', channel])).to eq true end end @@ -41,12 +41,12 @@ def trigger_observable(observable) before(:each) { probe.set([27, nil]) } it 'does not assign the value' do - probe.set_unless_assigned(88, channel) - expect(probe.value).to eq 27 + probe.try_set([88, channel]) + expect(probe.value.first).to eq 27 end it 'returns false' do - expect(probe.set_unless_assigned('hello', channel)).to eq false + expect(probe.try_set(['hello', channel])).to eq false end end @@ -54,7 +54,7 @@ def trigger_observable(observable) before(:each) { probe.fail } it 'does not assign the value' do - probe.set_unless_assigned(88, channel) + probe.try_set([88, channel]) expect(probe).to be_rejected end @@ -62,15 +62,10 @@ def trigger_observable(observable) expect(probe.value).to be_nil end - it 'has a nil channel' do - expect(probe.channel).to be_nil - end - it 'returns false' do - expect(probe.set_unless_assigned('hello', channel)).to eq false + expect(probe.try_set(['hello', channel])).to eq false end end end - end end diff --git a/spec/concurrent/channel/unbuffered_channel_spec.rb b/spec/concurrent/channel/unbuffered_channel_spec.rb index 72dd6556d..a2ededc71 100644 --- a/spec/concurrent/channel/unbuffered_channel_spec.rb +++ b/spec/concurrent/channel/unbuffered_channel_spec.rb @@ -39,7 +39,7 @@ module Concurrent sleep(0.1) - expect(result).to eq 42 + expect(result.first).to eq 42 end it 'passes the pushed value to only one thread' do @@ -62,7 +62,7 @@ module Concurrent sleep(0.1) - expect(result).to eq 57 + expect(result.first).to eq 57 end end @@ -81,7 +81,7 @@ module Concurrent Thread.new { channel.push 82 } - expect(probe.value).to eq 82 + expect(probe.value.first).to eq 82 end it 'ignores already set probes and waits for a new one' do @@ -101,7 +101,7 @@ module Concurrent sleep(0.05) - expect(new_probe.value).to eq 72 + expect(new_probe.value.first).to eq 72 end end diff --git a/spec/concurrent/future_spec.rb b/spec/concurrent/future_spec.rb index 78a2d6165..0450a3599 100644 --- a/spec/concurrent/future_spec.rb +++ b/spec/concurrent/future_spec.rb @@ -1,6 +1,4 @@ -require_relative 'dereferenceable_shared' -require_relative 'obligation_shared' -require_relative 'observable_shared' +require_relative 'ivar_shared' require_relative 'thread_arguments_shared' module Concurrent @@ -16,40 +14,23 @@ module Concurrent }.execute.tap{ sleep(0.1) } end - context 'behavior' do + let!(:fulfilled_value) { 10 } + let!(:rejected_reason) { StandardError.new('mojo jojo') } - # thread_arguments - - def get_ivar_from_no_args - Concurrent::Future.execute{|*args| args } - end - - def get_ivar_from_args(opts) - Concurrent::Future.execute(opts){|*args| args } - end - - it_should_behave_like :thread_arguments - - # obligation - - let!(:fulfilled_value) { 10 } - let!(:rejected_reason) { StandardError.new('mojo jojo') } - - let(:pending_subject) do - Future.new(executor: executor){ sleep(3); fulfilled_value }.execute - end - - let(:fulfilled_subject) do - Future.new(executor: executor){ fulfilled_value }.execute.tap{ sleep(0.1) } - end + let(:pending_subject) do + Future.new(executor: executor){ sleep(0.1); fulfilled_value }.execute + end - let(:rejected_subject) do - Future.new(executor: executor){ raise rejected_reason }.execute.tap{ sleep(0.1) } - end + let(:fulfilled_subject) do + Future.new(executor: executor){ fulfilled_value }.execute.tap{ sleep(0.1) } + end - it_should_behave_like :obligation + let(:rejected_subject) do + Future.new(executor: executor){ raise rejected_reason }.execute.tap{ sleep(0.1) } + end - # dereferenceable + it_should_behave_like :ivar do + subject { Future.new(executor: :immediate){ value } } def dereferenceable_subject(value, opts = {}) opts = opts.merge(executor: executor) @@ -66,34 +47,20 @@ def execute_dereferenceable(subject) sleep(0.1) end - it_should_behave_like :dereferenceable - - # observable - - subject{ Future.new{ nil } } - def trigger_observable(observable) observable.execute sleep(0.1) end - - it_should_behave_like :observable end - context 'subclassing' do + it_should_behave_like :thread_arguments do - subject{ Future.execute(executor: executor){ 42 } } - - it 'protects #set' do - expect{ subject.set(100) }.to raise_error - end - - it 'protects #fail' do - expect{ subject.fail }.to raise_error + def get_ivar_from_no_args + Concurrent::Future.execute{|*args| args } end - it 'protects #complete' do - expect{ subject.complete(true, 100, nil) }.to raise_error + def get_ivar_from_args(opts) + Concurrent::Future.execute(opts){|*args| args } end end diff --git a/spec/concurrent/ivar_shared.rb b/spec/concurrent/ivar_shared.rb new file mode 100644 index 000000000..b22520fab --- /dev/null +++ b/spec/concurrent/ivar_shared.rb @@ -0,0 +1,158 @@ +require_relative 'dereferenceable_shared' +require_relative 'obligation_shared' +require_relative 'observable_shared' + +shared_examples :ivar do + + it_should_behave_like :obligation + it_should_behave_like :dereferenceable + it_should_behave_like :observable + + context 'initialization' do + + it 'sets the state to incomplete' do + expect(subject).to be_incomplete + end + end + + context '#set' do + + it 'sets the state to be fulfilled' do + subject.set(14) + expect(subject).to be_fulfilled + end + + it 'sets the value' do + subject.set(14) + expect(subject.value).to eq 14 + end + + it 'raises an exception if set more than once' do + subject.set(14) + expect {subject.set(2)}.to raise_error(Concurrent::MultipleAssignmentError) + expect(subject.value).to eq 14 + end + + it 'returns self' do + expect(subject.set(42)).to eq subject + end + it 'fulfils when given a block which executes successfully' do + subject.set{ 42 } + expect(subject.value).to eq 42 + end + + it 'rejects when given a block which raises an exception' do + expected = ArgumentError.new + subject.set{ raise expected } + expect(subject.reason).to eq expected + end + + it 'raises an exception when given a value and a block' do + expect { + subject.set(42){ :guide } + }.to raise_error(ArgumentError) + end + + it 'raises an exception when given neither a value nor a block' do + expect { + subject.set + }.to raise_error(ArgumentError) + end + end + + context '#fail' do + + it 'sets the state to be rejected' do + subject.fail + expect(subject).to be_rejected + end + + it 'sets the value to be nil' do + subject.fail + expect(subject.value).to be_nil + end + + it 'sets the reason to the given exception' do + expected = ArgumentError.new + subject.fail(expected) + expect(subject.reason).to eq expected + end + + it 'raises an exception if set more than once' do + subject.fail + expect {subject.fail}.to raise_error(Concurrent::MultipleAssignmentError) + expect(subject.value).to be_nil + end + + it 'defaults the reason to a StandardError' do + subject.fail + expect(subject.reason).to be_a StandardError + end + + it 'returns self' do + expect(subject.fail).to eq subject + end + end + + describe '#try_set' do + + context 'when unset' do + + it 'assigns the value' do + subject.try_set(32) + expect(subject.value).to eq 32 + end + + it 'assigns the block result' do + subject.try_set{ 32 } + expect(subject.value).to eq 32 + end + + it 'returns true' do + expect(subject.try_set('hi')).to eq true + end + end + + context 'when fulfilled' do + + before(:each) { subject.set(27) } + + it 'does not assign the value' do + subject.try_set(88) + expect(subject.value).to eq 27 + end + + it 'does not assign the block result' do + subject.try_set{ 88 } + expect(subject.value).to eq 27 + end + + it 'returns false' do + expect(subject.try_set('hello')).to eq false + end + end + + context 'when rejected' do + + before(:each) { subject.fail } + + it 'does not assign the value' do + subject.try_set(88) + expect(subject).to be_rejected + end + + it 'does not assign the block result' do + subject.try_set{ 88 } + expect(subject).to be_rejected + end + + it 'has a nil value' do + expect(subject.value).to be_nil + end + + it 'returns false' do + expect(subject.try_set('hello')).to eq false + end + end + end +end diff --git a/spec/concurrent/ivar_spec.rb b/spec/concurrent/ivar_spec.rb index d3d007a6f..cee303d98 100644 --- a/spec/concurrent/ivar_spec.rb +++ b/spec/concurrent/ivar_spec.rb @@ -1,6 +1,4 @@ -require_relative 'dereferenceable_shared' -require_relative 'obligation_shared' -require_relative 'observable_shared' +require_relative 'ivar_shared' module Concurrent @@ -8,43 +6,30 @@ module Concurrent let!(:value) { 10 } - subject do - i = IVar.new - i.set(14) - i - end - - context 'behavior' do + let!(:fulfilled_value) { 10 } + let(:rejected_reason) { StandardError.new('Boom!') } - # obligation + subject { IVar.new(value) } - let!(:fulfilled_value) { 10 } - let(:rejected_reason) { StandardError.new('Boom!') } - - let(:pending_subject) do - @i = IVar.new - Thread.new do - sleep(3) - @i.set(fulfilled_value) - end - @i - end - - let(:fulfilled_subject) do - i = IVar.new - i.set(fulfilled_value) - i + let(:pending_subject) do + ivar = IVar.new + Thread.new do + sleep(0.1) + ivar.set(fulfilled_value) end + ivar + end - let(:rejected_subject) do - i = IVar.new - i.fail(rejected_reason) - i - end + let(:fulfilled_subject) do + IVar.new.set(fulfilled_value) + end - it_should_behave_like :obligation + let(:rejected_subject) do + IVar.new.fail(rejected_reason) + end - # dereferenceable + it_should_behave_like :ivar do + subject{ IVar.new } def dereferenceable_subject(value, opts = {}) IVar.new(value, opts) @@ -58,17 +43,9 @@ def execute_dereferenceable(subject) subject.set('value') end - it_should_behave_like :dereferenceable - - # observable - - subject{ IVar.new } - def trigger_observable(observable) observable.set('value') end - - it_should_behave_like :observable end context '#initialize' do @@ -85,68 +62,7 @@ def trigger_observable(observable) it 'can set an initial value' do i = IVar.new(14) - expect(i).to be_completed - end - - end - - context '#set' do - - it 'sets the state to be fulfilled' do - i = IVar.new - i.set(14) - expect(i).to be_fulfilled - end - - it 'sets the value' do - i = IVar.new - i.set(14) - expect(i.value).to eq 14 - end - - it 'raises an exception if set more than once' do - i = IVar.new - i.set(14) - expect {i.set(2)}.to raise_error(Concurrent::MultipleAssignmentError) - expect(i.value).to eq 14 - end - - it 'returns self' do - i = IVar.new - expect(i.set(42)).to eq i - end - end - - context '#fail' do - - it 'sets the state to be rejected' do - i = IVar.new - i.fail - expect(i).to be_rejected - end - - it 'sets the value to be nil' do - i = IVar.new - i.fail - expect(i.value).to be_nil - end - - it 'raises an exception if set more than once' do - i = IVar.new - i.fail - expect {i.fail}.to raise_error(Concurrent::MultipleAssignmentError) - expect(i.value).to be_nil - end - - it 'defaults the reason to a StandardError' do - i = IVar.new - i.fail - expect(i.reason).to be_a StandardError - end - - it 'returns self' do - i = IVar.new - expect(i.fail).to eq i + expect(i).to be_complete end end @@ -208,7 +124,6 @@ def reentrant_observer(i) expect(obs.value).to eq 42 end end - end end end diff --git a/spec/concurrent/promise_spec.rb b/spec/concurrent/promise_spec.rb index a7d86ec07..150fe81ed 100644 --- a/spec/concurrent/promise_spec.rb +++ b/spec/concurrent/promise_spec.rb @@ -1,10 +1,11 @@ -require_relative 'obligation_shared' +require_relative 'ivar_shared' require_relative 'thread_arguments_shared' module Concurrent describe Promise do + let!(:value) { 10 } let(:executor) { PerThreadExecutor.new } let(:empty_root) { Promise.new(executor: executor){ nil } } @@ -12,7 +13,7 @@ module Concurrent let!(:rejected_reason) { StandardError.new('mojo jojo') } let(:pending_subject) do - Promise.new(executor: executor){ sleep(0.3); fulfilled_value }.execute + Promise.new(executor: executor){ sleep(0.1); fulfilled_value }.execute end let(:fulfilled_subject) do @@ -23,28 +24,39 @@ module Concurrent Promise.reject(rejected_reason, executor: executor) end - context 'behavior' do + it_should_behave_like :ivar do + subject{ Promise.new(executor: :immediate){ value } } - # thread_arguments + def dereferenceable_subject(value, opts = {}) + opts = opts.merge(executor: executor) + Promise.new(opts){ value }.execute.tap{ sleep(0.1) } + end - def get_ivar_from_no_args - Concurrent::Promise.execute{|*args| args } + def dereferenceable_observable(opts = {}) + opts = opts.merge(executor: executor) + Promise.new(opts){ 'value' } end - def get_ivar_from_args(opts) - Concurrent::Promise.execute(opts){|*args| args } + def execute_dereferenceable(subject) + subject.execute + sleep(0.1) end - it_should_behave_like :thread_arguments + def trigger_observable(observable) + observable.execute + sleep(0.1) + end + end - # obligation + it_should_behave_like :thread_arguments do - it_should_behave_like :obligation - end + def get_ivar_from_no_args + Concurrent::Promise.execute{|*args| args } + end - it 'includes Dereferenceable' do - promise = Promise.new{ nil } - expect(promise).to be_a(Dereferenceable) + def get_ivar_from_args(opts) + Concurrent::Promise.execute(opts){|*args| args } + end end context 'initializers' do @@ -374,13 +386,13 @@ def get_ivar_from_args(opts) composite = Promise.all?(promise1, promise2, promise3). then { counter.up; latch.count_down }. - rescue { counter.down; latch.count_down }. - execute + rescue { counter.down; latch.count_down }. + execute latch.wait(1) expect(counter.value).to eq 1 - end + end it 'executes the #then condition when no promises are given' do counter = Concurrent::AtomicFixnum.new(0) @@ -388,13 +400,13 @@ def get_ivar_from_args(opts) composite = Promise.all?. then { counter.up; latch.count_down }. - rescue { counter.down; latch.count_down }. - execute + rescue { counter.down; latch.count_down }. + execute latch.wait(1) expect(counter.value).to eq 1 - end + end it 'executes the #rescue handler if even one component fails' do counter = Concurrent::AtomicFixnum.new(0) @@ -402,13 +414,13 @@ def get_ivar_from_args(opts) composite = Promise.all?(promise1, promise2, rejected_subject, promise3). then { counter.up; latch.count_down }. - rescue { counter.down; latch.count_down }. - execute + rescue { counter.down; latch.count_down }. + execute latch.wait(1) expect(counter.value).to eq -1 - end + end end describe '.any?' do @@ -429,13 +441,13 @@ def get_ivar_from_args(opts) composite = Promise.any?(promise1, promise2, rejected_subject, promise3). then { counter.up; latch.count_down }. - rescue { counter.down; latch.count_down }. - execute + rescue { counter.down; latch.count_down }. + execute latch.wait(1) expect(counter.value).to eq 1 - end + end it 'executes the #then condition when no promises are given' do counter = Concurrent::AtomicFixnum.new(0) @@ -443,13 +455,13 @@ def get_ivar_from_args(opts) composite = Promise.any?. then { counter.up; latch.count_down }. - rescue { counter.down; latch.count_down }. - execute + rescue { counter.down; latch.count_down }. + execute latch.wait(1) expect(counter.value).to eq 1 - end + end it 'executes the #rescue handler if all componenst fail' do counter = Concurrent::AtomicFixnum.new(0) @@ -457,18 +469,68 @@ def get_ivar_from_args(opts) composite = Promise.any?(rejected_subject, rejected_subject, rejected_subject, rejected_subject). then { counter.up; latch.count_down }. - rescue { counter.down; latch.count_down }. - execute + rescue { counter.down; latch.count_down }. + execute latch.wait(1) expect(counter.value).to eq -1 - end + end end end context 'fulfillment' do + context '#set' do + + it '#can only be called on the root promise' do + root = Promise.new{ :foo } + child = root.then{ :bar } + + expect { child.set('foo') }.to raise_error PromiseExecutionError + expect { root.set('foo') }.not_to raise_error + end + + it 'triggers children' do + expected = nil + root = Promise.new(executor: :immediate){ nil } + root.then{ |result| expected = result } + root.set(20) + expect(expected).to eq 20 + end + + it 'can be called with a block' do + p = Promise.new(executor: executor) + ch = p.then(&:to_s) + p.set { :value } + + expect(p.value).to eq :value + expect(p.state).to eq :fulfilled + + expect(ch.value).to eq 'value' + expect(ch.state).to eq :fulfilled + end + end + + context '#fail' do + + it 'can only be called on the root promise' do + root = Promise.new{ :foo } + child = root.then{ :bar } + + expect { child.fail }.to raise_error PromiseExecutionError + expect { root.fail }.not_to raise_error + end + + it 'rejects children' do + expected = nil + root = Promise.new(executor: :immediate) + root.then(Proc.new{ |reason| expected = reason }) + root.fail(ArgumentError.new('simulated error')) + expect(expected).to be_a ArgumentError + end + end + it 'passes the result of each block to all its children' do expected = nil Promise.new(executor: executor){ 20 }.then{ |result| expected = result }.execute