Skip to content

Improve behaviour when posting to a shutdown thread pool #199

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Dec 7, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion lib/concurrent/executor/executor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,16 @@ module RubyExecutor
def post(*args, &task)
raise ArgumentError.new('no block given') unless block_given?
mutex.synchronize do
return false unless running?
unless running?
# The executor is shut down - figure out how to reject this task
if self.respond_to?(:handle_overflow, true)
# Reject this task in the same way we'd reject an overflow
return handle_overflow(*args, &task)
else
# No handle_overflow method defined - just return false
return false
end
end
execute(*args, &task)
true
end
Expand Down
44 changes: 24 additions & 20 deletions spec/concurrent/executor/executor_service_shared.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,22 @@
latch = Concurrent::CountDownLatch.new(1)
subject.post{ sleep(1) }
subject.shutdown
subject.post{ latch.count_down }
begin
subject.post{ latch.count_down }
rescue Concurrent::RejectedExecutionError
end
expect(latch.wait(0.1)).to be_falsey
end

it 'returns false while shutting down' do
subject.post{ sleep(1) }
subject.shutdown
expect(subject.post{ nil }).to be_falsey
end

it 'rejects the block once shutdown' do
subject.shutdown
latch = Concurrent::CountDownLatch.new(1)
subject.post{ sleep(1) }
subject.post{ latch.count_down }
begin
subject.post{ latch.count_down }
rescue Concurrent::RejectedExecutionError
end
expect(latch.wait(0.1)).to be_falsey
end

it 'returns false once shutdown' do
subject.post{ nil }
subject.shutdown
sleep(0.1)
expect(subject.post{ nil }).to be_falsey
end
end

context '#running?' do
Expand Down Expand Up @@ -75,7 +67,10 @@
latch2 = Concurrent::CountDownLatch.new(1)
subject.post{ sleep(0.2); latch1.count_down }
subject.shutdown
expect(subject.post{ latch2.count_down }).to be_falsey
begin
expect(subject.post{ latch2.count_down }).to be_falsey
rescue Concurrent::RejectedExecutionError
end
expect(latch1.wait(1)).to be_truthy
expect(latch2.wait(0.2)).to be_falsey
end
Expand Down Expand Up @@ -121,7 +116,10 @@
subject.post{ sleep(0.1); expected.increment }
subject.post{ sleep(0.1); expected.increment }
subject.shutdown
subject.post{ expected.increment }
begin
subject.post{ expected.increment }
rescue Concurrent::RejectedExecutionError
end
subject.wait_for_termination(1)
expect(expected.value).to eq(2)
end
Expand All @@ -135,7 +133,10 @@
subject.post{ sleep(0.1); latch.count_down }
latch.wait(1)
subject.kill
expect(subject.post{ expected.make_true }).to be_falsey
begin
expect(subject.post{ expected.make_true }).to be_falsey
rescue Concurrent::RejectedExecutionError
end
sleep(0.1)
expect(expected.value).to be_falsey
end
Expand All @@ -145,7 +146,10 @@
sleep(0.1)
subject.kill
sleep(0.1)
expect(subject.post{ nil }).to be_falsey
begin
expect(subject.post{ nil }).to be_falsey
rescue Concurrent::RejectedExecutionError
end
end
end

Expand Down
49 changes: 49 additions & 0 deletions spec/concurrent/executor/ruby_thread_pool_executor_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,18 @@ module Concurrent
}.to raise_error(Concurrent::RejectedExecutionError)
end

specify '#post raises an error when the executor is shutting down' do
expect {
subject.shutdown; subject.post{ sleep(1) }
}.to raise_error(Concurrent::RejectedExecutionError)
end

specify '#<< raises an error when the executor is shutting down' do
expect {
subject.shutdown; subject << proc { sleep(1) }
}.to raise_error(Concurrent::RejectedExecutionError)
end

specify 'a #post task is never executed when the queue is at capacity' do
executed = Concurrent::AtomicFixnum.new(0)
10.times do
Expand Down Expand Up @@ -134,6 +146,29 @@ module Concurrent
sleep(0.1)
expect(executed.value).to be < 1000
end

specify 'a #post task is never executed when the executor is shutting down' do
executed = Concurrent::AtomicFixnum.new(0)
subject.shutdown
subject.post{ executed.increment }
sleep(0.1)
expect(executed.value).to be 0
end

specify 'a #<< task is never executed when the executor is shutting down' do
executed = Concurrent::AtomicFixnum.new(0)
subject.shutdown
subject << proc { executed.increment }
sleep(0.1)
expect(executed.value).to be 0
end

specify '#post returns false when the executor is shutting down' do
executed = Concurrent::AtomicFixnum.new(0)
subject.shutdown
ret = subject.post{ executed.increment }
expect(ret).to be false
end
end

context ':caller_runs' do
Expand Down Expand Up @@ -165,6 +200,20 @@ module Concurrent
5.times{|i| subject.post{ latch.count_down } }
latch.wait(0.1)
end

specify '#post executes the task on the current thread when the executor is shutting down' do
latch = Concurrent::CountDownLatch.new(1)
subject.shutdown
subject.post{ latch.count_down }
latch.wait(0.1)
end

specify '#<< executes the task on the current thread when the executor is shutting down' do
latch = Concurrent::CountDownLatch.new(1)
subject.shutdown
subject << proc { latch.count_down }
latch.wait(0.1)
end
end
end
end
Expand Down