Skip to content

Commit 734da17

Browse files
committed
Add java implementation of JavaSynchronizedObject
1 parent 18d56fa commit 734da17

File tree

7 files changed

+141
-24
lines changed

7 files changed

+141
-24
lines changed

ext/ConcurrentRubyExtService.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ public boolean basicLoad(final Ruby runtime) throws IOException {
99
new com.concurrent_ruby.ext.JavaAtomicBooleanLibrary().load(runtime, false);
1010
new com.concurrent_ruby.ext.JavaAtomicFixnumLibrary().load(runtime, false);
1111
new com.concurrent_ruby.ext.JavaSemaphoreLibrary().load(runtime, false);
12+
new com.concurrent_ruby.ext.JavaSynchronizedObjectLibrary().load(runtime, false);
1213
return true;
1314
}
1415
}
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
package com.concurrent_ruby.ext;
2+
3+
import java.io.IOException;
4+
import java.util.concurrent.atomic.AtomicBoolean;
5+
6+
import org.jruby.Ruby;
7+
import org.jruby.RubyClass;
8+
import org.jruby.RubyModule;
9+
import org.jruby.RubyObject;
10+
import org.jruby.anno.JRubyClass;
11+
import org.jruby.anno.JRubyMethod;
12+
import org.jruby.runtime.ObjectAllocator;
13+
import org.jruby.runtime.builtin.IRubyObject;
14+
import org.jruby.runtime.load.Library;
15+
import org.jruby.runtime.Block;
16+
import org.jruby.RubyBoolean;
17+
import org.jruby.RubyNil;
18+
import org.jruby.runtime.ThreadContext;
19+
20+
public class JavaSynchronizedObjectLibrary implements Library {
21+
22+
public void load(Ruby runtime, boolean wrap) throws IOException {
23+
RubyModule concurrentMod = runtime.defineModule("Concurrent");
24+
RubyClass javaSynchronizedObjectClass = concurrentMod.defineClassUnder(
25+
"JavaSynchronizedObject",
26+
concurrentMod.getClass("AbstractSynchronizedObject"),
27+
JRUBYREFERENCE_ALLOCATOR);
28+
javaSynchronizedObjectClass.defineAnnotatedMethods(JavaSynchronizedObject.class);
29+
}
30+
31+
private static final ObjectAllocator JRUBYREFERENCE_ALLOCATOR = new ObjectAllocator() {
32+
public IRubyObject allocate(Ruby runtime, RubyClass klazz) {
33+
return new JavaSynchronizedObject(runtime, klazz);
34+
}
35+
};
36+
37+
@JRubyClass(name = "JavaSynchronizedObject", parent = "AbstractSynchronizedObject")
38+
public static class JavaSynchronizedObject extends RubyObject {
39+
40+
public JavaSynchronizedObject(Ruby runtime, RubyClass metaClass) {
41+
super(runtime, metaClass);
42+
}
43+
44+
@JRubyMethod
45+
public IRubyObject initialize(ThreadContext context) {
46+
return context.nil;
47+
}
48+
49+
@JRubyMethod(name = "synchronize")
50+
public IRubyObject rubySynchronize(ThreadContext context, Block block) {
51+
synchronized (this) {
52+
return block.yield(context, null);
53+
}
54+
}
55+
56+
@JRubyMethod(name = "ns_wait", optional = 1)
57+
public IRubyObject nsWait(ThreadContext context, IRubyObject[] args) {
58+
Ruby runtime = context.runtime;
59+
if (args.length > 1) {
60+
throw runtime.newArgumentError(args.length, 1);
61+
}
62+
Double timeout = null;
63+
if (args.length > 0 && !args[0].isNil()) {
64+
timeout = args[0].convertToFloat().getDoubleValue();
65+
if (timeout < 0) {
66+
throw runtime.newArgumentError("time interval must be positive");
67+
}
68+
}
69+
if (Thread.interrupted()) {
70+
throw runtime.newConcurrencyError("thread interrupted");
71+
}
72+
boolean success = false;
73+
try {
74+
success = context.getThread().wait_timeout(this, timeout);
75+
} catch (InterruptedException ie) {
76+
throw runtime.newConcurrencyError(ie.getLocalizedMessage());
77+
} finally {
78+
// An interrupt or timeout may have caused us to miss
79+
// a notify that we consumed, so do another notify in
80+
// case someone else is available to pick it up.
81+
if (!success) {
82+
this.notify();
83+
}
84+
}
85+
return this;
86+
}
87+
88+
@JRubyMethod(name = "ns_signal")
89+
public IRubyObject nsSignal(ThreadContext context) {
90+
notify();
91+
return this;
92+
}
93+
94+
@JRubyMethod(name = "ns_broadcast")
95+
public IRubyObject nsBroadcast(ThreadContext context) {
96+
notifyAll();
97+
return this;
98+
}
99+
}
100+
}

lib/concurrent.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
require 'concurrent/version'
22

3+
require 'concurrent/synchronized_object'
4+
35
require 'concurrent/configuration'
46

57
require 'concurrent/actor'
@@ -9,7 +11,6 @@
911
require 'concurrent/errors'
1012
require 'concurrent/executors'
1113
require 'concurrent/utilities'
12-
require 'concurrent/synchronized_object'
1314

1415
require 'concurrent/atomic'
1516
require 'concurrent/agent'

lib/concurrent/atomic/cyclic_barrier.rb

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,9 @@ def initialize(parties, &block)
2020
raise ArgumentError.new('count must be in integer greater than or equal zero')
2121
end
2222
synchronize do
23-
@parties = parties
24-
@number_waiting = 0
25-
@action = block
26-
@generation = Generation.new(:waiting)
23+
@parties = parties
24+
@action = block
25+
ns_next_generation
2726
end
2827
end
2928

@@ -54,10 +53,16 @@ def wait(timeout = nil)
5453

5554
if @number_waiting == @parties
5655
@action.call if @action
57-
set_status_and_restore(:fulfilled)
56+
ns_generation_done @generation, :fulfilled
5857
true
5958
else
60-
wait_for_wake_up(@generation, timeout)
59+
generation = @generation
60+
if ns_wait_until(timeout) { generation.status != :waiting }
61+
generation.status == :fulfilled
62+
else
63+
ns_generation_done generation, :broken, false
64+
false
65+
end
6166
end
6267
end
6368
end
@@ -70,7 +75,7 @@ def wait(timeout = nil)
7075
#
7176
# @return [nil]
7277
def reset
73-
synchronize { set_status_and_restore(:reset) }
78+
synchronize { ns_generation_done @generation, :reset }
7479
end
7580

7681
# A barrier can be broken when:
@@ -85,21 +90,17 @@ def broken?
8590

8691
private
8792

88-
def set_status_and_restore(new_status)
89-
@generation.status = new_status
93+
def ns_generation_done(generation, status, continue = true)
94+
generation.status = status
95+
ns_next_generation if continue
9096
ns_broadcast
97+
end
98+
99+
def ns_next_generation
91100
@generation = Generation.new(:waiting)
92101
@number_waiting = 0
93102
end
94103

95-
def wait_for_wake_up(generation, timeout)
96-
if ns_wait_until(timeout) { generation.status != :waiting }
97-
generation.status == :fulfilled
98-
else
99-
generation.status = :broken
100-
ns_broadcast
101-
false
102-
end
103-
end
104+
104105
end
105106
end

lib/concurrent/extension_helper.rb

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22

33
module Concurrent
44

5+
class AbstractSynchronizedObject # FIXME has to be present before Java extensions are loaded
6+
end
7+
58
@@c_ext_loaded ||= false
69
@@java_ext_loaded ||= false
710

lib/concurrent/synchronized_object.rb

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
require 'concurrent/utility/engine'
2+
13
module Concurrent
24

35
# Safe synchronization under any Ruby implementation.
@@ -75,7 +77,7 @@ def broadcast
7577
def ns_wait_until(timeout, &condition)
7678
if timeout
7779
wait_until = Concurrent.monotonic_time + timeout
78-
while true
80+
loop do
7981
now = Concurrent.monotonic_time
8082
condition_result = condition.call
8183
# 0.001 correction to avoid error when `wait_until - now` is smaller than 0.0005 and rounded to 0
@@ -109,8 +111,7 @@ def ns_broadcast
109111
if Concurrent.on_jruby?
110112
require 'jruby'
111113

112-
# roughly more than 2x faster
113-
class JavaSynchronizedObject < AbstractSynchronizedObject
114+
class JavaPureSynchronizedObject < AbstractSynchronizedObject
114115
def initialize
115116
end
116117

@@ -121,8 +122,10 @@ def synchronize
121122
private
122123

123124
def ns_wait(timeout = nil)
124-
JRuby.reference0(Thread.current).wait_timeout(self, timeout)
125+
success = JRuby.reference0(Thread.current).wait_timeout(JRuby.reference0(self), timeout)
125126
self
127+
ensure
128+
ns_signal unless success
126129
end
127130

128131
def ns_broadcast

spec/concurrent/atomic/cyclic_barrier_spec.rb

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,15 @@ module Concurrent
173173
expect(barrier).to be_broken
174174
end
175175

176+
it 'breaks the barrier and release all other threads 2' do
177+
t1 = Thread.new { barrier.wait(0.1) }
178+
t2 = Thread.new { barrier.wait(0.1) }
179+
180+
[t1, t2].each(&:join)
181+
182+
expect(barrier).to be_broken
183+
end
184+
176185
it 'does not execute the block on timeout' do
177186
counter = AtomicFixnum.new
178187
barrier = described_class.new(parties) { counter.increment }
@@ -242,5 +251,4 @@ def barrier.simulate_spurious_wake_up
242251
end
243252
end
244253
end
245-
246254
end

0 commit comments

Comments
 (0)