
Commit bf0bc6f

Merge branch 'master' into promise-all
2 parents: 36548df + 67d6631

11 files changed: +28 -30 lines

doc/thread_pools.md
Lines changed: 15 additions & 15 deletions

@@ -1,16 +1,16 @@
 # Thread Pools
 
-A Thread Pool is an abstraction that you can give a unit of work to, and the work will be executed by one of possibly several threads in the pool. One motivation for using thread pools is the overhead of creating and destroying threads. Creating a pool of reusable worker threads then repeatedly re-using threads from the pool can have huge performace benefits for a long-running application like a service.
+A Thread Pool is an abstraction that you can give a unit of work to, and the work will be executed by one of possibly several threads in the pool. One motivation for using thread pools is the overhead of creating and destroying threads. Creating a pool of reusable worker threads then repeatedly re-using threads from the pool can have huge performance benefits for a long-running application like a service.
 
 `concurrent-ruby` also offers some higher level abstractions than thread pools. For many problems, you will be better served by using one of these -- if you are thinking of using a thread pool, we especially recommend you look at and understand [Future](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/Future.html)s before deciding to use thread pools directly instead. Futures are implemented using thread pools, but offer a higher level abstraction.
 
 But there are some problems for which directly using a thread pool is an appropriate solution. Or, you may wish to make your own thread pool to run Futures on, to be separate or have different characteristics than the global thread pool that Futures run on by default.
 
-Thread pools are considered 'executors' -- an object you can give a unit of work to, to have it exeucted. In fact, thread pools are the main kind of executor you will see, others are mainly for testing or odd edge cases. In some documentation or source code you'll see reference to an 'executor' -- this is commonly a thread pool, or else something similar that executes units of work (usually supplied as ruby blocks).
+Thread pools are considered 'executors' -- an object you can give a unit of work to, to have it executed. In fact, thread pools are the main kind of executor you will see - others are mainly for testing or odd edge cases. In some documentation or source code you'll see reference to an 'executor' -- this is commonly a thread pool, or else something similar that executes units of work (usually supplied as Ruby blocks).
 
 ## FixedThreadPool
 
-A [FixedThreadPool](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/FixedThreadPool.html) contains a fixed number of threads. When you give a unit if work to it, an available thread will be used to execute.
+A [FixedThreadPool](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/FixedThreadPool.html) contains a fixed number of threads. When you give a unit of work to it, an available thread will be used to execute.
 
 ~~~ruby
 pool = Concurrent::FixedThreadPool.new(5) # 5 threads
@@ -21,15 +21,15 @@ end
 # while work is concurrently being done in the thread pool, at some possibly future point.
 ~~~
 
-What happens if you post new work when all (eg) 5 threads are currently busy? It will be added to a queue, and executed when a thread becomes available. In a `FixedThreadPool`, if you post work to the pool much faster than the work can be completed, the queue may grow without bounds, as the work piles up in the holding queue, using up memory without bounds. To limit the queue and apply some form of 'back pressure' instead, you can use the more configurable `ThreadPoolExecutor` (See below).
+What happens if you post new work when all (e.g.) 5 threads are currently busy? It will be added to a queue, and executed when a thread becomes available. In a `FixedThreadPool`, if you post work to the pool much faster than the work can be completed, the queue may grow without bounds, as the work piles up in the holding queue, using up memory without bounds. To limit the queue and apply some form of 'back pressure' instead, you can use the more configurable `ThreadPoolExecutor` (See below).
 
 If you'd like to base the number of threads in the pool on the number of processors available, your code can consult [Concurrent.processor_count](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/ProcessorCounter.html#processor_count-instance_method).
 
 The `FixedThreadPool` is based on the semantics used in Java for [java.util.concurrent.Executors.newFixedThreadPool(int nThreads)](https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html#newFixedThreadPool(int))
 
 ## CachedThreadPool
 
-A [CachedThreadPool](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/CachedThreadPool.html) will create as many threads as neccesary for work posted to it. If you post work to a `CachedThreadPool` when all it's existing threads are busy, it will create a new thread to execute that work, and then keep that thread cached for future work. Cached threads are reclaimed (destroyed) after they are idle for a while.
+A [CachedThreadPool](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/CachedThreadPool.html) will create as many threads as necessary for work posted to it. If you post work to a `CachedThreadPool` when all its existing threads are busy, it will create a new thread to execute that work, and then keep that thread cached for future work. Cached threads are reclaimed (destroyed) after they are idle for a while.
 
 CachedThreadPools typically improve the performance of programs that execute many short-lived asynchronous tasks.
 
@@ -58,18 +58,18 @@ pool = Concurrent::ThreadPoolExecutor.new(
 )
 ~~~
 
-If you want to provide a maximum queue size, you may also consider the `overflow_policy` -- what will happen if work is posted to a pool when the queue of waiting work has reached the maximum size? Available policies:
+If you want to provide a maximum queue size, you may also consider the `fallback_policy` -- what will happen if work is posted to a pool when the queue of waiting work has reached the maximum size? Available policies:
 
 * abort: Raise a `Concurrent::RejectedExecutionError` exception and discard the task. (default policy)
 * discard: Silently discard the task and return nil as the task result.
-* caller_runs: The work will be executed in the thread of the caller, instead of being given to another thread in the pool
+* caller_runs: The work will be executed in the thread of the caller, instead of being given to another thread in the pool.
 
 ~~~ruby
 pool = Concurrent::ThreadPoolExecutor.new(
   min_threads: 5,
   max_threads: 5,
   max_queue: 100,
-  overflow_policy: :caller_runs
+  fallback_policy: :caller_runs
 )
 ~~~
 
@@ -83,14 +83,14 @@ pool = Concurrent::ThreadPoolExecutor.new(
 )
 ~~~
 
-Or, with a variable number of threads like a CachedThreadPool, but with a bounded queue and an overflow_policy:
+Or, with a variable number of threads like a CachedThreadPool, but with a bounded queue and a fallback_policy:
 
 ~~~ruby
 pool = Concurrent::ThreadPoolExecutor.new(
   min_threads: 3, # create 3 threads at startup
   max_threads: 10, # create at most 10 threads
   max_queue: 100, # at most 100 jobs waiting in the queue,
-  overflow_policy: :abort
+  fallback_policy: :abort
 )
 ~~~
 
@@ -106,11 +106,11 @@ ThreadPoolExecutors with `min_threads` and `max_threads` set to different values
 
 A running thread pool can be shutdown in an orderly or disruptive manner. Once a thread pool has been shutdown it cannot be started again.
 
-The `shutdown` method can be used to initiate an orderly shutdown of the thread pool. All new post calls will reject the given block and immediately return false. Threads in the pool will continue to process all in-progress work and will process all tasks still in the queue.
+The `shutdown` method can be used to initiate an orderly shutdown of the thread pool. All new post calls will be handled according to the `fallback_policy` (i.e. failing with a RejectedExecutionError by default). Threads in the pool will continue to process all in-progress work and will process all tasks still in the queue.
 
-The `kill` method can be used to immediately shutdown the pool. All new post calls will reject the given block and immediately return false. Ruby's Thread.kill will be called on all threads in the pool, aborting all in-progress work. Tasks in the queue will be discarded.
+The `kill` method can be used to immediately shutdown the pool. All new post calls will be handled according to the `fallback_policy`. Ruby's `Thread.kill` will be called on all threads in the pool, aborting all in-progress work. Tasks in the queue will be discarded.
 
-The method `wait_for_termination` can be used to block and wait for pool shutdown to complete. This is useful when shutting down an application and ensuring the app doesn't exit before pool processing is complete. The method wait_for_termination will block for a maximum of the given number of seconds then return true if shutdown completed successfully or false. When the timeout value is nil the call will block indefinitely. Calling wait_for_termination on a stopped thread pool will immediately return true.
+The method `wait_for_termination` can be used to block and wait for pool shutdown to complete. This is useful when shutting down an application and ensuring the app doesn't exit before pool processing is complete. The method wait_for_termination will block for a maximum of the given number of seconds then return true (if shutdown completed successfully) or false (if it was still ongoing). When the timeout value is `nil` the call will block indefinitely. Calling `wait_for_termination` on a stopped thread pool will immediately return true.
 
 ~~~ruby
 # tell the pool to shutdown in an orderly fashion, allowing in progress work to complete
@@ -155,10 +155,10 @@ pool = Concurrent::ThreadPoolExecutor.new(
   :min_threads => [2, Concurrent.processor_count].max,
   :max_threads => [2, Concurrent.processor_count].max,
   :max_queue => [2, Concurrent.processor_count].max * 5,
-  :overflow_policy => :caller_runs
+  :fallback_policy => :caller_runs
 )
 
 future = Future.new(:executor => pool).execute do
   #work
 end
-~~~
+~~~
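
For reference, a minimal sketch (illustrative only, not part of this commit) of the documented `fallback_policy` option and the shutdown behaviour the updated doc describes, using the `ThreadPoolExecutor` API as documented above:

~~~ruby
require 'concurrent'

# bounded pool using the documented option name `fallback_policy`
# (this doc previously called it `overflow_policy`)
pool = Concurrent::ThreadPoolExecutor.new(
  min_threads: 2,
  max_threads: 2,
  max_queue: 10,
  fallback_policy: :abort # raise Concurrent::RejectedExecutionError when the queue is full
)

10.times { |i| pool.post { sleep 0.1; puts "job #{i} done" } }

pool.shutdown                          # orderly shutdown: queued work still runs
drained = pool.wait_for_termination(5) # block for at most 5 seconds
puts "pool drained: #{drained}"        # true if shutdown completed, false otherwise
~~~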

lib/concurrent/async.rb
Lines changed: 3 additions & 3 deletions

@@ -81,13 +81,13 @@ def method_missing(method, *args, &block)
   super unless @delegate.respond_to?(method)
   Async::validate_argc(@delegate, method, *args)
 
-  self.define_singleton_method(method) do |*args|
-    Async::validate_argc(@delegate, method, *args)
+  self.define_singleton_method(method) do |*args2|
+    Async::validate_argc(@delegate, method, *args2)
     ivar = Concurrent::IVar.new
     value, reason = nil, nil
     @serializer.post(@executor.value) do
       begin
-        value = @delegate.send(method, *args, &block)
+        value = @delegate.send(method, *args2, &block)
       rescue => reason
         # caught
       ensure
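
The rename above (inner block parameter `*args` to `*args2`) stops the block from shadowing the `*args` captured by `method_missing`. A standalone sketch of the pattern (illustrative only, not the library's code; the `Proxy` class is hypothetical):

~~~ruby
# Run with `ruby -w`: naming the inner block parameter `*args`, as before,
# triggers "warning: shadowing outer local variable - args".
class Proxy
  def method_missing(method, *args, &block)
    define_singleton_method(method) do |*args2| # renamed so it does not shadow `args`
      "called #{method} with #{args2.inspect}"
    end
    send(method, *args, &block) # replay the original call through the new method
  end

  def respond_to_missing?(_method, _include_private = false)
    true
  end
end

puts Proxy.new.greet("hello") # => called greet with ["hello"]
~~~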

lib/concurrent/atomic/atomic_fixnum.rb
Lines changed: 2 additions & 2 deletions

@@ -25,8 +25,8 @@ module Concurrent
   class MutexAtomicFixnum
 
     # http://stackoverflow.com/questions/535721/ruby-max-integer
-    MIN_VALUE = -(2**(0.size * 8 -2))
-    MAX_VALUE = (2**(0.size * 8 -2) -1)
+    MIN_VALUE = -(2**(0.size * 8 - 2))
+    MAX_VALUE = (2**(0.size * 8 - 2) - 1)
 
     # @!macro [attach] atomic_fixnum_method_initialize
     #
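
For context on the `MIN_VALUE`/`MAX_VALUE` expressions above: `0.size` is the machine word size in bytes, so on a 64-bit MRI build (an assumption; 32-bit builds give smaller bounds) the formulas work out as:

~~~ruby
word_bits = 0.size * 8           # 64 on a 64-bit build (assumed here)
puts(-(2**(word_bits - 2)))      # => -4611686018427387904 (MIN_VALUE)
puts((2**(word_bits - 2)) - 1)   # =>  4611686018427387903 (MAX_VALUE)
~~~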

lib/concurrent/executor/indirect_immediate_executor.rb
Lines changed: 1 addition & 4 deletions

@@ -28,7 +28,7 @@ def post(*args, &task)
       return false unless running?
 
       event = Concurrent::Event.new
-      internal_executor.post do
+      @internal_executor.post do
         begin
           task.call(*args)
         ensure
@@ -39,8 +39,5 @@ def post(*args, &task)
 
       true
     end
-
-    private
-    attr_reader :internal_executor
   end
 end

lib/concurrent/executor/ruby_thread_pool_worker.rb
Lines changed: 1 addition & 0 deletions

@@ -13,6 +13,7 @@ def initialize(queue, parent)
       @parent = parent
       @mutex = Mutex.new
       @last_activity = Time.now.to_f
+      @thread = nil
     end
 
     # @!visibility private
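
Initializing `@thread` to `nil` in the constructor makes later reads of the ivar well-defined. A small sketch (illustrative only) of the uninitialized-instance-variable warning such a declaration avoids on the MRI versions of the time (an assumption; newer Rubies dropped this warning):

~~~ruby
# Run with `ruby -w` on older MRI: reading @thread before any assignment prints
# "warning: instance variable @thread not initialized".
class Worker
  def initialize
    @thread = nil # declare the ivar so later reads are unambiguous
  end

  def alive?
    @thread && @thread.alive?
  end
end

p Worker.new.alive? # => nil (falsy), no warning
~~~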

lib/concurrent/executor/serialized_execution.rb
Lines changed: 1 addition & 1 deletion

@@ -12,7 +12,7 @@ class SerializedExecution
 
     Job = Struct.new(:executor, :args, :block) do
       def call
-        block.call *args
+        block.call(*args)
       end
     end
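
This change, like the similar ones in `observable.rb`, `promise.rb`, and `scheduled_task.rb` below, adds parentheses to calls that pass a splat or block argument. A small sketch (illustrative only) of why: Ruby's verbose mode flags the unparenthesized form as ambiguous.

~~~ruby
# Run with `ruby -w`: the first call produces
# "warning: `*' interpreted as argument prefix"; the second does not.
blk  = ->(*xs) { xs.inject(0, :+) }
args = [1, 2, 3]

with_warning    = blk.call *args
without_warning = blk.call(*args)
puts with_warning == without_warning # => true (both are 6)
~~~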

lib/concurrent/lazy_register.rb
Lines changed: 1 addition & 1 deletion

@@ -49,7 +49,7 @@ def register(key, &block)
     # @return self
     # @param [Object] key
     def unregister(key)
-      @data.update { |h| h.dup.tap { |h| h.delete(key) } }
+      @data.update { |h| h.dup.tap { |j| j.delete(key) } }
       self
     end

lib/concurrent/observable.rb
Lines changed: 1 addition & 1 deletion

@@ -46,7 +46,7 @@ def add_observer(*args, &block)
     # as #add_observer but it can be used for chaining
     # @return [Observable] self
     def with_observer(*args, &block)
-      add_observer *args, &block
+      add_observer(*args, &block)
       self
     end

lib/concurrent/promise.rb
Lines changed: 1 addition & 1 deletion

@@ -260,7 +260,7 @@ def then(rescuer = nil, &block)
     # @return [Promise]
     def on_success(&block)
       raise ArgumentError.new('no block given') unless block_given?
-      self.then &block
+      self.then(&block)
     end
 
     # @return [Promise]

lib/concurrent/scheduled_task.rb
Lines changed: 1 addition & 1 deletion

@@ -26,7 +26,7 @@ def initialize(intended_time, opts = {}, &block)
     def execute
       if compare_and_set_state(:pending, :unscheduled)
         @schedule_time = TimerSet.calculate_schedule_time(@intended_time)
-        Concurrent::timer(@schedule_time.to_f - Time.now.to_f) { @executor.post &method(:process_task) }
+        Concurrent::timer(@schedule_time.to_f - Time.now.to_f) { @executor.post(&method(:process_task)) }
         self
       end
     end
