Skip to content

Commit 591ca92

Browse files
committed
Simplify pool using Future on_completion! callback
1 parent d194d71 commit 591ca92

File tree

7 files changed

+94
-112
lines changed

7 files changed

+94
-112
lines changed

examples/edge_futures.in.rb

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -204,23 +204,17 @@ def schedule_job
204204

205205

206206
# In reality there is often a pool though:
207-
class DBConnection < Concurrent::Actor::Utils::AbstractWorker
208-
def initialize(balancer, data)
209-
super balancer
210-
@data = data
211-
end
212-
213-
def work(message)
214-
# pretending that this queries a DB
215-
@data[message]
216-
end
217-
end
218-
219207
data = Array.new(10) { |i| '*' * i }
220208
pool_size = 5
221209

222-
DB_POOL = Concurrent::Actor::Utils::Pool.spawn!('DB-pool', pool_size) do |balancer, index|
223-
DBConnection.spawn(name: "worker-#{index}", args: [balancer, data])
210+
DB_POOL = Concurrent::Actor::Utils::Pool.spawn!('DB-pool', pool_size) do |index|
211+
# DB connection constructor
212+
Concurrent::Actor::Utils::AdHoc.spawn(name: "worker-#{index}", args: [data]) do
213+
lambda do |message|
214+
# pretending that this queries a DB
215+
data[message]
216+
end
217+
end
224218
end
225219

226220
concurrent_jobs = 11.times.map do |v|

examples/edge_futures.out.rb

Lines changed: 35 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
### Simple asynchronous task
22

33
future = Concurrent.future { sleep 0.1; 1 + 1 } # evaluation starts immediately
4-
# => <#Concurrent::Edge::Future:0x7f9f549266e8 pending blocks:[]>
4+
# => <#Concurrent::Edge::Future:0x7fa1b59bbbb8 pending blocks:[]>
55
future.completed? # => false
66
# block until evaluated
77
future.value # => 2
@@ -11,7 +11,7 @@
1111
### Failing asynchronous task
1212

1313
future = Concurrent.future { raise 'Boom' }
14-
# => <#Concurrent::Edge::Future:0x7f9f548dd3f8 pending blocks:[]>
14+
# => <#Concurrent::Edge::Future:0x7fa1b59b2fe0 failed blocks:[]>
1515
future.value # => nil
1616
future.value! rescue $! # => #<RuntimeError: Boom>
1717
future.reason # => #<RuntimeError: Boom>
@@ -23,8 +23,7 @@
2323

2424
head = Concurrent.future { 1 }
2525
branch1 = head.then(&:succ)
26-
branch2 = head.then(&:succ).then(&:succ)
27-
# zipping futures
26+
branch2 = head.then(&:succ).then(&:succ)
2827
branch1.zip(branch2).value # => [2, 3]
2928
(branch1 & branch2).then { |a, b| a + b }.value! # => 5
3029
(branch1 & branch2).then(&:+).value! # => 5
@@ -48,21 +47,21 @@
4847

4948
# will not evaluate until asked by #value or other method requiring completion
5049
future = Concurrent.delay { 'lazy' }
51-
# => <#Concurrent::Edge::Future:0x7f9f53a57b58 pending blocks:[]>
50+
# => <#Concurrent::Edge::Future:0x7fa1b5948f00 pending blocks:[]>
5251
sleep 0.1
5352
future.completed? # => false
5453
future.value # => "lazy"
5554

5655
# propagates trough chain allowing whole or partial lazy chains
5756

5857
head = Concurrent.delay { 1 }
59-
# => <#Concurrent::Edge::Future:0x7f9f53a35508 pending blocks:[]>
58+
# => <#Concurrent::Edge::Future:0x7fa1b59401c0 pending blocks:[]>
6059
branch1 = head.then(&:succ)
61-
# => <#Concurrent::Edge::Future:0x7f9f53a17328 pending blocks:[]>
60+
# => <#Concurrent::Edge::Future:0x7fa1b5933290 pending blocks:[]>
6261
branch2 = head.delay.then(&:succ)
63-
# => <#Concurrent::Edge::Future:0x7f9f5590d868 pending blocks:[]>
62+
# => <#Concurrent::Edge::Future:0x7fa1b5930c98 pending blocks:[]>
6463
join = branch1 & branch2
65-
# => <#Concurrent::Edge::ArrayFuture:0x7f9f558fd7d8 pending blocks:[]>
64+
# => <#Concurrent::Edge::ArrayFuture:0x7fa1b592b7c0 pending blocks:[]>
6665

6766
sleep 0.1 # nothing will complete # => 0
6867
[head, branch1, branch2, join].map(&:completed?) # => [false, false, false, false]
@@ -90,14 +89,14 @@
9089
### Schedule
9190

9291
scheduled = Concurrent.schedule(0.1) { 1 }
93-
# => <#Concurrent::Edge::Future:0x7f9f5581ed30 pending blocks:[]>
92+
# => <#Concurrent::Edge::Future:0x7fa1b49df190 pending blocks:[]>
9493

9594
scheduled.completed? # => false
9695
scheduled.value # available after 0.1sec # => 1
9796

9897
# and in chain
9998
scheduled = Concurrent.delay { 1 }.schedule(0.1).then(&:succ)
100-
# => <#Concurrent::Edge::Future:0x7f9f548494c8 pending blocks:[]>
99+
# => <#Concurrent::Edge::Future:0x7fa1b49d4ec0 pending blocks:[]>
101100
# will not be scheduled until value is requested
102101
sleep 0.1
103102
scheduled.value # returns after another 0.1sec # => 2
@@ -106,35 +105,35 @@
106105
### Completable Future and Event
107106

108107
future = Concurrent.future
109-
# => <#Concurrent::Edge::CompletableFuture:0x7f9f5385bcf0 pending blocks:[]>
108+
# => <#Concurrent::Edge::CompletableFuture:0x7fa1b49bce10 pending blocks:[]>
110109
event = Concurrent.event
111-
# => <#Concurrent::Edge::CompletableEvent:0x7f9f53858ca8 pending blocks:[]>
110+
# => <#Concurrent::Edge::CompletableEvent:0x7fa1b49bc118 pending blocks:[]>
112111

113112
# will be blocked until completed
114113
t1 = Thread.new { future.value }
115114
t2 = Thread.new { event.wait }
116115

117116
future.success 1
118-
# => <#Concurrent::Edge::CompletableFuture:0x7f9f5385bcf0 success blocks:[]>
117+
# => <#Concurrent::Edge::CompletableFuture:0x7fa1b49bce10 success blocks:[]>
119118
future.success 1 rescue $!
120119
# => #<Concurrent::MultipleAssignmentError: multiple assignment>
121120
future.try_success 2 # => false
122121
event.complete
123-
# => <#Concurrent::Edge::CompletableEvent:0x7f9f53858ca8 completed blocks:[]>
122+
# => <#Concurrent::Edge::CompletableEvent:0x7fa1b49bc118 completed blocks:[]>
124123

125124
[t1, t2].each &:join
126125

127126

128127
### Callbacks
129128

130-
queue = Queue.new # => #<Thread::Queue:0x007f9f53861b50>
129+
queue = Queue.new # => #<Thread::Queue:0x007fa1b49acec0>
131130
future = Concurrent.delay { 1 + 1 }
132-
# => <#Concurrent::Edge::Future:0x7f9f54853d10 pending blocks:[]>
131+
# => <#Concurrent::Edge::Future:0x7fa1b49ac150 pending blocks:[]>
133132

134133
future.on_success { queue << 1 } # evaluated asynchronously
135-
# => <#Concurrent::Edge::Future:0x7f9f54853d10 pending blocks:[]>
134+
# => <#Concurrent::Edge::Future:0x7fa1b49ac150 pending blocks:[]>
136135
future.on_success! { queue << 2 } # evaluated on completing thread
137-
# => <#Concurrent::Edge::Future:0x7f9f54853d10 pending blocks:[]>
136+
# => <#Concurrent::Edge::Future:0x7fa1b49ac150 pending blocks:[]>
138137

139138
queue.empty? # => true
140139
future.value # => 2
@@ -145,7 +144,7 @@
145144
### Thread-pools
146145

147146
Concurrent.future(:fast) { 2 }.then(:io) { File.read __FILE__ }.wait
148-
# => <#Concurrent::Edge::Future:0x7f9f539545f8 success blocks:[]>
147+
# => <#Concurrent::Edge::Future:0x7fa1b498dd18 success blocks:[]>
149148

150149

151150
### Interoperability with actors
@@ -166,32 +165,32 @@
166165

167166
### Interoperability with channels
168167

169-
ch1 = Concurrent::Edge::Channel.new # => #<Concurrent::Edge::Channel:0x007f9f558e5db8>
170-
ch2 = Concurrent::Edge::Channel.new # => #<Concurrent::Edge::Channel:0x007f9f558e4f80>
168+
ch1 = Concurrent::Edge::Channel.new # => #<Concurrent::Edge::Channel:0x007fa1b41c5e28>
169+
ch2 = Concurrent::Edge::Channel.new # => #<Concurrent::Edge::Channel:0x007fa1b41c5338>
171170

172171
result = Concurrent.select(ch1, ch2)
173-
# => <#Concurrent::Edge::CompletableFuture:0x7f9f53a4fe58 pending blocks:[]>
172+
# => <#Concurrent::Edge::CompletableFuture:0x7fa1b41bf4d8 pending blocks:[]>
174173
ch1.push 1 # => nil
175174
result.value!
176-
# => [1, #<Concurrent::Edge::Channel:0x007f9f558e5db8>]
175+
# => [1, #<Concurrent::Edge::Channel:0x007fa1b41c5e28>]
177176

178177
Concurrent.
179178
future { 1+1 }.
180179
then_push(ch1)
181-
# => <#Concurrent::Edge::Future:0x7f9f53a2e758 pending blocks:[]>
180+
# => <#Concurrent::Edge::Future:0x7fa1b41b6450 pending blocks:[]>
182181
result = Concurrent.
183182
future { '%02d' }.
184183
then_select(ch1, ch2).
185184
then { |format, (value, channel)| format format, value }
186-
# => <#Concurrent::Edge::Future:0x7f9f55925f30 pending blocks:[]>
185+
# => <#Concurrent::Edge::Future:0x7fa1b41a7f40 pending blocks:[]>
187186
result.value! # => "02"
188187

189188

190189
### Common use-cases Examples
191190

192191
# simple background processing
193192
Concurrent.future { do_stuff }
194-
# => <#Concurrent::Edge::Future:0x7f9f53a93e50 pending blocks:[]>
193+
# => <#Concurrent::Edge::Future:0x7fa1b4196d08 pending blocks:[]>
195194

196195
# parallel background processing
197196
jobs = 10.times.map { |i| Concurrent.future { i } }
@@ -208,7 +207,7 @@ def schedule_job
208207
end # => :schedule_job
209208

210209
schedule_job
211-
# => <#Concurrent::Edge::Future:0x7f9f548b01a0 pending blocks:[]>
210+
# => <#Concurrent::Edge::Future:0x7fa1b4147960 pending blocks:[]>
212211
@end = true # => true
213212

214213

@@ -238,24 +237,18 @@ def schedule_job
238237

239238

240239
# In reality there is often a pool though:
241-
class DBConnection < Concurrent::Actor::Utils::AbstractWorker
242-
def initialize(balancer, data)
243-
super balancer
244-
@data = data
245-
end
246-
247-
def work(message)
248-
# pretending that this queries a DB
249-
@data[message]
250-
end
251-
end # => :work
252-
253240
data = Array.new(10) { |i| '*' * i }
254241
# => ["", "*", "**", "***", "****", "*****", "******", "*******", "********", "*********"]
255242
pool_size = 5 # => 5
256243

257-
DB_POOL = Concurrent::Actor::Utils::Pool.spawn!('DB-pool', pool_size) do |balancer, index|
258-
DBConnection.spawn(name: "worker-#{index}", args: [balancer, data])
244+
DB_POOL = Concurrent::Actor::Utils::Pool.spawn!('DB-pool', pool_size) do |index|
245+
# DB connection constructor
246+
Concurrent::Actor::Utils::AdHoc.spawn(name: "worker-#{index}", args: [data]) do
247+
lambda do |message|
248+
# pretending that this queries a DB
249+
data[message]
250+
end
251+
end
259252
end
260253
# => #<Concurrent::Actor::Reference /DB-pool (Concurrent::Actor::Utils::Pool)>
261254

lib/concurrent/actor/core.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ def ns_initialize(opts, &block)
208208
private
209209

210210
def handle_envelope(envelope)
211-
log DEBUG, "received #{envelope.message.inspect} from #{envelope.sender}"
211+
log DEBUG, "#{envelope.future ? 'asked' : 'told'} #{envelope.message.inspect} from #{envelope.sender}"
212212
@first_behaviour.on_envelope envelope
213213
end
214214

lib/concurrent/actor/utils/balancer.rb

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,17 @@ def initialize
1414
end
1515

1616
def on_message(message)
17-
case message
17+
command, who = message
18+
case command
1819
when :subscribe
19-
@receivers << envelope.sender
20+
@receivers << (who || envelope.sender)
2021
distribute
2122
true
2223
when :unsubscribe
23-
@receivers.delete envelope.sender
24+
@receivers.delete(who || envelope.sender)
2425
true
2526
when :subscribed?
26-
@receivers.include? envelope.sender
27+
@receivers.include?(who || envelope.sender)
2728
else
2829
@buffer << envelope
2930
distribute

lib/concurrent/actor/utils/pool.rb

Lines changed: 12 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -29,29 +29,21 @@ module Utils
2929
class Pool < RestartingContext
3030
def initialize(size, &worker_initializer)
3131
@balancer = Balancer.spawn name: :balancer, supervise: true
32-
@workers = Array.new(size, &worker_initializer.curry[@balancer])
33-
@workers.each { |w| Type! w, Reference }
32+
@workers = Array.new(size, &worker_initializer)
33+
@workers.each do |worker|
34+
Type! worker, Reference
35+
@balancer << [:subscribe, worker]
36+
end
3437
end
3538

3639
def on_message(message)
37-
redirect @balancer
38-
end
39-
end
40-
41-
class AbstractWorker < RestartingContext
42-
def initialize(balancer)
43-
@balancer = balancer
44-
@balancer << :subscribe
45-
end
46-
47-
def on_message(message)
48-
work message
49-
ensure
50-
@balancer << :subscribe
51-
end
52-
53-
def work(message)
54-
raise NotImplementedError
40+
envelope_to_redirect = if envelope.future
41+
envelope
42+
else
43+
Envelope.new(envelope.message, Concurrent.future, envelope.sender, envelope.sender)
44+
end
45+
envelope.future.on_completion! { @balancer << :subscribe }
46+
redirect @balancer, envelope_to_redirect
5547
end
5648
end
5749
end

lib/concurrent/edge/future.rb

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ def delay_on(default_executor, *args, &task)
5757
end
5858

5959
# Schedules the block to be executed on executor in given intended_time.
60+
# @param [Numeric, Time] intended_time Numeric => run in `intended_time` seconds. Time => eun on time.
6061
# @return [Future]
6162
def schedule(intended_time, *args, &task)
6263
schedule_on :io, intended_time, *args, &task
@@ -228,13 +229,13 @@ def inspect
228229
end
229230

230231
# @!visibility private
231-
def complete(raise = true)
232+
def complete(raise_on_reassign = true)
232233
if complete_state
233234
# go to synchronized block only if there were waiting threads
234235
synchronize { ns_broadcast } if @Waiters.clear
235236
call_callbacks
236237
else
237-
Concurrent::MultipleAssignmentError.new('multiple assignment') if raise
238+
Concurrent::MultipleAssignmentError.new('multiple assignment') if raise_on_reassign
238239
return false
239240
end
240241
self
@@ -498,13 +499,13 @@ def apply_value(value, block)
498499
end
499500

500501
# @!visibility private
501-
def complete(success, value, reason, raise = true)
502+
def complete(success, value, reason, raise_on_reassign = true)
502503
if complete_state success, value, reason
503504
@Waiters.clear
504505
synchronize { ns_broadcast }
505506
call_callbacks success, value, reason
506507
else
507-
raise reason || Concurrent::MultipleAssignmentError.new('multiple assignment') if raise
508+
raise reason || Concurrent::MultipleAssignmentError.new('multiple assignment') if raise_on_reassign
508509
return false
509510
end
510511
self
@@ -594,8 +595,8 @@ def apply_value(value, block)
594595
# A Event which can be completed by user.
595596
class CompletableEvent < Event
596597
# Complete the Event, `raise` if already completed
597-
def complete(raise = true)
598-
super raise
598+
def complete(raise_on_reassign = true)
599+
super raise_on_reassign
599600
end
600601
end
601602

@@ -604,8 +605,8 @@ class CompletableFuture < Future
604605
# Complete the future with triplet od `success`, `value`, `reason`
605606
# `raise` if already completed
606607
# return [self]
607-
def complete(success, value, reason, raise = true)
608-
super success, value, reason, raise
608+
def complete(success, value, reason, raise_on_reassign = true)
609+
super success, value, reason, raise_on_reassign
609610
end
610611

611612
# Complete the future with value

0 commit comments

Comments
 (0)