Skip to content

Actors - Improving documentation; other improvements #180

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
Jun 2, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion doc/actor/celluloid_benchmark.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
require 'benchmark'
require 'concurrent/actor'
Concurrent::Actor.i_know_it_is_experimental!

require 'celluloid'
require 'celluloid/autostart'
Expand Down
36 changes: 36 additions & 0 deletions doc/actor/define.in.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
Message = Struct.new :action, :value #

class AnActor < Concurrent::Actor::RestartingContext
def initialize(init)
@counter = init
end

# override #on_message to define actor's behaviour on message received
def on_message(message)
case message.action
when :add
@counter = @counter + message.value
when :subtract
@counter = @counter - message.value
when :value
@counter
else
pass
end
end

# set counter to zero when there is an error
def on_event(event)
if event == :reset
@counter = 0 # ignore initial value
end
end
end #

an_actor = AnActor.spawn name: 'an_actor', args: 10 #
an_actor << Message.new(:add, 1) << Message.new(:subtract, 2) #
an_actor.ask!(Message.new(:value, nil))
an_actor << :boo << Message.new(:add, 1) #
an_actor.ask!(Message.new(:value, nil))
an_actor << :terminate!

37 changes: 37 additions & 0 deletions doc/actor/define.out.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
Message = Struct.new :action, :value

class AnActor < Concurrent::Actor::RestartingContext
def initialize(init)
@counter = init
end

# override #on_message to define actor's behaviour on message received
def on_message(message)
case message.action
when :add
@counter = @counter + message.value
when :subtract
@counter = @counter - message.value
when :value
@counter
else
pass
end
end

# set counter to zero when there is an error
def on_event(event)
if event == :reset
@counter = 0 # ignore initial value
end
end
end

an_actor = AnActor.spawn name: 'an_actor', args: 10
an_actor << Message.new(:add, 1) << Message.new(:subtract, 2)
an_actor.ask!(Message.new(:value, nil)) # => 9
an_actor << :boo << Message.new(:add, 1)
an_actor.ask!(Message.new(:value, nil)) # => 1
an_actor << :terminate!
# => #<Concurrent::Actor::Reference:0x7fb6fc9165d0 /an_actor (AnActor)>

5 changes: 3 additions & 2 deletions doc/actor/format.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,17 @@
require 'pry'
require 'pp'

root = File.dirname(File.expand_path(Process.argv0))
input_paths = if ARGV.empty?
Dir.glob("#{File.dirname(__FILE__)}/*.in.rb")
Dir.glob("#{root}/*.in.rb")
else
ARGV
end.map { |p| File.expand_path p }

input_paths.each_with_index do |input_path, i|

pid = fork do
require_relative 'init.rb'
require File.join(root, 'init.rb')

begin
output_path = input_path.gsub /\.in\.rb$/, '.out.rb'
Expand Down
4 changes: 2 additions & 2 deletions doc/actor/init.rb
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
require 'concurrent/actor'
Concurrent::Actor.i_know_it_is_experimental!
require 'concurrent'
require 'concurrent-edge'
49 changes: 49 additions & 0 deletions doc/actor/io.in.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
require 'concurrent'

# logger = Logger.new(STDOUT)
# Concurrent.configuration.logger = logger.method(:add)

# First option is to use operation pool

class ActorDoingIO < Concurrent::Actor::RestartingContext
def on_message(message)
# do IO operation
end

def default_executor
Concurrent.configuration.global_operation_pool
end
end #

actor_doing_io = ActorDoingIO.spawn :actor_doing_io
actor_doing_io.executor == Concurrent.configuration.global_operation_pool

# It can be also built into a pool so there is not too many IO operations

class IOWorker < Concurrent::Actor::Utils::AbstractWorker
def work(io_job)
# do IO work
sleep 0.1
puts "#{path} second:#{(Time.now.to_f*100).floor} message:#{io_job}"
end

def default_executor
Concurrent.configuration.global_operation_pool
end
end #

pool = Concurrent::Actor::Utils::Pool.spawn('pool', 2) do |balancer, index|
IOWorker.spawn(name: "worker-#{index}", args: [balancer])
end

pool << 1 << 2 << 3 << 4 << 5 << 6

# prints two lines each second
# /pool/worker-0 second:1414677666 message:1
# /pool/worker-1 second:1414677666 message:2
# /pool/worker-0 second:1414677667 message:3
# /pool/worker-1 second:1414677667 message:4
# /pool/worker-0 second:1414677668 message:5
# /pool/worker-1 second:1414677668 message:6

sleep 1
53 changes: 53 additions & 0 deletions doc/actor/io.out.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
require 'concurrent' # => false

# logger = Logger.new(STDOUT)
# Concurrent.configuration.logger = logger.method(:add)

# First option is to use operation pool

class ActorDoingIO < Concurrent::Actor::RestartingContext
def on_message(message)
# do IO operation
end

def default_executor
Concurrent.configuration.global_operation_pool
end
end

actor_doing_io = ActorDoingIO.spawn :actor_doing_io
# => #<Concurrent::Actor::Reference:0x7fb6fc906068 /actor_doing_io (ActorDoingIO)>
actor_doing_io.executor == Concurrent.configuration.global_operation_pool
# => true

# It can be also built into a pool so there is not too many IO operations

class IOWorker < Concurrent::Actor::Utils::AbstractWorker
def work(io_job)
# do IO work
sleep 0.1
puts "#{path} second:#{(Time.now.to_f*100).floor} message:#{io_job}"
end

def default_executor
Concurrent.configuration.global_operation_pool
end
end

pool = Concurrent::Actor::Utils::Pool.spawn('pool', 2) do |balancer, index|
IOWorker.spawn(name: "worker-#{index}", args: [balancer])
end
# => #<Concurrent::Actor::Reference:0x7fb6fc964190 /pool (Concurrent::Actor::Utils::Pool)>

pool << 1 << 2 << 3 << 4 << 5 << 6
# => #<Concurrent::Actor::Reference:0x7fb6fc964190 /pool (Concurrent::Actor::Utils::Pool)>

# prints two lines each second
# /pool/worker-0 second:1414677666 message:1
# /pool/worker-1 second:1414677666 message:2
# /pool/worker-0 second:1414677667 message:3
# /pool/worker-1 second:1414677667 message:4
# /pool/worker-0 second:1414677668 message:5
# /pool/worker-1 second:1414677668 message:6

sleep 1 # => 1
Loading