diff --git a/lib/concurrent/executor/executor.rb b/lib/concurrent/executor/executor.rb index 4b64f68ca..ce82e0c4c 100644 --- a/lib/concurrent/executor/executor.rb +++ b/lib/concurrent/executor/executor.rb @@ -78,7 +78,16 @@ module RubyExecutor def post(*args, &task) raise ArgumentError.new('no block given') unless block_given? mutex.synchronize do - return false unless running? + unless running? + # The executor is shut down - figure out how to reject this task + if self.respond_to?(:handle_overflow, true) + # Reject this task in the same way we'd reject an overflow + return handle_overflow(*args, &task) + else + # No handle_overflow method defined - just return false + return false + end + end execute(*args, &task) true end diff --git a/spec/concurrent/executor/executor_service_shared.rb b/spec/concurrent/executor/executor_service_shared.rb index 44a28eb02..55b737d38 100644 --- a/spec/concurrent/executor/executor_service_shared.rb +++ b/spec/concurrent/executor/executor_service_shared.rb @@ -16,30 +16,22 @@ latch = Concurrent::CountDownLatch.new(1) subject.post{ sleep(1) } subject.shutdown - subject.post{ latch.count_down } + begin + subject.post{ latch.count_down } + rescue Concurrent::RejectedExecutionError + end expect(latch.wait(0.1)).to be_falsey end - it 'returns false while shutting down' do - subject.post{ sleep(1) } - subject.shutdown - expect(subject.post{ nil }).to be_falsey - end - it 'rejects the block once shutdown' do subject.shutdown latch = Concurrent::CountDownLatch.new(1) - subject.post{ sleep(1) } - subject.post{ latch.count_down } + begin + subject.post{ latch.count_down } + rescue Concurrent::RejectedExecutionError + end expect(latch.wait(0.1)).to be_falsey end - - it 'returns false once shutdown' do - subject.post{ nil } - subject.shutdown - sleep(0.1) - expect(subject.post{ nil }).to be_falsey - end end context '#running?' do @@ -75,7 +67,10 @@ latch2 = Concurrent::CountDownLatch.new(1) subject.post{ sleep(0.2); latch1.count_down } subject.shutdown - expect(subject.post{ latch2.count_down }).to be_falsey + begin + expect(subject.post{ latch2.count_down }).to be_falsey + rescue Concurrent::RejectedExecutionError + end expect(latch1.wait(1)).to be_truthy expect(latch2.wait(0.2)).to be_falsey end @@ -121,7 +116,10 @@ subject.post{ sleep(0.1); expected.increment } subject.post{ sleep(0.1); expected.increment } subject.shutdown - subject.post{ expected.increment } + begin + subject.post{ expected.increment } + rescue Concurrent::RejectedExecutionError + end subject.wait_for_termination(1) expect(expected.value).to eq(2) end @@ -135,7 +133,10 @@ subject.post{ sleep(0.1); latch.count_down } latch.wait(1) subject.kill - expect(subject.post{ expected.make_true }).to be_falsey + begin + expect(subject.post{ expected.make_true }).to be_falsey + rescue Concurrent::RejectedExecutionError + end sleep(0.1) expect(expected.value).to be_falsey end @@ -145,7 +146,10 @@ sleep(0.1) subject.kill sleep(0.1) - expect(subject.post{ nil }).to be_falsey + begin + expect(subject.post{ nil }).to be_falsey + rescue Concurrent::RejectedExecutionError + end end end diff --git a/spec/concurrent/executor/ruby_thread_pool_executor_spec.rb b/spec/concurrent/executor/ruby_thread_pool_executor_spec.rb index 22aac73d1..7ec824897 100644 --- a/spec/concurrent/executor/ruby_thread_pool_executor_spec.rb +++ b/spec/concurrent/executor/ruby_thread_pool_executor_spec.rb @@ -80,6 +80,18 @@ module Concurrent }.to raise_error(Concurrent::RejectedExecutionError) end + specify '#post raises an error when the executor is shutting down' do + expect { + subject.shutdown; subject.post{ sleep(1) } + }.to raise_error(Concurrent::RejectedExecutionError) + end + + specify '#<< raises an error when the executor is shutting down' do + expect { + subject.shutdown; subject << proc { sleep(1) } + }.to raise_error(Concurrent::RejectedExecutionError) + end + specify 'a #post task is never executed when the queue is at capacity' do executed = Concurrent::AtomicFixnum.new(0) 10.times do @@ -134,6 +146,29 @@ module Concurrent sleep(0.1) expect(executed.value).to be < 1000 end + + specify 'a #post task is never executed when the executor is shutting down' do + executed = Concurrent::AtomicFixnum.new(0) + subject.shutdown + subject.post{ executed.increment } + sleep(0.1) + expect(executed.value).to be 0 + end + + specify 'a #<< task is never executed when the executor is shutting down' do + executed = Concurrent::AtomicFixnum.new(0) + subject.shutdown + subject << proc { executed.increment } + sleep(0.1) + expect(executed.value).to be 0 + end + + specify '#post returns false when the executor is shutting down' do + executed = Concurrent::AtomicFixnum.new(0) + subject.shutdown + ret = subject.post{ executed.increment } + expect(ret).to be false + end end context ':caller_runs' do @@ -165,6 +200,20 @@ module Concurrent 5.times{|i| subject.post{ latch.count_down } } latch.wait(0.1) end + + specify '#post executes the task on the current thread when the executor is shutting down' do + latch = Concurrent::CountDownLatch.new(1) + subject.shutdown + subject.post{ latch.count_down } + latch.wait(0.1) + end + + specify '#<< executes the task on the current thread when the executor is shutting down' do + latch = Concurrent::CountDownLatch.new(1) + subject.shutdown + subject << proc { latch.count_down } + latch.wait(0.1) + end end end end