Skip to content

Commit 19e1926

Browse files
authored
Merge pull request #911 from ruby-concurrency/pitr-ch/set
Fix Set thread safety
2 parents 382550c + 7e1dc82 commit 19e1926

File tree

4 files changed

+99
-9
lines changed

4 files changed

+99
-9
lines changed

lib/concurrent-ruby/concurrent/set.rb

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,27 @@ module Concurrent
1919
#
2020
# @see http://ruby-doc.org/stdlib-2.4.0/libdoc/set/rdoc/Set.html Ruby standard library `Set`
2121

22-
2322
# @!macro internal_implementation_note
2423
SetImplementation = case
2524
when Concurrent.on_cruby?
26-
# Because MRI never runs code in parallel, the existing
27-
# non-thread-safe structures should usually work fine.
28-
::Set
25+
# The CRuby implementation of Set is written in Ruby itself and is
26+
# not thread safe for certain methods.
27+
require 'monitor'
28+
require 'concurrent/thread_safe/util/data_structures'
29+
30+
class CRubySet < ::Set
31+
end
32+
33+
ThreadSafe::Util.make_synchronized_on_cruby CRubySet
34+
CRubySet
2935

3036
when Concurrent.on_jruby?
3137
require 'jruby/synchronized'
3238

3339
class JRubySet < ::Set
3440
include JRuby::Synchronized
3541
end
42+
3643
JRubySet
3744

3845
when Concurrent.on_rbx?
@@ -41,7 +48,8 @@ class JRubySet < ::Set
4148

4249
class RbxSet < ::Set
4350
end
44-
ThreadSafe::Util.make_synchronized_on_rbx Concurrent::RbxSet
51+
52+
ThreadSafe::Util.make_synchronized_on_rbx RbxSet
4553
RbxSet
4654

4755
when Concurrent.on_truffleruby?
@@ -50,7 +58,7 @@ class RbxSet < ::Set
5058
class TruffleRubySet < ::Set
5159
end
5260

53-
ThreadSafe::Util.make_synchronized_on_truffleruby Concurrent::TruffleRubySet
61+
ThreadSafe::Util.make_synchronized_on_truffleruby TruffleRubySet
5462
TruffleRubySet
5563

5664
else

lib/concurrent-ruby/concurrent/thread_safe/util/data_structures.rb

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,37 @@ def self.synchronized(object, &block)
1212
module Concurrent
1313
module ThreadSafe
1414
module Util
15+
def self.make_synchronized_on_cruby(klass)
16+
klass.class_eval do
17+
def initialize(*args, &block)
18+
@_monitor = Monitor.new
19+
super
20+
end
21+
22+
def initialize_copy(other)
23+
# make sure a copy is not sharing a monitor with the original object!
24+
@_monitor = Monitor.new
25+
super
26+
end
27+
end
28+
29+
klass.superclass.instance_methods(false).each do |method|
30+
klass.class_eval <<-RUBY, __FILE__, __LINE__ + 1
31+
def #{method}(*args)
32+
monitor = @_monitor
33+
monitor or raise("BUG: Internal monitor was not properly initialized. Please report this to the concurrent-ruby developers.")
34+
monitor.synchronize { super }
35+
end
36+
RUBY
37+
end
38+
end
39+
1540
def self.make_synchronized_on_rbx(klass)
1641
klass.class_eval do
1742
private
1843

1944
def _mon_initialize
20-
@_monitor = Monitor.new unless @_monitor # avoid double initialisation
45+
@_monitor ||= Monitor.new # avoid double initialisation
2146
end
2247

2348
def self.new(*args)
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
module Concurrent
2-
VERSION = '1.1.8'
2+
VERSION = '1.1.8'
33
end

spec/concurrent/set_spec.rb

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ module Concurrent
4242
end
4343

4444
context 'concurrency' do
45-
it do
45+
it '#add and #delete' do
4646
(1..Concurrent::ThreadSafe::Test::THREADS).map do |i|
4747
in_thread do
4848
1000.times do
@@ -55,6 +55,63 @@ module Concurrent
5555
end.map(&:join)
5656
expect(set).to be_empty
5757
end
58+
59+
it 'force context switch' do
60+
barrier = Concurrent::CyclicBarrier.new(2)
61+
62+
# methods like include? or delete? are implemented for CRuby in Ruby itself
63+
# @see https://github.com/ruby/ruby/blob/master/lib/set.rb
64+
set.clear
65+
66+
# add a single element
67+
set.add(1)
68+
69+
# This thread should start and `Set#reject!` in CRuby should cache a value of `0` for size
70+
thread_reject = in_thread do
71+
# we expect this to return nil since nothing should have changed.
72+
expect(set.reject! do |v|
73+
barrier.wait
74+
v == 1 # only delete the 1 value
75+
end).to eq set
76+
end
77+
78+
thread_add = in_thread do
79+
barrier.wait
80+
expect(set.add?(1)).to eq set
81+
end
82+
83+
join_with [thread_reject, thread_add]
84+
end
85+
86+
it '#each' do
87+
threads = []
88+
("a".."z").inject(set, &:<<) # setup a non-empty set
89+
90+
threads << in_thread do
91+
2000.times do
92+
size = nil
93+
set.each do |member|
94+
if size.nil?
95+
size = set.length
96+
else
97+
expect(set.length).to eq(size)
98+
end
99+
end
100+
end
101+
end
102+
103+
threads += (1..19).map do |i|
104+
in_thread do
105+
v = i * 1000
106+
10.times do
107+
200.times { |j| set << (v+j) }
108+
200.times { |j| set.delete(v+j) }
109+
end
110+
end
111+
end
112+
113+
threads.map(&:join)
114+
end
58115
end
59116
end
60117
end

0 commit comments

Comments
 (0)