Skip to content

Commit 1f3a128

Browse files
committed
Add AtExit hook manager
executors' auto_termination feature is migrated
1 parent 5495f9d commit 1f3a128

File tree

6 files changed

+133
-67
lines changed

6 files changed

+133
-67
lines changed

lib/concurrent.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
require 'concurrent/version'
22

33
require 'concurrent/synchronization'
4+
require 'concurrent/at_exit'
45

56
require 'concurrent/configuration'
67

lib/concurrent/at_exit.rb

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
require 'concurrent/logging'
2+
require 'concurrent/synchronization'
3+
4+
module Concurrent
5+
6+
# Provides ability to add and remove hooks to be run at `Kernel#at_exit`, order is undefined.
7+
# Each hook is executed at most once.
8+
class AtExitImplementation < Synchronization::Object
9+
include Logging
10+
11+
def initialize(enabled = true)
12+
super()
13+
synchronize do
14+
@hooks = {}
15+
@enabled = enabled
16+
end
17+
end
18+
19+
# Add a hook to be run at `Kernel#at_exit`
20+
# @param [Object] hook_id optionally provide an id, if allready present, hook is replaced
21+
# @yield the hook
22+
# @return id of the hook
23+
def add(hook_id = nil, &hook)
24+
id = hook_id || hook.object_id
25+
synchronize { @hooks[id] = hook }
26+
id
27+
end
28+
29+
# Delete a hook by hook_id
30+
# @return [true, false]
31+
def delete(hook_id)
32+
!!synchronize { @hooks.delete hook_id }
33+
end
34+
35+
# Is hook with hook_id rpesent?
36+
# @return [true, false]
37+
def hook?(hook_id)
38+
synchronize { @hooks.key? hook_id }
39+
end
40+
41+
# @return copy of the hooks
42+
def hooks
43+
synchronize { @hooks }.clone
44+
end
45+
46+
# install `Kernel#at_exit` callback to execute added hooks
47+
def install
48+
synchronize do
49+
@installed ||= begin
50+
at_exit { runner }
51+
true
52+
end
53+
self
54+
end
55+
end
56+
57+
# Will it run during `Kernel#at_exit`
58+
def enabled?
59+
synchronize { @enabled }
60+
end
61+
62+
# Configure if it runs during `Kernel#at_exit`
63+
def enabled=(value)
64+
synchronize { @enabled = value }
65+
end
66+
67+
# run the hooks manually
68+
# @return ids of the hooks
69+
def run
70+
hooks, _ = synchronize { hooks, @hooks = @hooks, {} }
71+
hooks.each do |_, hook|
72+
begin
73+
hook.call
74+
rescue => error
75+
log ERROR, error
76+
end
77+
end
78+
hooks.keys
79+
end
80+
81+
private
82+
83+
def runner
84+
run if synchronize { @enabled }
85+
end
86+
end
87+
88+
private_constant :AtExitImplementation
89+
90+
# @see AtExitImplementation
91+
AtExit = AtExitImplementation.new.install
92+
end

lib/concurrent/configuration.rb

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
require 'thread'
22
require 'concurrent/atomics'
33
require 'concurrent/errors'
4+
require 'concurrent/at_exit'
45
require 'concurrent/executors'
56
require 'concurrent/utility/processor_count'
67

@@ -39,11 +40,11 @@ def self.global_logger=(value)
3940
GLOBAL_LOGGER.value = value
4041
end
4142

42-
# Disables pool auto-termination which is called on `at_exit` callback.
43+
# Disables AtExit hooks including pool auto-termination hooks.
4344
# When disabled it will be the application
44-
# programmer's responsibility to ensure that the thread pools
45+
# programmer's responsibility to ensure that the hooks
4546
# are shutdown properly prior to application exit
46-
# by calling {.terminate_pools!} method.
47+
# by calling {AtExit.run} method.
4748
#
4849
# @note this option should be needed only because of `at_exit` ordering
4950
# issues which may arise when running some of the testing frameworks.
@@ -53,20 +54,28 @@ def self.global_logger=(value)
5354
# @note This method should *never* be called
5455
# from within a gem. It should *only* be used from within the main
5556
# application and even then it should be used only when necessary.
57+
# @see AtExit
58+
def self.disable_at_exit_hooks!
59+
AtExit.enabled = false
60+
end
61+
5662
def self.disable_executor_auto_termination!
57-
Executor::AT_EXIT_AUTO_TERMINATION.enabled = false
63+
warn '[DEPRECATED] Use Concurrent.disable_at_exit_hooks! instead'
64+
disable_at_exit_hooks!
5865
end
5966

6067
# @return [true,false]
6168
# @see .disable_executor_auto_termination!
6269
def self.disable_executor_auto_termination?
63-
Executor::AT_EXIT_AUTO_TERMINATION.enabled?
70+
warn '[DEPRECATED] Use Concurrent::AtExit.enabled? instead'
71+
AtExit.enabled?
6472
end
6573

6674
# terminates all pools and blocks until they are terminated
6775
# @see .disable_executor_auto_termination!
6876
def self.terminate_pools!
69-
Executor::AT_EXIT_AUTO_TERMINATION.terminate
77+
warn '[DEPRECATED] Use Concurrent::AtExit.run instead'
78+
AtExit.run
7079
end
7180

7281
# Global thread pool optimized for short, fast *operations*.

lib/concurrent/executor/executor.rb

Lines changed: 17 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -1,61 +1,11 @@
11
require 'concurrent/errors'
22
require 'concurrent/logging'
3+
require 'concurrent/at_exit'
34
require 'concurrent/atomic/event'
45

56
module Concurrent
67

78
module Executor
8-
class AtExitAutoTermination
9-
def initialize(enabled = true)
10-
@mutex = Mutex.new
11-
@executors = Set.new
12-
@enabled = enabled
13-
@installed = false
14-
end
15-
16-
def add(executor)
17-
@mutex.synchronize { @executors.add executor }
18-
end
19-
20-
def remove(executor)
21-
@mutex.synchronize { @executors.delete executor }
22-
end
23-
24-
def enabled?
25-
@mutex.synchronize { @enabled }
26-
end
27-
28-
def enabled=(value)
29-
@mutex.synchronize { @enabled = value }
30-
end
31-
32-
def terminate
33-
executors = @mutex.synchronize { @executors.to_a if @enabled }
34-
if executors
35-
executors.each(&:kill) # TODO be gentle first
36-
executors.each(&:wait_for_termination)
37-
executors.clear
38-
true
39-
else
40-
false
41-
end
42-
end
43-
44-
def install
45-
@installed ||= begin
46-
at_exit { terminate }
47-
true
48-
end
49-
self
50-
end
51-
52-
def executors
53-
@mutex.synchronize { @executors.to_a }
54-
end
55-
end
56-
57-
AT_EXIT_AUTO_TERMINATION = AtExitAutoTermination.new.install
58-
599
# The policy defining how rejected tasks (tasks received once the
6010
# queue size reaches the configured `max_queue`, or after the
6111
# executor has shut down) are handled. Must be one of the values
@@ -126,13 +76,25 @@ def serialized?
12676
private
12777

12878
def ns_auto_terminate?
129-
@auto_terminate
79+
!!@auto_terminate
13080
end
13181

13282
def ns_auto_terminate=(value)
133-
AT_EXIT_AUTO_TERMINATION.add self if value == true
134-
AT_EXIT_AUTO_TERMINATION.remove self if value == false
135-
@auto_terminate = value
83+
case value
84+
when true
85+
AtExit.add(self) { terminate_at_exit }
86+
@auto_terminate = true
87+
when false
88+
AtExit.delete(self)
89+
@auto_terminate = false
90+
else
91+
raise ArgumentError
92+
end
93+
end
94+
95+
def terminate_at_exit
96+
kill # TODO be gentle first
97+
wait_for_termination(10)
13698
end
13799
end
138100

lib/concurrent/executor/serialized_execution.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,8 @@ def work(job)
9898
job = @stash.shift || (@being_executed = false)
9999
end
100100

101+
# TODO maybe be able to tell caching pool to just enqueue this job, because the current one end at the end
102+
# of this block
101103
call_job job if job
102104
end
103105
end

spec/concurrent/executor/ruby_cached_thread_pool_spec.rb

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,16 +35,16 @@ module Concurrent
3535
it 'deals with dead threads' do
3636
expect(subject).to receive(:ns_worker_died).exactly(5).times.and_call_original
3737

38-
5.times { subject.post { sleep 0.1 } }
39-
worker_ids = subject.instance_variable_get(:@pool).map(&:object_id)
40-
5.times { subject.post { sleep 0.1; raise Exception } }
41-
sleep(0.1)
38+
dead_threads_queue = Queue.new
39+
5.times { subject.post { sleep 0.1; dead_threads_queue.push Thread.current; raise Exception } }
40+
sleep(0.2)
4241
latch = Concurrent::CountDownLatch.new(5)
4342
5.times { subject.post { sleep 0.1; latch.count_down } }
44-
# processes
4543
expect(latch.wait(1)).to be true
4644

47-
expect(worker_ids - subject.instance_variable_get(:@pool).map(&:object_id)).not_to be_empty # some were replaced
45+
dead_threads = []
46+
dead_threads << dead_threads_queue.pop until dead_threads_queue.empty?
47+
expect(dead_threads.all? { |t| !t.alive? }).to be true
4848
end
4949
end
5050

0 commit comments

Comments
 (0)