Skip to content

Commit 9dc888d

Browse files
author
Petr Chalupa
committed
Merge pull request #179 from pitr-ch/pools
Add missing synchronizations to TimerSet
2 parents 3707ebd + 296a567 commit 9dc888d

File tree

1 file changed

+7
-8
lines changed

1 file changed

+7
-8
lines changed

lib/concurrent/executor/timer_set.rb

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,10 @@ class TimerSet
2222
# @option opts [object] :executor when provided will run all operations on
2323
# this executor rather than the global thread pool (overrides :operation)
2424
def initialize(opts = {})
25-
@queue = PriorityQueue.new(order: :min)
26-
@task_executor = OptionsParser::get_executor_from(opts) || Concurrent.configuration.global_task_pool
25+
@queue = PriorityQueue.new(order: :min)
26+
@task_executor = OptionsParser::get_executor_from(opts) || Concurrent.configuration.global_task_pool
2727
@timer_executor = SingleThreadExecutor.new
28-
@condition = Condition.new
28+
@condition = Condition.new
2929
init_executor
3030
end
3131

@@ -64,7 +64,7 @@ def post(intended_time, *args, &task)
6464
# For a timer, #kill is like an orderly shutdown, except we need to manually
6565
# (and destructively) clear the queue first
6666
def kill
67-
@queue.clear
67+
mutex.synchronize { @queue.clear }
6868
shutdown
6969
end
7070

@@ -124,14 +124,13 @@ def shutdown_execution
124124
# @!visibility private
125125
def process_tasks
126126
loop do
127-
break if @queue.empty?
128-
129-
task = @queue.peek
127+
task = mutex.synchronize { @queue.peek }
128+
break unless task
130129
interval = task.time - Time.now.to_f
131130

132131
if interval <= 0
133132
@task_executor.post(*task.args, &task.op)
134-
@queue.pop
133+
mutex.synchronize { @queue.pop }
135134
else
136135
mutex.synchronize do
137136
@condition.wait(mutex, [interval, 60].min)

0 commit comments

Comments
 (0)