Skip to content

Commit ddc7597

Browse files
committed
Add benchmarks
1 parent e325471 commit ddc7597

File tree

3 files changed

+368
-2
lines changed

3 files changed

+368
-2
lines changed

Gemfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ group :development do
66
gem 'rake', '~> 10.4.2'
77
gem 'rake-compiler', '~> 0.9.5'
88
gem 'gem-compiler', '~> 0.3.0'
9+
gem 'benchmark-ips'
910
end
1011

1112
group :testing do

examples/benchmark_pool.rb

Lines changed: 365 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,365 @@
1+
require 'thread'
2+
require 'concurrent'
3+
require 'concurrent/logging'
4+
require 'concurrent/utility/monotonic_time'
5+
require 'concurrent/atomic/event'
6+
require 'concurrent/executor/executor'
7+
8+
module Concurrent
9+
10+
# @!visibility private
11+
class RubyThreadPoolWorker
12+
include Logging
13+
14+
# @!visibility private
15+
def initialize(queue, parent)
16+
@queue = queue
17+
@parent = parent
18+
@mutex = Mutex.new
19+
@last_activity = Concurrent.monotonic_time
20+
@thread = nil
21+
end
22+
23+
# @!visibility private
24+
def dead?
25+
return @mutex.synchronize do
26+
@thread.nil? ? false : !@thread.alive?
27+
end
28+
end
29+
30+
# @!visibility private
31+
def last_activity
32+
@mutex.synchronize { @last_activity }
33+
end
34+
35+
def status
36+
@mutex.synchronize do
37+
return 'not running' if @thread.nil?
38+
@thread.status
39+
end
40+
end
41+
42+
# @!visibility private
43+
def kill
44+
@mutex.synchronize do
45+
Thread.kill(@thread) unless @thread.nil?
46+
@thread = nil
47+
end
48+
end
49+
50+
# @!visibility private
51+
def run(thread = Thread.current)
52+
@mutex.synchronize do
53+
raise StandardError.new('already running') unless @thread.nil?
54+
@thread = thread
55+
end
56+
57+
loop do
58+
task = @queue.pop
59+
if task == :stop
60+
@thread = nil
61+
@parent.on_worker_exit(self)
62+
break
63+
end
64+
65+
begin
66+
task.last.call(*task.first)
67+
rescue => ex
68+
# let it fail
69+
log DEBUG, ex
70+
ensure
71+
@last_activity = Concurrent.monotonic_time
72+
@parent.on_end_task
73+
end
74+
end
75+
end
76+
end
77+
78+
class OldRubyThreadPoolExecutor
79+
include RubyExecutor
80+
81+
# Default maximum number of threads that will be created in the pool.
82+
DEFAULT_MAX_POOL_SIZE = 2**15 # 32768
83+
84+
# Default minimum number of threads that will be retained in the pool.
85+
DEFAULT_MIN_POOL_SIZE = 0
86+
87+
# Default maximum number of tasks that may be added to the task queue.
88+
DEFAULT_MAX_QUEUE_SIZE = 0
89+
90+
# Default maximum number of seconds a thread in the pool may remain idle
91+
# before being reclaimed.
92+
DEFAULT_THREAD_IDLETIMEOUT = 60
93+
94+
# The maximum number of threads that may be created in the pool.
95+
attr_reader :max_length
96+
97+
# The minimum number of threads that may be retained in the pool.
98+
attr_reader :min_length
99+
100+
# The largest number of threads that have been created in the pool since construction.
101+
attr_reader :largest_length
102+
103+
# The number of tasks that have been scheduled for execution on the pool since construction.
104+
attr_reader :scheduled_task_count
105+
106+
# The number of tasks that have been completed by the pool since construction.
107+
attr_reader :completed_task_count
108+
109+
# The number of seconds that a thread may be idle before being reclaimed.
110+
attr_reader :idletime
111+
112+
# The maximum number of tasks that may be waiting in the work queue at any one time.
113+
# When the queue size reaches `max_queue` subsequent tasks will be rejected in
114+
# accordance with the configured `fallback_policy`.
115+
attr_reader :max_queue
116+
117+
# Create a new thread pool.
118+
#
119+
# @param [Hash] opts the options which configure the thread pool
120+
#
121+
# @option opts [Integer] :max_threads (DEFAULT_MAX_POOL_SIZE) the maximum
122+
# number of threads to be created
123+
# @option opts [Integer] :min_threads (DEFAULT_MIN_POOL_SIZE) the minimum
124+
# number of threads to be retained
125+
# @option opts [Integer] :idletime (DEFAULT_THREAD_IDLETIMEOUT) the maximum
126+
# number of seconds a thread may be idle before being reclaimed
127+
# @option opts [Integer] :max_queue (DEFAULT_MAX_QUEUE_SIZE) the maximum
128+
# number of tasks allowed in the work queue at any one time; a value of
129+
# zero means the queue may grow without bound
130+
# @option opts [Symbol] :fallback_policy (:abort) the policy for handling new
131+
# tasks that are received when the queue size has reached
132+
# `max_queue` or the executor has shut down
133+
#
134+
# @raise [ArgumentError] if `:max_threads` is less than one
135+
# @raise [ArgumentError] if `:min_threads` is less than zero
136+
# @raise [ArgumentError] if `:fallback_policy` is not one of the values specified
137+
# in `FALLBACK_POLICIES`
138+
#
139+
# @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html
140+
def initialize(opts = {})
141+
@min_length = opts.fetch(:min_threads, DEFAULT_MIN_POOL_SIZE).to_i
142+
@max_length = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i
143+
@idletime = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i
144+
@max_queue = opts.fetch(:max_queue, DEFAULT_MAX_QUEUE_SIZE).to_i
145+
@fallback_policy = opts.fetch(:fallback_policy, opts.fetch(:overflow_policy, :abort))
146+
warn '[DEPRECATED] :overflow_policy is deprecated terminology, please use :fallback_policy instead' if opts.has_key?(:overflow_policy)
147+
148+
raise ArgumentError.new('max_threads must be greater than zero') if @max_length <= 0
149+
raise ArgumentError.new('min_threads cannot be less than zero') if @min_length < 0
150+
raise ArgumentError.new("#{fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.include?(@fallback_policy)
151+
raise ArgumentError.new('min_threads cannot be more than max_threads') if min_length > max_length
152+
153+
init_executor
154+
enable_at_exit_handler!(opts)
155+
156+
@pool = []
157+
@queue = Queue.new
158+
@scheduled_task_count = 0
159+
@completed_task_count = 0
160+
@largest_length = 0
161+
162+
@gc_interval = opts.fetch(:gc_interval, 1).to_i # undocumented
163+
@last_gc_time = Concurrent.monotonic_time - [1.0, (@gc_interval * 2.0)].max
164+
end
165+
166+
# @!macro executor_module_method_can_overflow_question
167+
def can_overflow?
168+
@max_queue != 0
169+
end
170+
171+
# The number of threads currently in the pool.
172+
#
173+
# @return [Integer] the length
174+
def length
175+
mutex.synchronize { running? ? @pool.length : 0 }
176+
end
177+
178+
alias_method :current_length, :length
179+
180+
# The number of tasks in the queue awaiting execution.
181+
#
182+
# @return [Integer] the queue_length
183+
def queue_length
184+
mutex.synchronize { running? ? @queue.length : 0 }
185+
end
186+
187+
# Number of tasks that may be enqueued before reaching `max_queue` and rejecting
188+
# new tasks. A value of -1 indicates that the queue may grow without bound.
189+
#
190+
# @return [Integer] the remaining_capacity
191+
def remaining_capacity
192+
mutex.synchronize { @max_queue == 0 ? -1 : @max_queue - @queue.length }
193+
end
194+
195+
# Returns an array with the status of each thread in the pool
196+
#
197+
# This method is deprecated and will be removed soon.
198+
def status
199+
warn '[DEPRECATED] `status` is deprecated and will be removed soon.'
200+
mutex.synchronize { @pool.collect { |worker| worker.status } }
201+
end
202+
203+
# Run on task completion.
204+
#
205+
# @!visibility private
206+
def on_end_task
207+
mutex.synchronize do
208+
@completed_task_count += 1 #if success
209+
break unless running?
210+
end
211+
end
212+
213+
# Run when a thread worker exits.
214+
#
215+
# @!visibility private
216+
def on_worker_exit(worker)
217+
mutex.synchronize do
218+
@pool.delete(worker)
219+
if @pool.empty? && !running?
220+
stop_event.set
221+
stopped_event.set
222+
end
223+
end
224+
end
225+
226+
protected
227+
228+
# @!visibility private
229+
def execute(*args, &task)
230+
if ensure_capacity?
231+
@scheduled_task_count += 1
232+
@queue << [args, task]
233+
else
234+
handle_fallback(*args, &task) if @max_queue != 0 && @queue.length >= @max_queue
235+
end
236+
prune_pool
237+
end
238+
239+
# @!visibility private
240+
def shutdown_execution
241+
if @pool.empty?
242+
stopped_event.set
243+
else
244+
@pool.length.times { @queue << :stop }
245+
end
246+
end
247+
248+
# @!visibility private
249+
def kill_execution
250+
@queue.clear
251+
drain_pool
252+
end
253+
254+
# Check the thread pool configuration and determine if the pool
255+
# has enought capacity to handle the request. Will grow the size
256+
# of the pool if necessary.
257+
#
258+
# @return [Boolean] true if the pool has enough capacity else false
259+
#
260+
# @!visibility private
261+
def ensure_capacity?
262+
additional = 0
263+
capacity = true
264+
265+
if @pool.size < @min_length
266+
additional = @min_length - @pool.size
267+
elsif @queue.empty? && @queue.num_waiting >= 1
268+
additional = 0
269+
elsif @pool.size == 0 && @min_length == 0
270+
additional = 1
271+
elsif @pool.size < @max_length || @max_length == 0
272+
additional = 1
273+
elsif @max_queue == 0 || @queue.size < @max_queue
274+
additional = 0
275+
else
276+
capacity = false
277+
end
278+
279+
# puts format('pool %3d queue %3d waiting %3d additional %3d capacity %s', @pool.size, @queue.size, @queue.num_waiting, additional, capacity.to_s)
280+
281+
additional.times do
282+
@pool << create_worker_thread
283+
end
284+
285+
if additional > 0
286+
@largest_length = [@largest_length, @pool.length].max
287+
end
288+
289+
capacity
290+
end
291+
292+
# Scan all threads in the pool and reclaim any that are dead or
293+
# have been idle too long. Will check the last time the pool was
294+
# pruned and only run if the configured garbage collection
295+
# interval has passed.
296+
#
297+
# @!visibility private
298+
def prune_pool
299+
if Concurrent.monotonic_time - @gc_interval >= @last_gc_time
300+
@pool.delete_if { |worker| worker.dead? }
301+
# send :stop for each thread over idletime
302+
@pool.
303+
select { |worker| @idletime != 0 && Concurrent.monotonic_time - @idletime > worker.last_activity }.
304+
each { @queue << :stop }
305+
@last_gc_time = Concurrent.monotonic_time
306+
end
307+
end
308+
309+
# Reclaim all threads in the pool.
310+
#
311+
# @!visibility private
312+
def drain_pool
313+
@pool.each { |worker| worker.kill }
314+
@pool.clear
315+
end
316+
317+
# Create a single worker thread to be added to the pool.
318+
#
319+
# @return [Thread] the new thread.
320+
#
321+
# @!visibility private
322+
def create_worker_thread
323+
wrkr = RubyThreadPoolWorker.new(@queue, self)
324+
Thread.new(wrkr, self) do |worker, parent|
325+
Thread.current.abort_on_exception = false
326+
worker.run
327+
parent.on_worker_exit(worker)
328+
end
329+
return wrkr
330+
end
331+
end
332+
end
333+
334+
require 'benchmark/ips'
335+
336+
Benchmark.ips do |x|
337+
x.time = 10
338+
x.warmup = 10
339+
340+
configuration = { min_threads: 2,
341+
max_threads: 8,
342+
stop_on_exit: false,
343+
idletime: 0.1, # 1 minute
344+
max_queue: 0, # unlimited
345+
fallback_policy: :caller_runs, # shouldn't matter -- 0 max queue
346+
gc_interval: 0.1 }
347+
348+
pools = { old: Concurrent::OldRubyThreadPoolExecutor.new(configuration),
349+
new: Concurrent::RubyThreadPoolExecutor.new(configuration) }
350+
pools.update java: Concurrent::JavaThreadPoolExecutor.new(configuration) if RUBY_ENGINE == 'jruby'
351+
352+
pools.each do |name, pool|
353+
x.report(name.to_s) do
354+
count = Concurrent::CountDownLatch.new(100)
355+
100.times do
356+
pool.post { count.count_down }
357+
end
358+
count.wait
359+
end
360+
end
361+
362+
x.compare!
363+
end
364+
365+

spec/concurrent/executor/ruby_cached_thread_pool_spec.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,14 +73,14 @@ module Concurrent
7373

7474
context 'stress' do
7575
configurations = [
76-
{ min_threads: [2, Concurrent.processor_count].max,
76+
{ min_threads: 2,
7777
max_threads: ThreadPoolExecutor::DEFAULT_MAX_POOL_SIZE,
7878
stop_on_exit: false,
7979
idletime: 0.1, # 1 minute
8080
max_queue: 0, # unlimited
8181
fallback_policy: :caller_runs, # shouldn't matter -- 0 max queue
8282
gc_interval: 0.1 },
83-
{ min_threads: [2, Concurrent.processor_count].max,
83+
{ min_threads: 2,
8484
max_threads: 4,
8585
stop_on_exit: false,
8686
idletime: 0.1, # 1 minute

0 commit comments

Comments
 (0)