Skip to content

Commit eefb242

Browse files
author
Petr Chalupa
committed
Merge pull request #174 from pitr-ch/pools
Fix leaking threads when pruning the pool
2 parents da27542 + 722e59d commit eefb242

File tree

2 files changed

+32
-28
lines changed

2 files changed

+32
-28
lines changed

lib/concurrent/executor/ruby_thread_pool_executor.rb

Lines changed: 24 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -11,20 +11,20 @@ class RubyThreadPoolExecutor
1111
include RubyExecutor
1212

1313
# Default maximum number of threads that will be created in the pool.
14-
DEFAULT_MAX_POOL_SIZE = 2**15 # 32768
14+
DEFAULT_MAX_POOL_SIZE = 2**15 # 32768
1515

1616
# Default minimum number of threads that will be retained in the pool.
17-
DEFAULT_MIN_POOL_SIZE = 0
17+
DEFAULT_MIN_POOL_SIZE = 0
1818

1919
# Default maximum number of tasks that may be added to the task queue.
20-
DEFAULT_MAX_QUEUE_SIZE = 0
20+
DEFAULT_MAX_QUEUE_SIZE = 0
2121

2222
# Default maximum number of seconds a thread in the pool may remain idle
2323
# before being reclaimed.
2424
DEFAULT_THREAD_IDLETIMEOUT = 60
2525

2626
# The set of possible overflow policies that may be set at thread pool creation.
27-
OVERFLOW_POLICIES = [:abort, :discard, :caller_runs]
27+
OVERFLOW_POLICIES = [:abort, :discard, :caller_runs]
2828

2929
# The maximum number of threads that may be created in the pool.
3030
attr_reader :max_length
@@ -77,10 +77,10 @@ class RubyThreadPoolExecutor
7777
#
7878
# @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html
7979
def initialize(opts = {})
80-
@min_length = opts.fetch(:min_threads, DEFAULT_MIN_POOL_SIZE).to_i
81-
@max_length = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i
82-
@idletime = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i
83-
@max_queue = opts.fetch(:max_queue, DEFAULT_MAX_QUEUE_SIZE).to_i
80+
@min_length = opts.fetch(:min_threads, DEFAULT_MIN_POOL_SIZE).to_i
81+
@max_length = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i
82+
@idletime = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i
83+
@max_queue = opts.fetch(:max_queue, DEFAULT_MAX_QUEUE_SIZE).to_i
8484
@overflow_policy = opts.fetch(:overflow_policy, :abort)
8585

8686
raise ArgumentError.new('max_threads must be greater than zero') if @max_length <= 0
@@ -90,13 +90,13 @@ def initialize(opts = {})
9090

9191
init_executor
9292

93-
@pool = []
94-
@queue = Queue.new
93+
@pool = []
94+
@queue = Queue.new
9595
@scheduled_task_count = 0
9696
@completed_task_count = 0
97-
@largest_length = 0
97+
@largest_length = 0
9898

99-
@gc_interval = opts.fetch(:gc_interval, 1).to_i # undocumented
99+
@gc_interval = opts.fetch(:gc_interval, 1).to_i # undocumented
100100
@last_gc_time = Time.now.to_f - [1.0, (@gc_interval * 2.0)].max
101101
end
102102

@@ -109,15 +109,16 @@ def can_overflow?
109109
#
110110
# @return [Integer] the length
111111
def length
112-
mutex.synchronize{ running? ? @pool.length : 0 }
112+
mutex.synchronize { running? ? @pool.length : 0 }
113113
end
114+
114115
alias_method :current_length, :length
115116

116117
# The number of tasks in the queue awaiting execution.
117118
#
118119
# @return [Integer] the queue_length
119120
def queue_length
120-
mutex.synchronize{ running? ? @queue.length : 0 }
121+
mutex.synchronize { running? ? @queue.length : 0 }
121122
end
122123

123124
# Number of tasks that may be enqueued before reaching `max_queue` and rejecting
@@ -152,7 +153,7 @@ def on_end_task
152153
def on_worker_exit(worker)
153154
mutex.synchronize do
154155
@pool.delete(worker)
155-
if @pool.empty? && ! running?
156+
if @pool.empty? && !running?
156157
stop_event.set
157158
stopped_event.set
158159
end
@@ -177,7 +178,7 @@ def shutdown_execution
177178
if @pool.empty?
178179
stopped_event.set
179180
else
180-
@pool.length.times{ @queue << :stop }
181+
@pool.length.times { @queue << :stop }
181182
end
182183
end
183184

@@ -196,7 +197,7 @@ def kill_execution
196197
# @!visibility private
197198
def ensure_capacity?
198199
additional = 0
199-
capacity = true
200+
capacity = true
200201

201202
if @pool.size < @min_length
202203
additional = @min_length - @pool.size
@@ -254,10 +255,11 @@ def handle_overflow(*args)
254255
# @!visibility private
255256
def prune_pool
256257
if Time.now.to_f - @gc_interval >= @last_gc_time
257-
@pool.delete_if do |worker|
258-
worker.dead? ||
259-
(@idletime == 0 ? false : Time.now.to_f - @idletime > worker.last_activity)
260-
end
258+
@pool.delete_if { |worker| worker.dead? }
259+
# send :stop for each thread over idletime
260+
@pool.
261+
select { |worker| @idletime != 0 && Time.now.to_f - @idletime > worker.last_activity }.
262+
each { @queue << :stop }
261263
@last_gc_time = Time.now.to_f
262264
end
263265
end
@@ -266,7 +268,7 @@ def prune_pool
266268
#
267269
# @!visibility private
268270
def drain_pool
269-
@pool.each {|worker| worker.kill }
271+
@pool.each { |worker| worker.kill }
270272
@pool.clear
271273
end
272274

spec/concurrent/executor/ruby_cached_thread_pool_spec.rb

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,19 @@ module Concurrent
2424
subject{ described_class.new(idletime: 1, max_threads: 2, gc_interval: 0) }
2525

2626
it 'removes from pool any thread that has been idle too long' do
27-
subject.instance_variable_set(:@idletime, 1)
2827
latch = Concurrent::CountDownLatch.new(3)
29-
3.times { subject << proc{ sleep(0.1); latch.count_down } }
28+
4.times { subject << proc { sleep 0.1; latch.count_down } }
3029
expect(latch.wait(1)).to be true
3130

31+
subject.instance_variable_set(:@idletime, 1)
32+
33+
sleep 1.5
34+
3235
max_threads = subject.length
33-
sleep(2)
3436

35-
latch = Concurrent::CountDownLatch.new(1)
36-
subject << proc{ latch.count_down }
37-
expect(latch.wait(1)).to be true
37+
subject.send :prune_pool
38+
sleep 0.1
39+
subject.send :prune_pool
3840

3941
expect(subject.length).to be < max_threads
4042
end

0 commit comments

Comments
 (0)