Skip to content

Commit ac51231

Browse files
committed
Channel::Probe is now just an IVar.
1 parent ac9a7b3 commit ac51231

File tree

8 files changed

+27
-63
lines changed

8 files changed

+27
-63
lines changed

lib/concurrent/channel/buffered_channel.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ def select(probe)
4040
@probe_set.put(probe)
4141
true
4242
else
43-
shift_buffer if probe.set_unless_assigned(peek_buffer, self)
43+
shift_buffer if probe.set?([peek_buffer, self])
4444
end
4545

4646
end
@@ -76,7 +76,7 @@ def set_probe_or_push_into_buffer(value)
7676
push_into_buffer(value)
7777
true
7878
else
79-
@probe_set.take.set_unless_assigned(value, self)
79+
@probe_set.take.set?([value, self])
8080
end
8181
end
8282
end

lib/concurrent/channel/channel.rb

Lines changed: 2 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -3,37 +3,12 @@
33
module Concurrent
44
module Channel
55

6-
class Probe < Concurrent::IVar
7-
8-
def initialize(value = NO_VALUE, opts = {})
9-
super(value, opts)
10-
end
11-
12-
def set_unless_assigned(value, channel)
13-
mutex.synchronize do
14-
return false if [:fulfilled, :rejected].include? @state
15-
16-
set_state(true, [value, channel], nil)
17-
event.set
18-
true
19-
end
20-
end
21-
22-
alias_method :composite_value, :value
23-
24-
def value
25-
composite_value.nil? ? nil : composite_value[0]
26-
end
27-
28-
def channel
29-
composite_value.nil? ? nil : composite_value[1]
30-
end
31-
end
6+
Probe = IVar
327

338
def self.select(*channels)
349
probe = Probe.new
3510
channels.each { |channel| channel.select(probe) }
36-
result = probe.composite_value
11+
result = probe.value
3712
channels.each { |channel| channel.remove_probe(probe) }
3813
result
3914
end

lib/concurrent/channel/unbuffered_channel.rb

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,7 @@ def probe_set_size
1212
end
1313

1414
def push(value)
15-
# TODO set_unless_assigned define on IVar as #set_state? or #try_set_state
16-
until @probe_set.take.set_unless_assigned(value, self)
15+
until @probe_set.take.set?([value, self])
1716
end
1817
end
1918

lib/concurrent/future.rb

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,6 @@ def set(value = IVar::NO_VALUE, &block)
8888
execute
8989
end
9090

91-
protected :complete
92-
9391
private
9492

9593
# @!visibility private

lib/concurrent/ivar.rb

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -103,11 +103,8 @@ def add_observer(observer = nil, func = :update, &block)
103103
# been set or otherwise completed
104104
# @return [IVar] self
105105
def set(value = NO_VALUE)
106-
if (block_given? && value != NO_VALUE) || (!block_given? && value == NO_VALUE)
107-
raise ArgumentError.new('must set with either a value or a block')
108-
elsif ! compare_and_set_state(:processing, :pending)
109-
raise MultipleAssignmentError
110-
end
106+
check_for_block_or_value!(block_given?, value)
107+
raise MultipleAssignmentError unless compare_and_set_state(:processing, :pending)
111108

112109
begin
113110
value = yield if block_given?

spec/concurrent/channel/buffered_channel_spec.rb

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ module Concurrent
4747
it 'should assign value to a probe if probe set is not empty' do
4848
channel.select(probe)
4949
Thread.new { sleep(0.1); channel.push 3 }
50-
expect(probe.value).to eq 3
50+
expect(probe.value.first).to eq 3
5151
end
5252
end
5353

@@ -62,14 +62,14 @@ module Concurrent
6262
channel.push 1
6363
result = channel.pop
6464

65-
expect(result).to eq 1
65+
expect(result.first).to eq 1
6666
end
6767

6868
it 'removes the first value from the buffer' do
6969
channel.push 'a'
7070
channel.push 'b'
7171

72-
expect(channel.pop).to eq 'a'
72+
expect(channel.pop.first).to eq 'a'
7373
expect(channel.buffer_queue_size).to eq 1
7474
end
7575
end
@@ -91,7 +91,7 @@ module Concurrent
9191

9292
Thread.new { channel.push 82 }
9393

94-
expect(probe.value).to eq 82
94+
expect(probe.value.first).to eq 82
9595
end
9696

9797
end
@@ -120,7 +120,7 @@ module Concurrent
120120

121121
expect(channel.buffer_queue_size).to eq 1
122122

123-
expect(channel.pop).to eq 82
123+
expect(channel.pop.first).to eq 82
124124

125125
end
126126
end

spec/concurrent/channel/probe_spec.rb

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,57 +20,52 @@ def trigger_observable(observable)
2020
it_should_behave_like :observable
2121
end
2222

23-
describe '#set_unless_assigned' do
23+
describe '#set?' do
2424
context 'empty probe' do
2525
it 'assigns the value' do
26-
probe.set_unless_assigned(32, channel)
27-
expect(probe.value).to eq 32
26+
probe.set?([32, channel])
27+
expect(probe.value.first).to eq 32
2828
end
2929

3030
it 'assign the channel' do
31-
probe.set_unless_assigned(32, channel)
32-
expect(probe.channel).to be channel
31+
probe.set?([32, channel])
32+
expect(probe.value.last).to be channel
3333
end
3434

3535
it 'returns true' do
36-
expect(probe.set_unless_assigned('hi', channel)).to eq true
36+
expect(probe.set?(['hi', channel])).to eq true
3737
end
3838
end
3939

4040
context 'fulfilled probe' do
4141
before(:each) { probe.set([27, nil]) }
4242

4343
it 'does not assign the value' do
44-
probe.set_unless_assigned(88, channel)
45-
expect(probe.value).to eq 27
44+
probe.set?([88, channel])
45+
expect(probe.value.first).to eq 27
4646
end
4747

4848
it 'returns false' do
49-
expect(probe.set_unless_assigned('hello', channel)).to eq false
49+
expect(probe.set?(['hello', channel])).to eq false
5050
end
5151
end
5252

5353
context 'rejected probe' do
5454
before(:each) { probe.fail }
5555

5656
it 'does not assign the value' do
57-
probe.set_unless_assigned(88, channel)
57+
probe.set?([88, channel])
5858
expect(probe).to be_rejected
5959
end
6060

6161
it 'has a nil value' do
6262
expect(probe.value).to be_nil
6363
end
6464

65-
it 'has a nil channel' do
66-
expect(probe.channel).to be_nil
67-
end
68-
6965
it 'returns false' do
70-
expect(probe.set_unless_assigned('hello', channel)).to eq false
66+
expect(probe.set?(['hello', channel])).to eq false
7167
end
7268
end
7369
end
74-
7570
end
7671
end

spec/concurrent/channel/unbuffered_channel_spec.rb

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ module Concurrent
3939

4040
sleep(0.1)
4141

42-
expect(result).to eq 42
42+
expect(result.first).to eq 42
4343
end
4444

4545
it 'passes the pushed value to only one thread' do
@@ -62,7 +62,7 @@ module Concurrent
6262

6363
sleep(0.1)
6464

65-
expect(result).to eq 57
65+
expect(result.first).to eq 57
6666
end
6767
end
6868

@@ -81,7 +81,7 @@ module Concurrent
8181

8282
Thread.new { channel.push 82 }
8383

84-
expect(probe.value).to eq 82
84+
expect(probe.value.first).to eq 82
8585
end
8686

8787
it 'ignores already set probes and waits for a new one' do
@@ -101,7 +101,7 @@ module Concurrent
101101

102102
sleep(0.05)
103103

104-
expect(new_probe.value).to eq 72
104+
expect(new_probe.value.first).to eq 72
105105
end
106106

107107
end

0 commit comments

Comments
 (0)