Skip to content

Commit 24336f5

Browse files
committed
Merge pull request #326 from ruby-concurrency/mri-max-threads
Prevent Ruby thread pools from creating too many threads.
2 parents dcac23a + e005f53 commit 24336f5

File tree

8 files changed

+69
-40
lines changed

8 files changed

+69
-40
lines changed

examples/stress_ruby_thread_pool.rb

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
#!/usr/bin/env ruby
2+
3+
$: << File.expand_path('../../lib', __FILE__)
4+
5+
require 'benchmark'
6+
require 'concurrent/executors'
7+
8+
COUNT = 100_000
9+
10+
executor = Concurrent::CachedThreadPool.new
11+
latch = Concurrent::CountDownLatch.new
12+
13+
COUNT.times { executor.post{ nil } }
14+
15+
#COUNT.times do |i|
16+
# executor.post{ nil }
17+
# sleep(0.01) if i % 1000 == 0
18+
#end
19+
20+
executor.post{ latch.count_down }
21+
latch.wait
22+
23+
puts "Max length: #{executor.max_length}" if executor.respond_to?(:max_length)
24+
puts "Largest length: #{executor.largest_length}" if executor.respond_to?(:largest_length)
25+
puts "Scheduled task count: #{executor.scheduled_task_count}" if executor.respond_to?(:scheduled_task_count)
26+
puts "Completed task count: #{executor.completed_task_count}" if executor.respond_to?(:completed_task_count)

lib/concurrent/executor/java_cached_thread_pool.rb

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,22 +17,21 @@ class JavaCachedThreadPool < JavaThreadPoolExecutor
1717
#
1818
# @see http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executors.html#newCachedThreadPool--
1919
def initialize(opts = {})
20-
super(opts)
20+
defaults = { idletime: DEFAULT_THREAD_IDLETIMEOUT }
21+
overrides = { min_threads: 0,
22+
max_threads: DEFAULT_MAX_POOL_SIZE,
23+
max_queue: 0 }
24+
super(defaults.merge(opts).merge(overrides))
2125
end
2226

2327
protected
2428

2529
def ns_initialize(opts)
26-
@fallback_policy = opts.fetch(:fallback_policy, opts.fetch(:overflow_policy, :abort))
27-
deprecated ':overflow_policy is deprecated terminology, please use :fallback_policy instead' if opts.has_key?(:overflow_policy)
30+
super(opts)
2831
@max_queue = 0
29-
30-
raise ArgumentError.new("#{@fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICY_CLASSES.keys.include?(@fallback_policy)
31-
3232
@executor = java.util.concurrent.Executors.newCachedThreadPool
3333
@executor.setRejectedExecutionHandler(FALLBACK_POLICY_CLASSES[@fallback_policy].new)
3434
@executor.setKeepAliveTime(opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT), java.util.concurrent.TimeUnit::SECONDS)
35-
3635
self.auto_terminate = opts.fetch(:auto_terminate, true)
3736
end
3837
end

lib/concurrent/executor/java_fixed_thread_pool.rb

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,12 @@ class JavaFixedThreadPool < JavaThreadPoolExecutor
1818
#
1919
# @see http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executors.html#newFixedThreadPool-int-
2020
def initialize(num_threads, opts = {})
21-
22-
opts = {
23-
min_threads: num_threads,
24-
max_threads: num_threads
25-
}.merge(opts)
26-
super(opts)
21+
raise ArgumentError.new('number of threads must be greater than zero') if num_threads.to_i < 1
22+
defaults = { max_queue: DEFAULT_MAX_QUEUE_SIZE,
23+
idletime: DEFAULT_THREAD_IDLETIMEOUT }
24+
overrides = { min_threads: num_threads,
25+
max_threads: num_threads }
26+
super(defaults.merge(opts).merge(overrides))
2727
end
2828
end
2929
end

lib/concurrent/executor/java_thread_pool_executor.rb

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -141,9 +141,10 @@ def ns_initialize(opts)
141141
@fallback_policy = opts.fetch(:fallback_policy, opts.fetch(:overflow_policy, :abort))
142142
deprecated ' :overflow_policy is deprecated terminology, please use :fallback_policy instead' if opts.has_key?(:overflow_policy)
143143

144-
raise ArgumentError.new('max_threads must be greater than zero') if max_length <= 0
145-
raise ArgumentError.new('min_threads cannot be less than zero') if min_length < 0
146-
raise ArgumentError.new('min_threads cannot be more than max_threads') if min_length > max_length
144+
raise ArgumentError.new("`max_threads` cannot be less than #{DEFAULT_MIN_POOL_SIZE}") if max_length < DEFAULT_MIN_POOL_SIZE
145+
raise ArgumentError.new("`max_threads` cannot be greater than #{DEFAULT_MAX_POOL_SIZE}") if max_length > DEFAULT_MAX_POOL_SIZE
146+
raise ArgumentError.new("`min_threads` cannot be less than #{DEFAULT_MIN_POOL_SIZE}") if min_length < DEFAULT_MIN_POOL_SIZE
147+
raise ArgumentError.new("`min_threads` cannot be more than `max_threads`") if min_length > max_length
147148
raise ArgumentError.new("#{fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICY_CLASSES.include?(@fallback_policy)
148149

149150
if @max_queue == 0

lib/concurrent/executor/ruby_cached_thread_pool.rb

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,10 @@ class RubyCachedThreadPool < RubyThreadPoolExecutor
1313
#
1414
# @raise [ArgumentError] if `fallback_policy` is not a known policy
1515
def initialize(opts = {})
16-
fallback_policy = opts.fetch(:fallback_policy, opts.fetch(:overflow_policy, :abort))
17-
raise ArgumentError.new("#{fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.include?(fallback_policy)
18-
1916
defaults = { idletime: DEFAULT_THREAD_IDLETIMEOUT }
2017
overrides = { min_threads: 0,
2118
max_threads: DEFAULT_MAX_POOL_SIZE,
22-
fallback_policy: fallback_policy,
2319
max_queue: DEFAULT_MAX_QUEUE_SIZE }
24-
2520
super(defaults.merge(opts).merge(overrides))
2621
end
2722
end

lib/concurrent/executor/ruby_fixed_thread_pool.rb

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,12 @@ class RubyFixedThreadPool < RubyThreadPoolExecutor
1515
# @raise [ArgumentError] if `num_threads` is less than or equal to zero
1616
# @raise [ArgumentError] if `fallback_policy` is not a known policy
1717
def initialize(num_threads, opts = {})
18-
fallback_policy = opts.fetch(:fallback_policy, opts.fetch(:overflow_policy, :abort))
19-
raise ArgumentError.new("#{fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.include?(fallback_policy)
20-
21-
raise ArgumentError.new('number of threads must be greater than zero') if num_threads < 1
22-
23-
opts = {
24-
min_threads: num_threads,
25-
max_threads: num_threads,
26-
fallback_policy: fallback_policy,
27-
max_queue: DEFAULT_MAX_QUEUE_SIZE,
28-
idletime: DEFAULT_THREAD_IDLETIMEOUT,
29-
}.merge(opts)
30-
super(opts)
18+
raise ArgumentError.new('number of threads must be greater than zero') if num_threads.to_i < 1
19+
defaults = { max_queue: DEFAULT_MAX_QUEUE_SIZE,
20+
idletime: DEFAULT_THREAD_IDLETIMEOUT }
21+
overrides = { min_threads: num_threads,
22+
max_threads: num_threads }
23+
super(defaults.merge(opts).merge(overrides))
3124
end
3225
end
3326
end

lib/concurrent/executor/ruby_thread_pool_executor.rb

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ module Concurrent
1212
class RubyThreadPoolExecutor < RubyExecutorService
1313

1414
# Default maximum number of threads that will be created in the pool.
15-
DEFAULT_MAX_POOL_SIZE = 2**13 # 8192
15+
DEFAULT_MAX_POOL_SIZE = 2_147_483_647 # java.lang.Integer::MAX_VALUE
1616

1717
# Default minimum number of threads that will be retained in the pool.
1818
DEFAULT_MIN_POOL_SIZE = 0
@@ -139,9 +139,10 @@ def ns_initialize(opts)
139139
raise ArgumentError.new("#{@fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.include?(@fallback_policy)
140140
deprecated ':overflow_policy is deprecated terminology, please use :fallback_policy instead' if opts.has_key?(:overflow_policy)
141141

142-
raise ArgumentError.new('max_threads must be greater than zero') if @max_length <= 0
143-
raise ArgumentError.new('min_threads cannot be less than zero') if @min_length < 0
144-
raise ArgumentError.new('min_threads cannot be more than max_threads') if min_length > max_length
142+
raise ArgumentError.new("`max_threads` cannot be less than #{DEFAULT_MIN_POOL_SIZE}") if @max_length < DEFAULT_MIN_POOL_SIZE
143+
raise ArgumentError.new("`max_threads` cannot be greater than #{DEFAULT_MAX_POOL_SIZE}") if @max_length > DEFAULT_MAX_POOL_SIZE
144+
raise ArgumentError.new("`min_threads` cannot be less than #{DEFAULT_MIN_POOL_SIZE}") if @min_length < DEFAULT_MIN_POOL_SIZE
145+
raise ArgumentError.new("`min_threads` cannot be more than `max_threads`") if min_length > max_length
145146

146147
self.auto_terminate = opts.fetch(:auto_terminate, true)
147148

@@ -216,6 +217,9 @@ def ns_assign_worker(*args, &task)
216217
else
217218
false
218219
end
220+
rescue ThreadError
221+
# Raised when the operating system refuses to create the new thread
222+
return false
219223
end
220224

221225
# tries to enqueue task

spec/concurrent/executor/thread_pool_executor_shared.rb

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,16 +60,27 @@
6060
end
6161
end
6262

63+
it 'raises an exception if :max_threads is less than zero' do
64+
expect {
65+
described_class.new(max_threads: -1)
66+
}.to raise_error(ArgumentError)
67+
end
6368

6469
it 'raises an exception if :min_threads is less than zero' do
6570
expect {
6671
described_class.new(min_threads: -1)
6772
}.to raise_error(ArgumentError)
6873
end
6974

70-
it 'raises an exception if :max_threads is not greater than zero' do
75+
it 'raises an exception if :max_threads greater than the max allowable' do
76+
expect {
77+
described_class.new(max_threads: described_class::DEFAULT_MAX_POOL_SIZE+1)
78+
}.to raise_error(ArgumentError)
79+
end
80+
81+
it 'raises an exception if :max_threads is less than :min_threads' do
7182
expect {
72-
described_class.new(max_threads: 0)
83+
described_class.new(max_threads: 1, min_threads: 100)
7384
}.to raise_error(ArgumentError)
7485
end
7586

0 commit comments

Comments
 (0)