diff --git a/.travis.yml b/.travis.yml index e1abf724a..ca43ce242 100644 --- a/.travis.yml +++ b/.travis.yml @@ -7,7 +7,7 @@ rvm: - 2.0.0 - 1.9.3 - ruby-head - - jruby-1.7.18 + - jruby-1.7.19 - jruby-head - rbx-2 diff --git a/Gemfile b/Gemfile index 1489b8380..f86c8fbd4 100644 --- a/Gemfile +++ b/Gemfile @@ -6,6 +6,7 @@ group :development do gem 'rake', '~> 10.4.2' gem 'rake-compiler', '~> 0.9.5' gem 'gem-compiler', '~> 0.3.0' + gem 'benchmark-ips' end group :testing do diff --git a/examples/benchmark_pool.rb b/examples/benchmark_pool.rb new file mode 100644 index 000000000..38b5d9355 --- /dev/null +++ b/examples/benchmark_pool.rb @@ -0,0 +1,369 @@ +require 'thread' +require 'concurrent' +require 'concurrent/logging' +require 'concurrent/utility/monotonic_time' +require 'concurrent/atomic/event' +require 'concurrent/executor/executor' + +module Concurrent + + # @!visibility private + class RubyThreadPoolWorker + include Logging + + # @!visibility private + def initialize(queue, parent) + @queue = queue + @parent = parent + @mutex = Mutex.new + @last_activity = Concurrent.monotonic_time + @thread = nil + end + + # @!visibility private + def dead? + return @mutex.synchronize do + @thread.nil? ? false : !@thread.alive? + end + end + + # @!visibility private + def last_activity + @mutex.synchronize { @last_activity } + end + + def status + @mutex.synchronize do + return 'not running' if @thread.nil? + @thread.status + end + end + + # @!visibility private + def kill + @mutex.synchronize do + Thread.kill(@thread) unless @thread.nil? + @thread = nil + end + end + + # @!visibility private + def run(thread = Thread.current) + @mutex.synchronize do + raise StandardError.new('already running') unless @thread.nil? + @thread = thread + end + + loop do + task = @queue.pop + if task == :stop + @thread = nil + @parent.on_worker_exit(self) + break + end + + begin + task.last.call(*task.first) + rescue => ex + # let it fail + log DEBUG, ex + ensure + @last_activity = Concurrent.monotonic_time + @parent.on_end_task + end + end + end + end + + class OldRubyThreadPoolExecutor + include RubyExecutor + + # Default maximum number of threads that will be created in the pool. + DEFAULT_MAX_POOL_SIZE = 2**15 # 32768 + + # Default minimum number of threads that will be retained in the pool. + DEFAULT_MIN_POOL_SIZE = 0 + + # Default maximum number of tasks that may be added to the task queue. + DEFAULT_MAX_QUEUE_SIZE = 0 + + # Default maximum number of seconds a thread in the pool may remain idle + # before being reclaimed. + DEFAULT_THREAD_IDLETIMEOUT = 60 + + # The maximum number of threads that may be created in the pool. + attr_reader :max_length + + # The minimum number of threads that may be retained in the pool. + attr_reader :min_length + + # The largest number of threads that have been created in the pool since construction. + attr_reader :largest_length + + # The number of tasks that have been scheduled for execution on the pool since construction. + attr_reader :scheduled_task_count + + # The number of tasks that have been completed by the pool since construction. + attr_reader :completed_task_count + + # The number of seconds that a thread may be idle before being reclaimed. + attr_reader :idletime + + # The maximum number of tasks that may be waiting in the work queue at any one time. + # When the queue size reaches `max_queue` subsequent tasks will be rejected in + # accordance with the configured `fallback_policy`. + attr_reader :max_queue + + # Create a new thread pool. + # + # @param [Hash] opts the options which configure the thread pool + # + # @option opts [Integer] :max_threads (DEFAULT_MAX_POOL_SIZE) the maximum + # number of threads to be created + # @option opts [Integer] :min_threads (DEFAULT_MIN_POOL_SIZE) the minimum + # number of threads to be retained + # @option opts [Integer] :idletime (DEFAULT_THREAD_IDLETIMEOUT) the maximum + # number of seconds a thread may be idle before being reclaimed + # @option opts [Integer] :max_queue (DEFAULT_MAX_QUEUE_SIZE) the maximum + # number of tasks allowed in the work queue at any one time; a value of + # zero means the queue may grow without bound + # @option opts [Symbol] :fallback_policy (:abort) the policy for handling new + # tasks that are received when the queue size has reached + # `max_queue` or the executor has shut down + # + # @raise [ArgumentError] if `:max_threads` is less than one + # @raise [ArgumentError] if `:min_threads` is less than zero + # @raise [ArgumentError] if `:fallback_policy` is not one of the values specified + # in `FALLBACK_POLICIES` + # + # @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html + def initialize(opts = {}) + @min_length = opts.fetch(:min_threads, DEFAULT_MIN_POOL_SIZE).to_i + @max_length = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i + @idletime = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i + @max_queue = opts.fetch(:max_queue, DEFAULT_MAX_QUEUE_SIZE).to_i + @fallback_policy = opts.fetch(:fallback_policy, opts.fetch(:overflow_policy, :abort)) + warn '[DEPRECATED] :overflow_policy is deprecated terminology, please use :fallback_policy instead' if opts.has_key?(:overflow_policy) + + raise ArgumentError.new('max_threads must be greater than zero') if @max_length <= 0 + raise ArgumentError.new('min_threads cannot be less than zero') if @min_length < 0 + raise ArgumentError.new("#{fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.include?(@fallback_policy) + raise ArgumentError.new('min_threads cannot be more than max_threads') if min_length > max_length + + init_executor + self.auto_terminate = opts.fetch(:auto_terminate, true) + + @pool = [] + @queue = Queue.new + @scheduled_task_count = 0 + @completed_task_count = 0 + @largest_length = 0 + + @gc_interval = opts.fetch(:gc_interval, 1).to_i # undocumented + @last_gc_time = Concurrent.monotonic_time - [1.0, (@gc_interval * 2.0)].max + end + + # @!macro executor_module_method_can_overflow_question + def can_overflow? + @max_queue != 0 + end + + # The number of threads currently in the pool. + # + # @return [Integer] the length + def length + mutex.synchronize { running? ? @pool.length : 0 } + end + + alias_method :current_length, :length + + # The number of tasks in the queue awaiting execution. + # + # @return [Integer] the queue_length + def queue_length + mutex.synchronize { running? ? @queue.length : 0 } + end + + # Number of tasks that may be enqueued before reaching `max_queue` and rejecting + # new tasks. A value of -1 indicates that the queue may grow without bound. + # + # @return [Integer] the remaining_capacity + def remaining_capacity + mutex.synchronize { @max_queue == 0 ? -1 : @max_queue - @queue.length } + end + + # Returns an array with the status of each thread in the pool + # + # This method is deprecated and will be removed soon. + def status + warn '[DEPRECATED] `status` is deprecated and will be removed soon.' + mutex.synchronize { @pool.collect { |worker| worker.status } } + end + + # Run on task completion. + # + # @!visibility private + def on_end_task + mutex.synchronize do + @completed_task_count += 1 #if success + break unless running? + end + end + + # Run when a thread worker exits. + # + # @!visibility private + def on_worker_exit(worker) + mutex.synchronize do + @pool.delete(worker) + if @pool.empty? && !running? + stop_event.set + stopped_event.set + end + end + end + + protected + + # @!visibility private + def execute(*args, &task) + if ensure_capacity? + @scheduled_task_count += 1 + @queue << [args, task] + else + handle_fallback(*args, &task) if @max_queue != 0 && @queue.length >= @max_queue + end + prune_pool + end + + # @!visibility private + def shutdown_execution + if @pool.empty? + stopped_event.set + else + @pool.length.times { @queue << :stop } + end + end + + # @!visibility private + def kill_execution + @queue.clear + drain_pool + end + + # Check the thread pool configuration and determine if the pool + # has enought capacity to handle the request. Will grow the size + # of the pool if necessary. + # + # @return [Boolean] true if the pool has enough capacity else false + # + # @!visibility private + def ensure_capacity? + additional = 0 + capacity = true + + if @pool.size < @min_length + additional = @min_length - @pool.size + elsif @queue.empty? && @queue.num_waiting >= 1 + additional = 0 + elsif @pool.size == 0 && @min_length == 0 + additional = 1 + elsif @pool.size < @max_length || @max_length == 0 + additional = 1 + elsif @max_queue == 0 || @queue.size < @max_queue + additional = 0 + else + capacity = false + end + + # puts format('pool %3d queue %3d waiting %3d additional %3d capacity %s', @pool.size, @queue.size, @queue.num_waiting, additional, capacity.to_s) + + additional.times do + @pool << create_worker_thread + end + + if additional > 0 + @largest_length = [@largest_length, @pool.length].max + end + + capacity + end + + # Scan all threads in the pool and reclaim any that are dead or + # have been idle too long. Will check the last time the pool was + # pruned and only run if the configured garbage collection + # interval has passed. + # + # @!visibility private + def prune_pool + if Concurrent.monotonic_time - @gc_interval >= @last_gc_time + @pool.delete_if { |worker| worker.dead? } + # send :stop for each thread over idletime + @pool. + select { |worker| @idletime != 0 && Concurrent.monotonic_time - @idletime > worker.last_activity }. + each { @queue << :stop } + @last_gc_time = Concurrent.monotonic_time + end + end + + # Reclaim all threads in the pool. + # + # @!visibility private + def drain_pool + @pool.each { |worker| worker.kill } + @pool.clear + end + + # Create a single worker thread to be added to the pool. + # + # @return [Thread] the new thread. + # + # @!visibility private + def create_worker_thread + wrkr = RubyThreadPoolWorker.new(@queue, self) + Thread.new(wrkr, self) do |worker, parent| + Thread.current.abort_on_exception = false + worker.run + parent.on_worker_exit(worker) + end + return wrkr + end + end +end + +require 'benchmark/ips' + +Benchmark.ips do |x| + + x.time = 10 + x.warmup = if RUBY_ENGINE == 'jruby' + 30 + else + 5 + end + + configuration = { min_threads: 2, + max_threads: 8, + stop_on_exit: false, + idletime: 60, # 1 minute + max_queue: 0, # unlimited + fallback_policy: :caller_runs } + + pools = { old: Concurrent::OldRubyThreadPoolExecutor.new(configuration), + new: Concurrent::RubyThreadPoolExecutor.new(configuration) } + pools.update java: Concurrent::JavaThreadPoolExecutor.new(configuration) if RUBY_ENGINE == 'jruby' + + pools.each do |name, pool| + x.report(name.to_s) do + count = Concurrent::CountDownLatch.new(100) + 100.times do + pool.post { count.count_down } + end + count.wait + end + end + + x.compare! +end + + diff --git a/lib/concurrent.rb b/lib/concurrent.rb index 535e8d89c..99c02f2ed 100644 --- a/lib/concurrent.rb +++ b/lib/concurrent.rb @@ -1,6 +1,7 @@ require 'concurrent/version' require 'concurrent/synchronization' +require 'concurrent/at_exit' require 'concurrent/configuration' diff --git a/lib/concurrent/at_exit.rb b/lib/concurrent/at_exit.rb new file mode 100644 index 000000000..c5dfa2be1 --- /dev/null +++ b/lib/concurrent/at_exit.rb @@ -0,0 +1,92 @@ +require 'concurrent/logging' +require 'concurrent/synchronization' + +module Concurrent + + # Provides ability to add and remove handlers to be run at `Kernel#at_exit`, order is undefined. + # Each handler is executed at most once. + class AtExitImplementation < Synchronization::Object + include Logging + + def initialize(enabled = true) + super() + synchronize do + @handlers = {} + @enabled = enabled + end + end + + # Add a handler to be run at `Kernel#at_exit` + # @param [Object] handler_id optionally provide an id, if allready present, handler is replaced + # @yield the handler + # @return id of the handler + def add(handler_id = nil, &handler) + id = handler_id || handler.object_id + synchronize { @handlers[id] = handler } + id + end + + # Delete a handler by handler_id + # @return [true, false] + def delete(handler_id) + !!synchronize { @handlers.delete handler_id } + end + + # Is handler with handler_id rpesent? + # @return [true, false] + def handler?(handler_id) + synchronize { @handlers.key? handler_id } + end + + # @return copy of the handlers + def handlers + synchronize { @handlers }.clone + end + + # install `Kernel#at_exit` callback to execute added handlers + def install + synchronize do + @installed ||= begin + at_exit { runner } + true + end + self + end + end + + # Will it run during `Kernel#at_exit` + def enabled? + synchronize { @enabled } + end + + # Configure if it runs during `Kernel#at_exit` + def enabled=(value) + synchronize { @enabled = value } + end + + # run the handlers manually + # @return ids of the handlers + def run + handlers, _ = synchronize { handlers, @handlers = @handlers, {} } + handlers.each do |_, handler| + begin + handler.call + rescue => error + log ERROR, error + end + end + handlers.keys + end + + private + + def runner + run if synchronize { @enabled } + end + end + + private_constant :AtExitImplementation + + # @see AtExitImplementation + AtExit = AtExitImplementation.new.install +end diff --git a/lib/concurrent/configuration.rb b/lib/concurrent/configuration.rb index 3b19a944a..e7e9d7b05 100644 --- a/lib/concurrent/configuration.rb +++ b/lib/concurrent/configuration.rb @@ -1,6 +1,7 @@ require 'thread' require 'concurrent/atomics' require 'concurrent/errors' +require 'concurrent/at_exit' require 'concurrent/executors' require 'concurrent/utility/processor_count' @@ -16,35 +17,20 @@ module Concurrent private_constant :GLOBAL_LOGGER # @!visibility private - AUTO_TERMINATE_GLOBAL_EXECUTORS = AtomicBoolean.new(true) - private_constant :AUTO_TERMINATE_GLOBAL_EXECUTORS - - # @!visibility private - AUTO_TERMINATE_ALL_EXECUTORS = AtomicBoolean.new(true) - private_constant :AUTO_TERMINATE_ALL_EXECUTORS - - # @!visibility private - GLOBAL_FAST_EXECUTOR = Delay.new do - Concurrent.new_fast_executor( - stop_on_exit: AUTO_TERMINATE_GLOBAL_EXECUTORS.value) - end + GLOBAL_FAST_EXECUTOR = Delay.new { Concurrent.new_fast_executor(auto_terminate: true) } private_constant :GLOBAL_FAST_EXECUTOR # @!visibility private - GLOBAL_IO_EXECUTOR = Delay.new do - Concurrent.new_io_executor( - stop_on_exit: AUTO_TERMINATE_GLOBAL_EXECUTORS.value) - end + GLOBAL_IO_EXECUTOR = Delay.new { Concurrent.new_io_executor(auto_terminate: true) } private_constant :GLOBAL_IO_EXECUTOR # @!visibility private - GLOBAL_TIMER_SET = Delay.new do - TimerSet.new(stop_on_exit: AUTO_TERMINATE_GLOBAL_EXECUTORS.value) - end + GLOBAL_TIMER_SET = Delay.new { TimerSet.new(auto_terminate: true) } private_constant :GLOBAL_TIMER_SET # @!visibility private GLOBAL_IMMEDIATE_EXECUTOR = ImmediateExecutor.new + private_constant :GLOBAL_IMMEDIATE_EXECUTOR def self.global_logger GLOBAL_LOGGER.value @@ -54,87 +40,42 @@ def self.global_logger=(value) GLOBAL_LOGGER.value = value end - # Defines if global executors should be auto-terminated with an - # `at_exit` callback. When set to `false` it will be the application - # programmer's responsibility to ensure that the global thread pools - # are shutdown properly prior to application exit. + # Disables AtExit hooks including pool auto-termination hooks. + # When disabled it will be the application + # programmer's responsibility to ensure that the hooks + # are shutdown properly prior to application exit + # by calling {AtExit.run} method. # - # @note Only change this option if you know what you are doing! - # When this is set to true (the default) then `at_exit` handlers - # will be registered automatically for the *global* thread pools - # to ensure that they are shutdown when the application ends. When - # changed to false, the `at_exit` handlers will be circumvented - # for all *global* thread pools. This method should *never* be called + # @note this option should be needed only because of `at_exit` ordering + # issues which may arise when running some of the testing frameworks. + # E.g. Minitest's test-suite runs itself in `at_exit` callback which + # executes after the pools are already terminated. Then auto termination + # needs to be disabled and called manually after test-suite ends. + # @note This method should *never* be called # from within a gem. It should *only* be used from within the main # application and even then it should be used only when necessary. - # - def self.disable_auto_termination_of_global_executors! - AUTO_TERMINATE_GLOBAL_EXECUTORS.make_false + # @see AtExit + def self.disable_at_exit_hooks! + AtExit.enabled = false end - # Defines if global executors should be auto-terminated with an - # `at_exit` callback. When set to `false` it will be the application - # programmer's responsibility to ensure that the global thread pools - # are shutdown properly prior to application exit. - # - # @note Only change this option if you know what you are doing! - # When this is set to true (the default) then `at_exit` handlers - # will be registered automatically for the *global* thread pools - # to ensure that they are shutdown when the application ends. When - # changed to false, the `at_exit` handlers will be circumvented - # for all *global* thread pools. This method should *never* be called - # from within a gem. It should *only* be used from within the main - # application and even then it should be used only when necessary. - # - # @return [Boolean] true when global thread pools will auto-terminate on - # application exit using an `at_exit` handler; false when no auto-termination - # will occur. - def self.auto_terminate_global_executors? - AUTO_TERMINATE_GLOBAL_EXECUTORS.value + def self.disable_executor_auto_termination! + warn '[DEPRECATED] Use Concurrent.disable_at_exit_hooks! instead' + disable_at_exit_hooks! end - # Defines if *ALL* executors should be auto-terminated with an - # `at_exit` callback. When set to `false` it will be the application - # programmer's responsibility to ensure that *all* thread pools, - # including the global thread pools, are shutdown properly prior to - # application exit. - # - # @note Only change this option if you know what you are doing! - # When this is set to true (the default) then `at_exit` handlers - # will be registered automatically for *all* thread pools to - # ensure that they are shutdown when the application ends. When - # changed to false, the `at_exit` handlers will be circumvented - # for *all* Concurrent Ruby thread pools running within the - # application. Even those created within other gems used by the - # application. This method should *never* be called from within a - # gem. It should *only* be used from within the main application. - # And even then it should be used only when necessary. - def self.disable_auto_termination_of_all_executors! - AUTO_TERMINATE_ALL_EXECUTORS.make_false + # @return [true,false] + # @see .disable_executor_auto_termination! + def self.disable_executor_auto_termination? + warn '[DEPRECATED] Use Concurrent::AtExit.enabled? instead' + AtExit.enabled? end - # Defines if *ALL* executors should be auto-terminated with an - # `at_exit` callback. When set to `false` it will be the application - # programmer's responsibility to ensure that *all* thread pools, - # including the global thread pools, are shutdown properly prior to - # application exit. - # - # @note Only change this option if you know what you are doing! - # When this is set to true (the default) then `at_exit` handlers - # will be registered automatically for *all* thread pools to - # ensure that they are shutdown when the application ends. When - # changed to false, the `at_exit` handlers will be circumvented - # for *all* Concurrent Ruby thread pools running within the - # application. Even those created within other gems used by the - # application. This method should *never* be called from within a - # gem. It should *only* be used from within the main application. - # And even then it should be used only when necessary. - # - # @return [Boolean] true when *all* thread pools will auto-terminate on - # application exit using an `at_exit` handler; false when no auto-termination - # will occur. - def self.auto_terminate_all_executors? - AUTO_TERMINATE_ALL_EXECUTORS.value + # terminates all pools and blocks until they are terminated + # @see .disable_executor_auto_termination! + def self.terminate_pools! + warn '[DEPRECATED] Use Concurrent::AtExit.run instead' + AtExit.run end # Global thread pool optimized for short, fast *operations*. @@ -151,6 +92,10 @@ def self.global_io_executor GLOBAL_IO_EXECUTOR.value end + def self.global_immediate_executor + GLOBAL_IMMEDIATE_EXECUTOR + end + # Global thread pool user for global *timers*. # # @return [Concurrent::TimerSet] the thread pool @@ -160,44 +105,25 @@ def self.global_timer_set GLOBAL_TIMER_SET.value end - def self.shutdown_global_executors - global_fast_executor.shutdown - global_io_executor.shutdown - global_timer_set.shutdown - end - - def self.kill_global_executors - global_fast_executor.kill - global_io_executor.kill - global_timer_set.kill - end - - def self.wait_for_global_executors_termination(timeout = nil) - latch = CountDownLatch.new(3) - [ global_fast_executor, global_io_executor, global_timer_set ].each do |executor| - Thread.new{ executor.wait_for_termination(timeout); latch.count_down } - end - latch.wait(timeout) - end - def self.new_fast_executor(opts = {}) FixedThreadPool.new( - [2, Concurrent.processor_count].max, - stop_on_exit: opts.fetch(:stop_on_exit, true), - idletime: 60, # 1 minute - max_queue: 0, # unlimited - fallback_policy: :caller_runs # shouldn't matter -- 0 max queue + [2, Concurrent.processor_count].max, + auto_terminate: opts.fetch(:auto_terminate, true), + idletime: 60, # 1 minute + max_queue: 0, # unlimited + fallback_policy: :caller_runs # shouldn't matter -- 0 max queue ) end def self.new_io_executor(opts = {}) ThreadPoolExecutor.new( - min_threads: [2, Concurrent.processor_count].max, - max_threads: ThreadPoolExecutor::DEFAULT_MAX_POOL_SIZE, - stop_on_exit: opts.fetch(:stop_on_exit, true), - idletime: 60, # 1 minute - max_queue: 0, # unlimited - fallback_policy: :caller_runs # shouldn't matter -- 0 max queue + min_threads: [2, Concurrent.processor_count].max, + max_threads: ThreadPoolExecutor::DEFAULT_MAX_POOL_SIZE, + # max_threads: 1000, + auto_terminate: opts.fetch(:auto_terminate, true), + idletime: 60, # 1 minute + max_queue: 0, # unlimited + fallback_policy: :caller_runs # shouldn't matter -- 0 max queue ) end @@ -256,7 +182,7 @@ def global_timer_set def global_task_pool=(executor) warn '[DEPRECATED] Replacing global thread pools is deprecated. Use the :executor constructor option instead.' GLOBAL_IO_EXECUTOR.reconfigure { executor } or - raise ConfigurationError.new('global task pool was already set') + raise ConfigurationError.new('global task pool was already set') end # @deprecated Replacing global thread pools is deprecated. @@ -264,7 +190,7 @@ def global_task_pool=(executor) def global_operation_pool=(executor) warn '[DEPRECATED] Replacing global thread pools is deprecated. Use the :executor constructor option instead.' GLOBAL_FAST_EXECUTOR.reconfigure { executor } or - raise ConfigurationError.new('global operation pool was already set') + raise ConfigurationError.new('global operation pool was already set') end # @deprecated Use Concurrent.new_io_executor instead @@ -293,11 +219,12 @@ def auto_terminate end # create the default configuration on load - @configuration = Atomic.new(Configuration.new) + CONFIGURATION = Atomic.new(Configuration.new) + private_constant :CONFIGURATION # @return [Configuration] def self.configuration - @configuration.value + CONFIGURATION.value end # Perform gem-level configuration. diff --git a/lib/concurrent/executor/executor.rb b/lib/concurrent/executor/executor.rb index 7ed5bba62..d97b5379c 100644 --- a/lib/concurrent/executor/executor.rb +++ b/lib/concurrent/executor/executor.rb @@ -1,5 +1,6 @@ require 'concurrent/errors' require 'concurrent/logging' +require 'concurrent/at_exit' require 'concurrent/atomic/event' module Concurrent @@ -11,6 +12,18 @@ module Executor # specified in `FALLBACK_POLICIES`. attr_reader :fallback_policy + # def initialize(opts) + # @auto_terminate = opts.fetch(:auto_terminate, true) + # end + + def auto_terminate? + mutex.synchronize { ns_auto_terminate? } + end + + def auto_terminate=(value) + mutex.synchronize { self.ns_auto_terminate = value } + end + # @!macro [attach] executor_module_method_can_overflow_question # # Does the task queue have a maximum size? @@ -60,36 +73,28 @@ def serialized? false end - def auto_terminate? - !! @auto_terminate - end - - protected + private - def enable_at_exit_handler!(opts = {}) - if opts.fetch(:stop_on_exit, true) - @auto_terminate = true - if Concurrent.on_cruby? - create_mri_at_exit_handler!(self.object_id) - else - create_at_exit_handler!(self) - end - end + def ns_auto_terminate? + !!@auto_terminate end - def create_mri_at_exit_handler!(id) - at_exit do - if Concurrent.auto_terminate_all_executors? - this = ObjectSpace._id2ref(id) - this.kill if this - end + def ns_auto_terminate=(value) + case value + when true + AtExit.add(self) { terminate_at_exit } + @auto_terminate = true + when false + AtExit.delete(self) + @auto_terminate = false + else + raise ArgumentError end end - def create_at_exit_handler!(this) - at_exit do - this.kill if Concurrent.auto_terminate_all_executors? - end + def terminate_at_exit + kill # TODO be gentle first + wait_for_termination(10) end end @@ -126,7 +131,7 @@ module RubyExecutor include Logging # The set of possible fallback policies that may be set at thread pool creation. - FALLBACK_POLICIES = [:abort, :discard, :caller_runs] + FALLBACK_POLICIES = [:abort, :discard, :caller_runs] # @!macro [attach] executor_method_post # @@ -168,7 +173,7 @@ def <<(task) # # @return [Boolean] `true` when running, `false` when shutting down or shutdown def running? - ! stop_event.set? + !stop_event.set? end # @!macro [attach] executor_method_shuttingdown_question @@ -177,7 +182,7 @@ def running? # # @return [Boolean] `true` when not running and not shutdown, else `false` def shuttingdown? - ! (running? || shutdown?) + !(running? || shutdown?) end # @!macro [attach] executor_method_shutdown_question @@ -197,6 +202,7 @@ def shutdown? def shutdown mutex.synchronize do break unless running? + self.ns_auto_terminate = false stop_event.set shutdown_execution end @@ -212,6 +218,7 @@ def shutdown def kill mutex.synchronize do break if shutdown? + self.ns_auto_terminate = false stop_event.set kill_execution stopped_event.set @@ -243,8 +250,8 @@ def wait_for_termination(timeout = nil) # Initialize the executor by creating and initializing all the # internal synchronization objects. def init_executor - @mutex = Mutex.new - @stop_event = Event.new + @mutex = Mutex.new + @stop_event = Event.new @stopped_event = Event.new end @@ -254,7 +261,7 @@ def execute(*args, &task) end # @!macro [attach] executor_method_shutdown_execution - # + # # Callback method called when an orderly shutdown has completed. # The default behavior is to signal all waiting threads. def shutdown_execution @@ -278,9 +285,9 @@ module JavaExecutor # The set of possible fallback policies that may be set at thread pool creation. FALLBACK_POLICIES = { - abort: java.util.concurrent.ThreadPoolExecutor::AbortPolicy, - discard: java.util.concurrent.ThreadPoolExecutor::DiscardPolicy, - caller_runs: java.util.concurrent.ThreadPoolExecutor::CallerRunsPolicy + abort: java.util.concurrent.ThreadPoolExecutor::AbortPolicy, + discard: java.util.concurrent.ThreadPoolExecutor::DiscardPolicy, + caller_runs: java.util.concurrent.ThreadPoolExecutor::CallerRunsPolicy }.freeze # @!macro executor_method_post @@ -302,7 +309,7 @@ def <<(task) # @!macro executor_method_running_question def running? - ! (shuttingdown? || shutdown?) + !(shuttingdown? || shutdown?) end # @!macro executor_method_shuttingdown_question @@ -331,15 +338,29 @@ def wait_for_termination(timeout = nil) # @!macro executor_method_shutdown def shutdown + self.ns_auto_terminate = false @executor.shutdown nil end # @!macro executor_method_kill def kill + self.ns_auto_terminate = false @executor.shutdownNow nil end + + protected + + # FIXME: it's here just for synchronization in auto_terminate methods, should be replaced and solved + # by the synchronization layer + def mutex + self + end + + def synchronize + JRuby.reference0(self).synchronized { yield } + end end end end diff --git a/lib/concurrent/executor/executor_options.rb b/lib/concurrent/executor/executor_options.rb index 611cae55f..dfe3f95fc 100644 --- a/lib/concurrent/executor/executor_options.rb +++ b/lib/concurrent/executor/executor_options.rb @@ -19,31 +19,41 @@ module ExecutorOptions # :nodoc: # # @!visibility private def get_executor_from(opts = {}) # :nodoc: - if (executor = opts[:executor]).is_a? Symbol + case + when opts.key?(:executor) case opts[:executor] when :fast Concurrent.global_fast_executor when :io Concurrent.global_io_executor when :immediate - Concurrent::ImmediateExecutor.new + Concurrent.global_immediate_executor when :operation Kernel.warn '[DEPRECATED] use `executor: :fast` instead' Concurrent.global_fast_executor when :task Kernel.warn '[DEPRECATED] use `executor: :io` instead' Concurrent.global_io_executor + when Executor + opts[:executor] + when nil + nil else - raise ArgumentError.new("executor '#{executor}' not recognized") + raise ArgumentError.new("executor '#{opts[:executor]}' not recognized") end - elsif opts[:executor] - opts[:executor] - elsif opts[:operation] == true || opts[:task] == false - Kernel.warn '[DEPRECATED] use `executor: :fast` instead' - Concurrent.global_fast_executor - elsif opts[:operation] == false || opts[:task] == true - Kernel.warn '[DEPRECATED] use `executor: :io` instead' - Concurrent.global_io_executor + + when opts.key?(:operation) || opts.key?(:task) + if opts[:operation] == true || opts[:task] == false + Kernel.warn '[DEPRECATED] use `executor: :fast` instead' + return Concurrent.global_fast_executor + end + + if opts[:operation] == false || opts[:task] == true + Kernel.warn '[DEPRECATED] use `executor: :io` instead' + return Concurrent.global_io_executor + end + + raise ArgumentError.new("executor '#{opts[:executor]}' not recognized") else nil end diff --git a/lib/concurrent/executor/fixed_thread_pool.rb b/lib/concurrent/executor/fixed_thread_pool.rb index c37ce6dda..88d1406cd 100644 --- a/lib/concurrent/executor/fixed_thread_pool.rb +++ b/lib/concurrent/executor/fixed_thread_pool.rb @@ -27,7 +27,7 @@ module Concurrent # * `max_queue`: The maximum number of tasks that may be waiting in the work queue at # any one time. When the queue size reaches `max_queue` subsequent tasks will be # rejected in accordance with the configured `fallback_policy`. - # * `stop_on_exit`: When true (default) an `at_exit` handler will be registered which + # * `auto_terminate`: When true (default) an `at_exit` handler will be registered which # will stop the thread pool when the application exits. See below for more information # on shutting down thread pools. # * `fallback_policy`: The policy defining how rejected tasks are handled. @@ -58,12 +58,12 @@ module Concurrent # stop the thread pool when the application exists. This handler uses a brute # force method to stop the pool and makes no guarantees regarding resources being # used by any tasks still running. Registration of this `at_exit` handler can be - # prevented by setting the thread pool's constructor `:stop_on_exit` option to + # prevented by setting the thread pool's constructor `:auto_terminate` option to # `false` when the thread pool is created. All thread pools support this option. # # ```ruby # pool1 = Concurrent::FixedThreadPool.new(5) # an `at_exit` handler will be registered - # pool2 = Concurrent::FixedThreadPool.new(5, stop_on_exit: false) # prevent `at_exit` handler registration + # pool2 = Concurrent::FixedThreadPool.new(5, auto_terminate: false) # prevent `at_exit` handler registration # ``` # # @note Failure to properly shutdown a thread pool can lead to unpredictable results. diff --git a/lib/concurrent/executor/java_cached_thread_pool.rb b/lib/concurrent/executor/java_cached_thread_pool.rb index eec042702..f5a8bf044 100644 --- a/lib/concurrent/executor/java_cached_thread_pool.rb +++ b/lib/concurrent/executor/java_cached_thread_pool.rb @@ -25,8 +25,9 @@ def initialize(opts = {}) @executor = java.util.concurrent.Executors.newCachedThreadPool @executor.setRejectedExecutionHandler(FALLBACK_POLICIES[@fallback_policy].new) + @executor.setKeepAliveTime(opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT), java.util.concurrent.TimeUnit::SECONDS) - enable_at_exit_handler!(opts) + self.auto_terminate = opts.fetch(:auto_terminate, true) end end end diff --git a/lib/concurrent/executor/java_fixed_thread_pool.rb b/lib/concurrent/executor/java_fixed_thread_pool.rb index df7eead5b..8860896fb 100644 --- a/lib/concurrent/executor/java_fixed_thread_pool.rb +++ b/lib/concurrent/executor/java_fixed_thread_pool.rb @@ -25,7 +25,7 @@ def initialize(num_threads, opts = {}) }.merge(opts) super(opts) - enable_at_exit_handler!(opts) + self.auto_terminate = opts.fetch(:auto_terminate, true) end end end diff --git a/lib/concurrent/executor/java_single_thread_executor.rb b/lib/concurrent/executor/java_single_thread_executor.rb index 6fc3134a3..7931734f7 100644 --- a/lib/concurrent/executor/java_single_thread_executor.rb +++ b/lib/concurrent/executor/java_single_thread_executor.rb @@ -22,7 +22,7 @@ def initialize(opts = {}) @executor = java.util.concurrent.Executors.newSingleThreadExecutor @fallback_policy = opts.fetch(:fallback_policy, :discard) raise ArgumentError.new("#{@fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.keys.include?(@fallback_policy) - enable_at_exit_handler!(opts) + self.auto_terminate = opts.fetch(:auto_terminate, true) end end end diff --git a/lib/concurrent/executor/java_thread_pool_executor.rb b/lib/concurrent/executor/java_thread_pool_executor.rb index 9a5ad34b8..d9ba7c57a 100644 --- a/lib/concurrent/executor/java_thread_pool_executor.rb +++ b/lib/concurrent/executor/java_thread_pool_executor.rb @@ -76,7 +76,7 @@ def initialize(opts = {}) idletime, java.util.concurrent.TimeUnit::SECONDS, queue, FALLBACK_POLICIES[@fallback_policy].new) - enable_at_exit_handler!(opts) + self.auto_terminate = opts.fetch(:auto_terminate, true) end # @!macro executor_module_method_can_overflow_question @@ -104,7 +104,6 @@ def max_length def length @executor.getPoolSize end - alias_method :current_length, :length # The largest number of threads that have been created in the pool since construction. # @@ -149,15 +148,6 @@ def remaining_capacity @max_queue == 0 ? -1 : @executor.getQueue.remainingCapacity end - # This method is deprecated and will be removed soon. - # This method is supost to return the threads status, but Java API doesn't - # provide a way to get the thread status. So we return an empty Array instead. - def status - warn '[DEPRECATED] `status` is deprecated and will be removed soon.' - warn "Calls to `status` return an empty Array. Java ThreadPoolExecutor does not provide thread's status." - [] - end - # Is the thread pool running? # # @return [Boolean] `true` when running, `false` when shutting down or shutdown diff --git a/lib/concurrent/executor/ruby_cached_thread_pool.rb b/lib/concurrent/executor/ruby_cached_thread_pool.rb index 62dd76f96..ec539f302 100644 --- a/lib/concurrent/executor/ruby_cached_thread_pool.rb +++ b/lib/concurrent/executor/ruby_cached_thread_pool.rb @@ -17,14 +17,13 @@ def initialize(opts = {}) raise ArgumentError.new("#{fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.include?(fallback_policy) - opts = opts.merge( - min_threads: 0, - max_threads: DEFAULT_MAX_POOL_SIZE, - fallback_policy: fallback_policy, - max_queue: DEFAULT_MAX_QUEUE_SIZE, - idletime: DEFAULT_THREAD_IDLETIMEOUT - ) - super(opts) + defaults = { idletime: DEFAULT_THREAD_IDLETIMEOUT } + overrides = { min_threads: 0, + max_threads: DEFAULT_MAX_POOL_SIZE, + fallback_policy: fallback_policy, + max_queue: DEFAULT_MAX_QUEUE_SIZE } + + super(defaults.merge(opts).merge(overrides)) end end end diff --git a/lib/concurrent/executor/ruby_single_thread_executor.rb b/lib/concurrent/executor/ruby_single_thread_executor.rb index 297bf9c03..e8aeacd0d 100644 --- a/lib/concurrent/executor/ruby_single_thread_executor.rb +++ b/lib/concurrent/executor/ruby_single_thread_executor.rb @@ -23,7 +23,7 @@ def initialize(opts = {}) @fallback_policy = opts.fetch(:fallback_policy, :discard) raise ArgumentError.new("#{@fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.include?(@fallback_policy) init_executor - enable_at_exit_handler!(opts) + self.auto_terminate = opts.fetch(:auto_terminate, true) end protected diff --git a/lib/concurrent/executor/ruby_thread_pool_executor.rb b/lib/concurrent/executor/ruby_thread_pool_executor.rb index f4cb42cc0..e93ccd005 100644 --- a/lib/concurrent/executor/ruby_thread_pool_executor.rb +++ b/lib/concurrent/executor/ruby_thread_pool_executor.rb @@ -2,7 +2,6 @@ require 'concurrent/atomic/event' require 'concurrent/executor/executor' -require 'concurrent/executor/ruby_thread_pool_worker' require 'concurrent/utility/monotonic_time' module Concurrent @@ -85,37 +84,37 @@ def initialize(opts = {}) raise ArgumentError.new('min_threads cannot be more than max_threads') if min_length > max_length init_executor - enable_at_exit_handler!(opts) + self.auto_terminate = opts.fetch(:auto_terminate, true) - @pool = [] - @queue = Queue.new + @pool = [] # all workers + @ready = [] # used as a stash (most idle worker is at the start) + @queue = [] # used as queue + # @ready or @queue is empty at all times @scheduled_task_count = 0 @completed_task_count = 0 @largest_length = 0 - @gc_interval = opts.fetch(:gc_interval, 1).to_i # undocumented - @last_gc_time = Concurrent.monotonic_time - [1.0, (@gc_interval * 2.0)].max + @gc_interval = opts.fetch(:gc_interval, @idletime / 2.0).to_i # undocumented + @next_gc_time = Concurrent.monotonic_time + @gc_interval end # @!macro executor_module_method_can_overflow_question def can_overflow? - @max_queue != 0 + mutex.synchronize { ns_limited_queue? } end # The number of threads currently in the pool. # # @return [Integer] the length def length - mutex.synchronize { running? ? @pool.length : 0 } + mutex.synchronize { @pool.length } end - alias_method :current_length, :length - # The number of tasks in the queue awaiting execution. # # @return [Integer] the queue_length def queue_length - mutex.synchronize { running? ? @queue.length : 0 } + mutex.synchronize { @queue.length } end # Number of tasks that may be enqueued before reaching `max_queue` and rejecting @@ -123,142 +122,229 @@ def queue_length # # @return [Integer] the remaining_capacity def remaining_capacity - mutex.synchronize { @max_queue == 0 ? -1 : @max_queue - @queue.length } + mutex.synchronize do + if ns_limited_queue? + @max_queue - @queue.length + else + -1 + end + end end - # Returns an array with the status of each thread in the pool - # - # This method is deprecated and will be removed soon. - def status - warn '[DEPRECATED] `status` is deprecated and will be removed soon.' - mutex.synchronize { @pool.collect { |worker| worker.status } } + # @api private + def remove_busy_worker(worker) + mutex.synchronize { ns_remove_busy_worker worker } end - # Run on task completion. - # - # @!visibility private - def on_end_task - mutex.synchronize do - @completed_task_count += 1 #if success - break unless running? - end + # @api private + def ready_worker(worker) + mutex.synchronize { ns_ready_worker worker } end - # Run when a thread worker exits. - # - # @!visibility private - def on_worker_exit(worker) - mutex.synchronize do - @pool.delete(worker) - if @pool.empty? && !running? - stop_event.set - stopped_event.set - end - end + # @api private + def worker_not_old_enough(worker) + mutex.synchronize { ns_worker_not_old_enough worker } + end + + # @api private + def worker_died(worker) + mutex.synchronize { ns_worker_died worker } end protected - # @!visibility private - def execute(*args, &task) - if ensure_capacity? + def ns_limited_queue? + @max_queue != 0 + end + + def ns_execute(*args, &task) + if ns_assign_worker(*args, &task) || ns_enqueue(*args, &task) @scheduled_task_count += 1 - @queue << [args, task] else - handle_fallback(*args, &task) if @max_queue != 0 && @queue.length >= @max_queue + handle_fallback(*args, &task) end - prune_pool + + ns_prune_pool if @next_gc_time < Concurrent.monotonic_time + # raise unless @ready.empty? || @queue.empty? # assert end - # @!visibility private - def shutdown_execution + alias_method :execute, :ns_execute + + def ns_shutdown_execution if @pool.empty? + # nothing to do stopped_event.set - else - @pool.length.times { @queue << :stop } end + if @queue.empty? + # no more tasks will be accepted, just stop all workers + @pool.each(&:stop) + end + + # raise unless @ready.empty? || @queue.empty? # assert end - # @!visibility private - def kill_execution - @queue.clear - drain_pool + alias_method :shutdown_execution, :ns_shutdown_execution + + # @api private + def ns_kill_execution + ns_shutdown_execution + unless stopped_event.wait(1) + @pool.each &:kill + @pool.clear + @ready.clear + # TODO log out unprocessed tasks in queue + end end - # Check the thread pool configuration and determine if the pool - # has enought capacity to handle the request. Will grow the size - # of the pool if necessary. - # - # @return [Boolean] true if the pool has enough capacity else false - # - # @!visibility private - def ensure_capacity? - additional = 0 - capacity = true - - if @pool.size < @min_length - additional = @min_length - @pool.size - elsif @queue.empty? && @queue.num_waiting >= 1 - additional = 0 - elsif @pool.size == 0 && @min_length == 0 - additional = 1 - elsif @pool.size < @max_length || @max_length == 0 - additional = 1 - elsif @max_queue == 0 || @queue.size < @max_queue - additional = 0 + alias_method :kill_execution, :ns_kill_execution + + # tries to assign task to a worker, tries to get one from @ready or to create new one + # @return [true, false] if task is assigned to a worker + def ns_assign_worker(*args, &task) + # keep growing if the pool is not at the minimum yet + worker = (@ready.pop if @pool.size >= @min_length) || ns_add_busy_worker + if worker + worker << [task, args] + true else - capacity = false + false end + end - additional.times do - @pool << create_worker_thread + # tries to enqueue task + # @return [true, false] if enqueued + def ns_enqueue(*args, &task) + if !ns_limited_queue? || @queue.size < @max_queue + @queue << [task, args] + true + else + false end + end - if additional > 0 - @largest_length = [@largest_length, @pool.length].max - end + def ns_worker_died(worker) + ns_remove_busy_worker worker + replacement_worker = ns_add_busy_worker + ns_ready_worker replacement_worker, false if replacement_worker + end + + # creates new worker which has to receive work to do after it's added + # @return [nil, Worker] nil of max capacity is reached + def ns_add_busy_worker + return if @pool.size >= @max_length - capacity + @pool << (worker = Worker.new(self)) + @largest_length = @pool.length if @pool.length > @largest_length + worker end - # Scan all threads in the pool and reclaim any that are dead or - # have been idle too long. Will check the last time the pool was - # pruned and only run if the configured garbage collection - # interval has passed. - # - # @!visibility private - def prune_pool - if Concurrent.monotonic_time - @gc_interval >= @last_gc_time - @pool.delete_if { |worker| worker.dead? } - # send :stop for each thread over idletime - @pool. - select { |worker| @idletime != 0 && Concurrent.monotonic_time - @idletime > worker.last_activity }. - each { @queue << :stop } - @last_gc_time = Concurrent.monotonic_time + # handle ready worker, giving it new job or assigning back to @ready + def ns_ready_worker(worker, success = true) + @completed_task_count += 1 if success + task_and_args = @queue.shift + if task_and_args + worker << task_and_args + else + # stop workers when !running?, do not return them to @ready + if running? + @ready.push(worker) + else + worker.stop + end end end - # Reclaim all threads in the pool. - # - # @!visibility private - def drain_pool - @pool.each { |worker| worker.kill } - @pool.clear + # returns back worker to @ready which was not idle for enough time + def ns_worker_not_old_enough(worker) + # let's put workers coming from idle_test back to the start (as the oldest worker) + @ready.unshift(worker) + true end - # Create a single worker thread to be added to the pool. - # - # @return [Thread] the new thread. - # - # @!visibility private - def create_worker_thread - wrkr = RubyThreadPoolWorker.new(@queue, self) - Thread.new(wrkr, self) do |worker, parent| - Thread.current.abort_on_exception = false - worker.run - parent.on_worker_exit(worker) + # removes a worker which is not in not tracked in @ready + def ns_remove_busy_worker(worker) + @pool.delete(worker) + stopped_event.set if @pool.empty? && !running? + true + end + + # try oldest worker if it is idle for enough time, it's returned back at the start + def ns_prune_pool + return if @pool.size <= @min_length + + last_used = @ready.shift + last_used << :idle_test if last_used + + @next_gc_time = Concurrent.monotonic_time + @gc_interval + end + + class Worker + include Logging + + def initialize(pool) + # instance variables accessed only under pool's lock so no need to sync here again + @queue = Queue.new + @pool = pool + @thread = create_worker @queue, pool, pool.idletime + end + + def <<(message) + @queue << message + end + + def stop + @queue << :stop + end + + def kill + @thread.kill + end + + private + + def create_worker(queue, pool, idletime) + Thread.new(queue, pool, idletime) do |queue, pool, idletime| + last_message = Concurrent.monotonic_time + catch(:stop) do + loop do + + case message = queue.pop + when :idle_test + if (Concurrent.monotonic_time - last_message) > idletime + pool.remove_busy_worker(self) + throw :stop + else + pool.worker_not_old_enough(self) + end + + when :stop + pool.remove_busy_worker(self) + throw :stop + + else + task, args = message + run_task pool, task, args + last_message = Concurrent.monotonic_time + + pool.ready_worker(self) + end + + end + end + end + end + + def run_task(pool, task, args) + task.call(*args) + rescue => ex + # let it fail + log DEBUG, ex + rescue Exception => ex + log ERROR, ex + pool.worker_died(self) + throw :stop end - return wrkr end + end end diff --git a/lib/concurrent/executor/ruby_thread_pool_worker.rb b/lib/concurrent/executor/ruby_thread_pool_worker.rb deleted file mode 100644 index 0949bfc52..000000000 --- a/lib/concurrent/executor/ruby_thread_pool_worker.rb +++ /dev/null @@ -1,74 +0,0 @@ -require 'thread' -require 'concurrent/logging' -require 'concurrent/utility/monotonic_time' - -module Concurrent - - # @!visibility private - class RubyThreadPoolWorker - include Logging - - # @!visibility private - def initialize(queue, parent) - @queue = queue - @parent = parent - @mutex = Mutex.new - @last_activity = Concurrent.monotonic_time - @thread = nil - end - - # @!visibility private - def dead? - return @mutex.synchronize do - @thread.nil? ? false : ! @thread.alive? - end - end - - # @!visibility private - def last_activity - @mutex.synchronize { @last_activity } - end - - def status - @mutex.synchronize do - return 'not running' if @thread.nil? - @thread.status - end - end - - # @!visibility private - def kill - @mutex.synchronize do - Thread.kill(@thread) unless @thread.nil? - @thread = nil - end - end - - # @!visibility private - def run(thread = Thread.current) - @mutex.synchronize do - raise StandardError.new('already running') unless @thread.nil? - @thread = thread - end - - loop do - task = @queue.pop - if task == :stop - @thread = nil - @parent.on_worker_exit(self) - break - end - - begin - task.last.call(*task.first) - rescue => ex - # let it fail - log DEBUG, ex - ensure - @last_activity = Concurrent.monotonic_time - @parent.on_end_task - end - end - end - end -end diff --git a/lib/concurrent/executor/serialized_execution.rb b/lib/concurrent/executor/serialized_execution.rb index a63583ad3..b8c4e0d45 100644 --- a/lib/concurrent/executor/serialized_execution.rb +++ b/lib/concurrent/executor/serialized_execution.rb @@ -98,6 +98,8 @@ def work(job) job = @stash.shift || (@being_executed = false) end + # TODO maybe be able to tell caching pool to just enqueue this job, because the current one end at the end + # of this block call_job job if job end end diff --git a/lib/concurrent/executor/timer_set.rb b/lib/concurrent/executor/timer_set.rb index 75f2bd110..a3d042fea 100644 --- a/lib/concurrent/executor/timer_set.rb +++ b/lib/concurrent/executor/timer_set.rb @@ -32,7 +32,7 @@ def initialize(opts = {}) @timer_executor = SingleThreadExecutor.new @condition = Condition.new init_executor - enable_at_exit_handler!(opts) + self.auto_terminate = opts.fetch(:auto_terminate, true) end # Post a task to be execute run after a given delay (in seconds). If the diff --git a/spec/concurrent/configuration_spec.rb b/spec/concurrent/configuration_spec.rb index 075ab28be..79dd269dd 100644 --- a/spec/concurrent/configuration_spec.rb +++ b/spec/concurrent/configuration_spec.rb @@ -3,15 +3,7 @@ module Concurrent describe Configuration do before(:each) do - Concurrent.const_set( - :GLOBAL_FAST_EXECUTOR, - Concurrent::Delay.new{ Concurrent::ImmediateExecutor.new }) - Concurrent.const_set( - :GLOBAL_IO_EXECUTOR, - Concurrent::Delay.new{ Concurrent::ImmediateExecutor.new }) - Concurrent.const_set( - :GLOBAL_TIMER_SET, - Concurrent::Delay.new{ Concurrent::ImmediateExecutor.new }) + reset_gem_configuration end after(:each) do @@ -35,28 +27,11 @@ module Concurrent expect(Concurrent.global_io_executor).to respond_to(:post) end - specify '#shutdown_global_executors acts on all global executors' do - expect(Concurrent.global_fast_executor).to receive(:shutdown).with(no_args) - expect(Concurrent.global_io_executor).to receive(:shutdown).with(no_args) - expect(Concurrent.global_timer_set).to receive(:shutdown).with(no_args) - Concurrent.shutdown_global_executors - end - - specify '#kill_global_executors acts on all global executors' do - expect(Concurrent.global_fast_executor).to receive(:kill).with(no_args) - expect(Concurrent.global_io_executor).to receive(:kill).with(no_args) - expect(Concurrent.global_timer_set).to receive(:kill).with(no_args) - Concurrent.kill_global_executors - end - - context '#wait_for_global_executors_termination' do - - it 'acts on all global executors' do - expect(Concurrent.global_fast_executor).to receive(:wait_for_termination).with(0.1) - expect(Concurrent.global_io_executor).to receive(:wait_for_termination).with(0.1) - expect(Concurrent.global_timer_set).to receive(:wait_for_termination).with(0.1) - Concurrent.wait_for_global_executors_termination(0.1) - end + specify '#terminate_pools! acts on all executors with auto_terminate: true' do + expect(Concurrent.global_fast_executor).to receive(:kill).with(no_args).and_call_original + expect(Concurrent.global_io_executor).to receive(:kill).with(no_args).and_call_original + expect(Concurrent.global_timer_set).to receive(:kill).with(no_args).and_call_original + Concurrent.terminate_pools! end end end diff --git a/spec/concurrent/executor/cached_thread_pool_shared.rb b/spec/concurrent/executor/cached_thread_pool_shared.rb index 2567d7481..f284688d6 100644 --- a/spec/concurrent/executor/cached_thread_pool_shared.rb +++ b/spec/concurrent/executor/cached_thread_pool_shared.rb @@ -95,20 +95,12 @@ end end - context '#status' do - - it 'returns an array' do - allow(subject).to receive(:warn) - expect(subject.status).to be_kind_of(Array) - end - end - context '#idletime' do subject{ described_class.new(idletime: 42) } it 'returns the thread idletime' do - expect(subject.idletime).to eq described_class::DEFAULT_THREAD_IDLETIMEOUT + expect(subject.idletime).to eq 42 end end end diff --git a/spec/concurrent/executor/fixed_thread_pool_shared.rb b/spec/concurrent/executor/fixed_thread_pool_shared.rb index f34e4054d..65fb214d5 100644 --- a/spec/concurrent/executor/fixed_thread_pool_shared.rb +++ b/spec/concurrent/executor/fixed_thread_pool_shared.rb @@ -145,21 +145,15 @@ end end - context '#status' do - - it 'returns an array' do - allow(subject).to receive(:warn) - expect(subject.status).to be_kind_of(Array) - end - end - - context '#kill' do it 'attempts to kill all in-progress tasks' do thread_count = [subject.length, 5].max @expected = false - thread_count.times{ subject.post{ sleep(1) } } + thread_count.times do + # kill tries to shutdown first with 1sec timeout, so wait 2sec here + subject.post { sleep(2) } + end subject.post{ @expected = true } sleep(0.1) subject.kill @@ -174,7 +168,7 @@ pool = described_class.new(5) 100.times{ pool << proc{ sleep(1) } } sleep(0.1) - expect(pool.current_length).to eq 5 + expect(pool.length).to eq 5 pool.kill end end diff --git a/spec/concurrent/executor/ruby_cached_thread_pool_spec.rb b/spec/concurrent/executor/ruby_cached_thread_pool_spec.rb index fb77b021b..2807073a5 100644 --- a/spec/concurrent/executor/ruby_cached_thread_pool_spec.rb +++ b/spec/concurrent/executor/ruby_cached_thread_pool_spec.rb @@ -2,12 +2,12 @@ module Concurrent - describe RubyCachedThreadPool, :type=>:mrirbx do + describe RubyCachedThreadPool, :type => :mrirbx do subject do described_class.new( - fallback_policy: :discard, - gc_interval: 0 + fallback_policy: :discard, + gc_interval: 0 ) end @@ -20,62 +20,95 @@ module Concurrent context 'garbage collection' do - subject{ described_class.new(idletime: 1, max_threads: 2, gc_interval: 0) } + subject { described_class.new(idletime: 0.1, max_threads: 2, gc_interval: 0) } it 'removes from pool any thread that has been idle too long' do - latch = Concurrent::CountDownLatch.new(3) - 4.times { subject << proc { sleep 0.1; latch.count_down } } + latch = Concurrent::CountDownLatch.new(4) + 4.times { subject.post { sleep 0.1; latch.count_down } } expect(latch.wait(1)).to be true - - subject.instance_variable_set(:@idletime, 1) - - sleep 1.5 - - max_threads = subject.length - - subject.send :prune_pool - sleep 0.1 - subject.send :prune_pool - - expect(subject.length).to be < max_threads + sleep 0.2 + subject.post {} + sleep 0.2 + expect(subject.length).to be < 4 end - it 'removes from pool any dead thread' do - subject.instance_variable_set(:@idletime, 1) - latch = Concurrent::CountDownLatch.new(3) - 3.times { subject << proc{ sleep(0.1); latch.count_down; raise Exception } } - expect(latch.wait(1)).to be true - - max_threads = subject.length - sleep(2) + it 'deals with dead threads' do + expect(subject).to receive(:ns_worker_died).exactly(5).times.and_call_original - latch = Concurrent::CountDownLatch.new(1) - subject << proc{ latch.count_down } + dead_threads_queue = Queue.new + 5.times { subject.post { sleep 0.1; dead_threads_queue.push Thread.current; raise Exception } } + sleep(0.2) + latch = Concurrent::CountDownLatch.new(5) + 5.times { subject.post { sleep 0.1; latch.count_down } } expect(latch.wait(1)).to be true - expect(subject.length).to be < max_threads + dead_threads = [] + dead_threads << dead_threads_queue.pop until dead_threads_queue.empty? + expect(dead_threads.all? { |t| !t.alive? }).to be true end end context 'worker creation and caching' do - subject{ described_class.new(idletime: 1, max_threads: 5) } + subject { described_class.new(idletime: 1, max_threads: 5) } it 'creates new workers when there are none available' do expect(subject.length).to eq 0 - 5.times{ sleep(0.1); subject << proc{ sleep(1) } } + 5.times { sleep(0.1); subject << proc { sleep(1) } } sleep(1) expect(subject.length).to eq 5 end it 'uses existing idle threads' do - 5.times{ subject << proc{ sleep(0.1) } } + 5.times { subject << proc { sleep(0.1) } } sleep(1) expect(subject.length).to be >= 5 - 3.times{ subject << proc{ sleep(1) } } + 3.times { subject << proc { sleep(1) } } sleep(0.1) expect(subject.length).to be >= 5 end end end + + + context 'stress' do + configurations = [ + { min_threads: 2, + max_threads: ThreadPoolExecutor::DEFAULT_MAX_POOL_SIZE, + auto_terminate: false, + idletime: 0.1, # 1 minute + max_queue: 0, # unlimited + fallback_policy: :caller_runs, # shouldn't matter -- 0 max queue + gc_interval: 0.1 }, + { min_threads: 2, + max_threads: 4, + auto_terminate: false, + idletime: 0.1, # 1 minute + max_queue: 0, # unlimited + fallback_policy: :caller_runs, # shouldn't matter -- 0 max queue + gc_interval: 0.1 } + ] + + + configurations.each do |config| + specify do + pool = RubyThreadPoolExecutor.new(config) + + 100.times do + count = Concurrent::CountDownLatch.new(100) + 100.times do + pool.post { count.count_down } + end + count.wait + expect(pool.length).to be <= [200, config[:max_threads]].min + if pool.length > [110, config[:max_threads]].min + puts "ERRORSIZE #{pool.length} max #{config[:max_threads]}" + end + end + end + + end + + + end end diff --git a/spec/concurrent/executor/ruby_fixed_thread_pool_spec.rb b/spec/concurrent/executor/ruby_fixed_thread_pool_spec.rb index b42f9124f..877f8cb92 100644 --- a/spec/concurrent/executor/ruby_fixed_thread_pool_spec.rb +++ b/spec/concurrent/executor/ruby_fixed_thread_pool_spec.rb @@ -27,10 +27,10 @@ module Concurrent it 'creates new workers when there are none available' do pool = described_class.new(5) - expect(pool.current_length).to eq 0 + expect(pool.length).to eq 0 5.times{ pool << proc{ sleep(1) } } sleep(0.1) - expect(pool.current_length).to eq 5 + expect(pool.length).to eq 5 pool.kill end end diff --git a/spec/concurrent/executor/thread_pool_shared.rb b/spec/concurrent/executor/thread_pool_shared.rb index 2a69ca62a..93d543cdd 100644 --- a/spec/concurrent/executor/thread_pool_shared.rb +++ b/spec/concurrent/executor/thread_pool_shared.rb @@ -17,18 +17,18 @@ it 'returns true when :enable_at_exit_handler is true' do if described_class.to_s =~ /FixedThreadPool$/ - subject = described_class.new(1, stop_on_exit: true) + subject = described_class.new(1, auto_terminate: true) else - subject = described_class.new(stop_on_exit: true) + subject = described_class.new(auto_terminate: true) end expect(subject.auto_terminate?).to be true end it 'returns false when :enable_at_exit_handler is false' do if described_class.to_s =~ /FixedThreadPool$/ - subject = described_class.new(1, stop_on_exit: false) + subject = described_class.new(1, auto_terminate: false) else - subject = described_class.new(stop_on_exit: false) + subject = described_class.new(auto_terminate: false) end expect(subject.auto_terminate?).to be false end @@ -47,12 +47,6 @@ subject.wait_for_termination(1) expect(subject.length).to eq 0 end - - it 'aliased as #current_length' do - 5.times{ subject.post{ sleep(0.1) } } - sleep(0.1) - expect(subject.current_length).to eq subject.length - end end context '#scheduled_task_count' do diff --git a/spec/concurrent/executor/timer_set_spec.rb b/spec/concurrent/executor/timer_set_spec.rb index 8e8fbb6e0..3bdfb18fc 100644 --- a/spec/concurrent/executor/timer_set_spec.rb +++ b/spec/concurrent/executor/timer_set_spec.rb @@ -2,12 +2,12 @@ module Concurrent describe TimerSet do - subject{ TimerSet.new(executor: ImmediateExecutor.new) } + subject{ TimerSet.new(executor: :immediate) } after(:each){ subject.kill } it 'uses the executor given at construction' do - executor = double(:executor) + executor = Concurrent.global_immediate_executor expect(executor).to receive(:post).with(no_args) subject = TimerSet.new(executor: executor) subject.post(0){ nil } diff --git a/spec/support/example_group_extensions.rb b/spec/support/example_group_extensions.rb index a952ebd6f..06d746351 100644 --- a/spec/support/example_group_extensions.rb +++ b/spec/support/example_group_extensions.rb @@ -24,9 +24,9 @@ def do_no_reset! end GLOBAL_EXECUTORS = [ - [:GLOBAL_FAST_EXECUTOR, ->{ Delay.new{ Concurrent.new_fast_executor }}], - [:GLOBAL_IO_EXECUTOR, ->{ Delay.new{ Concurrent.new_io_executor }}], - [:GLOBAL_TIMER_SET, ->{ Delay.new{ Concurrent::TimerSet.new }}], + [:GLOBAL_FAST_EXECUTOR, -> { Delay.new { Concurrent.new_fast_executor(auto_terminate: true) } }], + [:GLOBAL_IO_EXECUTOR, -> { Delay.new { Concurrent.new_io_executor(auto_terminate: true) } }], + [:GLOBAL_TIMER_SET, -> { Delay.new { Concurrent::TimerSet.new(auto_terminate: true) } }], ] @@killed = false @@ -34,10 +34,9 @@ def do_no_reset! def reset_gem_configuration if @@killed GLOBAL_EXECUTORS.each do |var, factory| - executor = Concurrent.const_get(var).value + executor = Concurrent.const_get(var).value! executor.shutdown executor.kill - executor = nil Concurrent.const_set(var, factory.call) end @@killed = false