Skip to content

Commit 0d43d21

Browse files
committed
Merge pull request #201 from rkday/fallback_handling
Posting to a shutdown thread pool - JRuby consistency and better naming
2 parents e415a38 + 6bd961c commit 0d43d21

19 files changed

+338
-314
lines changed

lib/concurrent/executor/executor.rb

Lines changed: 48 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,12 @@
55
module Concurrent
66

77
module Executor
8-
8+
# The policy defining how rejected tasks (tasks received once the
9+
# queue size reaches the configured `max_queue`, or after the
10+
# executor has shut down) are handled. Must be one of the values
11+
# specified in `FALLBACK_POLICIES`.
12+
attr_reader :fallback_policy
13+
914
# @!macro [attach] executor_module_method_can_overflow_question
1015
#
1116
# Does the task queue have a maximum size?
@@ -17,6 +22,31 @@ def can_overflow?
1722
false
1823
end
1924

25+
# Handler which executes the `fallback_policy` once the queue size
26+
# reaches `max_queue`.
27+
#
28+
# @param [Array] args the arguments to the task which is being handled.
29+
#
30+
# @!visibility private
31+
def handle_fallback(*args)
32+
case @fallback_policy
33+
when :abort
34+
raise RejectedExecutionError
35+
when :discard
36+
false
37+
when :caller_runs
38+
begin
39+
yield(*args)
40+
rescue => ex
41+
# let it fail
42+
log DEBUG, ex
43+
end
44+
true
45+
else
46+
fail "Unknown fallback policy #{@fallback_policy}"
47+
end
48+
end
49+
2050
# @!macro [attach] executor_module_method_serialized_question
2151
#
2252
# Does this executor guarantee serialization of its operations?
@@ -63,6 +93,9 @@ module RubyExecutor
6393
include Executor
6494
include Logging
6595

96+
# The set of possible fallback policies that may be set at thread pool creation.
97+
FALLBACK_POLICIES = [:abort, :discard, :caller_runs]
98+
6699
# @!macro [attach] executor_method_post
67100
#
68101
# Submit a task to the executor for asynchronous processing.
@@ -78,16 +111,8 @@ module RubyExecutor
78111
def post(*args, &task)
79112
raise ArgumentError.new('no block given') unless block_given?
80113
mutex.synchronize do
81-
unless running?
82-
# The executor is shut down - figure out how to reject this task
83-
if self.respond_to?(:handle_overflow, true)
84-
# Reject this task in the same way we'd reject an overflow
85-
return handle_overflow(*args, &task)
86-
else
87-
# No handle_overflow method defined - just return false
88-
return false
89-
end
90-
end
114+
# If the executor is shut down, reject this task
115+
return handle_fallback(*args, &task) unless running?
91116
execute(*args, &task)
92117
true
93118
end
@@ -219,16 +244,20 @@ module JavaExecutor
219244
include Executor
220245
java_import 'java.lang.Runnable'
221246

247+
# The set of possible fallback policies that may be set at thread pool creation.
248+
FALLBACK_POLICIES = {
249+
abort: java.util.concurrent.ThreadPoolExecutor::AbortPolicy,
250+
discard: java.util.concurrent.ThreadPoolExecutor::DiscardPolicy,
251+
caller_runs: java.util.concurrent.ThreadPoolExecutor::CallerRunsPolicy
252+
}.freeze
253+
222254
# @!macro executor_method_post
223-
def post(*args)
255+
def post(*args, &task)
224256
raise ArgumentError.new('no block given') unless block_given?
225-
if running?
226-
executor_submit = @executor.java_method(:submit, [Runnable.java_class])
227-
executor_submit.call { yield(*args) }
228-
true
229-
else
230-
false
231-
end
257+
return handle_fallback(*args, &task) unless running?
258+
executor_submit = @executor.java_method(:submit, [Runnable.java_class])
259+
executor_submit.call { yield(*args) }
260+
true
232261
rescue Java::JavaUtilConcurrent::RejectedExecutionException
233262
raise RejectedExecutionError
234263
end

lib/concurrent/executor/java_cached_thread_pool.rb

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,19 +10,19 @@ class JavaCachedThreadPool < JavaThreadPoolExecutor
1010
# Create a new thread pool.
1111
#
1212
# @param [Hash] opts the options defining pool behavior.
13-
# @option opts [Symbol] :overflow_policy (`:abort`) the overflow policy
13+
# @option opts [Symbol] :fallback_policy (`:abort`) the fallback policy
1414
#
15-
# @raise [ArgumentError] if `overflow_policy` is not a known policy
15+
# @raise [ArgumentError] if `fallback_policy` is not a known policy
1616
#
1717
# @see http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executors.html#newCachedThreadPool--
1818
def initialize(opts = {})
19-
@overflow_policy = opts.fetch(:overflow_policy, :abort)
19+
@fallback_policy = opts.fetch(:fallback_policy, opts.fetch(:overflow_policy, :abort))
2020
@max_queue = 0
2121

22-
raise ArgumentError.new("#{@overflow_policy} is not a valid overflow policy") unless OVERFLOW_POLICIES.keys.include?(@overflow_policy)
22+
raise ArgumentError.new("#{@fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.keys.include?(@fallback_policy)
2323

2424
@executor = java.util.concurrent.Executors.newCachedThreadPool
25-
@executor.setRejectedExecutionHandler(OVERFLOW_POLICIES[@overflow_policy].new)
25+
@executor.setRejectedExecutionHandler(FALLBACK_POLICIES[@fallback_policy].new)
2626

2727
set_shutdown_hook
2828
end

lib/concurrent/executor/java_fixed_thread_pool.rb

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,10 @@ class JavaFixedThreadPool < JavaThreadPoolExecutor
1010
# Create a new thread pool.
1111
#
1212
# @param [Hash] opts the options defining pool behavior.
13-
# @option opts [Symbol] :overflow_policy (`:abort`) the overflow policy
13+
# @option opts [Symbol] :fallback_policy (`:abort`) the fallback policy
1414
#
1515
# @raise [ArgumentError] if `num_threads` is less than or equal to zero
16-
# @raise [ArgumentError] if `overflow_policy` is not a known policy
16+
# @raise [ArgumentError] if `fallback_policy` is not a known policy
1717
#
1818
# @see http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executors.html#newFixedThreadPool-int-
1919
def initialize(num_threads, opts = {})
@@ -25,14 +25,14 @@ def initialize(num_threads, opts = {})
2525
super(opts)
2626

2727

28-
#@overflow_policy = opts.fetch(:overflow_policy, :abort)
28+
#@fallback_policy = opts.fetch(:fallback_policy, :abort)
2929
#@max_queue = 0
3030
#
3131
#raise ArgumentError.new('number of threads must be greater than zero') if num_threads < 1
32-
#raise ArgumentError.new("#{@overflow_policy} is not a valid overflow policy") unless OVERFLOW_POLICIES.keys.include?(@overflow_policy)
32+
#raise ArgumentError.new("#{@fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.keys.include?(@fallback_policy)
3333
#
3434
#@executor = java.util.concurrent.Executors.newFixedThreadPool(num_threads)
35-
#@executor.setRejectedExecutionHandler(OVERFLOW_POLICIES[@overflow_policy].new)
35+
#@executor.setRejectedExecutionHandler(FALLBACK_POLICIES[@FALLBACK_policy].new)
3636

3737
set_shutdown_hook
3838
end

lib/concurrent/executor/java_single_thread_executor.rb

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,17 @@ class JavaSingleThreadExecutor
1010

1111
# Create a new thread pool.
1212
#
13+
# @option opts [Symbol] :fallback_policy (:discard) the policy
14+
# for handling new tasks that are received when the queue size
15+
# has reached `max_queue` or after the executor has shut down
16+
#
1317
# @see http://docs.oracle.com/javase/tutorial/essential/concurrency/pools.html
1418
# @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html
1519
# @see http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html
1620
def initialize(opts = {})
1721
@executor = java.util.concurrent.Executors.newSingleThreadExecutor
22+
@fallback_policy = opts.fetch(:fallback_policy, :discard)
23+
raise ArgumentError.new("#{@fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.keys.include?(@fallback_policy)
1824
set_shutdown_hook
1925
end
2026
end

lib/concurrent/executor/java_thread_pool_executor.rb

Lines changed: 10 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -20,26 +20,14 @@ class JavaThreadPoolExecutor
2020
# before being reclaimed.
2121
DEFAULT_THREAD_IDLETIMEOUT = 60
2222

23-
# The set of possible overflow policies that may be set at thread pool creation.
24-
OVERFLOW_POLICIES = {
25-
abort: java.util.concurrent.ThreadPoolExecutor::AbortPolicy,
26-
discard: java.util.concurrent.ThreadPoolExecutor::DiscardPolicy,
27-
caller_runs: java.util.concurrent.ThreadPoolExecutor::CallerRunsPolicy
28-
}.freeze
29-
3023
# The maximum number of threads that may be created in the pool.
3124
attr_reader :max_length
3225

3326
# The maximum number of tasks that may be waiting in the work queue at any one time.
3427
# When the queue size reaches `max_queue` subsequent tasks will be rejected in
35-
# accordance with the configured `overflow_policy`.
28+
# accordance with the configured `fallback_policy`.
3629
attr_reader :max_queue
3730

38-
# The policy defining how rejected tasks (tasks received once the queue size reaches
39-
# the configured `max_queue`) are handled. Must be one of the values specified in
40-
# `OVERFLOW_POLICIES`.
41-
attr_reader :overflow_policy
42-
4331
# Create a new thread pool.
4432
#
4533
# @param [Hash] opts the options which configure the thread pool
@@ -52,27 +40,28 @@ class JavaThreadPoolExecutor
5240
# number of seconds a thread may be idle before being reclaimed
5341
# @option opts [Integer] :max_queue (DEFAULT_MAX_QUEUE_SIZE) the maximum
5442
# number of tasks allowed in the work queue at any one time; a value of
55-
# zero means the queue may grow without bounnd
56-
# @option opts [Symbol] :overflow_policy (:abort) the policy for handling new
57-
# tasks that are received when the queue size has reached `max_queue`
43+
# zero means the queue may grow without bound
44+
# @option opts [Symbol] :fallback_policy (:abort) the policy for handling new
45+
# tasks that are received when the queue size has reached
46+
# `max_queue` or the executir has shut down
5847
#
5948
# @raise [ArgumentError] if `:max_threads` is less than one
6049
# @raise [ArgumentError] if `:min_threads` is less than zero
61-
# @raise [ArgumentError] if `:overflow_policy` is not one of the values specified
62-
# in `OVERFLOW_POLICIES`
50+
# @raise [ArgumentError] if `:fallback_policy` is not one of the values specified
51+
# in `FALLBACK_POLICIES`
6352
#
6453
# @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html
6554
def initialize(opts = {})
6655
min_length = opts.fetch(:min_threads, DEFAULT_MIN_POOL_SIZE).to_i
6756
max_length = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i
6857
idletime = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i
6958
@max_queue = opts.fetch(:max_queue, DEFAULT_MAX_QUEUE_SIZE).to_i
70-
@overflow_policy = opts.fetch(:overflow_policy, :abort)
59+
@fallback_policy = opts.fetch(:fallback_policy, opts.fetch(:overflow_policy, :abort))
7160

7261
raise ArgumentError.new('max_threads must be greater than zero') if max_length <= 0
7362
raise ArgumentError.new('min_threads cannot be less than zero') if min_length < 0
7463
raise ArgumentError.new('min_threads cannot be more than max_threads') if min_length > max_length
75-
raise ArgumentError.new("#{@overflow_policy} is not a valid overflow policy") unless OVERFLOW_POLICIES.keys.include?(@overflow_policy)
64+
raise ArgumentError.new("#{fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.include?(@fallback_policy)
7665

7766
if @max_queue == 0
7867
queue = java.util.concurrent.LinkedBlockingQueue.new
@@ -83,7 +72,7 @@ def initialize(opts = {})
8372
@executor = java.util.concurrent.ThreadPoolExecutor.new(
8473
min_length, max_length,
8574
idletime, java.util.concurrent.TimeUnit::SECONDS,
86-
queue, OVERFLOW_POLICIES[@overflow_policy].new)
75+
queue, FALLBACK_POLICIES[@fallback_policy].new)
8776

8877
set_shutdown_hook
8978
end

lib/concurrent/executor/ruby_cached_thread_pool.rb

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,18 @@ class RubyCachedThreadPool < RubyThreadPoolExecutor
88
# Create a new thread pool.
99
#
1010
# @param [Hash] opts the options defining pool behavior.
11-
# number of seconds a thread may be idle before it is reclaimed
11+
# @option opts [Symbol] :fallback_policy (`:abort`) the fallback policy
1212
#
13-
# @raise [ArgumentError] if `overflow_policy` is not a known policy
13+
# @raise [ArgumentError] if `fallback_policy` is not a known policy
1414
def initialize(opts = {})
15-
overflow_policy = opts.fetch(:overflow_policy, :abort)
15+
fallback_policy = opts.fetch(:fallback_policy, opts.fetch(:overflow_policy, :abort))
1616

17-
raise ArgumentError.new("#{overflow_policy} is not a valid overflow policy") unless OVERFLOW_POLICIES.include?(overflow_policy)
17+
raise ArgumentError.new("#{fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.include?(fallback_policy)
1818

1919
opts = opts.merge(
2020
min_threads: 0,
2121
max_threads: DEFAULT_MAX_POOL_SIZE,
22-
overflow_policy: overflow_policy,
22+
fallback_policy: fallback_policy,
2323
max_queue: DEFAULT_MAX_QUEUE_SIZE,
2424
idletime: DEFAULT_THREAD_IDLETIMEOUT
2525
)

lib/concurrent/executor/ruby_fixed_thread_pool.rb

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,20 +9,20 @@ class RubyFixedThreadPool < RubyThreadPoolExecutor
99
#
1010
# @param [Integer] num_threads the number of threads to allocate
1111
# @param [Hash] opts the options defining pool behavior.
12-
# @option opts [Symbol] :overflow_policy (`:abort`) the overflow policy
12+
# @option opts [Symbol] :fallback_policy (`:abort`) the fallback policy
1313
#
1414
# @raise [ArgumentError] if `num_threads` is less than or equal to zero
15-
# @raise [ArgumentError] if `overflow_policy` is not a known policy
15+
# @raise [ArgumentError] if `fallback_policy` is not a known policy
1616
def initialize(num_threads, opts = {})
17-
overflow_policy = opts.fetch(:overflow_policy, :abort)
17+
fallback_policy = opts.fetch(:fallback_policy, opts.fetch(:overflow_policy, :abort))
1818

1919
raise ArgumentError.new('number of threads must be greater than zero') if num_threads < 1
20-
raise ArgumentError.new("#{overflow_policy} is not a valid overflow policy") unless OVERFLOW_POLICIES.include?(overflow_policy)
20+
raise ArgumentError.new("#{fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.include?(fallback_policy)
2121

2222
opts = {
2323
min_threads: num_threads,
2424
max_threads: num_threads,
25-
overflow_policy: overflow_policy,
25+
fallback_policy: fallback_policy,
2626
max_queue: DEFAULT_MAX_QUEUE_SIZE,
2727
idletime: DEFAULT_THREAD_IDLETIMEOUT,
2828
}.merge(opts)

lib/concurrent/executor/ruby_single_thread_executor.rb

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,18 @@ class RubySingleThreadExecutor
99

1010
# Create a new thread pool.
1111
#
12+
# @option opts [Symbol] :fallback_policy (:discard) the policy for
13+
# handling new tasks that are received when the queue size has
14+
# reached `max_queue` or after the executor has shut down
15+
#
1216
# @see http://docs.oracle.com/javase/tutorial/essential/concurrency/pools.html
1317
# @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html
1418
# @see http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html
1519
def initialize(opts = {})
1620
@queue = Queue.new
1721
@thread = nil
22+
@fallback_policy = opts.fetch(:fallback_policy, :discard)
23+
raise ArgumentError.new("#{@fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.include?(@fallback_policy)
1824
init_executor
1925
end
2026

0 commit comments

Comments
 (0)