1
+ require 'concurrent/synchronized_object'
2
+
1
3
module Concurrent
2
4
3
- class CyclicBarrier
5
+ class CyclicBarrier < SynchronizedObject
4
6
5
7
Generation = Struct . new ( :status )
6
8
private_constant :Generation
@@ -13,23 +15,26 @@ class CyclicBarrier
13
15
#
14
16
# @raise [ArgumentError] if `parties` is not an integer or is less than zero
15
17
def initialize ( parties , &block )
16
- raise ArgumentError . new ( 'count must be in integer greater than or equal zero' ) if !parties . is_a? ( Fixnum ) || parties < 1
17
- @parties = parties
18
- @mutex = Mutex . new
19
- @condition = Condition . new
20
- @number_waiting = 0
21
- @action = block
22
- @generation = Generation . new ( :waiting )
18
+ super ( &nil )
19
+ if !parties . is_a? ( Fixnum ) || parties < 1
20
+ raise ArgumentError . new ( 'count must be in integer greater than or equal zero' )
21
+ end
22
+ synchronize do
23
+ @parties = parties
24
+ @number_waiting = 0
25
+ @action = block
26
+ @generation = Generation . new ( :waiting )
27
+ end
23
28
end
24
29
25
30
# @return [Fixnum] the number of threads needed to pass the barrier
26
31
def parties
27
- @parties
32
+ synchronize { @parties }
28
33
end
29
34
30
35
# @return [Fixnum] the number of threads currently waiting on the barrier
31
36
def number_waiting
32
- @number_waiting
37
+ synchronize { @number_waiting }
33
38
end
34
39
35
40
# Blocks on the barrier until the number of waiting threads is equal to
@@ -41,7 +46,7 @@ def number_waiting
41
46
# @return [Boolean] `true` if the `count` reaches zero else false on
42
47
# `timeout` or on `reset` or if the barrier is broken
43
48
def wait ( timeout = nil )
44
- @mutex . synchronize do
49
+ synchronize do
45
50
46
51
return false unless @generation . status == :waiting
47
52
@@ -58,17 +63,14 @@ def wait(timeout = nil)
58
63
end
59
64
60
65
61
-
62
66
# resets the barrier to its initial state
63
67
# If there is at least one waiting thread, it will be woken up, the `wait`
64
68
# method will return false and the barrier will be broken
65
69
# If the barrier is broken, this method restores it to the original state
66
70
#
67
71
# @return [nil]
68
72
def reset
69
- @mutex . synchronize do
70
- set_status_and_restore ( :reset )
71
- end
73
+ synchronize { set_status_and_restore ( :reset ) }
72
74
end
73
75
74
76
# A barrier can be broken when:
@@ -78,35 +80,26 @@ def reset
78
80
# A broken barrier can be restored using `reset` it's safer to create a new one
79
81
# @return [Boolean] true if the barrier is broken otherwise false
80
82
def broken?
81
- @mutex . synchronize { @generation . status != :waiting }
83
+ synchronize { @generation . status != :waiting }
82
84
end
83
85
84
86
private
85
87
86
88
def set_status_and_restore ( new_status )
87
89
@generation . status = new_status
88
- @condition . broadcast
89
- @generation = Generation . new ( :waiting )
90
+ ns_broadcast
91
+ @generation = Generation . new ( :waiting )
90
92
@number_waiting = 0
91
93
end
92
94
93
95
def wait_for_wake_up ( generation , timeout )
94
- if wait_while_waiting ( generation , timeout )
96
+ if ns_wait_until ( timeout ) { generation . status != :waiting }
95
97
generation . status == :fulfilled
96
98
else
97
99
generation . status = :broken
98
- @condition . broadcast
100
+ ns_broadcast
99
101
false
100
102
end
101
103
end
102
-
103
- def wait_while_waiting ( generation , timeout )
104
- remaining = Condition ::Result . new ( timeout )
105
- while generation . status == :waiting && remaining . can_wait?
106
- remaining = @condition . wait ( @mutex , remaining . remaining_time )
107
- end
108
- remaining . woken_up?
109
- end
110
-
111
104
end
112
105
end
0 commit comments