Skip to content

Promise extends IVar #270

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Apr 24, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 1 addition & 12 deletions lib/concurrent/agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,7 @@ class Agent
#
# @param [Object] initial the initial value
#
# @!macro [attach] executor_and_deref_options
#
# @param [Hash] opts the options used to define the behavior at update and deref
# and to specify the executor on which to perform actions
# @option opts [Executor] :executor when set use the given `Executor` instance.
# Three special values are also supported: `:task` returns the global task pool,
# `:operation` returns the global operation pool, and `:immediate` returns a new
# `ImmediateExecutor` object.
# @option opts [Boolean] :dup_on_deref (false) call `#dup` before returning the data
# @option opts [Boolean] :freeze_on_deref (false) call `#freeze` before returning the data
# @option opts [Proc] :copy_on_deref (nil) call the given `Proc` passing
# the internal value and returning the value returned from the proc
# @!macro executor_and_deref_options
def initialize(initial, opts = {})
@value = initial
@rescuers = []
Expand Down
7 changes: 2 additions & 5 deletions lib/concurrent/async.rb
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,11 @@ def method_missing(method, *args, &block)
self.define_singleton_method(method) do |*args2|
Async::validate_argc(@delegate, method, *args2)
ivar = Concurrent::IVar.new
value, reason = nil, nil
@serializer.post(@executor.value) do
begin
value = @delegate.send(method, *args2, &block)
ivar.set(@delegate.send(method, *args2, &block))
rescue => reason
# caught
ensure
ivar.complete(reason.nil?, value, reason)
ivar.fail(reason)
end
end
ivar.value if @blocking
Expand Down
26 changes: 11 additions & 15 deletions lib/concurrent/atomic/copy_on_notify_observer_set.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,40 +33,38 @@ def add_observer(observer=nil, func=:update, &block)
begin
@mutex.lock
@observers[observer] = func
observer
ensure
@mutex.unlock
end

observer
end

# @param [Object] observer the observer to remove
# @return [Object] the deleted observer
def delete_observer(observer)
@mutex.lock
@observers.delete(observer)
@mutex.unlock

observer
ensure
@mutex.unlock
end

# Deletes all observers
# @return [CopyOnWriteObserverSet] self
def delete_observers
@mutex.lock
@observers.clear
@mutex.unlock

self
ensure
@mutex.unlock
end

# @return [Integer] the observers count
def count_observers
@mutex.lock
result = @observers.count
@observers.count
ensure
@mutex.unlock

result
end

# Notifies all registered observers with optional args
Expand All @@ -75,7 +73,6 @@ def count_observers
def notify_observers(*args, &block)
observers = duplicate_observers
notify_to(observers, *args, &block)

self
end

Expand All @@ -86,7 +83,6 @@ def notify_observers(*args, &block)
def notify_and_delete_observers(*args, &block)
observers = duplicate_and_clear_observers
notify_to(observers, *args, &block)

self
end

Expand All @@ -96,17 +92,17 @@ def duplicate_and_clear_observers
@mutex.lock
observers = @observers.dup
@observers.clear
@mutex.unlock

observers
ensure
@mutex.unlock
end

def duplicate_observers
@mutex.lock
observers = @observers.dup
@mutex.unlock

observers
ensure
@mutex.unlock
end

def notify_to(observers, *args)
Expand Down
4 changes: 2 additions & 2 deletions lib/concurrent/channel/buffered_channel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def select(probe)
@probe_set.put(probe)
true
else
shift_buffer if probe.set_unless_assigned(peek_buffer, self)
shift_buffer if probe.try_set([peek_buffer, self])
end

end
Expand Down Expand Up @@ -76,7 +76,7 @@ def set_probe_or_push_into_buffer(value)
push_into_buffer(value)
true
else
@probe_set.take.set_unless_assigned(value, self)
@probe_set.take.try_set([value, self])
end
end
end
Expand Down
29 changes: 2 additions & 27 deletions lib/concurrent/channel/channel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,37 +3,12 @@
module Concurrent
module Channel

class Probe < Concurrent::IVar

def initialize(value = NO_VALUE, opts = {})
super(value, opts)
end

def set_unless_assigned(value, channel)
mutex.synchronize do
return false if [:fulfilled, :rejected].include? @state

set_state(true, [value, channel], nil)
event.set
true
end
end

alias_method :composite_value, :value

def value
composite_value.nil? ? nil : composite_value[0]
end

def channel
composite_value.nil? ? nil : composite_value[1]
end
end
Probe = IVar

def self.select(*channels)
probe = Probe.new
channels.each { |channel| channel.select(probe) }
result = probe.composite_value
result = probe.value
channels.each { |channel| channel.remove_probe(probe) }
result
end
Expand Down
3 changes: 1 addition & 2 deletions lib/concurrent/channel/unbuffered_channel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ def probe_set_size
end

def push(value)
# TODO set_unless_assigned define on IVar as #set_state? or #try_set_state
until @probe_set.take.set_unless_assigned(value, self)
until @probe_set.take.try_set([value, self])
end
end

Expand Down
55 changes: 34 additions & 21 deletions lib/concurrent/delay.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
require 'concurrent/obligation'
require 'concurrent/executor/executor_options'
require 'concurrent/executor/immediate_executor'
require 'concurrent/synchronization'

module Concurrent

Expand Down Expand Up @@ -37,7 +38,7 @@ module Concurrent
# execute on the given executor, allowing the call to timeout.
#
# @see Concurrent::Dereferenceable
class Delay
class Delay < Synchronization::Object
include Obligation
include ExecutorOptions

Expand All @@ -51,15 +52,27 @@ class Delay

# Create a new `Delay` in the `:pending` state.
#
# @yield the delayed operation to perform
# @!macro [attach] executor_and_deref_options
#
# @param [Hash] opts the options used to define the behavior at update and deref
# and to specify the executor on which to perform actions
# @option opts [Executor] :executor when set use the given `Executor` instance.
# Three special values are also supported: `:task` returns the global task pool,
# `:operation` returns the global operation pool, and `:immediate` returns a new
# `ImmediateExecutor` object.
# @option opts [Boolean] :dup_on_deref (false) call `#dup` before returning the data
# @option opts [Boolean] :freeze_on_deref (false) call `#freeze` before returning the data
# @option opts [Proc] :copy_on_deref (nil) call the given `Proc` passing
# the internal value and returning the value returned from the proc
#
# @!macro executor_and_deref_options
# @yield the delayed operation to perform
#
# @raise [ArgumentError] if no block is given
def initialize(opts = {}, &block)
raise ArgumentError.new('no block given') unless block_given?

init_obligation
super()
init_obligation(self)
set_deref_options(opts)
@task_executor = get_executor_from(opts)

Expand Down Expand Up @@ -145,16 +158,15 @@ def wait(timeout = nil)
# @yield the delayed operation to perform
# @return [true, false] if success
def reconfigure(&block)
mutex.lock
raise ArgumentError.new('no block given') unless block_given?
unless @computing
@task = block
true
else
false
mutex.synchronize do
raise ArgumentError.new('no block given') unless block_given?
unless @computing
@task = block
true
else
false
end
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since delay is migrated to Delay < Synchronization::Object it probably should be using its method synchronize, is there a reason for using mutex which I am missing?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is using the #synchronize method from Synchronization::Object, it's just doing so through a level of indirection. See line 64. Delay includes Obligation which also includes Dereferenceable, the latter of which creates its own mutex within the #init_mutex function. I've added an argument to both #init_obligation and #init_mutex that allows the mutex to be injected. Delay passes self into those initialization functions, thus allowing the delay object to be its own lock. The #mutex method returns self. The mutex.synchronize call on line 150 is actually self.synchronize.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah right, I've missed that. It seems unnecessarily complicated, but I am not sure if it makes sense to invest time in refactoring it - I still hope that the new futures will replace this class soon.

end
ensure
mutex.unlock
end

private
Expand All @@ -163,10 +175,11 @@ def reconfigure(&block)
def execute_task_once # :nodoc:
# this function has been optimized for performance and
# should not be modified without running new benchmarks
mutex.lock
execute = @computing = true unless @computing
task = @task
mutex.unlock
execute = task = nil
mutex.synchronize do
execute = @computing = true unless @computing
task = @task
end

if execute
@task_executor.post do
Expand All @@ -176,10 +189,10 @@ def execute_task_once # :nodoc:
rescue => ex
reason = ex
end
mutex.lock
set_state(success, result, reason)
event.set
mutex.unlock
mutex.synchronize do
set_state(success, result, reason)
event.set
end
end
end
end
Expand Down
34 changes: 13 additions & 21 deletions lib/concurrent/dereferenceable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,24 +39,17 @@ module Dereferenceable
#
# @return [Object] the current value of the object
def value
mutex.lock
apply_deref_options(@value)
ensure
mutex.unlock
mutex.synchronize { apply_deref_options(@value) }
end

alias_method :deref, :value

protected

# Set the internal value of this object
#
# @param [Object] val the new value
def value=(val)
mutex.lock
@value = val
ensure
mutex.unlock
# @param [Object] value the new value
def value=(value)
mutex.synchronize{ @value = value }
end

# A mutex lock used for synchronizing thread-safe operations. Methods defined
Expand All @@ -74,8 +67,8 @@ def mutex
# @note This method *must* be called from within the constructor of the including class.
#
# @see #mutex
def init_mutex
@mutex = Mutex.new
def init_mutex(mutex = Mutex.new)
@mutex = mutex
end

# Set the options which define the operations #value performs before
Expand All @@ -91,14 +84,13 @@ def init_mutex
# @option opts [String] :copy_on_deref (nil) call the given `Proc` passing
# the internal value and returning the value returned from the proc
def set_deref_options(opts = {})
mutex.lock
@dup_on_deref = opts[:dup_on_deref] || opts[:dup]
@freeze_on_deref = opts[:freeze_on_deref] || opts[:freeze]
@copy_on_deref = opts[:copy_on_deref] || opts[:copy]
@do_nothing_on_deref = !(@dup_on_deref || @freeze_on_deref || @copy_on_deref)
nil
ensure
mutex.unlock
mutex.synchronize do
@dup_on_deref = opts[:dup_on_deref] || opts[:dup]
@freeze_on_deref = opts[:freeze_on_deref] || opts[:freeze]
@copy_on_deref = opts[:copy_on_deref] || opts[:copy]
@do_nothing_on_deref = !(@dup_on_deref || @freeze_on_deref || @copy_on_deref)
nil
end
end

# @!visibility private
Expand Down
13 changes: 12 additions & 1 deletion lib/concurrent/future.rb
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,18 @@ def self.execute(opts = {}, &block)
Future.new(opts, &block).execute
end

protected :set, :fail, :complete
# @!macro ivar_set_method
def set(value = IVar::NO_VALUE, &block)
check_for_block_or_value!(block_given?, value)
mutex.synchronize do
if @state != :unscheduled
raise MultipleAssignmentError
else
@task = block || Proc.new { value }
end
end
execute
end

private

Expand Down
Loading