Skip to content

Commit 5148d10

Browse files
committed
Rename 'overflow_policy' to 'fallback_policy'.
This reflects its broader scope (e.g. deciding how to handle tasks after shutdown).
1 parent 314d999 commit 5148d10

19 files changed

+138
-116
lines changed

lib/concurrent/executor/executor.rb

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,11 @@
55
module Concurrent
66

77
module Executor
8-
# The policy defining how rejected tasks (tasks received once the queue size reaches
9-
# the configured `max_queue`) are handled. Must be one of the values specified in
10-
# `OVERFLOW_POLICIES`.
11-
attr_reader :overflow_policy
8+
# The policy defining how rejected tasks (tasks received once the
9+
# queue size reaches the configured `max_queue`, or after the
10+
# executor has shut down) are handled. Must be one of the values
11+
# specified in `FALLBACK_POLICIES`.
12+
attr_reader :fallback_policy
1213

1314
# @!macro [attach] executor_module_method_can_overflow_question
1415
#
@@ -21,14 +22,14 @@ def can_overflow?
2122
false
2223
end
2324

24-
# Handler which executes the `overflow_policy` once the queue size
25+
# Handler which executes the `fallback_policy` once the queue size
2526
# reaches `max_queue`.
2627
#
2728
# @param [Array] args the arguments to the task which is being handled.
2829
#
2930
# @!visibility private
30-
def handle_overflow(*args)
31-
case @overflow_policy
31+
def handle_fallback(*args)
32+
case @fallback_policy
3233
when :abort
3334
raise RejectedExecutionError
3435
when :discard
@@ -42,7 +43,7 @@ def handle_overflow(*args)
4243
end
4344
true
4445
else
45-
fail "Unknown overflow policy #{@overflow_policy}"
46+
fail "Unknown fallback policy #{@fallback_policy}"
4647
end
4748
end
4849

@@ -92,8 +93,8 @@ module RubyExecutor
9293
include Executor
9394
include Logging
9495

95-
# The set of possible overflow policies that may be set at thread pool creation.
96-
OVERFLOW_POLICIES = [:abort, :discard, :caller_runs]
96+
# The set of possible fallback policies that may be set at thread pool creation.
97+
FALLBACK_POLICIES = [:abort, :discard, :caller_runs]
9798

9899
# @!macro [attach] executor_method_post
99100
#
@@ -111,7 +112,7 @@ def post(*args, &task)
111112
raise ArgumentError.new('no block given') unless block_given?
112113
mutex.synchronize do
113114
# If the executor is shut down, reject this task
114-
return handle_overflow(*args, &task) unless running?
115+
return handle_fallback(*args, &task) unless running?
115116
execute(*args, &task)
116117
true
117118
end
@@ -243,8 +244,8 @@ module JavaExecutor
243244
include Executor
244245
java_import 'java.lang.Runnable'
245246

246-
# The set of possible overflow policies that may be set at thread pool creation.
247-
OVERFLOW_POLICIES = {
247+
# The set of possible fallback policies that may be set at thread pool creation.
248+
FALLBACK_POLICIES = {
248249
abort: java.util.concurrent.ThreadPoolExecutor::AbortPolicy,
249250
discard: java.util.concurrent.ThreadPoolExecutor::DiscardPolicy,
250251
caller_runs: java.util.concurrent.ThreadPoolExecutor::CallerRunsPolicy
@@ -253,7 +254,7 @@ module JavaExecutor
253254
# @!macro executor_method_post
254255
def post(*args)
255256
raise ArgumentError.new('no block given') unless block_given?
256-
return handle_overflow(*args, &task) unless running?
257+
return handle_fallback(*args, &task) unless running?
257258
executor_submit = @executor.java_method(:submit, [Runnable.java_class])
258259
executor_submit.call { yield(*args) }
259260
true

lib/concurrent/executor/java_cached_thread_pool.rb

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,19 +10,19 @@ class JavaCachedThreadPool < JavaThreadPoolExecutor
1010
# Create a new thread pool.
1111
#
1212
# @param [Hash] opts the options defining pool behavior.
13-
# @option opts [Symbol] :overflow_policy (`:abort`) the overflow policy
13+
# @option opts [Symbol] :fallback_policy (`:abort`) the fallback policy
1414
#
15-
# @raise [ArgumentError] if `overflow_policy` is not a known policy
15+
# @raise [ArgumentError] if `fallback_policy` is not a known policy
1616
#
1717
# @see http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executors.html#newCachedThreadPool--
1818
def initialize(opts = {})
19-
@overflow_policy = opts.fetch(:overflow_policy, :abort)
19+
@fallback_policy = opts.fetch(:fallback_policy, opts.fetch(:overflow_policy, :abort))
2020
@max_queue = 0
2121

22-
raise ArgumentError.new("#{@overflow_policy} is not a valid overflow policy") unless OVERFLOW_POLICIES.keys.include?(@overflow_policy)
22+
raise ArgumentError.new("#{@fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.keys.include?(@fallback_policy)
2323

2424
@executor = java.util.concurrent.Executors.newCachedThreadPool
25-
@executor.setRejectedExecutionHandler(OVERFLOW_POLICIES[@overflow_policy].new)
25+
@executor.setRejectedExecutionHandler(FALLBACK_POLICIES[@fallback_policy].new)
2626

2727
set_shutdown_hook
2828
end

lib/concurrent/executor/java_fixed_thread_pool.rb

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,10 @@ class JavaFixedThreadPool < JavaThreadPoolExecutor
1010
# Create a new thread pool.
1111
#
1212
# @param [Hash] opts the options defining pool behavior.
13-
# @option opts [Symbol] :overflow_policy (`:abort`) the overflow policy
13+
# @option opts [Symbol] :fallback_policy (`:abort`) the fallback policy
1414
#
1515
# @raise [ArgumentError] if `num_threads` is less than or equal to zero
16-
# @raise [ArgumentError] if `overflow_policy` is not a known policy
16+
# @raise [ArgumentError] if `fallback_policy` is not a known policy
1717
#
1818
# @see http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executors.html#newFixedThreadPool-int-
1919
def initialize(num_threads, opts = {})
@@ -25,14 +25,14 @@ def initialize(num_threads, opts = {})
2525
super(opts)
2626

2727

28-
#@overflow_policy = opts.fetch(:overflow_policy, :abort)
28+
#@fallback_policy = opts.fetch(:fallback_policy, :abort)
2929
#@max_queue = 0
3030
#
3131
#raise ArgumentError.new('number of threads must be greater than zero') if num_threads < 1
32-
#raise ArgumentError.new("#{@overflow_policy} is not a valid overflow policy") unless OVERFLOW_POLICIES.keys.include?(@overflow_policy)
32+
#raise ArgumentError.new("#{@fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.keys.include?(@fallback_policy)
3333
#
3434
#@executor = java.util.concurrent.Executors.newFixedThreadPool(num_threads)
35-
#@executor.setRejectedExecutionHandler(OVERFLOW_POLICIES[@overflow_policy].new)
35+
#@executor.setRejectedExecutionHandler(FALLBACK_POLICIES[@FALLBACK_policy].new)
3636

3737
set_shutdown_hook
3838
end

lib/concurrent/executor/java_single_thread_executor.rb

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,17 @@ class JavaSingleThreadExecutor
1010

1111
# Create a new thread pool.
1212
#
13-
# @option opts [Symbol] :overflow_policy (:discard) the policy for handling new
14-
# tasks that are received when the queue size has reached `max_queue`
13+
# @option opts [Symbol] :fallback_policy (:discard) the policy
14+
# for handling new tasks that are received when the queue size
15+
# has reached `max_queue` or after the executor has shut down
1516
#
1617
# @see http://docs.oracle.com/javase/tutorial/essential/concurrency/pools.html
1718
# @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html
1819
# @see http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html
1920
def initialize(opts = {})
2021
@executor = java.util.concurrent.Executors.newSingleThreadExecutor
21-
@overflow_policy = opts.fetch(:overflow_policy, :discard)
22-
raise ArgumentError.new("#{@overflow_policy} is not a valid overflow policy") unless OVERFLOW_POLICIES.keys.include?(@overflow_policy)
22+
@fallback_policy = opts.fetch(:fallback_policy, :discard)
23+
raise ArgumentError.new("#{@fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.keys.include?(@fallback_policy)
2324
set_shutdown_hook
2425
end
2526
end

lib/concurrent/executor/java_thread_pool_executor.rb

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,9 @@ class JavaThreadPoolExecutor
2525

2626
# The maximum number of tasks that may be waiting in the work queue at any one time.
2727
# When the queue size reaches `max_queue` subsequent tasks will be rejected in
28-
# accordance with the configured `overflow_policy`.
28+
# accordance with the configured `fallback_policy`.
2929
attr_reader :max_queue
3030

31-
# The policy defining how rejected tasks (tasks received once the queue size reaches
32-
# the configured `max_queue`) are handled. Must be one of the values specified in
33-
# `OVERFLOW_POLICIES`.
34-
attr_reader :overflow_policy
35-
3631
# Create a new thread pool.
3732
#
3833
# @param [Hash] opts the options which configure the thread pool
@@ -45,27 +40,28 @@ class JavaThreadPoolExecutor
4540
# number of seconds a thread may be idle before being reclaimed
4641
# @option opts [Integer] :max_queue (DEFAULT_MAX_QUEUE_SIZE) the maximum
4742
# number of tasks allowed in the work queue at any one time; a value of
48-
# zero means the queue may grow without bounnd
49-
# @option opts [Symbol] :overflow_policy (:abort) the policy for handling new
50-
# tasks that are received when the queue size has reached `max_queue`
43+
# zero means the queue may grow without bound
44+
# @option opts [Symbol] :fallback_policy (:abort) the policy for handling new
45+
# tasks that are received when the queue size has reached
46+
# `max_queue` or the executir has shut down
5147
#
5248
# @raise [ArgumentError] if `:max_threads` is less than one
5349
# @raise [ArgumentError] if `:min_threads` is less than zero
54-
# @raise [ArgumentError] if `:overflow_policy` is not one of the values specified
55-
# in `OVERFLOW_POLICIES`
50+
# @raise [ArgumentError] if `:fallback_policy` is not one of the values specified
51+
# in `FALLBACK_POLICIES`
5652
#
5753
# @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html
5854
def initialize(opts = {})
5955
min_length = opts.fetch(:min_threads, DEFAULT_MIN_POOL_SIZE).to_i
6056
max_length = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i
6157
idletime = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i
6258
@max_queue = opts.fetch(:max_queue, DEFAULT_MAX_QUEUE_SIZE).to_i
63-
@overflow_policy = opts.fetch(:overflow_policy, :abort)
59+
@fallback_policy = opts.fetch(:fallback_policy, opts.fetch(:overflow_policy, :abort))
6460

6561
raise ArgumentError.new('max_threads must be greater than zero') if max_length <= 0
6662
raise ArgumentError.new('min_threads cannot be less than zero') if min_length < 0
6763
raise ArgumentError.new('min_threads cannot be more than max_threads') if min_length > max_length
68-
raise ArgumentError.new("#{@overflow_policy} is not a valid overflow policy") unless OVERFLOW_POLICIES.keys.include?(@overflow_policy)
64+
raise ArgumentError.new("#{fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.include?(@fallback_policy)
6965

7066
if @max_queue == 0
7167
queue = java.util.concurrent.LinkedBlockingQueue.new
@@ -76,7 +72,7 @@ def initialize(opts = {})
7672
@executor = java.util.concurrent.ThreadPoolExecutor.new(
7773
min_length, max_length,
7874
idletime, java.util.concurrent.TimeUnit::SECONDS,
79-
queue, OVERFLOW_POLICIES[@overflow_policy].new)
75+
queue, FALLBACK_POLICIES[@fallback_policy].new)
8076

8177
set_shutdown_hook
8278
end

lib/concurrent/executor/ruby_cached_thread_pool.rb

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,18 @@ class RubyCachedThreadPool < RubyThreadPoolExecutor
88
# Create a new thread pool.
99
#
1010
# @param [Hash] opts the options defining pool behavior.
11-
# number of seconds a thread may be idle before it is reclaimed
11+
# @option opts [Symbol] :fallback_policy (`:abort`) the fallback policy
1212
#
13-
# @raise [ArgumentError] if `overflow_policy` is not a known policy
13+
# @raise [ArgumentError] if `fallback_policy` is not a known policy
1414
def initialize(opts = {})
15-
overflow_policy = opts.fetch(:overflow_policy, :abort)
15+
fallback_policy = opts.fetch(:fallback_policy, opts.fetch(:overflow_policy, :abort))
1616

17-
raise ArgumentError.new("#{overflow_policy} is not a valid overflow policy") unless OVERFLOW_POLICIES.include?(overflow_policy)
17+
raise ArgumentError.new("#{fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.include?(fallback_policy)
1818

1919
opts = opts.merge(
2020
min_threads: 0,
2121
max_threads: DEFAULT_MAX_POOL_SIZE,
22-
overflow_policy: overflow_policy,
22+
fallback_policy: fallback_policy,
2323
max_queue: DEFAULT_MAX_QUEUE_SIZE,
2424
idletime: DEFAULT_THREAD_IDLETIMEOUT
2525
)

lib/concurrent/executor/ruby_fixed_thread_pool.rb

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,20 +9,20 @@ class RubyFixedThreadPool < RubyThreadPoolExecutor
99
#
1010
# @param [Integer] num_threads the number of threads to allocate
1111
# @param [Hash] opts the options defining pool behavior.
12-
# @option opts [Symbol] :overflow_policy (`:abort`) the overflow policy
12+
# @option opts [Symbol] :fallback_policy (`:abort`) the fallback policy
1313
#
1414
# @raise [ArgumentError] if `num_threads` is less than or equal to zero
15-
# @raise [ArgumentError] if `overflow_policy` is not a known policy
15+
# @raise [ArgumentError] if `fallback_policy` is not a known policy
1616
def initialize(num_threads, opts = {})
17-
overflow_policy = opts.fetch(:overflow_policy, :abort)
17+
fallback_policy = opts.fetch(:fallback_policy, opts.fetch(:overflow_policy, :abort))
1818

1919
raise ArgumentError.new('number of threads must be greater than zero') if num_threads < 1
20-
raise ArgumentError.new("#{overflow_policy} is not a valid overflow policy") unless OVERFLOW_POLICIES.include?(overflow_policy)
20+
raise ArgumentError.new("#{fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.include?(fallback_policy)
2121

2222
opts = {
2323
min_threads: num_threads,
2424
max_threads: num_threads,
25-
overflow_policy: overflow_policy,
25+
fallback_policy: fallback_policy,
2626
max_queue: DEFAULT_MAX_QUEUE_SIZE,
2727
idletime: DEFAULT_THREAD_IDLETIMEOUT,
2828
}.merge(opts)

lib/concurrent/executor/ruby_single_thread_executor.rb

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,18 @@ class RubySingleThreadExecutor
99

1010
# Create a new thread pool.
1111
#
12-
# @option opts [Symbol] :overflow_policy (:discard) the policy for handling new
13-
# tasks that are received when the queue size has reached `max_queue`
12+
# @option opts [Symbol] :fallback_policy (:discard) the policy for
13+
# handling new tasks that are received when the queue size has
14+
# reached `max_queue` or after the executor has shut down
1415
#
1516
# @see http://docs.oracle.com/javase/tutorial/essential/concurrency/pools.html
1617
# @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html
1718
# @see http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html
1819
def initialize(opts = {})
1920
@queue = Queue.new
2021
@thread = nil
21-
@overflow_policy = opts.fetch(:overflow_policy, :discard)
22-
raise ArgumentError.new("#{@overflow_policy} is not a valid overflow policy") unless OVERFLOW_POLICIES.include?(@overflow_policy)
22+
@fallback_policy = opts.fetch(:fallback_policy, :discard)
23+
raise ArgumentError.new("#{@fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.include?(@fallback_policy)
2324
init_executor
2425
end
2526

lib/concurrent/executor/ruby_thread_pool_executor.rb

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ class RubyThreadPoolExecutor
4343

4444
# The maximum number of tasks that may be waiting in the work queue at any one time.
4545
# When the queue size reaches `max_queue` subsequent tasks will be rejected in
46-
# accordance with the configured `overflow_policy`.
46+
# accordance with the configured `fallback_policy`.
4747
attr_reader :max_queue
4848

4949
# Create a new thread pool.
@@ -58,26 +58,27 @@ class RubyThreadPoolExecutor
5858
# number of seconds a thread may be idle before being reclaimed
5959
# @option opts [Integer] :max_queue (DEFAULT_MAX_QUEUE_SIZE) the maximum
6060
# number of tasks allowed in the work queue at any one time; a value of
61-
# zero means the queue may grow without bounnd
62-
# @option opts [Symbol] :overflow_policy (:abort) the policy for handling new
63-
# tasks that are received when the queue size has reached `max_queue`
61+
# zero means the queue may grow without bound
62+
# @option opts [Symbol] :fallback_policy (:abort) the policy for handling new
63+
# tasks that are received when the queue size has reached
64+
# `max_queue` or the executor has shut down
6465
#
6566
# @raise [ArgumentError] if `:max_threads` is less than one
6667
# @raise [ArgumentError] if `:min_threads` is less than zero
67-
# @raise [ArgumentError] if `:overflow_policy` is not one of the values specified
68-
# in `OVERFLOW_POLICIES`
68+
# @raise [ArgumentError] if `:fallback_policy` is not one of the values specified
69+
# in `FALLBACK_POLICIES`
6970
#
7071
# @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html
7172
def initialize(opts = {})
7273
@min_length = opts.fetch(:min_threads, DEFAULT_MIN_POOL_SIZE).to_i
7374
@max_length = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i
7475
@idletime = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i
7576
@max_queue = opts.fetch(:max_queue, DEFAULT_MAX_QUEUE_SIZE).to_i
76-
@overflow_policy = opts.fetch(:overflow_policy, :abort)
77+
@fallback_policy = opts.fetch(:fallback_policy, opts.fetch(:overflow_policy, :abort))
7778

7879
raise ArgumentError.new('max_threads must be greater than zero') if @max_length <= 0
7980
raise ArgumentError.new('min_threads cannot be less than zero') if @min_length < 0
80-
raise ArgumentError.new("#{overflow_policy} is not a valid overflow policy") unless OVERFLOW_POLICIES.include?(@overflow_policy)
81+
raise ArgumentError.new("#{fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.include?(@fallback_policy)
8182
raise ArgumentError.new('min_threads cannot be more than max_threads') if min_length > max_length
8283

8384
init_executor
@@ -161,7 +162,7 @@ def execute(*args, &task)
161162
@scheduled_task_count += 1
162163
@queue << [args, task]
163164
else
164-
handle_overflow(*args, &task) if @max_queue != 0 && @queue.length >= @max_queue
165+
handle_fallback(*args, &task) if @max_queue != 0 && @queue.length >= @max_queue
165166
end
166167
end
167168

lib/concurrent/executor/thread_pool_executor.rb

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,13 +40,13 @@ module Concurrent
4040
# * `idletime`: The number of seconds that a thread may be idle before being reclaimed.
4141
# * `max_queue`: The maximum number of tasks that may be waiting in the work queue at
4242
# any one time. When the queue size reaches `max_queue` subsequent tasks will be
43-
# rejected in accordance with the configured `overflow_policy`.
44-
# * `overflow_policy`: The policy defining how rejected tasks are handled. #
43+
# rejected in accordance with the configured `fallback_policy`.
44+
# * `fallback_policy`: The policy defining how rejected tasks are handled. #
4545
#
46-
# Three overflow policies are supported:
46+
# Three fallback policies are supported:
4747
#
4848
# * `:abort`: Raise a `RejectedExecutionError` exception and discard the task.
49-
# * `:discard`: Silently discard the task and return `nil` as the task result.
49+
# * `:discard`: Discard the task and return false.
5050
# * `:caller_runs`: Execute the task on the calling thread.
5151
#
5252
# {include:file:doc/thread_pools.md}

0 commit comments

Comments
 (0)