diff --git a/lib/concurrent/executor/timer_set.rb b/lib/concurrent/executor/timer_set.rb index 27239c898..e93e703b5 100644 --- a/lib/concurrent/executor/timer_set.rb +++ b/lib/concurrent/executor/timer_set.rb @@ -22,10 +22,10 @@ class TimerSet # @option opts [object] :executor when provided will run all operations on # this executor rather than the global thread pool (overrides :operation) def initialize(opts = {}) - @queue = PriorityQueue.new(order: :min) - @task_executor = OptionsParser::get_executor_from(opts) || Concurrent.configuration.global_task_pool + @queue = PriorityQueue.new(order: :min) + @task_executor = OptionsParser::get_executor_from(opts) || Concurrent.configuration.global_task_pool @timer_executor = SingleThreadExecutor.new - @condition = Condition.new + @condition = Condition.new init_executor end @@ -64,7 +64,7 @@ def post(intended_time, *args, &task) # For a timer, #kill is like an orderly shutdown, except we need to manually # (and destructively) clear the queue first def kill - @queue.clear + mutex.synchronize { @queue.clear } shutdown end @@ -124,14 +124,13 @@ def shutdown_execution # @!visibility private def process_tasks loop do - break if @queue.empty? - - task = @queue.peek + task = mutex.synchronize { @queue.peek } + break unless task interval = task.time - Time.now.to_f if interval <= 0 @task_executor.post(*task.args, &task.op) - @queue.pop + mutex.synchronize { @queue.pop } else mutex.synchronize do @condition.wait(mutex, [interval, 60].min)