diff --git a/Gemfile b/Gemfile index b60167eaa..4d65462ed 100644 --- a/Gemfile +++ b/Gemfile @@ -2,6 +2,7 @@ source 'https://rubygems.org' gemspec + group :development do gem 'rake', '~> 10.2.2' gem 'countloc', '~> 0.4.0', platforms: :mri diff --git a/lib/concurrent.rb b/lib/concurrent.rb index ec74f16f1..15302c436 100644 --- a/lib/concurrent.rb +++ b/lib/concurrent.rb @@ -28,6 +28,7 @@ require 'concurrent/supervisor' require 'concurrent/timer_task' require 'concurrent/tvar' +require 'concurrent/actress' # Modern concurrency tools for Ruby. Inspired by Erlang, Clojure, Scala, Haskell, # F#, C#, Java, and classic concurrency patterns. diff --git a/lib/concurrent/actress.rb b/lib/concurrent/actress.rb new file mode 100644 index 000000000..525a85c7c --- /dev/null +++ b/lib/concurrent/actress.rb @@ -0,0 +1,73 @@ +require 'concurrent/configuration' +require 'concurrent/executor/one_by_one' +require 'concurrent/ivar' +require 'concurrent/logging' + +module Concurrent + + # {include:file:lib/concurrent/actress/doc.md} + module Actress + + require 'concurrent/actress/type_check' + require 'concurrent/actress/errors' + require 'concurrent/actress/core_delegations' + require 'concurrent/actress/envelope' + require 'concurrent/actress/reference' + require 'concurrent/actress/core' + require 'concurrent/actress/context' + + require 'concurrent/actress/ad_hoc' + + # @return [Reference, nil] current executing actor if any + def self.current + Thread.current[:__current_actress__] + end + + # implements ROOT + class Root + include Context + # to allow spawning of new actors, spawn needs to be called inside the parent Actor + def on_message(message) + if message.is_a?(Array) && message.first == :spawn + spawn message[1], &message[2] + else + # ignore + end + end + end + + # A root actor, a default parent of all actors spawned outside an actor + ROOT = Core.new(parent: nil, name: '/', class: Root).reference + + # @param block for actress_class instantiation + # @param args see {.spawn_optionify} + def self.spawn(*args, &block) + if Actress.current + Core.new(spawn_optionify(*args).merge(parent: Actress.current), &block).reference + else + ROOT.ask([:spawn, spawn_optionify(*args), block]).value + end + end + + # as {.spawn} but it'll raise when Actor not initialized properly + def self.spawn!(*args, &block) + spawn(spawn_optionify(*args).merge(initialized: ivar = IVar.new), &block).tap { ivar.no_error! } + end + + # @overload spawn_optionify(actress_class, name, *args) + # @param [Context] actress_class to be spawned + # @param [String, Symbol] name of the instance, it's used to generate the path of the actor + # @param args for actress_class instantiation + # @overload spawn_optionify(opts) + # see {Core#initialize} opts + def self.spawn_optionify(*args) + if args.size == 1 && args.first.is_a?(Hash) + args.first + else + { class: args[0], + name: args[1], + args: args[2..-1] } + end + end + end +end diff --git a/lib/concurrent/actress/ad_hoc.rb b/lib/concurrent/actress/ad_hoc.rb new file mode 100644 index 000000000..d4b49a386 --- /dev/null +++ b/lib/concurrent/actress/ad_hoc.rb @@ -0,0 +1,14 @@ +module Concurrent + module Actress + class AdHoc + include Context + def initialize(*args, &initializer) + @on_message = Type! initializer.call(*args), Proc + end + + def on_message(message) + instance_exec message, &@on_message + end + end + end +end diff --git a/lib/concurrent/actress/context.rb b/lib/concurrent/actress/context.rb new file mode 100644 index 000000000..b5b48ec22 --- /dev/null +++ b/lib/concurrent/actress/context.rb @@ -0,0 +1,96 @@ +module Concurrent + module Actress + + # module used to define actor behaviours + # @example ping + # class Ping + # include Context + # def on_message(message) + # message + # end + # end + # + # Ping.spawn(:ping1).ask(:m).value #=> :m + module Context + include TypeCheck + include CoreDelegations + + attr_reader :core + + # @abstract override to define Actor's behaviour + # @param [Object] message + # @return [Object] a result which will be used to set the IVar supplied to Reference#ask + # @note self should not be returned (or sent to other actors), {#reference} should be used + # instead + def on_message(message) + raise NotImplementedError + end + + def logger + core.logger + end + + # @api private + def on_envelope(envelope) + @envelope = envelope + on_message envelope.message + ensure + @envelope = nil + end + + # @see Actress.spawn + def spawn(*args, &block) + Actress.spawn(*args, &block) + end + + # @see Core#children + def children + core.children + end + + # @see Core#terminate! + def terminate! + core.terminate! + end + + private + + # @api private + def initialize_core(core) + @core = Type! core, Core + end + + # @return [Envelope] current envelope, accessible inside #on_message processing + def envelope + @envelope or raise 'envelope not set' + end + + def self.included(base) + base.extend ClassMethods + super base + end + + module ClassMethods + # behaves as {Actress.spawn} but class_name is omitted + def spawn(name_or_opts, *args, &block) + Actress.spawn spawn_optionify(name_or_opts, *args), &block + end + + # behaves as {Actress.spawn!} but class_name is omitted + def spawn!(name_or_opts, *args, &block) + Actress.spawn! spawn_optionify(name_or_opts, *args), &block + end + + private + + def spawn_optionify(name_or_opts, *args) + if name_or_opts.is_a? Hash + name_or_opts.merge class: self + else + { class: self, name: name_or_opts, args: args } + end + end + end + end + end +end diff --git a/lib/concurrent/actress/core.rb b/lib/concurrent/actress/core.rb new file mode 100644 index 000000000..f51b8855c --- /dev/null +++ b/lib/concurrent/actress/core.rb @@ -0,0 +1,204 @@ +module Concurrent + module Actress + + require 'set' + + # Core of the actor + # @api private + # @note devel: core should not block on anything, e.g. it cannot wait on children to terminate + # that would eat up all threads in task pool and deadlock + class Core + include TypeCheck + include Concurrent::Logging + + attr_reader :reference, :name, :path, :executor, :terminated + + # @option opts [String] name + # @option opts [Reference, nil] parent of an actor spawning this one + # @option opts [Context] actress_class a class to be instantiated defining Actor's behaviour + # @option opts [Array] args arguments for actress_class instantiation + # @option opts [Executor] executor, default is `Concurrent.configuration.global_task_pool` + # @option opts [IVar, nil] initialized, if present it'll be set or failed after {Context} initialization + # @option opts [Proc, nil] logger a proc accepting (level, progname, message = nil, &block) params, + # can be used to hook actor instance to any logging system + # @param [Proc] block for class instantiation + def initialize(opts = {}, &block) + @mailbox = Array.new + @one_by_one = OneByOne.new + # noinspection RubyArgCount + @terminated = Event.new + @executor = Type! opts.fetch(:executor, Concurrent.configuration.global_task_pool), Executor + @children = Set.new + @reference = Reference.new self + @name = (Type! opts.fetch(:name), String, Symbol).to_s + + parent = opts[:parent] + @parent_core = (Type! parent, Reference, NilClass) && parent.send(:core) + if @parent_core.nil? && @name != '/' + raise 'only root has no parent' + end + + @path = @parent_core ? File.join(@parent_core.path, @name) : @name + @logger = opts[:logger] + + @parent_core.add_child reference if @parent_core + + @actress_class = actress_class = Child! opts.fetch(:class), Context + args = opts.fetch(:args, []) + initialized = Type! opts[:initialized], IVar, NilClass + + schedule_execution do + begin + @actress = actress_class.new *args, &block + @actress.send :initialize_core, self + initialized.set true if initialized + rescue => ex + log ERROR, ex + terminate! + initialized.fail ex if initialized + end + end + end + + # @return [Reference, nil] of parent actor + def parent + @parent_core && @parent_core.reference + end + + # @return [Array] of children actors + def children + guard! + @children.to_a + end + + # @api private + def add_child(child) + guard! + Type! child, Reference + @children.add child + nil + end + + # @api private + def remove_child(child) + schedule_execution do + Type! child, Reference + @children.delete child + end + nil + end + + # is executed by Reference scheduling processing of new messages + # can be called from other alternative Reference implementations + # @param [Envelope] envelope + def on_envelope(envelope) + schedule_execution do + if terminated? + reject_envelope envelope + else + @mailbox.push envelope + end + process_envelopes? + end + nil + end + + # @note Actor rejects envelopes when terminated. + # @return [true, false] if actor is terminated + def terminated? + @terminated.set? + end + + # Terminates the actor. Any Envelope received after termination is rejected. + # Terminates all its children, does not wait until they are terminated. + def terminate! + guard! + @children.each do |ch| + ch.send(:core).tap { |core| core.send(:schedule_execution) { core.terminate! } } + end + + @terminated.set + + @parent_core.remove_child reference if @parent_core + @mailbox.each do |envelope| + reject_envelope envelope + log DEBUG, "rejected #{envelope.message} from #{envelope.sender_path}" + end + @mailbox.clear + + nil + end + + # @api private + # ensures that we are inside of the executor + def guard! + unless Actress.current == reference + raise "can be called only inside actor #{reference} but was #{Actress.current}" + end + end + + private + + # Ensures that only one envelope processing is scheduled with #schedule_execution, + # this allows other scheduled blocks to be executed before next envelope processing. + # Simply put this ensures that Core is still responsive to internal calls (like add_child) + # even though the Actor is flooded with messages. + def process_envelopes? + unless @mailbox.empty? || @receive_envelope_scheduled + @receive_envelope_scheduled = true + schedule_execution { receive_envelope } + end + end + + # Processes single envelope, calls #process_envelopes? at the end to ensure next envelope + # scheduling. + def receive_envelope + envelope = @mailbox.shift + + if terminated? + reject_envelope envelope + log FATAL, "this should not be happening #{caller[0]}" + end + + log DEBUG, "received #{envelope.message} from #{envelope.sender_path}" + + result = @actress.on_envelope envelope + envelope.ivar.set result unless envelope.ivar.nil? + + nil + rescue => error + log ERROR, error + terminate! + envelope.ivar.fail error unless envelope.ivar.nil? + ensure + @receive_envelope_scheduled = false + process_envelopes? + end + + # Schedules blocks to be executed on executor sequentially, + # sets Actress.current + def schedule_execution + @one_by_one.post(@executor) do + begin + Thread.current[:__current_actress__] = reference + yield + rescue => e + log FATAL, e + ensure + Thread.current[:__current_actress__] = nil + end + end + + nil + end + + def reject_envelope(envelope) + envelope.reject! ActressTerminated.new(reference) + end + + def log(level, message = nil, &block) + super level, @path, message, &block + end + end + end +end diff --git a/lib/concurrent/actress/core_delegations.rb b/lib/concurrent/actress/core_delegations.rb new file mode 100644 index 000000000..4a0144bcb --- /dev/null +++ b/lib/concurrent/actress/core_delegations.rb @@ -0,0 +1,37 @@ +module Concurrent + module Actress + + # Provides publicly expose-able methods from {Core}. + module CoreDelegations + def name + core.name + end + + def path + core.path + end + + def parent + core.parent + end + + def terminated? + core.terminated? + end + + def terminated + core.terminated + end + + def reference + core.reference + end + + def executor + core.executor + end + + alias_method :ref, :reference + end + end +end diff --git a/lib/concurrent/actress/doc.md b/lib/concurrent/actress/doc.md new file mode 100644 index 000000000..e47248094 --- /dev/null +++ b/lib/concurrent/actress/doc.md @@ -0,0 +1,53 @@ +# Light-weighted implement of Actors. Inspired by Akka and Erlang. + +Actors are using a thread-pool by default which makes them very cheap to create and discard. +Thousands of actors can be created allowing to brake the program to small maintainable pieces +without breaking single responsibility principles. + +## Quick example + + class Counter + include Context + + def initialize(initial_value) + @count = initial_value + end + + def on_message(message) + case message + when Integer + @count += message + when :terminate + terminate! + else + raise 'unknown' + end + end + end + + # create new actor + counter = Counter.spawn(:test_counter, 5) # => a Reference + + # send messages + counter.tell(1) # => counter + counter << 1 # => counter + + # send messages getting an IVar back for synchronization + counter.ask(0) # => an ivar + counter.ask(0).value # => 7 + + # terminate the actor + counter.ask(:terminate).wait + counter.terminated? # => true + counter.ask(5).wait.rejected? # => true + + # failure on message processing will terminate the actor + counter = Counter.spawn(:test_counter, 0) + counter.ask('boom').wait.rejected? # => true + counter.terminated? # => true + + + + + + diff --git a/lib/concurrent/actress/envelope.rb b/lib/concurrent/actress/envelope.rb new file mode 100644 index 000000000..15f979327 --- /dev/null +++ b/lib/concurrent/actress/envelope.rb @@ -0,0 +1,25 @@ +module Concurrent + module Actress + Envelope = Struct.new :message, :ivar, :sender do + include TypeCheck + + def initialize(message, ivar, sender) + super message, + (Type! ivar, IVar, NilClass), + (Type! sender, Reference, Thread) + end + + def sender_path + if sender.is_a? Reference + sender.path + else + sender.to_s + end + end + + def reject!(error) + ivar.fail error unless ivar.nil? + end + end + end +end diff --git a/lib/concurrent/actress/errors.rb b/lib/concurrent/actress/errors.rb new file mode 100644 index 000000000..d9e3c4a50 --- /dev/null +++ b/lib/concurrent/actress/errors.rb @@ -0,0 +1,14 @@ +module Concurrent + module Actress + Error = Class.new(StandardError) + + class ActressTerminated < Error + include TypeCheck + + def initialize(reference) + Type! reference, Reference + super reference.path + end + end + end +end diff --git a/lib/concurrent/actress/reference.rb b/lib/concurrent/actress/reference.rb new file mode 100644 index 000000000..a20ad5066 --- /dev/null +++ b/lib/concurrent/actress/reference.rb @@ -0,0 +1,64 @@ +module Concurrent + module Actress + + # Reference is public interface of Actor instances. It is used for sending messages and can + # be freely passed around the program. It also provides some basic information about the actor + # see {CoreDelegations} + class Reference + include TypeCheck + include CoreDelegations + + attr_reader :core + private :core + + # @!visibility private + def initialize(core) + @core = Type! core, Core + end + + # tells message to the actor + # @param [Object] message + # @return [Reference] self + def tell(message) + message message, nil + end + + alias_method :<<, :tell + + # tells message to the actor + # @param [Object] message + # @param [Ivar] ivar to be fulfilled be message's processing result + # @return [IVar] supplied ivar + def ask(message, ivar = IVar.new) + message message, ivar + end + + # @note can lead to deadlocks, use only in tests or when you are sure it won't deadlock + # tells message to the actor + # @param [Object] message + # @param [Ivar] ivar to be fulfilled be message's processing result + # @return [Object] message's processing result + # @raise [Exception] ivar.reason if ivar is #rejected? + def ask!(message, ivar = IVar.new) + ask(message, ivar).value! + end + + # behaves as #tell when no ivar and as #ask when ivar + def message(message, ivar = nil) + core.on_envelope Envelope.new(message, ivar, Actress.current || Thread.current) + return ivar || self + end + + def to_s + "#<#{self.class} #{path}>" + end + + alias_method :inspect, :to_s + + def ==(other) + Type? other, self.class and other.send(:core) == core + end + end + + end +end diff --git a/lib/concurrent/actress/type_check.rb b/lib/concurrent/actress/type_check.rb new file mode 100644 index 000000000..017b74f49 --- /dev/null +++ b/lib/concurrent/actress/type_check.rb @@ -0,0 +1,48 @@ +module Concurrent + module Actress + + # taken from Algebrick + # supplies type-checking helpers whenever included + module TypeCheck + + def Type?(value, *types) + types.any? { |t| value.is_a? t } + end + + def Type!(value, *types) + Type?(value, *types) or + TypeCheck.error(value, 'is not', types) + value + end + + def Match?(value, *types) + types.any? { |t| t === value } + end + + def Match!(value, *types) + Match?(value, *types) or + TypeCheck.error(value, 'is not matching', types) + value + end + + def Child?(value, *types) + Type?(value, Class) && + types.any? { |t| value <= t } + end + + def Child!(value, *types) + Child?(value, *types) or + TypeCheck.error(value, 'is not child', types) + value + end + + private + + def self.error(value, message, types) + raise TypeError, + "Value (#{value.class}) '#{value}' #{message} any of: #{types.join('; ')}." + end + end + end +end + diff --git a/lib/concurrent/agent.rb b/lib/concurrent/agent.rb index a1ab4eb9f..24a3e729f 100644 --- a/lib/concurrent/agent.rb +++ b/lib/concurrent/agent.rb @@ -4,6 +4,7 @@ require 'concurrent/observable' require 'concurrent/options_parser' require 'concurrent/utility/timeout' +require 'concurrent/logging' module Concurrent @@ -35,6 +36,7 @@ module Concurrent class Agent include Dereferenceable include Concurrent::Observable + include Logging # The default timeout value (in seconds); used when no timeout option # is given at initialization @@ -192,7 +194,8 @@ def try_rescue(ex) # :nodoc: end rescuer.block.call(ex) if rescuer rescue Exception => ex - # supress + # suppress + log DEBUG, ex end # @!visibility private @@ -200,7 +203,6 @@ def work(&handler) # :nodoc: validator, value = mutex.synchronize { [@validator, @value] } begin - # FIXME creates second thread result, valid = Concurrent::timeout(@timeout) do result = handler.call(value) [result, validator.call(result)] diff --git a/lib/concurrent/configuration.rb b/lib/concurrent/configuration.rb index 0a71f8400..138ce2e64 100644 --- a/lib/concurrent/configuration.rb +++ b/lib/concurrent/configuration.rb @@ -6,15 +6,26 @@ require 'concurrent/utility/processor_count' module Concurrent + extend Logging # A gem-level configuration object. class Configuration + # a proc defining how to log messages, its interface has to be: + # lambda { |level, progname, message = nil, &block| _ } + attr_accessor :logger + # Create a new configuration object. def initialize @global_task_pool = Delay.new { new_task_pool } @global_operation_pool = Delay.new { new_operation_pool } @global_timer_set = Delay.new { Concurrent::TimerSet.new } + @logger = no_logger + end + + # if assigned to {#logger}, it will log nothing. + def no_logger + lambda { |level, progname, message = nil, &block| } end # Global thread pool optimized for short *tasks*. @@ -127,7 +138,8 @@ def self.finalize_executor(executor) executor.kill end true - rescue + rescue => ex + log DEBUG, ex false end diff --git a/lib/concurrent/executor/executor.rb b/lib/concurrent/executor/executor.rb index 3c1adae24..6eb929ccb 100644 --- a/lib/concurrent/executor/executor.rb +++ b/lib/concurrent/executor/executor.rb @@ -1,9 +1,18 @@ require 'concurrent/errors' +require 'concurrent/logging' require 'concurrent/atomic/event' module Concurrent module Executor + def can_overflow? + false + end + end + + module RubyExecutor + include Executor + include Logging # Submit a task to the executor for asynchronous processing. # @@ -120,6 +129,7 @@ def kill_execution if RUBY_PLATFORM == 'java' module JavaExecutor + include Executor # Submit a task to the executor for asynchronous processing. # diff --git a/lib/concurrent/executor/immediate_executor.rb b/lib/concurrent/executor/immediate_executor.rb index 929133241..2f0653be0 100644 --- a/lib/concurrent/executor/immediate_executor.rb +++ b/lib/concurrent/executor/immediate_executor.rb @@ -1,5 +1,6 @@ module Concurrent class ImmediateExecutor + include Executor def post(*args, &task) raise ArgumentError.new('no block given') unless block_given? diff --git a/lib/concurrent/executor/java_thread_pool_executor.rb b/lib/concurrent/executor/java_thread_pool_executor.rb index 745256487..278acedbe 100644 --- a/lib/concurrent/executor/java_thread_pool_executor.rb +++ b/lib/concurrent/executor/java_thread_pool_executor.rb @@ -89,6 +89,10 @@ def initialize(opts = {}) set_shutdown_hook end + def can_overflow? + @max_queue != 0 + end + # The minimum number of threads that may be retained in the pool. # # @return [Integer] the min_length diff --git a/lib/concurrent/executor/one_by_one.rb b/lib/concurrent/executor/one_by_one.rb index ee4e96c8f..a86e421f7 100644 --- a/lib/concurrent/executor/one_by_one.rb +++ b/lib/concurrent/executor/one_by_one.rb @@ -1,6 +1,6 @@ module Concurrent - # Ensures that jobs are passed to the underlying executor one by one, + # Ensures that jobs are passed to the given executors one by one, # never running at the same time. class OneByOne @@ -30,6 +30,11 @@ def initialize # @raise [ArgumentError] if no task is given def post(executor, *args, &task) return nil if task.nil? + # FIXME Agent#send-off will blow up here + # if executor.can_overflow? + # raise ArgumentError, 'OneByOne cannot be used in conjunction with executor which may overflow' + # end + job = Job.new executor, args, task begin diff --git a/lib/concurrent/executor/per_thread_executor.rb b/lib/concurrent/executor/per_thread_executor.rb index 3562ea1c9..6fd3e566a 100644 --- a/lib/concurrent/executor/per_thread_executor.rb +++ b/lib/concurrent/executor/per_thread_executor.rb @@ -1,6 +1,7 @@ module Concurrent class PerThreadExecutor + include Executor def self.post(*args) raise ArgumentError.new('no block given') unless block_given? diff --git a/lib/concurrent/executor/ruby_single_thread_executor.rb b/lib/concurrent/executor/ruby_single_thread_executor.rb index ec967a3b8..8173d98d1 100644 --- a/lib/concurrent/executor/ruby_single_thread_executor.rb +++ b/lib/concurrent/executor/ruby_single_thread_executor.rb @@ -4,7 +4,7 @@ module Concurrent # @!macro single_thread_executor class RubySingleThreadExecutor - include Executor + include RubyExecutor # Create a new thread pool. # @@ -64,6 +64,7 @@ def work task.last.call(*task.first) rescue => ex # let it fail + log DEBUG, ex end end stopped_event.set diff --git a/lib/concurrent/executor/ruby_thread_pool_executor.rb b/lib/concurrent/executor/ruby_thread_pool_executor.rb index e254de2cc..46074b92d 100644 --- a/lib/concurrent/executor/ruby_thread_pool_executor.rb +++ b/lib/concurrent/executor/ruby_thread_pool_executor.rb @@ -8,7 +8,7 @@ module Concurrent # @!macro thread_pool_executor class RubyThreadPoolExecutor - include Executor + include RubyExecutor # Default maximum number of threads that will be created in the pool. DEFAULT_MAX_POOL_SIZE = 2**15 # 32768 @@ -99,6 +99,10 @@ def initialize(opts = {}) @last_gc_time = Time.now.to_f - [1.0, (@gc_interval * 2.0)].max end + def can_overflow? + @max_queue != 0 + end + # The number of threads currently in the pool. # # @return [Integer] the length @@ -233,8 +237,9 @@ def handle_overflow(*args) when :caller_runs begin yield(*args) - rescue + rescue => ex # let it fail + log DEBUG, ex end true end diff --git a/lib/concurrent/executor/ruby_thread_pool_worker.rb b/lib/concurrent/executor/ruby_thread_pool_worker.rb index f3c76775a..42fa735a9 100644 --- a/lib/concurrent/executor/ruby_thread_pool_worker.rb +++ b/lib/concurrent/executor/ruby_thread_pool_worker.rb @@ -1,9 +1,11 @@ require 'thread' +require 'concurrent/logging' module Concurrent # @!visibility private class RubyThreadPoolWorker + include Logging # @!visibility private def initialize(queue, parent) @@ -59,6 +61,7 @@ def run(thread = Thread.current) task.last.call(*task.first) rescue => ex # let it fail + log DEBUG, ex ensure @last_activity = Time.now.to_f @parent.on_end_task diff --git a/lib/concurrent/executor/timer_set.rb b/lib/concurrent/executor/timer_set.rb index 7fe81ef61..a30667718 100644 --- a/lib/concurrent/executor/timer_set.rb +++ b/lib/concurrent/executor/timer_set.rb @@ -11,7 +11,7 @@ module Concurrent # monitors the set and schedules each task for execution at the appropriate # time. Tasks are run on the global task pool or on the supplied executor. class TimerSet - include Executor + include RubyExecutor # Create a new set of timed tasks. # diff --git a/lib/concurrent/logging.rb b/lib/concurrent/logging.rb new file mode 100644 index 000000000..802db21f6 --- /dev/null +++ b/lib/concurrent/logging.rb @@ -0,0 +1,17 @@ +require 'logger' + +module Concurrent + # Include where logging is needed + module Logging + include Logger::Severity + + # Logs through {Configuration#logger}, it can be overridden by setting @logger + # @param [Integer] level one of Logger::Severity constants + # @param [String] progname e.g. a path of an Actor + # @param [String, nil] message when nil block is used to generate the message + # @yields_return [String] a message + def log(level, progname, message = nil, &block) + (@logger || Concurrent.configuration.logger).call level, progname, message, &block + end + end +end diff --git a/lib/concurrent/timer_task.rb b/lib/concurrent/timer_task.rb index 6209c7101..9fd4e71d4 100644 --- a/lib/concurrent/timer_task.rb +++ b/lib/concurrent/timer_task.rb @@ -146,7 +146,7 @@ module Concurrent # @see http://docs.oracle.com/javase/7/docs/api/java/util/TimerTask.html class TimerTask include Dereferenceable - include Executor + include RubyExecutor include Concurrent::Observable # Default `:execution_interval` in seconds. diff --git a/spec/concurrent/actress_spec.rb b/spec/concurrent/actress_spec.rb new file mode 100644 index 000000000..61b6cacc6 --- /dev/null +++ b/spec/concurrent/actress_spec.rb @@ -0,0 +1,184 @@ +require 'spec_helper' +require 'concurrent/actress' + +module Concurrent + module Actress + describe 'Concurrent::Actress' do + + class Ping + include Context + + def initialize(queue) + @queue = queue + end + + def on_message(message) + case message + when :terminate + terminate! + when :child + AdHoc.spawn(:pong, @queue) { |queue| -> m { queue << m } } + else + @queue << message + message + end + end + end + + # def trace! + # set_trace_func proc { |event, file, line, id, binding, classname| + # # thread = eval('Thread.current', binding).object_id.to_s(16) + # printf "%8s %20s %20s %s %s:%-2d\n", event, id, classname, nil, file, line + # } + # yield + # ensure + # set_trace_func nil + # end + + describe 'stress test' do + 1.times do |i| + it format('run %3d', i) do + # puts format('run %3d', i) + Array.new(10).map do + Thread.new do + 10.times do + # trace! do + queue = Queue.new + actor = Ping.spawn :ping, queue + + # when spawn returns children are set + Concurrent::Actress::ROOT.send(:core).instance_variable_get(:@children).should include(actor) + + actor << 'a' << 1 + queue.pop.should eq 'a' + actor.ask(2).value.should eq 2 + + actor.parent.should eq Concurrent::Actress::ROOT + Concurrent::Actress::ROOT.path.should eq '/' + actor.path.should eq '/ping' + child = actor.ask(:child).value + child.path.should eq '/ping/pong' + queue.clear + child.ask(3) + queue.pop.should eq 3 + + actor << :terminate + actor.ask(:blow_up).wait.should be_rejected + end + end + end.each(&:join) + end + end + end + + describe 'spawning' do + describe 'Actress#spawn' do + behaviour = -> v { -> _ { v } } + subjects = { spawn: -> { Actress.spawn(AdHoc, :ping, 'arg', &behaviour) }, + context_spawn: -> { AdHoc.spawn(:ping, 'arg', &behaviour) }, + spawn_by_hash: -> { Actress.spawn(class: AdHoc, name: :ping, args: ['arg'], &behaviour) }, + context_spawn_by_hash: -> { AdHoc.spawn(name: :ping, args: ['arg'], &behaviour) } } + + subjects.each do |desc, subject_definition| + describe desc do + subject &subject_definition + its(:path) { should eq '/ping' } + its(:parent) { should eq ROOT } + its(:name) { should eq 'ping' } + its(:executor) { should eq Concurrent.configuration.global_task_pool } + its(:reference) { should eq subject } + it 'returns ars' do + subject.ask!(:anything).should eq 'arg' + end + end + end + end + + it 'terminates on failed initialization' do + a = AdHoc.spawn(name: :fail, logger: Concurrent.configuration.no_logger) { raise } + a.ask(nil).wait.rejected?.should be_true + a.terminated?.should be_true + end + + it 'terminates on failed initialization and raises with spawn!' do + expect do + AdHoc.spawn!(name: :fail, logger: Concurrent.configuration.no_logger) { raise 'm' } + end.to raise_error(StandardError, 'm') + end + + it 'terminates on failed message processing' do + a = AdHoc.spawn(name: :fail, logger: Concurrent.configuration.no_logger) { -> _ { raise } } + a.ask(nil).wait.rejected?.should be_true + a.terminated?.should be_true + end + end + + describe 'messaging' do + subject { AdHoc.spawn(:add) { c = 0; -> v { c = c + v } } } + specify do + subject.tell(1).tell(1) + subject << 1 << 1 + subject.ask(0).value!.should eq 4 + end + end + + describe 'children' do + let(:parent) do + AdHoc.spawn(:parent) do + -> message do + if message == :child + AdHoc.spawn(:child) { -> _ { parent } } + else + children + end + end + end + end + + it 'has children set after a child is created' do + child = parent.ask!(:child) + parent.ask!(nil).should include(child) + child.ask!(nil).should eq parent + end + end + + describe 'envelope' do + subject { AdHoc.spawn(:subject) { -> _ { envelope } } } + specify do + envelope = subject.ask!('a') + envelope.should be_a_kind_of Envelope + envelope.message.should eq 'a' + envelope.ivar.should be_completed + envelope.ivar.value.should eq envelope + envelope.sender.should eq Thread.current + end + end + + describe 'termination' do + subject do + AdHoc.spawn(:parent) do + child = AdHoc.spawn(:child) { -> v { v } } + -> v do + if v == :terminate + terminate! + else + child + end + end + end + end + + it 'terminates with all its children' do + child = subject.ask! :child + subject.terminated?.should be_false + subject.ask(:terminate).wait + subject.terminated?.should be_true + child.terminated.wait + child.terminated?.should be_true + end + end + + end + end +end + diff --git a/spec/concurrent/atomic/atomic_boolean_spec.rb b/spec/concurrent/atomic/atomic_boolean_spec.rb index 896e71f03..d87b71e86 100644 --- a/spec/concurrent/atomic/atomic_boolean_spec.rb +++ b/spec/concurrent/atomic/atomic_boolean_spec.rb @@ -151,7 +151,7 @@ module Concurrent end end - if jruby? + if TestHelpers.jruby? describe JavaAtomicBoolean do it_should_behave_like :atomic_boolean diff --git a/spec/concurrent/atomic/atomic_fixnum_spec.rb b/spec/concurrent/atomic/atomic_fixnum_spec.rb index 52bae798f..282175ece 100644 --- a/spec/concurrent/atomic/atomic_fixnum_spec.rb +++ b/spec/concurrent/atomic/atomic_fixnum_spec.rb @@ -165,7 +165,7 @@ module Concurrent end end - if jruby? + if TestHelpers.jruby? describe JavaAtomicFixnum do it_should_behave_like :atomic_fixnum diff --git a/spec/concurrent/atomic/count_down_latch_spec.rb b/spec/concurrent/atomic/count_down_latch_spec.rb index b4b82be41..8e3a58607 100644 --- a/spec/concurrent/atomic/count_down_latch_spec.rb +++ b/spec/concurrent/atomic/count_down_latch_spec.rb @@ -129,7 +129,7 @@ def subject.simulate_spurious_wake_up end end - if jruby? + if TestHelpers.jruby? describe JavaCountDownLatch do diff --git a/spec/concurrent/collection/priority_queue_spec.rb b/spec/concurrent/collection/priority_queue_spec.rb index e23f61333..e942a5c06 100644 --- a/spec/concurrent/collection/priority_queue_spec.rb +++ b/spec/concurrent/collection/priority_queue_spec.rb @@ -295,7 +295,7 @@ module Concurrent it_should_behave_like :priority_queue end - if jruby? + if TestHelpers.jruby? describe JavaPriorityQueue do diff --git a/spec/concurrent/configuration_spec.rb b/spec/concurrent/configuration_spec.rb index 3ee69c4fe..0e8b2ca0f 100644 --- a/spec/concurrent/configuration_spec.rb +++ b/spec/concurrent/configuration_spec.rb @@ -3,6 +3,7 @@ module Concurrent describe Configuration do + with_full_reset it 'creates a global timer pool' do Concurrent.configuration.global_timer_set.should_not be_nil diff --git a/spec/concurrent/executor/java_cached_thread_pool_spec.rb b/spec/concurrent/executor/java_cached_thread_pool_spec.rb index 106756735..395ebb0e8 100644 --- a/spec/concurrent/executor/java_cached_thread_pool_spec.rb +++ b/spec/concurrent/executor/java_cached_thread_pool_spec.rb @@ -1,6 +1,6 @@ require 'spec_helper' -if jruby? +if Concurrent::TestHelpers.jruby? require_relative 'cached_thread_pool_shared' diff --git a/spec/concurrent/executor/java_fixed_thread_pool_spec.rb b/spec/concurrent/executor/java_fixed_thread_pool_spec.rb index 71527ac5b..0f8dd810a 100644 --- a/spec/concurrent/executor/java_fixed_thread_pool_spec.rb +++ b/spec/concurrent/executor/java_fixed_thread_pool_spec.rb @@ -1,6 +1,6 @@ require 'spec_helper' -if jruby? +if Concurrent::TestHelpers.jruby? require_relative 'fixed_thread_pool_shared' diff --git a/spec/concurrent/executor/java_single_thread_executor_spec.rb b/spec/concurrent/executor/java_single_thread_executor_spec.rb index e6bf86c21..81df30176 100644 --- a/spec/concurrent/executor/java_single_thread_executor_spec.rb +++ b/spec/concurrent/executor/java_single_thread_executor_spec.rb @@ -1,6 +1,6 @@ require 'spec_helper' -if jruby? +if Concurrent::TestHelpers.jruby? require_relative 'thread_pool_shared' diff --git a/spec/concurrent/executor/java_thread_pool_executor_spec.rb b/spec/concurrent/executor/java_thread_pool_executor_spec.rb index 33d6368ea..57fb7c747 100644 --- a/spec/concurrent/executor/java_thread_pool_executor_spec.rb +++ b/spec/concurrent/executor/java_thread_pool_executor_spec.rb @@ -1,6 +1,6 @@ require 'spec_helper' -if jruby? +if Concurrent::TestHelpers.jruby? require_relative 'thread_pool_executor_shared' diff --git a/spec/concurrent/scheduled_task_spec.rb b/spec/concurrent/scheduled_task_spec.rb index afd77ab24..4922e5805 100644 --- a/spec/concurrent/scheduled_task_spec.rb +++ b/spec/concurrent/scheduled_task_spec.rb @@ -7,6 +7,7 @@ module Concurrent describe ScheduledTask do + with_full_reset context 'behavior' do diff --git a/spec/concurrent/timer_task_spec.rb b/spec/concurrent/timer_task_spec.rb index 27ace0c39..6b47512d1 100644 --- a/spec/concurrent/timer_task_spec.rb +++ b/spec/concurrent/timer_task_spec.rb @@ -5,6 +5,7 @@ module Concurrent describe TimerTask do + with_full_reset before(:each) do # suppress deprecation warnings. @@ -23,13 +24,13 @@ module Concurrent end def dereferenceable_subject(value, opts = {}) - opts = opts.merge(execution_interval: 0.1, run_now: true) - @subject = TimerTask.new(opts){ value }.execute.tap{ sleep(0.1) } + opts = opts.merge(execution_interval: 0.1, run_now: true) + @subject = TimerTask.new(opts) { value }.execute.tap { sleep(0.1) } end def dereferenceable_observable(opts = {}) - opts = opts.merge(execution_interval: 0.1, run_now: true) - @subject = TimerTask.new(opts){ 'value' } + opts = opts.merge(execution_interval: 0.1, run_now: true) + @subject = TimerTask.new(opts) { 'value' } end def execute_dereferenceable(subject) @@ -42,9 +43,9 @@ def execute_dereferenceable(subject) context :observable do - subject{ TimerTask.new(execution_interval: 0.1){ nil } } + subject { TimerTask.new(execution_interval: 0.1) { nil } } - after(:each){ subject.kill } + after(:each) { subject.kill } def trigger_observable(observable) observable.execute @@ -66,45 +67,45 @@ def trigger_observable(observable) it 'raises an exception if :execution_interval is not greater than zero' do lambda { - Concurrent::TimerTask.new(execution_interval: 0){ nil } + Concurrent::TimerTask.new(execution_interval: 0) { nil } }.should raise_error(ArgumentError) end it 'raises an exception if :execution_interval is not an integer' do lambda { - Concurrent::TimerTask.new(execution_interval: 'one'){ nil } + Concurrent::TimerTask.new(execution_interval: 'one') { nil } }.should raise_error(ArgumentError) end it 'raises an exception if :timeout_interval is not greater than zero' do lambda { - Concurrent::TimerTask.new(timeout_interval: 0){ nil } + Concurrent::TimerTask.new(timeout_interval: 0) { nil } }.should raise_error(ArgumentError) end it 'raises an exception if :timeout_interval is not an integer' do lambda { - Concurrent::TimerTask.new(timeout_interval: 'one'){ nil } + Concurrent::TimerTask.new(timeout_interval: 'one') { nil } }.should raise_error(ArgumentError) end it 'uses the default execution interval when no interval is given' do - subject = TimerTask.new{ nil } + subject = TimerTask.new { nil } subject.execution_interval.should eq TimerTask::EXECUTION_INTERVAL end it 'uses the default timeout interval when no interval is given' do - subject = TimerTask.new{ nil } + subject = TimerTask.new { nil } subject.timeout_interval.should eq TimerTask::TIMEOUT_INTERVAL end it 'uses the given execution interval' do - subject = TimerTask.new(execution_interval: 5){ nil } + subject = TimerTask.new(execution_interval: 5) { nil } subject.execution_interval.should eq 5 end it 'uses the given timeout interval' do - subject = TimerTask.new(timeout_interval: 5){ nil } + subject = TimerTask.new(timeout_interval: 5) { nil } subject.timeout_interval.should eq 5 end end @@ -112,7 +113,7 @@ def trigger_observable(observable) context '#kill' do it 'returns true on success' do - task = TimerTask.execute(run_now: false){ nil } + task = TimerTask.execute(run_now: false) { nil } sleep(0.1) task.kill.should be_true end @@ -129,7 +130,7 @@ def trigger_observable(observable) specify '#execution_interval is writeable' do - latch = CountDownLatch.new(1) + latch = CountDownLatch.new(1) subject = TimerTask.new(execution_interval: 1) do |task| task.execution_interval = 3 latch.count_down @@ -148,7 +149,7 @@ def trigger_observable(observable) specify '#execution_interval is writeable' do - latch = CountDownLatch.new(1) + latch = CountDownLatch.new(1) subject = TimerTask.new(timeout_interval: 1, execution_interval: 0.1) do |task| task.timeout_interval = 3 latch.count_down @@ -169,23 +170,23 @@ def trigger_observable(observable) context 'execution' do it 'runs the block immediately when the :run_now option is true' do - latch = CountDownLatch.new(1) - subject = TimerTask.execute(execution: 500, now: true){ latch.count_down } + latch = CountDownLatch.new(1) + subject = TimerTask.execute(execution: 500, now: true) { latch.count_down } latch.wait(1).should be_true subject.kill end it 'waits for :execution_interval seconds when the :run_now option is false' do - latch = CountDownLatch.new(1) - subject = TimerTask.execute(execution: 0.1, now: false){ latch.count_down } + latch = CountDownLatch.new(1) + subject = TimerTask.execute(execution: 0.1, now: false) { latch.count_down } latch.count.should eq 1 latch.wait(1).should be_true subject.kill end it 'waits for :execution_interval seconds when the :run_now option is not given' do - latch = CountDownLatch.new(1) - subject = TimerTask.execute(execution: 0.1, now: false){ latch.count_down } + latch = CountDownLatch.new(1) + subject = TimerTask.execute(execution: 0.1, now: false) { latch.count_down } latch.count.should eq 1 latch.wait(1).should be_true subject.kill @@ -193,8 +194,8 @@ def trigger_observable(observable) it 'passes a "self" reference to the block as the sole argument' do expected = nil - latch = CountDownLatch.new(1) - subject = TimerTask.new(execution_interval: 1, run_now: true) do |task| + latch = CountDownLatch.new(1) + subject = TimerTask.new(execution_interval: 1, run_now: true) do |task| expected = task latch.sount_down end @@ -213,18 +214,18 @@ def trigger_observable(observable) attr_reader :value attr_reader :ex attr_reader :latch - define_method(:initialize){ @latch = CountDownLatch.new(1) } + define_method(:initialize) { @latch = CountDownLatch.new(1) } define_method(:update) do |time, value, ex| - @time = time + @time = time @value = value - @ex = ex + @ex = ex @latch.count_down end end.new end it 'notifies all observers on success' do - subject = TimerTask.new(execution: 0.1){ 42 } + subject = TimerTask.new(execution: 0.1) { 42 } subject.add_observer(observer) subject.execute observer.latch.wait(1) @@ -234,7 +235,7 @@ def trigger_observable(observable) end it 'notifies all observers on timeout' do - subject = TimerTask.new(execution: 0.1, timeout: 0.1){ sleep } + subject = TimerTask.new(execution: 0.1, timeout: 0.1) { sleep } subject.add_observer(observer) subject.execute observer.latch.wait(1) @@ -244,7 +245,7 @@ def trigger_observable(observable) end it 'notifies all observers on error' do - subject = TimerTask.new(execution: 0.1){ raise ArgumentError } + subject = TimerTask.new(execution: 0.1) { raise ArgumentError } subject.add_observer(observer) subject.execute observer.latch.wait(1) diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 80a5d7806..f5f7b0bcf 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -2,8 +2,8 @@ require 'coveralls' SimpleCov.formatter = SimpleCov::Formatter::MultiFormatter[ - SimpleCov::Formatter::HTMLFormatter, - Coveralls::SimpleCov::Formatter + SimpleCov::Formatter::HTMLFormatter, + Coveralls::SimpleCov::Formatter ] SimpleCov.start do @@ -17,25 +17,15 @@ require 'concurrent' +logger = Logger.new($stderr) +logger.level = Logger::INFO +Concurrent.configuration.logger = lambda do |level, progname, message = nil, &block| + logger.add level, message, progname, &block +end + # import all the support files Dir[File.join(File.dirname(__FILE__), 'support/**/*.rb')].each { |f| require File.expand_path(f) } RSpec.configure do |config| config.order = 'random' - - config.before(:suite) do - end - - config.before(:each) do - reset_gem_configuration - end - - config.after(:each) do - Thread.list.each do |thread| - thread.kill unless thread == Thread.current - end - end - - config.after(:suite) do - end end diff --git a/spec/support/example_group_extensions.rb b/spec/support/example_group_extensions.rb new file mode 100644 index 000000000..b97385818 --- /dev/null +++ b/spec/support/example_group_extensions.rb @@ -0,0 +1,48 @@ +require 'rbconfig' + +module Concurrent + module TestHelpers + def delta(v1, v2) + if block_given? + v1 = yield(v1) + v2 = yield(v2) + end + return (v1 - v2).abs + end + + def mri? + RbConfig::CONFIG['ruby_install_name']=~ /^ruby$/i + end + + def jruby? + RbConfig::CONFIG['ruby_install_name']=~ /^jruby$/i + end + + def rbx? + RbConfig::CONFIG['ruby_install_name']=~ /^rbx$/i + end + + def reset_gem_configuration + Concurrent.instance_variable_set(:@configuration, Concurrent::Configuration.new) + end + + extend self + end +end + +class RSpec::Core::ExampleGroup + def self.with_full_reset + before(:each) do + reset_gem_configuration + end + + after(:each) do + Thread.list.each do |thread| + thread.kill unless thread == Thread.current + end + end + end + + include Concurrent::TestHelpers + extend Concurrent::TestHelpers +end diff --git a/spec/support/functions.rb b/spec/support/functions.rb deleted file mode 100644 index f21856d39..000000000 --- a/spec/support/functions.rb +++ /dev/null @@ -1,25 +0,0 @@ -require 'rbconfig' - -def delta(v1, v2) - if block_given? - v1 = yield(v1) - v2 = yield(v2) - end - return (v1 - v2).abs -end - -def mri? - RbConfig::CONFIG['ruby_install_name']=~ /^ruby$/i -end - -def jruby? - RbConfig::CONFIG['ruby_install_name']=~ /^jruby$/i -end - -def rbx? - RbConfig::CONFIG['ruby_install_name']=~ /^rbx$/i -end - -def reset_gem_configuration - Concurrent.instance_variable_set(:@configuration, Concurrent::Configuration.new) -end