Skip to content

Commit 314d999

Browse files
committed
Unify overflow/fallback handling between all four Executors
1 parent 7051d2f commit 314d999

File tree

5 files changed

+56
-56
lines changed

5 files changed

+56
-56
lines changed

lib/concurrent/executor/executor.rb

Lines changed: 46 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,11 @@
55
module Concurrent
66

77
module Executor
8-
8+
# The policy defining how rejected tasks (tasks received once the queue size reaches
9+
# the configured `max_queue`) are handled. Must be one of the values specified in
10+
# `OVERFLOW_POLICIES`.
11+
attr_reader :overflow_policy
12+
913
# @!macro [attach] executor_module_method_can_overflow_question
1014
#
1115
# Does the task queue have a maximum size?
@@ -17,6 +21,31 @@ def can_overflow?
1721
false
1822
end
1923

24+
# Handler which executes the `overflow_policy` once the queue size
25+
# reaches `max_queue`.
26+
#
27+
# @param [Array] args the arguments to the task which is being handled.
28+
#
29+
# @!visibility private
30+
def handle_overflow(*args)
31+
case @overflow_policy
32+
when :abort
33+
raise RejectedExecutionError
34+
when :discard
35+
false
36+
when :caller_runs
37+
begin
38+
yield(*args)
39+
rescue => ex
40+
# let it fail
41+
log DEBUG, ex
42+
end
43+
true
44+
else
45+
fail "Unknown overflow policy #{@overflow_policy}"
46+
end
47+
end
48+
2049
# @!macro [attach] executor_module_method_serialized_question
2150
#
2251
# Does this executor guarantee serialization of its operations?
@@ -63,6 +92,9 @@ module RubyExecutor
6392
include Executor
6493
include Logging
6594

95+
# The set of possible overflow policies that may be set at thread pool creation.
96+
OVERFLOW_POLICIES = [:abort, :discard, :caller_runs]
97+
6698
# @!macro [attach] executor_method_post
6799
#
68100
# Submit a task to the executor for asynchronous processing.
@@ -78,16 +110,8 @@ module RubyExecutor
78110
def post(*args, &task)
79111
raise ArgumentError.new('no block given') unless block_given?
80112
mutex.synchronize do
81-
unless running?
82-
# The executor is shut down - figure out how to reject this task
83-
if self.respond_to?(:handle_overflow, true)
84-
# Reject this task in the same way we'd reject an overflow
85-
return handle_overflow(*args, &task)
86-
else
87-
# No handle_overflow method defined - just return false
88-
return false
89-
end
90-
end
113+
# If the executor is shut down, reject this task
114+
return handle_overflow(*args, &task) unless running?
91115
execute(*args, &task)
92116
true
93117
end
@@ -219,16 +243,20 @@ module JavaExecutor
219243
include Executor
220244
java_import 'java.lang.Runnable'
221245

246+
# The set of possible overflow policies that may be set at thread pool creation.
247+
OVERFLOW_POLICIES = {
248+
abort: java.util.concurrent.ThreadPoolExecutor::AbortPolicy,
249+
discard: java.util.concurrent.ThreadPoolExecutor::DiscardPolicy,
250+
caller_runs: java.util.concurrent.ThreadPoolExecutor::CallerRunsPolicy
251+
}.freeze
252+
222253
# @!macro executor_method_post
223254
def post(*args)
224255
raise ArgumentError.new('no block given') unless block_given?
225-
if running?
226-
executor_submit = @executor.java_method(:submit, [Runnable.java_class])
227-
executor_submit.call { yield(*args) }
228-
true
229-
else
230-
false
231-
end
256+
return handle_overflow(*args, &task) unless running?
257+
executor_submit = @executor.java_method(:submit, [Runnable.java_class])
258+
executor_submit.call { yield(*args) }
259+
true
232260
rescue Java::JavaUtilConcurrent::RejectedExecutionException
233261
raise RejectedExecutionError
234262
end

lib/concurrent/executor/java_single_thread_executor.rb

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,16 @@ class JavaSingleThreadExecutor
1010

1111
# Create a new thread pool.
1212
#
13+
# @option opts [Symbol] :overflow_policy (:discard) the policy for handling new
14+
# tasks that are received when the queue size has reached `max_queue`
15+
#
1316
# @see http://docs.oracle.com/javase/tutorial/essential/concurrency/pools.html
1417
# @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html
1518
# @see http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html
1619
def initialize(opts = {})
1720
@executor = java.util.concurrent.Executors.newSingleThreadExecutor
21+
@overflow_policy = opts.fetch(:overflow_policy, :discard)
22+
raise ArgumentError.new("#{@overflow_policy} is not a valid overflow policy") unless OVERFLOW_POLICIES.keys.include?(@overflow_policy)
1823
set_shutdown_hook
1924
end
2025
end

lib/concurrent/executor/java_thread_pool_executor.rb

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,6 @@ class JavaThreadPoolExecutor
2020
# before being reclaimed.
2121
DEFAULT_THREAD_IDLETIMEOUT = 60
2222

23-
# The set of possible overflow policies that may be set at thread pool creation.
24-
OVERFLOW_POLICIES = {
25-
abort: java.util.concurrent.ThreadPoolExecutor::AbortPolicy,
26-
discard: java.util.concurrent.ThreadPoolExecutor::DiscardPolicy,
27-
caller_runs: java.util.concurrent.ThreadPoolExecutor::CallerRunsPolicy
28-
}.freeze
29-
3023
# The maximum number of threads that may be created in the pool.
3124
attr_reader :max_length
3225

lib/concurrent/executor/ruby_single_thread_executor.rb

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,17 @@ class RubySingleThreadExecutor
99

1010
# Create a new thread pool.
1111
#
12+
# @option opts [Symbol] :overflow_policy (:discard) the policy for handling new
13+
# tasks that are received when the queue size has reached `max_queue`
14+
#
1215
# @see http://docs.oracle.com/javase/tutorial/essential/concurrency/pools.html
1316
# @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html
1417
# @see http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html
1518
def initialize(opts = {})
1619
@queue = Queue.new
1720
@thread = nil
21+
@overflow_policy = opts.fetch(:overflow_policy, :discard)
22+
raise ArgumentError.new("#{@overflow_policy} is not a valid overflow policy") unless OVERFLOW_POLICIES.include?(@overflow_policy)
1823
init_executor
1924
end
2025

lib/concurrent/executor/ruby_thread_pool_executor.rb

Lines changed: 0 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,6 @@ class RubyThreadPoolExecutor
2323
# before being reclaimed.
2424
DEFAULT_THREAD_IDLETIMEOUT = 60
2525

26-
# The set of possible overflow policies that may be set at thread pool creation.
27-
OVERFLOW_POLICIES = [:abort, :discard, :caller_runs]
28-
2926
# The maximum number of threads that may be created in the pool.
3027
attr_reader :max_length
3128

@@ -49,11 +46,6 @@ class RubyThreadPoolExecutor
4946
# accordance with the configured `overflow_policy`.
5047
attr_reader :max_queue
5148

52-
# The policy defining how rejected tasks (tasks received once the queue size reaches
53-
# the configured `max_queue`) are handled. Must be one of the values specified in
54-
# `OVERFLOW_POLICIES`.
55-
attr_reader :overflow_policy
56-
5749
# Create a new thread pool.
5850
#
5951
# @param [Hash] opts the options which configure the thread pool
@@ -224,29 +216,6 @@ def ensure_capacity?
224216
capacity
225217
end
226218

227-
# Handler which executes the `overflow_policy` once the queue size
228-
# reaches `max_queue`.
229-
#
230-
# @param [Array] args the arguments to the task which is being handled.
231-
#
232-
# @!visibility private
233-
def handle_overflow(*args)
234-
case @overflow_policy
235-
when :abort
236-
raise RejectedExecutionError
237-
when :discard
238-
false
239-
when :caller_runs
240-
begin
241-
yield(*args)
242-
rescue => ex
243-
# let it fail
244-
log DEBUG, ex
245-
end
246-
true
247-
end
248-
end
249-
250219
# Scan all threads in the pool and reclaim any that are dead or
251220
# have been idle too long. Will check the last time the pool was
252221
# pruned and only run if the configured garbage collection

0 commit comments

Comments
 (0)