From 0969f9c284e8e632095f2a53506d785c071951fb Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Wed, 29 Oct 2014 21:02:36 +0100 Subject: [PATCH 01/16] All messages should have same priority It's now possible to send actor << job1 << job2 <<:terminate! and be sure that both jobs are processed first. --- lib/concurrent/actor/behaviour.rb | 36 ++++++++++++++--------- lib/concurrent/actor/behaviour/pausing.rb | 2 +- lib/concurrent/actor/root.rb | 6 ++-- spec/concurrent/actor_spec.rb | 12 ++------ 4 files changed, 28 insertions(+), 28 deletions(-) diff --git a/lib/concurrent/actor/behaviour.rb b/lib/concurrent/actor/behaviour.rb index 55c8ba39b..cce8ad548 100644 --- a/lib/concurrent/actor/behaviour.rb +++ b/lib/concurrent/actor/behaviour.rb @@ -33,24 +33,30 @@ module Behaviour require 'concurrent/actor/behaviour/terminates_children' def self.basic_behaviour_definition - [*base, - *user_messages(:terminate!)] + [*base(:terminate!), + *linking, + *user_messages] end - def self.restarting_behaviour_definition - [*base, + def self.restarting_behaviour_definition(handle = :reset!, strategy = :one_for_one) + [*base(:pause!), + *linking, *supervised, - [Behaviour::Supervising, [:reset!, :one_for_one]], - *user_messages(:pause!)] + *supervising(handle, strategy), + *user_messages] end - def self.base - [[SetResults, [:terminate!]], + def self.base(on_error) + [[SetResults, [on_error]], # has to be before Termination to be able to remove children form terminated actor [RemovesChild, []], [Termination, []], - [TerminatesChildren, []], - [Linking, []]] + [TerminatesChildren, []]] + end + + # @see '' its source code + def self.linking + [[Linking, []]] end def self.supervised @@ -58,10 +64,12 @@ def self.supervised [Pausing, []]] end - def self.user_messages(on_error) - [[Buffer, []], - [SetResults, [on_error]], - [Awaits, []], + def self.supervising(handle = :reset!, strategy = :one_for_one) + [[Behaviour::Supervising, [handle, strategy]]] + end + + def self.user_messages + [[Awaits, []], [ExecutesContext, []], [ErrorsOnUnknownMessage, []]] end diff --git a/lib/concurrent/actor/behaviour/pausing.rb b/lib/concurrent/actor/behaviour/pausing.rb index 744392908..c81b141b5 100644 --- a/lib/concurrent/actor/behaviour/pausing.rb +++ b/lib/concurrent/actor/behaviour/pausing.rb @@ -66,7 +66,7 @@ def on_event(event) @buffer.each { |envelope| reject_envelope envelope } @buffer.clear when :resumed, :reset - @buffer.each { |envelope| core.schedule_execution { pass envelope } } + @buffer.each { |envelope| core.schedule_execution { core.process_envelope envelope } } @buffer.clear end super event diff --git a/lib/concurrent/actor/root.rb b/lib/concurrent/actor/root.rb index 60c7061b2..01ac0fb19 100644 --- a/lib/concurrent/actor/root.rb +++ b/lib/concurrent/actor/root.rb @@ -28,9 +28,9 @@ def dead_letter_routing end def behaviour_definition - [*Behaviour.base, - [Behaviour::Supervising, [:reset!, :one_for_one]], - *Behaviour.user_messages(:just_log)] + [*Behaviour.base(:just_log), + *Behaviour.supervising, + *Behaviour.user_messages] end end end diff --git a/spec/concurrent/actor_spec.rb b/spec/concurrent/actor_spec.rb index 305ae08d2..a878d7f27 100644 --- a/spec/concurrent/actor_spec.rb +++ b/spec/concurrent/actor_spec.rb @@ -282,13 +282,7 @@ def on_message(message) describe 'pausing' do it 'pauses on error' do queue = Queue.new - resuming_behaviour = Behaviour.restarting_behaviour_definition.map do |c, args| - if Behaviour::Supervising == c - [c, [:resume!, :one_for_one]] - else - [c, args] - end - end + resuming_behaviour = Behaviour.restarting_behaviour_definition(:resume!) test = AdHoc.spawn name: :tester, behaviour_definition: resuming_behaviour do actor = AdHoc.spawn name: :pausing, @@ -302,9 +296,7 @@ def on_message(message) actor << nil queue << actor.ask(:add) - -> m do - queue << m - end + -> m { queue << m } end expect(queue.pop).to eq :init From b898a47d2208bfeb6e46e8aab4e8c0e686d8bbba Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Wed, 29 Oct 2014 22:19:28 +0100 Subject: [PATCH 02/16] Improving documentation --- .yardopts | 2 +- doc/actor/celluloid_benchmark.rb | 1 - doc/actor/define.in.rb | 38 ++++ doc/actor/define.out.rb | 39 ++++ doc/actor/main.md | 185 +++++++++++------- doc/actor/messaging.in.rb | 30 +++ doc/actor/messaging.out.rb | 32 +++ doc/actor/quick.in.rb | 88 ++------- doc/actor/quick.out.rb | 89 ++------- lib/concurrent/actor.rb | 30 ++- lib/concurrent/actor/behaviour.rb | 72 ++++++- lib/concurrent/actor/behaviour/awaits.rb | 8 +- .../actor/behaviour/executes_context.rb | 2 +- lib/concurrent/actor/behaviour/linking.rb | 35 +++- lib/concurrent/actor/behaviour/pausing.rb | 2 +- .../actor/behaviour/sets_results.rb | 1 + lib/concurrent/actor/behaviour/supervised.rb | 19 ++ lib/concurrent/actor/behaviour/supervising.rb | 4 + .../actor/behaviour/terminates_children.rb | 3 +- lib/concurrent/actor/behaviour/termination.rb | 3 +- lib/concurrent/actor/context.rb | 69 +++---- lib/concurrent/actor/core.rb | 50 +++-- lib/concurrent/actor/reference.rb | 49 +++-- lib/concurrent/atomic/event.rb | 4 +- 24 files changed, 550 insertions(+), 305 deletions(-) create mode 100644 doc/actor/define.in.rb create mode 100644 doc/actor/define.out.rb create mode 100644 doc/actor/messaging.in.rb create mode 100644 doc/actor/messaging.out.rb diff --git a/.yardopts b/.yardopts index 00d17414a..efe260698 100644 --- a/.yardopts +++ b/.yardopts @@ -1,5 +1,5 @@ --protected ---no-private +--private --embed-mixins --output-dir ./yardoc --markup markdown diff --git a/doc/actor/celluloid_benchmark.rb b/doc/actor/celluloid_benchmark.rb index 5ffd54e77..dea20e553 100644 --- a/doc/actor/celluloid_benchmark.rb +++ b/doc/actor/celluloid_benchmark.rb @@ -1,6 +1,5 @@ require 'benchmark' require 'concurrent/actor' -Concurrent::Actor.i_know_it_is_experimental! require 'celluloid' require 'celluloid/autostart' diff --git a/doc/actor/define.in.rb b/doc/actor/define.in.rb new file mode 100644 index 000000000..98ee0611d --- /dev/null +++ b/doc/actor/define.in.rb @@ -0,0 +1,38 @@ +require 'concurrent' + +Message = Struct.new :action, :value # + +class AnActor < Concurrent::Actor::RestartingContext + def initialize(init) + @counter = init + end + + # override #on_message to define actor's behaviour on message received + def on_message(message) + case message.action + when :add + @counter = @counter + message.value + when :subtract + @counter = @counter - message.value + when :value + @counter + else + pass + end + end + + # set counter to zero when there is an error + def on_event(event) + if event == :reset + @counter = 0 # ignore initial value + end + end +end # + +an_actor = AnActor.spawn name: 'an_actor', args: 10, supervise: true # +an_actor << Message.new(:add, 1) << Message.new(:subtract, 2) # +an_actor.ask!(Message.new(:value, nil)) +an_actor << :boo << Message.new(:add, 1) # +an_actor.ask!(Message.new(:value, nil)) +an_actor << :terminate! + diff --git a/doc/actor/define.out.rb b/doc/actor/define.out.rb new file mode 100644 index 000000000..66713cb25 --- /dev/null +++ b/doc/actor/define.out.rb @@ -0,0 +1,39 @@ +require 'concurrent' # => true + +Message = Struct.new :action, :value + +class AnActor < Concurrent::Actor::RestartingContext + def initialize(init) + @counter = init + end + + # override #on_message to define actor's behaviour on message received + def on_message(message) + case message.action + when :add + @counter = @counter + message.value + when :subtract + @counter = @counter - message.value + when :value + @counter + else + pass + end + end + + # set counter to zero when there is an error + def on_event(event) + if event == :reset + @counter = 0 # ignore initial value + end + end +end + +an_actor = AnActor.spawn name: 'an_actor', args: 10, supervise: true +an_actor << Message.new(:add, 1) << Message.new(:subtract, 2) +an_actor.ask!(Message.new(:value, nil)) # => 9 +an_actor << :boo << Message.new(:add, 1) +an_actor.ask!(Message.new(:value, nil)) # => 1 +an_actor << :terminate! + # => # + diff --git a/doc/actor/main.md b/doc/actor/main.md index 5ef8f1a4e..794af1059 100644 --- a/doc/actor/main.md +++ b/doc/actor/main.md @@ -1,51 +1,92 @@ # Actor model -- Light-weighted. +- Light-weighted running on thread-pool. - Inspired by Akka and Erlang. - Modular. -Actors are sharing a thread-pool by default which makes them very cheap to create and discard. -Thousands of actors can be created, allowing you to break the program into small maintainable pieces, -without violating the single responsibility principle. +This Actor model implementation makes makes actors very cheap to create and discard. +Thousands of actors can be created, allowing you to break the program into smaller +maintainable pieces, without violating the single responsibility principle. ## What is an actor model? -[Wiki](http://en.wikipedia.org/wiki/Actor_model) says: -The actor model in computer science is a mathematical model of concurrent computation -that treats _actors_ as the universal primitives of concurrent digital computation: +Actor-based concurrency is all the rage in some circles. Originally described in 1973, the actor model is a paradigm +for creating asynchronous, concurrent objects that is becoming increasingly popular. Much has changed since actors +were first written about four decades ago, which has led to a serious fragmentation within the actor community. +There is *no* universally accepted, strict definition of "actor" and actor implementations differ widely between +languages and libraries. + +[Wiki](http://en.wikipedia.org/wiki/Actor_model) definition is pretty good: +_The actor model in computer science is a mathematical model of concurrent computation +that treats **actors** as the universal primitives of concurrent digital computation: in response to a message that it receives, an actor can make local decisions, create more actors, send more messages, and determine how to respond to the next -message received. +message received._ ## Why? -Concurrency is hard this is one of many ways how to simplify the problem. -It is simpler to reason about actors than about locks (and all their possible states). +Concurrency is hard to get right, actors are one of many ways how to simplify the problem. -## How to use it +## Quick example {include:file:doc/actor/quick.out.rb} -## Messaging +## Spawning actors + +- {Concurrent::Actor.spawn} and {Concurrent::Actor.spawn!} +- {Concurrent::Actor::AbstractContext.spawn} and {Concurrent::Actor::AbstractContext.spawn!} + +## Sending messages + +- {Concurrent::Actor::Reference#tell} + {include:Concurrent::Actor::Reference#tell} +- {Concurrent::Actor::Reference#ask} + {include:Concurrent::Actor::Reference#ask} +- {Concurrent::Actor::Reference#ask!} + {include:Concurrent::Actor::Reference#ask!} Messages are processed in same order as they are sent by a sender. It may interleaved with -messages form other senders though. There is also a contract in actor model that -messages sent between actors should be immutable. Gems like +messages from other senders though. + +### Immutability + +Messages sent between actors should be **immutable**. Gems like - [Algebrick](https://github.com/pitr-ch/algebrick) - Typed struct on steroids based on algebraic types and pattern matching - [Hamster](https://github.com/hamstergem/hamster) - Efficient, Immutable, Thread-Safe Collection classes for Ruby -are very useful. +are very helpful. -### Dead letter routing +{include:file:doc/actor/messaging.out.rb} -see {AbstractContext#dead_letter_routing} description: +## Actor definition -> {include:Actor::AbstractContext#dead_letter_routing} +{include:Concurrent::Actor::AbstractContext} + +## Reference + +{include:Actor::Reference} + +## Garbage collection + +Spawned actor cannot be garbage-collected until it's terminated. There is a reference held in the parent actor. + +## Parent-child relationship, name, and path + +- {Core#name} + {include:Actor::Core#name} +- {Core#path} + {include:Actor::Core#path} +- {Core#parent} + {include:Actor::Core#parent} -## Architecture +## Behaviour + +{include:Actor::Behaviour} + +## IO cooperation Actors are running on shared thread poll which allows user to create many actors cheaply. Downside is that these actors cannot be directly used to do IO or other blocking operations. @@ -56,19 +97,31 @@ Blocking operations could starve the `default_task_pool`. However there are two - Create an actor using `global_operation_pool` instead of `global_task_pool`, e.g. `AnIOActor.spawn name: :blocking, executor: Concurrent.configuration.global_operation_pool`. -Each actor is composed from 4 parts: +## Dead letter routing -### {Reference} -{include:Actor::Reference} +see {AbstractContext#dead_letter_routing} description: -### {Core} -{include:Actor::Core} +> {include:Actor::AbstractContext#dead_letter_routing} -### {AbstractContext} -{include:Actor::AbstractContext} +## FAQ -### {Behaviour} -{include:Actor::Behaviour} +### What happens if I try to supervise using a normal Context? + +Alleged supervisor will receive errors from its supervised actors. They'll have to be handled manually. + +### How to change supervision strategy? + +Use option `behaviour_definition: Behaviour.restarting_behaviour_definition(:resume!)` or +`behaviour_definition: Behaviour.restarting_behaviour_definition(:reset!, :one_for_all)` + +### How to change behaviors? + +Any existing behavior can be subclassed + +### How to implement custom restarting? + +By subclassing {Behaviour::Pausing} and overriding {Behaviour::Pausing#restart!}. Implementing +{AbstractContext#on_event} could be also considered. ## Speed @@ -85,49 +138,49 @@ Benchmark legend: ### JRUBY - Rehearsal -------------------------------------------------------- - 50000 2 concurrent 24.110000 0.800000 24.910000 ( 7.728000) - 50000 2 celluloid 28.510000 4.780000 33.290000 ( 14.782000) - 50000 500 concurrent 13.700000 0.280000 13.980000 ( 4.307000) - 50000 500 celluloid 14.520000 11.740000 26.260000 ( 12.258000) - 50000 1000 concurrent 10.890000 0.220000 11.110000 ( 3.760000) - 50000 1000 celluloid 15.600000 21.690000 37.290000 ( 18.512000) - 50000 1500 concurrent 10.580000 0.270000 10.850000 ( 3.646000) - 50000 1500 celluloid 14.490000 29.790000 44.280000 ( 26.043000) - --------------------------------------------- total: 201.970000sec + Rehearsal --------------------------------------------------------- + 50000 2 concurrent 26.140000 0.610000 26.750000 ( 7.761000) + 50000 2 celluloid 41.440000 5.270000 46.710000 ( 17.535000) + 50000 500 concurrent 11.340000 0.180000 11.520000 ( 3.498000) + 50000 500 celluloid 19.310000 10.680000 29.990000 ( 14.619000) + 50000 1000 concurrent 10.640000 0.180000 10.820000 ( 3.563000) + 50000 1000 celluloid 17.840000 19.850000 37.690000 ( 18.892000) + 50000 1500 concurrent 14.120000 0.290000 14.410000 ( 4.618000) + 50000 1500 celluloid 19.060000 28.920000 47.980000 ( 25.185000) + ---------------------------------------------- total: 225.870000sec - mes. act. impl. user system total real - 50000 2 concurrent 9.820000 0.510000 10.330000 ( 5.735000) - 50000 2 celluloid 10.390000 4.030000 14.420000 ( 7.494000) - 50000 500 concurrent 9.880000 0.200000 10.080000 ( 3.310000) - 50000 500 celluloid 12.430000 11.310000 23.740000 ( 11.727000) - 50000 1000 concurrent 10.590000 0.190000 10.780000 ( 4.029000) - 50000 1000 celluloid 14.950000 23.260000 38.210000 ( 20.841000) - 50000 1500 concurrent 10.710000 0.250000 10.960000 ( 3.892000) - 50000 1500 celluloid 13.280000 30.030000 43.310000 ( 24.620000) (1) + mes. act. impl. user system total real + 50000 2 concurrent 7.320000 0.530000 7.850000 ( 3.637000) + 50000 2 celluloid 13.780000 4.730000 18.510000 ( 10.756000) + 50000 500 concurrent 9.270000 0.140000 9.410000 ( 3.020000) + 50000 500 celluloid 16.540000 10.920000 27.460000 ( 14.308000) + 50000 1000 concurrent 9.970000 0.160000 10.130000 ( 3.445000) + 50000 1000 celluloid 15.930000 20.840000 36.770000 ( 18.272000) + 50000 1500 concurrent 11.580000 0.240000 11.820000 ( 3.723000) + 50000 1500 celluloid 19.440000 29.060000 48.500000 ( 25.227000) (1) ### MRI 2.1.0 - Rehearsal -------------------------------------------------------- - 50000 2 concurrent 4.640000 0.080000 4.720000 ( 4.852390) - 50000 2 celluloid 6.110000 2.300000 8.410000 ( 7.898069) - 50000 500 concurrent 6.260000 2.210000 8.470000 ( 7.400573) - 50000 500 celluloid 10.250000 4.930000 15.180000 ( 14.174329) - 50000 1000 concurrent 6.300000 1.860000 8.160000 ( 7.303162) - 50000 1000 celluloid 12.300000 7.090000 19.390000 ( 17.962621) - 50000 1500 concurrent 7.410000 2.610000 10.020000 ( 8.887396) - 50000 1500 celluloid 14.850000 10.690000 25.540000 ( 24.489796) - ---------------------------------------------- total: 99.890000sec + Rehearsal --------------------------------------------------------- + 50000 2 concurrent 4.180000 0.080000 4.260000 ( 4.269435) + 50000 2 celluloid 7.740000 3.100000 10.840000 ( 10.043875) + 50000 500 concurrent 5.900000 1.310000 7.210000 ( 6.565067) + 50000 500 celluloid 12.820000 5.810000 18.630000 ( 17.320765) + 50000 1000 concurrent 6.080000 1.640000 7.720000 ( 6.931294) + 50000 1000 celluloid 17.130000 8.320000 25.450000 ( 23.786146) + 50000 1500 concurrent 6.940000 2.030000 8.970000 ( 7.927330) + 50000 1500 celluloid 20.980000 12.040000 33.020000 ( 30.849578) + ---------------------------------------------- total: 116.100000sec - mes. act. impl. user system total real - 50000 2 concurrent 4.190000 0.070000 4.260000 ( 4.306386) - 50000 2 celluloid 6.490000 2.210000 8.700000 ( 8.280051) - 50000 500 concurrent 7.060000 2.520000 9.580000 ( 8.518707) - 50000 500 celluloid 10.550000 4.980000 15.530000 ( 14.699962) - 50000 1000 concurrent 6.440000 1.870000 8.310000 ( 7.571059) - 50000 1000 celluloid 12.340000 7.510000 19.850000 ( 18.793591) - 50000 1500 concurrent 6.720000 2.160000 8.880000 ( 7.929630) - 50000 1500 celluloid 14.140000 10.130000 24.270000 ( 22.775288) (1) + mes. act. impl. user system total real + 50000 2 concurrent 3.730000 0.100000 3.830000 ( 3.822688) + 50000 2 celluloid 7.900000 2.910000 10.810000 ( 9.924014) + 50000 500 concurrent 5.420000 1.230000 6.650000 ( 6.025579) + 50000 500 celluloid 12.720000 5.540000 18.260000 ( 16.889517) + 50000 1000 concurrent 5.420000 0.910000 6.330000 ( 5.896689) + 50000 1000 celluloid 16.090000 8.040000 24.130000 ( 22.347102) + 50000 1500 concurrent 5.580000 0.760000 6.340000 ( 6.038535) + 50000 1500 celluloid 20.000000 11.680000 31.680000 ( 29.590774) (1) *Note (1):* Celluloid is using thread per actor so this bench is creating about 1500 native threads. Actor is using constant number of threads. diff --git a/doc/actor/messaging.in.rb b/doc/actor/messaging.in.rb new file mode 100644 index 000000000..02575cbcd --- /dev/null +++ b/doc/actor/messaging.in.rb @@ -0,0 +1,30 @@ +require 'concurrent' +require 'algebrick' + +# Actor message protocol definition with Algebrick +Protocol = Algebrick.type do + variants Add = type { fields! a: Numeric, b: Numeric }, + Subtract = type { fields! a: Numeric, b: Numeric } +end + +class Calculator < Concurrent::Actor::RestartingContext + include Algebrick::Matching + + def on_message(message) + # pattern matching on the message with deconstruction + # ~ marks values which are passed to the block + match message, + (on Add.(~any, ~any) do |a, b| + a + b + end), + # or use multi-assignment + (on ~Subtract do |(a, b)| + a - b + end) + end +end # + +calculator = Calculator.spawn('calculator') +calculator.ask! Add[1, 2] +calculator.ask! Subtract[1, 0.5] +calculator << :terminate! diff --git a/doc/actor/messaging.out.rb b/doc/actor/messaging.out.rb new file mode 100644 index 000000000..93c864ea2 --- /dev/null +++ b/doc/actor/messaging.out.rb @@ -0,0 +1,32 @@ +require 'concurrent' # => true +require 'algebrick' # => true + +# Actor message protocol definition with Algebrick +Protocol = Algebrick.type do + variants Add = type { fields! a: Numeric, b: Numeric }, + Subtract = type { fields! a: Numeric, b: Numeric } +end # => Protocol(Add | Subtract) + +class Calculator < Concurrent::Actor::RestartingContext + include Algebrick::Matching + + def on_message(message) + # pattern matching on the message with deconstruction + # ~ marks values which are passed to the block + match message, + (on Add.(~any, ~any) do |a, b| + a + b + end), + # or use multi-assignment + (on ~Subtract do |(a, b)| + a - b + end) + end +end + +calculator = Calculator.spawn('calculator') + # => # +calculator.ask! Add[1, 2] # => 3 +calculator.ask! Subtract[1, 0.5] # => 0.5 +calculator << :terminate! + # => # diff --git a/doc/actor/quick.in.rb b/doc/actor/quick.in.rb index 1aae44160..694888df3 100644 --- a/doc/actor/quick.in.rb +++ b/doc/actor/quick.in.rb @@ -1,79 +1,29 @@ -class Counter < Concurrent::Actor::Context - # Include context of an actor which gives this class access to reference and other information - # about the actor, see PublicDelegations. - - # use initialize as you wish - def initialize(initial_value) - @count = initial_value - end - - # override on_message to define actor's behaviour - def on_message(message) - if Integer === message - @count += message - end - end -end # - -# Create new actor naming the instance 'first'. -# Return value is a reference to the actor, the actual actor is never returned. -counter = Counter.spawn(:first, 5) - -# Tell a message and forget returning self. -counter.tell(1) -counter << 1 -# (First counter now contains 7.) - -# Send a messages asking for a result. -counter.ask(0).class -counter.ask(0).value - -# Terminate the actor. -counter.tell :terminate! -# Not terminated yet, it takes a while until the message is processed. -counter.ask! :terminated? -# Waiting for the termination. -event = counter.ask!(:terminated_event) -event.class -event.wait -counter.ask! :terminated? -# Any subsequent messages are rejected. -counter.ask(5).wait.rejected? - -# Failure on message processing terminates the actor. -counter = Counter.spawn(:first, 0) -counter.ask('boom').wait.rejected? -counter.ask! :terminated? - - -# Lets define an actor creating children actors. -class Node < Concurrent::Actor::Context - def initialize - @last_child_id = 0 +class Adder < Concurrent::Actor::RestartingContext + def initialize(init) + @count = init end def on_message(message) case message - when :new_child - Node.spawn "child-#{@last_child_id += 1}" - when :how_many_children - children.size + when :add + @count += 1 else + # pass to ErrorsOnUnknownMessage behaviour, which will just fail pass end end end # -# Actors are tracking parent-child relationships -parent = Node.spawn :parent -child = parent.tell(:new_child).ask!(:new_child) -child.parent -parent.ask!(:how_many_children) - -# There is a special root actor which is used for all actors spawned outside any actor. -parent.parent - -# Termination of an parent will also terminate all children. -parent.ask('boom').wait # -counter.ask! :terminated? -counter.ask! :terminated? +# `supervise: true` makes the actor supervised by root actor +adder = Adder.spawn(name: :adder, supervise: true, args: [1]) +adder.parent + +# tell and forget +adder.tell(:add) << :add +# ask to get result +adder.ask!(:add) +# fail the actor +adder.ask!(:bad) rescue $! +# actor is restarted with initial values +adder.ask!(:add) +adder.ask!(:terminate!) diff --git a/doc/actor/quick.out.rb b/doc/actor/quick.out.rb index 23e6a88c9..cae9e03ae 100644 --- a/doc/actor/quick.out.rb +++ b/doc/actor/quick.out.rb @@ -1,82 +1,31 @@ -class Counter < Concurrent::Actor::Context - # Include context of an actor which gives this class access to reference and other information - # about the actor, see PublicDelegations. - - # use initialize as you wish - def initialize(initial_value) - @count = initial_value - end - - # override on_message to define actor's behaviour - def on_message(message) - if Integer === message - @count += message - end - end -end - -# Create new actor naming the instance 'first'. -# Return value is a reference to the actor, the actual actor is never returned. -counter = Counter.spawn(:first, 5) # => # - -# Tell a message and forget returning self. -counter.tell(1) # => # -counter << 1 # => # -# (First counter now contains 7.) - -# Send a messages asking for a result. -counter.ask(0).class # => Concurrent::IVar -counter.ask(0).value # => 7 - -# Terminate the actor. -counter.tell :terminate! # => # -# Not terminated yet, it takes a while until the message is processed. -counter.ask! :terminated? # => true -# Waiting for the termination. -event = counter.ask!(:terminated_event) - # => #, @condition=#>> -event.class # => Concurrent::Event -event.wait # => true -counter.ask! :terminated? # => true -# Any subsequent messages are rejected. -counter.ask(5).wait.rejected? # => true - -# Failure on message processing terminates the actor. -counter = Counter.spawn(:first, 0) # => # -counter.ask('boom').wait.rejected? # => false -counter.ask! :terminated? # => false - - -# Lets define an actor creating children actors. -class Node < Concurrent::Actor::Context - def initialize - @last_child_id = 0 +class Adder < Concurrent::Actor::RestartingContext + def initialize(init) + @count = init end def on_message(message) case message - when :new_child - Node.spawn "child-#{@last_child_id += 1}" - when :how_many_children - children.size + when :add + @count += 1 else + # pass to ErrorsOnUnknownMessage behaviour, which will just fail pass end end end -# Actors are tracking parent-child relationships -parent = Node.spawn :parent # => # -child = parent.tell(:new_child).ask!(:new_child) - # => # -child.parent # => # -parent.ask!(:how_many_children) # => 2 - -# There is a special root actor which is used for all actors spawned outside any actor. -parent.parent +# `supervise: true` makes the actor supervised by root actor +adder = Adder.spawn(name: :adder, supervise: true, args: [1]) + # => # +adder.parent # => # -# Termination of an parent will also terminate all children. -parent.ask('boom').wait -counter.ask! :terminated? # => false -counter.ask! :terminated? # => false +# tell and forget +adder.tell(:add) << :add # => # +# ask to get result +adder.ask!(:add) # => 4 +# fail the actor +adder.ask!(:bad) rescue $! # => # +# actor is restarted with initial values +adder.ask!(:add) # => 2 +adder.ask!(:terminate!) # => true diff --git a/lib/concurrent/actor.rb b/lib/concurrent/actor.rb index f799c3747..30e3da66c 100644 --- a/lib/concurrent/actor.rb +++ b/lib/concurrent/actor.rb @@ -10,13 +10,6 @@ module Concurrent # TODO IO interoperation # TODO un/become - # TODO doc - # - what happens if I try to supervise using a normal Context? - # - how to change behaviours - # - how to implement custom restarting? - # - pool for io operations using different executor - # - document guaranteed ordering - # {include:file:doc/actor/main.md} module Actor @@ -50,18 +43,21 @@ def self.root @root.value! end - # Spawns a new actor. - # - # @example simple + # Spawns a new actor. {Concurrent::Actor::AbstractContext.spawn} allows to omit class parameter. + # To see the list of avaliable options see {Core#initialize} + # @see Concurrent::Actor::AbstractContext.spawn + # @see Core#initialize + # @example by class and name # Actor.spawn(AdHoc, :ping1) { -> message { message } } # - # @example complex - # Actor.spawn name: :ping3, - # class: AdHoc, - # args: [1] - # executor: Concurrent.configuration.global_task_pool do |add| - # lambda { |number| number + add } + # @example by option hash + # inc2 = Actor.spawn(class: AdHoc, + # name: 'increment by 2', + # args: [2], + # executor: Concurrent.configuration.global_task_pool) do |increment_by| + # lambda { |number| number + increment_by } # end + # inc2.ask!(2) # => 4 # # @param block for context_class instantiation # @param args see {.spawn_optionify} @@ -74,7 +70,7 @@ def self.spawn(*args, &block) end end - # as {.spawn} but it'll raise when Actor not initialized properly + # as {.spawn} but it'll block until actor is initialized or it'll raise exception on error def self.spawn!(*args, &block) spawn(spawn_optionify(*args).merge(initialized: ivar = IVar.new), &block).tap { ivar.no_error! } end diff --git a/lib/concurrent/actor/behaviour.rb b/lib/concurrent/actor/behaviour.rb index cce8ad548..33ab5b073 100644 --- a/lib/concurrent/actor/behaviour.rb +++ b/lib/concurrent/actor/behaviour.rb @@ -3,7 +3,7 @@ module Actor # Actors have modular architecture, which is achieved by combining a light core with chain of # behaviours. Each message or internal event propagates through the chain allowing the - # behaviours react based on their responsibility. listing few as an example: + # behaviours react based on their responsibility. # # - {Behaviour::Linking}: # @@ -13,8 +13,45 @@ module Actor # # > {include:Actor::Behaviour::Awaits} # - # See {Behaviour}'s namespace fo other behaviours. + # - {Behaviour::Pausing}: + # + # > {include:Actor::Behaviour::Pausing} + # + # - {Behaviour::Supervised}: + # + # > {include:Actor::Behaviour::Supervised} + # + # - {Behaviour::Supervising}: + # + # > {include:Actor::Behaviour::Supervising} + # + # - {Behaviour::ExecutesContext}: + # + # > {include:Actor::Behaviour::ExecutesContext} + # + # - {Behaviour::ErrorsOnUnknownMessage}: + # + # > {include:Actor::Behaviour::ErrorsOnUnknownMessage} + # + # - {Behaviour::Termination}: + # + # > {include:Actor::Behaviour::Termination} + # + # - {Behaviour::TerminatesChildren}: + # + # > {include:Actor::Behaviour::TerminatesChildren} + # + # - {Behaviour::RemovesChild}: + # + # > {include:Actor::Behaviour::RemovesChild} + # # If needed new behaviours can be added, or old one removed to get required behaviour. + # + # - {Context} uses + # {include:Actor::Behaviour.basic_behaviour_definition} + # + # - {RestartingContext} uses + # {include:Actor::Behaviour.restarting_behaviour_definition} module Behaviour MESSAGE_PROCESSED = Object.new @@ -32,12 +69,39 @@ module Behaviour require 'concurrent/actor/behaviour/termination' require 'concurrent/actor/behaviour/terminates_children' + # Array of behaviours and their construction parameters. + # + # [[Behaviour::SetResults, [:terminate!]], + # [Behaviour::RemovesChild, []], + # [Behaviour::Termination, []], + # [Behaviour::TerminatesChildren, []], + # [Behaviour::Linking, []], + # [Behaviour::Awaits, []], + # [Behaviour::ExecutesContext, []], + # [Behaviour::ErrorsOnUnknownMessage, []]] + # + # @see '' its source code def self.basic_behaviour_definition [*base(:terminate!), *linking, *user_messages] end + # Array of behaviours and their construction parameters. + # + # [[Behaviour::SetResults, [:pause!]], + # [Behaviour::RemovesChild, []], + # [Behaviour::Termination, []], + # [Behaviour::TerminatesChildren, []], + # [Behaviour::Linking, []], + # [Behaviour::Supervised, []], + # [Behaviour::Pausing, []], + # [Behaviour::Supervising, [:reset!, :one_for_one]], + # [Behaviour::Awaits, []], + # [Behaviour::ExecutesContext, []], + # [Behaviour::ErrorsOnUnknownMessage, []]] + # + # @see '' its source code def self.restarting_behaviour_definition(handle = :reset!, strategy = :one_for_one) [*base(:pause!), *linking, @@ -46,6 +110,7 @@ def self.restarting_behaviour_definition(handle = :reset!, strategy = :one_for_o *user_messages] end + # @see '' its source code def self.base(on_error) [[SetResults, [on_error]], # has to be before Termination to be able to remove children form terminated actor @@ -59,15 +124,18 @@ def self.linking [[Linking, []]] end + # @see '' its source code def self.supervised [[Supervised, []], [Pausing, []]] end + # @see '' its source code def self.supervising(handle = :reset!, strategy = :one_for_one) [[Behaviour::Supervising, [handle, strategy]]] end + # @see '' its source code def self.user_messages [[Awaits, []], [ExecutesContext, []], diff --git a/lib/concurrent/actor/behaviour/awaits.rb b/lib/concurrent/actor/behaviour/awaits.rb index 1eec231a5..ba11a95e0 100644 --- a/lib/concurrent/actor/behaviour/awaits.rb +++ b/lib/concurrent/actor/behaviour/awaits.rb @@ -2,11 +2,11 @@ module Concurrent module Actor module Behaviour - # Handles `:await` messages. Which allows to wait on Actor to process all previously send + # Accepts `:await` messages. Which allows to wait on Actor to process all previously send # messages. - # @example - # actor << :a << :b - # actor.ask(:await).wait # blocks until :a and :b are processed + # + # actor << :a << :b + # actor.ask(:await).wait # blocks until :a and :b are processed class Awaits < Abstract def on_envelope(envelope) if envelope.message == :await diff --git a/lib/concurrent/actor/behaviour/executes_context.rb b/lib/concurrent/actor/behaviour/executes_context.rb index 1b13672d2..a7aa9c4dc 100644 --- a/lib/concurrent/actor/behaviour/executes_context.rb +++ b/lib/concurrent/actor/behaviour/executes_context.rb @@ -1,7 +1,7 @@ module Concurrent module Actor module Behaviour - # Delegates messages and events to {AbstractContext} instance + # Delegates messages and events to {AbstractContext} instance. class ExecutesContext < Abstract def on_envelope(envelope) context.on_envelope envelope diff --git a/lib/concurrent/actor/behaviour/linking.rb b/lib/concurrent/actor/behaviour/linking.rb index 26d06d36d..7263f8f58 100644 --- a/lib/concurrent/actor/behaviour/linking.rb +++ b/lib/concurrent/actor/behaviour/linking.rb @@ -3,8 +3,39 @@ module Actor module Behaviour # Links the actor to other actors and sends actor's events to them, - # like: `:terminated`, `:paused`, errors, etc - # TODO example + # like: `:terminated`, `:paused`, errors, etc. + # + # listener = AdHoc.spawn name: :listener do + # lambda do |message| + # case message + # when Reference + # if message.ask!(:linked?) + # message << :unlink + # else + # message << :link + # end + # else + # puts "got event #{message.inspect} from #{envelope.sender}" + # end + # end + # end + # + # an_actor = AdHoc.spawn name: :an_actor, supervise: true, behaviour_definition: Behaviour.restarting_behaviour_definition do + # lambda { |message| raise 'failed'} + # end + # + # # link the actor + # listener.ask(an_actor).wait + # an_actor.ask(:fail).wait + # # unlink the actor + # listener.ask(an_actor).wait + # an_actor.ask(:fail).wait + # an_actor << :terminate! + # + # produces only two events, other events happened after unlinking + # + # got event # from # + # got event :reset from # class Linking < Abstract def initialize(core, subsequent) super core, subsequent diff --git a/lib/concurrent/actor/behaviour/pausing.rb b/lib/concurrent/actor/behaviour/pausing.rb index c81b141b5..cf75f5191 100644 --- a/lib/concurrent/actor/behaviour/pausing.rb +++ b/lib/concurrent/actor/behaviour/pausing.rb @@ -6,7 +6,7 @@ module Behaviour # When paused all arriving messages are collected and processed after the actor # is resumed or reset. Resume will simply continue with next message. # Reset also reinitialized context. - # TODO example + # @note TODO missing example class Pausing < Abstract def initialize(core, subsequent) super core, subsequent diff --git a/lib/concurrent/actor/behaviour/sets_results.rb b/lib/concurrent/actor/behaviour/sets_results.rb index c8f805878..9d9a8466f 100644 --- a/lib/concurrent/actor/behaviour/sets_results.rb +++ b/lib/concurrent/actor/behaviour/sets_results.rb @@ -14,6 +14,7 @@ def on_envelope(envelope) result = pass envelope if result != MESSAGE_PROCESSED && !envelope.ivar.nil? envelope.ivar.set result + log Logging::DEBUG, "finished processing of #{envelope.message.inspect}" end nil rescue => error diff --git a/lib/concurrent/actor/behaviour/supervised.rb b/lib/concurrent/actor/behaviour/supervised.rb index 6e9d698fc..0bd106f6d 100644 --- a/lib/concurrent/actor/behaviour/supervised.rb +++ b/lib/concurrent/actor/behaviour/supervised.rb @@ -5,6 +5,25 @@ module Behaviour # Sets and holds the supervisor of the actor if any. There is at most one supervisor # for each actor. Each supervisor is automatically linked. Messages: # `:pause!, :resume!, :reset!, :restart!` are accepted only from supervisor. + # + # actor = AdHoc.spawn(name: 'supervisor', behaviour_definition: Behaviour.restarting_behaviour_definition) do + # child = AdHoc.spawn(name: 'supervised', behaviour_definition: Behaviour.restarting_behaviour_definition) do + # p 'restarted' + # # message handle of supervised + # -> message { raise 'failed' } + # end + # # supervise the child + # child << :supervise + # + # # message handle of supervisor + # -> message do + # child << message if message != :reset + # end + # end + # + # actor << :bug + # # will be delegated to 'supervised', 'supervised' fails and is reset by its 'supervisor' + # class Supervised < Abstract attr_reader :supervisor diff --git a/lib/concurrent/actor/behaviour/supervising.rb b/lib/concurrent/actor/behaviour/supervising.rb index efeefad51..ef05fc22d 100644 --- a/lib/concurrent/actor/behaviour/supervising.rb +++ b/lib/concurrent/actor/behaviour/supervising.rb @@ -1,6 +1,10 @@ module Concurrent module Actor module Behaviour + + # Handles supervised actors. Handle configures what to do with failed child: :terminate!, :resume!, :reset!, + # or :restart!. Strategy sets :one_for_one (restarts just failed actor) or :one_for_all (restarts all child actors). + # @note TODO missing example class Supervising < Abstract def initialize(core, subsequent, handle, strategy) super core, subsequent diff --git a/lib/concurrent/actor/behaviour/terminates_children.rb b/lib/concurrent/actor/behaviour/terminates_children.rb index 4af948a00..21193740b 100644 --- a/lib/concurrent/actor/behaviour/terminates_children.rb +++ b/lib/concurrent/actor/behaviour/terminates_children.rb @@ -4,7 +4,8 @@ module Behaviour # Terminates all children when the actor terminates. class TerminatesChildren < Abstract def on_event(event) - children.map { |ch| ch.ask :terminate! }.each(&:wait) if event == :terminated + # TODO set event in Termination after all children are terminated, requires new non-blocking join on Future + children.map { |ch| ch << :terminate! } if event == :terminated super event end end diff --git a/lib/concurrent/actor/behaviour/termination.rb b/lib/concurrent/actor/behaviour/termination.rb index d9dd95e85..de8cecf01 100644 --- a/lib/concurrent/actor/behaviour/termination.rb +++ b/lib/concurrent/actor/behaviour/termination.rb @@ -4,6 +4,7 @@ module Behaviour # Handles actor termination. # @note Actor rejects envelopes when terminated. + # @note TODO missing example class Termination < Abstract # @!attribute [r] terminated @@ -12,7 +13,7 @@ class Termination < Abstract def initialize(core, subsequent) super core, subsequent - @terminated = Event.new + @terminated = Concurrent::Event.new end # @note Actor rejects envelopes when terminated. diff --git a/lib/concurrent/actor/context.rb b/lib/concurrent/actor/context.rb index 091e962cd..1519acdd4 100644 --- a/lib/concurrent/actor/context.rb +++ b/lib/concurrent/actor/context.rb @@ -1,9 +1,8 @@ module Concurrent module Actor - # Abstract implementation of Actor context. Children has to implement - # {AbstractContext#on_message} and {AbstractContext#behaviour_definition} methods. - # There are two implementations: + # New actor is defined by subclassing {RestartingContext}, {Context} and defining its abstract methods. + # {AbstractContext} can be subclassed directly to implement more specific behaviour see {Root} implementation. # # - {Context} # @@ -12,6 +11,14 @@ module Actor # - {RestartingContext}. # # > {include:Actor::RestartingContext} + # + # Example of ac actor definition: + # + # {include:file:doc/actor/define.out.rb} + # + # See methods of {AbstractContext} what else can be tweaked, e.g {AbstractContext#default_reference_class} + # + # @abstract implement {AbstractContext#on_message} and {AbstractContext#behaviour_definition} class AbstractContext include TypeCheck include InternalDelegations @@ -27,7 +34,7 @@ def on_message(message) raise NotImplementedError end - # override to add custom code invocation on events like `:terminated`, `:resumed`, `anError`. + # override to add custom code invocation on internal events like `:terminated`, `:resumed`, `anError`. def on_event(event) end @@ -70,6 +77,7 @@ def default_reference_class Reference end + # tell self a message def tell(message) reference.tell message end @@ -81,24 +89,33 @@ def ask(message) alias_method :<<, :tell alias_method :ask!, :ask - private - - def initialize_core(core) - @core = Type! core, Core - end - - # behaves as {Concurrent::Actor.spawn} but :class is auto-inserted based on receiver + # Behaves as {Concurrent::Actor.spawn} but :class is auto-inserted based on receiver so it can be omitted. + # @example by class and name + # AdHoc.spawn(:ping1) { -> message { message } } + # + # @example by option hash + # inc2 = AdHoc.spawn(name: 'increment by 2', + # args: [2], + # executor: Concurrent.configuration.global_task_pool) do |increment_by| + # lambda { |number| number + increment_by } + # end + # inc2.ask!(2) # => 4 + # @see Concurrent::Actor.spawn def self.spawn(name_or_opts, *args, &block) Actor.spawn spawn_optionify(name_or_opts, *args), &block end - # behaves as {Concurrent::Actor.spawn!} but :class is auto-inserted based on receiver + # behaves as {Concurrent::Actor.spawn!} but :class is auto-inserted based on receiver so it can be omitted. def self.spawn!(name_or_opts, *args, &block) Actor.spawn! spawn_optionify(name_or_opts, *args), &block end private + def initialize_core(core) + @core = Type! core, Core + end + def self.spawn_optionify(name_or_opts, *args) if name_or_opts.is_a? Hash if name_or_opts.key?(:class) && name_or_opts[:class] != self @@ -115,36 +132,20 @@ def self.spawn_optionify(name_or_opts, *args) undef_method :spawn end - # Basic Context of an Actor. It does not support supervision and pausing. - # It simply terminates on error. - # - # - linking - # - terminates on error + # Basic Context of an Actor. It supports only linking and it simply terminates on error. + # Uses {Behaviour.basic_behaviour_definition}: # - # TODO describe behaviour - # TODO usage - # @example ping - # class Ping < Context - # def on_message(message) - # message - # end - # end - # - # Ping.spawn(:ping1).ask(:m).value #=> :m + # @abstract implement {AbstractContext#on_message} class Context < AbstractContext def behaviour_definition Behaviour.basic_behaviour_definition end end - # Context of an Actor for complex robust systems. - # - # - linking - # - supervising - # - pauses on error + # Context of an Actor for robust systems. It supports supervision, linking, pauses on error. + # Uses {Behaviour.restarting_behaviour_definition} # - # TODO describe behaviour - # TODO usage + # @abstract implement {AbstractContext#on_message} class RestartingContext < AbstractContext def behaviour_definition Behaviour.restarting_behaviour_definition diff --git a/lib/concurrent/actor/core.rb b/lib/concurrent/actor/core.rb index 638f2970e..e54dbfaf4 100644 --- a/lib/concurrent/actor/core.rb +++ b/lib/concurrent/actor/core.rb @@ -3,7 +3,7 @@ module Actor require 'set' - # Core of the actor + # Core of the actor. # @note Whole class should be considered private. An user should use {Context}s and {Reference}s only. # @note devel: core should not block on anything, e.g. it cannot wait on children to terminate # that would eat up all threads in task pool and deadlock @@ -13,34 +13,38 @@ class Core include Synchronization # @!attribute [r] reference - # @return [Reference] reference to this actor which can be safely passed around + # Reference to this actor which can be safely passed around. + # @return [Reference] # @!attribute [r] name - # @return [String] the name of this instance, it should be uniq (not enforced right now) + # The name of actor instance, it should be uniq (not enforced). Allows easier orientation + # between actor instances. + # @return [String] # @!attribute [r] path - # @return [String] a path of this actor. It is used for easier orientation and logging. - # Path is constructed recursively with: `parent.path + self.name` up to a {Actor.root}, - # e.g. `/an_actor/its_child`. - # (It will also probably form a supervision path (failures will be reported up to parents) - # in future versions.) + # Path of this actor. It is used for easier orientation and logging. + # Path is constructed recursively with: `parent.path + self.name` up to a {Actor.root}, + # e.g. `/an_actor/its_child`. + # @return [String] # @!attribute [r] executor - # @return [Executor] which is used to process messages + # Executor which is used to process messages. + # @return [Executor] # @!attribute [r] actor_class - # @return [Context] a class including {Context} representing Actor's behaviour + # A subclass of {AbstractContext} representing Actor's behaviour. + # @return [Context] attr_reader :reference, :name, :path, :executor, :context_class, :context, :behaviour_definition # @option opts [String] name - # @option opts [Reference, nil] parent of an actor spawning this one - # @option opts [Class] reference a custom descendant of {Reference} to use # @option opts [Context] actor_class a class to be instantiated defining Actor's behaviour # @option opts [Array] args arguments for actor_class instantiation # @option opts [Executor] executor, default is `Concurrent.configuration.global_task_pool` # @option opts [true, false] link, atomically link the actor to its parent # @option opts [true, false] supervise, atomically supervise the actor by its parent + # @option opts [Class] reference a custom descendant of {Reference} to use # @option opts [Array)>] behaviour_definition, array of pairs # where each pair is behaviour class and its args, see {Behaviour.basic_behaviour_definition} # @option opts [IVar, nil] initialized, if present it'll be set or failed after {Context} initialization + # @option opts [Reference, nil] parent **private api** parent of the actor (the one spawning ) # @option opts [Proc, nil] logger a proc accepting (level, progname, message = nil, &block) params, - # can be used to hook actor instance to any logging system + # can be used to hook actor instance to any logging system, see {Concurrent::Logging} # @param [Proc] block for class instantiation def initialize(opts = {}, &block) synchronize do @@ -83,7 +87,8 @@ def initialize(opts = {}, &block) build_context messages.each do |message| - handle_envelope Envelope.new(message, nil, parent, reference) + log DEBUG, "preprocessing #{message} from #{parent}" + process_envelope Envelope.new(message, nil, parent, reference) end initialized.set reference if initialized @@ -96,7 +101,9 @@ def initialize(opts = {}, &block) end end - # @return [Reference, nil] of parent actor + # A parent Actor. When actor is spawned the {Actor.current} becomes its parent. + # When actor is spawned from a thread outside of an actor ({Actor.current} is nil) {Actor.root} is assigned. + # @return [Reference, nil] def parent @parent_core && @parent_core.reference end @@ -132,7 +139,10 @@ def remove_child(child) # can be called from other alternative Reference implementations # @param [Envelope] envelope def on_envelope(envelope) - schedule_execution { handle_envelope envelope } + schedule_execution do + log DEBUG, "received #{envelope.message.inspect} from #{envelope.sender}" + process_envelope envelope + end nil end @@ -194,13 +204,13 @@ def build_context @context.send :initialize, *@args, &@block end - private - - def handle_envelope(envelope) - log DEBUG, "received #{envelope.message.inspect} from #{envelope.sender}" + # @api private + def process_envelope(envelope) @first_behaviour.on_envelope envelope end + private + def initialize_behaviours(opts) @behaviour_definition = (Type! opts[:behaviour_definition] || @context.behaviour_definition, Array).each do |v| Type! v, Array diff --git a/lib/concurrent/actor/reference.rb b/lib/concurrent/actor/reference.rb index a43e992c9..daf8e93ca 100644 --- a/lib/concurrent/actor/reference.rb +++ b/lib/concurrent/actor/reference.rb @@ -1,9 +1,13 @@ module Concurrent module Actor - # Reference is public interface of Actor instances. It is used for sending messages and can - # be freely passed around the program. It also provides some basic information about the actor, + # {Reference} is public interface of Actor instances. It is used for sending messages and can + # be freely passed around the application. It also provides some basic information about the actor, # see {PublicDelegations}. + # + # AdHoc.spawn('printer') { -> message { puts message } } + # # => # + # # ^path ^context class class Reference include TypeCheck include PublicDelegations @@ -16,42 +20,61 @@ def initialize(core) @core = Type! core, Core end - # tells message to the actor, returns immediately + # Sends the message asynchronously to the actor and immediately returns + # `self` (the reference) allowing to chain message telling. # @param [Object] message # @return [Reference] self + # @example + # printer = AdHoc.spawn('printer') { -> message { puts message } } + # printer.tell('ping').tell('pong') + # printer << 'ping' << 'pong' + # # => 'ping'\n'pong'\n'ping'\n'pong'\n def tell(message) message message, nil end alias_method :<<, :tell - # @note it's a good practice to use tell whenever possible. Ask should be used only for - # testing and when it returns very shortly. It can lead to deadlock if all threads in - # global_task_pool will block on while asking. It's fine to use it form outside of actors and - # global_task_pool. + # Sends the message asynchronously to the actor and immediately returns {Concurrent::IVar} + # which will become completed when message is processed. # - # sends message to the actor and asks for the result of its processing, returns immediately + # @note it's a good practice to use {#tell} whenever possible. Results can be send back with other messages. + # Ask should be used only for testing and when it returns very shortly. It can lead to deadlock if all threads in + # global_task_pool will block on while asking. It's fine to use it form outside of actors and + # global_task_pool. # @param [Object] message # @param [Ivar] ivar to be fulfilled be message's processing result # @return [IVar] supplied ivar + # @example + # adder = AdHoc.spawn('adder') { -> message { message + 1 } } + # adder.ask(1).value # => 2 + # adder.ask(nil).wait.reason # => # def ask(message, ivar = IVar.new) message message, ivar end - # @note it's a good practice to use tell whenever possible. Ask should be used only for - # testing and when it returns very shortly. It can lead to deadlock if all threads in - # global_task_pool will block on while asking. It's fine to use it form outside of actors and - # global_task_pool. + # Sends the message synchronously and blocks until the message + # is processed. Raises on error. # - # sends message to the actor and asks for the result of its processing, blocks + # @note it's a good practice to use {#tell} whenever possible. Results can be send back with other messages. + # Ask should be used only for testing and when it returns very shortly. It can lead to deadlock if all threads in + # global_task_pool will block on while asking. It's fine to use it form outside of actors and + # global_task_pool. # @param [Object] message # @param [Ivar] ivar to be fulfilled be message's processing result # @return [Object] message's processing result # @raise [Exception] ivar.reason if ivar is #rejected? + # @example + # adder = AdHoc.spawn('adder') { -> message { message + 1 } } + # adder.ask!(1) # => 2 def ask!(message, ivar = IVar.new) ask(message, ivar).value! end + def map(messages) + messages.map { |m| self.ask(m) } + end + # behaves as {#tell} when no ivar and as {#ask} when ivar def message(message, ivar = nil) core.on_envelope Envelope.new(message, ivar, Actor.current || Thread.current, self) diff --git a/lib/concurrent/atomic/event.rb b/lib/concurrent/atomic/event.rb index b5393a73d..25f79a299 100644 --- a/lib/concurrent/atomic/event.rb +++ b/lib/concurrent/atomic/event.rb @@ -18,8 +18,8 @@ class Event # Creates a new `Event` in the unset state. Threads calling `#wait` on the # `Event` will block. def initialize - @set = false - @mutex = Mutex.new + @set = false + @mutex = Mutex.new @condition = Condition.new end From 810a591fe2053ec5554a433abf81a95858c72bf3 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Thu, 30 Oct 2014 15:28:43 +0100 Subject: [PATCH 03/16] Add IO example --- doc/actor/io.in.rb | 49 +++++++++++++++++++++++++++++++++++++ doc/actor/io.out.rb | 53 +++++++++++++++++++++++++++++++++++++++++ doc/actor/main.md | 4 ++++ lib/concurrent/actor.rb | 3 ++- 4 files changed, 108 insertions(+), 1 deletion(-) create mode 100644 doc/actor/io.in.rb create mode 100644 doc/actor/io.out.rb diff --git a/doc/actor/io.in.rb b/doc/actor/io.in.rb new file mode 100644 index 000000000..04e2bc516 --- /dev/null +++ b/doc/actor/io.in.rb @@ -0,0 +1,49 @@ +require 'concurrent' + +# logger = Logger.new(STDOUT) +# Concurrent.configuration.logger = logger.method(:add) + +# First option is to use operation pool + +class ActorDoingIO < Concurrent::Actor::RestartingContext + def on_message(message) + # do IO operation + end + + def default_executor + Concurrent.configuration.global_operation_pool + end +end # + +actor_doing_io = ActorDoingIO.spawn :actor_doing_io +actor_doing_io.executor == Concurrent.configuration.global_operation_pool + +# It can be also built into a pool so there is not too many IO operations + +class IOWorker < Concurrent::Actor::Utils::AbstractWorker + def work(io_job) + # do IO work + sleep 1 + puts "#{path} second:#{Time.now.to_i} message:#{io_job}" + end + + def default_executor + Concurrent.configuration.global_operation_pool + end +end # + +pool = Concurrent::Actor::Utils::Pool.spawn('pool', 2) do |balancer, index| + IOWorker.spawn(name: "worker-#{index}", supervise: true, args: [balancer]) +end + +pool << 1 << 2 << 3 << 4 << 5 << 6 + +# prints two lines each second +# /pool/worker-0 second:1414677666 message:1 +# /pool/worker-1 second:1414677666 message:2 +# /pool/worker-0 second:1414677667 message:3 +# /pool/worker-1 second:1414677667 message:4 +# /pool/worker-0 second:1414677668 message:5 +# /pool/worker-1 second:1414677668 message:6 + +sleep 4 diff --git a/doc/actor/io.out.rb b/doc/actor/io.out.rb new file mode 100644 index 000000000..0298a8cd3 --- /dev/null +++ b/doc/actor/io.out.rb @@ -0,0 +1,53 @@ +require 'concurrent' # => true + +# logger = Logger.new(STDOUT) +# Concurrent.configuration.logger = logger.method(:add) + +# First option is to use operation pool + +class ActorDoingIO < Concurrent::Actor::RestartingContext + def on_message(message) + # do IO operation + end + + def default_executor + Concurrent.configuration.global_operation_pool + end +end + +actor_doing_io = ActorDoingIO.spawn :actor_doing_io + # => # +actor_doing_io.executor == Concurrent.configuration.global_operation_pool + # => true + +# It can be also built into a pool so there is not too many IO operations + +class IOWorker < Concurrent::Actor::Utils::AbstractWorker + def work(io_job) + # do IO work + sleep 1 + puts "#{path} second:#{Time.now.to_i} message:#{io_job}" + end + + def default_executor + Concurrent.configuration.global_operation_pool + end +end + +pool = Concurrent::Actor::Utils::Pool.spawn('pool', 2) do |balancer, index| + IOWorker.spawn(name: "worker-#{index}", supervise: true, args: [balancer]) +end + # => # + +pool << 1 << 2 << 3 << 4 << 5 << 6 + # => # + +# prints two lines each second +# /pool/worker-0 second:1414677666 message:1 +# /pool/worker-1 second:1414677666 message:2 +# /pool/worker-0 second:1414677667 message:3 +# /pool/worker-1 second:1414677667 message:4 +# /pool/worker-0 second:1414677668 message:5 +# /pool/worker-1 second:1414677668 message:6 + +sleep 4 # => 4 diff --git a/doc/actor/main.md b/doc/actor/main.md index 794af1059..42b248fec 100644 --- a/doc/actor/main.md +++ b/doc/actor/main.md @@ -96,6 +96,10 @@ Blocking operations could starve the `default_task_pool`. However there are two (which is intended for blocking operations) sending results back to self in messages. - Create an actor using `global_operation_pool` instead of `global_task_pool`, e.g. `AnIOActor.spawn name: :blocking, executor: Concurrent.configuration.global_operation_pool`. + +### Example + +{include:file:doc/actor/io.out.rb} ## Dead letter routing diff --git a/lib/concurrent/actor.rb b/lib/concurrent/actor.rb index 30e3da66c..1892c38c7 100644 --- a/lib/concurrent/actor.rb +++ b/lib/concurrent/actor.rb @@ -7,8 +7,9 @@ module Concurrent # TODO https://github.com/celluloid/celluloid/wiki/Supervision-Groups ? # TODO Remote actors using DRb - # TODO IO interoperation # TODO un/become + # TODO supervision tree, pause children on error in parent, pause may need higher priority + # TODO more effective executor # {include:file:doc/actor/main.md} module Actor From 08835fe5c89b13ed41ca60d6ac71bc4c332040a4 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Thu, 30 Oct 2014 15:29:25 +0100 Subject: [PATCH 04/16] Add AbstractContext#default_executor to be able to override executor class wide --- lib/concurrent/actor/context.rb | 6 ++++++ lib/concurrent/actor/core.rb | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/lib/concurrent/actor/context.rb b/lib/concurrent/actor/context.rb index 1519acdd4..9dc29262e 100644 --- a/lib/concurrent/actor/context.rb +++ b/lib/concurrent/actor/context.rb @@ -77,6 +77,12 @@ def default_reference_class Reference end + # override to se different default executor, e.g. to change it to global_operation_pool + # @return [Executor] + def default_executor + Concurrent.configuration.global_task_pool + end + # tell self a message def tell(message) reference.tell message diff --git a/lib/concurrent/actor/core.rb b/lib/concurrent/actor/core.rb index e54dbfaf4..21e00d1d4 100644 --- a/lib/concurrent/actor/core.rb +++ b/lib/concurrent/actor/core.rb @@ -55,7 +55,7 @@ def initialize(opts = {}, &block) @context_class = Child! opts.fetch(:class), AbstractContext allocate_context - @executor = Type! opts.fetch(:executor, Concurrent.configuration.global_task_pool), Executor + @executor = Type! opts.fetch(:executor, @context.default_executor), Executor raise ArgumentError, 'ImmediateExecutor is not supported' if @executor.is_a? ImmediateExecutor @reference = (Child! opts[:reference_class] || @context.default_reference_class, Reference).new self From 3c18951242b30162be8e70d3127b70a6e139820c Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Sun, 2 Nov 2014 21:56:43 +0100 Subject: [PATCH 05/16] Print also object_id in Reference to_s --- doc/actor/define.out.rb | 2 +- doc/actor/io.out.rb | 6 +++--- doc/actor/messaging.out.rb | 4 ++-- doc/actor/quick.out.rb | 7 ++++--- lib/concurrent/actor/reference.rb | 2 +- 5 files changed, 11 insertions(+), 10 deletions(-) diff --git a/doc/actor/define.out.rb b/doc/actor/define.out.rb index 66713cb25..c9957ae60 100644 --- a/doc/actor/define.out.rb +++ b/doc/actor/define.out.rb @@ -35,5 +35,5 @@ def on_event(event) an_actor << :boo << Message.new(:add, 1) an_actor.ask!(Message.new(:value, nil)) # => 1 an_actor << :terminate! - # => # + # => # diff --git a/doc/actor/io.out.rb b/doc/actor/io.out.rb index 0298a8cd3..bd56896f2 100644 --- a/doc/actor/io.out.rb +++ b/doc/actor/io.out.rb @@ -16,7 +16,7 @@ def default_executor end actor_doing_io = ActorDoingIO.spawn :actor_doing_io - # => # + # => # actor_doing_io.executor == Concurrent.configuration.global_operation_pool # => true @@ -37,10 +37,10 @@ def default_executor pool = Concurrent::Actor::Utils::Pool.spawn('pool', 2) do |balancer, index| IOWorker.spawn(name: "worker-#{index}", supervise: true, args: [balancer]) end - # => # + # => # pool << 1 << 2 << 3 << 4 << 5 << 6 - # => # + # => # # prints two lines each second # /pool/worker-0 second:1414677666 message:1 diff --git a/doc/actor/messaging.out.rb b/doc/actor/messaging.out.rb index 93c864ea2..9dc8fc8f0 100644 --- a/doc/actor/messaging.out.rb +++ b/doc/actor/messaging.out.rb @@ -25,8 +25,8 @@ def on_message(message) end calculator = Calculator.spawn('calculator') - # => # + # => # calculator.ask! Add[1, 2] # => 3 calculator.ask! Subtract[1, 0.5] # => 0.5 calculator << :terminate! - # => # + # => # diff --git a/doc/actor/quick.out.rb b/doc/actor/quick.out.rb index cae9e03ae..402da7959 100644 --- a/doc/actor/quick.out.rb +++ b/doc/actor/quick.out.rb @@ -16,12 +16,13 @@ def on_message(message) # `supervise: true` makes the actor supervised by root actor adder = Adder.spawn(name: :adder, supervise: true, args: [1]) - # => # + # => # adder.parent - # => # + # => # # tell and forget -adder.tell(:add) << :add # => # +adder.tell(:add) << :add + # => # # ask to get result adder.ask!(:add) # => 4 # fail the actor diff --git a/lib/concurrent/actor/reference.rb b/lib/concurrent/actor/reference.rb index daf8e93ca..0e0d128a2 100644 --- a/lib/concurrent/actor/reference.rb +++ b/lib/concurrent/actor/reference.rb @@ -87,7 +87,7 @@ def dead_letter_routing end def to_s - "#<#{self.class} #{path} (#{actor_class})>" + "#<#{self.class}:0x#{'%x' % (object_id << 1)} #{path} (#{actor_class})>" end alias_method :inspect, :to_s From 469eaa34451f82884ff97ad01d226784b06a5e2f Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Sun, 2 Nov 2014 22:01:04 +0100 Subject: [PATCH 06/16] Improve actor events Events can be private and public, so far only difference is that Linking will pass to linked actors only public messages. Adding private :restarting and :resetting events which are send before the actor restarts or resets allowing to add callbacks to cleanup current child actors. --- lib/concurrent/actor/behaviour/abstract.rb | 8 +- lib/concurrent/actor/behaviour/buffer.rb | 4 +- .../actor/behaviour/executes_context.rb | 5 +- lib/concurrent/actor/behaviour/linking.rb | 9 ++- lib/concurrent/actor/behaviour/pausing.rb | 80 ++++++++++++++----- lib/concurrent/actor/behaviour/supervised.rb | 4 +- .../actor/behaviour/terminates_children.rb | 4 +- lib/concurrent/actor/behaviour/termination.rb | 4 +- lib/concurrent/actor/core.rb | 7 +- 9 files changed, 81 insertions(+), 44 deletions(-) diff --git a/lib/concurrent/actor/behaviour/abstract.rb b/lib/concurrent/actor/behaviour/abstract.rb index 4e4f03780..228329e0b 100644 --- a/lib/concurrent/actor/behaviour/abstract.rb +++ b/lib/concurrent/actor/behaviour/abstract.rb @@ -25,15 +25,15 @@ def pass(envelope) # override to add extra behaviour # @note super needs to be called not to break the chain - def on_event(event) - subsequent.on_event event if subsequent + def on_event(public, event) + subsequent.on_event public, event if subsequent end # broadcasts event to all behaviours and context # @see #on_event # @see AbstractContext#on_event - def broadcast(event) - core.broadcast(event) + def broadcast(public, event) + core.broadcast(public, event) end def reject_envelope(envelope) diff --git a/lib/concurrent/actor/behaviour/buffer.rb b/lib/concurrent/actor/behaviour/buffer.rb index e99a76c5f..0e8d02184 100644 --- a/lib/concurrent/actor/behaviour/buffer.rb +++ b/lib/concurrent/actor/behaviour/buffer.rb @@ -40,13 +40,13 @@ def process_envelope core.schedule_execution { process_envelopes? } end - def on_event(event) + def on_event(public, event) case event when :terminated, :restarted @buffer.each { |envelope| reject_envelope envelope } @buffer.clear end - super event + super public, event end end end diff --git a/lib/concurrent/actor/behaviour/executes_context.rb b/lib/concurrent/actor/behaviour/executes_context.rb index a7aa9c4dc..dc3cc1b31 100644 --- a/lib/concurrent/actor/behaviour/executes_context.rb +++ b/lib/concurrent/actor/behaviour/executes_context.rb @@ -7,10 +7,9 @@ def on_envelope(envelope) context.on_envelope envelope end - def on_event(event) + def on_event(public, event) context.on_event(event) - core.log Logging::DEBUG, "event: #{event.inspect}" - super event + super public, event end end end diff --git a/lib/concurrent/actor/behaviour/linking.rb b/lib/concurrent/actor/behaviour/linking.rb index 7263f8f58..244688c45 100644 --- a/lib/concurrent/actor/behaviour/linking.rb +++ b/lib/concurrent/actor/behaviour/linking.rb @@ -3,7 +3,8 @@ module Actor module Behaviour # Links the actor to other actors and sends actor's events to them, - # like: `:terminated`, `:paused`, errors, etc. + # like: `:terminated`, `:paused`, `:resumed`, errors, etc. + # Linked actor needs to handle those messages. # # listener = AdHoc.spawn name: :listener do # lambda do |message| @@ -65,10 +66,10 @@ def unlink(ref) true end - def on_event(event) - @linked.each { |a| a << event } + def on_event(public, event) + @linked.each { |a| a << event } if public @linked.clear if event == :terminated - super event + super public, event end end end diff --git a/lib/concurrent/actor/behaviour/pausing.rb b/lib/concurrent/actor/behaviour/pausing.rb index cf75f5191..452740054 100644 --- a/lib/concurrent/actor/behaviour/pausing.rb +++ b/lib/concurrent/actor/behaviour/pausing.rb @@ -35,41 +35,77 @@ def on_envelope(envelope) end def pause!(error = nil) - @paused = true - broadcast(error || :paused) + do_pause + broadcast true, error || :paused true end - def resume!(broadcast = true) - @paused = false - broadcast(:resumed) if broadcast + def resume! + do_resume + broadcast(true, :resumed) true end - def reset!(broadcast = true) - core.allocate_context - core.build_context - resume!(false) - broadcast(:reset) if broadcast + def reset! + broadcast(false, :resetting) + do_reset + broadcast(true, :reset) true end def restart! - reset! false - broadcast(:restarted) + broadcast(false, :restarting) + do_restart + broadcast(true, :restarted) true end - def on_event(event) - case event - when :terminated, :restarted - @buffer.each { |envelope| reject_envelope envelope } - @buffer.clear - when :resumed, :reset - @buffer.each { |envelope| core.schedule_execution { core.process_envelope envelope } } - @buffer.clear - end - super event + def on_event(public, event) + reject_buffer if event == :terminated + super public, event + end + + private + + def do_pause + @paused = true + nil + end + + def do_resume + @paused = false + reschedule_buffer + nil + end + + def do_reset + rebuild_context + do_resume + reschedule_buffer + nil + end + + def do_restart + rebuild_context + reject_buffer + do_resume + nil + end + + def rebuild_context + core.allocate_context + core.build_context + nil + end + + def reschedule_buffer + @buffer.each { |envelope| core.schedule_execution { core.process_envelope envelope } } + @buffer.clear + end + + def reject_buffer + @buffer.each { |envelope| reject_envelope envelope } + @buffer.clear end end end diff --git a/lib/concurrent/actor/behaviour/supervised.rb b/lib/concurrent/actor/behaviour/supervised.rb index 0bd106f6d..790352ebe 100644 --- a/lib/concurrent/actor/behaviour/supervised.rb +++ b/lib/concurrent/actor/behaviour/supervised.rb @@ -68,9 +68,9 @@ def un_supervise(ref) end end - def on_event(event) + def on_event(public, event) @supervisor = nil if event == :terminated - super event + super public, event end end end diff --git a/lib/concurrent/actor/behaviour/terminates_children.rb b/lib/concurrent/actor/behaviour/terminates_children.rb index 21193740b..e831e010a 100644 --- a/lib/concurrent/actor/behaviour/terminates_children.rb +++ b/lib/concurrent/actor/behaviour/terminates_children.rb @@ -3,10 +3,10 @@ module Actor module Behaviour # Terminates all children when the actor terminates. class TerminatesChildren < Abstract - def on_event(event) + def on_event(public, event) # TODO set event in Termination after all children are terminated, requires new non-blocking join on Future children.map { |ch| ch << :terminate! } if event == :terminated - super event + super public, event end end end diff --git a/lib/concurrent/actor/behaviour/termination.rb b/lib/concurrent/actor/behaviour/termination.rb index de8cecf01..5d97a1b98 100644 --- a/lib/concurrent/actor/behaviour/termination.rb +++ b/lib/concurrent/actor/behaviour/termination.rb @@ -28,7 +28,7 @@ def on_envelope(envelope) terminated? when :terminate! terminate! - when :terminated_event + when :terminated_event # TODO rename to :termination_event terminated else if terminated? @@ -45,7 +45,7 @@ def on_envelope(envelope) def terminate! return true if terminated? terminated.set - broadcast(:terminated) # TODO do not end up in Dead Letter Router + broadcast(true, :terminated) # TODO do not end up in Dead Letter Router parent << :remove_child if parent true end diff --git a/lib/concurrent/actor/core.rb b/lib/concurrent/actor/core.rb index 21e00d1d4..9c4925228 100644 --- a/lib/concurrent/actor/core.rb +++ b/lib/concurrent/actor/core.rb @@ -87,7 +87,7 @@ def initialize(opts = {}, &block) build_context messages.each do |message| - log DEBUG, "preprocessing #{message} from #{parent}" + log DEBUG, "preprocessing #{message.inspect} from #{parent}" process_envelope Envelope.new(message, nil, parent, reference) end @@ -176,8 +176,9 @@ def schedule_execution nil end - def broadcast(event) - @first_behaviour.on_event(event) + def broadcast(public, event) + log Logging::DEBUG, "event: #{event.inspect} (#{public ? 'public' : 'private'})" + @first_behaviour.on_event(public, event) end # @param [Class] behaviour_class From 741d3ac59c11669adc8b4dae4a779d499fd508a6 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Sun, 2 Nov 2014 22:03:52 +0100 Subject: [PATCH 07/16] Adding example of supervision-tree For now built manually. --- doc/actor/supervision_tree.in.rb | 70 +++++++++++++++++++++++++++++ doc/actor/supervision_tree.out.rb | 74 +++++++++++++++++++++++++++++++ 2 files changed, 144 insertions(+) create mode 100644 doc/actor/supervision_tree.in.rb create mode 100644 doc/actor/supervision_tree.out.rb diff --git a/doc/actor/supervision_tree.in.rb b/doc/actor/supervision_tree.in.rb new file mode 100644 index 000000000..0fc2ca8c5 --- /dev/null +++ b/doc/actor/supervision_tree.in.rb @@ -0,0 +1,70 @@ +require 'concurrent' + +logger = Logger.new($stderr) # +Concurrent.configuration.logger = lambda do |level, progname, message = nil, &block| + logger.add level, message, progname, &block +end # + + +class Master < Concurrent::Actor::RestartingContext + def initialize + # for listener to be child of master + @listener = Listener.spawn(name: 'listener1', supervise: true, args: [self]) + end + + def on_message(msg) + case msg + when :listener + @listener + when :reset, :terminated, :resumed, :paused + log Logger::DEBUG, " got #{msg} from #{envelope.sender}" + else + pass + end + end + + # TODO turn this into Behaviour and make it default part of RestartingContext + def on_event(event) + case event + when :resetting, :restarting + @listener << :terminate! + when Exception, :paused + @listener << :pause! + when :resumed + @listener << :resume! + end + end +end # + +class Listener < Concurrent::Actor::RestartingContext + def initialize(master) + @number = (rand() * 100).to_i + end + + def on_message(msg) + case msg + when :number + @number + else + pass + end + end + +end # + +master = Master.spawn(name: 'master', supervise: true) +listener = master.ask!(:listener) +listener.ask!(:number) + +master << :crash + +sleep 0.1 + +# ask for listener again, old one is terminated +listener.ask!(:terminated?) +listener = master.ask!(:listener) +listener.ask!(:number) + +master.ask!(:terminate!) + +sleep 0.1 diff --git a/doc/actor/supervision_tree.out.rb b/doc/actor/supervision_tree.out.rb new file mode 100644 index 000000000..1abe1a533 --- /dev/null +++ b/doc/actor/supervision_tree.out.rb @@ -0,0 +1,74 @@ +require 'concurrent' # => true + +logger = Logger.new($stderr) +Concurrent.configuration.logger = lambda do |level, progname, message = nil, &block| + logger.add level, message, progname, &block +end + + +class Master < Concurrent::Actor::RestartingContext + def initialize + # for listener to be child of master + @listener = Listener.spawn(name: 'listener1', supervise: true, args: [self]) + end + + def on_message(msg) + case msg + when :listener + @listener + when :reset, :terminated, :resumed, :paused + log Logger::DEBUG, " got #{msg} from #{envelope.sender}" + else + pass + end + end + + # TODO turn this into Behaviour and make it default part of RestartingContext + def on_event(event) + case event + when :resetting, :restarting + @listener << :terminate! + when Exception, :paused + @listener << :pause! + when :resumed + @listener << :resume! + end + end +end + +class Listener < Concurrent::Actor::RestartingContext + def initialize(master) + @number = (rand() * 100).to_i + end + + def on_message(msg) + case msg + when :number + @number + else + pass + end + end + +end + +master = Master.spawn(name: 'master', supervise: true) + # => # +listener = master.ask!(:listener) + # => # +listener.ask!(:number) # => 92 + +master << :crash + # => # + +sleep 0.1 # => 0 + +# ask for listener again, old one is terminated +listener.ask!(:terminated?) # => true +listener = master.ask!(:listener) + # => # +listener.ask!(:number) # => 99 + +master.ask!(:terminate!) # => true + +sleep 0.1 # => 0 From 595f7cd5964ddeefb2482df2a3042df91b6209b7 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Thu, 1 Jan 2015 16:35:20 +0100 Subject: [PATCH 08/16] Actor::Reference doc update --- lib/concurrent/actor/reference.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/concurrent/actor/reference.rb b/lib/concurrent/actor/reference.rb index 0e0d128a2..eec261fd7 100644 --- a/lib/concurrent/actor/reference.rb +++ b/lib/concurrent/actor/reference.rb @@ -6,8 +6,8 @@ module Actor # see {PublicDelegations}. # # AdHoc.spawn('printer') { -> message { puts message } } - # # => # - # # ^path ^context class + # # => # + # # ^object_id ^path ^context class class Reference include TypeCheck include PublicDelegations From 1b96989bb991bb0faa1ee0ded56d6800112adab2 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Thu, 1 Jan 2015 16:41:15 +0100 Subject: [PATCH 09/16] Supervised and linked by default Pass spawn options to behaviours --- lib/concurrent/actor/behaviour/abstract.rb | 2 +- lib/concurrent/actor/behaviour/buffer.rb | 4 ++-- lib/concurrent/actor/behaviour/linking.rb | 7 +++++-- lib/concurrent/actor/behaviour/pausing.rb | 4 ++-- lib/concurrent/actor/behaviour/sets_results.rb | 4 ++-- lib/concurrent/actor/behaviour/supervised.rb | 11 ++++++++--- lib/concurrent/actor/behaviour/supervising.rb | 5 +++-- lib/concurrent/actor/behaviour/termination.rb | 4 ++-- lib/concurrent/actor/core.rb | 16 +++------------- spec/spec_helper.rb | 13 +++++++++++-- 10 files changed, 39 insertions(+), 31 deletions(-) diff --git a/lib/concurrent/actor/behaviour/abstract.rb b/lib/concurrent/actor/behaviour/abstract.rb index 228329e0b..fca97ad0f 100644 --- a/lib/concurrent/actor/behaviour/abstract.rb +++ b/lib/concurrent/actor/behaviour/abstract.rb @@ -7,7 +7,7 @@ class Abstract attr_reader :core, :subsequent - def initialize(core, subsequent) + def initialize(core, subsequent, core_options) @core = Type! core, Core @subsequent = Type! subsequent, Abstract, NilClass end diff --git a/lib/concurrent/actor/behaviour/buffer.rb b/lib/concurrent/actor/behaviour/buffer.rb index 0e8d02184..3c3b8ebc8 100644 --- a/lib/concurrent/actor/behaviour/buffer.rb +++ b/lib/concurrent/actor/behaviour/buffer.rb @@ -8,8 +8,8 @@ module Behaviour # and they can be processed before messages arriving into buffer. This allows to # process internal actor messages like (`:link`, `:supervise`) processed first. class Buffer < Abstract - def initialize(core, subsequent) - super core, subsequent + def initialize(core, subsequent, core_options) + super core, subsequent, core_options @buffer = [] @receive_envelope_scheduled = false end diff --git a/lib/concurrent/actor/behaviour/linking.rb b/lib/concurrent/actor/behaviour/linking.rb index 244688c45..d61840779 100644 --- a/lib/concurrent/actor/behaviour/linking.rb +++ b/lib/concurrent/actor/behaviour/linking.rb @@ -38,9 +38,12 @@ module Behaviour # got event # from # # got event :reset from # class Linking < Abstract - def initialize(core, subsequent) - super core, subsequent + def initialize(core, subsequent, core_options) + super core, subsequent, core_options @linked = Set.new + if core_options[:link] != false || core_options[:supervise] != false + @linked.add Actor.current + end end def on_envelope(envelope) diff --git a/lib/concurrent/actor/behaviour/pausing.rb b/lib/concurrent/actor/behaviour/pausing.rb index 452740054..2929a30c7 100644 --- a/lib/concurrent/actor/behaviour/pausing.rb +++ b/lib/concurrent/actor/behaviour/pausing.rb @@ -8,8 +8,8 @@ module Behaviour # Reset also reinitialized context. # @note TODO missing example class Pausing < Abstract - def initialize(core, subsequent) - super core, subsequent + def initialize(core, subsequent, core_options) + super core, subsequent, core_options @paused = false @buffer = [] end diff --git a/lib/concurrent/actor/behaviour/sets_results.rb b/lib/concurrent/actor/behaviour/sets_results.rb index 9d9a8466f..4f3de314c 100644 --- a/lib/concurrent/actor/behaviour/sets_results.rb +++ b/lib/concurrent/actor/behaviour/sets_results.rb @@ -5,8 +5,8 @@ module Behaviour class SetResults < Abstract attr_reader :error_strategy - def initialize(core, subsequent, error_strategy) - super core, subsequent + def initialize(core, subsequent, core_options, error_strategy) + super core, subsequent, core_options @error_strategy = Match! error_strategy, :just_log, :terminate!, :pause! end diff --git a/lib/concurrent/actor/behaviour/supervised.rb b/lib/concurrent/actor/behaviour/supervised.rb index 790352ebe..de523fe84 100644 --- a/lib/concurrent/actor/behaviour/supervised.rb +++ b/lib/concurrent/actor/behaviour/supervised.rb @@ -27,9 +27,14 @@ module Behaviour class Supervised < Abstract attr_reader :supervisor - def initialize(core, subsequent) - super core, subsequent - @supervisor = nil + def initialize(core, subsequent, core_options) + super core, subsequent, core_options + + @supervisor = if core_options[:supervise] != false + Actor.current + else + nil + end end def on_envelope(envelope) diff --git a/lib/concurrent/actor/behaviour/supervising.rb b/lib/concurrent/actor/behaviour/supervising.rb index ef05fc22d..0877eadc0 100644 --- a/lib/concurrent/actor/behaviour/supervising.rb +++ b/lib/concurrent/actor/behaviour/supervising.rb @@ -5,9 +5,10 @@ module Behaviour # Handles supervised actors. Handle configures what to do with failed child: :terminate!, :resume!, :reset!, # or :restart!. Strategy sets :one_for_one (restarts just failed actor) or :one_for_all (restarts all child actors). # @note TODO missing example + # @note this will change in next version to support supervision trees better class Supervising < Abstract - def initialize(core, subsequent, handle, strategy) - super core, subsequent + def initialize(core, subsequent, core_options, handle, strategy) + super core, subsequent, core_options @handle = Match! handle, :terminate!, :resume!, :reset!, :restart! @strategy = case @handle when :terminate! diff --git a/lib/concurrent/actor/behaviour/termination.rb b/lib/concurrent/actor/behaviour/termination.rb index 5d97a1b98..2497fe26f 100644 --- a/lib/concurrent/actor/behaviour/termination.rb +++ b/lib/concurrent/actor/behaviour/termination.rb @@ -11,8 +11,8 @@ class Termination < Abstract # @return [Event] event which will become set when actor is terminated. attr_reader :terminated - def initialize(core, subsequent) - super core, subsequent + def initialize(core, subsequent, core_options) + super core, subsequent, core_options @terminated = Concurrent::Event.new end diff --git a/lib/concurrent/actor/core.rb b/lib/concurrent/actor/core.rb index 9c4925228..d4866121b 100644 --- a/lib/concurrent/actor/core.rb +++ b/lib/concurrent/actor/core.rb @@ -36,8 +36,8 @@ class Core # @option opts [Context] actor_class a class to be instantiated defining Actor's behaviour # @option opts [Array] args arguments for actor_class instantiation # @option opts [Executor] executor, default is `Concurrent.configuration.global_task_pool` - # @option opts [true, false] link, atomically link the actor to its parent - # @option opts [true, false] supervise, atomically supervise the actor by its parent + # @option opts [true, false] link, atomically link the actor to its parent (default: true) + # @option opts [true, false] supervise, atomically supervise the actor by its parent (default: true) # @option opts [Class] reference a custom descendant of {Reference} to use # @option opts [Array)>] behaviour_definition, array of pairs # where each pair is behaviour class and its args, see {Behaviour.basic_behaviour_definition} @@ -78,19 +78,9 @@ def initialize(opts = {}, &block) @block = block initialized = Type! opts[:initialized], IVar, NilClass - messages = [] - messages << :link if opts[:link] - messages << :supervise if opts[:supervise] - schedule_execution do begin build_context - - messages.each do |message| - log DEBUG, "preprocessing #{message.inspect} from #{parent}" - process_envelope Envelope.new(message, nil, parent, reference) - end - initialized.set reference if initialized rescue => ex log ERROR, ex @@ -221,7 +211,7 @@ def initialize_behaviours(opts) end @behaviours = {} @first_behaviour = @behaviour_definition.reverse. - reduce(nil) { |last, (behaviour, args)| @behaviours[behaviour] = behaviour.new(self, last, *args) } + reduce(nil) { |last, (behaviour, args)| @behaviours[behaviour] = behaviour.new(self, last, opts, *args) } end end end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index faaae3992..340f76067 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -17,8 +17,17 @@ require 'concurrent' -logger = Logger.new($stderr) -logger.level = Logger::WARN +logger = Logger.new($stderr) +logger.level = Logger::WARN + +logger.formatter = lambda do |severity, datetime, progname, msg| + format "[%s] %5s -- %s: %s\n", + datetime.strftime('%Y-%m-%d %H:%M:%S.%L'), + severity, + progname, + msg +end + Concurrent.configuration.logger = lambda do |level, progname, message = nil, &block| logger.add level, message, progname, &block end From 6ed9e8196036f07df9f591a94344b37c05f7e4d0 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Thu, 1 Jan 2015 18:07:50 +0100 Subject: [PATCH 10/16] Remove supervised behavior Linking is enough, Supervised was just making things complicated. --- doc/actor/define.in.rb | 2 +- doc/actor/define.out.rb | 2 +- doc/actor/init.rb | 1 - doc/actor/io.in.rb | 2 +- doc/actor/io.out.rb | 2 +- doc/actor/messaging.out.rb | 4 +- doc/actor/quick.in.rb | 7 +- doc/actor/quick.out.rb | 11 +-- lib/concurrent/actor/behaviour.rb | 4 +- lib/concurrent/actor/behaviour/linking.rb | 8 +- lib/concurrent/actor/behaviour/supervised.rb | 83 -------------------- lib/concurrent/actor/core.rb | 1 - spec/concurrent/actor_spec.rb | 26 +++--- 13 files changed, 37 insertions(+), 116 deletions(-) delete mode 100644 lib/concurrent/actor/behaviour/supervised.rb diff --git a/doc/actor/define.in.rb b/doc/actor/define.in.rb index 98ee0611d..0a136a4e4 100644 --- a/doc/actor/define.in.rb +++ b/doc/actor/define.in.rb @@ -29,7 +29,7 @@ def on_event(event) end end # -an_actor = AnActor.spawn name: 'an_actor', args: 10, supervise: true # +an_actor = AnActor.spawn name: 'an_actor', args: 10 # an_actor << Message.new(:add, 1) << Message.new(:subtract, 2) # an_actor.ask!(Message.new(:value, nil)) an_actor << :boo << Message.new(:add, 1) # diff --git a/doc/actor/define.out.rb b/doc/actor/define.out.rb index c9957ae60..d6c3c0775 100644 --- a/doc/actor/define.out.rb +++ b/doc/actor/define.out.rb @@ -29,7 +29,7 @@ def on_event(event) end end -an_actor = AnActor.spawn name: 'an_actor', args: 10, supervise: true +an_actor = AnActor.spawn name: 'an_actor', args: 10 an_actor << Message.new(:add, 1) << Message.new(:subtract, 2) an_actor.ask!(Message.new(:value, nil)) # => 9 an_actor << :boo << Message.new(:add, 1) diff --git a/doc/actor/init.rb b/doc/actor/init.rb index 57698f53e..6da69ee27 100644 --- a/doc/actor/init.rb +++ b/doc/actor/init.rb @@ -1,2 +1 @@ require 'concurrent/actor' -Concurrent::Actor.i_know_it_is_experimental! diff --git a/doc/actor/io.in.rb b/doc/actor/io.in.rb index 04e2bc516..f3830d19f 100644 --- a/doc/actor/io.in.rb +++ b/doc/actor/io.in.rb @@ -33,7 +33,7 @@ def default_executor end # pool = Concurrent::Actor::Utils::Pool.spawn('pool', 2) do |balancer, index| - IOWorker.spawn(name: "worker-#{index}", supervise: true, args: [balancer]) + IOWorker.spawn(name: "worker-#{index}", args: [balancer]) end pool << 1 << 2 << 3 << 4 << 5 << 6 diff --git a/doc/actor/io.out.rb b/doc/actor/io.out.rb index bd56896f2..0b84da48f 100644 --- a/doc/actor/io.out.rb +++ b/doc/actor/io.out.rb @@ -35,7 +35,7 @@ def default_executor end pool = Concurrent::Actor::Utils::Pool.spawn('pool', 2) do |balancer, index| - IOWorker.spawn(name: "worker-#{index}", supervise: true, args: [balancer]) + IOWorker.spawn(name: "worker-#{index}", args: [balancer]) end # => # diff --git a/doc/actor/messaging.out.rb b/doc/actor/messaging.out.rb index 9dc8fc8f0..9c4089002 100644 --- a/doc/actor/messaging.out.rb +++ b/doc/actor/messaging.out.rb @@ -25,8 +25,8 @@ def on_message(message) end calculator = Calculator.spawn('calculator') - # => # + # => # calculator.ask! Add[1, 2] # => 3 calculator.ask! Subtract[1, 0.5] # => 0.5 calculator << :terminate! - # => # + # => # diff --git a/doc/actor/quick.in.rb b/doc/actor/quick.in.rb index 694888df3..85af9cdeb 100644 --- a/doc/actor/quick.in.rb +++ b/doc/actor/quick.in.rb @@ -14,12 +14,13 @@ def on_message(message) end end # -# `supervise: true` makes the actor supervised by root actor -adder = Adder.spawn(name: :adder, supervise: true, args: [1]) +# `link: true` makes the actor linked to root actor and supervised +# which is default behavior +adder = Adder.spawn(name: :adder, link: true, args: [1]) adder.parent # tell and forget -adder.tell(:add) << :add +adder.tell(:add).tell(:add) # ask to get result adder.ask!(:add) # fail the actor diff --git a/doc/actor/quick.out.rb b/doc/actor/quick.out.rb index 402da7959..0586b91d5 100644 --- a/doc/actor/quick.out.rb +++ b/doc/actor/quick.out.rb @@ -14,15 +14,16 @@ def on_message(message) end end -# `supervise: true` makes the actor supervised by root actor -adder = Adder.spawn(name: :adder, supervise: true, args: [1]) - # => # +# `link: true` makes the actor linked to root actor and supervised +# which is default behavior +adder = Adder.spawn(name: :adder, link: true, args: [1]) + # => # adder.parent # => # # tell and forget -adder.tell(:add) << :add - # => # +adder.tell(:add).tell(:add) + # => # # ask to get result adder.ask!(:add) # => 4 # fail the actor diff --git a/lib/concurrent/actor/behaviour.rb b/lib/concurrent/actor/behaviour.rb index 33ab5b073..469b52cc7 100644 --- a/lib/concurrent/actor/behaviour.rb +++ b/lib/concurrent/actor/behaviour.rb @@ -64,7 +64,6 @@ module Behaviour require 'concurrent/actor/behaviour/pausing' require 'concurrent/actor/behaviour/removes_child' require 'concurrent/actor/behaviour/sets_results' - require 'concurrent/actor/behaviour/supervised' require 'concurrent/actor/behaviour/supervising' require 'concurrent/actor/behaviour/termination' require 'concurrent/actor/behaviour/terminates_children' @@ -126,8 +125,7 @@ def self.linking # @see '' its source code def self.supervised - [[Supervised, []], - [Pausing, []]] + [[Pausing, []]] end # @see '' its source code diff --git a/lib/concurrent/actor/behaviour/linking.rb b/lib/concurrent/actor/behaviour/linking.rb index d61840779..8a8dae233 100644 --- a/lib/concurrent/actor/behaviour/linking.rb +++ b/lib/concurrent/actor/behaviour/linking.rb @@ -1,6 +1,8 @@ module Concurrent module Actor module Behaviour + # TODO track what is linked, clean when :terminated + # send :linked/:unlinked messages back to build the array of linked actors # Links the actor to other actors and sends actor's events to them, # like: `:terminated`, `:paused`, `:resumed`, errors, etc. @@ -41,11 +43,11 @@ class Linking < Abstract def initialize(core, subsequent, core_options) super core, subsequent, core_options @linked = Set.new - if core_options[:link] != false || core_options[:supervise] != false - @linked.add Actor.current - end + @linked.add Actor.current if core_options[:link] != false end + # TODO also handle :linked_actors returning array + def on_envelope(envelope) case envelope.message when :link diff --git a/lib/concurrent/actor/behaviour/supervised.rb b/lib/concurrent/actor/behaviour/supervised.rb deleted file mode 100644 index de523fe84..000000000 --- a/lib/concurrent/actor/behaviour/supervised.rb +++ /dev/null @@ -1,83 +0,0 @@ -module Concurrent - module Actor - module Behaviour - - # Sets and holds the supervisor of the actor if any. There is at most one supervisor - # for each actor. Each supervisor is automatically linked. Messages: - # `:pause!, :resume!, :reset!, :restart!` are accepted only from supervisor. - # - # actor = AdHoc.spawn(name: 'supervisor', behaviour_definition: Behaviour.restarting_behaviour_definition) do - # child = AdHoc.spawn(name: 'supervised', behaviour_definition: Behaviour.restarting_behaviour_definition) do - # p 'restarted' - # # message handle of supervised - # -> message { raise 'failed' } - # end - # # supervise the child - # child << :supervise - # - # # message handle of supervisor - # -> message do - # child << message if message != :reset - # end - # end - # - # actor << :bug - # # will be delegated to 'supervised', 'supervised' fails and is reset by its 'supervisor' - # - class Supervised < Abstract - attr_reader :supervisor - - def initialize(core, subsequent, core_options) - super core, subsequent, core_options - - @supervisor = if core_options[:supervise] != false - Actor.current - else - nil - end - end - - def on_envelope(envelope) - case envelope.message - when :supervise - supervise envelope.sender - when :supervisor - supervisor - when :un_supervise - un_supervise envelope.sender - when :pause!, :resume!, :reset!, :restart! - # allow only supervisor to control the actor - if @supervisor == envelope.sender - pass envelope - else - false - end - else - pass envelope - end - end - - def supervise(ref) - @supervisor = ref - behaviour!(Linking).link ref - true - end - - def un_supervise(ref) - if @supervisor == ref - behaviour!(Linking).unlink ref - @supervisor = nil - true - else - false - end - end - - def on_event(public, event) - @supervisor = nil if event == :terminated - super public, event - end - end - end - end -end diff --git a/lib/concurrent/actor/core.rb b/lib/concurrent/actor/core.rb index d4866121b..001076b85 100644 --- a/lib/concurrent/actor/core.rb +++ b/lib/concurrent/actor/core.rb @@ -37,7 +37,6 @@ class Core # @option opts [Array] args arguments for actor_class instantiation # @option opts [Executor] executor, default is `Concurrent.configuration.global_task_pool` # @option opts [true, false] link, atomically link the actor to its parent (default: true) - # @option opts [true, false] supervise, atomically supervise the actor by its parent (default: true) # @option opts [Class] reference a custom descendant of {Reference} to use # @option opts [Array)>] behaviour_definition, array of pairs # where each pair is behaviour class and its args, see {Behaviour.basic_behaviour_definition} diff --git a/spec/concurrent/actor_spec.rb b/spec/concurrent/actor_spec.rb index 83892d2b4..e4ee4be05 100644 --- a/spec/concurrent/actor_spec.rb +++ b/spec/concurrent/actor_spec.rb @@ -280,7 +280,7 @@ def on_message(message) end describe 'pausing' do - it 'pauses on error' do + it 'pauses on error and resumes' do queue = Queue.new resuming_behaviour = Behaviour.restarting_behaviour_definition(:resume!) @@ -291,8 +291,8 @@ def on_message(message) -> m { m == :add ? 1 : pass } end - actor << :supervise - queue << actor.ask!(:supervisor) + actor << :link + queue << actor.ask!(:linked?) actor << nil queue << actor.ask(:add) @@ -300,21 +300,23 @@ def on_message(message) end expect(queue.pop).to eq :init - expect(queue.pop).to eq test + expect(queue.pop).to eq true expect(queue.pop.value).to eq 1 expect(queue.pop).to eq :resumed terminate_actors test + end + it 'pauses on error and resets' do + queue = Queue.new test = AdHoc.spawn name: :tester, behaviour_definition: Behaviour.restarting_behaviour_definition do actor = AdHoc.spawn name: :pausing, - supervise: true, behaviour_definition: Behaviour.restarting_behaviour_definition do queue << :init -> m { m == :object_id ? self.object_id : pass } end - queue << actor.ask!(:supervisor) + queue << actor.ask!(:linked?) queue << actor.ask!(:object_id) actor << nil queue << actor.ask(:object_id) @@ -325,14 +327,16 @@ def on_message(message) end expect(queue.pop).to eq :init - expect(queue.pop).to eq test + expect(queue.pop).to eq true first_id = queue.pop second_id = queue.pop.value expect(first_id).not_to eq second_id # context already reset expect(queue.pop).to eq :init # rebuilds context expect(queue.pop).to eq :reset terminate_actors test + end + it 'pauses on error and restarts' do queue = Queue.new resuming_behaviour = Behaviour.restarting_behaviour_definition.map do |c, args| if Behaviour::Supervising == c @@ -349,8 +353,8 @@ def on_message(message) -> m { m == :add ? 1 : pass } end - actor << :supervise - queue << actor.ask!(:supervisor) + actor << :link + queue << actor.ask!(:linked?) actor << nil queue << actor.ask(:add) @@ -360,7 +364,7 @@ def on_message(message) end expect(queue.pop).to eq :init - expect(queue.pop).to eq test + expect(queue.pop).to eq true expect(queue.pop.wait.reason).to be_a_kind_of(ActorTerminated) expect(queue.pop).to eq :init expect(queue.pop).to eq :restarted @@ -378,7 +382,7 @@ def work(message) end pool = Concurrent::Actor::Utils::Pool.spawn! 'pool', 5 do |balancer, index| - worker.spawn name: "worker-#{index}", supervise: true, args: [balancer] + worker.spawn name: "worker-#{index}", args: [balancer] end expect(pool.ask!(5)).to eq 10 From d16bbecc0be8f836319c85b14f007477a00c51bf Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Thu, 1 Jan 2015 20:55:30 +0100 Subject: [PATCH 11/16] Linking behavior responds to :linked message by returning array of linked actors --- lib/concurrent/actor/behaviour/linking.rb | 4 ++-- spec/concurrent/actor_spec.rb | 12 ++++++------ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/lib/concurrent/actor/behaviour/linking.rb b/lib/concurrent/actor/behaviour/linking.rb index 8a8dae233..7046773de 100644 --- a/lib/concurrent/actor/behaviour/linking.rb +++ b/lib/concurrent/actor/behaviour/linking.rb @@ -46,8 +46,6 @@ def initialize(core, subsequent, core_options) @linked.add Actor.current if core_options[:link] != false end - # TODO also handle :linked_actors returning array - def on_envelope(envelope) case envelope.message when :link @@ -56,6 +54,8 @@ def on_envelope(envelope) unlink envelope.sender when :linked? @linked.include? envelope.sender + when :linked + @linked.to_a else pass envelope end diff --git a/spec/concurrent/actor_spec.rb b/spec/concurrent/actor_spec.rb index e4ee4be05..db0c408bb 100644 --- a/spec/concurrent/actor_spec.rb +++ b/spec/concurrent/actor_spec.rb @@ -292,7 +292,7 @@ def on_message(message) end actor << :link - queue << actor.ask!(:linked?) + queue << actor.ask!(:linked) actor << nil queue << actor.ask(:add) @@ -300,7 +300,7 @@ def on_message(message) end expect(queue.pop).to eq :init - expect(queue.pop).to eq true + expect(queue.pop).to include(test) expect(queue.pop.value).to eq 1 expect(queue.pop).to eq :resumed terminate_actors test @@ -316,7 +316,7 @@ def on_message(message) -> m { m == :object_id ? self.object_id : pass } end - queue << actor.ask!(:linked?) + queue << actor.ask!(:linked) queue << actor.ask!(:object_id) actor << nil queue << actor.ask(:object_id) @@ -327,7 +327,7 @@ def on_message(message) end expect(queue.pop).to eq :init - expect(queue.pop).to eq true + expect(queue.pop).to include(test) first_id = queue.pop second_id = queue.pop.value expect(first_id).not_to eq second_id # context already reset @@ -354,7 +354,7 @@ def on_message(message) end actor << :link - queue << actor.ask!(:linked?) + queue << actor.ask!(:linked) actor << nil queue << actor.ask(:add) @@ -364,7 +364,7 @@ def on_message(message) end expect(queue.pop).to eq :init - expect(queue.pop).to eq true + expect(queue.pop).to include(test) expect(queue.pop.wait.reason).to be_a_kind_of(ActorTerminated) expect(queue.pop).to eq :init expect(queue.pop).to eq :restarted From db921a79f314fc55a025b29e375d03cdf0dddf2a Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Sat, 3 Jan 2015 16:24:39 +0100 Subject: [PATCH 12/16] Simplify behavior definition --- lib/concurrent/actor/behaviour.rb | 57 +++++++++++++++---------------- lib/concurrent/actor/core.rb | 9 ++--- spec/concurrent/actor_spec.rb | 6 ++-- 3 files changed, 34 insertions(+), 38 deletions(-) diff --git a/lib/concurrent/actor/behaviour.rb b/lib/concurrent/actor/behaviour.rb index 469b52cc7..aa25c6081 100644 --- a/lib/concurrent/actor/behaviour.rb +++ b/lib/concurrent/actor/behaviour.rb @@ -70,14 +70,14 @@ module Behaviour # Array of behaviours and their construction parameters. # - # [[Behaviour::SetResults, [:terminate!]], - # [Behaviour::RemovesChild, []], - # [Behaviour::Termination, []], - # [Behaviour::TerminatesChildren, []], - # [Behaviour::Linking, []], - # [Behaviour::Awaits, []], - # [Behaviour::ExecutesContext, []], - # [Behaviour::ErrorsOnUnknownMessage, []]] + # [[Behaviour::SetResults, :terminate!], + # [Behaviour::RemovesChild], + # [Behaviour::Termination], + # [Behaviour::TerminatesChildren], + # [Behaviour::Linking], + # [Behaviour::Awaits], + # [Behaviour::ExecutesContext], + # [Behaviour::ErrorsOnUnknownMessage]] # # @see '' its source code def self.basic_behaviour_definition @@ -88,17 +88,16 @@ def self.basic_behaviour_definition # Array of behaviours and their construction parameters. # - # [[Behaviour::SetResults, [:pause!]], - # [Behaviour::RemovesChild, []], - # [Behaviour::Termination, []], - # [Behaviour::TerminatesChildren, []], - # [Behaviour::Linking, []], - # [Behaviour::Supervised, []], - # [Behaviour::Pausing, []], - # [Behaviour::Supervising, [:reset!, :one_for_one]], - # [Behaviour::Awaits, []], - # [Behaviour::ExecutesContext, []], - # [Behaviour::ErrorsOnUnknownMessage, []]] + # [[Behaviour::SetResults, :pause!], + # [Behaviour::RemovesChild], + # [Behaviour::Termination], + # [Behaviour::TerminatesChildren], + # [Behaviour::Linking], + # [Behaviour::Pausing], + # [Behaviour::Supervising, :reset!, :one_for_one], + # [Behaviour::Awaits], + # [Behaviour::ExecutesContext], + # [Behaviour::ErrorsOnUnknownMessage]] # # @see '' its source code def self.restarting_behaviour_definition(handle = :reset!, strategy = :one_for_one) @@ -111,33 +110,33 @@ def self.restarting_behaviour_definition(handle = :reset!, strategy = :one_for_o # @see '' its source code def self.base(on_error) - [[SetResults, [on_error]], + [[SetResults, on_error], # has to be before Termination to be able to remove children form terminated actor - [RemovesChild, []], - [Termination, []], - [TerminatesChildren, []]] + RemovesChild, + Termination, + TerminatesChildren] end # @see '' its source code def self.linking - [[Linking, []]] + [Linking] end # @see '' its source code def self.supervised - [[Pausing, []]] + [Pausing] end # @see '' its source code def self.supervising(handle = :reset!, strategy = :one_for_one) - [[Behaviour::Supervising, [handle, strategy]]] + [[Behaviour::Supervising, handle, strategy]] end # @see '' its source code def self.user_messages - [[Awaits, []], - [ExecutesContext, []], - [ErrorsOnUnknownMessage, []]] + [Awaits, + ExecutesContext, + ErrorsOnUnknownMessage] end end end diff --git a/lib/concurrent/actor/core.rb b/lib/concurrent/actor/core.rb index 001076b85..12ef38ca9 100644 --- a/lib/concurrent/actor/core.rb +++ b/lib/concurrent/actor/core.rb @@ -202,15 +202,12 @@ def process_envelope(envelope) private def initialize_behaviours(opts) - @behaviour_definition = (Type! opts[:behaviour_definition] || @context.behaviour_definition, Array).each do |v| - Type! v, Array - Match! v.size, 2 - Child! v[0], Behaviour::Abstract - Type! v[1], Array + @behaviour_definition = (Type! opts[:behaviour_definition] || @context.behaviour_definition, Array).each do |(behaviour, *args)| + Child! behaviour, Behaviour::Abstract end @behaviours = {} @first_behaviour = @behaviour_definition.reverse. - reduce(nil) { |last, (behaviour, args)| @behaviours[behaviour] = behaviour.new(self, last, opts, *args) } + reduce(nil) { |last, (behaviour, *args)| @behaviours[behaviour] = behaviour.new(self, last, opts, *args) } end end end diff --git a/spec/concurrent/actor_spec.rb b/spec/concurrent/actor_spec.rb index db0c408bb..ef441516b 100644 --- a/spec/concurrent/actor_spec.rb +++ b/spec/concurrent/actor_spec.rb @@ -338,11 +338,11 @@ def on_message(message) it 'pauses on error and restarts' do queue = Queue.new - resuming_behaviour = Behaviour.restarting_behaviour_definition.map do |c, args| + resuming_behaviour = Behaviour.restarting_behaviour_definition.map do |c, *args| if Behaviour::Supervising == c - [c, [:restart!, :one_for_one]] + [c, *[:restart!, :one_for_one]] else - [c, args] + [c, *args] end end From a8fe2980c9e4a6b6182f7e8cc5c93af2a69de539 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Sun, 24 May 2015 12:12:25 +0200 Subject: [PATCH 13/16] internal actor events now support payload & minor improvements --- doc/actor/define.in.rb | 2 - doc/actor/define.out.rb | 4 +- doc/actor/format.rb | 5 ++- doc/actor/init.rb | 3 +- doc/actor/io.in.rb | 6 +-- doc/actor/io.out.rb | 14 +++---- doc/actor/main.md | 4 ++ doc/actor/messaging.in.rb | 12 +++--- doc/actor/messaging.out.rb | 20 ++++++---- doc/actor/quick.out.rb | 6 +-- doc/actor/supervision_tree.in.rb | 17 +++------ doc/actor/supervision_tree.out.rb | 29 ++++++-------- lib/concurrent/actor.rb | 18 ++++----- lib/concurrent/actor/behaviour.rb | 2 +- lib/concurrent/actor/behaviour/buffer.rb | 5 ++- lib/concurrent/actor/behaviour/linking.rb | 3 +- lib/concurrent/actor/behaviour/pausing.rb | 38 ++++++++++++------- .../actor/behaviour/terminates_children.rb | 4 +- lib/concurrent/actor/behaviour/termination.rb | 37 +++++++++++++----- lib/concurrent/actor/context.rb | 8 ++-- lib/concurrent/actor/core.rb | 1 + lib/concurrent/actor/internal_delegations.rb | 16 ++++++-- lib/concurrent/actor/reference.rb | 2 +- lib/concurrent/actor/utils/ad_hoc.rb | 20 ++++++---- lib/concurrent/edge/future.rb | 20 +++++++--- .../executor/serialized_execution.rb | 2 +- spec/concurrent/actor_spec.rb | 14 ++----- spec/spec_helper.rb | 11 +++++- 28 files changed, 187 insertions(+), 136 deletions(-) diff --git a/doc/actor/define.in.rb b/doc/actor/define.in.rb index 0a136a4e4..565ff6c5a 100644 --- a/doc/actor/define.in.rb +++ b/doc/actor/define.in.rb @@ -1,5 +1,3 @@ -require 'concurrent' - Message = Struct.new :action, :value # class AnActor < Concurrent::Actor::RestartingContext diff --git a/doc/actor/define.out.rb b/doc/actor/define.out.rb index d6c3c0775..e00f5de7c 100644 --- a/doc/actor/define.out.rb +++ b/doc/actor/define.out.rb @@ -1,5 +1,3 @@ -require 'concurrent' # => true - Message = Struct.new :action, :value class AnActor < Concurrent::Actor::RestartingContext @@ -35,5 +33,5 @@ def on_event(event) an_actor << :boo << Message.new(:add, 1) an_actor.ask!(Message.new(:value, nil)) # => 1 an_actor << :terminate! - # => # + # => # diff --git a/doc/actor/format.rb b/doc/actor/format.rb index 8d6a457e1..9d31b77af 100644 --- a/doc/actor/format.rb +++ b/doc/actor/format.rb @@ -3,8 +3,9 @@ require 'pry' require 'pp' +root = File.dirname(File.expand_path(Process.argv0)) input_paths = if ARGV.empty? - Dir.glob("#{File.dirname(__FILE__)}/*.in.rb") + Dir.glob("#{root}/*.in.rb") else ARGV end.map { |p| File.expand_path p } @@ -12,7 +13,7 @@ input_paths.each_with_index do |input_path, i| pid = fork do - require_relative 'init.rb' + require File.join(root, 'init.rb') begin output_path = input_path.gsub /\.in\.rb$/, '.out.rb' diff --git a/doc/actor/init.rb b/doc/actor/init.rb index 6da69ee27..30261d189 100644 --- a/doc/actor/init.rb +++ b/doc/actor/init.rb @@ -1 +1,2 @@ -require 'concurrent/actor' +require 'concurrent' +require 'concurrent-edge' diff --git a/doc/actor/io.in.rb b/doc/actor/io.in.rb index f3830d19f..71fbefc68 100644 --- a/doc/actor/io.in.rb +++ b/doc/actor/io.in.rb @@ -23,8 +23,8 @@ def default_executor class IOWorker < Concurrent::Actor::Utils::AbstractWorker def work(io_job) # do IO work - sleep 1 - puts "#{path} second:#{Time.now.to_i} message:#{io_job}" + sleep 0.1 + puts "#{path} second:#{(Time.now.to_f*100).floor} message:#{io_job}" end def default_executor @@ -46,4 +46,4 @@ def default_executor # /pool/worker-0 second:1414677668 message:5 # /pool/worker-1 second:1414677668 message:6 -sleep 4 +sleep 1 diff --git a/doc/actor/io.out.rb b/doc/actor/io.out.rb index 0b84da48f..a7c0f0d9f 100644 --- a/doc/actor/io.out.rb +++ b/doc/actor/io.out.rb @@ -1,4 +1,4 @@ -require 'concurrent' # => true +require 'concurrent' # => false # logger = Logger.new(STDOUT) # Concurrent.configuration.logger = logger.method(:add) @@ -16,7 +16,7 @@ def default_executor end actor_doing_io = ActorDoingIO.spawn :actor_doing_io - # => # + # => # actor_doing_io.executor == Concurrent.configuration.global_operation_pool # => true @@ -25,8 +25,8 @@ def default_executor class IOWorker < Concurrent::Actor::Utils::AbstractWorker def work(io_job) # do IO work - sleep 1 - puts "#{path} second:#{Time.now.to_i} message:#{io_job}" + sleep 0.1 + puts "#{path} second:#{(Time.now.to_f*100).floor} message:#{io_job}" end def default_executor @@ -37,10 +37,10 @@ def default_executor pool = Concurrent::Actor::Utils::Pool.spawn('pool', 2) do |balancer, index| IOWorker.spawn(name: "worker-#{index}", args: [balancer]) end - # => # + # => # pool << 1 << 2 << 3 << 4 << 5 << 6 - # => # + # => # # prints two lines each second # /pool/worker-0 second:1414677666 message:1 @@ -50,4 +50,4 @@ def default_executor # /pool/worker-0 second:1414677668 message:5 # /pool/worker-1 second:1414677668 message:6 -sleep 4 # => 4 +sleep 1 # => 1 diff --git a/doc/actor/main.md b/doc/actor/main.md index 2c8724dbd..5300d7d9f 100644 --- a/doc/actor/main.md +++ b/doc/actor/main.md @@ -161,6 +161,10 @@ Any existing behavior can be subclassed By subclassing {Behaviour::Pausing} and overriding {Behaviour::Pausing#restart!}. Implementing {AbstractContext#on_event} could be also considered. +_We'll be happy to answer any other questions, +just [open an Issue](https://github.com/ruby-concurrency/concurrent-ruby/issues/new) or find us on +https://gitter.im/ruby-concurrency/concurrent-ruby._ + ## Speed Simple benchmark Actor vs Celluloid, the numbers are looking good diff --git a/doc/actor/messaging.in.rb b/doc/actor/messaging.in.rb index 02575cbcd..510fdb7ec 100644 --- a/doc/actor/messaging.in.rb +++ b/doc/actor/messaging.in.rb @@ -1,4 +1,3 @@ -require 'concurrent' require 'algebrick' # Actor message protocol definition with Algebrick @@ -17,7 +16,7 @@ def on_message(message) (on Add.(~any, ~any) do |a, b| a + b end), - # or use multi-assignment + # or using multi-assignment (on ~Subtract do |(a, b)| a - b end) @@ -25,6 +24,9 @@ def on_message(message) end # calculator = Calculator.spawn('calculator') -calculator.ask! Add[1, 2] -calculator.ask! Subtract[1, 0.5] -calculator << :terminate! +addition = calculator.ask Add[1, 2] +substraction = calculator.ask Subtract[1, 0.5] +results = (addition & substraction) +results.value! + +calculator.ask! :terminate! diff --git a/doc/actor/messaging.out.rb b/doc/actor/messaging.out.rb index 9c4089002..2c5854c50 100644 --- a/doc/actor/messaging.out.rb +++ b/doc/actor/messaging.out.rb @@ -1,5 +1,4 @@ -require 'concurrent' # => true -require 'algebrick' # => true +require 'algebrick' # => false # Actor message protocol definition with Algebrick Protocol = Algebrick.type do @@ -17,7 +16,7 @@ def on_message(message) (on Add.(~any, ~any) do |a, b| a + b end), - # or use multi-assignment + # or using multi-assignment (on ~Subtract do |(a, b)| a - b end) @@ -25,8 +24,13 @@ def on_message(message) end calculator = Calculator.spawn('calculator') - # => # -calculator.ask! Add[1, 2] # => 3 -calculator.ask! Subtract[1, 0.5] # => 0.5 -calculator << :terminate! - # => # + # => # +addition = calculator.ask Add[1, 2] + # => <#Concurrent::Edge::Future:0x7fb6fc937190 pending blocks:[]> +substraction = calculator.ask Subtract[1, 0.5] + # => <#Concurrent::Edge::Future:0x7fb6fc935598 pending blocks:[]> +results = (addition & substraction) + # => <#Concurrent::Edge::ArrayFuture:0x7fb6fc967ea8 pending blocks:[]> +results.value! # => [3, 0.5] + +calculator.ask! :terminate! # => true diff --git a/doc/actor/quick.out.rb b/doc/actor/quick.out.rb index 0586b91d5..64c0eabf9 100644 --- a/doc/actor/quick.out.rb +++ b/doc/actor/quick.out.rb @@ -17,13 +17,13 @@ def on_message(message) # `link: true` makes the actor linked to root actor and supervised # which is default behavior adder = Adder.spawn(name: :adder, link: true, args: [1]) - # => # + # => # adder.parent - # => # + # => # # tell and forget adder.tell(:add).tell(:add) - # => # + # => # # ask to get result adder.ask!(:add) # => 4 # fail the actor diff --git a/doc/actor/supervision_tree.in.rb b/doc/actor/supervision_tree.in.rb index 0fc2ca8c5..f9f549d7f 100644 --- a/doc/actor/supervision_tree.in.rb +++ b/doc/actor/supervision_tree.in.rb @@ -1,19 +1,13 @@ -require 'concurrent' - -logger = Logger.new($stderr) # -Concurrent.configuration.logger = lambda do |level, progname, message = nil, &block| - logger.add level, message, progname, &block -end # - class Master < Concurrent::Actor::RestartingContext def initialize # for listener to be child of master - @listener = Listener.spawn(name: 'listener1', supervise: true, args: [self]) + @listener = Listener.spawn(name: 'listener1', supervise: true) end def on_message(msg) - case msg + command, *args = msg + case command when :listener @listener when :reset, :terminated, :resumed, :paused @@ -25,7 +19,8 @@ def on_message(msg) # TODO turn this into Behaviour and make it default part of RestartingContext def on_event(event) - case event + event_name, _ = event + case event_name when :resetting, :restarting @listener << :terminate! when Exception, :paused @@ -37,7 +32,7 @@ def on_event(event) end # class Listener < Concurrent::Actor::RestartingContext - def initialize(master) + def initialize @number = (rand() * 100).to_i end diff --git a/doc/actor/supervision_tree.out.rb b/doc/actor/supervision_tree.out.rb index 1abe1a533..8ee4306e4 100644 --- a/doc/actor/supervision_tree.out.rb +++ b/doc/actor/supervision_tree.out.rb @@ -1,19 +1,13 @@ -require 'concurrent' # => true - -logger = Logger.new($stderr) -Concurrent.configuration.logger = lambda do |level, progname, message = nil, &block| - logger.add level, message, progname, &block -end - class Master < Concurrent::Actor::RestartingContext def initialize # for listener to be child of master - @listener = Listener.spawn(name: 'listener1', supervise: true, args: [self]) + @listener = Listener.spawn(name: 'listener1', supervise: true) end def on_message(msg) - case msg + command, *args = msg + case command when :listener @listener when :reset, :terminated, :resumed, :paused @@ -25,7 +19,8 @@ def on_message(msg) # TODO turn this into Behaviour and make it default part of RestartingContext def on_event(event) - case event + event_name, _ = event + case event_name when :resetting, :restarting @listener << :terminate! when Exception, :paused @@ -37,7 +32,7 @@ def on_event(event) end class Listener < Concurrent::Actor::RestartingContext - def initialize(master) + def initialize @number = (rand() * 100).to_i end @@ -53,21 +48,21 @@ def on_message(msg) end master = Master.spawn(name: 'master', supervise: true) - # => # + # => # listener = master.ask!(:listener) - # => # -listener.ask!(:number) # => 92 + # => # +listener.ask!(:number) # => 53 master << :crash - # => # + # => # sleep 0.1 # => 0 # ask for listener again, old one is terminated listener.ask!(:terminated?) # => true listener = master.ask!(:listener) - # => # -listener.ask!(:number) # => 99 + # => # +listener.ask!(:number) # => 71 master.ask!(:terminate!) # => true diff --git a/lib/concurrent/actor.rb b/lib/concurrent/actor.rb index 2f9c3eb66..cd31e8fb8 100644 --- a/lib/concurrent/actor.rb +++ b/lib/concurrent/actor.rb @@ -61,29 +61,29 @@ def self.root # inc2.ask!(2) # => 4 # # @param block for context_class instantiation - # @param args see {.spawn_optionify} + # @param args see {.to_spawn_options} # @return [Reference] never the actual actor def self.spawn(*args, &block) if Actor.current - Core.new(spawn_optionify(*args).merge(parent: Actor.current), &block).reference + Core.new(to_spawn_options(*args).merge(parent: Actor.current), &block).reference else - root.ask([:spawn, spawn_optionify(*args), block]).value! + root.ask([:spawn, to_spawn_options(*args), block]).value! end end # as {.spawn} but it'll block until actor is initialized or it'll raise exception on error def self.spawn!(*args, &block) - spawn(spawn_optionify(*args).merge(initialized: future = Concurrent.future), &block).tap { future.wait! } + spawn(to_spawn_options(*args).merge(initialized: future = Concurrent.future), &block).tap { future.wait! } end - # @overload spawn_optionify(context_class, name, *args) + # @overload to_spawn_options(context_class, name, *args) # @param [AbstractContext] context_class to be spawned # @param [String, Symbol] name of the instance, it's used to generate the # {Core#path} of the actor # @param args for context_class instantiation - # @overload spawn_optionify(opts) + # @overload to_spawn_options(opts) # see {Core#initialize} opts - def self.spawn_optionify(*args) + def self.to_spawn_options(*args) if args.size == 1 && args.first.is_a?(Hash) args.first else @@ -93,9 +93,5 @@ def self.spawn_optionify(*args) end end - # call this to disable experimental warning - def self.i_know_it_is_experimental! - warn 'Method Actor.i_know_it_is_experimental! is deprecated. The Actors are no longer experimental.' - end end end diff --git a/lib/concurrent/actor/behaviour.rb b/lib/concurrent/actor/behaviour.rb index aa25c6081..f24d42de0 100644 --- a/lib/concurrent/actor/behaviour.rb +++ b/lib/concurrent/actor/behaviour.rb @@ -111,7 +111,7 @@ def self.restarting_behaviour_definition(handle = :reset!, strategy = :one_for_o # @see '' its source code def self.base(on_error) [[SetResults, on_error], - # has to be before Termination to be able to remove children form terminated actor + # has to be before Termination to be able to remove children from terminated actor RemovesChild, Termination, TerminatesChildren] diff --git a/lib/concurrent/actor/behaviour/buffer.rb b/lib/concurrent/actor/behaviour/buffer.rb index fa48b3a56..7e3cf62b2 100644 --- a/lib/concurrent/actor/behaviour/buffer.rb +++ b/lib/concurrent/actor/behaviour/buffer.rb @@ -42,12 +42,13 @@ def process_envelope end def on_event(public, event) - case event + event_name, _ = event + case event_name when :terminated, :restarted @buffer.each { |envelope| reject_envelope envelope } @buffer.clear end - super public, event + super public, event_name end end end diff --git a/lib/concurrent/actor/behaviour/linking.rb b/lib/concurrent/actor/behaviour/linking.rb index 7046773de..b94a15dd8 100644 --- a/lib/concurrent/actor/behaviour/linking.rb +++ b/lib/concurrent/actor/behaviour/linking.rb @@ -72,8 +72,9 @@ def unlink(ref) end def on_event(public, event) + event_name, _ = event @linked.each { |a| a << event } if public - @linked.clear if event == :terminated + @linked.clear if event_name == :terminated super public, event end end diff --git a/lib/concurrent/actor/behaviour/pausing.rb b/lib/concurrent/actor/behaviour/pausing.rb index 2929a30c7..bb13677c2 100644 --- a/lib/concurrent/actor/behaviour/pausing.rb +++ b/lib/concurrent/actor/behaviour/pausing.rb @@ -10,14 +10,20 @@ module Behaviour class Pausing < Abstract def initialize(core, subsequent, core_options) super core, subsequent, core_options - @paused = false - @buffer = [] + @paused = false + @deferred = [] + end + + def paused? + @paused end def on_envelope(envelope) case envelope.message when :pause! pause! + when :paused? + paused? when :resume! resume! when :reset! @@ -25,8 +31,8 @@ def on_envelope(envelope) when :restart! restart! else - if @paused - @buffer << envelope + if paused? + @deferred << envelope MESSAGE_PROCESSED else pass envelope @@ -41,12 +47,14 @@ def pause!(error = nil) end def resume! + return false unless paused? do_resume broadcast(true, :resumed) true end def reset! + return false unless paused? broadcast(false, :resetting) do_reset broadcast(true, :reset) @@ -54,6 +62,7 @@ def reset! end def restart! + return false unless paused? broadcast(false, :restarting) do_restart broadcast(true, :restarted) @@ -61,7 +70,8 @@ def restart! end def on_event(public, event) - reject_buffer if event == :terminated + event_name, _ = event + reject_deferred if event_name == :terminated super public, event end @@ -74,20 +84,20 @@ def do_pause def do_resume @paused = false - reschedule_buffer + reschedule_deferred nil end def do_reset rebuild_context do_resume - reschedule_buffer + reschedule_deferred nil end def do_restart rebuild_context - reject_buffer + reject_deferred do_resume nil end @@ -98,14 +108,14 @@ def rebuild_context nil end - def reschedule_buffer - @buffer.each { |envelope| core.schedule_execution { core.process_envelope envelope } } - @buffer.clear + def reschedule_deferred + @deferred.each { |envelope| core.schedule_execution { core.process_envelope envelope } } + @deferred.clear end - def reject_buffer - @buffer.each { |envelope| reject_envelope envelope } - @buffer.clear + def reject_deferred + @deferred.each { |envelope| reject_envelope envelope } + @deferred.clear end end end diff --git a/lib/concurrent/actor/behaviour/terminates_children.rb b/lib/concurrent/actor/behaviour/terminates_children.rb index e831e010a..d4ed57d70 100644 --- a/lib/concurrent/actor/behaviour/terminates_children.rb +++ b/lib/concurrent/actor/behaviour/terminates_children.rb @@ -4,8 +4,8 @@ module Behaviour # Terminates all children when the actor terminates. class TerminatesChildren < Abstract def on_event(public, event) - # TODO set event in Termination after all children are terminated, requires new non-blocking join on Future - children.map { |ch| ch << :terminate! } if event == :terminated + event_name, _ = event + children.map { |ch| ch << :terminate! } if event_name == :terminated super public, event end end diff --git a/lib/concurrent/actor/behaviour/termination.rb b/lib/concurrent/actor/behaviour/termination.rb index af8ca305c..8fd4341d2 100644 --- a/lib/concurrent/actor/behaviour/termination.rb +++ b/lib/concurrent/actor/behaviour/termination.rb @@ -9,11 +9,15 @@ class Termination < Abstract # @!attribute [r] terminated # @return [Edge::Event] event which will become set when actor is terminated. - attr_reader :terminated + # @!attribute [r] reason + attr_reader :terminated, :reason - def initialize(core, subsequent, core_options) + def initialize(core, subsequent, core_options, trapping = false) super core, subsequent, core_options - @terminated = Concurrent.event + @terminated = Concurrent.event + @public_terminated = @terminated.hide_completable + @reason = nil + @trapping = trapping end # @note Actor rejects envelopes when terminated. @@ -22,14 +26,27 @@ def terminated? @terminated.completed? end + def trapping? + @trapping + end + + def trapping=(val) + @trapping = !!val + end + def on_envelope(envelope) - case envelope.message + command, reason = envelope.message + case command when :terminated? terminated? when :terminate! - terminate! - when :terminated_event # TODO rename to :termination_event - terminated + if trapping? && reason != :kill + pass envelope + else + terminate! reason + end + when :termination_event + @public_terminated else if terminated? reject_envelope envelope @@ -42,10 +59,12 @@ def on_envelope(envelope) # Terminates the actor. Any Envelope received after termination is rejected. # Terminates all its children, does not wait until they are terminated. - def terminate! + def terminate!(reason = :normal) + # TODO return after all children are terminated return true if terminated? + @reason = reason terminated.complete - broadcast(true, :terminated) # TODO do not end up in Dead Letter Router + broadcast(true, [:terminated, reason]) # TODO do not end up in Dead Letter Router parent << :remove_child if parent true end diff --git a/lib/concurrent/actor/context.rb b/lib/concurrent/actor/context.rb index d3da272a7..5a629599c 100644 --- a/lib/concurrent/actor/context.rb +++ b/lib/concurrent/actor/context.rb @@ -83,7 +83,7 @@ def default_reference_class # override to se different default executor, e.g. to change it to global_operation_pool # @return [Executor] def default_executor - Concurrent.configuration.global_task_pool + Concurrent.global_io_executor end # tell self a message @@ -111,12 +111,12 @@ def ask(message) # inc2.ask!(2) # => 4 # @see Concurrent::Actor.spawn def self.spawn(name_or_opts, *args, &block) - Actor.spawn spawn_optionify(name_or_opts, *args), &block + Actor.spawn to_spawn_options(name_or_opts, *args), &block end # behaves as {Concurrent::Actor.spawn!} but :class is auto-inserted based on receiver so it can be omitted. def self.spawn!(name_or_opts, *args, &block) - Actor.spawn! spawn_optionify(name_or_opts, *args), &block + Actor.spawn! to_spawn_options(name_or_opts, *args), &block end private @@ -125,7 +125,7 @@ def initialize_core(core) @core = Type! core, Core end - def self.spawn_optionify(name_or_opts, *args) + def self.to_spawn_options(name_or_opts, *args) if name_or_opts.is_a? Hash if name_or_opts.key?(:class) && name_or_opts[:class] != self raise ArgumentError, diff --git a/lib/concurrent/actor/core.rb b/lib/concurrent/actor/core.rb index 749f63540..118183861 100644 --- a/lib/concurrent/actor/core.rb +++ b/lib/concurrent/actor/core.rb @@ -197,6 +197,7 @@ def ns_initialize(opts, &block) begin build_context initialized.success reference if initialized + log DEBUG, 'spawned' rescue => ex log ERROR, ex @first_behaviour.terminate! diff --git a/lib/concurrent/actor/internal_delegations.rb b/lib/concurrent/actor/internal_delegations.rb index 0b5f12871..9feb94c3b 100644 --- a/lib/concurrent/actor/internal_delegations.rb +++ b/lib/concurrent/actor/internal_delegations.rb @@ -8,9 +8,19 @@ def children core.children end - # @see Core#terminate! - def terminate! - behaviour!(Behaviour::Termination).terminate! + # @see Termination#terminate! + def terminate!(reason = nil) + behaviour!(Behaviour::Termination).terminate!(reason) + end + + # @see Termination#terminated? + def terminated? + behaviour!(Behaviour::Termination).terminated? + end + + # @see Termination#reason + def reason + behaviour!(Behaviour::Termination).reason end # delegates to core.log diff --git a/lib/concurrent/actor/reference.rb b/lib/concurrent/actor/reference.rb index d8ee520be..857c8ef04 100644 --- a/lib/concurrent/actor/reference.rb +++ b/lib/concurrent/actor/reference.rb @@ -80,7 +80,7 @@ def map(messages) # behaves as {#tell} when no future and as {#ask} when future def message(message, future = nil) core.on_envelope Envelope.new(message, future, Actor.current || Thread.current, self) - return future || self + return future ? future.hide_completable : self end # @see AbstractContext#dead_letter_routing diff --git a/lib/concurrent/actor/utils/ad_hoc.rb b/lib/concurrent/actor/utils/ad_hoc.rb index f9686a4f2..993a2d995 100644 --- a/lib/concurrent/actor/utils/ad_hoc.rb +++ b/lib/concurrent/actor/utils/ad_hoc.rb @@ -1,13 +1,8 @@ module Concurrent module Actor module Utils - # Allows quick creation of actors with behaviour defined by blocks. - # @example ping - # AdHoc.spawn :forward, an_actor do |where| - # # this block has to return proc defining #on_message behaviour - # -> message { where.tell message } - # end - class AdHoc < Context + + module AsAdHoc def initialize(*args, &initializer) @on_message = Type! initializer.call(*args), Proc end @@ -16,6 +11,17 @@ def on_message(message) instance_exec message, &@on_message end end + + # Allows quick creation of actors with behaviour defined by blocks. + # @example ping + # AdHoc.spawn :forward, an_actor do |where| + # # this block has to return proc defining #on_message behaviour + # -> message { where.tell message } + # end + # @note TODO remove in favor of the module + class AdHoc < Context + include AsAdHoc + end end end end diff --git a/lib/concurrent/edge/future.rb b/lib/concurrent/edge/future.rb index 8f099d7e6..58b4ac5b8 100644 --- a/lib/concurrent/edge/future.rb +++ b/lib/concurrent/edge/future.rb @@ -497,12 +497,22 @@ def pr_async_callback_on_completion(success, value, reason, executor, callback) end end + class ArrayFuture < Future + def apply_value(value, block) + block.call(*value) + end + end + class CompletableEvent < Event # Complete the event # @api public def complete(raise = true) super raise end + + def hide_completable + Concurrent.zip(self) + end end class CompletableFuture < Future @@ -535,6 +545,10 @@ def evaluate_to(*args, &block) def evaluate_to!(*args, &block) promise.evaluate_to!(*args, block) end + + def hide_completable + Concurrent.zip(self) + end end # TODO modularize blocked_by and notify blocked @@ -823,12 +837,6 @@ def clear_blocked_by! # used internally to support #with_default_executor class AllPromise < BlockedPromise - class ArrayFuture < Future - def apply_value(value, block) - block.call(*value) - end - end - private def initialize(blocked_by_futures, default_executor = :io) diff --git a/lib/concurrent/executor/serialized_execution.rb b/lib/concurrent/executor/serialized_execution.rb index 429a7e809..9ad97f64d 100644 --- a/lib/concurrent/executor/serialized_execution.rb +++ b/lib/concurrent/executor/serialized_execution.rb @@ -122,7 +122,7 @@ def initialize(executor) # @!macro executor_service_method_post def post(*args, &task) - raise ArgumentError.new('no block given') unless block_given? + Kernel.raise ArgumentError.new('no block given') unless block_given? return false unless running? @serializer.post(@executor, *args, &task) end diff --git a/spec/concurrent/actor_spec.rb b/spec/concurrent/actor_spec.rb index 3d87943d8..16274dc5a 100644 --- a/spec/concurrent/actor_spec.rb +++ b/spec/concurrent/actor_spec.rb @@ -6,14 +6,6 @@ module Actor # FIXME better tests! - # class Reference - # def backdoor(&block) - # core.send :schedule_execution do - # core.instance_eval &block - # end - # end - # end - describe 'Concurrent::Actor' do prepend_before do do_no_reset! @@ -164,7 +156,7 @@ def on_message(message) expect(subject.ask!(:terminated?)).to be_falsey subject.ask(:terminate!).wait expect(subject.ask!(:terminated?)).to be_truthy - child.ask!(:terminated_event).wait + child.ask!(:termination_event).wait expect(child.ask!(:terminated?)).to be_truthy terminate_actors subject, child @@ -212,7 +204,7 @@ def on_message(message) end failure << :hehe failure << :terminate! - expect(queue.pop).to eq [:terminated, failure] + expect(queue.pop).to eq [[:terminated, nil], failure] terminate_actors monitor end @@ -227,7 +219,7 @@ def on_message(message) failure << :hehe failure << :terminate! - expect(queue.pop).to eq [:terminated, failure] + expect(queue.pop).to eq [[:terminated, nil], failure] terminate_actors monitor end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 46e378d4f..0e9e272bf 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -27,11 +27,20 @@ logger.level = Logger::WARN logger.formatter = lambda do |severity, datetime, progname, msg| + formatted_message = case msg + when String + msg + when Exception + format "%s (%s)\n%s", + msg.message, msg.class, (msg.backtrace || []).join("\n") + else + msg.inspect + end format "[%s] %5s -- %s: %s\n", datetime.strftime('%Y-%m-%d %H:%M:%S.%L'), severity, progname, - msg + formatted_message end Concurrent.global_logger = lambda do |level, progname, message = nil, &block| From 9906e23a481c3e43cbea230e6bbf1f04fdfcad8a Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Sun, 24 May 2015 14:48:31 +0200 Subject: [PATCH 14/16] Add helper methods for deprecations --- lib/concurrent/agent.rb | 30 +++++++++-------- lib/concurrent/atomic/condition.rb | 4 ++- lib/concurrent/configuration.rb | 33 ++++++++++--------- lib/concurrent/executor/executor.rb | 10 +++--- lib/concurrent/executor/executor_service.rb | 2 ++ .../executor/java_cached_thread_pool.rb | 2 +- .../executor/java_thread_pool_executor.rb | 2 +- .../executor/ruby_thread_pool_executor.rb | 2 +- .../executor/simple_executor_service.rb | 2 +- lib/concurrent/executor/timer_set.rb | 4 ++- lib/concurrent/obligation.rb | 4 ++- lib/concurrent/scheduled_task.rb | 4 ++- lib/concurrent/utility/deprecation.rb | 22 +++++++++++++ lib/concurrent/utility/timeout.rb | 3 +- lib/concurrent/utility/timer.rb | 2 +- spec/concurrent/actor_spec.rb | 6 ++-- .../executor/executor_options_spec.rb | 12 ------- 17 files changed, 86 insertions(+), 58 deletions(-) create mode 100644 lib/concurrent/utility/deprecation.rb diff --git a/lib/concurrent/agent.rb b/lib/concurrent/agent.rb index 1e84d6c4d..f0f4cf957 100644 --- a/lib/concurrent/agent.rb +++ b/lib/concurrent/agent.rb @@ -3,6 +3,7 @@ require 'concurrent/observable' require 'concurrent/logging' require 'concurrent/executor/executor' +require 'concurrent/utility/deprecation' module Concurrent @@ -14,6 +15,7 @@ class Agent include Dereferenceable include Observable include Logging + include Deprecation attr_reader :timeout, :io_executor, :fast_executor @@ -115,21 +117,21 @@ def post(&block) # @yieldreturn [Object] the new value # @return [true, nil] nil when no block is given def post_off(timeout = nil, &block) - warn '[DEPRECATED] post_off with timeout options is deprecated and will be removed' task = if timeout - lambda do |value| - future = Future.execute do - block.call(value) - end - if future.wait(timeout) - future.value! - else - raise Concurrent::TimeoutError - end - end - else - block - end + deprecated 'post_off with option timeout options is deprecated and will be removed' + lambda do |value| + future = Future.execute do + block.call(value) + end + if future.wait(timeout) + future.value! + else + raise Concurrent::TimeoutError + end + end + else + block + end post_on(@io_executor, &task) end diff --git a/lib/concurrent/atomic/condition.rb b/lib/concurrent/atomic/condition.rb index 72598cad5..f5110e885 100644 --- a/lib/concurrent/atomic/condition.rb +++ b/lib/concurrent/atomic/condition.rb @@ -1,4 +1,5 @@ require 'concurrent/utility/monotonic_time' +require 'concurrent/utility/deprecation' module Concurrent @@ -14,6 +15,7 @@ module Concurrent # # @deprecated class Condition + include Deprecation class Result def initialize(remaining_time) @@ -39,7 +41,7 @@ def timed_out? end def initialize - warn '[DEPRECATED] Will be replaced with Synchronization::Object in v1.0.' + deprecated 'Will be replaced with Synchronization::Object in v1.0.' @condition = ConditionVariable.new end diff --git a/lib/concurrent/configuration.rb b/lib/concurrent/configuration.rb index 7041fd46c..a4921a7a5 100644 --- a/lib/concurrent/configuration.rb +++ b/lib/concurrent/configuration.rb @@ -4,13 +4,14 @@ require 'concurrent/at_exit' require 'concurrent/executors' require 'concurrent/utility/processor_count' +require 'concurrent/utility/deprecation' module Concurrent extend Logging + extend Deprecation # Suppresses all output when used for logging. NULL_LOGGER = lambda { |level, progname, message = nil, &block| } - private_constant :NULL_LOGGER # @!visibility private GLOBAL_LOGGER = AtomicReference.new(NULL_LOGGER) @@ -60,21 +61,21 @@ def self.disable_at_exit_hooks! end def self.disable_executor_auto_termination! - warn '[DEPRECATED] Use Concurrent.disable_at_exit_hooks! instead' + deprecated_method 'disable_executor_auto_termination!', 'disable_at_exit_hooks!' disable_at_exit_hooks! end # @return [true,false] # @see .disable_executor_auto_termination! def self.disable_executor_auto_termination? - warn '[DEPRECATED] Use Concurrent::AtExit.enabled? instead' + deprecated_method 'disable_executor_auto_termination?', 'Concurrent::AtExit.enabled?' AtExit.enabled? end # terminates all pools and blocks until they are terminated # @see .disable_executor_auto_termination! def self.terminate_pools! - warn '[DEPRECATED] Use Concurrent::AtExit.run instead' + deprecated_method 'terminate_pools!', 'Concurrent::AtExit.run' AtExit.run end @@ -139,6 +140,7 @@ def self.new_io_executor(opts = {}) # A gem-level configuration object. class Configuration + include Deprecation # Create a new configuration object. def initialize @@ -148,6 +150,7 @@ def initialize # @deprecated Use Concurrent::NULL_LOGGER instead def no_logger warn '[DEPRECATED] Use Concurrent::NULL_LOGGER instead' + deprecated_method 'Concurrent.configuration.no_logger', 'Concurrent::NULL_LOGGER' NULL_LOGGER end @@ -156,7 +159,7 @@ def no_logger # # @deprecated Use Concurrent.global_logger instead def logger - warn '[DEPRECATED] Use Concurrent.global_logger instead' + deprecated_method 'Concurrent.configuration.logger', 'Concurrent.global_logger' Concurrent.global_logger.value end @@ -165,32 +168,32 @@ def logger # # @deprecated Use Concurrent.global_logger instead def logger=(value) - warn '[DEPRECATED] Use Concurrent.global_logger instead' + deprecated_method 'Concurrent.configuration.logger=', 'Concurrent.global_logger=' Concurrent.global_logger = value end # @deprecated Use Concurrent.global_io_executor instead def global_task_pool - warn '[DEPRECATED] Use Concurrent.global_io_executor instead' + deprecated_method 'Concurrent.configuration.global_task_pool', 'Concurrent.global_io_executor' Concurrent.global_io_executor end # @deprecated Use Concurrent.global_fast_executor instead def global_operation_pool - warn '[DEPRECATED] Use Concurrent.global_fast_executor instead' + deprecated_method 'Concurrent.configuration.global_operation_pool', 'Concurrent.global_fast_executor' Concurrent.global_fast_executor end # @deprecated Use Concurrent.global_timer_set instead def global_timer_set - warn '[DEPRECATED] Use Concurrent.global_timer_set instead' + deprecated_method 'Concurrent.configuration.global_timer_set', 'Concurrent.global_timer_set' Concurrent.global_timer_set end # @deprecated Replacing global thread pools is deprecated. # Use the :executor constructor option instead. def global_task_pool=(executor) - warn '[DEPRECATED] Replacing global thread pools is deprecated. Use the :executor constructor option instead.' + deprecated 'Replacing global thread pools is deprecated. Use the :executor constructor option instead.' GLOBAL_IO_EXECUTOR.reconfigure { executor } or raise ConfigurationError.new('global task pool was already set') end @@ -198,32 +201,32 @@ def global_task_pool=(executor) # @deprecated Replacing global thread pools is deprecated. # Use the :executor constructor option instead. def global_operation_pool=(executor) - warn '[DEPRECATED] Replacing global thread pools is deprecated. Use the :executor constructor option instead.' + deprecated 'Replacing global thread pools is deprecated. Use the :executor constructor option instead.' GLOBAL_FAST_EXECUTOR.reconfigure { executor } or raise ConfigurationError.new('global operation pool was already set') end # @deprecated Use Concurrent.new_io_executor instead def new_task_pool - warn '[DEPRECATED] Use Concurrent.new_io_executor instead' + deprecated_method 'Concurrent.configuration.new_task_pool', 'Concurrent.new_io_executor' Concurrent.new_io_executor end # @deprecated Use Concurrent.new_fast_executor instead def new_operation_pool - warn '[DEPRECATED] Use Concurrent.new_fast_executor instead' + deprecated_method 'Concurrent.configuration.new_operation_pool', 'Concurrent.new_fast_executor' Concurrent.new_fast_executor end # @deprecated Use Concurrent.disable_auto_termination_of_global_executors! instead def auto_terminate=(value) - warn '[DEPRECATED] Use Concurrent.disable_auto_termination_of_global_executors! instead' + deprecated_method 'Concurrent.configuration.auto_terminate=', 'Concurrent.disable_auto_termination_of_global_executors!' Concurrent.disable_auto_termination_of_global_executors! if !value end # @deprecated Use Concurrent.auto_terminate_global_executors? instead def auto_terminate - warn '[DEPRECATED] Use Concurrent.auto_terminate_global_executors? instead' + deprecated_method 'Concurrent.configuration.auto_terminate', 'Concurrent.auto_terminate_global_executors?' Concurrent.auto_terminate_global_executors? end end diff --git a/lib/concurrent/executor/executor.rb b/lib/concurrent/executor/executor.rb index 7edf1f021..3e8cd6447 100644 --- a/lib/concurrent/executor/executor.rb +++ b/lib/concurrent/executor/executor.rb @@ -1,8 +1,10 @@ require 'concurrent/executor/executor_service' +require 'concurrent/utility/deprecation' module Concurrent module Executor + extend Deprecation # Get the requested `Executor` based on the values set in the options hash. # @@ -25,12 +27,12 @@ def self.executor_from_options(opts = {}) # :nodoc: end when opts.key?(:operation) || opts.key?(:task) if opts[:operation] == true || opts[:task] == false - Kernel.warn '[DEPRECATED] use `executor: :fast` instead' + deprecated 'use `executor: :fast` instead' return Concurrent.global_fast_executor end if opts[:operation] == false || opts[:task] == true - Kernel.warn '[DEPRECATED] use `executor: :io` instead' + deprecated 'use `executor: :io` instead' return Concurrent.global_io_executor end @@ -49,10 +51,10 @@ def self.executor(executor_identifier) when :immediate Concurrent.global_immediate_executor when :operation - Kernel.warn '[DEPRECATED] use `executor: :fast` instead' + deprecated 'use `executor: :fast` instead' Concurrent.global_fast_executor when :task - Kernel.warn '[DEPRECATED] use `executor: :io` instead' + deprecated 'use `executor: :io` instead' Concurrent.global_io_executor when Concurrent::ExecutorService executor_identifier diff --git a/lib/concurrent/executor/executor_service.rb b/lib/concurrent/executor/executor_service.rb index 5d2a2c720..e29840a41 100644 --- a/lib/concurrent/executor/executor_service.rb +++ b/lib/concurrent/executor/executor_service.rb @@ -3,11 +3,13 @@ require 'concurrent/at_exit' require 'concurrent/atomic/event' require 'concurrent/synchronization' +require 'concurrent/utility/deprecation' module Concurrent module ExecutorService include Logging + include Deprecation # @!macro [attach] executor_service_method_post # diff --git a/lib/concurrent/executor/java_cached_thread_pool.rb b/lib/concurrent/executor/java_cached_thread_pool.rb index caf7a1734..e3fae93c8 100644 --- a/lib/concurrent/executor/java_cached_thread_pool.rb +++ b/lib/concurrent/executor/java_cached_thread_pool.rb @@ -24,7 +24,7 @@ def initialize(opts = {}) def ns_initialize(opts) @fallback_policy = opts.fetch(:fallback_policy, opts.fetch(:overflow_policy, :abort)) - warn '[DEPRECATED] :overflow_policy is deprecated terminology, please use :fallback_policy instead' if opts.has_key?(:overflow_policy) + deprecated ':overflow_policy is deprecated terminology, please use :fallback_policy instead' if opts.has_key?(:overflow_policy) @max_queue = 0 raise ArgumentError.new("#{@fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICY_CLASSES.keys.include?(@fallback_policy) diff --git a/lib/concurrent/executor/java_thread_pool_executor.rb b/lib/concurrent/executor/java_thread_pool_executor.rb index f7a410ebb..ed875f7cf 100644 --- a/lib/concurrent/executor/java_thread_pool_executor.rb +++ b/lib/concurrent/executor/java_thread_pool_executor.rb @@ -139,7 +139,7 @@ def ns_initialize(opts) idletime = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i @max_queue = opts.fetch(:max_queue, DEFAULT_MAX_QUEUE_SIZE).to_i @fallback_policy = opts.fetch(:fallback_policy, opts.fetch(:overflow_policy, :abort)) - warn '[DEPRECATED] :overflow_policy is deprecated terminology, please use :fallback_policy instead' if opts.has_key?(:overflow_policy) + deprecated ' :overflow_policy is deprecated terminology, please use :fallback_policy instead' if opts.has_key?(:overflow_policy) raise ArgumentError.new('max_threads must be greater than zero') if max_length <= 0 raise ArgumentError.new('min_threads cannot be less than zero') if min_length < 0 diff --git a/lib/concurrent/executor/ruby_thread_pool_executor.rb b/lib/concurrent/executor/ruby_thread_pool_executor.rb index 233e61ca0..112e339c1 100644 --- a/lib/concurrent/executor/ruby_thread_pool_executor.rb +++ b/lib/concurrent/executor/ruby_thread_pool_executor.rb @@ -135,7 +135,7 @@ def ns_initialize(opts) @max_queue = opts.fetch(:max_queue, DEFAULT_MAX_QUEUE_SIZE).to_i @fallback_policy = opts.fetch(:fallback_policy, opts.fetch(:overflow_policy, :abort)) raise ArgumentError.new("#{@fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.include?(@fallback_policy) - warn '[DEPRECATED] :overflow_policy is deprecated terminology, please use :fallback_policy instead' if opts.has_key?(:overflow_policy) + deprecated ':overflow_policy is deprecated terminology, please use :fallback_policy instead' if opts.has_key?(:overflow_policy) raise ArgumentError.new('max_threads must be greater than zero') if @max_length <= 0 raise ArgumentError.new('min_threads cannot be less than zero') if @min_length < 0 diff --git a/lib/concurrent/executor/simple_executor_service.rb b/lib/concurrent/executor/simple_executor_service.rb index df987b56e..8ad1d1bd4 100644 --- a/lib/concurrent/executor/simple_executor_service.rb +++ b/lib/concurrent/executor/simple_executor_service.rb @@ -102,7 +102,7 @@ def ns_initialize class PerThreadExecutor < SimpleExecutorService def initialize - warn '[DEPRECATED] use SimpleExecutorService instead' + deprecated 'use SimpleExecutorService instead' super end end diff --git a/lib/concurrent/executor/timer_set.rb b/lib/concurrent/executor/timer_set.rb index 5016cd372..e68586674 100644 --- a/lib/concurrent/executor/timer_set.rb +++ b/lib/concurrent/executor/timer_set.rb @@ -5,6 +5,7 @@ require 'concurrent/executor/executor_service' require 'concurrent/executor/single_thread_executor' require 'concurrent/utility/monotonic_time' +require 'concurrent/utility/deprecation' module Concurrent @@ -14,6 +15,7 @@ module Concurrent # # @!macro monotonic_clock_warning class TimerSet < RubyExecutorService + extend Deprecation # Create a new set of timed tasks. # @@ -82,7 +84,7 @@ def kill # @!visibility private def self.calculate_delay!(delay) if delay.is_a?(Time) - warn '[DEPRECATED] Use an interval not a clock time; schedule is now based on a monotonic clock' + deprecated 'Use an interval not a clock time; schedule is now based on a monotonic clock' now = Time.now raise ArgumentError.new('schedule time must be in the future') if delay <= now delay.to_f - now.to_f diff --git a/lib/concurrent/obligation.rb b/lib/concurrent/obligation.rb index 1c7441c94..dfecf0589 100644 --- a/lib/concurrent/obligation.rb +++ b/lib/concurrent/obligation.rb @@ -3,11 +3,13 @@ require 'concurrent/dereferenceable' require 'concurrent/atomic/event' +require 'concurrent/utility/deprecation' module Concurrent module Obligation include Dereferenceable + include Deprecation # Has the obligation been fulfilled? # @@ -51,7 +53,7 @@ def complete? # # @deprecated def completed? - warn '[DEPRECATED] Use #complete? instead' + deprecated_method 'completed?', 'complete?' complete? end diff --git a/lib/concurrent/scheduled_task.rb b/lib/concurrent/scheduled_task.rb index 774075dd1..fa42d302a 100644 --- a/lib/concurrent/scheduled_task.rb +++ b/lib/concurrent/scheduled_task.rb @@ -1,5 +1,6 @@ require 'concurrent/ivar' require 'concurrent/utility/timer' +require 'concurrent/utility/deprecation' require 'concurrent/executor/executor' require 'concurrent/executor/safe_task_executor' @@ -134,6 +135,7 @@ module Concurrent # # @!macro monotonic_clock_warning class ScheduledTask < IVar + include Deprecation attr_reader :delay @@ -206,7 +208,7 @@ def self.execute(delay, opts = {}, &block) # @deprecated def schedule_time - warn '[DEPRECATED] time is now based on a monotonic clock' + Deprecation.deprecated 'schedule_time is now based on a monotonic clock' @schedule_time end diff --git a/lib/concurrent/utility/deprecation.rb b/lib/concurrent/utility/deprecation.rb new file mode 100644 index 000000000..121a946fa --- /dev/null +++ b/lib/concurrent/utility/deprecation.rb @@ -0,0 +1,22 @@ +require 'concurrent/logging' + +module Concurrent + module Deprecation + # TODO require additional parameter: a version. Display when it'll be removed based on that. Error if not removed. + include Logging + + def deprecated(message, strip = 2) + caller_line = caller(strip).first + klass = if Class === self + self + else + self.class + end + log WARN, klass.to_s, format("[DEPRECATED] %s\ncalled on: %s", message, caller_line) + end + + def deprecated_method(old_name, new_name) + deprecated "`#{old_name}` is deprecated and it'll removed in next release, use `#{new_name}` instead", 3 + end + end +end diff --git a/lib/concurrent/utility/timeout.rb b/lib/concurrent/utility/timeout.rb index 68ba8589b..e5c0b809a 100644 --- a/lib/concurrent/utility/timeout.rb +++ b/lib/concurrent/utility/timeout.rb @@ -2,6 +2,7 @@ require 'thread' require 'concurrent/errors' +require 'concurrent/utility/deprecation' module Concurrent @@ -21,7 +22,7 @@ module Concurrent # # @!macro monotonic_clock_warning def timeout(seconds, &block) - warn '[DEPRECATED] timeout is deprecated and will be removed' + deprecated 'timeout is deprecated and will be removed' future = Future.execute(&block) future.wait(seconds) diff --git a/lib/concurrent/utility/timer.rb b/lib/concurrent/utility/timer.rb index 86851dd7e..ca594588a 100644 --- a/lib/concurrent/utility/timer.rb +++ b/lib/concurrent/utility/timer.rb @@ -16,7 +16,7 @@ def timer(seconds, *args, &block) raise ArgumentError.new('no block given') unless block_given? raise ArgumentError.new('interval must be greater than or equal to zero') if seconds < 0 - Concurrent.configuration.global_timer_set.post(seconds, *args, &block) + Concurrent.global_timer_set.post(seconds, *args, &block) true end module_function :timer diff --git a/spec/concurrent/actor_spec.rb b/spec/concurrent/actor_spec.rb index 16274dc5a..885edce26 100644 --- a/spec/concurrent/actor_spec.rb +++ b/spec/concurrent/actor_spec.rb @@ -80,19 +80,19 @@ def on_message(message) end it 'terminates on failed initialization' do - a = AdHoc.spawn(name: :fail, logger: Concurrent.configuration.no_logger) { raise } + a = AdHoc.spawn(name: :fail, logger: Concurrent::NULL_LOGGER) { raise } expect(a.ask(nil).wait.failed?).to be_truthy expect(a.ask!(:terminated?)).to be_truthy end it 'terminates on failed initialization and raises with spawn!' do expect do - AdHoc.spawn!(name: :fail, logger: Concurrent.configuration.no_logger) { raise 'm' } + AdHoc.spawn!(name: :fail, logger: Concurrent::NULL_LOGGER) { raise 'm' } end.to raise_error(StandardError, 'm') end it 'terminates on failed message processing' do - a = AdHoc.spawn(name: :fail, logger: Concurrent.configuration.no_logger) { -> _ { raise } } + a = AdHoc.spawn(name: :fail, logger: Concurrent::NULL_LOGGER) { -> _ { raise } } expect(a.ask(nil).wait.failed?).to be_truthy expect(a.ask!(:terminated?)).to be_truthy end diff --git a/spec/concurrent/executor/executor_options_spec.rb b/spec/concurrent/executor/executor_options_spec.rb index 2d02584e8..8e3f31c62 100644 --- a/spec/concurrent/executor/executor_options_spec.rb +++ b/spec/concurrent/executor/executor_options_spec.rb @@ -31,32 +31,24 @@ module Concurrent end it 'returns the global fast executor when :operation is true' do - warn 'deprecated syntax' - expect(Kernel).to receive(:warn).with(anything) expect(Concurrent).to receive(:global_fast_executor). and_return(:fast_executor) Executor.executor_from_options(operation: true) end it 'returns the global io executor when :operation is false' do - warn 'deprecated syntax' - expect(Kernel).to receive(:warn).with(anything) expect(Concurrent).to receive(:global_io_executor). and_return(:io_executor) Executor.executor_from_options(operation: false) end it 'returns the global fast executor when :task is false' do - warn 'deprecated syntax' - expect(Kernel).to receive(:warn).with(anything) expect(Concurrent).to receive(:global_fast_executor). and_return(:fast_executor) Executor.executor_from_options(task: false) end it 'returns the global io executor when :task is true' do - warn 'deprecated syntax' - expect(Kernel).to receive(:warn).with(anything) expect(Concurrent).to receive(:global_io_executor). and_return(:io_executor) Executor.executor_from_options(task: true) @@ -71,20 +63,16 @@ module Concurrent end specify ':executor overrides :operation' do - warn 'deprecated syntax' expect(Executor.executor_from_options(executor: executor, operation: true)). to eq executor end specify ':executor overrides :task' do - warn 'deprecated syntax' expect(Executor.executor_from_options(executor: executor, task: true)). to eq executor end specify ':operation overrides :task' do - warn 'deprecated syntax' - expect(Kernel).to receive(:warn).with(anything) expect(Concurrent).to receive(:global_fast_executor). and_return(:fast_executor) Executor.executor_from_options(operation: true, task: true) From c94f03b04e3e562ca6d3a5b21fe0479fab4f0bbd Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Sun, 31 May 2015 13:40:49 +0200 Subject: [PATCH 15/16] Remove redundant logger configuration --- spec/concurrent/edge/future_spec.rb | 6 ------ 1 file changed, 6 deletions(-) diff --git a/spec/concurrent/edge/future_spec.rb b/spec/concurrent/edge/future_spec.rb index 7af735066..558cc8b2d 100644 --- a/spec/concurrent/edge/future_spec.rb +++ b/spec/concurrent/edge/future_spec.rb @@ -1,12 +1,6 @@ require 'concurrent' require 'thread' -logger = Logger.new($stderr) -logger.level = Logger::DEBUG -Concurrent.configuration.logger = lambda do |level, progname, message = nil, &block| - logger.add level, message, progname, &block -end - describe 'Concurrent::Edge futures' do describe '.post' do From 0cead11b59e6e3a3ebf9bfa165eaf982c0d98817 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Sun, 31 May 2015 14:50:38 +0200 Subject: [PATCH 16/16] Stabilize RubyThreadPool specs --- spec/concurrent/executor/ruby_cached_thread_pool_spec.rb | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/spec/concurrent/executor/ruby_cached_thread_pool_spec.rb b/spec/concurrent/executor/ruby_cached_thread_pool_spec.rb index 226b79c05..42dc66479 100644 --- a/spec/concurrent/executor/ruby_cached_thread_pool_spec.rb +++ b/spec/concurrent/executor/ruby_cached_thread_pool_spec.rb @@ -92,12 +92,13 @@ module Concurrent specify do pool = RubyThreadPoolExecutor.new(config) - 100.times do + 10.times do count = Concurrent::CountDownLatch.new(100) 100.times do pool.post { count.count_down } end count.wait + sleep 0.01 # let the tasks end after count_down expect(pool.length).to be <= [200, config[:max_threads]].min if pool.length > [110, config[:max_threads]].min puts "ERRORSIZE #{pool.length} max #{config[:max_threads]}"