From 1bb630426c571846aa84b0f1ea2e1301207937fb Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Mon, 12 May 2014 21:46:01 +0200 Subject: [PATCH 01/30] Raw port of Actress needs more work and discussions --- Gemfile | 5 + lib/concurrent.rb | 1 + lib/concurrent/actress.rb | 295 ++++++++++++++++++++++++++++++++ spec/concurrent/actress_spec.rb | 50 ++++++ 4 files changed, 351 insertions(+) create mode 100644 lib/concurrent/actress.rb create mode 100644 spec/concurrent/actress_spec.rb diff --git a/Gemfile b/Gemfile index b60167eaa..b5aa3b1e1 100644 --- a/Gemfile +++ b/Gemfile @@ -2,6 +2,11 @@ source 'https://rubygems.org' gemspec +group :actress do + gem 'algebrick' + gem 'atomic' +end + group :development do gem 'rake', '~> 10.2.2' gem 'countloc', '~> 0.4.0', platforms: :mri diff --git a/lib/concurrent.rb b/lib/concurrent.rb index ec74f16f1..15302c436 100644 --- a/lib/concurrent.rb +++ b/lib/concurrent.rb @@ -28,6 +28,7 @@ require 'concurrent/supervisor' require 'concurrent/timer_task' require 'concurrent/tvar' +require 'concurrent/actress' # Modern concurrency tools for Ruby. Inspired by Erlang, Clojure, Scala, Haskell, # F#, C#, Java, and classic concurrency patterns. diff --git a/lib/concurrent/actress.rb b/lib/concurrent/actress.rb new file mode 100644 index 000000000..dd1ae4091 --- /dev/null +++ b/lib/concurrent/actress.rb @@ -0,0 +1,295 @@ +require 'algebrick' +require 'atomic' +require 'logger' + +module Concurrent + module Actress + Error = Class.new(StandardError) + + class ActressTerminated < Error + include Algebrick::TypeCheck + + def initialize(reference) + Type! reference, Reference + super reference.path + end + end + + def self.current + Thread.current[:__current_actress__] + end + + module CoreDelegations + def path + core.path + end + + def parent + core.parent + end + + def terminated? + core.terminated? + end + + def reference + core.reference + end + + alias_method :ref, :reference + end + + class Reference + include Algebrick::TypeCheck + include Algebrick::Types + include CoreDelegations + + attr_reader :core + private :core + + def initialize(core) + @core = Type! core, Core + end + + + def tell(message) + message message, nil + end + + alias_method :<<, :tell + + def ask(message, ivar = IVar.new) + message message, ivar + end + + def message(message, ivar = nil) + core.on_envelope Envelope[message, + ivar ? Some[IVar][ivar] : None, + Actress.current ? Some[Reference][Actress.current] : None] + return ivar || self + end + + def to_s + "#<#{self.class} #{path}>" + end + + alias_method :inspect, :to_s + + def ==(other) + Type? other, self.class and other.send(:core) == core + end + end + + include Algebrick::Types + + Envelope = Algebrick.type do + fields! message: Object, + ivar: Maybe[IVar], + sender: Maybe[Reference] + end + + module Envelope + def sender_path + sender.maybe { |reference| reference.path } || 'outside-actress' + end + + def reject!(error) + ivar.maybe { |v| v.fail error } + end + end + + class Core + include Algebrick::TypeCheck + + attr_reader :reference, :name, :path, :logger, :parent_core + private :parent_core + + def initialize(parent, name, actress_class, *args, &block) + @mailbox = Array.new + @one_by_one = OneByOne.new + @executor = Concurrent.configuration.global_task_pool # TODO configurable + @parent_core = (Type! parent, Reference, NilClass) && parent.send(:core) + @name = (Type! name, String, Symbol).to_s + @children = Atomic.new [] + @path = @parent_core ? File.join(@parent_core.path, @name) : @name + @logger = Logger.new($stderr) # TODO add proper logging + @logger.progname = @path + @reference = Reference.new self + # noinspection RubyArgCount + @terminated = Event.new + @mutex = Mutex.new + + @actress_class = Child! actress_class, Abstract + schedule_execution do + parent_core.add_child reference if parent_core + @actress = actress_class.new self, *args, &block # FIXME it may fail + end + end + + def parent + @parent_core.reference + end + + def children + @children.get + end + + def add_child(child) + Type! child, Reference + @children.update { |o| [*o, child] } + end + + def remove_child(child) + Type! child, Reference + @children.update { |o| o - [child] } + end + + def on_envelope(envelope) + schedule_execution { execute_on_envelope envelope } + end + + def terminated? + @terminated.set? + end + + def terminate! + guard! + @terminated.set + parent_core.remove_child reference if parent_core + end + + def guard! + raise 'can be called only inside this actor' unless Actress.current == reference + end + + private + + def process? + unless @mailbox.empty? || @receive_envelope_scheduled + @receive_envelope_scheduled = true + schedule_execution { receive_envelope } + end + end + + def receive_envelope + envelope = @mailbox.shift + + if terminated? + # FIXME make sure that it cannot be GCed before all messages are rejected after termination + reject_envelope envelope + logger.debug "rejected #{envelope.message} from #{envelope.sender_path}" + return + end + logger.debug "received #{envelope.message} from #{envelope.sender_path}" + + result = @actress.on_envelope envelope + envelope.ivar.maybe { |iv| iv.set result } + rescue => error + logger.error error + envelope.ivar.maybe { |iv| iv.fail error } + ensure + @receive_envelope_scheduled = false + process? + end + + def schedule_execution + @one_by_one.post(@executor) do + begin + # TODO enable this mutex only on JRuby + @mutex.lock # only for JRuby + Thread.current[:__current_actress__] = reference + yield + rescue => e + puts e + ensure + Thread.current[:__current_actress__] = nil + @mutex.unlock # only for JRuby + end + end + end + + def execute_on_envelope(envelope) + if terminated? + reject_envelope envelope + else + @mailbox.push envelope + end + process? + end + + def create_and_set_actor(actress_class, block, *args) + parent_core.add_child reference if parent_core + @actress = actress_class.new self, *args, &block # FIXME may fail + end + + def reject_envelope(envelope) + envelope.reject! ActressTerminated.new(reference) + end + end + + class Abstract + include Algebrick::TypeCheck + extend Algebrick::TypeCheck + include Algebrick::Matching + include CoreDelegations + + attr_reader :core + + def self.new(core, *args, &block) + allocate.tap do |actress| + actress.__send__ :pre_initialize, core + actress.__send__ :initialize, *args, &block + end + end + + def on_message(message) + raise NotImplementedError + end + + def logger + core.logger + end + + def on_envelope(envelope) + @envelope = envelope + on_message envelope.message + ensure + @envelope = nil + end + + def spawn(actress_class, name, *args, &block) + Actress.spawn(actress_class, name, *args, &block) + end + + def children + core.children + end + + def terminate! + core.terminate! + end + + private + + def pre_initialize(core) + @core = Type! core, Core + end + + def envelope + @envelope or raise 'envelope not set' + end + end + + class Root < Abstract + def on_message(message) + # ignore + end + end + + ROOT = Core.new(nil, '/', Root).reference + + def self.spawn(actress_class, name, *args, &block) + Core.new(Actress.current || ROOT, name, actress_class, *args, &block).reference + end + end +end diff --git a/spec/concurrent/actress_spec.rb b/spec/concurrent/actress_spec.rb new file mode 100644 index 000000000..92493ba88 --- /dev/null +++ b/spec/concurrent/actress_spec.rb @@ -0,0 +1,50 @@ +require 'spec_helper' +require_relative 'dereferenceable_shared' +require_relative 'observable_shared' + +module Concurrent + + describe Actress do + Child = Algebrick.atom + Terminate = Algebrick.atom + + class Ping < Actress::Abstract + + def initialize(queue) + @queue = queue + end + + def on_message(message) + match message, + on(Terminate) { terminate! }, + on(Child) { spawn Ping, :pong, @queue }, + (on(any) do + @queue << message + message + end) + + end + end + + it 'works' do + queue = Queue.new + actor = Actress.spawn Ping, :ping, queue + + actor << 'a' << 1 + queue.pop.should eq 'a' + actor.ask(2).value.should eq 2 + + actor.parent.should eq Actress::ROOT + Actress::ROOT.path.should eq '/' + actor.path.should eq '/ping' + child = actor.ask(Child).value + child.path.should eq '/ping/pong' + queue.clear + child.ask(3) + queue.pop.should eq 3 + + actor << Terminate + actor.ask(:blow_up).wait.rejected?.should be_true + end + end +end From e6d40015225f4a219ec70d32c70d7cc04ac993df Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Mon, 12 May 2014 22:27:45 +0200 Subject: [PATCH 02/30] Change Abstract actor class to a ActorContext module --- lib/concurrent/actress.rb | 25 +++++++++++++------------ spec/concurrent/actress_spec.rb | 3 ++- 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/lib/concurrent/actress.rb b/lib/concurrent/actress.rb index dd1ae4091..43a892c8b 100644 --- a/lib/concurrent/actress.rb +++ b/lib/concurrent/actress.rb @@ -119,10 +119,16 @@ def initialize(parent, name, actress_class, *args, &block) @terminated = Event.new @mutex = Mutex.new - @actress_class = Child! actress_class, Abstract + @actress_class = Child! actress_class, ActorContext schedule_execution do parent_core.add_child reference if parent_core - @actress = actress_class.new self, *args, &block # FIXME it may fail + begin + @actress = actress_class.new *args, &block + @actress.send :initialize_core, self + rescue => ex + puts "#{ex} (#{ex.class})\n#{ex.backtrace.join("\n")}" + terminate! # TODO test that this is ok + end end end @@ -156,6 +162,7 @@ def terminate! guard! @terminated.set parent_core.remove_child reference if parent_core + # TODO terminate all children end def guard! @@ -227,7 +234,7 @@ def reject_envelope(envelope) end end - class Abstract + module ActorContext include Algebrick::TypeCheck extend Algebrick::TypeCheck include Algebrick::Matching @@ -235,13 +242,6 @@ class Abstract attr_reader :core - def self.new(core, *args, &block) - allocate.tap do |actress| - actress.__send__ :pre_initialize, core - actress.__send__ :initialize, *args, &block - end - end - def on_message(message) raise NotImplementedError end @@ -271,7 +271,7 @@ def terminate! private - def pre_initialize(core) + def initialize_core(core) @core = Type! core, Core end @@ -280,7 +280,8 @@ def envelope end end - class Root < Abstract + class Root + include ActorContext def on_message(message) # ignore end diff --git a/spec/concurrent/actress_spec.rb b/spec/concurrent/actress_spec.rb index 92493ba88..823f177bd 100644 --- a/spec/concurrent/actress_spec.rb +++ b/spec/concurrent/actress_spec.rb @@ -8,7 +8,8 @@ module Concurrent Child = Algebrick.atom Terminate = Algebrick.atom - class Ping < Actress::Abstract + class Ping + include Actress::ActorContext def initialize(queue) @queue = queue From 6887f7868b5cdcc0b83ace76fabc0c8bff42a330 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Tue, 13 May 2014 10:41:07 +0200 Subject: [PATCH 03/30] Remove Algebrick dependency --- Gemfile | 1 - lib/concurrent/actress.rb | 85 ++++++++++++++++++++++++--------- spec/concurrent/actress_spec.rb | 24 +++++----- 3 files changed, 74 insertions(+), 36 deletions(-) diff --git a/Gemfile b/Gemfile index b5aa3b1e1..36b75ba3d 100644 --- a/Gemfile +++ b/Gemfile @@ -3,7 +3,6 @@ source 'https://rubygems.org' gemspec group :actress do - gem 'algebrick' gem 'atomic' end diff --git a/lib/concurrent/actress.rb b/lib/concurrent/actress.rb index 43a892c8b..9c07e8635 100644 --- a/lib/concurrent/actress.rb +++ b/lib/concurrent/actress.rb @@ -1,4 +1,3 @@ -require 'algebrick' require 'atomic' require 'logger' @@ -6,8 +5,50 @@ module Concurrent module Actress Error = Class.new(StandardError) + module TypeCheck + # taken from Algebrick + + def Type?(value, *types) + types.any? { |t| value.is_a? t } + end + + def Type!(value, *types) + Type?(value, *types) or + TypeCheck.error(value, 'is not', types) + value + end + + def Match?(value, *types) + types.any? { |t| t === value } + end + + def Match!(value, *types) + Match?(value, *types) or + TypeCheck.error(value, 'is not matching', types) + value + end + + def Child?(value, *types) + Type?(value, Class) && + types.any? { |t| value <= t } + end + + def Child!(value, *types) + Child?(value, *types) or + TypeCheck.error(value, 'is not child', types) + value + end + + private + + def self.error(value, message, types) + raise TypeError, + "Value (#{value.class}) '#{value}' #{message} any of: #{types.join('; ')}." + end + end + class ActressTerminated < Error - include Algebrick::TypeCheck + include TypeCheck def initialize(reference) Type! reference, Reference @@ -40,8 +81,7 @@ def reference end class Reference - include Algebrick::TypeCheck - include Algebrick::Types + include TypeCheck include CoreDelegations attr_reader :core @@ -63,9 +103,7 @@ def ask(message, ivar = IVar.new) end def message(message, ivar = nil) - core.on_envelope Envelope[message, - ivar ? Some[IVar][ivar] : None, - Actress.current ? Some[Reference][Actress.current] : None] + core.on_envelope Envelope.new(message, ivar, Actress.current) return ivar || self end @@ -80,26 +118,30 @@ def ==(other) end end - include Algebrick::Types + Envelope = Struct.new :message, :ivar, :sender do + include TypeCheck - Envelope = Algebrick.type do - fields! message: Object, - ivar: Maybe[IVar], - sender: Maybe[Reference] - end + def initialize(message, ivar, sender) + super message, + (Type! ivar, IVar, NilClass), + (Type! sender, Reference, NilClass) + end - module Envelope def sender_path - sender.maybe { |reference| reference.path } || 'outside-actress' + if sender + sender.path + else + 'outside-actress' + end end def reject!(error) - ivar.maybe { |v| v.fail error } + ivar.fail error unless ivar.nil? end end class Core - include Algebrick::TypeCheck + include TypeCheck attr_reader :reference, :name, :path, :logger, :parent_core private :parent_core @@ -190,10 +232,10 @@ def receive_envelope logger.debug "received #{envelope.message} from #{envelope.sender_path}" result = @actress.on_envelope envelope - envelope.ivar.maybe { |iv| iv.set result } + envelope.ivar.set result unless envelope.ivar.nil? rescue => error logger.error error - envelope.ivar.maybe { |iv| iv.fail error } + envelope.ivar.fail error unless envelope.ivar.nil? ensure @receive_envelope_scheduled = false process? @@ -235,9 +277,8 @@ def reject_envelope(envelope) end module ActorContext - include Algebrick::TypeCheck - extend Algebrick::TypeCheck - include Algebrick::Matching + include TypeCheck + extend TypeCheck include CoreDelegations attr_reader :core diff --git a/spec/concurrent/actress_spec.rb b/spec/concurrent/actress_spec.rb index 823f177bd..0f02cbfb3 100644 --- a/spec/concurrent/actress_spec.rb +++ b/spec/concurrent/actress_spec.rb @@ -5,9 +5,6 @@ module Concurrent describe Actress do - Child = Algebrick.atom - Terminate = Algebrick.atom - class Ping include Actress::ActorContext @@ -16,14 +13,15 @@ def initialize(queue) end def on_message(message) - match message, - on(Terminate) { terminate! }, - on(Child) { spawn Ping, :pong, @queue }, - (on(any) do - @queue << message - message - end) - + case message + when :terminate + terminate! + when :child + spawn Ping, :pong, @queue + else + @queue << message + message + end end end @@ -38,13 +36,13 @@ def on_message(message) actor.parent.should eq Actress::ROOT Actress::ROOT.path.should eq '/' actor.path.should eq '/ping' - child = actor.ask(Child).value + child = actor.ask(:child).value child.path.should eq '/ping/pong' queue.clear child.ask(3) queue.pop.should eq 3 - actor << Terminate + actor << :terminate actor.ask(:blow_up).wait.rejected?.should be_true end end From 19466d1a8f62959187b54c2bffda1fd34a8facca Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Tue, 13 May 2014 16:01:18 +0200 Subject: [PATCH 04/30] when spawn returns children are set --- lib/concurrent/actress.rb | 3 ++- spec/concurrent/actress_spec.rb | 3 +++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/lib/concurrent/actress.rb b/lib/concurrent/actress.rb index 9c07e8635..2e22c69d1 100644 --- a/lib/concurrent/actress.rb +++ b/lib/concurrent/actress.rb @@ -161,9 +161,10 @@ def initialize(parent, name, actress_class, *args, &block) @terminated = Event.new @mutex = Mutex.new + parent_core.add_child reference if parent_core + @actress_class = Child! actress_class, ActorContext schedule_execution do - parent_core.add_child reference if parent_core begin @actress = actress_class.new *args, &block @actress.send :initialize_core, self diff --git a/spec/concurrent/actress_spec.rb b/spec/concurrent/actress_spec.rb index 0f02cbfb3..112aa4e3c 100644 --- a/spec/concurrent/actress_spec.rb +++ b/spec/concurrent/actress_spec.rb @@ -29,6 +29,9 @@ def on_message(message) queue = Queue.new actor = Actress.spawn Ping, :ping, queue + # when spawn returns children are set + Actress::ROOT.send(:core).children.should include(actor) + actor << 'a' << 1 queue.pop.should eq 'a' actor.ask(2).value.should eq 2 From 39173fe7aae5b22edd366e7cc0d348153f9a6d77 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Tue, 13 May 2014 16:04:06 +0200 Subject: [PATCH 05/30] Reject all envelops sent to termiated actor before actor is GCed --- lib/concurrent/actress.rb | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/lib/concurrent/actress.rb b/lib/concurrent/actress.rb index 2e22c69d1..003053c51 100644 --- a/lib/concurrent/actress.rb +++ b/lib/concurrent/actress.rb @@ -205,6 +205,11 @@ def terminate! guard! @terminated.set parent_core.remove_child reference if parent_core + @mailbox.each do |envelope| + reject_envelope envelope + logger.debug "rejected #{envelope.message} from #{envelope.sender_path}" + end + @mailbox.clear # TODO terminate all children end @@ -224,12 +229,6 @@ def process? def receive_envelope envelope = @mailbox.shift - if terminated? - # FIXME make sure that it cannot be GCed before all messages are rejected after termination - reject_envelope envelope - logger.debug "rejected #{envelope.message} from #{envelope.sender_path}" - return - end logger.debug "received #{envelope.message} from #{envelope.sender_path}" result = @actress.on_envelope envelope From ecd97ac38e9b2f853ba13f7132ad90f8610f23e6 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Tue, 13 May 2014 16:04:26 +0200 Subject: [PATCH 06/30] Remove dead code --- lib/concurrent/actress.rb | 5 ----- 1 file changed, 5 deletions(-) diff --git a/lib/concurrent/actress.rb b/lib/concurrent/actress.rb index 003053c51..8060d288d 100644 --- a/lib/concurrent/actress.rb +++ b/lib/concurrent/actress.rb @@ -266,11 +266,6 @@ def execute_on_envelope(envelope) process? end - def create_and_set_actor(actress_class, block, *args) - parent_core.add_child reference if parent_core - @actress = actress_class.new self, *args, &block # FIXME may fail - end - def reject_envelope(envelope) envelope.reject! ActressTerminated.new(reference) end From 82c1a7f12e2484c126a00449be270906c6d9aeec Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Tue, 13 May 2014 16:33:29 +0200 Subject: [PATCH 07/30] Remove dependency on Atomic --- Gemfile | 3 --- lib/concurrent/actress.rb | 31 ++++++++++++++++++++++--------- spec/concurrent/actress_spec.rb | 2 +- 3 files changed, 23 insertions(+), 13 deletions(-) diff --git a/Gemfile b/Gemfile index 36b75ba3d..4d65462ed 100644 --- a/Gemfile +++ b/Gemfile @@ -2,9 +2,6 @@ source 'https://rubygems.org' gemspec -group :actress do - gem 'atomic' -end group :development do gem 'rake', '~> 10.2.2' diff --git a/lib/concurrent/actress.rb b/lib/concurrent/actress.rb index 8060d288d..922a926d3 100644 --- a/lib/concurrent/actress.rb +++ b/lib/concurrent/actress.rb @@ -1,4 +1,3 @@ -require 'atomic' require 'logger' module Concurrent @@ -152,7 +151,7 @@ def initialize(parent, name, actress_class, *args, &block) @executor = Concurrent.configuration.global_task_pool # TODO configurable @parent_core = (Type! parent, Reference, NilClass) && parent.send(:core) @name = (Type! name, String, Symbol).to_s - @children = Atomic.new [] + @children = [] @path = @parent_core ? File.join(@parent_core.path, @name) : @name @logger = Logger.new($stderr) # TODO add proper logging @logger.progname = @path @@ -180,17 +179,22 @@ def parent end def children - @children.get + guard! + @children end def add_child(child) - Type! child, Reference - @children.update { |o| [*o, child] } + guard! + @children << (Type! child, Reference) + self end def remove_child(child) - Type! child, Reference - @children.update { |o| o - [child] } + schedule_execution do + Type! child, Reference + @children.delete child + end + self end def on_envelope(envelope) @@ -319,14 +323,23 @@ def envelope class Root include ActorContext def on_message(message) - # ignore + case message.first + when :spawn + spawn *message[1..2], *message[3], &message[4] + else + #ignore + end end end ROOT = Core.new(nil, '/', Root).reference def self.spawn(actress_class, name, *args, &block) - Core.new(Actress.current || ROOT, name, actress_class, *args, &block).reference + if Actress.current + Core.new(Actress.current, name, actress_class, *args, &block).reference + else + ROOT.ask([:spawn, actress_class, name, args, block]).value + end end end end diff --git a/spec/concurrent/actress_spec.rb b/spec/concurrent/actress_spec.rb index 112aa4e3c..9f8c1721a 100644 --- a/spec/concurrent/actress_spec.rb +++ b/spec/concurrent/actress_spec.rb @@ -30,7 +30,7 @@ def on_message(message) actor = Actress.spawn Ping, :ping, queue # when spawn returns children are set - Actress::ROOT.send(:core).children.should include(actor) + Actress::ROOT.send(:core).instance_variable_get(:@children).should include(actor) actor << 'a' << 1 queue.pop.should eq 'a' From 4b9bed5b5a402aeb758d70c6ccd229f2950e2f71 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Tue, 13 May 2014 17:28:55 +0200 Subject: [PATCH 08/30] Remove unused mutex no instance variables are being reassigned --- lib/concurrent/actress.rb | 4 ---- 1 file changed, 4 deletions(-) diff --git a/lib/concurrent/actress.rb b/lib/concurrent/actress.rb index 922a926d3..40d228cc2 100644 --- a/lib/concurrent/actress.rb +++ b/lib/concurrent/actress.rb @@ -158,7 +158,6 @@ def initialize(parent, name, actress_class, *args, &block) @reference = Reference.new self # noinspection RubyArgCount @terminated = Event.new - @mutex = Mutex.new parent_core.add_child reference if parent_core @@ -248,15 +247,12 @@ def receive_envelope def schedule_execution @one_by_one.post(@executor) do begin - # TODO enable this mutex only on JRuby - @mutex.lock # only for JRuby Thread.current[:__current_actress__] = reference yield rescue => e puts e ensure Thread.current[:__current_actress__] = nil - @mutex.unlock # only for JRuby end end end From 2b86bc2fd52953aa8582be62064f6eac37131e92 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Fri, 16 May 2014 10:36:15 +0200 Subject: [PATCH 09/30] Add Reference#ask! --- lib/concurrent/actress.rb | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/lib/concurrent/actress.rb b/lib/concurrent/actress.rb index 40d228cc2..2dfb9d0d6 100644 --- a/lib/concurrent/actress.rb +++ b/lib/concurrent/actress.rb @@ -101,6 +101,11 @@ def ask(message, ivar = IVar.new) message message, ivar end + # **warning** - can lead to deadlocks + def ask!(message, ivar = IVar.new) + ask(message, ivar).value! + end + def message(message, ivar = nil) core.on_envelope Envelope.new(message, ivar, Actress.current) return ivar || self From c20520b8df6a2f52a9cbd1962e2cf1986b890feb Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Fri, 16 May 2014 10:36:40 +0200 Subject: [PATCH 10/30] TODOs and comments update --- lib/concurrent/actress.rb | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/lib/concurrent/actress.rb b/lib/concurrent/actress.rb index 2dfb9d0d6..b2b29b39e 100644 --- a/lib/concurrent/actress.rb +++ b/lib/concurrent/actress.rb @@ -90,7 +90,6 @@ def initialize(core) @core = Type! core, Core end - def tell(message) message message, nil end @@ -153,7 +152,7 @@ class Core def initialize(parent, name, actress_class, *args, &block) @mailbox = Array.new @one_by_one = OneByOne.new - @executor = Concurrent.configuration.global_task_pool # TODO configurable + @executor = Concurrent.configuration.global_task_pool # TODO make configurable @parent_core = (Type! parent, Reference, NilClass) && parent.send(:core) @name = (Type! name, String, Symbol).to_s @children = [] @@ -222,7 +221,9 @@ def terminate! end def guard! - raise 'can be called only inside this actor' unless Actress.current == reference + unless Actress.current == reference + raise "can be called only inside actor #{reference} but was #{Actress.current}" + end end private @@ -298,6 +299,7 @@ def on_envelope(envelope) @envelope = nil end + # TODO add basic supervision def spawn(actress_class, name, *args, &block) Actress.spawn(actress_class, name, *args, &block) end @@ -328,7 +330,7 @@ def on_message(message) when :spawn spawn *message[1..2], *message[3], &message[4] else - #ignore + # ignore end end end From f473249568f396f21b05b9368268876e70ecca0b Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Tue, 20 May 2014 20:53:05 +0200 Subject: [PATCH 11/30] Split files and add documentation --- lib/concurrent/actress.rb | 347 ++------------------- lib/concurrent/actress/context.rb | 79 +++++ lib/concurrent/actress/core.rb | 171 ++++++++++ lib/concurrent/actress/core_delegations.rb | 25 ++ lib/concurrent/actress/envelope.rb | 25 ++ lib/concurrent/actress/errors.rb | 14 + lib/concurrent/actress/reference.rb | 62 ++++ lib/concurrent/actress/type_check.rb | 48 +++ lib/concurrent/executor/one_by_one.rb | 2 +- spec/actress.rb | 79 +++++ spec/concurrent/actress_spec.rb | 52 --- 11 files changed, 535 insertions(+), 369 deletions(-) create mode 100644 lib/concurrent/actress/context.rb create mode 100644 lib/concurrent/actress/core.rb create mode 100644 lib/concurrent/actress/core_delegations.rb create mode 100644 lib/concurrent/actress/envelope.rb create mode 100644 lib/concurrent/actress/errors.rb create mode 100644 lib/concurrent/actress/reference.rb create mode 100644 lib/concurrent/actress/type_check.rb create mode 100644 spec/actress.rb delete mode 100644 spec/concurrent/actress_spec.rb diff --git a/lib/concurrent/actress.rb b/lib/concurrent/actress.rb index b2b29b39e..0006a71db 100644 --- a/lib/concurrent/actress.rb +++ b/lib/concurrent/actress.rb @@ -1,330 +1,40 @@ require 'logger' -module Concurrent - module Actress - Error = Class.new(StandardError) - - module TypeCheck - # taken from Algebrick - - def Type?(value, *types) - types.any? { |t| value.is_a? t } - end - - def Type!(value, *types) - Type?(value, *types) or - TypeCheck.error(value, 'is not', types) - value - end - - def Match?(value, *types) - types.any? { |t| t === value } - end - - def Match!(value, *types) - Match?(value, *types) or - TypeCheck.error(value, 'is not matching', types) - value - end - - def Child?(value, *types) - Type?(value, Class) && - types.any? { |t| value <= t } - end - - def Child!(value, *types) - Child?(value, *types) or - TypeCheck.error(value, 'is not child', types) - value - end - - private +require 'concurrent/configuration' +require 'concurrent/executor/one_by_one' +require 'concurrent/ivar' - def self.error(value, message, types) - raise TypeError, - "Value (#{value.class}) '#{value}' #{message} any of: #{types.join('; ')}." - end - end +module Concurrent - class ActressTerminated < Error - include TypeCheck + # TODO broader description with examples + # + # @example ping + # class Ping + # include Context + # def on_message(message) + # message + # end + # end + # Ping.spawn(:ping1).ask(:m).value #=> :m + module Actress - def initialize(reference) - Type! reference, Reference - super reference.path - end - end + require 'concurrent/actress/type_check' + require 'concurrent/actress/errors' + require 'concurrent/actress/core_delegations' + require 'concurrent/actress/envelope' + require 'concurrent/actress/reference' + require 'concurrent/actress/core' + require 'concurrent/actress/context' + # @return [Reference, nil] current executing actor if any def self.current Thread.current[:__current_actress__] end - module CoreDelegations - def path - core.path - end - - def parent - core.parent - end - - def terminated? - core.terminated? - end - - def reference - core.reference - end - - alias_method :ref, :reference - end - - class Reference - include TypeCheck - include CoreDelegations - - attr_reader :core - private :core - - def initialize(core) - @core = Type! core, Core - end - - def tell(message) - message message, nil - end - - alias_method :<<, :tell - - def ask(message, ivar = IVar.new) - message message, ivar - end - - # **warning** - can lead to deadlocks - def ask!(message, ivar = IVar.new) - ask(message, ivar).value! - end - - def message(message, ivar = nil) - core.on_envelope Envelope.new(message, ivar, Actress.current) - return ivar || self - end - - def to_s - "#<#{self.class} #{path}>" - end - - alias_method :inspect, :to_s - - def ==(other) - Type? other, self.class and other.send(:core) == core - end - end - - Envelope = Struct.new :message, :ivar, :sender do - include TypeCheck - - def initialize(message, ivar, sender) - super message, - (Type! ivar, IVar, NilClass), - (Type! sender, Reference, NilClass) - end - - def sender_path - if sender - sender.path - else - 'outside-actress' - end - end - - def reject!(error) - ivar.fail error unless ivar.nil? - end - end - - class Core - include TypeCheck - - attr_reader :reference, :name, :path, :logger, :parent_core - private :parent_core - - def initialize(parent, name, actress_class, *args, &block) - @mailbox = Array.new - @one_by_one = OneByOne.new - @executor = Concurrent.configuration.global_task_pool # TODO make configurable - @parent_core = (Type! parent, Reference, NilClass) && parent.send(:core) - @name = (Type! name, String, Symbol).to_s - @children = [] - @path = @parent_core ? File.join(@parent_core.path, @name) : @name - @logger = Logger.new($stderr) # TODO add proper logging - @logger.progname = @path - @reference = Reference.new self - # noinspection RubyArgCount - @terminated = Event.new - - parent_core.add_child reference if parent_core - - @actress_class = Child! actress_class, ActorContext - schedule_execution do - begin - @actress = actress_class.new *args, &block - @actress.send :initialize_core, self - rescue => ex - puts "#{ex} (#{ex.class})\n#{ex.backtrace.join("\n")}" - terminate! # TODO test that this is ok - end - end - end - - def parent - @parent_core.reference - end - - def children - guard! - @children - end - - def add_child(child) - guard! - @children << (Type! child, Reference) - self - end - - def remove_child(child) - schedule_execution do - Type! child, Reference - @children.delete child - end - self - end - - def on_envelope(envelope) - schedule_execution { execute_on_envelope envelope } - end - - def terminated? - @terminated.set? - end - - def terminate! - guard! - @terminated.set - parent_core.remove_child reference if parent_core - @mailbox.each do |envelope| - reject_envelope envelope - logger.debug "rejected #{envelope.message} from #{envelope.sender_path}" - end - @mailbox.clear - # TODO terminate all children - end - - def guard! - unless Actress.current == reference - raise "can be called only inside actor #{reference} but was #{Actress.current}" - end - end - - private - - def process? - unless @mailbox.empty? || @receive_envelope_scheduled - @receive_envelope_scheduled = true - schedule_execution { receive_envelope } - end - end - - def receive_envelope - envelope = @mailbox.shift - - logger.debug "received #{envelope.message} from #{envelope.sender_path}" - - result = @actress.on_envelope envelope - envelope.ivar.set result unless envelope.ivar.nil? - rescue => error - logger.error error - envelope.ivar.fail error unless envelope.ivar.nil? - ensure - @receive_envelope_scheduled = false - process? - end - - def schedule_execution - @one_by_one.post(@executor) do - begin - Thread.current[:__current_actress__] = reference - yield - rescue => e - puts e - ensure - Thread.current[:__current_actress__] = nil - end - end - end - - def execute_on_envelope(envelope) - if terminated? - reject_envelope envelope - else - @mailbox.push envelope - end - process? - end - - def reject_envelope(envelope) - envelope.reject! ActressTerminated.new(reference) - end - end - - module ActorContext - include TypeCheck - extend TypeCheck - include CoreDelegations - - attr_reader :core - - def on_message(message) - raise NotImplementedError - end - - def logger - core.logger - end - - def on_envelope(envelope) - @envelope = envelope - on_message envelope.message - ensure - @envelope = nil - end - - # TODO add basic supervision - def spawn(actress_class, name, *args, &block) - Actress.spawn(actress_class, name, *args, &block) - end - - def children - core.children - end - - def terminate! - core.terminate! - end - - private - - def initialize_core(core) - @core = Type! core, Core - end - - def envelope - @envelope or raise 'envelope not set' - end - end - + # implements ROOT class Root - include ActorContext + include Context + # to allow spawning of new actors, spawn needs to be called inside the parent Actor def on_message(message) case message.first when :spawn @@ -335,8 +45,13 @@ def on_message(message) end end + # A root actor, a default parent of all actors spawned outside an actor ROOT = Core.new(nil, '/', Root).reference + # @param [Context] actress_class to be spawned + # @param [String, Symbol] name of the instance, it's used to generate the path of the actor + # @param args for actress_class instantiation + # @param block for actress_class instantiation def self.spawn(actress_class, name, *args, &block) if Actress.current Core.new(Actress.current, name, actress_class, *args, &block).reference diff --git a/lib/concurrent/actress/context.rb b/lib/concurrent/actress/context.rb new file mode 100644 index 000000000..d9279521f --- /dev/null +++ b/lib/concurrent/actress/context.rb @@ -0,0 +1,79 @@ +module Concurrent + module Actress + + # module used to define actor behaviours + # @example ping + # class Ping + # include Context + # def on_message(message) + # message + # end + # end + # + # Ping.spawn(:ping1).ask(:m).value #=> :m + module Context + include TypeCheck + include CoreDelegations + + attr_reader :core + + # @abstract override to define Actor's behaviour + # @param [Object] message + # @return [Object] a result which will be used to set the IVar supplied to Reference#ask + def on_message(message) + raise NotImplementedError + end + + def logger + core.logger + end + + # @api private + def on_envelope(envelope) + @envelope = envelope + on_message envelope.message + ensure + @envelope = nil + end + + # @see Actress.spawn + def spawn(actress_class, name, *args, &block) + Actress.spawn(actress_class, name, *args, &block) + end + + # @see Core#children + def children + core.children + end + + # @see Core#terminate! + def terminate! + core.terminate! + end + + private + + # @api private + def initialize_core(core) + @core = Type! core, Core + end + + # @return [Envelope] current envelope, accessible inside #on_message processing + def envelope + @envelope or raise 'envelope not set' + end + + def self.included(base) + base.extend ClassMethods + super base + end + + module ClassMethods + # behaves as {Actress.spawn} but class_name is omitted + def spawn(name, *args, &block) + Actress.spawn self, name, *args, &block + end + end + end + end +end diff --git a/lib/concurrent/actress/core.rb b/lib/concurrent/actress/core.rb new file mode 100644 index 000000000..c41f42d18 --- /dev/null +++ b/lib/concurrent/actress/core.rb @@ -0,0 +1,171 @@ +module Concurrent + module Actress + + # Core of the actor + # @api private + class Core + include TypeCheck + + attr_reader :reference, :name, :path, :logger, :parent_core + private :parent_core + + # @param [Reference, nil] parent of an actor spawning this one + # @param [String] name + # @param [Context] actress_class a class to be instantiated defining Actor's behaviour + # @param args arguments for actress_class instantiation + # @param block for actress_class instantiation + def initialize(parent, name, actress_class, *args, &block) + @mailbox = Array.new + @one_by_one = OneByOne.new + @executor = Concurrent.configuration.global_task_pool # TODO make configurable + @parent_core = (Type! parent, Reference, NilClass) && parent.send(:core) + @name = (Type! name, String, Symbol).to_s + @children = [] + @path = @parent_core ? File.join(@parent_core.path, @name) : @name + @logger = Logger.new($stderr) # TODO add proper logging + @logger.progname = @path + @reference = Reference.new self + # noinspection RubyArgCount + @terminated = Event.new + + parent_core.add_child reference if parent_core + + @actress_class = Child! actress_class, Context + schedule_execution do + begin + @actress = actress_class.new *args, &block + @actress.send :initialize_core, self + rescue => ex + puts "#{ex} (#{ex.class})\n#{ex.backtrace.join("\n")}" + terminate! # TODO test that this is ok + end + end + end + + # @return [Reference] of parent actor + def parent + @parent_core.reference + end + + # @return [Array] of children actors + def children + guard! + @children.dup + end + + # @api private + def add_child(child) + guard! + @children << (Type! child, Reference) + self + end + + # @api private + def remove_child(child) + schedule_execution do + Type! child, Reference + @children.delete child + end + self + end + + # is executed by Reference scheduling processing of new messages + # can be called from other alternative Reference implementations + # @param [Envelope] envelope + def on_envelope(envelope) + schedule_execution do + if terminated? + reject_envelope envelope + else + @mailbox.push envelope + end + process_envelopes? + end + self + end + + # @note Actor rejects envelopes when terminated. + # @return [true, false] if actor is terminated + def terminated? + @terminated.set? + end + + # Terminates the actor, any Envelope received after termination is rejected + def terminate! + guard! + @terminated.set + parent_core.remove_child reference if parent_core + @mailbox.each do |envelope| + reject_envelope envelope + logger.debug "rejected #{envelope.message} from #{envelope.sender_path}" + end + @mailbox.clear + # TODO terminate all children + self + end + + # @api private + # ensures that we are inside of the executor + def guard! + unless Actress.current == reference + raise "can be called only inside actor #{reference} but was #{Actress.current}" + end + end + + private + + # Ensures that only one envelope processing is scheduled with #schedule_execution, + # this allows other scheduled blocks to be executed before next envelope processing. + # Simply put this ensures that Core is still responsive to internal calls (like add_child) + # even though the Actor is flooded with messages. + def process_envelopes? + unless @mailbox.empty? || @receive_envelope_scheduled + @receive_envelope_scheduled = true + schedule_execution { receive_envelope } + end + end + + # Processes single envelope, calls #process_envelopes? at the end to ensure next envelope + # scheduling. + def receive_envelope + envelope = @mailbox.shift + + if terminated? + reject_envelope envelope + puts "this should not happen" + end + + logger.debug "received #{envelope.message} from #{envelope.sender_path}" + + result = @actress.on_envelope envelope + envelope.ivar.set result unless envelope.ivar.nil? + rescue => error + logger.error error + envelope.ivar.fail error unless envelope.ivar.nil? + ensure + @receive_envelope_scheduled = false + process_envelopes? + end + + # Schedules blocks to be executed on executor sequentially, + # sets Actress.current + def schedule_execution + @one_by_one.post(@executor) do + begin + Thread.current[:__current_actress__] = reference + yield + rescue => e + logger.error e + ensure + Thread.current[:__current_actress__] = nil + end + end + self + end + + def reject_envelope(envelope) + envelope.reject! ActressTerminated.new(reference) + end + end + end +end diff --git a/lib/concurrent/actress/core_delegations.rb b/lib/concurrent/actress/core_delegations.rb new file mode 100644 index 000000000..eba81461d --- /dev/null +++ b/lib/concurrent/actress/core_delegations.rb @@ -0,0 +1,25 @@ +module Concurrent + module Actress + + # delegates publicly expose-able methods calls to Core + module CoreDelegations + def path + core.path + end + + def parent + core.parent + end + + def terminated? + core.terminated? + end + + def reference + core.reference + end + + alias_method :ref, :reference + end + end +end diff --git a/lib/concurrent/actress/envelope.rb b/lib/concurrent/actress/envelope.rb new file mode 100644 index 000000000..15f979327 --- /dev/null +++ b/lib/concurrent/actress/envelope.rb @@ -0,0 +1,25 @@ +module Concurrent + module Actress + Envelope = Struct.new :message, :ivar, :sender do + include TypeCheck + + def initialize(message, ivar, sender) + super message, + (Type! ivar, IVar, NilClass), + (Type! sender, Reference, Thread) + end + + def sender_path + if sender.is_a? Reference + sender.path + else + sender.to_s + end + end + + def reject!(error) + ivar.fail error unless ivar.nil? + end + end + end +end diff --git a/lib/concurrent/actress/errors.rb b/lib/concurrent/actress/errors.rb new file mode 100644 index 000000000..d9e3c4a50 --- /dev/null +++ b/lib/concurrent/actress/errors.rb @@ -0,0 +1,14 @@ +module Concurrent + module Actress + Error = Class.new(StandardError) + + class ActressTerminated < Error + include TypeCheck + + def initialize(reference) + Type! reference, Reference + super reference.path + end + end + end +end diff --git a/lib/concurrent/actress/reference.rb b/lib/concurrent/actress/reference.rb new file mode 100644 index 000000000..afaef63a0 --- /dev/null +++ b/lib/concurrent/actress/reference.rb @@ -0,0 +1,62 @@ +module Concurrent + module Actress + + # Reference serves as public interface to send messages to Actors and can freely passed around + class Reference + include TypeCheck + include CoreDelegations + + attr_reader :core + private :core + + # @!visibility private + def initialize(core) + @core = Type! core, Core + end + + # tells message to the actor + # @param [Object] message + # @return [Reference] self + def tell(message) + message message, nil + end + + alias_method :<<, :tell + + # tells message to the actor + # @param [Object] message + # @param [Ivar] ivar to be fulfilled be message's processing result + # @return [IVar] supplied ivar + def ask(message, ivar = IVar.new) + message message, ivar + end + + # @note can lead to deadlocks + # tells message to the actor + # @param [Object] message + # @param [Ivar] ivar to be fulfilled be message's processing result + # @return [Object] message's processing result + # @raise [Exception] ivar.reason if ivar is #rejected? + def ask!(message, ivar = IVar.new) + ask(message, ivar).value! + end + + # behaves as #tell when no ivar and as #ask when ivar + def message(message, ivar = nil) + core.on_envelope Envelope.new(message, ivar, Actress.current || Thread.current) + return ivar || self + end + + def to_s + "#<#{self.class} #{path}>" + end + + alias_method :inspect, :to_s + + def ==(other) + Type? other, self.class and other.send(:core) == core + end + end + + end +end diff --git a/lib/concurrent/actress/type_check.rb b/lib/concurrent/actress/type_check.rb new file mode 100644 index 000000000..017b74f49 --- /dev/null +++ b/lib/concurrent/actress/type_check.rb @@ -0,0 +1,48 @@ +module Concurrent + module Actress + + # taken from Algebrick + # supplies type-checking helpers whenever included + module TypeCheck + + def Type?(value, *types) + types.any? { |t| value.is_a? t } + end + + def Type!(value, *types) + Type?(value, *types) or + TypeCheck.error(value, 'is not', types) + value + end + + def Match?(value, *types) + types.any? { |t| t === value } + end + + def Match!(value, *types) + Match?(value, *types) or + TypeCheck.error(value, 'is not matching', types) + value + end + + def Child?(value, *types) + Type?(value, Class) && + types.any? { |t| value <= t } + end + + def Child!(value, *types) + Child?(value, *types) or + TypeCheck.error(value, 'is not child', types) + value + end + + private + + def self.error(value, message, types) + raise TypeError, + "Value (#{value.class}) '#{value}' #{message} any of: #{types.join('; ')}." + end + end + end +end + diff --git a/lib/concurrent/executor/one_by_one.rb b/lib/concurrent/executor/one_by_one.rb index ee4e96c8f..5192adb57 100644 --- a/lib/concurrent/executor/one_by_one.rb +++ b/lib/concurrent/executor/one_by_one.rb @@ -1,6 +1,6 @@ module Concurrent - # Ensures that jobs are passed to the underlying executor one by one, + # Ensures that jobs are passed to the given executors one by one, # never running at the same time. class OneByOne diff --git a/spec/actress.rb b/spec/actress.rb new file mode 100644 index 000000000..05487d904 --- /dev/null +++ b/spec/actress.rb @@ -0,0 +1,79 @@ +# require 'spec_helper' +require 'concurrent/actress' + +# describe Concurrent::Actress do +# FIXME it does not work in Rspec environment + +class Ping + include Concurrent::Actress::Context + + def initialize(queue) + @queue = queue + end + + def on_message(message) + case message + when :terminate + terminate! + when :child + Ping.spawn :pong, @queue + else + @queue << message + message + end + end +end + +# def trace! +# set_trace_func proc { |event, file, line, id, binding, classname| +# # thread = eval('Thread.current', binding).object_id.to_s(16) +# printf "%8s %20s %20s %s %s:%-2d\n", event, id, classname, nil, file, line +# } +# yield +# ensure +# set_trace_func nil +# end + +def assert condition + unless condition + require 'pry' + binding.pry + raise + end +end + + + +Array.new(100).map do + Thread.new do + 20.times do |i| + # it format('--- %3d ---', i) do + puts format('--- %3d ---', i) + # trace! do + queue = Queue.new + actor = Ping.spawn :ping, queue + + # when spawn returns children are set + assert Concurrent::Actress::ROOT.send(:core).instance_variable_get(:@children).include?(actor) + + actor << 'a' << 1 + assert queue.pop == 'a' + assert actor.ask(2).value == 2 + + assert actor.parent == Concurrent::Actress::ROOT + assert Concurrent::Actress::ROOT.path == '/' + assert actor.path == '/ping' + child = actor.ask(:child).value + assert child.path == '/ping/pong' + queue.clear + child.ask(3) + assert queue.pop == 3 + + actor << :terminate + assert actor.ask(:blow_up).wait.rejected? + end + end +end.each(&:join) + +# end +# end diff --git a/spec/concurrent/actress_spec.rb b/spec/concurrent/actress_spec.rb deleted file mode 100644 index 9f8c1721a..000000000 --- a/spec/concurrent/actress_spec.rb +++ /dev/null @@ -1,52 +0,0 @@ -require 'spec_helper' -require_relative 'dereferenceable_shared' -require_relative 'observable_shared' - -module Concurrent - - describe Actress do - class Ping - include Actress::ActorContext - - def initialize(queue) - @queue = queue - end - - def on_message(message) - case message - when :terminate - terminate! - when :child - spawn Ping, :pong, @queue - else - @queue << message - message - end - end - end - - it 'works' do - queue = Queue.new - actor = Actress.spawn Ping, :ping, queue - - # when spawn returns children are set - Actress::ROOT.send(:core).instance_variable_get(:@children).should include(actor) - - actor << 'a' << 1 - queue.pop.should eq 'a' - actor.ask(2).value.should eq 2 - - actor.parent.should eq Actress::ROOT - Actress::ROOT.path.should eq '/' - actor.path.should eq '/ping' - child = actor.ask(:child).value - child.path.should eq '/ping/pong' - queue.clear - child.ask(3) - queue.pop.should eq 3 - - actor << :terminate - actor.ask(:blow_up).wait.rejected?.should be_true - end - end -end From b399efe340b71114c1ebbdbfb0939182f6c0e3e3 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Tue, 20 May 2014 22:32:06 +0200 Subject: [PATCH 12/30] Add Actress::AdHoc actor it's behavior is given by proc passed on initialization --- lib/concurrent/actress.rb | 2 ++ lib/concurrent/actress/ad_hoc.rb | 14 ++++++++++++++ spec/actress.rb | 3 +-- 3 files changed, 17 insertions(+), 2 deletions(-) create mode 100644 lib/concurrent/actress/ad_hoc.rb diff --git a/lib/concurrent/actress.rb b/lib/concurrent/actress.rb index 0006a71db..65adc5415 100644 --- a/lib/concurrent/actress.rb +++ b/lib/concurrent/actress.rb @@ -26,6 +26,8 @@ module Actress require 'concurrent/actress/core' require 'concurrent/actress/context' + require 'concurrent/actress/ad_hoc' + # @return [Reference, nil] current executing actor if any def self.current Thread.current[:__current_actress__] diff --git a/lib/concurrent/actress/ad_hoc.rb b/lib/concurrent/actress/ad_hoc.rb new file mode 100644 index 000000000..88b7f60d9 --- /dev/null +++ b/lib/concurrent/actress/ad_hoc.rb @@ -0,0 +1,14 @@ +module Concurrent + module Actress + class AdHoc + include Context + def initialize(&initializer) + @on_message = Type! initializer.call, Proc + end + + def on_message(message) + @on_message.call message + end + end + end +end diff --git a/spec/actress.rb b/spec/actress.rb index 05487d904..cd1c02be2 100644 --- a/spec/actress.rb +++ b/spec/actress.rb @@ -16,7 +16,7 @@ def on_message(message) when :terminate terminate! when :child - Ping.spawn :pong, @queue + Concurrent::Actress::AdHoc.spawn(:pong) { -> m { @queue << m } } else @queue << message message @@ -43,7 +43,6 @@ def assert condition end - Array.new(100).map do Thread.new do 20.times do |i| From 0ec8e08055b90247a1f08f84a866923f668fc2bb Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Wed, 21 May 2014 09:56:09 +0200 Subject: [PATCH 13/30] Configurable Core executor can be configured, other options will follow spawn can be also called with hash instead of class, name, *args --- lib/concurrent/actress.rb | 29 +++++++++++++----- lib/concurrent/actress/context.rb | 13 +++++--- lib/concurrent/actress/core.rb | 49 ++++++++++++++++++------------- spec/actress.rb | 22 +++++++------- 4 files changed, 70 insertions(+), 43 deletions(-) diff --git a/lib/concurrent/actress.rb b/lib/concurrent/actress.rb index 65adc5415..814cd1e91 100644 --- a/lib/concurrent/actress.rb +++ b/lib/concurrent/actress.rb @@ -40,7 +40,7 @@ class Root def on_message(message) case message.first when :spawn - spawn *message[1..2], *message[3], &message[4] + spawn message[1], &message[2] else # ignore end @@ -48,17 +48,30 @@ def on_message(message) end # A root actor, a default parent of all actors spawned outside an actor - ROOT = Core.new(nil, '/', Root).reference + ROOT = Core.new(parent: nil, name: '/', class: Root).reference - # @param [Context] actress_class to be spawned - # @param [String, Symbol] name of the instance, it's used to generate the path of the actor - # @param args for actress_class instantiation # @param block for actress_class instantiation - def self.spawn(actress_class, name, *args, &block) + def self.spawn(*args, &block) if Actress.current - Core.new(Actress.current, name, actress_class, *args, &block).reference + Core.new(spawn_optionify(*args).merge(parent: Actress.current), &block).reference else - ROOT.ask([:spawn, actress_class, name, args, block]).value + ROOT.ask([:spawn, spawn_optionify(*args), block]).value + end + end + + # @overload spawn_optionify(actress_class, name, *args) + # @param [Context] actress_class to be spawned + # @param [String, Symbol] name of the instance, it's used to generate the path of the actor + # @param args for actress_class instantiation + # @overload spawn_optionify(opts) + # see {Core.new} opts + def self.spawn_optionify(*args) + if args.size == 1 && args.first.is_a?(Hash) + args.first + else + { class: args[0], + name: args[1], + args: args[2..-1] } end end end diff --git a/lib/concurrent/actress/context.rb b/lib/concurrent/actress/context.rb index d9279521f..60d87d6c5 100644 --- a/lib/concurrent/actress/context.rb +++ b/lib/concurrent/actress/context.rb @@ -37,8 +37,8 @@ def on_envelope(envelope) end # @see Actress.spawn - def spawn(actress_class, name, *args, &block) - Actress.spawn(actress_class, name, *args, &block) + def spawn(*args, &block) + Actress.spawn(*args, &block) end # @see Core#children @@ -70,8 +70,13 @@ def self.included(base) module ClassMethods # behaves as {Actress.spawn} but class_name is omitted - def spawn(name, *args, &block) - Actress.spawn self, name, *args, &block + def spawn(name_or_opts, *args, &block) + opts = if name_or_opts.is_a? Hash + name_or_opts.merge class: self + else + { class: self, name: name_or_opts, args: args } + end + Actress.spawn opts, &block end end end diff --git a/lib/concurrent/actress/core.rb b/lib/concurrent/actress/core.rb index c41f42d18..17ba37b96 100644 --- a/lib/concurrent/actress/core.rb +++ b/lib/concurrent/actress/core.rb @@ -9,28 +9,37 @@ class Core attr_reader :reference, :name, :path, :logger, :parent_core private :parent_core - # @param [Reference, nil] parent of an actor spawning this one - # @param [String] name - # @param [Context] actress_class a class to be instantiated defining Actor's behaviour - # @param args arguments for actress_class instantiation - # @param block for actress_class instantiation - def initialize(parent, name, actress_class, *args, &block) - @mailbox = Array.new - @one_by_one = OneByOne.new - @executor = Concurrent.configuration.global_task_pool # TODO make configurable - @parent_core = (Type! parent, Reference, NilClass) && parent.send(:core) - @name = (Type! name, String, Symbol).to_s - @children = [] + # @option opts [String] name + # @option opts [Reference, nil] parent of an actor spawning this one + # @option opts [Context] actress_class a class to be instantiated defining Actor's behaviour + # @option opts [Array] args arguments for actress_class instantiation + # @option opts [Executor] executor, default is `Concurrent.configuration.global_task_pool` + # @param [Proc] block for class instantiation + def initialize(opts = {}, &block) + @mailbox = Array.new + @one_by_one = OneByOne.new + # noinspection RubyArgCount + @terminated = Event.new + @executor = Type! opts.fetch(:executor, Concurrent.configuration.global_task_pool), Executor + @children = [] + @reference = Reference.new self + @name = (Type! opts.fetch(:name), String, Symbol).to_s + + parent = opts[:parent] + @parent_core = (Type! parent, Reference, NilClass) && parent.send(:core) + if @parent_core.nil? && @name != '/' + raise 'only root has no parent' + end + @path = @parent_core ? File.join(@parent_core.path, @name) : @name @logger = Logger.new($stderr) # TODO add proper logging @logger.progname = @path - @reference = Reference.new self - # noinspection RubyArgCount - @terminated = Event.new - parent_core.add_child reference if parent_core + @parent_core.add_child reference if @parent_core + + @actress_class = actress_class = Child! opts.fetch(:class), Context + args = opts.fetch(:args, []) - @actress_class = Child! actress_class, Context schedule_execution do begin @actress = actress_class.new *args, &block @@ -42,9 +51,9 @@ def initialize(parent, name, actress_class, *args, &block) end end - # @return [Reference] of parent actor + # @return [Reference, nil] of parent actor def parent - @parent_core.reference + @parent_core && @parent_core.reference end # @return [Array] of children actors @@ -155,7 +164,7 @@ def schedule_execution Thread.current[:__current_actress__] = reference yield rescue => e - logger.error e + logger.fatal e ensure Thread.current[:__current_actress__] = nil end diff --git a/spec/actress.rb b/spec/actress.rb index cd1c02be2..7f9d1fec1 100644 --- a/spec/actress.rb +++ b/spec/actress.rb @@ -36,18 +36,19 @@ def on_message(message) def assert condition unless condition - require 'pry' - binding.pry + # require 'pry' + # binding.pry raise + puts "--- \n#{caller.join("\n")}" end end -Array.new(100).map do - Thread.new do - 20.times do |i| - # it format('--- %3d ---', i) do - puts format('--- %3d ---', i) +# it format('--- %3d ---', i) do +10.times do |i| + puts format('--- %3d ---', i) + Array.new(10).map do + Thread.new do # trace! do queue = Queue.new actor = Ping.spawn :ping, queue @@ -71,8 +72,7 @@ def assert condition actor << :terminate assert actor.ask(:blow_up).wait.rejected? end - end -end.each(&:join) - -# end + end.each(&:join) +end +# end # end From 363e7b8241111bd8582f17ca772df7a1d73a8a02 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Thu, 22 May 2014 18:17:27 +0200 Subject: [PATCH 14/30] configurable log level --- lib/concurrent/actress/core.rb | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/concurrent/actress/core.rb b/lib/concurrent/actress/core.rb index 17ba37b96..b007816bd 100644 --- a/lib/concurrent/actress/core.rb +++ b/lib/concurrent/actress/core.rb @@ -32,7 +32,9 @@ def initialize(opts = {}, &block) end @path = @parent_core ? File.join(@parent_core.path, @name) : @name - @logger = Logger.new($stderr) # TODO add proper logging + # TODO add proper logging + @logger = Logger.new($stderr) + @logger.level = opts.fetch(:logger_level, 1) @logger.progname = @path @parent_core.add_child reference if @parent_core From 55d2ab69a600e83690ff2db9a9808e1f1c385cd4 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Thu, 22 May 2014 18:18:59 +0200 Subject: [PATCH 15/30] Terminate on error in message processing since we do not know ho to restart yet minor updates in logging and comments --- lib/concurrent/actress/core.rb | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/lib/concurrent/actress/core.rb b/lib/concurrent/actress/core.rb index b007816bd..cb43331f9 100644 --- a/lib/concurrent/actress/core.rb +++ b/lib/concurrent/actress/core.rb @@ -48,7 +48,7 @@ def initialize(opts = {}, &block) @actress.send :initialize_core, self rescue => ex puts "#{ex} (#{ex.class})\n#{ex.backtrace.join("\n")}" - terminate! # TODO test that this is ok + terminate! # TODO test this end end end @@ -105,6 +105,7 @@ def terminated? def terminate! guard! @terminated.set + parent_core.remove_child reference if parent_core @mailbox.each do |envelope| reject_envelope envelope @@ -143,7 +144,7 @@ def receive_envelope if terminated? reject_envelope envelope - puts "this should not happen" + logger.fatal "this should not be happening #{caller[0]}" end logger.debug "received #{envelope.message} from #{envelope.sender_path}" @@ -153,6 +154,7 @@ def receive_envelope rescue => error logger.error error envelope.ivar.fail error unless envelope.ivar.nil? + terminate! ensure @receive_envelope_scheduled = false process_envelopes? From 1d203051f59e9e8f673dcaba5419b29006b9a749 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Thu, 22 May 2014 18:19:54 +0200 Subject: [PATCH 16/30] Add basic spec --- spec/actress.rb | 78 ----------------------------- spec/concurrent/actress_spec.rb | 88 +++++++++++++++++++++++++++++++++ 2 files changed, 88 insertions(+), 78 deletions(-) delete mode 100644 spec/actress.rb create mode 100644 spec/concurrent/actress_spec.rb diff --git a/spec/actress.rb b/spec/actress.rb deleted file mode 100644 index 7f9d1fec1..000000000 --- a/spec/actress.rb +++ /dev/null @@ -1,78 +0,0 @@ -# require 'spec_helper' -require 'concurrent/actress' - -# describe Concurrent::Actress do -# FIXME it does not work in Rspec environment - -class Ping - include Concurrent::Actress::Context - - def initialize(queue) - @queue = queue - end - - def on_message(message) - case message - when :terminate - terminate! - when :child - Concurrent::Actress::AdHoc.spawn(:pong) { -> m { @queue << m } } - else - @queue << message - message - end - end -end - -# def trace! -# set_trace_func proc { |event, file, line, id, binding, classname| -# # thread = eval('Thread.current', binding).object_id.to_s(16) -# printf "%8s %20s %20s %s %s:%-2d\n", event, id, classname, nil, file, line -# } -# yield -# ensure -# set_trace_func nil -# end - -def assert condition - unless condition - # require 'pry' - # binding.pry - raise - puts "--- \n#{caller.join("\n")}" - end -end - - -# it format('--- %3d ---', i) do -10.times do |i| - puts format('--- %3d ---', i) - Array.new(10).map do - Thread.new do - # trace! do - queue = Queue.new - actor = Ping.spawn :ping, queue - - # when spawn returns children are set - assert Concurrent::Actress::ROOT.send(:core).instance_variable_get(:@children).include?(actor) - - actor << 'a' << 1 - assert queue.pop == 'a' - assert actor.ask(2).value == 2 - - assert actor.parent == Concurrent::Actress::ROOT - assert Concurrent::Actress::ROOT.path == '/' - assert actor.path == '/ping' - child = actor.ask(:child).value - assert child.path == '/ping/pong' - queue.clear - child.ask(3) - assert queue.pop == 3 - - actor << :terminate - assert actor.ask(:blow_up).wait.rejected? - end - end.each(&:join) -end -# end -# end diff --git a/spec/concurrent/actress_spec.rb b/spec/concurrent/actress_spec.rb new file mode 100644 index 000000000..cf94bf89c --- /dev/null +++ b/spec/concurrent/actress_spec.rb @@ -0,0 +1,88 @@ +require 'spec_helper' +require 'concurrent/actress' + +EXECUTOR = Concurrent::ThreadPoolExecutor.new( + min_threads: [2, Concurrent.processor_count].max, + max_threads: [20, Concurrent.processor_count * 15].max, + idletime: 2 * 60, # 2 minutes + max_queue: 0, # unlimited + overflow_policy: :abort # raise an exception +) + +describe Concurrent::Actress do + + class Ping + include Concurrent::Actress::Context + + def initialize(queue) + @queue = queue + end + + def on_message(message) + case message + when :terminate + terminate! + when :child + Concurrent::Actress::AdHoc.spawn(name: :pong, executor: EXECUTOR) { -> m { @queue << m } } + else + @queue << message + message + end + end + end + + # def trace! + # set_trace_func proc { |event, file, line, id, binding, classname| + # # thread = eval('Thread.current', binding).object_id.to_s(16) + # printf "%8s %20s %20s %s %s:%-2d\n", event, id, classname, nil, file, line + # } + # yield + # ensure + # set_trace_func nil + # end + + def assert condition + unless condition + # require 'pry' + # binding.pry + raise + puts "--- \n#{caller.join("\n")}" + end + end + + + # FIXME increasing this does not work in Rspec environment + 1.times do |i| + it format('run %3d', i) do + # puts format('run %3d', i) + Array.new(10).map do + Thread.new do + 10.times do + # trace! do + queue = Queue.new + actor = Ping.spawn name: :ping, executor: EXECUTOR, args: [queue] + + # when spawn returns children are set + assert Concurrent::Actress::ROOT.send(:core).instance_variable_get(:@children).include?(actor) + + actor << 'a' << 1 + assert queue.pop == 'a' + assert actor.ask(2).value == 2 + + assert actor.parent == Concurrent::Actress::ROOT + assert Concurrent::Actress::ROOT.path == '/' + assert actor.path == '/ping' + child = actor.ask(:child).value + assert child.path == '/ping/pong' + queue.clear + child.ask(3) + assert queue.pop == 3 + + actor << :terminate + assert actor.ask(:blow_up).wait.rejected? + end + end + end.each(&:join) + end + end +end From 67a06710b5b421b59d462afd2a20cc8ee176dd6b Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Fri, 23 May 2014 13:24:50 +0200 Subject: [PATCH 17/30] Replace Array of children with more effective Set --- lib/concurrent/actress/core.rb | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/lib/concurrent/actress/core.rb b/lib/concurrent/actress/core.rb index cb43331f9..aedd6718b 100644 --- a/lib/concurrent/actress/core.rb +++ b/lib/concurrent/actress/core.rb @@ -1,6 +1,8 @@ module Concurrent module Actress + require 'set' + # Core of the actor # @api private class Core @@ -21,7 +23,7 @@ def initialize(opts = {}, &block) # noinspection RubyArgCount @terminated = Event.new @executor = Type! opts.fetch(:executor, Concurrent.configuration.global_task_pool), Executor - @children = [] + @children = Set.new @reference = Reference.new self @name = (Type! opts.fetch(:name), String, Symbol).to_s @@ -61,13 +63,14 @@ def parent # @return [Array] of children actors def children guard! - @children.dup + @children.to_a end # @api private def add_child(child) guard! - @children << (Type! child, Reference) + Type! child, Reference + @children.add child self end From 314a26dc0181d1729f3b1ea3a7a2a77a0ad542c0 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Fri, 23 May 2014 13:25:38 +0200 Subject: [PATCH 18/30] Make name and executor publicly accessible --- lib/concurrent/actress/core_delegations.rb | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/lib/concurrent/actress/core_delegations.rb b/lib/concurrent/actress/core_delegations.rb index eba81461d..806b721df 100644 --- a/lib/concurrent/actress/core_delegations.rb +++ b/lib/concurrent/actress/core_delegations.rb @@ -1,8 +1,12 @@ module Concurrent module Actress - # delegates publicly expose-able methods calls to Core + # Provides publicly expose-able methods from {Core}. module CoreDelegations + def name + core.name + end + def path core.path end @@ -19,6 +23,10 @@ def reference core.reference end + def executor + core.executor + end + alias_method :ref, :reference end end From ce4fe75787d5ef16da89505aecbe8364310aebec Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Fri, 23 May 2014 13:26:36 +0200 Subject: [PATCH 19/30] Fix attar accessibility of @parent_core and @logger --- lib/concurrent/actress/core.rb | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/lib/concurrent/actress/core.rb b/lib/concurrent/actress/core.rb index aedd6718b..3243a2262 100644 --- a/lib/concurrent/actress/core.rb +++ b/lib/concurrent/actress/core.rb @@ -8,8 +8,7 @@ module Actress class Core include TypeCheck - attr_reader :reference, :name, :path, :logger, :parent_core - private :parent_core + attr_reader :reference, :name, :path, :executor # @option opts [String] name # @option opts [Reference, nil] parent of an actor spawning this one @@ -49,8 +48,8 @@ def initialize(opts = {}, &block) @actress = actress_class.new *args, &block @actress.send :initialize_core, self rescue => ex - puts "#{ex} (#{ex.class})\n#{ex.backtrace.join("\n")}" - terminate! # TODO test this + @logger.error ex + terminate! end end end @@ -109,10 +108,10 @@ def terminate! guard! @terminated.set - parent_core.remove_child reference if parent_core + @parent_core.remove_child reference if @parent_core @mailbox.each do |envelope| reject_envelope envelope - logger.debug "rejected #{envelope.message} from #{envelope.sender_path}" + @logger.debug "rejected #{envelope.message} from #{envelope.sender_path}" end @mailbox.clear # TODO terminate all children @@ -147,15 +146,15 @@ def receive_envelope if terminated? reject_envelope envelope - logger.fatal "this should not be happening #{caller[0]}" + @logger.fatal "this should not be happening #{caller[0]}" end - logger.debug "received #{envelope.message} from #{envelope.sender_path}" + @logger.debug "received #{envelope.message} from #{envelope.sender_path}" result = @actress.on_envelope envelope envelope.ivar.set result unless envelope.ivar.nil? rescue => error - logger.error error + @logger.error error envelope.ivar.fail error unless envelope.ivar.nil? terminate! ensure @@ -171,7 +170,7 @@ def schedule_execution Thread.current[:__current_actress__] = reference yield rescue => e - logger.fatal e + @logger.fatal e ensure Thread.current[:__current_actress__] = nil end From 8217cb4d9386c8e794aeee07748aef46ce121bbb Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Fri, 23 May 2014 13:27:39 +0200 Subject: [PATCH 20/30] AdHoc calls message processing block with instance_exec and it accepts *args --- lib/concurrent/actress/ad_hoc.rb | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/concurrent/actress/ad_hoc.rb b/lib/concurrent/actress/ad_hoc.rb index 88b7f60d9..d4b49a386 100644 --- a/lib/concurrent/actress/ad_hoc.rb +++ b/lib/concurrent/actress/ad_hoc.rb @@ -2,12 +2,12 @@ module Concurrent module Actress class AdHoc include Context - def initialize(&initializer) - @on_message = Type! initializer.call, Proc + def initialize(*args, &initializer) + @on_message = Type! initializer.call(*args), Proc end def on_message(message) - @on_message.call message + instance_exec message, &@on_message end end end From b021a94bfc51ccf5ca537ad90a4fceccb78c7304 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Fri, 23 May 2014 13:30:02 +0200 Subject: [PATCH 21/30] Add Actress specs --- lib/concurrent/actress/reference.rb | 4 +- lib/concurrent/executor/executor.rb | 5 + lib/concurrent/executor/immediate_executor.rb | 1 + .../executor/ruby_single_thread_executor.rb | 2 +- .../executor/ruby_thread_pool_executor.rb | 2 +- lib/concurrent/executor/timer_set.rb | 2 +- lib/concurrent/timer_task.rb | 2 +- spec/concurrent/actress_spec.rb | 219 ++++++++++++------ spec/concurrent/atomic/atomic_boolean_spec.rb | 2 +- spec/concurrent/atomic/atomic_fixnum_spec.rb | 2 +- .../atomic/count_down_latch_spec.rb | 2 +- .../collection/priority_queue_spec.rb | 2 +- spec/concurrent/configuration_spec.rb | 1 + .../executor/java_cached_thread_pool_spec.rb | 2 +- .../executor/java_fixed_thread_pool_spec.rb | 2 +- .../java_single_thread_executor_spec.rb | 2 +- .../java_thread_pool_executor_spec.rb | 2 +- spec/concurrent/scheduled_task_spec.rb | 1 + spec/concurrent/timer_task_spec.rb | 63 ++--- spec/spec_helper.rb | 16 -- spec/support/example_group_extensions.rb | 48 ++++ spec/support/functions.rb | 25 -- 22 files changed, 250 insertions(+), 157 deletions(-) create mode 100644 spec/support/example_group_extensions.rb delete mode 100644 spec/support/functions.rb diff --git a/lib/concurrent/actress/reference.rb b/lib/concurrent/actress/reference.rb index afaef63a0..16d748d64 100644 --- a/lib/concurrent/actress/reference.rb +++ b/lib/concurrent/actress/reference.rb @@ -1,7 +1,9 @@ module Concurrent module Actress - # Reference serves as public interface to send messages to Actors and can freely passed around + # Reference is public interface of Actor instances. It is used for sending messages and can + # be freely passed around the program. It also provides some basic information about the actor + # see {CoreDelegations} class Reference include TypeCheck include CoreDelegations diff --git a/lib/concurrent/executor/executor.rb b/lib/concurrent/executor/executor.rb index 3c1adae24..253909727 100644 --- a/lib/concurrent/executor/executor.rb +++ b/lib/concurrent/executor/executor.rb @@ -4,6 +4,10 @@ module Concurrent module Executor + end + + module RubyExecutor + include Executor # Submit a task to the executor for asynchronous processing. # @@ -120,6 +124,7 @@ def kill_execution if RUBY_PLATFORM == 'java' module JavaExecutor + include Executor # Submit a task to the executor for asynchronous processing. # diff --git a/lib/concurrent/executor/immediate_executor.rb b/lib/concurrent/executor/immediate_executor.rb index 929133241..2f0653be0 100644 --- a/lib/concurrent/executor/immediate_executor.rb +++ b/lib/concurrent/executor/immediate_executor.rb @@ -1,5 +1,6 @@ module Concurrent class ImmediateExecutor + include Executor def post(*args, &task) raise ArgumentError.new('no block given') unless block_given? diff --git a/lib/concurrent/executor/ruby_single_thread_executor.rb b/lib/concurrent/executor/ruby_single_thread_executor.rb index ec967a3b8..194641f9f 100644 --- a/lib/concurrent/executor/ruby_single_thread_executor.rb +++ b/lib/concurrent/executor/ruby_single_thread_executor.rb @@ -4,7 +4,7 @@ module Concurrent # @!macro single_thread_executor class RubySingleThreadExecutor - include Executor + include RubyExecutor # Create a new thread pool. # diff --git a/lib/concurrent/executor/ruby_thread_pool_executor.rb b/lib/concurrent/executor/ruby_thread_pool_executor.rb index e254de2cc..544fc9bad 100644 --- a/lib/concurrent/executor/ruby_thread_pool_executor.rb +++ b/lib/concurrent/executor/ruby_thread_pool_executor.rb @@ -8,7 +8,7 @@ module Concurrent # @!macro thread_pool_executor class RubyThreadPoolExecutor - include Executor + include RubyExecutor # Default maximum number of threads that will be created in the pool. DEFAULT_MAX_POOL_SIZE = 2**15 # 32768 diff --git a/lib/concurrent/executor/timer_set.rb b/lib/concurrent/executor/timer_set.rb index 7fe81ef61..a30667718 100644 --- a/lib/concurrent/executor/timer_set.rb +++ b/lib/concurrent/executor/timer_set.rb @@ -11,7 +11,7 @@ module Concurrent # monitors the set and schedules each task for execution at the appropriate # time. Tasks are run on the global task pool or on the supplied executor. class TimerSet - include Executor + include RubyExecutor # Create a new set of timed tasks. # diff --git a/lib/concurrent/timer_task.rb b/lib/concurrent/timer_task.rb index 6209c7101..9fd4e71d4 100644 --- a/lib/concurrent/timer_task.rb +++ b/lib/concurrent/timer_task.rb @@ -146,7 +146,7 @@ module Concurrent # @see http://docs.oracle.com/javase/7/docs/api/java/util/TimerTask.html class TimerTask include Dereferenceable - include Executor + include RubyExecutor include Concurrent::Observable # Default `:execution_interval` in seconds. diff --git a/spec/concurrent/actress_spec.rb b/spec/concurrent/actress_spec.rb index cf94bf89c..e57d174ea 100644 --- a/spec/concurrent/actress_spec.rb +++ b/spec/concurrent/actress_spec.rb @@ -1,88 +1,163 @@ require 'spec_helper' require 'concurrent/actress' -EXECUTOR = Concurrent::ThreadPoolExecutor.new( - min_threads: [2, Concurrent.processor_count].max, - max_threads: [20, Concurrent.processor_count * 15].max, - idletime: 2 * 60, # 2 minutes - max_queue: 0, # unlimited - overflow_policy: :abort # raise an exception -) +module Concurrent + module Actress + describe 'Concurrent::Actress' do -describe Concurrent::Actress do + class Ping + include Context - class Ping - include Concurrent::Actress::Context + def initialize(queue) + @queue = queue + end - def initialize(queue) - @queue = queue - end + def on_message(message) + case message + when :terminate + terminate! + when :child + AdHoc.spawn(:pong, @queue) { |queue| -> m { queue << m } } + else + @queue << message + message + end + end + end - def on_message(message) - case message - when :terminate - terminate! - when :child - Concurrent::Actress::AdHoc.spawn(name: :pong, executor: EXECUTOR) { -> m { @queue << m } } - else - @queue << message - message + # def trace! + # set_trace_func proc { |event, file, line, id, binding, classname| + # # thread = eval('Thread.current', binding).object_id.to_s(16) + # printf "%8s %20s %20s %s %s:%-2d\n", event, id, classname, nil, file, line + # } + # yield + # ensure + # set_trace_func nil + # end + + def assert condition + unless condition + # require 'pry' + # binding.pry + raise + puts "--- \n#{caller.join("\n")}" + end end - end - end - # def trace! - # set_trace_func proc { |event, file, line, id, binding, classname| - # # thread = eval('Thread.current', binding).object_id.to_s(16) - # printf "%8s %20s %20s %s %s:%-2d\n", event, id, classname, nil, file, line - # } - # yield - # ensure - # set_trace_func nil - # end - - def assert condition - unless condition - # require 'pry' - # binding.pry - raise - puts "--- \n#{caller.join("\n")}" - end - end + describe 'stress test' do + 1.times do |i| + it format('run %3d', i) do + # puts format('run %3d', i) + Array.new(10).map do + Thread.new do + 10.times do + # trace! do + queue = Queue.new + actor = Ping.spawn :ping, queue + + # when spawn returns children are set + assert Concurrent::Actress::ROOT.send(:core).instance_variable_get(:@children).include?(actor) + + actor << 'a' << 1 + assert queue.pop == 'a' + assert actor.ask(2).value == 2 + assert actor.parent == Concurrent::Actress::ROOT + assert Concurrent::Actress::ROOT.path == '/' + assert actor.path == '/ping' + child = actor.ask(:child).value + assert child.path == '/ping/pong' + queue.clear + child.ask(3) + assert queue.pop == 3 - # FIXME increasing this does not work in Rspec environment - 1.times do |i| - it format('run %3d', i) do - # puts format('run %3d', i) - Array.new(10).map do - Thread.new do - 10.times do - # trace! do - queue = Queue.new - actor = Ping.spawn name: :ping, executor: EXECUTOR, args: [queue] - - # when spawn returns children are set - assert Concurrent::Actress::ROOT.send(:core).instance_variable_get(:@children).include?(actor) - - actor << 'a' << 1 - assert queue.pop == 'a' - assert actor.ask(2).value == 2 - - assert actor.parent == Concurrent::Actress::ROOT - assert Concurrent::Actress::ROOT.path == '/' - assert actor.path == '/ping' - child = actor.ask(:child).value - assert child.path == '/ping/pong' - queue.clear - child.ask(3) - assert queue.pop == 3 - - actor << :terminate - assert actor.ask(:blow_up).wait.rejected? + actor << :terminate + assert actor.ask(:blow_up).wait.rejected? + end + end + end.each(&:join) end end - end.each(&:join) + end + + describe 'spawning' do + describe 'Actress#spawn' do + behaviour = -> v { -> _ { v } } + subjects = { spawn: -> { Actress.spawn(AdHoc, :ping, 'arg', &behaviour) }, + context_spawn: -> { AdHoc.spawn(:ping, 'arg', &behaviour) }, + spawn_by_hash: -> { Actress.spawn(class: AdHoc, name: :ping, args: ['arg'], &behaviour) }, + context_spawn_by_hash: -> { AdHoc.spawn(name: :ping, args: ['arg'], &behaviour) } } + + subjects.each do |desc, subject_definition| + describe desc do + subject &subject_definition + its(:path) { should eq '/ping' } + its(:parent) { should eq ROOT } + its(:name) { should eq 'ping' } + its(:executor) { should eq Concurrent.configuration.global_task_pool } + its(:reference) { should eq subject } + it 'returns ars' do + subject.ask!(:anything).should eq 'arg' + end + end + end + end + + it 'terminates on failed initialization' do + a = AdHoc.spawn(name: :fail, logger_level: 4) { raise } + a.ask(nil).wait.rejected?.should be_true + a.terminated?.should be_true + end + + it 'terminates on failed message processing' do + a = AdHoc.spawn(name: :fail, logger_level: 4) { -> _ { raise } } + a.ask(nil).wait.rejected?.should be_true + a.terminated?.should be_true + end + end + + describe 'messaging' do + subject { AdHoc.spawn(:add) { c = 0; -> v { c = c + v } } } + specify do + subject.tell(1).tell(1) + subject << 1 << 1 + subject.ask(0).value!.should eq 4 + end + end + + describe 'children' do + let(:parent) do + AdHoc.spawn(:parent) do + -> message do + if message == :child + AdHoc.spawn(:child) { -> _ { parent } } + else + children + end + end + end + end + + it 'has children set after a child is created' do + child = parent.ask!(:child) + parent.ask!(nil).should include(child) + child.ask!(nil).should eq parent + end + end + + describe 'envelope' do + subject { AdHoc.spawn(:subject) { -> _ { envelope } } } + specify do + envelope = subject.ask!('a') + envelope.should be_a_kind_of Envelope + envelope.message.should eq 'a' + envelope.ivar.should be_completed + envelope.ivar.value.should eq envelope + envelope.sender.should eq Thread.current + end + end end + end end + diff --git a/spec/concurrent/atomic/atomic_boolean_spec.rb b/spec/concurrent/atomic/atomic_boolean_spec.rb index 896e71f03..d87b71e86 100644 --- a/spec/concurrent/atomic/atomic_boolean_spec.rb +++ b/spec/concurrent/atomic/atomic_boolean_spec.rb @@ -151,7 +151,7 @@ module Concurrent end end - if jruby? + if TestHelpers.jruby? describe JavaAtomicBoolean do it_should_behave_like :atomic_boolean diff --git a/spec/concurrent/atomic/atomic_fixnum_spec.rb b/spec/concurrent/atomic/atomic_fixnum_spec.rb index 52bae798f..282175ece 100644 --- a/spec/concurrent/atomic/atomic_fixnum_spec.rb +++ b/spec/concurrent/atomic/atomic_fixnum_spec.rb @@ -165,7 +165,7 @@ module Concurrent end end - if jruby? + if TestHelpers.jruby? describe JavaAtomicFixnum do it_should_behave_like :atomic_fixnum diff --git a/spec/concurrent/atomic/count_down_latch_spec.rb b/spec/concurrent/atomic/count_down_latch_spec.rb index b4b82be41..8e3a58607 100644 --- a/spec/concurrent/atomic/count_down_latch_spec.rb +++ b/spec/concurrent/atomic/count_down_latch_spec.rb @@ -129,7 +129,7 @@ def subject.simulate_spurious_wake_up end end - if jruby? + if TestHelpers.jruby? describe JavaCountDownLatch do diff --git a/spec/concurrent/collection/priority_queue_spec.rb b/spec/concurrent/collection/priority_queue_spec.rb index e23f61333..e942a5c06 100644 --- a/spec/concurrent/collection/priority_queue_spec.rb +++ b/spec/concurrent/collection/priority_queue_spec.rb @@ -295,7 +295,7 @@ module Concurrent it_should_behave_like :priority_queue end - if jruby? + if TestHelpers.jruby? describe JavaPriorityQueue do diff --git a/spec/concurrent/configuration_spec.rb b/spec/concurrent/configuration_spec.rb index 3ee69c4fe..0e8b2ca0f 100644 --- a/spec/concurrent/configuration_spec.rb +++ b/spec/concurrent/configuration_spec.rb @@ -3,6 +3,7 @@ module Concurrent describe Configuration do + with_full_reset it 'creates a global timer pool' do Concurrent.configuration.global_timer_set.should_not be_nil diff --git a/spec/concurrent/executor/java_cached_thread_pool_spec.rb b/spec/concurrent/executor/java_cached_thread_pool_spec.rb index 106756735..395ebb0e8 100644 --- a/spec/concurrent/executor/java_cached_thread_pool_spec.rb +++ b/spec/concurrent/executor/java_cached_thread_pool_spec.rb @@ -1,6 +1,6 @@ require 'spec_helper' -if jruby? +if Concurrent::TestHelpers.jruby? require_relative 'cached_thread_pool_shared' diff --git a/spec/concurrent/executor/java_fixed_thread_pool_spec.rb b/spec/concurrent/executor/java_fixed_thread_pool_spec.rb index 71527ac5b..0f8dd810a 100644 --- a/spec/concurrent/executor/java_fixed_thread_pool_spec.rb +++ b/spec/concurrent/executor/java_fixed_thread_pool_spec.rb @@ -1,6 +1,6 @@ require 'spec_helper' -if jruby? +if Concurrent::TestHelpers.jruby? require_relative 'fixed_thread_pool_shared' diff --git a/spec/concurrent/executor/java_single_thread_executor_spec.rb b/spec/concurrent/executor/java_single_thread_executor_spec.rb index e6bf86c21..81df30176 100644 --- a/spec/concurrent/executor/java_single_thread_executor_spec.rb +++ b/spec/concurrent/executor/java_single_thread_executor_spec.rb @@ -1,6 +1,6 @@ require 'spec_helper' -if jruby? +if Concurrent::TestHelpers.jruby? require_relative 'thread_pool_shared' diff --git a/spec/concurrent/executor/java_thread_pool_executor_spec.rb b/spec/concurrent/executor/java_thread_pool_executor_spec.rb index 33d6368ea..57fb7c747 100644 --- a/spec/concurrent/executor/java_thread_pool_executor_spec.rb +++ b/spec/concurrent/executor/java_thread_pool_executor_spec.rb @@ -1,6 +1,6 @@ require 'spec_helper' -if jruby? +if Concurrent::TestHelpers.jruby? require_relative 'thread_pool_executor_shared' diff --git a/spec/concurrent/scheduled_task_spec.rb b/spec/concurrent/scheduled_task_spec.rb index afd77ab24..4922e5805 100644 --- a/spec/concurrent/scheduled_task_spec.rb +++ b/spec/concurrent/scheduled_task_spec.rb @@ -7,6 +7,7 @@ module Concurrent describe ScheduledTask do + with_full_reset context 'behavior' do diff --git a/spec/concurrent/timer_task_spec.rb b/spec/concurrent/timer_task_spec.rb index 27ace0c39..6b47512d1 100644 --- a/spec/concurrent/timer_task_spec.rb +++ b/spec/concurrent/timer_task_spec.rb @@ -5,6 +5,7 @@ module Concurrent describe TimerTask do + with_full_reset before(:each) do # suppress deprecation warnings. @@ -23,13 +24,13 @@ module Concurrent end def dereferenceable_subject(value, opts = {}) - opts = opts.merge(execution_interval: 0.1, run_now: true) - @subject = TimerTask.new(opts){ value }.execute.tap{ sleep(0.1) } + opts = opts.merge(execution_interval: 0.1, run_now: true) + @subject = TimerTask.new(opts) { value }.execute.tap { sleep(0.1) } end def dereferenceable_observable(opts = {}) - opts = opts.merge(execution_interval: 0.1, run_now: true) - @subject = TimerTask.new(opts){ 'value' } + opts = opts.merge(execution_interval: 0.1, run_now: true) + @subject = TimerTask.new(opts) { 'value' } end def execute_dereferenceable(subject) @@ -42,9 +43,9 @@ def execute_dereferenceable(subject) context :observable do - subject{ TimerTask.new(execution_interval: 0.1){ nil } } + subject { TimerTask.new(execution_interval: 0.1) { nil } } - after(:each){ subject.kill } + after(:each) { subject.kill } def trigger_observable(observable) observable.execute @@ -66,45 +67,45 @@ def trigger_observable(observable) it 'raises an exception if :execution_interval is not greater than zero' do lambda { - Concurrent::TimerTask.new(execution_interval: 0){ nil } + Concurrent::TimerTask.new(execution_interval: 0) { nil } }.should raise_error(ArgumentError) end it 'raises an exception if :execution_interval is not an integer' do lambda { - Concurrent::TimerTask.new(execution_interval: 'one'){ nil } + Concurrent::TimerTask.new(execution_interval: 'one') { nil } }.should raise_error(ArgumentError) end it 'raises an exception if :timeout_interval is not greater than zero' do lambda { - Concurrent::TimerTask.new(timeout_interval: 0){ nil } + Concurrent::TimerTask.new(timeout_interval: 0) { nil } }.should raise_error(ArgumentError) end it 'raises an exception if :timeout_interval is not an integer' do lambda { - Concurrent::TimerTask.new(timeout_interval: 'one'){ nil } + Concurrent::TimerTask.new(timeout_interval: 'one') { nil } }.should raise_error(ArgumentError) end it 'uses the default execution interval when no interval is given' do - subject = TimerTask.new{ nil } + subject = TimerTask.new { nil } subject.execution_interval.should eq TimerTask::EXECUTION_INTERVAL end it 'uses the default timeout interval when no interval is given' do - subject = TimerTask.new{ nil } + subject = TimerTask.new { nil } subject.timeout_interval.should eq TimerTask::TIMEOUT_INTERVAL end it 'uses the given execution interval' do - subject = TimerTask.new(execution_interval: 5){ nil } + subject = TimerTask.new(execution_interval: 5) { nil } subject.execution_interval.should eq 5 end it 'uses the given timeout interval' do - subject = TimerTask.new(timeout_interval: 5){ nil } + subject = TimerTask.new(timeout_interval: 5) { nil } subject.timeout_interval.should eq 5 end end @@ -112,7 +113,7 @@ def trigger_observable(observable) context '#kill' do it 'returns true on success' do - task = TimerTask.execute(run_now: false){ nil } + task = TimerTask.execute(run_now: false) { nil } sleep(0.1) task.kill.should be_true end @@ -129,7 +130,7 @@ def trigger_observable(observable) specify '#execution_interval is writeable' do - latch = CountDownLatch.new(1) + latch = CountDownLatch.new(1) subject = TimerTask.new(execution_interval: 1) do |task| task.execution_interval = 3 latch.count_down @@ -148,7 +149,7 @@ def trigger_observable(observable) specify '#execution_interval is writeable' do - latch = CountDownLatch.new(1) + latch = CountDownLatch.new(1) subject = TimerTask.new(timeout_interval: 1, execution_interval: 0.1) do |task| task.timeout_interval = 3 latch.count_down @@ -169,23 +170,23 @@ def trigger_observable(observable) context 'execution' do it 'runs the block immediately when the :run_now option is true' do - latch = CountDownLatch.new(1) - subject = TimerTask.execute(execution: 500, now: true){ latch.count_down } + latch = CountDownLatch.new(1) + subject = TimerTask.execute(execution: 500, now: true) { latch.count_down } latch.wait(1).should be_true subject.kill end it 'waits for :execution_interval seconds when the :run_now option is false' do - latch = CountDownLatch.new(1) - subject = TimerTask.execute(execution: 0.1, now: false){ latch.count_down } + latch = CountDownLatch.new(1) + subject = TimerTask.execute(execution: 0.1, now: false) { latch.count_down } latch.count.should eq 1 latch.wait(1).should be_true subject.kill end it 'waits for :execution_interval seconds when the :run_now option is not given' do - latch = CountDownLatch.new(1) - subject = TimerTask.execute(execution: 0.1, now: false){ latch.count_down } + latch = CountDownLatch.new(1) + subject = TimerTask.execute(execution: 0.1, now: false) { latch.count_down } latch.count.should eq 1 latch.wait(1).should be_true subject.kill @@ -193,8 +194,8 @@ def trigger_observable(observable) it 'passes a "self" reference to the block as the sole argument' do expected = nil - latch = CountDownLatch.new(1) - subject = TimerTask.new(execution_interval: 1, run_now: true) do |task| + latch = CountDownLatch.new(1) + subject = TimerTask.new(execution_interval: 1, run_now: true) do |task| expected = task latch.sount_down end @@ -213,18 +214,18 @@ def trigger_observable(observable) attr_reader :value attr_reader :ex attr_reader :latch - define_method(:initialize){ @latch = CountDownLatch.new(1) } + define_method(:initialize) { @latch = CountDownLatch.new(1) } define_method(:update) do |time, value, ex| - @time = time + @time = time @value = value - @ex = ex + @ex = ex @latch.count_down end end.new end it 'notifies all observers on success' do - subject = TimerTask.new(execution: 0.1){ 42 } + subject = TimerTask.new(execution: 0.1) { 42 } subject.add_observer(observer) subject.execute observer.latch.wait(1) @@ -234,7 +235,7 @@ def trigger_observable(observable) end it 'notifies all observers on timeout' do - subject = TimerTask.new(execution: 0.1, timeout: 0.1){ sleep } + subject = TimerTask.new(execution: 0.1, timeout: 0.1) { sleep } subject.add_observer(observer) subject.execute observer.latch.wait(1) @@ -244,7 +245,7 @@ def trigger_observable(observable) end it 'notifies all observers on error' do - subject = TimerTask.new(execution: 0.1){ raise ArgumentError } + subject = TimerTask.new(execution: 0.1) { raise ArgumentError } subject.add_observer(observer) subject.execute observer.latch.wait(1) diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 80a5d7806..270e3829d 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -22,20 +22,4 @@ RSpec.configure do |config| config.order = 'random' - - config.before(:suite) do - end - - config.before(:each) do - reset_gem_configuration - end - - config.after(:each) do - Thread.list.each do |thread| - thread.kill unless thread == Thread.current - end - end - - config.after(:suite) do - end end diff --git a/spec/support/example_group_extensions.rb b/spec/support/example_group_extensions.rb new file mode 100644 index 000000000..b97385818 --- /dev/null +++ b/spec/support/example_group_extensions.rb @@ -0,0 +1,48 @@ +require 'rbconfig' + +module Concurrent + module TestHelpers + def delta(v1, v2) + if block_given? + v1 = yield(v1) + v2 = yield(v2) + end + return (v1 - v2).abs + end + + def mri? + RbConfig::CONFIG['ruby_install_name']=~ /^ruby$/i + end + + def jruby? + RbConfig::CONFIG['ruby_install_name']=~ /^jruby$/i + end + + def rbx? + RbConfig::CONFIG['ruby_install_name']=~ /^rbx$/i + end + + def reset_gem_configuration + Concurrent.instance_variable_set(:@configuration, Concurrent::Configuration.new) + end + + extend self + end +end + +class RSpec::Core::ExampleGroup + def self.with_full_reset + before(:each) do + reset_gem_configuration + end + + after(:each) do + Thread.list.each do |thread| + thread.kill unless thread == Thread.current + end + end + end + + include Concurrent::TestHelpers + extend Concurrent::TestHelpers +end diff --git a/spec/support/functions.rb b/spec/support/functions.rb deleted file mode 100644 index f21856d39..000000000 --- a/spec/support/functions.rb +++ /dev/null @@ -1,25 +0,0 @@ -require 'rbconfig' - -def delta(v1, v2) - if block_given? - v1 = yield(v1) - v2 = yield(v2) - end - return (v1 - v2).abs -end - -def mri? - RbConfig::CONFIG['ruby_install_name']=~ /^ruby$/i -end - -def jruby? - RbConfig::CONFIG['ruby_install_name']=~ /^jruby$/i -end - -def rbx? - RbConfig::CONFIG['ruby_install_name']=~ /^rbx$/i -end - -def reset_gem_configuration - Concurrent.instance_variable_set(:@configuration, Concurrent::Configuration.new) -end From ae4331af1421c6b969f9c97f5e2590a2c38dfc90 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Sat, 24 May 2014 09:52:28 +0200 Subject: [PATCH 22/30] Do not allow running jobs with OneOnOne on executors which may averflow --- lib/concurrent/executor/executor.rb | 3 +++ lib/concurrent/executor/java_thread_pool_executor.rb | 4 ++++ lib/concurrent/executor/one_by_one.rb | 4 ++++ lib/concurrent/executor/ruby_thread_pool_executor.rb | 4 ++++ 4 files changed, 15 insertions(+) diff --git a/lib/concurrent/executor/executor.rb b/lib/concurrent/executor/executor.rb index 253909727..b4784524d 100644 --- a/lib/concurrent/executor/executor.rb +++ b/lib/concurrent/executor/executor.rb @@ -4,6 +4,9 @@ module Concurrent module Executor + def can_overflow? + false + end end module RubyExecutor diff --git a/lib/concurrent/executor/java_thread_pool_executor.rb b/lib/concurrent/executor/java_thread_pool_executor.rb index 745256487..278acedbe 100644 --- a/lib/concurrent/executor/java_thread_pool_executor.rb +++ b/lib/concurrent/executor/java_thread_pool_executor.rb @@ -89,6 +89,10 @@ def initialize(opts = {}) set_shutdown_hook end + def can_overflow? + @max_queue != 0 + end + # The minimum number of threads that may be retained in the pool. # # @return [Integer] the min_length diff --git a/lib/concurrent/executor/one_by_one.rb b/lib/concurrent/executor/one_by_one.rb index 5192adb57..a5594a4cb 100644 --- a/lib/concurrent/executor/one_by_one.rb +++ b/lib/concurrent/executor/one_by_one.rb @@ -30,6 +30,10 @@ def initialize # @raise [ArgumentError] if no task is given def post(executor, *args, &task) return nil if task.nil? + if executor.can_overflow? + raise ArgumentError, 'OneByOne cannot be used in conjunction with executor which may overflow' + end + job = Job.new executor, args, task begin diff --git a/lib/concurrent/executor/ruby_thread_pool_executor.rb b/lib/concurrent/executor/ruby_thread_pool_executor.rb index 544fc9bad..88cec0dc4 100644 --- a/lib/concurrent/executor/ruby_thread_pool_executor.rb +++ b/lib/concurrent/executor/ruby_thread_pool_executor.rb @@ -99,6 +99,10 @@ def initialize(opts = {}) @last_gc_time = Time.now.to_f - [1.0, (@gc_interval * 2.0)].max end + def can_overflow? + @max_queue != 0 + end + # The number of threads currently in the pool. # # @return [Integer] the length From dbd1144675199268ecea3e65cf46737fa1cde9c9 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Sat, 24 May 2014 09:55:20 +0200 Subject: [PATCH 23/30] Add configurable logging --- lib/concurrent/actress.rb | 1 + lib/concurrent/actress/core.rb | 26 +++++++++++++++----------- lib/concurrent/agent.rb | 1 - lib/concurrent/configuration.rb | 7 +++++++ lib/concurrent/logging.rb | 11 +++++++++++ spec/concurrent/actress_spec.rb | 4 ++-- spec/spec_helper.rb | 8 ++++++-- 7 files changed, 42 insertions(+), 16 deletions(-) create mode 100644 lib/concurrent/logging.rb diff --git a/lib/concurrent/actress.rb b/lib/concurrent/actress.rb index 814cd1e91..9f9b62275 100644 --- a/lib/concurrent/actress.rb +++ b/lib/concurrent/actress.rb @@ -3,6 +3,7 @@ require 'concurrent/configuration' require 'concurrent/executor/one_by_one' require 'concurrent/ivar' +require 'concurrent/logging' module Concurrent diff --git a/lib/concurrent/actress/core.rb b/lib/concurrent/actress/core.rb index 3243a2262..2813e6e4c 100644 --- a/lib/concurrent/actress/core.rb +++ b/lib/concurrent/actress/core.rb @@ -7,6 +7,7 @@ module Actress # @api private class Core include TypeCheck + include Concurrent::Logging attr_reader :reference, :name, :path, :executor @@ -15,6 +16,8 @@ class Core # @option opts [Context] actress_class a class to be instantiated defining Actor's behaviour # @option opts [Array] args arguments for actress_class instantiation # @option opts [Executor] executor, default is `Concurrent.configuration.global_task_pool` + # @option opts [Proc, nil] logger a proc accepting (level, progname, message = nil, &block) params, + # can be used to hook actor instance to any logging system # @param [Proc] block for class instantiation def initialize(opts = {}, &block) @mailbox = Array.new @@ -32,11 +35,8 @@ def initialize(opts = {}, &block) raise 'only root has no parent' end - @path = @parent_core ? File.join(@parent_core.path, @name) : @name - # TODO add proper logging - @logger = Logger.new($stderr) - @logger.level = opts.fetch(:logger_level, 1) - @logger.progname = @path + @path = @parent_core ? File.join(@parent_core.path, @name) : @name + @logger = opts[:logger] @parent_core.add_child reference if @parent_core @@ -48,7 +48,7 @@ def initialize(opts = {}, &block) @actress = actress_class.new *args, &block @actress.send :initialize_core, self rescue => ex - @logger.error ex + log ERROR, ex terminate! end end @@ -111,7 +111,7 @@ def terminate! @parent_core.remove_child reference if @parent_core @mailbox.each do |envelope| reject_envelope envelope - @logger.debug "rejected #{envelope.message} from #{envelope.sender_path}" + log DEBUG, "rejected #{envelope.message} from #{envelope.sender_path}" end @mailbox.clear # TODO terminate all children @@ -146,15 +146,15 @@ def receive_envelope if terminated? reject_envelope envelope - @logger.fatal "this should not be happening #{caller[0]}" + log FATAL, "this should not be happening #{caller[0]}" end - @logger.debug "received #{envelope.message} from #{envelope.sender_path}" + log DEBUG, "received #{envelope.message} from #{envelope.sender_path}" result = @actress.on_envelope envelope envelope.ivar.set result unless envelope.ivar.nil? rescue => error - @logger.error error + log ERROR, error envelope.ivar.fail error unless envelope.ivar.nil? terminate! ensure @@ -170,7 +170,7 @@ def schedule_execution Thread.current[:__current_actress__] = reference yield rescue => e - @logger.fatal e + log FATAL, e ensure Thread.current[:__current_actress__] = nil end @@ -181,6 +181,10 @@ def schedule_execution def reject_envelope(envelope) envelope.reject! ActressTerminated.new(reference) end + + def log(level, message = nil, &block) + super level, @path, message, &block + end end end end diff --git a/lib/concurrent/agent.rb b/lib/concurrent/agent.rb index a1ab4eb9f..5e78abf8e 100644 --- a/lib/concurrent/agent.rb +++ b/lib/concurrent/agent.rb @@ -200,7 +200,6 @@ def work(&handler) # :nodoc: validator, value = mutex.synchronize { [@validator, @value] } begin - # FIXME creates second thread result, valid = Concurrent::timeout(@timeout) do result = handler.call(value) [result, validator.call(result)] diff --git a/lib/concurrent/configuration.rb b/lib/concurrent/configuration.rb index 0a71f8400..f3e6c59aa 100644 --- a/lib/concurrent/configuration.rb +++ b/lib/concurrent/configuration.rb @@ -10,11 +10,18 @@ module Concurrent # A gem-level configuration object. class Configuration + attr_accessor :logger + # Create a new configuration object. def initialize @global_task_pool = Delay.new { new_task_pool } @global_operation_pool = Delay.new { new_operation_pool } @global_timer_set = Delay.new { Concurrent::TimerSet.new } + @logger = no_logger + end + + def no_logger + -> (level, progname, message = nil, &block) {} end # Global thread pool optimized for short *tasks*. diff --git a/lib/concurrent/logging.rb b/lib/concurrent/logging.rb new file mode 100644 index 000000000..cac7173b3 --- /dev/null +++ b/lib/concurrent/logging.rb @@ -0,0 +1,11 @@ +require 'logger' + +module Concurrent + module Logging + include Logger::Severity + + def log(level, progname, message = nil, &block) + (@logger || Concurrent.configuration.logger).call level, progname, message, &block + end + end +end diff --git a/spec/concurrent/actress_spec.rb b/spec/concurrent/actress_spec.rb index e57d174ea..d42a7d729 100644 --- a/spec/concurrent/actress_spec.rb +++ b/spec/concurrent/actress_spec.rb @@ -104,13 +104,13 @@ def assert condition end it 'terminates on failed initialization' do - a = AdHoc.spawn(name: :fail, logger_level: 4) { raise } + a = AdHoc.spawn(name: :fail, logger: Concurrent.configuration.no_logger) { raise } a.ask(nil).wait.rejected?.should be_true a.terminated?.should be_true end it 'terminates on failed message processing' do - a = AdHoc.spawn(name: :fail, logger_level: 4) { -> _ { raise } } + a = AdHoc.spawn(name: :fail, logger: Concurrent.configuration.no_logger) { -> _ { raise } } a.ask(nil).wait.rejected?.should be_true a.terminated?.should be_true end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 270e3829d..1728c2259 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -2,8 +2,8 @@ require 'coveralls' SimpleCov.formatter = SimpleCov::Formatter::MultiFormatter[ - SimpleCov::Formatter::HTMLFormatter, - Coveralls::SimpleCov::Formatter + SimpleCov::Formatter::HTMLFormatter, + Coveralls::SimpleCov::Formatter ] SimpleCov.start do @@ -17,6 +17,10 @@ require 'concurrent' +logger = Logger.new($stderr) +logger.level = Logger::INFO +Concurrent.configuration.logger = -> (level, progname, message = nil, &block) { logger.add level, message, progname, &block } + # import all the support files Dir[File.join(File.dirname(__FILE__), 'support/**/*.rb')].each { |f| require File.expand_path(f) } From b0a68842013429ff80ce61e3a977d4d6e5bf2158 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Sat, 24 May 2014 09:57:45 +0200 Subject: [PATCH 24/30] fix core methods return values --- lib/concurrent/actress/core.rb | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/lib/concurrent/actress/core.rb b/lib/concurrent/actress/core.rb index 2813e6e4c..9b0978b95 100644 --- a/lib/concurrent/actress/core.rb +++ b/lib/concurrent/actress/core.rb @@ -70,7 +70,7 @@ def add_child(child) guard! Type! child, Reference @children.add child - self + nil end # @api private @@ -79,7 +79,7 @@ def remove_child(child) Type! child, Reference @children.delete child end - self + nil end # is executed by Reference scheduling processing of new messages @@ -94,7 +94,7 @@ def on_envelope(envelope) end process_envelopes? end - self + nil end # @note Actor rejects envelopes when terminated. @@ -115,7 +115,8 @@ def terminate! end @mailbox.clear # TODO terminate all children - self + + nil end # @api private @@ -153,6 +154,8 @@ def receive_envelope result = @actress.on_envelope envelope envelope.ivar.set result unless envelope.ivar.nil? + + nil rescue => error log ERROR, error envelope.ivar.fail error unless envelope.ivar.nil? @@ -175,7 +178,8 @@ def schedule_execution Thread.current[:__current_actress__] = nil end end - self + + nil end def reject_envelope(envelope) From c643e885fd76a1953cdbf52e5ad393e59c7ce7c8 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Sat, 24 May 2014 09:58:47 +0200 Subject: [PATCH 25/30] Expose terminated to public to be able to wait for termination --- lib/concurrent/actress/core.rb | 2 +- lib/concurrent/actress/core_delegations.rb | 4 ++++ spec/concurrent/actress_spec.rb | 4 ++-- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/lib/concurrent/actress/core.rb b/lib/concurrent/actress/core.rb index 9b0978b95..86234e165 100644 --- a/lib/concurrent/actress/core.rb +++ b/lib/concurrent/actress/core.rb @@ -9,7 +9,7 @@ class Core include TypeCheck include Concurrent::Logging - attr_reader :reference, :name, :path, :executor + attr_reader :reference, :name, :path, :executor, :terminated # @option opts [String] name # @option opts [Reference, nil] parent of an actor spawning this one diff --git a/lib/concurrent/actress/core_delegations.rb b/lib/concurrent/actress/core_delegations.rb index 806b721df..4a0144bcb 100644 --- a/lib/concurrent/actress/core_delegations.rb +++ b/lib/concurrent/actress/core_delegations.rb @@ -19,6 +19,10 @@ def terminated? core.terminated? end + def terminated + core.terminated + end + def reference core.reference end diff --git a/spec/concurrent/actress_spec.rb b/spec/concurrent/actress_spec.rb index d42a7d729..e18838a33 100644 --- a/spec/concurrent/actress_spec.rb +++ b/spec/concurrent/actress_spec.rb @@ -147,7 +147,7 @@ def assert condition describe 'envelope' do subject { AdHoc.spawn(:subject) { -> _ { envelope } } } - specify do + specify do envelope = subject.ask!('a') envelope.should be_a_kind_of Envelope envelope.message.should eq 'a' @@ -156,8 +156,8 @@ def assert condition envelope.sender.should eq Thread.current end end - end + end end end From bd65cd72356dea3d088460bfbd9cb62c529d0a53 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Sat, 24 May 2014 10:07:01 +0200 Subject: [PATCH 26/30] Add spawn! method which raises when Context initialization fails opts[:initialized] is also added to be able to inject IVar to be set after Context Initialization --- lib/concurrent/actress.rb | 11 +++++++---- lib/concurrent/actress/context.rb | 24 ++++++++++++++++++------ lib/concurrent/actress/core.rb | 6 ++++++ spec/concurrent/actress_spec.rb | 6 ++++++ 4 files changed, 37 insertions(+), 10 deletions(-) diff --git a/lib/concurrent/actress.rb b/lib/concurrent/actress.rb index 9f9b62275..627740245 100644 --- a/lib/concurrent/actress.rb +++ b/lib/concurrent/actress.rb @@ -7,8 +7,6 @@ module Concurrent - # TODO broader description with examples - # # @example ping # class Ping # include Context @@ -39,8 +37,7 @@ class Root include Context # to allow spawning of new actors, spawn needs to be called inside the parent Actor def on_message(message) - case message.first - when :spawn + if message.is_a?(Array) && message.first == :spawn spawn message[1], &message[2] else # ignore @@ -52,6 +49,7 @@ def on_message(message) ROOT = Core.new(parent: nil, name: '/', class: Root).reference # @param block for actress_class instantiation + # @param args see {#spawn_optionify} def self.spawn(*args, &block) if Actress.current Core.new(spawn_optionify(*args).merge(parent: Actress.current), &block).reference @@ -60,6 +58,11 @@ def self.spawn(*args, &block) end end + # as {#spawn} but it'll raise when Actor not initialized properly + def self.spawn!(*args, &block) + spawn(spawn_optionify(*args).merge(initialized: ivar = IVar.new), &block).tap { ivar.no_error! } + end + # @overload spawn_optionify(actress_class, name, *args) # @param [Context] actress_class to be spawned # @param [String, Symbol] name of the instance, it's used to generate the path of the actor diff --git a/lib/concurrent/actress/context.rb b/lib/concurrent/actress/context.rb index 60d87d6c5..b5b48ec22 100644 --- a/lib/concurrent/actress/context.rb +++ b/lib/concurrent/actress/context.rb @@ -20,6 +20,8 @@ module Context # @abstract override to define Actor's behaviour # @param [Object] message # @return [Object] a result which will be used to set the IVar supplied to Reference#ask + # @note self should not be returned (or sent to other actors), {#reference} should be used + # instead def on_message(message) raise NotImplementedError end @@ -71,12 +73,22 @@ def self.included(base) module ClassMethods # behaves as {Actress.spawn} but class_name is omitted def spawn(name_or_opts, *args, &block) - opts = if name_or_opts.is_a? Hash - name_or_opts.merge class: self - else - { class: self, name: name_or_opts, args: args } - end - Actress.spawn opts, &block + Actress.spawn spawn_optionify(name_or_opts, *args), &block + end + + # behaves as {Actress.spawn!} but class_name is omitted + def spawn!(name_or_opts, *args, &block) + Actress.spawn! spawn_optionify(name_or_opts, *args), &block + end + + private + + def spawn_optionify(name_or_opts, *args) + if name_or_opts.is_a? Hash + name_or_opts.merge class: self + else + { class: self, name: name_or_opts, args: args } + end end end end diff --git a/lib/concurrent/actress/core.rb b/lib/concurrent/actress/core.rb index 86234e165..060527f5c 100644 --- a/lib/concurrent/actress/core.rb +++ b/lib/concurrent/actress/core.rb @@ -5,6 +5,8 @@ module Actress # Core of the actor # @api private + # @note devel: core should not block on anything, e.g. it cannot wait on children to terminate + # that would eat up all threads in task pool and deadlock class Core include TypeCheck include Concurrent::Logging @@ -16,6 +18,7 @@ class Core # @option opts [Context] actress_class a class to be instantiated defining Actor's behaviour # @option opts [Array] args arguments for actress_class instantiation # @option opts [Executor] executor, default is `Concurrent.configuration.global_task_pool` + # @option opts [IVar, nil] initialized, if present it'll be set or failed after {Context} initialization # @option opts [Proc, nil] logger a proc accepting (level, progname, message = nil, &block) params, # can be used to hook actor instance to any logging system # @param [Proc] block for class instantiation @@ -42,14 +45,17 @@ def initialize(opts = {}, &block) @actress_class = actress_class = Child! opts.fetch(:class), Context args = opts.fetch(:args, []) + initialized = Type! opts[:initialized], IVar, NilClass schedule_execution do begin @actress = actress_class.new *args, &block @actress.send :initialize_core, self + initialized.set true if initialized rescue => ex log ERROR, ex terminate! + initialized.fail ex if initialized end end end diff --git a/spec/concurrent/actress_spec.rb b/spec/concurrent/actress_spec.rb index e18838a33..aa7739d81 100644 --- a/spec/concurrent/actress_spec.rb +++ b/spec/concurrent/actress_spec.rb @@ -109,6 +109,12 @@ def assert condition a.terminated?.should be_true end + it 'terminates on failed initialization and raises with spawn!' do + expect do + AdHoc.spawn!(name: :fail, logger: Concurrent.configuration.no_logger) { raise 'm' } + end.to raise_error(StandardError, 'm') + end + it 'terminates on failed message processing' do a = AdHoc.spawn(name: :fail, logger: Concurrent.configuration.no_logger) { -> _ { raise } } a.ask(nil).wait.rejected?.should be_true From 8d64ed1c17f443fb21ea591ff472a6a363c524eb Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Sat, 24 May 2014 10:10:29 +0200 Subject: [PATCH 27/30] Terminate all children on parent termination --- lib/concurrent/actress/core.rb | 8 ++++++-- spec/concurrent/actress_spec.rb | 24 ++++++++++++++++++++++++ 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/lib/concurrent/actress/core.rb b/lib/concurrent/actress/core.rb index 060527f5c..fffd503c9 100644 --- a/lib/concurrent/actress/core.rb +++ b/lib/concurrent/actress/core.rb @@ -109,9 +109,14 @@ def terminated? @terminated.set? end - # Terminates the actor, any Envelope received after termination is rejected + # Terminates the actor. Any Envelope received after termination is rejected. + # Terminates all its children, does not wait until they are terminated. def terminate! guard! + @children.each do |ch| + ch.send(:core).tap { |core| core.send(:schedule_execution) { core.terminate! } } + end + @terminated.set @parent_core.remove_child reference if @parent_core @@ -120,7 +125,6 @@ def terminate! log DEBUG, "rejected #{envelope.message} from #{envelope.sender_path}" end @mailbox.clear - # TODO terminate all children nil end diff --git a/spec/concurrent/actress_spec.rb b/spec/concurrent/actress_spec.rb index aa7739d81..07056fe30 100644 --- a/spec/concurrent/actress_spec.rb +++ b/spec/concurrent/actress_spec.rb @@ -163,6 +163,30 @@ def assert condition end end + describe 'termination' do + subject do + AdHoc.spawn(:parent) do + child = AdHoc.spawn(:child) { -> v { v } } + -> v do + if v == :terminate + terminate! + else + child + end + end + end + end + + it 'terminates with all its children' do + child = subject.ask! :child + subject.terminated?.should be_false + subject.ask(:terminate).wait + subject.terminated?.should be_true + child.terminated.wait + child.terminated?.should be_true + end + end + end end end From cd22f673f03839f0f9263b35f9cc237cb4305803 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Sat, 24 May 2014 14:48:55 +0200 Subject: [PATCH 28/30] log ignored exceptions on DEBUG level --- lib/concurrent/actress.rb | 2 -- lib/concurrent/agent.rb | 5 ++++- lib/concurrent/configuration.rb | 4 +++- lib/concurrent/executor/executor.rb | 2 ++ lib/concurrent/executor/ruby_single_thread_executor.rb | 1 + lib/concurrent/executor/ruby_thread_pool_executor.rb | 3 ++- lib/concurrent/executor/ruby_thread_pool_worker.rb | 3 +++ 7 files changed, 15 insertions(+), 5 deletions(-) diff --git a/lib/concurrent/actress.rb b/lib/concurrent/actress.rb index 627740245..02ae221ab 100644 --- a/lib/concurrent/actress.rb +++ b/lib/concurrent/actress.rb @@ -1,5 +1,3 @@ -require 'logger' - require 'concurrent/configuration' require 'concurrent/executor/one_by_one' require 'concurrent/ivar' diff --git a/lib/concurrent/agent.rb b/lib/concurrent/agent.rb index 5e78abf8e..24a3e729f 100644 --- a/lib/concurrent/agent.rb +++ b/lib/concurrent/agent.rb @@ -4,6 +4,7 @@ require 'concurrent/observable' require 'concurrent/options_parser' require 'concurrent/utility/timeout' +require 'concurrent/logging' module Concurrent @@ -35,6 +36,7 @@ module Concurrent class Agent include Dereferenceable include Concurrent::Observable + include Logging # The default timeout value (in seconds); used when no timeout option # is given at initialization @@ -192,7 +194,8 @@ def try_rescue(ex) # :nodoc: end rescuer.block.call(ex) if rescuer rescue Exception => ex - # supress + # suppress + log DEBUG, ex end # @!visibility private diff --git a/lib/concurrent/configuration.rb b/lib/concurrent/configuration.rb index f3e6c59aa..942fcf669 100644 --- a/lib/concurrent/configuration.rb +++ b/lib/concurrent/configuration.rb @@ -6,6 +6,7 @@ require 'concurrent/utility/processor_count' module Concurrent + extend Logging # A gem-level configuration object. class Configuration @@ -134,7 +135,8 @@ def self.finalize_executor(executor) executor.kill end true - rescue + rescue => ex + log DEBUG, ex false end diff --git a/lib/concurrent/executor/executor.rb b/lib/concurrent/executor/executor.rb index b4784524d..6eb929ccb 100644 --- a/lib/concurrent/executor/executor.rb +++ b/lib/concurrent/executor/executor.rb @@ -1,4 +1,5 @@ require 'concurrent/errors' +require 'concurrent/logging' require 'concurrent/atomic/event' module Concurrent @@ -11,6 +12,7 @@ def can_overflow? module RubyExecutor include Executor + include Logging # Submit a task to the executor for asynchronous processing. # diff --git a/lib/concurrent/executor/ruby_single_thread_executor.rb b/lib/concurrent/executor/ruby_single_thread_executor.rb index 194641f9f..8173d98d1 100644 --- a/lib/concurrent/executor/ruby_single_thread_executor.rb +++ b/lib/concurrent/executor/ruby_single_thread_executor.rb @@ -64,6 +64,7 @@ def work task.last.call(*task.first) rescue => ex # let it fail + log DEBUG, ex end end stopped_event.set diff --git a/lib/concurrent/executor/ruby_thread_pool_executor.rb b/lib/concurrent/executor/ruby_thread_pool_executor.rb index 88cec0dc4..46074b92d 100644 --- a/lib/concurrent/executor/ruby_thread_pool_executor.rb +++ b/lib/concurrent/executor/ruby_thread_pool_executor.rb @@ -237,8 +237,9 @@ def handle_overflow(*args) when :caller_runs begin yield(*args) - rescue + rescue => ex # let it fail + log DEBUG, ex end true end diff --git a/lib/concurrent/executor/ruby_thread_pool_worker.rb b/lib/concurrent/executor/ruby_thread_pool_worker.rb index f3c76775a..42fa735a9 100644 --- a/lib/concurrent/executor/ruby_thread_pool_worker.rb +++ b/lib/concurrent/executor/ruby_thread_pool_worker.rb @@ -1,9 +1,11 @@ require 'thread' +require 'concurrent/logging' module Concurrent # @!visibility private class RubyThreadPoolWorker + include Logging # @!visibility private def initialize(queue, parent) @@ -59,6 +61,7 @@ def run(thread = Thread.current) task.last.call(*task.first) rescue => ex # let it fail + log DEBUG, ex ensure @last_activity = Time.now.to_f @parent.on_end_task From 465f0effb9909d53b29107094ea96b828c6dad01 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Sat, 24 May 2014 15:57:09 +0200 Subject: [PATCH 29/30] More documentation --- lib/concurrent/actress.rb | 15 +++----- lib/concurrent/actress/doc.md | 53 +++++++++++++++++++++++++++++ lib/concurrent/actress/reference.rb | 2 +- lib/concurrent/configuration.rb | 3 ++ lib/concurrent/logging.rb | 6 ++++ 5 files changed, 67 insertions(+), 12 deletions(-) create mode 100644 lib/concurrent/actress/doc.md diff --git a/lib/concurrent/actress.rb b/lib/concurrent/actress.rb index 02ae221ab..525a85c7c 100644 --- a/lib/concurrent/actress.rb +++ b/lib/concurrent/actress.rb @@ -5,14 +5,7 @@ module Concurrent - # @example ping - # class Ping - # include Context - # def on_message(message) - # message - # end - # end - # Ping.spawn(:ping1).ask(:m).value #=> :m + # {include:file:lib/concurrent/actress/doc.md} module Actress require 'concurrent/actress/type_check' @@ -47,7 +40,7 @@ def on_message(message) ROOT = Core.new(parent: nil, name: '/', class: Root).reference # @param block for actress_class instantiation - # @param args see {#spawn_optionify} + # @param args see {.spawn_optionify} def self.spawn(*args, &block) if Actress.current Core.new(spawn_optionify(*args).merge(parent: Actress.current), &block).reference @@ -56,7 +49,7 @@ def self.spawn(*args, &block) end end - # as {#spawn} but it'll raise when Actor not initialized properly + # as {.spawn} but it'll raise when Actor not initialized properly def self.spawn!(*args, &block) spawn(spawn_optionify(*args).merge(initialized: ivar = IVar.new), &block).tap { ivar.no_error! } end @@ -66,7 +59,7 @@ def self.spawn!(*args, &block) # @param [String, Symbol] name of the instance, it's used to generate the path of the actor # @param args for actress_class instantiation # @overload spawn_optionify(opts) - # see {Core.new} opts + # see {Core#initialize} opts def self.spawn_optionify(*args) if args.size == 1 && args.first.is_a?(Hash) args.first diff --git a/lib/concurrent/actress/doc.md b/lib/concurrent/actress/doc.md new file mode 100644 index 000000000..e47248094 --- /dev/null +++ b/lib/concurrent/actress/doc.md @@ -0,0 +1,53 @@ +# Light-weighted implement of Actors. Inspired by Akka and Erlang. + +Actors are using a thread-pool by default which makes them very cheap to create and discard. +Thousands of actors can be created allowing to brake the program to small maintainable pieces +without breaking single responsibility principles. + +## Quick example + + class Counter + include Context + + def initialize(initial_value) + @count = initial_value + end + + def on_message(message) + case message + when Integer + @count += message + when :terminate + terminate! + else + raise 'unknown' + end + end + end + + # create new actor + counter = Counter.spawn(:test_counter, 5) # => a Reference + + # send messages + counter.tell(1) # => counter + counter << 1 # => counter + + # send messages getting an IVar back for synchronization + counter.ask(0) # => an ivar + counter.ask(0).value # => 7 + + # terminate the actor + counter.ask(:terminate).wait + counter.terminated? # => true + counter.ask(5).wait.rejected? # => true + + # failure on message processing will terminate the actor + counter = Counter.spawn(:test_counter, 0) + counter.ask('boom').wait.rejected? # => true + counter.terminated? # => true + + + + + + diff --git a/lib/concurrent/actress/reference.rb b/lib/concurrent/actress/reference.rb index 16d748d64..a20ad5066 100644 --- a/lib/concurrent/actress/reference.rb +++ b/lib/concurrent/actress/reference.rb @@ -33,7 +33,7 @@ def ask(message, ivar = IVar.new) message message, ivar end - # @note can lead to deadlocks + # @note can lead to deadlocks, use only in tests or when you are sure it won't deadlock # tells message to the actor # @param [Object] message # @param [Ivar] ivar to be fulfilled be message's processing result diff --git a/lib/concurrent/configuration.rb b/lib/concurrent/configuration.rb index 942fcf669..af0683c4c 100644 --- a/lib/concurrent/configuration.rb +++ b/lib/concurrent/configuration.rb @@ -11,6 +11,8 @@ module Concurrent # A gem-level configuration object. class Configuration + # a proc defining how to log messages, its interface has to be: + # lambda { |level, progname, message = nil, &block| _ } attr_accessor :logger # Create a new configuration object. @@ -21,6 +23,7 @@ def initialize @logger = no_logger end + # if assigned to {#logger}, it will log nothing. def no_logger -> (level, progname, message = nil, &block) {} end diff --git a/lib/concurrent/logging.rb b/lib/concurrent/logging.rb index cac7173b3..802db21f6 100644 --- a/lib/concurrent/logging.rb +++ b/lib/concurrent/logging.rb @@ -1,9 +1,15 @@ require 'logger' module Concurrent + # Include where logging is needed module Logging include Logger::Severity + # Logs through {Configuration#logger}, it can be overridden by setting @logger + # @param [Integer] level one of Logger::Severity constants + # @param [String] progname e.g. a path of an Actor + # @param [String, nil] message when nil block is used to generate the message + # @yields_return [String] a message def log(level, progname, message = nil, &block) (@logger || Concurrent.configuration.logger).call level, progname, message, &block end From 7ce883cf472cee745ad2fa9727d948d24021cae6 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Sat, 24 May 2014 10:29:26 +0200 Subject: [PATCH 30/30] Test fixes --- lib/concurrent/actress/core.rb | 2 +- lib/concurrent/configuration.rb | 2 +- lib/concurrent/executor/one_by_one.rb | 7 ++--- .../executor/per_thread_executor.rb | 1 + spec/concurrent/actress_spec.rb | 27 +++++++------------ spec/spec_helper.rb | 4 ++- 6 files changed, 19 insertions(+), 24 deletions(-) diff --git a/lib/concurrent/actress/core.rb b/lib/concurrent/actress/core.rb index fffd503c9..f51b8855c 100644 --- a/lib/concurrent/actress/core.rb +++ b/lib/concurrent/actress/core.rb @@ -168,8 +168,8 @@ def receive_envelope nil rescue => error log ERROR, error - envelope.ivar.fail error unless envelope.ivar.nil? terminate! + envelope.ivar.fail error unless envelope.ivar.nil? ensure @receive_envelope_scheduled = false process_envelopes? diff --git a/lib/concurrent/configuration.rb b/lib/concurrent/configuration.rb index af0683c4c..138ce2e64 100644 --- a/lib/concurrent/configuration.rb +++ b/lib/concurrent/configuration.rb @@ -25,7 +25,7 @@ def initialize # if assigned to {#logger}, it will log nothing. def no_logger - -> (level, progname, message = nil, &block) {} + lambda { |level, progname, message = nil, &block| } end # Global thread pool optimized for short *tasks*. diff --git a/lib/concurrent/executor/one_by_one.rb b/lib/concurrent/executor/one_by_one.rb index a5594a4cb..a86e421f7 100644 --- a/lib/concurrent/executor/one_by_one.rb +++ b/lib/concurrent/executor/one_by_one.rb @@ -30,9 +30,10 @@ def initialize # @raise [ArgumentError] if no task is given def post(executor, *args, &task) return nil if task.nil? - if executor.can_overflow? - raise ArgumentError, 'OneByOne cannot be used in conjunction with executor which may overflow' - end + # FIXME Agent#send-off will blow up here + # if executor.can_overflow? + # raise ArgumentError, 'OneByOne cannot be used in conjunction with executor which may overflow' + # end job = Job.new executor, args, task diff --git a/lib/concurrent/executor/per_thread_executor.rb b/lib/concurrent/executor/per_thread_executor.rb index 3562ea1c9..6fd3e566a 100644 --- a/lib/concurrent/executor/per_thread_executor.rb +++ b/lib/concurrent/executor/per_thread_executor.rb @@ -1,6 +1,7 @@ module Concurrent class PerThreadExecutor + include Executor def self.post(*args) raise ArgumentError.new('no block given') unless block_given? diff --git a/spec/concurrent/actress_spec.rb b/spec/concurrent/actress_spec.rb index 07056fe30..61b6cacc6 100644 --- a/spec/concurrent/actress_spec.rb +++ b/spec/concurrent/actress_spec.rb @@ -35,15 +35,6 @@ def on_message(message) # set_trace_func nil # end - def assert condition - unless condition - # require 'pry' - # binding.pry - raise - puts "--- \n#{caller.join("\n")}" - end - end - describe 'stress test' do 1.times do |i| it format('run %3d', i) do @@ -56,23 +47,23 @@ def assert condition actor = Ping.spawn :ping, queue # when spawn returns children are set - assert Concurrent::Actress::ROOT.send(:core).instance_variable_get(:@children).include?(actor) + Concurrent::Actress::ROOT.send(:core).instance_variable_get(:@children).should include(actor) actor << 'a' << 1 - assert queue.pop == 'a' - assert actor.ask(2).value == 2 + queue.pop.should eq 'a' + actor.ask(2).value.should eq 2 - assert actor.parent == Concurrent::Actress::ROOT - assert Concurrent::Actress::ROOT.path == '/' - assert actor.path == '/ping' + actor.parent.should eq Concurrent::Actress::ROOT + Concurrent::Actress::ROOT.path.should eq '/' + actor.path.should eq '/ping' child = actor.ask(:child).value - assert child.path == '/ping/pong' + child.path.should eq '/ping/pong' queue.clear child.ask(3) - assert queue.pop == 3 + queue.pop.should eq 3 actor << :terminate - assert actor.ask(:blow_up).wait.rejected? + actor.ask(:blow_up).wait.should be_rejected end end end.each(&:join) diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 1728c2259..f5f7b0bcf 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -19,7 +19,9 @@ logger = Logger.new($stderr) logger.level = Logger::INFO -Concurrent.configuration.logger = -> (level, progname, message = nil, &block) { logger.add level, message, progname, &block } +Concurrent.configuration.logger = lambda do |level, progname, message = nil, &block| + logger.add level, message, progname, &block +end # import all the support files Dir[File.join(File.dirname(__FILE__), 'support/**/*.rb')].each { |f| require File.expand_path(f) }