Skip to content

Commit 2a4888c

Browse files
committed
Cooperative cancellation for promises and any other async processing
1 parent ca3456e commit 2a4888c

File tree

3 files changed

+91
-2
lines changed

3 files changed

+91
-2
lines changed

lib/concurrent/edge/promises.rb

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,4 +70,82 @@ def each_body(value, &block)
7070

7171
end
7272
end
73+
74+
# inspired by https://msdn.microsoft.com/en-us/library/dd537607(v=vs.110).aspx
75+
class Cancellation < Synchronization::Object
76+
safe_initialization!
77+
78+
def self.create
79+
[(i = new), i.token]
80+
end
81+
82+
private_class_method :new
83+
84+
def initialize
85+
@Cancel = Promises.completable_event
86+
@Token = Token.new @Cancel.with_hidden_completable
87+
end
88+
89+
def token
90+
@Token
91+
end
92+
93+
def cancel
94+
try_cancel or raise MultipleAssignmentError, 'cannot cancel twice'
95+
end
96+
97+
def try_cancel
98+
!!@Cancel.complete(false)
99+
end
100+
101+
def canceled?
102+
@Cancel.complete?
103+
end
104+
105+
class Token < Synchronization::Object
106+
safe_initialization!
107+
108+
def initialize(cancel)
109+
@Cancel = cancel
110+
end
111+
112+
def event
113+
@Cancel
114+
end
115+
116+
def on_cancellation(*args, &block)
117+
@Cancel.on_completion *args, &block
118+
end
119+
120+
def then(*args, &block)
121+
@Cancel.chain *args, &block
122+
end
123+
124+
def canceled?
125+
@Cancel.complete?
126+
end
127+
128+
def loop_until_canceled(&block)
129+
until canceled?
130+
result = block.call
131+
end
132+
result
133+
end
134+
135+
def raise_if_canceled
136+
raise CancelledOperationError if canceled?
137+
self
138+
end
139+
140+
def join(*tokens)
141+
Token.new Promises.any_event(@Cancel, *tokens.map(&:event))
142+
end
143+
144+
end
145+
146+
private_constant :Token
147+
148+
# TODO (pitr-ch 27-Mar-2016): cooperation with mutex, select etc?
149+
# TODO (pitr-ch 27-Mar-2016): examples (scheduled to be cancelled in 10 sec)
150+
end
73151
end

lib/concurrent/promises.rb

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1425,7 +1425,5 @@ def initialize(default_executor, intended_time)
14251425
end
14261426
end
14271427

1428-
# TODO cancelable Futures, will cancel the future but the task will finish anyway
1429-
# TODO task interrupts, how to support?
14301428
# TODO when value is requested the current thread may evaluate the tasks to get the value for performance reasons it may not evaluate :io though
14311429
# TODO try work stealing pool, each thread has it's own queue

spec/concurrent/promises_spec.rb

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -434,4 +434,17 @@ def behaves_as_delay(delay, value)
434434
end
435435
end
436436

437+
describe 'Cancellation', edge: true do
438+
specify do
439+
source, token = Concurrent::Cancellation.create
440+
441+
futures = Array.new(2) { future(token) { |t| t.loop_until_canceled { Thread.pass }; :done } }
442+
443+
source.cancel
444+
futures.each do |future|
445+
expect(future.value!).to eq :done
446+
end
447+
end
448+
end
449+
437450
end

0 commit comments

Comments
 (0)