Skip to content

Commit 77362cf

Browse files
committed
Adding Delay and benchmark of the synchronization
1 parent f7bf6a2 commit 77362cf

File tree

1 file changed

+158
-42
lines changed

1 file changed

+158
-42
lines changed

lib/concurrent/next.rb

Lines changed: 158 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -59,20 +59,21 @@ def future(executor = :fast, &block)
5959
Future.execute executor, &block
6060
end
6161

62+
# @return [Delay]
63+
def delay(executor = :fast, &block)
64+
Delay.new(executor, &block)
65+
end
66+
6267
alias_method :async, :future
6368
end
6469

6570
extend Shortcuts
6671

67-
# TODO benchmark java implementation, is it faster as expected?
68-
class SynchronizedObject
69-
70-
engine = defined?(RUBY_ENGINE) && RUBY_ENGINE
71-
72-
case engine
73-
when 'jruby'
74-
require 'jruby'
72+
begin
73+
require 'jruby'
7574

75+
# roughly more than 2x faster
76+
class JavaSynchronizedObject
7677
def initialize
7778
end
7879

@@ -81,51 +82,59 @@ def synchronize
8182
end
8283

8384
def wait(timeout)
84-
JRuby.reference0(self).wait(timeout ? timeout * 1000 : nil)
85+
if timeout
86+
JRuby.reference0(self).wait(timeout * 1000)
87+
else
88+
JRuby.reference0(self).wait
89+
end
8590
end
8691

8792
def notify_all
8893
JRuby.reference0(self).notifyAll
8994
end
95+
end
96+
rescue LoadError
97+
# ignore
98+
end
9099

91-
when 'rbx'
92-
93-
raise NotImplementedError # TODO
100+
class RubySynchronizedObject
101+
def initialize
102+
@mutex = Mutex.new
103+
@condition = Concurrent::Condition.new
104+
end
94105

95-
# def synchronize
96-
# Rubinius.lock(self)
106+
def synchronize
107+
# if @mutex.owned?
97108
# yield
98-
# ensure
99-
# Rubinius.unlock(self)
109+
# else
110+
@mutex.synchronize { yield }
111+
rescue ThreadError
112+
yield
100113
# end
114+
end
101115

102-
else
103-
104-
def initialize
105-
@mutex = Mutex.new
106-
@condition = Concurrent::Condition.new
107-
end
108-
109-
def synchronize
110-
if @mutex.owned?
111-
yield
112-
else
113-
@mutex.synchronize { yield }
114-
end
115-
end
116-
117-
def wait(timeout)
118-
@condition.wait @mutex, timeout
119-
end
116+
def wait(timeout)
117+
@condition.wait @mutex, timeout
118+
end
120119

121-
def notify
122-
@condition.signal
123-
end
120+
def notify
121+
@condition.signal
122+
end
124123

125-
def notify_all
126-
@condition.broadcast
127-
end
124+
def notify_all
125+
@condition.broadcast
126+
end
127+
end
128128

129+
engine = defined?(RUBY_ENGINE) && RUBY_ENGINE
130+
case engine
131+
when 'jruby'
132+
class SynchronizedObject < JavaSynchronizedObject
133+
end
134+
when 'rbx'
135+
raise NotImplementedError # TODO
136+
else
137+
class SynchronizedObject < RubySynchronizedObject
129138
end
130139
end
131140

@@ -161,6 +170,7 @@ class Future < SynchronizedObject
161170

162171
singleton_class.send :alias_method, :dataflow, :join
163172

173+
# @api private
164174
def initialize(default_executor = :fast)
165175
super()
166176
synchronize do
@@ -361,9 +371,15 @@ def add_callback(&callback)
361371
end
362372

363373
class Promise < SynchronizedObject
364-
def initialize(executor = :fast)
374+
# @api private
375+
def initialize(executor_or_future = :fast)
365376
super()
366-
synchronize { @future = Future.new(executor) }
377+
future = if Future === executor_or_future
378+
executor_or_future
379+
else
380+
Future.new(executor_or_future)
381+
end
382+
synchronize { @future = future }
367383
end
368384

369385
def future
@@ -416,6 +432,33 @@ def connect_to(future)
416432

417433
end
418434

435+
class Delay < Future
436+
437+
def initialize(default_executor = :fast, &block)
438+
super(default_executor)
439+
raise ArgumentError.new('no block given') unless block_given?
440+
synchronize do
441+
@computing = false
442+
@task = block
443+
end
444+
end
445+
446+
def wait(timeout = nil)
447+
execute_task_once
448+
super timeout
449+
end
450+
451+
private
452+
453+
def execute_task_once
454+
execute, task = synchronize do
455+
[(@computing = true unless @computing), @task]
456+
end
457+
458+
Next.executor(default_executor).post { Promise.new(self).evaluate_to &task } if execute
459+
end
460+
end
461+
419462
end
420463
end
421464

@@ -465,4 +508,77 @@ def connect_to(future)
465508
# 6 true Boo io
466509
# 7 true [3, "boo"] fast
467510

511+
puts '-- delay'
512+
513+
# evaluated on #wait, #value
514+
delay = delay { 1 + 1 }
515+
p delay.completed?, delay.value
516+
517+
puts '-- promise like tree'
518+
519+
# if head of the tree is not constructed with #future but with #delay it does not start execute,
520+
# it's triggered later by `head.wait`
521+
head = delay { 1 }
522+
tree = head.then(&:succ).then(&:succ).then(&:succ)
523+
thread = Thread.new { p tree.value } # prints 4
524+
head.wait
525+
thread.join
526+
527+
puts '-- bench'
528+
require 'benchmark'
529+
530+
Benchmark.bmbm(20) do |b|
531+
532+
parents = [RubySynchronizedObject, (JavaSynchronizedObject if defined? JavaSynchronizedObject)].compact
533+
classes = parents.map do |parent|
534+
klass = Class.new(parent) do
535+
def initialize
536+
super
537+
synchronize do
538+
@q = []
539+
end
540+
end
541+
542+
def add(v)
543+
synchronize do
544+
@q << v
545+
if @q.size > 100
546+
@q.clear
547+
end
548+
end
549+
end
550+
end
551+
[parent, klass]
552+
end
553+
554+
classes.each do |parent, klass|
555+
b.report(parent) do
556+
s = klass.new
557+
2.times.map do
558+
Thread.new do
559+
5_000_000.times { s.add :a }
560+
end
561+
end.each &:join
562+
end
563+
564+
end
565+
566+
end
468567

568+
# MRI
569+
# Rehearsal ----------------------------------------------------------------------------
570+
# Concurrent::Next::RubySynchronizedObject 8.010000 6.290000 14.300000 ( 12.197402)
571+
# ------------------------------------------------------------------ total: 14.300000sec
572+
#
573+
# user system total real
574+
# Concurrent::Next::RubySynchronizedObject 8.950000 9.320000 18.270000 ( 15.053220)
575+
#
576+
# JRuby
577+
# Rehearsal ----------------------------------------------------------------------------
578+
# Concurrent::Next::RubySynchronizedObject 10.500000 6.440000 16.940000 ( 10.640000)
579+
# Concurrent::Next::JavaSynchronizedObject 8.410000 0.050000 8.460000 ( 4.132000)
580+
# ------------------------------------------------------------------ total: 25.400000sec
581+
#
582+
# user system total real
583+
# Concurrent::Next::RubySynchronizedObject 9.090000 6.640000 15.730000 ( 10.690000)
584+
# Concurrent::Next::JavaSynchronizedObject 8.200000 0.030000 8.230000 ( 4.141000)

0 commit comments

Comments
 (0)